
Chúng ta cùng nhau sử dụng golang to send data vào elasticsearch.
Install Go Elasticsearch Library
You can install the Go Elasticsearch library using go get:
go get github.com/elastic/go-elasticsearch/v8
Create Go Code
Here is a Go code snippet that demonstrates how to create an index in Elasticsearch:
package main
import (
"bytes"
"context"
"encoding/json"
"log"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
)
func main() {
// Initialize Elasticsearch client
cfg := elasticsearch.Config{
Addresses: []string{
"http://localhost:9200",
},
}
es, err := elasticsearch.NewClient(cfg)
if err != nil {
log.Fatalf("Error initializing Elasticsearch client: %s", err)
}
// Define the index mapping
indexMapping := map[string]interface{}{
"mappings": map[string]interface{}{
"properties": map[string]interface{}{
"name": map[string]interface{}{
"type": "text",
},
"age": map[string]interface{}{
"type": "integer",
},
},
},
}
// Convert map to JSON
mappingJSON, err := json.Marshal(indexMapping)
if err != nil {
log.Fatalf("Error marshaling index mapping: %s", err)
}
// Create the index
req := esapi.IndicesCreateRequest{
Index: "my_index",
Body: bytes.NewReader(mappingJSON),
}
res, err := req.Do(context.Background(), es)
if err != nil {
log.Fatalf("Error creating index: %s", err)
}
defer res.Body.Close()
if res.IsError() {
log.Fatalf("Error: %s", res.String())
}
log.Println("Index created successfully")
}
Nếu bạn muốn xem mappingJSON là cái j?
fmt.Println(string(mappingJSON))
{
"mappings": {
"properties": {
"age": {
"type": "integer"
},
"name": {
"type": "text"
}
}
}
}
Apply Singleton with Elasticsearch and Golang.
1) Create Index in Elasticsearch by Golang
Đầu tiên mình sẽ define riêng 1 folder helper/search

