Skip to content

NimTechnology

Trình bày các công nghệ CLOUD một cách dễ hiểu.

  • Kubernetes & Container
    • Docker
    • Kubernetes
      • Ingress
      • Pod
    • Helm Chart
    • OAuth2 Proxy
    • Isito-EnvoyFilter
    • Apache Kafka
      • Kafka
      • Kafka Connect
      • Lenses
    • Vault
    • Longhorn – Storage
    • VictoriaMetrics
    • MetalLB
    • Kong Gateway
  • CI/CD
    • ArgoCD
    • ArgoWorkflows
    • Argo Events
    • Spinnaker
    • Jenkins
    • Harbor
    • TeamCity
    • Git
      • Bitbucket
  • Coding
    • DevSecOps
    • Terraform
      • GCP – Google Cloud
      • AWS – Amazon Web Service
      • Azure Cloud
    • Golang
    • Laravel
    • Python
    • Jquery & JavaScript
    • Selenium
  • Log, Monitor & Tracing
    • DataDog
    • Prometheus
    • Grafana
    • ELK
      • Kibana
      • Logstash
  • BareMetal
    • NextCloud
  • Toggle search form

[Golang] Implement Job Queue inside GOlang.

Posted on December 31, 2023December 31, 2023 By nim No Comments on [Golang] Implement Job Queue inside GOlang.

Bạn nên xem video này:

package main

import (
	"fmt"
	"time"
)

type Job interface {
	Process()// có 1 hàm là process
}

type Worker struct {
	// định danh
	WorkerId int
	Done chan bool //là true hoặc false báo hiện worker chạy xong hay chưa?
	JobRunning chan Job // worker nó running cài job nào
}

// Đầu tiên cần new 1 worker.
func NewWorker(workerID int, jobChan chan Job) *Worker {
	return &Worker{
		WorkerId:   workerID,
		Done:       make(chan bool),//khởi tạo 1 chanel
		JobRunning: jobChan,
	}
}

func (w *Worker) Run() {
	fmt.Println("Run worker id", w.WorkerId)
	// run dướng dạng concurrency
	go func() {// khởi tạo goroutine
		for {// vòng lặp vô tân để bắt các sự kiện
			select {
				case job := <- w.JobRunning: // nhận tín hiệu job running
					fmt.Println("Job running", w.WorkerId)
					job.Process()//thực hiện Process Job
					// đối với cài job logic nó như thế nào thì nó tự định nghĩa process bên trong như thế.

				case <- w.Done:// nhận được tín hiện done.
					fmt.Println("Stop worker", w.WorkerId)
					return
			}
		}
	}()
}

func (w *Worker) StopWorker() {
	w.Done <- true
}

type JobQueue struct {
	// chứa các worker bên trong.
	Worker []*Worker
	JobRunning chan Job
	Done chan bool
}

// Queue sẽ sử dụng bao nhiêu cài worker
func NewJobQueue(numOfWorker int) JobQueue {
	//khởi tọa worker.
	// khởi slice []*Worker, leng worker và capacity worker.
	workers := make([]*Worker, numOfWorker, numOfWorker)
	jobRunning := make(chan Job)//jobRunning là 1 thành phần của JobQueue
	// sau đó cho thằng jobRunning đi vào Worker
	//jobRunning như biến toàn cục để liên hệ JobQueue và Worker

	for i := 0; i < numOfWorker; i++ {
		workers[i] = NewWorker(i, jobRunning)
	}
	return JobQueue{
		Worker:     workers,
		JobRunning: jobRunning,
		Done:       make(chan bool),
	}
}

// pointer receiver
func (jq *JobQueue) Push(job Job) {
	jq.JobRunning <- job// đẩy Job vào chanel JobRunning
}

func (jq *JobQueue) Stop() {
	jq.Done <- true
}

