NimTechnology

Explaining CLOUD technologies in an easy-to-understand way.

[Golang/Elasticsearch] How to send data or anything to Elasticsearch by Golang

Posted on October 17, 2023 by nim

Let's use Golang to send data to Elasticsearch.

Contents

  • Install Go Elasticsearch Library
  • Create Go Code
  • Apply Singleton with Elasticsearch and Golang.
    • 1) Create Index in Elasticsearch by Golang
    • 2) Push data to Elasticsearch.

Install Go Elasticsearch Library

You can install the Go Elasticsearch library using go get:

go get github.com/elastic/go-elasticsearch/v8

Create Go Code

Here is a Go code snippet that demonstrates how to create an index in Elasticsearch:

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"log"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
)

func main() {
	// Initialize Elasticsearch client
	cfg := elasticsearch.Config{
		Addresses: []string{
			"http://localhost:9200",
		},
	}
	es, err := elasticsearch.NewClient(cfg)
	if err != nil {
		log.Fatalf("Error initializing Elasticsearch client: %s", err)
	}

	// Define the index mapping
	indexMapping := map[string]interface{}{
		"mappings": map[string]interface{}{
			"properties": map[string]interface{}{
				"name": map[string]interface{}{
					"type": "text",
				},
				"age": map[string]interface{}{
					"type": "integer",
				},
			},
		},
	}

	// Convert map to JSON
	mappingJSON, err := json.Marshal(indexMapping)
	if err != nil {
		log.Fatalf("Error marshaling index mapping: %s", err)
	}

	// Create the index
	req := esapi.IndicesCreateRequest{
		Index: "my_index",
		Body:  bytes.NewReader(mappingJSON),
	}

	res, err := req.Do(context.Background(), es)
	if err != nil {
		log.Fatalf("Error creating index: %s", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		log.Fatalf("Error: %s", res.String())
	}

	log.Println("Index created successfully")
}

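To see what mappingJSON contains, print it (this requires adding "fmt" to the imports):

fmt.Println(string(mappingJSON))

The output, pretty-printed here for readability, is:
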
{
  "mappings": {
    "properties": {
      "age": {
        "type": "integer"
      },
      "name": {
        "type": "text"
      }
    }
  }
}
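
Note that "age" comes before "name" in the printed JSON even though the Go map literal lists "name" first: encoding/json writes map keys in sorted order. json.Marshal itself emits compact single-line JSON; the minimal sketch below uses json.MarshalIndent to get indented output. The function name prettyMapping is only an illustration and is not part of the original code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// prettyMapping builds the same example mapping as above and returns it
// as indented JSON. Map keys are emitted in sorted order by encoding/json.
func prettyMapping() (string, error) {
	indexMapping := map[string]interface{}{
		"mappings": map[string]interface{}{
			"properties": map[string]interface{}{
				"name": map[string]interface{}{"type": "text"},
				"age":  map[string]interface{}{"type": "integer"},
			},
		},
	}
	out, err := json.MarshalIndent(indexMapping, "", "  ")
	if err != nil {
		return "", err
	}
	return string(out), nil
}

func main() {
	s, err := prettyMapping()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s)
}
```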

Apply Singleton with Elasticsearch and Golang.

1) Create Index in Elasticsearch by Golang

First, I create a separate folder, helper/search.

In helper/search/elasticEngine.go, I create the client and retry if an error occurs:

package search

import (
	"beerus-land/log"
	"github.com/elastic/go-elasticsearch/v8"
	"time"
)

type ElasticEngine struct {
	EE            *elasticsearch.Client
	ConnectString string
}

func (ee *ElasticEngine) Connect() error {
	cfg := elasticsearch.Config{
		Addresses: []string{
			ee.ConnectString,
		},
	}

	maxRetries := 3               // Maximum number of retry attempts
	retryDelay := 5 * time.Second // Delay between retry attempts

	var err error
	for i := 0; i < maxRetries; i++ {
		ee.EE, err = elasticsearch.NewClient(cfg)
		if err == nil {
			log.Info("Connected to ElasticSearch")
			return nil // Connection successful
		}

		log.Error("Failed to connect to ElasticSearch: ", err)
		log.Info("Retrying in ", retryDelay)
		time.Sleep(retryDelay) // Wait before retrying
	}

	log.Error("Max retries reached. Failing...")
	return err // Return the last error
}

func (ee *ElasticEngine) Close() {
	if ee.EE != nil && ee.EE.Transport != nil {
		if transport, ok := ee.EE.Transport.(interface{ CloseIdleConnections() }); ok {
			transport.CloseIdleConnections()
		}
	}
	ee.EE = nil
}
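
One caveat about the retry loop above: as far as I know, elasticsearch.NewClient only builds the client from the configuration and does not contact the cluster, so it will rarely fail just because Elasticsearch is down. Wrapping a real request (for example the client's Ping call) in the retry is likely more useful. The sketch below shows only the retry part, using the standard library; retry, errUnavailable and fakePing are hypothetical names, and fakePing stands in for a real health check.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errUnavailable simulates a failed health check in this sketch.
var errUnavailable = errors.New("service unavailable")

// retry calls fn up to attempts times (attempts should be >= 1) and sleeps
// for delay between failed attempts. It returns nil on the first success,
// otherwise the last error wrapped with the attempt count.
func retry(attempts int, delay time.Duration, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if i < attempts-1 {
			time.Sleep(delay) // wait only if another attempt follows
		}
	}
	return fmt.Errorf("after %d attempts: %w", attempts, err)
}

func main() {
	calls := 0
	// fakePing is a hypothetical stand-in for a real check against the cluster.
	fakePing := func() error {
		calls++
		if calls < 3 {
			return errUnavailable
		}
		return nil
	}
	err := retry(3, 10*time.Millisecond, fakePing)
	fmt.Println("calls:", calls, "err:", err)
}
```

In Connect, the function passed to retry could create the client and then issue one request, returning an error if either step fails.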

Next is helper/search/search_impl.
This is where I implement the functions and expose them to the handler:

helper/search/search_impl/elasticEngine_impl.go

package search_impl

import "beerus-land/helper/search"

type ElasticEngineImpl struct {
	elasticEngine *search.ElasticEngine
}

func NewLanderElastic(elasticEngine *search.ElasticEngine) search.LanderElastic {
	return &ElasticEngineImpl{
		elasticEngine: elasticEngine,
	}
}

helper/search/search_impl/landAndHouse_search_impl.go
Let's look at the first function, which creates an index in Elasticsearch:

package search_impl

import (
	"beerus-land/log"
	"beerus-land/model"
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"github.com/elastic/go-elasticsearch/v8/esapi"
	"time"
)

func (eei *ElasticEngineImpl) CreateSearchIndices(name string) (model.Indices, error) {

	indeX := model.Indices{}

	formattedIndexName := name + "-" + time.Now().Format("2006.01.02")
	// Create a context for the API calls
	ctx := context.Background()

	// Check if the index already exists
	existsReq := esapi.IndicesExistsRequest{
		Index: []string{formattedIndexName},
	}
	existsRes, err := existsReq.Do(ctx, eei.elasticEngine.EE)
	if err != nil {
		log.Error(err.Error())
		return indeX, err
	}
	defer existsRes.Body.Close()

	if existsRes.StatusCode == 200 {
		indeX = model.Indices{IndexName: formattedIndexName}
		log.Info("Index " + formattedIndexName + " already exists")
		return indeX, nil
	}

	// Define the mapping for HouseAndLand
	mapping := map[string]interface{}{
		"mappings": map[string]interface{}{
			"properties": map[string]interface{}{
				"time-stamp": map[string]interface{}{
					"type": "date",
				},
				"city": map[string]interface{}{
					"type": "text",
					"fields": map[string]interface{}{
						"keyword": map[string]interface{}{
							"type": "keyword",
						},
					},
				},
				"district": map[string]interface{}{
					"type": "text",
					"fields": map[string]interface{}{
						"keyword": map[string]interface{}{
							"type": "keyword",
						},
					},
				},
				"ward": map[string]interface{}{
					"type": "text",
					"fields": map[string]interface{}{
						"keyword": map[string]interface{}{
							"type": "keyword",
						},
					},
				},
				"post-title": map[string]interface{}{
					"type": "text",
					"fields": map[string]interface{}{
						"keyword": map[string]interface{}{
							"type": "keyword",
						},
					},
				},
				"land-area": map[string]interface{}{
					"type": "float",
				},
				"price": map[string]interface{}{
					"type": "float",
				},
				"post_owner": map[string]interface{}{
					"type": "text",
					"fields": map[string]interface{}{
						"keyword": map[string]interface{}{
							"type": "keyword",
						},
					},
				},
				"link": map[string]interface{}{
					"type": "text",
					"fields": map[string]interface{}{
						"keyword": map[string]interface{}{
							"type": "keyword",
						},
					},
				},
				"priority": map[string]interface{}{
					"type": "keyword",
				},
				"number_of_bedroom": map[string]interface{}{
					"type": "integer",
				},
			},
		},
	}

	// Convert the mapping to JSON
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(mapping); err != nil {
		// Return the error to the caller instead of terminating the process
		log.Error("Error encoding mapping: " + err.Error())
		return indeX, err
	}

	req := esapi.IndicesCreateRequest{
		Index: formattedIndexName,
		Body:  &buf,
	}

	// Execute the index creation request
	res, err := req.Do(ctx, eei.elasticEngine.EE)
	if err != nil {
		log.Error(err.Error())
		return indeX, err
	}
	defer res.Body.Close()

	// Check the response status
	if res.IsError() {
		log.Error("Error creating index: " + res.Status())
		return indeX, fmt.Errorf("error creating index: %s", res.Status())
	}

	indeX = model.Indices{IndexName: formattedIndexName}
	log.Info("Index " + formattedIndexName + " created successfully")
	return indeX, nil
}
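
The index name in CreateSearchIndices gets a date suffix from time.Now().Format("2006.01.02"). Go layouts are written using the reference time (Mon Jan 2 15:04:05 MST 2006), so "2006.01.02" means year.month.day with zero padding. A small sketch to illustrate; dailyIndexName and indexNameOn are hypothetical helper names, not part of the project:

```go
package main

import (
	"fmt"
	"time"
)

// dailyIndexName appends the date of t to name as "name-YYYY.MM.DD".
func dailyIndexName(name string, t time.Time) string {
	return name + "-" + t.Format("2006.01.02")
}

// indexNameOn builds the index name for a fixed calendar date (UTC).
func indexNameOn(name string, year, month, day int) string {
	t := time.Date(year, time.Month(month), day, 0, 0, 0, 0, time.UTC)
	return dailyIndexName(name, t)
}

func main() {
	fmt.Println(indexNameOn("land-house", 2023, 10, 17)) // land-house-2023.10.17
	fmt.Println(dailyIndexName("land-house", time.Now()))
}
```

Because the suffix changes every day, the function creates one index per day and the existence check keeps repeated calls on the same day from failing.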

The most important thing to understand in this part is the mapping used when creating the index.
See the article below:

[Golang] Define mapping when creating an index on Elasticsearch by Golang.
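
In the mapping above, most string fields use the same shape: a "text" type for full-text search plus a "keyword" sub-field for exact matches, sorting and aggregations. A small helper can remove that repetition. This is only a sketch; field, textWithKeyword, typed and propertyJSON are hypothetical names, and only a few of the fields are shown:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// field is shorthand for one JSON object in the mapping.
type field = map[string]interface{}

// textWithKeyword returns a full-text field with an exact-match
// "keyword" sub-field, the shape used for city, district, ward, etc.
func textWithKeyword() field {
	return field{
		"type": "text",
		"fields": field{
			"keyword": field{"type": "keyword"},
		},
	}
}

// typed returns a field that only declares its type, e.g. "float" or "date".
func typed(t string) field {
	return field{"type": t}
}

// propertyJSON renders one field definition as compact JSON.
func propertyJSON(f field) string {
	b, err := json.Marshal(f)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	mapping := field{
		"mappings": field{
			"properties": field{
				"time-stamp": typed("date"),
				"city":       textWithKeyword(),
				"district":   textWithKeyword(),
				"price":      typed("float"),
			},
		},
	}
	out, err := json.MarshalIndent(mapping, "", "  ")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(out))
}
```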

Once the create-index function is written, I expose it through the interface:
helper/search/landerElastic.go

package search

import "beerus-land/model"

type LanderElastic interface {
	CreateSearchIndices(name string) (model.Indices, error)
}

In main.go:

package main

import (
	"beerus-land/handler"
	"beerus-land/helper/automation/automation_impl"
	"beerus-land/helper/search"
	"beerus-land/helper/search/search_impl"
	"beerus-land/log"
	"github.com/go-co-op/gocron"
	"github.com/labstack/echo/v4"
	"os"
	"time"
)

func init() {
	os.Setenv("APP_NAME", "BeerUs LanD")
	log.InitLogger(false)
	os.Setenv("TZ", "Asia/Ho_Chi_Minh")
}

func main() {
	e := echo.New()

	elas := &search.ElasticEngine{
		ConnectString: "http://192.168.101.25:9200",
	}
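	// Stop early if the client cannot be created
	if err := elas.Connect(); err != nil {
		log.Error("Cannot connect to Elasticsearch: ", err)
		os.Exit(1)
	}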
	defer elas.Close()

	autoCrawHandler := handler.AutoCrawHandler{
		Echo:              nil,
		ElasticEngineImpl: search_impl.NewLanderElastic(elas),
	}

	s := gocron.NewScheduler(time.UTC)
	s.Cron("*/5 * * * *").Do(func() { autoCrawHandler.HandlerCrawPrice() })
	s.StartAsync()
	autoCrawHandler.HandlerCrawPrice()

	e.Logger.Fatal(e.Start(":1323"))
}
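
In main, a single ElasticEngine is created once and passed to the handler, so every request shares the same client. If the engine has to be reachable from several places without passing it around, one common way to guarantee a single instance in Go is sync.Once. The sketch below is an assumption about how that could look, not the project's code; engine and getEngine are hypothetical stand-ins for search.ElasticEngine and its constructor:

```go
package main

import (
	"fmt"
	"sync"
)

// engine is a hypothetical stand-in for search.ElasticEngine.
type engine struct {
	connectString string
}

var (
	once     sync.Once
	instance *engine
)

// getEngine returns the process-wide engine, creating it on the first call.
// Later calls ignore connectString and return the existing instance.
// sync.Once makes the initialization safe for concurrent callers.
func getEngine(connectString string) *engine {
	once.Do(func() {
		instance = &engine{connectString: connectString}
	})
	return instance
}

func main() {
	a := getEngine("http://localhost:9200")
	b := getEngine("http://example.invalid:9200")
	fmt.Println(a == b, b.connectString)
}
```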

Finally, in the handler:
handler/autoCraw_handler.go

package handler

import (
	"beerus-land/helper/automation"
	"beerus-land/helper/notify"
	"beerus-land/helper/search"
	"beerus-land/log"
	"beerus-land/model"
	"fmt"
	"github.com/labstack/echo/v4"
	"time"
)

type AutoCrawHandler struct {
	Echo              *echo.Context
	ElasticEngineImpl search.LanderElastic
}

func (a *AutoCrawHandler) HandlerCrawPrice() error {
	lookUp := model.HouseAndLand{
		City:     "tp-ho-chi-minh",
		District: "quan-go-vap",
	}
	crawData, err := a.Automation.CrawPrice(lookUp)
	if err != nil {
		log.Error(err.Error())
		return err
	}
	//fmt.Printf("%+v\n", crawData)

	index, err := a.ElasticEngineImpl.CreateSearchIndices("land-house")

...
...

2) Push data to Elasticsearch.
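
A minimal sketch of pushing one document, under stated assumptions: it talks to the Elasticsearch REST endpoint POST /{index}/_doc directly with net/http rather than through the go-elasticsearch client (with the client, esapi.IndexRequest plays the same role). indexDocument and newFakeElasticsearch are hypothetical names, and the fake server only imitates the 201 Created response so the example runs without a cluster.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// indexDocument sends doc as JSON to {baseURL}/{index}/_doc and returns
// the HTTP status code. Any status of 300 or above is reported as an error.
func indexDocument(baseURL, index string, doc interface{}) (int, error) {
	body, err := json.Marshal(doc)
	if err != nil {
		return 0, err
	}
	resp, err := http.Post(baseURL+"/"+index+"/_doc", "application/json", bytes.NewReader(body))
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	_, _ = io.Copy(io.Discard, resp.Body) // drain so the connection can be reused
	if resp.StatusCode >= 300 {
		return resp.StatusCode, fmt.Errorf("indexing failed: %s", resp.Status)
	}
	return resp.StatusCode, nil
}

// newFakeElasticsearch starts a local stand-in server that accepts
// POST /{index}/_doc with a JSON body and answers 201 Created.
func newFakeElasticsearch() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost || !strings.HasSuffix(r.URL.Path, "/_doc") {
			http.Error(w, "not found", http.StatusNotFound)
			return
		}
		var doc map[string]interface{}
		if err := json.NewDecoder(r.Body).Decode(&doc); err != nil {
			http.Error(w, "bad request", http.StatusBadRequest)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusCreated)
		fmt.Fprint(w, `{"result":"created"}`)
	}))
}

func main() {
	srv := newFakeElasticsearch()
	defer srv.Close()

	// Field names follow the mapping defined in part 1.
	doc := map[string]interface{}{
		"city":     "tp-ho-chi-minh",
		"district": "quan-go-vap",
		"price":    4.5,
	}
	status, err := indexDocument(srv.URL, "land-house-2023.10.17", doc)
	fmt.Println(status, err)
}
```

Against a real cluster, srv.URL would be replaced by the address used in main, and the index name by the one returned from CreateSearchIndices.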
