编程 Elasticsearch写入、读取、更新、删除以及批量操作(Golang)

2024-11-18 17:43:54 +0800 CST views 1122

Elasticsearch写入、读取、更新、删除以及批量操作(Golang)

1. 背景

加入自动驾驶事业部数仓团队后,工作围绕数据和检索展开,熟练掌握 Elasticsearch 的基本操作成为提高工作效率的关键。在项目中使用官方提供的包 github.com/elastic/go-elasticsearch,但其底层设计复杂,开发成本较高,因此使用了三方包 github.com/olivere/elastic(版本 7.0.32),以实现更简便的开发。


2. Elasticsearch基础操作

2.1 创建 Elasticsearch 客户端

定义一个 ES 实例结构:

type EsInstance struct {
  Client *elastic.Client    // es客户端
  Index  map[string]Indexes // 所有索引
}
 
type Indexes struct {
  Name    string `json:"name"`    // 索引名称
  Mapping string `json:"mapping"` // 索引结构
}

创建实例:

func NewEsInstance() (*EsInstance, error) {
  client, err := elastic.NewClient(
    elastic.SetURL("http://127.0.0.1:9200"), 
    elastic.SetBasicAuth("user_name", "user_password"),
    elastic.SetSniff(false), // 跳过ip检查
  )
  if err != nil {
    return &EsInstance{}, err
  }
  return &EsInstance{
    Client: client,
    Index:  map[string]Indexes{},
  }, nil
}

2.2 创建、删除索引

创建索引:

func (es *EsInstance) CreateIndex(index string, mapping string) error {
  exists, err := es.Client.IndexExists(index).Do(context.Background())
  if err != nil {
    return err
  }
  if exists {
    return nil
  }
  createIndex, err := es.Client.CreateIndex(index).BodyString(mapping).Do(context.Background())
  if err != nil {
    return err
  }
  if !createIndex.Acknowledged {
    return errors.New("create index failed")
  }
  es.Index[index] = Indexes{Name: index, Mapping: mapping}
  return nil
}

删除索引:

func (es *EsInstance) DeleteIndex(index string) error {
  exists, err := es.Client.IndexExists(index).Do(context.Background())
  if err != nil {
    return err
  }
  if !exists {
    return nil
  }
  deleteIndex, err := es.Client.DeleteIndex(index).Do(context.Background())
  if err != nil {
    return err
  }
  if !deleteIndex.Acknowledged {
    return errors.New("delete index failed")
  }
  return nil
}

2.3 插入文档

定义文档结构:

type Record struct {
  Index  string `json:"_index"`
  Type   string `json:"_type"`
  Id     string `json:"_id"`
  Source Source `json:"source"`
}

type Source struct {
  Id   string `json:"id"`
  Name string `json:"name"`
  Age  int    `json:"age"`
  Sex  string `json:"sex"`
}

插入文档:

func (es *EsInstance) InsertOneRecord(indexName string, record Record) error {
  _, err := es.Client.Index().Index(indexName).Id(record.Source.Id).BodyJson(record).Do(context.Background())
  return err
}

2.4 查询文档

根据文档 id 获取文档:

func (es *EsInstance) GetOneRecord(indexName, id string) (*Record, error) {
  record := &Record{}
  result, err := es.Client.Get().Index(indexName).Id(id).Do(context.Background())
  if err != nil {
    return record, err
  }
  if result.Found {
    err := json.Unmarshal(result.Source, &record.Source)
  }
  record.Id = result.Id
  record.Index = result.Index
  record.Type = result.Type
  return record, nil
}

2.5 删除文档

根据文档 id 删除文档:

func (es *EsInstance) DeleteOneRecord(indexName, id string) error {
  _, err := es.Client.Delete().Index(indexName).Id(id).Do(context.Background())
  return err
}

2.6 更新文档

根据文档 id 更新文档:

func (es *EsInstance) UpdateOneRecord(indexName, id string, record Source) error {
  _, err := es.Client.Update().Index(indexName).Id(id).Doc(record).Do(context.Background())
  return err
}

2.7 逻辑查询

自定义 DSL 查询:

func (es *EsInstance) Search(indexName string, size int, query elastic.Query) ([]Record, error) {
  records := make([]Record, 0)
  searchResult, err := es.Client.Search().Index(indexName).Query(query).Size(size).Do(context.Background())
  if err != nil {
    return records, err
  }
  if searchResult.Hits.TotalHits.Value > 0 {
    for _, hit := range searchResult.Hits.Hits {
      record := &Record{}
      json.Unmarshal(hit.Source, &record.Source)
      record.Id = hit.Id
      records = append(records, *record)
    }
  }
  return records, nil
}