func (jq *JobQueue) Start() {
	// lấy từng con worker ra chạy
	go func() {// chúng ta đưa chúng vào goroutine
		for i := 0; i < len(jq.Worker); i++ {
			jq.Worker[i].Run()// lấy từng worker ra
		}
	}()
	// các goroutine nó chạy đọc lập ko cần đợi lần lượt.

	go func() {// chúng ta đưa chúng vào goroutine
		for {// vòng for vo tận để lắng nghe queue khi nào nó done
			select {
			case <- jq.Done:// khi có tín hiệu done
				for i := 0; i < len(jq.Worker); i++ {
					jq.Worker[i].StopWorker()
				}
				return
			}
		}
	}()
}

type Sender struct {// coi cài này là 1 Job chúng ta cần implement phương thức của interface Job
	Email string
}

func (s Sender) Process() {
	//send mail
	fmt.Println(s.Email)
}

func main()  {
	emails := []string {// đây là slice
		"a@gmail.com",
		"b@gmail.com",
		"c@gmail.com",
		"d@gmail.com",
		"e@gmail.com",
	}

	JobQueue := NewJobQueue(100)// bốn thằng worker
	JobQueue.Start()

	// thao tác là loop qua từng email
	for _, email := range emails {
		sender := Sender{Email: email} // this is a job
		JobQueue.Push(sender)
	}

	//sau bao nhiêu giây thì sẽ làm cái j?
	time.AfterFunc(time.Second*2, func() {
		JobQueue.Stop()
	})

	//sau 6s kết thúc goroutine

	time.Sleep(time.Second * 6)
}

Đầu tiên chúng ta cần tạo ra 1 interface to implement process

type Job interface {
	Process()// có 1 hàm là process
}

Ở đây sẽ là interface bới vì chúng ta muốn linh hoạt trong các implement một job chứ không fix vào 1 case cụ thể nào cả.

Tiếp theo chúng ta cần phải define ra worker.

type Worker struct {
	// định danh
	WorkerId int
	Done chan bool //là true hoặc false báo hiện worker chạy xong hay chưa?
	JobRunning chan Job // worker nó running cài job nào
}
  1. WorkerId (int):
    • Purpose: This is like an identification number for each worker. Just like in a company where each employee has a unique ID, WorkerId helps in identifying which worker is which.
    • Type: It’s an int, a basic data type in Go, used for storing whole numbers.
  2. Done (chan bool):
    • Purpose: This channel is used to signal whether the worker has completed its current job or not. Think of it like a report or notification system.
    • Type: chan bool is a channel that transmits boolean values (true or false). In concurrent programming, channels are used for communication between different go routines (like threads).
    • Usage: If Done receives true, it means the worker is finished with its job. This can be used to safely stop the worker or to notify other parts of the system about the worker’s status.
  3. JobRunning (chan Job):
    • Purpose: This channel is used to assign a new job to the worker. It’s like a task list or a queue where the worker picks up the next job to process.
    • Type: chan Job means it’s a channel through which Job objects are passed. Remember, Job is an interface, so any type that implements the Job interface can be passed through this channel.
    • Usage: The worker listens to this channel, and whenever a new job is sent to this channel, the worker picks it up and starts processing it.

Tiếp theo chúng ta cần tạo 1 số phương thức cho worker.

// Đầu tiên cần new 1 worker.
func NewWorker(workerID int, jobChan chan Job) *Worker {
	return &Worker{
		WorkerId:   workerID,
		Done:       make(chan bool),//khởi tạo 1 chanel
		JobRunning: jobChan,
	}
}

Creating New Workers: It allows you to easily create new workers with a unique ID and a channel to receive jobs.

  1. Function Signature:
    • func NewWorker(workerID int, jobChan chan Job) *Worker: This means NewWorker is a function that takes two parameters:
      • workerID int: An integer to set the WorkerId of the new worker.
      • jobChan chan Job: A channel that will be used to send jobs to the worker. This channel is of type chan Job, meaning it can send and receive values that are of the Job interface type.
    • The function returns a pointer to a Worker (*Worker).
  2. Function Body:
    • return &Worker{...}: This line creates a new Worker struct with the given parameters and returns a pointer to it.
    • Inside the Worker struct initialization:
      • WorkerId: workerID: Sets the WorkerId of the new worker to the workerID parameter passed to the function.
      • Done: make(chan bool): Initializes the Done channel as a new channel for transmitting boolean values. This channel is used to signal whether the worker has completed its job.
      • JobRunning: jobChan: Sets the JobRunning channel of the worker to the jobChan parameter passed to the function. This channel is used to receive jobs that the worker needs to process.

