Elasticsearch写入、读取、更新、删除以及批量操作(Golang)
1. 背景
加入自动驾驶事业部数仓团队后,工作围绕数据和检索展开,熟练掌握 Elasticsearch 的基本操作成为提高工作效率的关键。在项目中使用官方提供的包 github.com/elastic/go-elasticsearch
,但其底层设计复杂,开发成本较高,因此使用了三方包 github.com/olivere/elastic
(版本 7.0.32),以实现更简便的开发。
2. Elasticsearch基础操作
2.1 创建 Elasticsearch 客户端
定义一个 ES 实例结构:
type EsInstance struct {
Client *elastic.Client // es客户端
Index map[string]Indexes // 所有索引
}
type Indexes struct {
Name string `json:"name"` // 索引名称
Mapping string `json:"mapping"` // 索引结构
}
创建实例:
func NewEsInstance() (*EsInstance, error) {
client, err := elastic.NewClient(
elastic.SetURL("http://127.0.0.1:9200"),
elastic.SetBasicAuth("user_name", "user_password"),
elastic.SetSniff(false), // 跳过ip检查
)
if err != nil {
return &EsInstance{}, err
}
return &EsInstance{
Client: client,
Index: map[string]Indexes{},
}, nil
}
2.2 创建、删除索引
创建索引:
func (es *EsInstance) CreateIndex(index string, mapping string) error {
exists, err := es.Client.IndexExists(index).Do(context.Background())
if err != nil {
return err
}
if exists {
return nil
}
createIndex, err := es.Client.CreateIndex(index).BodyString(mapping).Do(context.Background())
if err != nil {
return err
}
if !createIndex.Acknowledged {
return errors.New("create index failed")
}
es.Index[index] = Indexes{Name: index, Mapping: mapping}
return nil
}
删除索引:
func (es *EsInstance) DeleteIndex(index string) error {
exists, err := es.Client.IndexExists(index).Do(context.Background())
if err != nil {
return err
}
if !exists {
return nil
}
deleteIndex, err := es.Client.DeleteIndex(index).Do(context.Background())
if err != nil {
return err
}
if !deleteIndex.Acknowledged {
return errors.New("delete index failed")
}
return nil
}
2.3 插入文档
定义文档结构:
type Record struct {
Index string `json:"_index"`
Type string `json:"_type"`
Id string `json:"_id"`
Source Source `json:"source"`
}
type Source struct {
Id string `json:"id"`
Name string `json:"name"`
Age int `json:"age"`
Sex string `json:"sex"`
}
插入文档:
func (es *EsInstance) InsertOneRecord(indexName string, record Record) error {
_, err := es.Client.Index().Index(indexName).Id(record.Source.Id).BodyJson(record).Do(context.Background())
return err
}
2.4 查询文档
根据文档 id 获取文档:
func (es *EsInstance) GetOneRecord(indexName, id string) (*Record, error) {
record := &Record{}
result, err := es.Client.Get().Index(indexName).Id(id).Do(context.Background())
if err != nil {
return record, err
}
if result.Found {
err := json.Unmarshal(result.Source, &record.Source)
}
record.Id = result.Id
record.Index = result.Index
record.Type = result.Type
return record, nil
}
2.5 删除文档
根据文档 id 删除文档:
func (es *EsInstance) DeleteOneRecord(indexName, id string) error {
_, err := es.Client.Delete().Index(indexName).Id(id).Do(context.Background())
return err
}
2.6 更新文档
根据文档 id 更新文档:
func (es *EsInstance) UpdateOneRecord(indexName, id string, record Source) error {
_, err := es.Client.Update().Index(indexName).Id(id).Doc(record).Do(context.Background())
return err
}
2.7 逻辑查询
自定义 DSL 查询:
func (es *EsInstance) Search(indexName string, size int, query elastic.Query) ([]Record, error) {
records := make([]Record, 0)
searchResult, err := es.Client.Search().Index(indexName).Query(query).Size(size).Do(context.Background())
if err != nil {
return records, err
}
if searchResult.Hits.TotalHits.Value > 0 {
for _, hit := range searchResult.Hits.Hits {
record := &Record{}
json.Unmarshal(hit.Source, &record.Source)
record.Id = hit.Id
records = append(records, *record)
}
}
return records, nil
}
2.8 滚动查询
滚动查询数据:
func (es *EsInstance) SearchScroll(indexName string, size int, query elastic.Query) ([]Record, error) {
records := make([]Record, 0)
searchResult, err := es.Client.Scroll(indexName).Query(query).Size(size).Do(context.Background())
scrollId := searchResult.ScrollId
for {
scrollResult, err := es.Client.Scroll().ScrollId(scrollId).Do(context.Background())
if err != nil || scrollResult.Hits.TotalHits.Value == 0 {
break
}
scrollId = scrollResult.ScrollId
for _, hit := range scrollResult.Hits.Hits {
record := &Record{}
json.Unmarshal(hit.Source, &record.Source)
record.Id = hit.Id
records = append(records, *record)
}
}
es.Client.ClearScroll(scrollId).Do(context.Background())
return records, err
}
2.9 批量插入
批量插入数据:
func (es *EsInstance) BatchInsertRecords(indexName string, records []Record) error {
req := es.Client.Bulk().Index(indexName)
for _, record := range records {
doc := elastic.NewBulkIndexRequest().Id(record.Id).Doc(&record.Source)
req.Add(doc)
}
_, err := req.Do(context.Background())
return err
}
2.10 批量更新
批量更新数据:
func (es *EsInstance) BatchUpdateRecords(indexName string, records []Record) error {
req := es.Client.Bulk().Index(indexName)
for _, record := range records {
doc := elastic.NewBulkUpdateRequest().Id(record.Id).Doc(&record.Source)
req.Add(doc)
}
_, err := req.Do(context.Background())
return err
}
2.11 批量删除
批量删除数据:
func (es *EsInstance) BatchDeleteRecords(indexName string, records []Record) error {
req := es.Client.Bulk().Index(indexName)
for _, record := range records {
doc := elastic.NewBulkDeleteRequest().Id(record.Id)
req.Add(doc)
}
_, err := req.Do(context.Background())
return err
}
3. 检索条件设置
示例:
boolQuery := elastic.NewBoolQuery()
boolQuery.Filter(elastic.NewMatchQuery("tag_name", "turn_left"))
boolQuery.Filter(elastic.NewRangeQuery("start_time").Gte(1723737600000))
4. 测试
检索某辆车、某个时间段的数据:
boolQuery.Filter(elastic.NewMatchQuery("tag_name", "turn_left"))
boolQuery.Filter(elastic.NewMatchQuery("car_id", "京A0001"))
boolQuery.Filter(elastic.NewRangeQuery("start_time").Gte(1723737600000))
boolQuery.Filter(elastic.NewRangeQuery("end_time").Lte(1723823999000))
SearchScroll("index_2024", 4000, boolQuery)
5. 总结
本文介绍了 Elasticsearch 的基础操作,包括创建、删除、更新、查询、批量操作等。使用三方包 github.com/olivere/elastic
能有效降低开发成本,提升工作效率。