2.8 滚动查询

滚动查询数据:

func (es *EsInstance) SearchScroll(indexName string, size int, query elastic.Query) ([]Record, error) {
  records := make([]Record, 0)
  searchResult, err := es.Client.Scroll(indexName).Query(query).Size(size).Do(context.Background())
  scrollId := searchResult.ScrollId

  for {
    scrollResult, err := es.Client.Scroll().ScrollId(scrollId).Do(context.Background())
    if err != nil || scrollResult.Hits.TotalHits.Value == 0 {
      break
    }
    scrollId = scrollResult.ScrollId
    for _, hit := range scrollResult.Hits.Hits {
      record := &Record{}
      json.Unmarshal(hit.Source, &record.Source)
      record.Id = hit.Id
      records = append(records, *record)
    }
  }
  es.Client.ClearScroll(scrollId).Do(context.Background())
  return records, err
}

2.9 批量插入

批量插入数据:

func (es *EsInstance) BatchInsertRecords(indexName string, records []Record) error {
  req := es.Client.Bulk().Index(indexName)
  for _, record := range records {
    doc := elastic.NewBulkIndexRequest().Id(record.Id).Doc(&record.Source)
    req.Add(doc)
  }
  _, err := req.Do(context.Background())
  return err
}

2.10 批量更新

批量更新数据:

func (es *EsInstance) BatchUpdateRecords(indexName string, records []Record) error {
  req := es.Client.Bulk().Index(indexName)
  for _, record := range records {
    doc := elastic.NewBulkUpdateRequest().Id(record.Id).Doc(&record.Source)
    req.Add(doc)
  }
  _, err := req.Do(context.Background())
  return err
}

2.11 批量删除

批量删除数据:

func (es *EsInstance) BatchDeleteRecords(indexName string, records []Record) error {
  req := es.Client.Bulk().Index(indexName)
  for _, record := range records {
    doc := elastic.NewBulkDeleteRequest().Id(record.Id)
    req.Add(doc)
  }
  _, err := req.Do(context.Background())
  return err
}

3. 检索条件设置

示例:

boolQuery := elastic.NewBoolQuery()
boolQuery.Filter(elastic.NewMatchQuery("tag_name", "turn_left"))
boolQuery.Filter(elastic.NewRangeQuery("start_time").Gte(1723737600000))

4. 测试

检索某辆车、某个时间段的数据:

boolQuery.Filter(elastic.NewMatchQuery("tag_name", "turn_left"))
boolQuery.Filter(elastic.NewMatchQuery("car_id", "京A0001"))
boolQuery.Filter(elastic.NewRangeQuery("start_time").Gte(1723737600000))
boolQuery.Filter(elastic.NewRangeQuery("end_time").Lte(1723823999000))

SearchScroll("index_2024", 4000, boolQuery)

5. 总结

本文介绍了 Elasticsearch 的基础操作,包括创建、删除、更新、查询、批量操作等。使用三方包 github.com/olivere/elastic 能有效降低开发成本,提升工作效率。

复制全文 生成海报 Elasticsearch Golang 数据处理 开发工具

推荐文章

jQuery中向DOM添加元素的多种方法
2024-11-18 23:19:46 +0800 CST
WebSQL数据库:HTML5的非标准伴侣
2024-11-18 22:44:20 +0800 CST
PHP 代码功能与使用说明
2024-11-18 23:08:44 +0800 CST
如何实现虚拟滚动
2024-11-18 20:50:47 +0800 CST
120个实用CSS技巧汇总合集
2025-06-23 13:19:55 +0800 CST
PHP 唯一卡号生成
2024-11-18 21:24:12 +0800 CST
Go配置镜像源代理
2024-11-19 09:10:35 +0800 CST
js函数常见的写法以及调用方法
2024-11-19 08:55:17 +0800 CST
关于 `nohup` 和 `&` 的使用说明
2024-11-19 08:49:44 +0800 CST
Nginx 状态监控与日志分析
2024-11-19 09:36:18 +0800 CST
ElasticSearch集群搭建指南
2024-11-19 02:31:21 +0800 CST
php内置函数除法取整和取余数
2024-11-19 10:11:51 +0800 CST
PHP 如何输出带微秒的时间
2024-11-18 01:58:41 +0800 CST
GROMACS:一个美轮美奂的C++库
2024-11-18 19:43:29 +0800 CST
windows下mysql使用source导入数据
2024-11-17 05:03:50 +0800 CST
使用 sync.Pool 优化 Go 程序性能
2024-11-19 05:56:51 +0800 CST
Golang 中你应该知道的 Range 知识
2024-11-19 04:01:21 +0800 CST
程序员茄子在线接单