Ở helper/search/elasticEngine.go
Mình sẽ thực hiện tạo 1 connection và retry nếu có lỗi:
package search
import (
"beerus-land/log"
"github.com/elastic/go-elasticsearch/v8"
"time"
)
type ElasticEngine struct {
EE *elasticsearch.Client
ConnectString string
}
func (ee *ElasticEngine) Connect() error {
cfg := elasticsearch.Config{
Addresses: []string{
ee.ConnectString,
},
}
maxRetries := 3 // Maximum number of retry attempts
retryDelay := 5 * time.Second // Delay between retry attempts
var err error
for i := 0; i < maxRetries; i++ {
ee.EE, err = elasticsearch.NewClient(cfg)
if err == nil {
log.Info("Connected to ElasticSearch")
return nil // Connection successful
}
log.Error("Failed to connect to ElasticSearch: ", err)
log.Info("Retrying in ", retryDelay)
time.Sleep(retryDelay) // Wait before retrying
}
log.Error("Max retries reached. Failing...")
return err // Return the last error
}
func (ee *ElasticEngine) Close() {
if ee.EE != nil && ee.EE.Transport != nil {
if transport, ok := ee.EE.Transport.(interface{ CloseIdleConnections() }); ok {
transport.CloseIdleConnections()
}
}
ee.EE = nil
}
Tiếp đến là helper/search/search_impl
Đây là nơi mà mình design các function và public chúng cho handler:
helper/search/search_impl/elasticEngine_impl.go
package search_impl
import "beerus-land/helper/search"
type ElasticEngineImpl struct {
elasticEngine *search.ElasticEngine
}
func NewLanderElastic(elasticEngine *search.ElasticEngine) search.LanderElastic {
return &ElasticEngineImpl{
elasticEngine: elasticEngine,
}
}
helper/search/search_impl/landAndHouse_search_impl.go
Chúng ta sẽ cùng đi vào function đầu tiên là create Index trên elastichsearch
package search_impl
import (
"beerus-land/log"
"beerus-land/model"
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v8/esapi"
"time"
)
func (eei *ElasticEngineImpl) CreateSearchIndices(name string) (model.Indices, error) {
indeX := model.Indices{}
formattedIndexName := name + "-" + time.Now().Format("2006.01.02")
// Create a context for the API calls
ctx := context.Background()
// Check if the index already exists
existsReq := esapi.IndicesExistsRequest{
Index: []string{formattedIndexName},
}
existsRes, err := existsReq.Do(ctx, eei.elasticEngine.EE)
if err != nil {
log.Error(err.Error())
return indeX, err
}
defer existsRes.Body.Close()
if existsRes.StatusCode == 200 {
indeX = model.Indices{IndexName: formattedIndexName}
log.Info("Index " + formattedIndexName + " already exists")
return indeX, nil
}
// Define the mapping for HouseAndLand
mapping := map[string]interface{}{
"mappings": map[string]interface{}{
"properties": map[string]interface{}{
"time-stamp": map[string]interface{}{
"type": "date",
},
"city": map[string]interface{}{
"type": "text",
"fields": map[string]interface{}{
"keyword": map[string]interface{}{
"type": "keyword",
},
},
},
"district": map[string]interface{}{
"type": "text",
"fields": map[string]interface{}{
"keyword": map[string]interface{}{
"type": "keyword",
},
},
},
"ward": map[string]interface{}{
"type": "text",
"fields": map[string]interface{}{
"keyword": map[string]interface{}{
"type": "keyword",
},
},
},
"post-title": map[string]interface{}{
"type": "text",
"fields": map[string]interface{}{
"keyword": map[string]interface{}{
"type": "keyword",
},
},
},
"land-area": map[string]interface{}{
"type": "float",
},
"price": map[string]interface{}{
"type": "float",
},
"post_owner": map[string]interface{}{
"type": "text",
"fields": map[string]interface{}{
"keyword": map[string]interface{}{
"type": "keyword",
},
},
},
"link": map[string]interface{}{
"type": "text",
"fields": map[string]interface{}{
"keyword": map[string]interface{}{
"type": "keyword",
},
},
},
"priority": map[string]interface{}{
"type": "keyword",
},
"number_of_bedroom": map[string]interface{}{
"type": "integer",
},
},
},
}
// Convert the mapping to JSON
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(mapping); err != nil {
log.Fatalf("Error encoding mapping: %s", err)
}
req := esapi.IndicesCreateRequest{
Index: formattedIndexName,
Body: &buf,
}
// Execute the index creation request
res, err := req.Do(ctx, eei.elasticEngine.EE)
if err != nil {
log.Error(err.Error())
return indeX, err
}
defer res.Body.Close()
// Check the response status
if res.IsError() {
log.Error("Error creating index: %s", res.Status())
return indeX, fmt.Errorf("Error creating index: %s", res.Status())
}
indeX = model.Indices{IndexName: formattedIndexName}
log.Info("Index " + formattedIndexName + " created successfully")
return indeX, nil
}
Ở phần này bạn sẽ cần hiểu nhất về mapping trong khi tạo index
Bạn đọc bài bên dưới
Sau khi đã code xong function create index thì mình cần public nó ra:
helper/search/landerElastic.go
package search
import (
"beerus-land/model"
"time"
)
type LanderElastic interface {
CreateSearchIndices(name string) (model.Indices, error)
}
ở file main.yaml
package main
import (
"beerus-land/handler"
"beerus-land/helper/automation/automation_impl"
"beerus-land/helper/search"
"beerus-land/helper/search/search_impl"
"beerus-land/log"
"github.com/go-co-op/gocron"
"github.com/labstack/echo/v4"
"os"
"time"
)
func init() {
os.Setenv("APP_NAME", "BeerUs LanD")
log.InitLogger(false)
os.Setenv("TZ", "Asia/Ho_Chi_Minh")
}
func main() {
e := echo.New()
elas := &search.ElasticEngine{
ConnectString: "http://192.168.101.25:9200",
}
elas.Connect()
defer elas.Close()
autoCrawHandler := handler.AutoCrawHandler{
Echo: nil,
ElasticEngineImpl: search_impl.NewLanderElastic(elas),
}
s := gocron.NewScheduler(time.UTC)
s.Cron("*/5 * * * *").Do(func() { autoCrawHandler.HandlerCrawPrice() })
s.StartAsync()
autoCrawHandler.HandlerCrawPrice()
e.Logger.Fatal(e.Start(":1323"))
}
Và cuối cùng là trong handler;
handler/autoCraw_handler.go
package handler
import (
"beerus-land/helper/automation"
"beerus-land/helper/notify"
"beerus-land/helper/search"
"beerus-land/log"
"beerus-land/model"
"fmt"
"github.com/labstack/echo/v4"
"time"
)
type AutoCrawHandler struct {
Echo *echo.Context
ElasticEngineImpl search.LanderElastic
}
func (a *AutoCrawHandler) HandlerCrawPrice() error {
lookUp := model.HouseAndLand{
City: "tp-ho-chi-minh",
District: "quan-go-vap",
}
crawData, err := a.Automation.CrawPrice(lookUp)
if err != nil {
log.Error(err.Error())
return err
}
//fmt.Printf("%+v\n", crawData)
index, err := a.ElasticEngineImpl.CreateSearchIndices("land-house")
...
...