Tiếp theo là run worker:

func (w *Worker) Run() {
	fmt.Println("Run worker id", w.WorkerId)
	// run dướng dạng concurrency
	go func() {// khởi tạo goroutine
		for {// vòng lặp vô tân để bắt các sự kiện
			select {
				case job := <- w.JobRunning: // nhận tín hiệu job running
					fmt.Println("Job running", w.WorkerId)
					job.Process()//thực hiện Process Job
					// đối với cài job logic nó như thế nào thì nó tự định nghĩa process bên trong như thế.

				case <- w.Done:// nhận được tín hiện done.
					fmt.Println("Stop worker", w.WorkerId)
					return
			}
		}
	}()
}
  1. Method Signature:
    • func (w *Worker) Run() { ... }: This is a method of the Worker struct. The (w *Worker) part indicates that this method is associated with the Worker struct and w is a pointer to a Worker instance. This means the method will have access to the data and channels (Done and JobRunning) in the Worker struct.
  2. Starting the Worker:
    • fmt.Println("Run worker id", w.WorkerId): When the Run method is called, it first prints a message indicating that the worker with the specific WorkerId is starting to run.
  3. Concurrency with Goroutines:
    • go func() { ... }(): This line starts a new goroutine, which is a lightweight thread managed by the Go runtime. Goroutines allow concurrent operations – in this case, enabling each worker to run independently and handle jobs concurrently.
  4. Infinite Loop for Job Processing:
    • for { ... }: Inside the goroutine, there is an infinite loop. This loop keeps running, waiting for jobs to process or a signal to stop the worker.
  5. Select Statement for Handling Channels:
    • The select statement is used to wait on multiple channel operations. It blocks until one of its cases can run, then it executes that case.
    • case job := <- w.JobRunning:: This case waits for a job to be received on the JobRunning channel. When a job is received, it prints a message indicating the worker is running a job and then calls the Process() method of the job. The Process() method is where the actual job logic is executed.
    • case <- w.Done:: This case waits for a signal on the Done channel. If a true is received on this channel, it prints a message indicating the worker is stopping and then exits the loop (and consequently the goroutine), effectively stopping the worker.

GIờ chúng ta cần định nghía Stop worker:

func (w *Worker) StopWorker() {
	w.Done <- true
}

Tiếp đến là JobQueue sẽ chứa nhiều Worker.
control bao nhiều worker,

type JobQueue struct {
	// chứa các worker bên trong.
	Worker []*Worker
	JobRunning chan Job
	Done chan bool
}
  1. Worker ([]*Worker):
    • Purpose: This is a slice of pointers to Worker instances. It holds a list of workers that are available to process jobs. Each worker in this slice is capable of running jobs concurrently.
    • Type: []*Worker is a slice of Worker pointers. This means it can dynamically grow and shrink as needed and each element of the slice is a reference to a Worker instance.
  2. JobRunning (chan Job):
    • Purpose: This channel is used to send jobs to the workers. Jobs sent to this channel are picked up by available workers for processing.
    • Type: chan Job is a channel that can send and receive values that are of the Job interface type. This channel acts as a medium through which jobs are distributed to the workers.
  3. Done (chan bool):
    • Purpose: This channel is used to signal when the job queue should stop processing. It can be used to gracefully shut down the job queue and all its workers.
    • Type: chan bool is a channel for transmitting boolean values (true or false). When a true is sent to this channel, it indicates that the job queue should stop its operations.

Contents

