编程 Elasticsearch写入、读取、更新、删除以及批量操作(Golang)

2024-11-18 17:43:54 +0800 CST views 765

Elasticsearch写入、读取、更新、删除以及批量操作(Golang)

1. 背景

加入自动驾驶事业部数仓团队后,工作围绕数据和检索展开,熟练掌握 Elasticsearch 的基本操作成为提高工作效率的关键。在项目中使用官方提供的包 github.com/elastic/go-elasticsearch,但其底层设计复杂,开发成本较高,因此使用了三方包 github.com/olivere/elastic(版本 7.0.32),以实现更简便的开发。


2. Elasticsearch基础操作

2.1 创建 Elasticsearch 客户端

定义一个 ES 实例结构:

type EsInstance struct {
  Client *elastic.Client    // es客户端
  Index  map[string]Indexes // 所有索引
}
 
type Indexes struct {
  Name    string `json:"name"`    // 索引名称
  Mapping string `json:"mapping"` // 索引结构
}

创建实例:

func NewEsInstance() (*EsInstance, error) {
  client, err := elastic.NewClient(
    elastic.SetURL("http://127.0.0.1:9200"), 
    elastic.SetBasicAuth("user_name", "user_password"),
    elastic.SetSniff(false), // 跳过ip检查
  )
  if err != nil {
    return &EsInstance{}, err
  }
  return &EsInstance{
    Client: client,
    Index:  map[string]Indexes{},
  }, nil
}

2.2 创建、删除索引

创建索引:

func (es *EsInstance) CreateIndex(index string, mapping string) error {
  exists, err := es.Client.IndexExists(index).Do(context.Background())
  if err != nil {
    return err
  }
  if exists {
    return nil
  }
  createIndex, err := es.Client.CreateIndex(index).BodyString(mapping).Do(context.Background())
  if err != nil {
    return err
  }
  if !createIndex.Acknowledged {
    return errors.New("create index failed")
  }
  es.Index[index] = Indexes{Name: index, Mapping: mapping}
  return nil
}

删除索引:

func (es *EsInstance) DeleteIndex(index string) error {
  exists, err := es.Client.IndexExists(index).Do(context.Background())
  if err != nil {
    return err
  }
  if !exists {
    return nil
  }
  deleteIndex, err := es.Client.DeleteIndex(index).Do(context.Background())
  if err != nil {
    return err
  }
  if !deleteIndex.Acknowledged {
    return errors.New("delete index failed")
  }
  return nil
}

2.3 插入文档

定义文档结构:

type Record struct {
  Index  string `json:"_index"`
  Type   string `json:"_type"`
  Id     string `json:"_id"`
  Source Source `json:"source"`
}

type Source struct {
  Id   string `json:"id"`
  Name string `json:"name"`
  Age  int    `json:"age"`
  Sex  string `json:"sex"`
}

插入文档:

func (es *EsInstance) InsertOneRecord(indexName string, record Record) error {
  _, err := es.Client.Index().Index(indexName).Id(record.Source.Id).BodyJson(record).Do(context.Background())
  return err
}

2.4 查询文档

根据文档 id 获取文档:

func (es *EsInstance) GetOneRecord(indexName, id string) (*Record, error) {
  record := &Record{}
  result, err := es.Client.Get().Index(indexName).Id(id).Do(context.Background())
  if err != nil {
    return record, err
  }
  if result.Found {
    err := json.Unmarshal(result.Source, &record.Source)
  }
  record.Id = result.Id
  record.Index = result.Index
  record.Type = result.Type
  return record, nil
}

2.5 删除文档

根据文档 id 删除文档:

func (es *EsInstance) DeleteOneRecord(indexName, id string) error {
  _, err := es.Client.Delete().Index(indexName).Id(id).Do(context.Background())
  return err
}

2.6 更新文档

根据文档 id 更新文档:

func (es *EsInstance) UpdateOneRecord(indexName, id string, record Source) error {
  _, err := es.Client.Update().Index(indexName).Id(id).Doc(record).Do(context.Background())
  return err
}

2.7 逻辑查询

自定义 DSL 查询:

func (es *EsInstance) Search(indexName string, size int, query elastic.Query) ([]Record, error) {
  records := make([]Record, 0)
  searchResult, err := es.Client.Search().Index(indexName).Query(query).Size(size).Do(context.Background())
  if err != nil {
    return records, err
  }
  if searchResult.Hits.TotalHits.Value > 0 {
    for _, hit := range searchResult.Hits.Hits {
      record := &Record{}
      json.Unmarshal(hit.Source, &record.Source)
      record.Id = hit.Id
      records = append(records, *record)
    }
  }
  return records, nil
}

