
Chúng ta cùng nhau sử dụng golang to send data vào elasticsearch.
Install Go Elasticsearch Library
You can install the Go Elasticsearch library using go get:
go get github.com/elastic/go-elasticsearch/v8Create Go Code
Here is a Go code snippet that demonstrates how to create an index in Elasticsearch:
package main
import (
	"bytes"
	"context"
	"encoding/json"
	"log"
	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
)
func main() {
	// Initialize Elasticsearch client
	cfg := elasticsearch.Config{
		Addresses: []string{
			"http://localhost:9200",
		},
	}
	es, err := elasticsearch.NewClient(cfg)
	if err != nil {
		log.Fatalf("Error initializing Elasticsearch client: %s", err)
	}
	// Define the index mapping
	indexMapping := map[string]interface{}{
		"mappings": map[string]interface{}{
			"properties": map[string]interface{}{
				"name": map[string]interface{}{
					"type": "text",
				},
				"age": map[string]interface{}{
					"type": "integer",
				},
			},
		},
	}
	// Convert map to JSON
	mappingJSON, err := json.Marshal(indexMapping)
	if err != nil {
		log.Fatalf("Error marshaling index mapping: %s", err)
	}
	// Create the index
	req := esapi.IndicesCreateRequest{
		Index: "my_index",
		Body:  bytes.NewReader(mappingJSON),
	}
	res, err := req.Do(context.Background(), es)
	if err != nil {
		log.Fatalf("Error creating index: %s", err)
	}
	defer res.Body.Close()
	if res.IsError() {
		log.Fatalf("Error: %s", res.String())
	}
	log.Println("Index created successfully")
}
Nếu bạn muốn xem mappingJSON là cái j?
fmt.Println(string(mappingJSON)){
  "mappings": {
    "properties": {
      "age": {
        "type": "integer"
      },
      "name": {
        "type": "text"
      }
    }
  }
}
Apply Singleton with Elasticsearch and Golang.
1) Create Index in Elasticsearch by Golang
Đầu tiên mình sẽ define riêng 1 folder helper/search

