[Golang] Use Cache to consume messages in Kafka with GO

Posted on January 1, 2024 by nim

To receive results in the main function from StartKafkaConsumer, you can use a Go channel. Channels are a powerful feature in Go for communication between goroutines. Here’s how you can modify your program:

Contents

  • 1. Modify StartKafkaConsumer to Use Channels
  • 2. Handling Messages in main
  • 3. Considerations for Production Use
  • 1. Modify StartKafkaConsumer to Send Errors to a Channel
  • 2. Handle Messages and Errors in main
  • 3. Considerations

1. Modify StartKafkaConsumer to Use Channels

Update the StartKafkaConsumer function to send Kafka messages through a channel. You’ll also need to pass the channel as a parameter to this function.

package main

import (
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// Message represents a Kafka message
type Message struct {
    Topic     string
    Partition int32
    Offset    int64
    Value     []byte
}

// StartKafkaConsumer starts a Kafka consumer and sends messages and errors to channels
func StartKafkaConsumer(brokers string, groupID string, topics []string, messages chan<- Message, errors chan<- error) {
    config := &kafka.ConfigMap{
        "bootstrap.servers":  brokers,
        "group.id":           groupID,
        "auto.offset.reset":  "latest",
        // Auto-commit is disabled, so offsets must be committed manually
        // (e.g. with consumer.CommitMessage) once a message has been processed.
        "enable.auto.commit": false,
    }

    consumer, err := kafka.NewConsumer(config)
    if err != nil {
        errors <- err
        return
    }
    defer consumer.Close()

    err = consumer.SubscribeTopics(topics, nil)
    if err != nil {
        errors <- err
        return
    }

    for {
        // A timeout of -1 blocks until a message or an error arrives.
        msg, err := consumer.ReadMessage(-1)
        if err != nil {
            errors <- err
            continue
        }
        messages <- Message{
            Topic:     *msg.TopicPartition.Topic,
            Partition: msg.TopicPartition.Partition,
            Offset:    int64(msg.TopicPartition.Offset),
            Value:     msg.Value,
        }
    }
}

func main() {
    brokers := "localhost:9092"
    groupID := "my-group"
    topics := []string{"your-topic"}

    // Create channels for messages and errors
    messages := make(chan Message)
    errors := make(chan error)

    // Start Kafka consumer in a new goroutine
    go StartKafkaConsumer(brokers, groupID, topics, messages, errors)

    // Process messages and errors
    for {
        select {
        case message := <-messages:
            fmt.Printf("Received message: Topic=%s, Partition=%d, Offset=%d, Value=%s\n",
                message.Topic, message.Partition, message.Offset, string(message.Value))
        case err := <-errors:
            fmt.Println("Error:", err)
            // Optional: Add logic to handle specific errors, such as exiting the loop
        }
    }
}

2. Handling Messages in main

In the main function, the program reads messages from the messages channel and processes them; you can swap in whatever processing logic your application needs, as sketched below.
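For instance, the processing logic can be factored out of the select loop into a small handler, which keeps main readable as the logic grows. A minimal sketch (handleMessage is a hypothetical name, not part of the code above):

// handleMessage is a hypothetical processing hook: replace the body with
// whatever your application needs (deserialize the payload, update a cache,
// persist to a database, and so on).
func handleMessage(m Message) {
    fmt.Printf("processing %s[%d]@%d: %s\n",
        m.Topic, m.Partition, m.Offset, string(m.Value))
}

In main, the message case then shrinks to: case message := <-messages: handleMessage(message).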

3. Considerations for Production Use

  • Error Handling: Production code usually needs more robust error handling than printing to stdout, for example distinguishing fatal from transient errors.
  • Channel Blocking: Sending on a channel blocks until the value is received (or, for a buffered channel, until there is room in the buffer), so a slow reader in main will stall the Kafka read loop. Buffered channels or other concurrency patterns can relieve this.
  • Graceful Shutdown: Implement a way to gracefully shut down the Kafka consumer, such as listening for a termination signal and then closing the consumer and channels properly.
  • Consumer Lifecycle Management: Consider how you’ll manage the lifecycle of the consumer, especially if your application needs to start and stop it dynamically. The sketch after this list illustrates buffered channels and a signal-driven shutdown.
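As a concrete sketch of the last three points, the following variant of main uses buffered channels and a signal-driven shutdown. It assumes StartKafkaConsumer is extended to accept a context.Context as its first argument and to stop reading (and close the consumer) once the context is cancelled; the buffer sizes are illustrative.

package main

import (
    "context"
    "fmt"
    "os/signal"
    "syscall"
)

func main() {
    // Cancel the context on SIGINT/SIGTERM so the consumer can shut down cleanly.
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    // Buffered channels absorb short bursts without blocking the read loop.
    messages := make(chan Message, 100)
    errors := make(chan error, 10)

    // Assumed signature: StartKafkaConsumer(ctx, brokers, groupID, topics, messages, errors).
    go StartKafkaConsumer(ctx, "localhost:9092", "my-group", []string{"your-topic"}, messages, errors)

    for {
        select {
        case <-ctx.Done():
            fmt.Println("shutting down")
            return
        case m := <-messages:
            fmt.Printf("Received: %s\n", string(m.Value))
        case err := <-errors:
            fmt.Println("Error:", err)
        }
    }
}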

With this setup, your main function now receives messages directly from the Kafka consumer via a channel, providing a clean and efficient way to process streaming data.

To receive errors from the StartKafkaConsumer function in the main function, you can use another channel specifically for errors. This approach allows you to handle both messages and errors separately in your main function. Here’s how you can implement it:

1. Modify StartKafkaConsumer to Send Errors to a Channel

The StartKafkaConsumer function shown above already does this: it takes an errors chan<- error parameter and sends every error from NewConsumer, SubscribeTopics, and ReadMessage to that channel, so the code does not need to change.


2. Handle Messages and Errors in main

In the main function, use a select statement to handle messages and errors coming from their respective channels. This allows you to process messages and handle errors concurrently.
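confluent-kafka-go surfaces most consumer problems as kafka.Error values, which carry an error code plus IsFatal() and IsRetriable() flags, so the error case is a natural place to branch on them. A sketch (handleConsumerError is a hypothetical helper, assumed to sit in the same package as the code above):

// handleConsumerError is a hypothetical helper: it classifies an error
// received from the errors channel and reports whether the caller should stop.
func handleConsumerError(err error) (stop bool) {
    if kerr, ok := err.(kafka.Error); ok {
        switch {
        case kerr.IsFatal():
            // The consumer instance is no longer usable; give up.
            fmt.Println("fatal Kafka error:", kerr)
            return true
        case kerr.IsRetriable():
            // Transient condition; the client normally recovers on its own.
            fmt.Println("retriable Kafka error:", kerr)
            return false
        }
    }
    fmt.Println("Error:", err)
    return false
}

The error case in main then becomes: case err := <-errors: if handleConsumerError(err) { return }.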

3. Considerations

  • Error Handling: In a real-world application, you would likely want to implement more sophisticated error handling. For example, you might want to retry on certain errors or clean up resources before exiting.
  • Graceful Shutdown: Consider how to gracefully stop the consumer, especially in response to certain types of errors or a system shutdown signal.
  • Channel Blocking: If nothing reads from the errors channel quickly enough (or its buffer, if you use a buffered channel, fills up), sending to it will block the Kafka consumer loop. A non-blocking send, sketched below, is one way around this.
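One way to guarantee the consume loop never stalls is a non-blocking send. A sketch of what each errors <- err inside StartKafkaConsumer could become (dropping errors when the channel is full is a policy choice, assumed acceptable here):

        // Non-blocking send: if no receiver is ready (or the buffer is full),
        // log locally and move on instead of blocking the ReadMessage loop.
        select {
        case errors <- err:
        default:
            fmt.Println("errors channel full, dropping:", err)
        }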

With this setup, your main function is now capable of receiving and handling both messages and errors from the Kafka consumer, providing a robust and flexible structure for your application.
