
Chúng ta cùng nhau sử dụng Golang để gửi dữ liệu vào Elasticsearch.
Install Go Elasticsearch Library
You can install the Go Elasticsearch library using `go get`:
go get github.com/elastic/go-elasticsearch/v8
Create Go Code
Here is a Go code snippet that demonstrates how to create an index in Elasticsearch:
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"log"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
)

// main creates the index "my_index" on the Elasticsearch node at
// http://localhost:9200 with a two-field mapping ("name" as text, "age" as
// integer). Any failure is logged and terminates the program.
func main() {
	// Build a client that points at the local node.
	client, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: []string{"http://localhost:9200"},
	})
	if err != nil {
		log.Fatalf("Error initializing Elasticsearch client: %s", err)
	}

	// Describe the fields of the index, then wrap them in the "mappings"
	// envelope expected by the create-index request body.
	properties := map[string]interface{}{
		"name": map[string]interface{}{"type": "text"},
		"age":  map[string]interface{}{"type": "integer"},
	}
	indexMapping := map[string]interface{}{
		"mappings": map[string]interface{}{
			"properties": properties,
		},
	}

	// Serialize the mapping so it can be sent as the request body.
	mappingJSON, err := json.Marshal(indexMapping)
	if err != nil {
		log.Fatalf("Error marshaling index mapping: %s", err)
	}

	// Send the create-index request.
	createReq := esapi.IndicesCreateRequest{
		Index: "my_index",
		Body:  bytes.NewReader(mappingJSON),
	}
	resp, err := createReq.Do(context.Background(), client)
	if err != nil {
		log.Fatalf("Error creating index: %s", err)
	}
	defer resp.Body.Close()

	// The request went through; an error status from the server is still fatal.
	if resp.IsError() {
		log.Fatalf("Error: %s", resp.String())
	}
	log.Println("Index created successfully")
}
Nếu bạn muốn xem mappingJSON là cái gì, hãy in nó ra:
fmt.Println(string(mappingJSON))
{ "mappings": { "properties": { "age": { "type": "integer" }, "name": { "type": "text" } } } }
Apply Singleton with Elasticsearch and Golang.
1) Create Index in Elasticsearch by Golang
Đầu tiên mình sẽ define riêng 1 folder helper/search

Ở helper/search/elasticEngine.go
Mình sẽ thực hiện tạo 1 connection và retry nếu có lỗi:
package search

import (
	"time"

	"beerus-land/log"

	"github.com/elastic/go-elasticsearch/v8"
)

// ElasticEngine bundles an Elasticsearch client with the address it is
// configured for.
type ElasticEngine struct {
	EE            *elasticsearch.Client // set by Connect, reset to nil by Close
	ConnectString string                // the single address handed to elasticsearch.Config
}

// Connect builds the Elasticsearch client for ee.ConnectString and stores it
// in ee.EE. Client construction is attempted up to three times with a
// five-second pause between attempts. It returns nil on success, or the error
// from the last attempt once all attempts have failed.
//
// NOTE(review): elasticsearch.NewClient appears to only build and validate the
// client configuration; if it performs no network round-trip, a nil return
// here does not prove the cluster is reachable — confirm against the library.
func (ee *ElasticEngine) Connect() error {
	cfg := elasticsearch.Config{
		Addresses: []string{ee.ConnectString},
	}

	const (
		maxRetries = 3               // Maximum number of attempts
		retryDelay = 5 * time.Second // Pause between two attempts
	)

	var err error
	for attempt := 1; attempt <= maxRetries; attempt++ {
		ee.EE, err = elasticsearch.NewClient(cfg)
		if err == nil {
			log.Info("Connected to ElasticSearch")
			return nil
		}
		log.Error("Failed to connect to ElasticSearch: ", err)

		// Only announce and wait for a retry when one will actually follow;
		// sleeping after the final attempt would just delay the failure.
		if attempt == maxRetries {
			break
		}
		log.Info("Retrying in ", retryDelay)
		time.Sleep(retryDelay)
	}

	log.Error("Max retries reached. Failing...")
	return err // error from the last attempt
}