2.8 滚动查询

滚动查询数据:

func (es *EsInstance) SearchScroll(indexName string, size int, query elastic.Query) ([]Record, error) {
  records := make([]Record, 0)
  searchResult, err := es.Client.Scroll(indexName).Query(query).Size(size).Do(context.Background())
  scrollId := searchResult.ScrollId

  for {
    scrollResult, err := es.Client.Scroll().ScrollId(scrollId).Do(context.Background())
    if err != nil || scrollResult.Hits.TotalHits.Value == 0 {
      break
    }
    scrollId = scrollResult.ScrollId
    for _, hit := range scrollResult.Hits.Hits {
      record := &Record{}
      json.Unmarshal(hit.Source, &record.Source)
      record.Id = hit.Id
      records = append(records, *record)
    }
  }
  es.Client.ClearScroll(scrollId).Do(context.Background())
  return records, err
}

2.9 批量插入

批量插入数据:

func (es *EsInstance) BatchInsertRecords(indexName string, records []Record) error {
  req := es.Client.Bulk().Index(indexName)
  for _, record := range records {
    doc := elastic.NewBulkIndexRequest().Id(record.Id).Doc(&record.Source)
    req.Add(doc)
  }
  _, err := req.Do(context.Background())
  return err
}

2.10 批量更新

批量更新数据:

func (es *EsInstance) BatchUpdateRecords(indexName string, records []Record) error {
  req := es.Client.Bulk().Index(indexName)
  for _, record := range records {
    doc := elastic.NewBulkUpdateRequest().Id(record.Id).Doc(&record.Source)
    req.Add(doc)
  }
  _, err := req.Do(context.Background())
  return err
}

2.11 批量删除

批量删除数据:

func (es *EsInstance) BatchDeleteRecords(indexName string, records []Record) error {
  req := es.Client.Bulk().Index(indexName)
  for _, record := range records {
    doc := elastic.NewBulkDeleteRequest().Id(record.Id)
    req.Add(doc)
  }
  _, err := req.Do(context.Background())
  return err
}

3. 检索条件设置

示例:

boolQuery := elastic.NewBoolQuery()
boolQuery.Filter(elastic.NewMatchQuery("tag_name", "turn_left"))
boolQuery.Filter(elastic.NewRangeQuery("start_time").Gte(1723737600000))

4. 测试

检索某辆车、某个时间段的数据:

boolQuery.Filter(elastic.NewMatchQuery("tag_name", "turn_left"))
boolQuery.Filter(elastic.NewMatchQuery("car_id", "京A0001"))
boolQuery.Filter(elastic.NewRangeQuery("start_time").Gte(1723737600000))
boolQuery.Filter(elastic.NewRangeQuery("end_time").Lte(1723823999000))

SearchScroll("index_2024", 4000, boolQuery)

5. 总结

本文介绍了 Elasticsearch 的基础操作,包括创建、删除、更新、查询、批量操作等。使用三方包 github.com/olivere/elastic 能有效降低开发成本,提升工作效率。

复制全文 生成海报 Elasticsearch Golang 数据处理 开发工具

推荐文章

使用 Nginx 获取客户端真实 IP
2024-11-18 14:51:58 +0800 CST
nginx反向代理
2024-11-18 20:44:14 +0800 CST
一些实用的前端开发工具网站
2024-11-18 14:30:55 +0800 CST
JavaScript设计模式:装饰器模式
2024-11-19 06:05:51 +0800 CST
java MySQL如何获取唯一订单编号?
2024-11-18 18:51:44 +0800 CST
#免密码登录服务器
2024-11-19 04:29:52 +0800 CST
api远程把word文件转换为pdf
2024-11-19 03:48:33 +0800 CST
Go的父子类的简单使用
2024-11-18 14:56:32 +0800 CST
如何在Vue 3中使用Ref访问DOM元素
2024-11-17 04:22:38 +0800 CST
如何在Vue3中处理全局状态管理?
2024-11-18 19:25:59 +0800 CST
Rust开发笔记 | Rust的交互式Shell
2024-11-18 19:55:44 +0800 CST
为什么大厂也无法避免写出Bug?
2024-11-19 10:03:23 +0800 CST
Vue3中的Store模式有哪些改进?
2024-11-18 11:47:53 +0800 CST
php微信文章推广管理系统
2024-11-19 00:50:36 +0800 CST
程序员茄子在线接单