Ở helper/search/elasticEngine.go
Mình sẽ thực hiện tạo 1 connection và retry nếu có lỗi:
package search
import (
	"beerus-land/log"
	"github.com/elastic/go-elasticsearch/v8"
	"time"
)
type ElasticEngine struct {
	EE            *elasticsearch.Client
	ConnectString string
}
func (ee *ElasticEngine) Connect() error {
	cfg := elasticsearch.Config{
		Addresses: []string{
			ee.ConnectString,
		},
	}
	maxRetries := 3               // Maximum number of retry attempts
	retryDelay := 5 * time.Second // Delay between retry attempts
	var err error
	for i := 0; i < maxRetries; i++ {
		ee.EE, err = elasticsearch.NewClient(cfg)
		if err == nil {
			log.Info("Connected to ElasticSearch")
			return nil // Connection successful
		}
		log.Error("Failed to connect to ElasticSearch: ", err)
		log.Info("Retrying in ", retryDelay)
		time.Sleep(retryDelay) // Wait before retrying
	}
	log.Error("Max retries reached. Failing...")
	return err // Return the last error
}
func (ee *ElasticEngine) Close() {
	if ee.EE != nil && ee.EE.Transport != nil {
		if transport, ok := ee.EE.Transport.(interface{ CloseIdleConnections() }); ok {
			transport.CloseIdleConnections()
		}
	}
	ee.EE = nil
}
Tiếp đến là helper/search/search_impl
Đây là nơi mà mình design các function và public chúng cho handler:
helper/search/search_impl/elasticEngine_impl.go
package search_impl
import "beerus-land/helper/search"
type ElasticEngineImpl struct {
	elasticEngine *search.ElasticEngine
}
func NewLanderElastic(elasticEngine *search.ElasticEngine) search.LanderElastic {
	return &ElasticEngineImpl{
		elasticEngine: elasticEngine,
	}
}
helper/search/search_impl/landAndHouse_search_impl.go
Chúng ta sẽ cùng đi vào function đầu tiên là create Index trên elastichsearch
package search_impl
import (
	"beerus-land/log"
	"beerus-land/model"
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"github.com/elastic/go-elasticsearch/v8/esapi"
	"time"
)
func (eei *ElasticEngineImpl) CreateSearchIndices(name string) (model.Indices, error) {
	indeX := model.Indices{}
	formattedIndexName := name + "-" + time.Now().Format("2006.01.02")
	// Create a context for the API calls
	ctx := context.Background()
	// Check if the index already exists
	existsReq := esapi.IndicesExistsRequest{
		Index: []string{formattedIndexName},
	}
	existsRes, err := existsReq.Do(ctx, eei.elasticEngine.EE)
	if err != nil {
		log.Error(err.Error())
		return indeX, err
	}
	defer existsRes.Body.Close()
	if existsRes.StatusCode == 200 {
		indeX = model.Indices{IndexName: formattedIndexName}
		log.Info("Index " + formattedIndexName + " already exists")
		return indeX, nil
	}
	// Define the mapping for HouseAndLand
	mapping := map[string]interface{}{
		"mappings": map[string]interface{}{
			"properties": map[string]interface{}{
				"time-stamp": map[string]interface{}{
					"type": "date",
				},
				"city": map[string]interface{}{
					"type": "text",
					"fields": map[string]interface{}{
						"keyword": map[string]interface{}{
							"type": "keyword",
						},
					},
				},
				"district": map[string]interface{}{
					"type": "text",
					"fields": map[string]interface{}{
						"keyword": map[string]interface{}{
							"type": "keyword",
						},
					},
				},
				"ward": map[string]interface{}{
					"type": "text",
					"fields": map[string]interface{}{
						"keyword": map[string]interface{}{
							"type": "keyword",
						},
					},
				},
				"post-title": map[string]interface{}{
					"type": "text",
					"fields": map[string]interface{}{
						"keyword": map[string]interface{}{
							"type": "keyword",
						},
					},
				},
				"land-area": map[string]interface{}{
					"type": "float",
				},
				"price": map[string]interface{}{
					"type": "float",
				},
				"post_owner": map[string]interface{}{
					"type": "text",
					"fields": map[string]interface{}{
						"keyword": map[string]interface{}{
							"type": "keyword",
						},
					},
				},
				"link": map[string]interface{}{
					"type": "text",
					"fields": map[string]interface{}{
						"keyword": map[string]interface{}{
							"type": "keyword",
						},
					},
				},
				"priority": map[string]interface{}{
					"type": "keyword",
				},
				"number_of_bedroom": map[string]interface{}{
					"type": "integer",
				},
			},
		},
	}
	// Convert the mapping to JSON
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(mapping); err != nil {
		log.Fatalf("Error encoding mapping: %s", err)
	}
	req := esapi.IndicesCreateRequest{
		Index: formattedIndexName,
		Body:  &buf,
	}
	// Execute the index creation request
	res, err := req.Do(ctx, eei.elasticEngine.EE)
	if err != nil {
		log.Error(err.Error())
		return indeX, err
	}
	defer res.Body.Close()
	// Check the response status
	if res.IsError() {
		log.Error("Error creating index: %s", res.Status())
		return indeX, fmt.Errorf("Error creating index: %s", res.Status())
	}
	indeX = model.Indices{IndexName: formattedIndexName}
	log.Info("Index " + formattedIndexName + " created successfully")
	return indeX, nil
}
Ở phần này bạn sẽ cần hiểu nhất về mapping trong khi tạo index
Bạn đọc bài bên dưới
Sau khi đã code xong function create index thì mình cần public nó ra:
helper/search/landerElastic.go
package search
import (
	"beerus-land/model"
	"time"
)
type LanderElastic interface {
	CreateSearchIndices(name string) (model.Indices, error)
}
ở file main.yaml
package main
import (
	"beerus-land/handler"
	"beerus-land/helper/automation/automation_impl"
	"beerus-land/helper/search"
	"beerus-land/helper/search/search_impl"
	"beerus-land/log"
	"github.com/go-co-op/gocron"
	"github.com/labstack/echo/v4"
	"os"
	"time"
)
func init() {
	os.Setenv("APP_NAME", "BeerUs LanD")
	log.InitLogger(false)
	os.Setenv("TZ", "Asia/Ho_Chi_Minh")
}
func main() {
	e := echo.New()
	elas := &search.ElasticEngine{
		ConnectString: "http://192.168.101.25:9200",
	}
	elas.Connect()
	defer elas.Close()
	autoCrawHandler := handler.AutoCrawHandler{
		Echo:              nil,
		ElasticEngineImpl: search_impl.NewLanderElastic(elas),
	}
	s := gocron.NewScheduler(time.UTC)
	s.Cron("*/5 * * * *").Do(func() { autoCrawHandler.HandlerCrawPrice() })
	s.StartAsync()
	autoCrawHandler.HandlerCrawPrice()
	e.Logger.Fatal(e.Start(":1323"))
}
Và cuối cùng là trong handler;
handler/autoCraw_handler.go
package handler
import (
	"beerus-land/helper/automation"
	"beerus-land/helper/notify"
	"beerus-land/helper/search"
	"beerus-land/log"
	"beerus-land/model"
	"fmt"
	"github.com/labstack/echo/v4"
	"time"
)
type AutoCrawHandler struct {
	Echo              *echo.Context
	ElasticEngineImpl search.LanderElastic
}
func (a *AutoCrawHandler) HandlerCrawPrice() error {
	lookUp := model.HouseAndLand{
		City:     "tp-ho-chi-minh",
		District: "quan-go-vap",
	}
	crawData, err := a.Automation.CrawPrice(lookUp)
	if err != nil {
		log.Error(err.Error())
		return err
	}
	//fmt.Printf("%+v\n", crawData)
	index, err := a.ElasticEngineImpl.CreateSearchIndices("land-house")
...
...
 
			 
												 
												 
												 
												 
												