// Close releases idle connections held by the client's transport, when the
// transport exposes a CloseIdleConnections method, and then clears ee.EE.
// Calling Close on an engine without a client is a no-op.
func (ee *ElasticEngine) Close() {
	if ee.EE != nil && ee.EE.Transport != nil {
		if transport, ok := ee.EE.Transport.(interface{ CloseIdleConnections() }); ok {
			transport.CloseIdleConnections()
		}
	}
	ee.EE = nil
}
Tiếp đến là helper/search/search_impl
Đây là nơi mà mình design các function và public chúng cho handler:
helper/search/search_impl/elasticEngine_impl.go
package search_impl import "beerus-land/helper/search" type ElasticEngineImpl struct { elasticEngine *search.ElasticEngine } func NewLanderElastic(elasticEngine *search.ElasticEngine) search.LanderElastic { return &ElasticEngineImpl{ elasticEngine: elasticEngine, } }
helper/search/search_impl/landAndHouse_search_impl.go
Chúng ta sẽ cùng đi vào function đầu tiên là create index trên Elasticsearch
package search_impl

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"time"

	"beerus-land/log"
	"beerus-land/model"

	"github.com/elastic/go-elasticsearch/v8/esapi"
)

// scalarField returns the mapping of a field that only declares its type.
func scalarField(esType string) map[string]interface{} {
	return map[string]interface{}{"type": esType}
}

// textWithKeyword returns the mapping of a "text" field that also carries a
// "keyword" sub-field.
func textWithKeyword() map[string]interface{} {
	return map[string]interface{}{
		"type": "text",
		"fields": map[string]interface{}{
			"keyword": scalarField("keyword"),
		},
	}
}

// houseAndLandMapping returns the request body used to create the
// HouseAndLand index: the "mappings" envelope with one entry per field.
func houseAndLandMapping() map[string]interface{} {
	return map[string]interface{}{
		"mappings": map[string]interface{}{
			"properties": map[string]interface{}{
				"time-stamp":        scalarField("date"),
				"city":              textWithKeyword(),
				"district":          textWithKeyword(),
				"ward":              textWithKeyword(),
				"post-title":        textWithKeyword(),
				"land-area":         scalarField("float"),
				"price":             scalarField("float"),
				"post_owner":        textWithKeyword(),
				"link":              textWithKeyword(),
				"priority":          scalarField("keyword"),
				"number_of_bedroom": scalarField("integer"),
			},
		},
	}
}

// CreateSearchIndices makes sure the index "<name>-<yyyy.MM.dd>" (today's
// date) exists.
//
// If the index is already there (the exists request answers 200) it is left
// untouched; otherwise it is created with the HouseAndLand mapping. On success
// the returned model.Indices carries the full index name in IndexName. On any
// failure the zero model.Indices is returned together with the error, which is
// also logged.
func (eei *ElasticEngineImpl) CreateSearchIndices(name string) (model.Indices, error) {
	indeX := model.Indices{}

	// Without a client the request below would dereference nil and panic,
	// e.g. when Connect failed and the caller carried on regardless.
	if eei.elasticEngine == nil || eei.elasticEngine.EE == nil {
		err := fmt.Errorf("Error creating index: elasticsearch client is not connected")
		log.Error(err.Error())
		return indeX, err
	}

	formattedIndexName := name + "-" + time.Now().Format("2006.01.02")

	// Create a context for the API calls
	ctx := context.Background()

	// Check if the index already exists
	existsReq := esapi.IndicesExistsRequest{
		Index: []string{formattedIndexName},
	}
	existsRes, err := existsReq.Do(ctx, eei.elasticEngine.EE)
	if err != nil {
		log.Error(err.Error())
		return indeX, err
	}
	defer existsRes.Body.Close()

	if existsRes.StatusCode == 200 {
		indeX = model.Indices{IndexName: formattedIndexName}
		log.Info("Index " + formattedIndexName + " already exists")
		return indeX, nil
	}

	// Convert the mapping to JSON. A failure is reported to the caller rather
	// than terminating the whole process from inside a helper.
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(houseAndLandMapping()); err != nil {
		encErr := fmt.Errorf("Error encoding mapping: %w", err)
		log.Error(encErr.Error())
		return indeX, encErr
	}

	req := esapi.IndicesCreateRequest{
		Index: formattedIndexName,
		Body:  &buf,
	}

	// Execute the index creation request
	res, err := req.Do(ctx, eei.elasticEngine.EE)
	if err != nil {
		log.Error(err.Error())
		return indeX, err
	}
	defer res.Body.Close()

	// Check the response status. The message is formatted once and reused:
	// log.Error is called print-style elsewhere in this code base, so a "%s"
	// verb passed to it would presumably be logged literally.
	if res.IsError() {
		createErr := fmt.Errorf("Error creating index: %s", res.Status())
		log.Error(createErr.Error())
		return indeX, createErr
	}

	indeX = model.Indices{IndexName: formattedIndexName}
	log.Info("Index " + formattedIndexName + " created successfully")
	return indeX, nil
}
Ở phần này, điều bạn cần hiểu rõ nhất là mapping khi tạo index.
Bạn đọc bài bên dưới
Sau khi đã code xong function create index thì mình cần public nó ra:
helper/search/landerElastic.go
package search