Toggle
  • NewJobQueue
    • Purpose of the NewJobQueue Function:
  • Push and Stop the jobs to JobQueue
      • 1. Starting Workers:
      • 2. Listening for a Stop Signal:
    • The Sender Struct and Process Method:
    • The main Function:
    • Summary:

NewJobQueue

// Queue sẽ sử dụng bao nhiêu cài worker
func NewJobQueue(numOfWorker int) JobQueue {
	//khởi tọa worker.
	// khởi slice []*Worker, leng worker và capacity worker.
	workers := make([]*Worker, numOfWorker, numOfWorker)
	jobRunning := make(chan Job)//jobRunning là 1 thành phần của JobQueue
	// sau đó cho thằng jobRunning đi vào Worker
	//jobRunning như biến toàn cục để liên hệ JobQueue và Worker

	for i := 0; i < numOfWorker; i++ {
		workers[i] = NewWorker(i, jobRunning)
	}
	return JobQueue{
		Worker:     workers,
		JobRunning: jobRunning,
		Done:       make(chan bool),
	}
}

The NewJobQueue function in your Go code is another constructor function, similar to NewWorker. Its purpose is to create and initialize a new JobQueue struct with a specified number of workers and the necessary channels for job processing and control. Let’s break it down for easier understanding:

  1. Function Signature:
    • func NewJobQueue(numOfWorker int) JobQueue: This function takes one parameter, numOfWorker, which is an integer specifying how many workers should be in the job queue. It returns an instance of the JobQueue struct.
  2. Initializing Workers:
    • workers := make([]*Worker, numOfWorker, numOfWorker): This line creates a slice of worker pointers with a length and capacity equal to numOfWorker. This slice will hold the workers for the job queue.
  3. Creating the JobRunning Channel:
    • jobRunning := make(chan Job): Here, a new channel of type chan Job is created. This channel is used to distribute jobs to the workers.
  4. Creating and Assigning Workers:
    • The for loop (for i := 0; i < numOfWorker; i++ { ... }) iterates from 0 to numOfWorker - 1.
    • In each iteration, workers[i] = NewWorker(i, jobRunning) creates a new worker with a unique ID (i) and the same jobRunning channel. This means all workers will receive jobs from the same channel, ensuring job distribution among them.
  5. Returning a New JobQueue:
    • return JobQueue{...}: Finally, a JobQueue struct is returned with the initialized workers, the JobRunning channel, and a new Done channel (make(chan bool)). The Done channel is used to signal when the job queue should stop processing.

Purpose of the NewJobQueue Function:

  • Setting Up the Queue: It sets up a job queue with a specified number of workers, ready to process jobs.
  • Sharing Job Channel: All workers share the same JobRunning channel, which is essential for evenly distributing jobs among them.
  • Flexibility in Scaling: You can easily adjust the number of workers in the job queue by changing the numOfWorker parameter.
  • Centralized Control: The Done channel provides a centralized way to stop all workers and the job processing in the queue.

In simple terms, NewJobQueue is like setting up a work team and a task list. You decide how many team members you want (numOfWorker) and set up a system where tasks (jobs) can be assigned to them. Once set up, the team is ready to start working on the tasks assigned to them through a shared task list (JobRunning channel).

Push and Stop the jobs to JobQueue

// pointer receiver
func (jq *JobQueue) Push(job Job) {
	jq.JobRunning <- job// đẩy Job vào chanel JobRunning
}

func (jq *JobQueue) Stop() {
	jq.Done <- true
}
  • Push Method: It’s used for adding new jobs to the queue. It’s like telling the job queue, “Here’s a new task; please have one of the workers handle it.”
  • Stop Method: It’s used to signal that the job queue and all its workers should stop their operations. It’s like saying, “Work is over; everyone can stop now.”

