1.介绍
Elasticsearch(ES)
是一个基于Lucene
构建的开源、分布式、RESTful
接口的全文搜索引擎。Elasticsearch
还是一个分布式文档数据库,其中每个字段均可被索引,而且每个字段的数据均可被搜索,ES
能够横向扩展至数以百计的服务器存储以及处理PB级的数据。可以在极短的时间内存储、搜索和分析大量的数据。通常作为具有复杂搜索场景情况下的核心发动机。根据DB-Engines
的排名显示,Elasticsearch
是最受欢迎的企业搜索引擎。
在Go
语言中经常使用的包有以下两个,截止到(2021.07.10):
2.安装
这里使用olivere/elastic
,@注意: 下载包的版本需要和ES版本相同,如我们这里使用的ES是7.13.3的版本,那么我们就需要下载olivere/elastic/v7
。
go get github.com/olivere/elastic/v7
|
3. 使用
3.1 创建客户端
package test
import ( "context" "fmt" "github.com/olivere/elastic/v7" "log" "os" "testing" "time" )
func connectEs() (*elastic.Client, error) { return elastic.NewClient( elastic.SetURL("http://127.0.0.1:9200"), elastic.SetSniff(false), elastic.SetHealthcheckInterval(time.Second*5), elastic.SetErrorLog(log.New(os.Stderr, "ES-ERROR ", log.LstdFlags)), elastic.SetInfoLog(log.New(os.Stdout, "ES-INFO ", log.LstdFlags)), ) }
func TestConnectES(t *testing.T) { client, err := connectEs() if err != nil { t.Error(err) return } do, _ := client.ClusterHealth().Index().Do(context.TODO()) fmt.Println("健康检查:",do) }
|
a.参数设置整理
elastic.SetURL(url)
elastic.SetBasicAuth("user", "secret")
elastic.SetGzip(true),
elastic.SetHealthcheckInterval(10*time.Second),
elastic.SetSniff(false)
elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC-ERROR ", log.LstdFlags)),
elastic.SetInfoLog(log.New(os.Stdout, "ELASTIC-INFO ", log.LstdFlags)),
|
@注意:如果你的ElasticSearch是通过docker安装,若不设置elastic.SetSniff(false)
,会报错: no active connection found: no Elasticsearch node available
3.2 创建索引
func TestCreateIndexMapping(t *testing.T) { userMapping := `{ "mappings":{ "properties":{ "name":{ "type":"keyword" }, "age":{ "type":"byte" }, "birth":{ "type":"date" } } } }` client, _ := connectEs() indexName := "go-test" ctx := context.Background() exist, err := client.IndexExists(indexName).Do(ctx) if err != nil { t.Errorf("检测索引失败:%s", err) return } if exist { t.Error("索引已经存在,无需重复创建!") return } res, err := client.CreateIndex(indexName).BodyString(userMapping).Do(ctx) if exist { t.Errorf("创建索引失败:%s", err) return } fmt.Println("创建成功:", res) }
|
如果想直接创建索引,只需删除BodyString(userMapping)
,如下:
res, err := client.CreateIndex(indexName).BodyString(userMapping).Do(ctx)
res, err := client.CreateIndex(indexName).Do(ctx)
|
3.3 添加数据
1. 单条添加
type UserInfo struct { Name string `json:"name"` Age int `json:"age"` Birth string `json:"birth"` }
func TestAddOne(t *testing.T) { client, _ := connectEs() ctx := context.Background() userInfo := UserInfo{ Name: "张三", Age: 18, Birth: "1991-03-04", } res, err := client.Index().Index("go-test").Id("1").BodyJson(userInfo).Do(ctx) if err != nil { t.Errorf("添加失败:%s",err) } fmt.Println("添加成功",res) }
|
2. 批量添加
func TestBatchAdd(t *testing.T) { client, _ := connectEs() ctx := context.Background() userNames := map[string]string{ "李四": "1992-04-25", "张亮": "1994-07-15", "小明": "1991-12-03", } rand.Seed(time.Now().Unix()) userBulk := client.Bulk().Index("go-test") id := 4 for n, b := range userNames { userTmp := UserInfo{Name: n, Age: rand.Intn(50), Birth: b} doc := elastic.NewBulkIndexRequest().Id(strconv.Itoa(id)).Doc(userTmp) userBulk.Add(doc) id++ } if userBulk.NumberOfActions() < 1 { t.Error("被添加的数据不能为空!") return } res, err := userBulk.Do(ctx) if err != nil { t.Errorf("保存失败:%s", err) return } fmt.Println("保存成功: ", res) }
|
3.4 单条更新
1. 单字段更新(Script
)
func TestUpdateOneByScript(t *testing.T) { client, _ := connectEs() ctx := context.Background()
res, err := client.Update().Index("go-test").Id("1"). Script(elastic.NewScript("ctx._source.birth='1999-09-09'")).Do(ctx) if err != nil { t.Errorf("根据ID更新单条记录失败:%s", err) return } fmt.Println("根据ID更新成功:", res.Result) res2, err := client.UpdateByQuery("go-test").Query(elastic.NewTermQuery("name", "小明")). Script(elastic.NewScript("ctx._source.age=22")).ProceedOnVersionConflict().Do(ctx) if err != nil { t.Errorf("根据条件更新单条记录失败:%s", err) return } fmt.Println("根据条件更新成功:", res2.Updated) }
|
2. 多字段更新(doc
)
func TestUpdateOneByDoc(t *testing.T) { client, _ := connectEs() ctx := context.Background() res, _ := client.Update().Index("go-test").Id("5").Doc(map[string]interface{}{ "name": "小白", "age": 30, }).Do(ctx) fmt.Println("更新结果:", res.Result) }
|
3.5 批量更新
func TestBatchUpdate(t *testing.T) { client,_ := connectEs() ctx := context.Background() bulkReq := client.Bulk().Index("go-test") for _, id := range []string{"4","5","6","7"} { doc := elastic.NewBulkUpdateRequest().Id(id).Doc(map[string]interface{}{"age": 18}) bulkReq.Add(doc) } if bulkReq.NumberOfActions() < 0 { t.Error("被更新的数量不能为空") return } do, err := bulkReq.Do(ctx) if err != nil { t.Errorf("批量更新失败:%v",err) return } fmt.Println("更新成功:",do.Updated()) }
|
3.6 查询
1. 单条查询
func TestSearchOneEs(t *testing.T) { client,_ := connectEs() ctx := context.Background() getResult, err := client.Get().Index("go-test").Id("1").Do(ctx) if err != nil { t.Errorf("获取失败: %s",err) return } json, _ := getResult.Source.MarshalJSON() fmt.Printf("查询单条结果:%s \n",json) }
|
2. 批量查询
func TestSearchMoreES(t *testing.T) { client,_ := connectEs() ctx := context.Background() searchResult, err := client.Search().Index("go-test"). Query(elastic.NewMatchQuery("age", 18)). From(0). Size(10). Pretty(true). Do(ctx) if err != nil { t.Errorf("获取失败: %s",err) return } var userList []UserInfo for _, val := range searchResult.Each(reflect.TypeOf(UserInfo{})) { tmp := val.(UserInfo) userList = append(userList,tmp) } fmt.Printf("查询结果:%v\n",userList) }
|
3.7 删除
1. 根据ID删除
func TestDelById(t *testing.T) { client, _ := connectEs() ctx := context.Background() do, err := client.Delete().Index("go-test").Id("1").Do(ctx) if err != nil { t.Errorf("删除失败:%s",err) return } fmt.Println("删除成功: ",do.Result) }
|
2. 根据条件删除
func TestDelByWhere(t *testing.T) { client, _ := connectEs() ctx := context.Background() do, err := client.DeleteByQuery("go-test").Query(elastic.NewTermQuery("age", 18)). ProceedOnVersionConflict().Do(ctx) if err != nil { t.Errorf("删除失败:%s",err) return } fmt.Println("删除成功: ",do.Deleted) }
|
https://godoc.org/github.com/olivere/elastic