import (
	"beerus-land/model"
	"time" // NOTE(review): not referenced in this excerpt; Go rejects unused imports, so the full file presumably uses it — confirm
)

// LanderElastic is the set of search operations exposed to the handlers.
type LanderElastic interface {
	// CreateSearchIndices takes a base index name and returns the resulting
	// model.Indices, or an error when the operation fails.
	CreateSearchIndices(name string) (model.Indices, error)
}
Ở file main.go
package main

import (
	"os"
	"time"

	"beerus-land/handler"
	"beerus-land/helper/automation/automation_impl" // NOTE(review): not referenced in this excerpt; presumably used by code omitted here — confirm
	"beerus-land/helper/search"
	"beerus-land/helper/search/search_impl"
	"beerus-land/log"

	"github.com/go-co-op/gocron"
	"github.com/labstack/echo/v4"
)

// init sets the APP_NAME and TZ environment variables and initializes the
// project logger before main runs.
func init() {
	os.Setenv("APP_NAME", "BeerUs LanD")
	log.InitLogger(false)
	os.Setenv("TZ", "Asia/Ho_Chi_Minh")
}

// main wires the Elasticsearch engine into the crawl handler, schedules the
// crawl every five minutes, runs it once immediately, and then serves HTTP on
// port 1323 until the server stops.
func main() {
	e := echo.New()

	elas := &search.ElasticEngine{
		ConnectString: "http://192.168.101.25:9200",
	}
	// Without a client every later search call would fail, so stop right away
	// instead of continuing with a nil client.
	if err := elas.Connect(); err != nil {
		e.Logger.Fatal("connecting to Elasticsearch: ", err)
	}

	autoCrawHandler := handler.AutoCrawHandler{
		Echo:              nil,
		ElasticEngineImpl: search_impl.NewLanderElastic(elas),
	}

	// crawl runs one crawl pass and reports a failure instead of dropping it.
	crawl := func() {
		if err := autoCrawHandler.HandlerCrawPrice(); err != nil {
			log.Error("crawl failed: ", err)
		}
	}

	s := gocron.NewScheduler(time.UTC)
	if _, err := s.Cron("*/5 * * * *").Do(crawl); err != nil {
		elas.Close()
		e.Logger.Fatal("scheduling crawl job: ", err)
	}
	s.StartAsync()

	// Run once at start-up so data is available before the first tick.
	crawl()

	// Logger.Fatal terminates the process, which would skip deferred calls,
	// so the engine is closed explicitly before reporting why the server
	// stopped.
	err := e.Start(":1323")
	elas.Close()
	e.Logger.Fatal(err)
}
Và cuối cùng là trong handler:
handler/autoCraw_handler.go
package handler import ( "beerus-land/helper/automation" "beerus-land/helper/notify" "beerus-land/helper/search" "beerus-land/log" "beerus-land/model" "fmt" "github.com/labstack/echo/v4" "time" ) type AutoCrawHandler struct { Echo *echo.Context ElasticEngineImpl search.LanderElastic } func (a *AutoCrawHandler) HandlerCrawPrice() error { lookUp := model.HouseAndLand{ City: "tp-ho-chi-minh", District: "quan-go-vap", } crawData, err := a.Automation.CrawPrice(lookUp) if err != nil { log.Error(err.Error()) return err } //fmt.Printf("%+v\n", crawData) index, err := a.ElasticEngineImpl.CreateSearchIndices("land-house") ... ...