Elasticsearch是什么
存储、搜索、分析
PB级数据,近实时
倒排索引
以空间换时间
安装
elasticsearch
elasticsearch.yml
xpack.ml.enabled: false
elasticsearch -d
注意点
不要以root用户启动
用户 文件数和打开进程数调大
kibana
Elastic 官方可视化工具,需要与kibana保持版本一致
kibana.yml
# Default Kibana configuration for docker target
server.name: kibana
server.host: "0"
elasticsearch.hosts: ["http://172.16.213.133:9200"]
xpack.monitoring.ui.container.elasticsearch.enabled: true
i18n.locale: "zh-CN"
docker 命令
docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kibana -p 5601:5601 -v /Users/xingzhiwei/kibana/kibana.yml:/usr/share/kibana/config/kibana.yml kibana:7.13.4
代码示例
type Tweet struct {
User string `json:"user"`
Message string `json:"message"`
Retweets int `json:"retweets"` // 转发数
Image string `json:"image,omitempty"`
Created time.Time `json:"created,omitempty"`
Tags []string `json:"tags,omitempty"`
Location string `json:"location,omitempty"`
Suggest *elastic.SuggestField `json:"suggest_field,omitempty"`
}
建立连接
client, err := elastic.NewClient(
// elasticsearch 服务地址,多个服务地址使用逗号分隔
elastic.SetURL("http://node02:9200", "http://node03:9200"),
// 基于http base auth验证机制的账号和密码
elastic.SetBasicAuth("user", "secret"),
// 启用gzip压缩
elastic.SetGzip(true),
// 设置监控检查时间间隔
elastic.SetHealthcheckInterval(10*time.Second),
// 设置请求失败最大重试次数
elastic.SetMaxRetries(5),
// 设置错误日志输出
elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
// 设置info日志输出
elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)))
if err != nil {
// Handle error
panic(err)
}
_ = client
创建索引
exists, err := client.IndexExists("twitter").Do(ctx)
if err != nil {
panic(err)
}
if !exists {
const mapping = `
{
"settings":{
"number_of_shards":1,
"number_of_replicas":0
},
"mappings": {
"properties": {
"user":{
"type":"keyword"
},
"message":{
"type":"text",
"store":true,
"fielddata":true
},
"retweets":{
"type":"long"
},
"image":{
"type":"keyword"
},
"created":{
"type":"date"
},
"tag":{
"type":"date"
},
"location":{
"type":"geo_point"
},
"suggest_field":{
"type":"completion"
}
}
}
}`
createIndex, err := client.CreateIndex("twitter").
BodyString(mapping).
Do(ctx)
if err != nil {
panic(err)
}
if !createIndex.Acknowledged {
// not acknowledged
}
}
删除索引
//delete an index
deleteIndex, err := client.
DeleteIndex("twitter").
Do(ctx)
if err != nil {
panic(err)
}
if !deleteIndex.Acknowledged {
// not acknowledged
fmt.Println("not acknowledged")
}
添加文档
tweet1 := Tweet{ User: "olivere", Message: "Take Five", Retweets: 0, Created: time.Now(),}put1, err := client.Index().Index("twitter").Id("1").BodyJson(tweet1).Do(ctx)if err != nil { panic(err)}fmt.Printf("Indexed tweet %s to index %s, type %s\n", put1.Id, put1.Index, put1.Type)tweet2 := `{"user":"olivere", "message":"It's a Raggy Waltz", "retweets":0, "created":"2021-08-01T14:43:05.840648+08:00"}`put2, err := client.Index().Index("twitter").Id("2").BodyString(tweet2).Do(ctx)if err != nil { panic(err)}fmt.Printf("Indexed tweet %s to index %s, type %s\n", put2.Id, put2.Index, put2.Type)
查询文档
// Get tweet with specified ID get1, err := client.Get(). Index("twitter"). Id("1"). Do(ctx) if err != nil { switch { case elastic.IsNotFound(err): panic(fmt.Sprintf("Document not found: %v", err)) case elastic.IsTimeout(err): panic(fmt.Sprintf("Timeout retrieving document: %v", err)) case elastic.IsConnErr(err): panic(fmt.Sprintf("Connection problem: %v", err)) default: // Some other kind of error panic(err) } } msg1 := Tweet{} data, _ := get1.Source.MarshalJSON() json.Unmarshal(data, &msg1) fmt.Println(msg1) fmt.Printf("Got document %s in version %d from index %s, type %s\n", get1.Id, get1.Version, get1.Index, get1.Type) // search with a term query searchResult, err := client.Search(). Index("twitter"). Query(elastic.NewTermQuery("user", "olivere")). Sort("user", true). From(0). Size(10). Pretty(true). Do(ctx) if err != nil { panic(err) } fmt.Printf("Query took %d milliseconds\n", searchResult.TookInMillis) var ttyp Tweet for _, item := range searchResult.Each(reflect.TypeOf(ttyp)) { t := item.(Tweet) fmt.Printf("Tweet by %s: %s\n", t.User, t.Message) } fmt.Printf("Found a total of %d tweets\n", searchResult.TotalHits()) if searchResult.TotalHits() > 0 { fmt.Printf("Found a total of %d tweets\n", searchResult.TotalHits()) for _, hit := range searchResult.Hits.Hits { var t Tweet err := json.Unmarshal(hit.Source, &t) if err != nil { panic(err) } fmt.Printf("Tweet by %s: %s\n", t.User, t.Message) } } else { fmt.Print("Found no tweets\n") }
更新文档
// update a tweet by the update API of Elasticsearch script := elastic.NewScript("ctx._source.retweets += params.num").Param("num", 1) update, err := client.Update().Index("twitter").Id("1").Script(script). Upsert(map[string]interface{}{"retweets": 0}).Do(ctx) if err != nil { panic(err) } fmt.Printf("New version of tweet %q is now %d", update.Id, update.Version)
删除文档
// delete a tweet by id _, err = client.Delete(). Index("twitter"). Id("1"). Do(ctx) if err != nil { panic(err) } // delete a tweet by termQuery _, err = client.DeleteByQuery("twitter"). Query(elastic.NewTermQuery("user", "olivere")). // 版本冲突继续执行 ProceedOnVersionConflict(). Do(ctx) if err != nil { panic(err) }
精确查询
// 精确匹配单个字段 searchResult, err = client.Search(). Index("twitter"). Query(elastic.NewTermQuery("message", "take")). Sort("created", true). From(0). Size(10). Pretty(true). Do(ctx) if err != nil { panic(err) } fmt.Printf("查询消耗时间 %d ms, 结果总数: %d\n", searchResult.TookInMillis, searchResult.TotalHits()) if searchResult.TotalHits() > 0 { // 查询结果不为空,则遍历结果 var r Tweet // 通过Each方法,将es结果的json结构转换成struct对象 for _, item := range searchResult.Each(reflect.TypeOf(r)) { if t, ok := item.(Tweet); ok { fmt.Println(t) } } }// 通过terms实现SQL的in查询 searchResult, err = client.Search(). Index("twitter"). Query(elastic.NewTermsQuery("message", "take", "five", "raggy")). From(0). Size(10). Do(ctx) if err != nil { panic(err) }
模糊查询
// 模糊匹配单个字段 searchResult, err = client.Search(). Index("twitter"). Query(elastic.NewMatchQuery("message", "take raggy")). From(0). Size(10). Do(ctx) if err != nil { panic(err) }
范围查询
// 范围查询 searchResult, err = client.Search(). Index("twitter"). Query(elastic.NewRangeQuery("created").Gte("2021-08-01").Lt("2021-08-20")). From(0). Size(10). Do(ctx)
bool组合查询
must
// must // 创建 bool 查询条件 boolQuery := elastic.NewBoolQuery().Must() // 创建 term 查询 termQuery := elastic.NewTermQuery("user", "olivere") matchQuery := elastic.NewMatchQuery("message", "take a") // 设置bool查询的must条件, 组合了两个子查询 // 表示搜索匹配user=olivere且message匹配"take a"的文档 boolQuery.Must(termQuery, matchQuery) searchResult, err = client.Search(). Index("twitter"). Query(boolQuery). From(0). Size(10). Do(ctx)
must_not
// 创建 bool 查询条件 boolQuery = elastic.NewBoolQuery().Must() // 创建 term 查询 //termQuery = elastic.NewTermQuery("user", "olivere") matchQuery = elastic.NewMatchQuery("message", "take five") boolQuery.MustNot(matchQuery)
should
// should 条件 boolQuery = elastic.NewBoolQuery().Must() // 创建 term 查询 termQuery = elastic.NewTermQuery("user", "olivere") matchQuery = elastic.NewMatchQuery("message", "take five") boolQuery.Should(termQuery, matchQuery) searchResult, err = client.Search(). Index("twitter"). Query(boolQuery). From(0). Size(10). Do(ctx)