func (jq *JobQueue) Start() {
	// lấy từng con worker ra chạy
	go func() {// chúng ta đưa chúng vào goroutine
		for i := 0; i < len(jq.Worker); i++ {
			jq.Worker[i].Run()// lấy từng worker ra
		}
	}()
	// các goroutine nó chạy đọc lập ko cần đợi lần lượt.

	go func() {// chúng ta đưa chúng vào goroutine
		for {// vòng for vo tận để lắng nghe queue khi nào nó done
			select {
			case <- jq.Done:// khi có tín hiệu done
				for i := 0; i < len(jq.Worker); i++ {
					jq.Worker[i].StopWorker()
				}
				return
			}
		}
	}()
}

1. Starting Workers:

  • Method Signature:
    • func (jq *JobQueue) Start() { ... }: This method is associated with the JobQueue struct and uses a pointer receiver, meaning it can modify the state of the JobQueue instance it’s called on.
  • Starting All Workers:
    • go func() { for i := 0; i < len(jq.Worker); i++ { jq.Worker[i].Run() } }(): This section starts a new goroutine (a lightweight thread). Within this goroutine, there’s a loop that iterates over all the workers in the JobQueue.
    • jq.Worker[i].Run(): For each worker in the queue, the Run method is called. This starts each worker in a separate goroutine, enabling them to operate concurrently. Each worker starts listening for jobs on the JobRunning channel and processes them as they come in.

2. Listening for a Stop Signal:

  • Monitoring for Stop Command:
    • go func() { ... for { select { case <- jq.Done: ... } } }(): Another goroutine is started here. This one continuously listens for a signal on the Done channel.
    • select { case <- jq.Done: ... }: The select statement is used to wait for a signal on the Done channel. When true is received on this channel, it indicates that the job queue should stop.
    • for i := 0; i < len(jq.Worker); i++ { jq.Worker[i].StopWorker() }: Once a stop signal is received, this loop iterates over all workers and calls the StopWorker method on each. This method sends a signal to each worker to stop processing.

Tiếp đến là phần chúng ta implement process mà chúng ta muốn:

type Sender struct {// coi cài này là 1 Job chúng ta cần implement phương thức của interface Job
	Email string
}

func (s Sender) Process() {
	//send mail
	fmt.Println(s.Email)
}

func main()  {
	emails := []string {// đây là slice
		"a@gmail.com",
		"b@gmail.com",
		"c@gmail.com",
		"d@gmail.com",
		"e@gmail.com",
	}

	JobQueue := NewJobQueue(100)// bốn thằng worker
	JobQueue.Start()

	// thao tác là loop qua từng email
	for _, email := range emails {
		sender := Sender{Email: email} // this is a job
		JobQueue.Push(sender)
	}

	//sau bao nhiêu giây thì sẽ làm cái j?
	time.AfterFunc(time.Second*2, func() {
		JobQueue.Stop()
	})

	//sau 6s kết thúc goroutine

	time.Sleep(time.Second * 6)
}

The Sender Struct and Process Method:

  1. Sender Struct:
    • type Sender struct { Email string }: This struct represents a job type. It has one field, Email, which presumably stores an email address.
    • The comment indicates that Sender is intended to be considered a Job, meaning it should implement the Job interface.
  2. Process Method:
    • func (s Sender) Process() { fmt.Println(s.Email) }: This method is the implementation of the Process method required by the Job interface.
    • In this case, the Process method simply prints the email address. In a real-world scenario, this might be where you put code to actually send an email.

The main Function:

  1. Creating a Job Queue:
    • JobQueue := NewJobQueue(100): Here, a new job queue is created with 100 workers. This queue is responsible for managing and processing jobs.
    • JobQueue.Start(): This starts all the workers and readies the job queue to process jobs.
  2. Queueing Jobs:
    • The loop for _, email := range emails { ... } iterates through a slice of email addresses.
    • Inside the loop, sender := Sender{Email: email} creates a Sender instance (a job) for each email address.
    • JobQueue.Push(sender): Each Sender instance is then added to the job queue for processing.
  3. Stopping the Job Queue:
    • time.AfterFunc(time.Second*2, func() { JobQueue.Stop() }): This schedules the job queue to stop after 2 seconds. time.AfterFunc waits for the specified duration (2 seconds), then calls the provided function, which sends a signal to stop the job queue.
  4. Ending the Program:
    • time.Sleep(time.Second * 6): This line causes the main function to wait for 6 seconds before exiting. This wait time ensures that you can observe the processing of jobs for a certain duration before the program terminates.

Summary:

  • The Sender struct represents a simple job that, when processed, prints an email address. In a practical application, this could be replaced with code to actually perform a task like sending an email.
  • The main function demonstrates how to create and use a job queue with multiple workers. It shows adding jobs to the queue, starting the job processing, and then stopping the queue after a set duration.
  • This code is a basic example of concurrent job processing in Go, where multiple workers can process different jobs (in this case, “sending” emails) simultaneously.
Golang

Post navigation

Previous Post: [Golang] Mastering File Handling in Go: Download, Extract, and Analyze ZIP Archives
Next Post: [Golang] Use Cache to consume messages in Kafka with GO

More Related Articles

[Golang] Zap log in Golang Golang
[Golang] Lưu log trên server với golang Golang
[Golang] Convert this data type to another data type in Golang Golang
[Golang] List the files in a directory with Go Golang
[Golang] Mastering File Handling in Go: Download, Extract, and Analyze ZIP Archives Golang
[Selenium] Save cookies of any website that is running on Selenium. Golang

Leave a Reply Cancel reply

Your email address will not be published. Required fields are marked *

Tham Gia Group DevOps nhé!
Để Nim có nhiều động lực ra nhiều bài viết.
Để nhận được những thông báo mới nhất.

Recent Posts

  • [Laravel] Laravel Helpful June 26, 2025
  • [VScode] Hướng dẫn điều chỉnh font cho terminal June 20, 2025
  • [WordPress] Hướng dấn gửi mail trên WordPress thông qua gmail. June 15, 2025
  • [Bitbucket] Git Clone/Pull/Push with Bitbucket through API Token. June 12, 2025
  • [Teamcity] How to transfer the value from pipeline A to pipeline B June 9, 2025

Archives

  • June 2025
  • May 2025
  • April 2025
  • March 2025
  • February 2025
  • January 2025
  • December 2024
  • November 2024
  • October 2024
  • September 2024
  • August 2024
  • July 2024
  • June 2024
  • May 2024
  • April 2024
  • March 2024
  • February 2024
  • January 2024
  • December 2023
  • November 2023
  • October 2023
  • September 2023
  • August 2023
  • July 2023
  • June 2023
  • May 2023
  • April 2023
  • March 2023
  • February 2023
  • January 2023
  • December 2022
  • November 2022
  • October 2022
  • September 2022
  • August 2022
  • July 2022
  • June 2022
  • May 2022
  • April 2022
  • March 2022
  • February 2022
  • January 2022
  • December 2021
  • November 2021
  • October 2021
  • September 2021
  • August 2021
  • July 2021
  • June 2021

Categories

  • BareMetal
    • NextCloud
  • CI/CD
    • Argo Events
    • ArgoCD
    • ArgoWorkflows
    • Git
      • Bitbucket
    • Harbor
    • Jenkins
    • Spinnaker
    • TeamCity
  • Coding
    • DevSecOps
    • Golang
    • Jquery & JavaScript
    • Laravel
    • NextJS 14 & ReactJS & Type Script
    • Python
    • Selenium
    • Terraform
      • AWS – Amazon Web Service
      • Azure Cloud
      • GCP – Google Cloud
  • Kubernetes & Container
    • Apache Kafka
      • Kafka
      • Kafka Connect
      • Lenses
    • Docker
    • Helm Chart
    • Isito-EnvoyFilter
    • Kong Gateway
    • Kubernetes
      • Ingress
      • Pod
    • Longhorn – Storage
    • MetalLB
    • OAuth2 Proxy
    • Vault
    • VictoriaMetrics
  • Log, Monitor & Tracing
    • DataDog
    • ELK
      • Kibana
      • Logstash
    • Fluent
    • Grafana
    • Prometheus
  • Uncategorized
  • Admin

Copyright © 2025 NimTechnology.