You should watch this video. The complete program is listed first; a step-by-step walkthrough follows.
package main

import (
	"fmt"
	"time"
)

// Job is anything the queue can run; it has a single method, Process.
type Job interface {
	Process()
}

type Worker struct {
	WorkerId   int       // identifier of this worker
	Done       chan bool // receives the signal that tells this worker to stop
	JobRunning chan Job  // channel this worker receives jobs from
}

// NewWorker creates a worker that reads jobs from jobChan.
func NewWorker(workerID int, jobChan chan Job) *Worker {
	return &Worker{
		WorkerId:   workerID,
		Done:       make(chan bool), // create the stop channel
		JobRunning: jobChan,
	}
}

func (w *Worker) Run() {
	fmt.Println("Run worker id", w.WorkerId)
	// Run concurrently in a separate goroutine.
	go func() {
		for { // loop forever, waiting for events
			select {
			case job := <-w.JobRunning: // a job arrived
				fmt.Println("Job running", w.WorkerId)
				// Each job type defines its own logic inside Process.
				job.Process()
			case <-w.Done: // stop signal received
				fmt.Println("Stop worker", w.WorkerId)
				return
			}
		}
	}()
}

// StopWorker blocks until the worker has received the stop signal.
func (w *Worker) StopWorker() {
	w.Done <- true
}

type JobQueue struct {
	Worker     []*Worker // the workers owned by this queue
	JobRunning chan Job
	Done       chan bool
}

// NewJobQueue creates a queue that uses numOfWorker workers.
func NewJobQueue(numOfWorker int) JobQueue {
	// Slice of workers whose length and capacity are both numOfWorker.
	workers := make([]*Worker, numOfWorker, numOfWorker)
	// jobRunning belongs to the JobQueue and is handed to every worker,
	// so it is the shared link between the JobQueue and its workers.
	jobRunning := make(chan Job)
	for i := 0; i < numOfWorker; i++ {
		workers[i] = NewWorker(i, jobRunning)
	}
	return JobQueue{
		Worker:     workers,
		JobRunning: jobRunning,
		Done:       make(chan bool),
	}
}

// Push sends a job into the JobRunning channel (pointer receiver).
func (jq *JobQueue) Push(job Job) {
	jq.JobRunning <- job
}

func (jq *JobQueue) Stop() {
	jq.Done <- true
}

func (jq *JobQueue) Start() {
	// Start every worker from a separate goroutine.
	go func() {
		for i := 0; i < len(jq.Worker); i++ {
			jq.Worker[i].Run()
		}
	}()
	// The goroutines run independently; none of them waits for another.
	go func() {
		for { // loop forever, listening for the queue's done signal
			select {
			case <-jq.Done: // done signal received
				for i := 0; i < len(jq.Worker); i++ {
					jq.Worker[i].StopWorker()
				}
				return
			}
		}
	}()
}

// Sender is a Job: it implements the Process method of the Job interface.
type Sender struct {
	Email string
}

func (s Sender) Process() {
	// send the mail (here we only print the address)
	fmt.Println(s.Email)
}

func main() {
	emails := []string{ // a slice of addresses
		"a@gmail.com",
		"b@gmail.com",
		"c@gmail.com",
		"d@gmail.com",
		"e@gmail.com",
	}
	JobQueue := NewJobQueue(100) // 100 workers
	JobQueue.Start()
	// Loop over the emails and push one job per address.
	for _, email := range emails {
		sender := Sender{Email: email} // this is a job
		JobQueue.Push(sender)
	}
	// After 2 seconds, stop the queue.
	time.AfterFunc(time.Second*2, func() {
		JobQueue.Stop()
	})
	// Exit after 6 seconds; returning from main ends every goroutine.
	time.Sleep(time.Second * 6)
}

First, we need to create an interface that declares the Process method:
type Job interface {
	Process() // the interface has a single method, Process
}
We use an interface here because we want flexibility in how a job is implemented, rather than tying the queue to one specific case.
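
To make that flexibility concrete, here is a minimal sketch in which two unrelated job types satisfy the same Job interface. EmailJob, ResizeJob, and runAll are hypothetical names invented for this illustration; they are not part of the program above.

```go
package main

import "fmt"

// Job mirrors the interface from the article.
type Job interface {
	Process()
}

// EmailJob is a hypothetical job type used only for illustration.
type EmailJob struct{ To string }

func (e EmailJob) Process() { fmt.Println("sending email to", e.To) }

// ResizeJob is a second hypothetical job type with different fields.
type ResizeJob struct {
	File  string
	Width int
}

func (r ResizeJob) Process() { fmt.Printf("resizing %s to %dpx\n", r.File, r.Width) }

// runAll processes any mix of jobs and returns how many it ran.
func runAll(jobs []Job) int {
	for _, j := range jobs {
		j.Process()
	}
	return len(jobs)
}

func main() {
	jobs := []Job{
		EmailJob{To: "a@gmail.com"},
		ResizeJob{File: "logo.png", Width: 64},
	}
	fmt.Println("processed:", runAll(jobs))
}
```

Because runAll only depends on the interface, adding a third job type would not require any change to it.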
Next, we need to define the worker.
type Worker struct {
	WorkerId   int       // identifier of this worker
	Done       chan bool // receives the signal that tells this worker to stop
	JobRunning chan Job  // channel this worker receives jobs from
}
- WorkerId (int):
  - Purpose: An identification number for each worker. Just as each employee in a company has a unique ID, WorkerId identifies which worker is which.
  - Type: int, a basic Go data type for whole numbers.
- Done (chan bool):
  - Purpose: This channel carries the stop signal for the worker. Think of it as a notification sent to the worker.
  - Type: chan bool is a channel that transmits boolean values (true or false). In concurrent programming, channels are used for communication between goroutines (which behave like lightweight threads).
  - Usage: When a value arrives on Done, the worker leaves its loop and its goroutine ends. The queue uses this to shut workers down safely.
- JobRunning (chan Job):
  - Purpose: This channel delivers new jobs to the worker. It acts like a task list from which the worker picks up the next job to process.
  - Type: chan Job is a channel through which Job values are passed. Job is an interface, so any type that implements Job can be sent through this channel.
  - Usage: The worker listens on this channel; whenever a job is sent, a waiting worker picks it up and starts processing it.
Next, we need to create some methods for the worker.
// NewWorker creates a worker that reads jobs from jobChan.
func NewWorker(workerID int, jobChan chan Job) *Worker {
	return &Worker{
		WorkerId:   workerID,
		Done:       make(chan bool), // create the stop channel
		JobRunning: jobChan,
	}
}
Creating New Workers: NewWorker lets you create a worker with a unique ID and a channel to receive jobs from.
- Function Signature: func NewWorker(workerID int, jobChan chan Job) *Worker
  - workerID int: the integer stored in the new worker's WorkerId.
  - jobChan chan Job: the channel used to send jobs to the worker. Its type is chan Job, so it can carry any value that implements the Job interface.
  - The function returns a pointer to a Worker (*Worker).
- Function Body: return &Worker{...} creates a new Worker struct and returns a pointer to it. Inside the struct initialization:
  - WorkerId: workerID sets the worker's ID to the workerID parameter.
  - Done: make(chan bool) creates a new channel of boolean values. It is used to tell the worker to stop.
  - JobRunning: jobChan stores the jobChan parameter. The worker receives the jobs it has to process from this channel.
Next is running the worker:
func (w *Worker) Run() {
	fmt.Println("Run worker id", w.WorkerId)
	// Run concurrently in a separate goroutine.
	go func() {
		for { // loop forever, waiting for events
			select {
			case job := <-w.JobRunning: // a job arrived
				fmt.Println("Job running", w.WorkerId)
				// Each job type defines its own logic inside Process.
				job.Process()
			case <-w.Done: // stop signal received
				fmt.Println("Stop worker", w.WorkerId)
				return
			}
		}
	}()
}
- Method Signature: func (w *Worker) Run() { ... } is a method of the Worker struct. The (w *Worker) part means that w is a pointer to a Worker instance, so the method has access to the worker's data and channels (Done and JobRunning).
- Starting the Worker: fmt.Println("Run worker id", w.WorkerId) prints a message saying that the worker with this WorkerId is starting.
- Concurrency with Goroutines: go func() { ... }() starts a new goroutine, a lightweight thread managed by the Go runtime. This lets each worker run independently and handle jobs concurrently.
- Infinite Loop for Job Processing: for { ... } is an infinite loop inside the goroutine. It keeps running, waiting either for a job to process or for a signal to stop.
- Select Statement for Handling Channels:
  - The select statement waits on multiple channel operations. It blocks until one of its cases can proceed, then executes that case.
  - case job := <-w.JobRunning: waits for a job on the JobRunning channel. When one arrives, the worker prints a message and calls the job's Process() method, which is where the actual job logic runs.
  - case <-w.Done: waits for a signal on the Done channel. When a value arrives, the worker prints a message and returns, which ends the loop and the goroutine and so stops the worker.
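
The for/select pattern described above can be tried in isolation. The following is a small sketch, not the article's Worker: the jobs here are plain integers, and worker and sumWithWorker are hypothetical names chosen for the example. It relies on the fact that a send on an unbuffered channel completes only when the receiver has taken the value.

```go
package main

import "fmt"

// worker adds up every number it receives on jobs until a value arrives on
// done, then reports the total on result and exits.
func worker(jobs <-chan int, done <-chan bool, result chan<- int) {
	total := 0
	for {
		select {
		case n := <-jobs: // a job arrived
			total += n
		case <-done: // stop signal
			result <- total
			return
		}
	}
}

// sumWithWorker is a hypothetical driver: it feeds values to one worker,
// stops it, and returns the total the worker computed.
func sumWithWorker(values []int) int {
	jobs := make(chan int)
	done := make(chan bool)
	result := make(chan int)

	go worker(jobs, done, result)
	for _, v := range values {
		jobs <- v // unbuffered: returns only once the worker has received v
	}
	done <- true
	return <-result
}

func main() {
	fmt.Println(sumWithWorker([]int{1, 2, 3})) // prints 6
}
```

The stop signal is sent only after every job has been handed over, so the worker has always finished adding before it sees done.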
Now we need to define StopWorker:
// StopWorker blocks until the worker has received the stop signal.
func (w *Worker) StopWorker() {
	w.Done <- true
}
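
StopWorker sends one value per worker, and each send on an unbuffered channel blocks until that worker is ready to receive it. A common alternative in Go, shown here only as a sketch and not as what the article's code does, is to share a single channel and close it: a closed channel unblocks every receiver at once. The function name stopAll is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// stopAll starts n goroutines that wait on a shared done channel, closes the
// channel once, and returns how many goroutines observed the stop.
func stopAll(n int) int {
	done := make(chan struct{})
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		stopped int
	)

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done // a receive on a closed channel returns immediately
			mu.Lock()
			stopped++
			mu.Unlock()
		}()
	}

	close(done) // one close replaces n separate sends
	wg.Wait()   // wait until every goroutine has returned
	return stopped
}

func main() {
	fmt.Println("workers stopped:", stopAll(4)) // prints "workers stopped: 4"
}
```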
Next is the JobQueue, which holds multiple workers and controls how many of them there are.
type JobQueue struct {
	Worker     []*Worker // the workers owned by this queue
	JobRunning chan Job
	Done       chan bool
}
- Worker ([]*Worker):
  - Purpose: A slice of pointers to Worker instances. It holds the workers that are available to process jobs, each of which can run jobs concurrently with the others.
  - Type: []*Worker is a slice of Worker pointers. Slices can grow with append, although this code fixes the length when the queue is created. Each element is a reference to a Worker instance.
- JobRunning (chan Job):
  - Purpose: This channel is used to send jobs to the workers. A job sent here is picked up by an available worker.
  - Type: chan Job is a channel that carries values implementing the Job interface. It is the medium through which jobs reach the workers.
- Done (chan bool):
  - Purpose: This channel signals that the job queue should stop processing. It is used to shut down the queue and all its workers.
  - Type: chan bool is a channel of boolean values (true or false). Sending a value to it tells the job queue to stop its operations.
NewJobQueue
// NewJobQueue creates a queue that uses numOfWorker workers.
func NewJobQueue(numOfWorker int) JobQueue {
	// Slice of workers whose length and capacity are both numOfWorker.
	workers := make([]*Worker, numOfWorker, numOfWorker)
	// jobRunning belongs to the JobQueue and is handed to every worker,
	// so it is the shared link between the JobQueue and its workers.
	jobRunning := make(chan Job)
	for i := 0; i < numOfWorker; i++ {
		workers[i] = NewWorker(i, jobRunning)
	}
	return JobQueue{
		Worker:     workers,
		JobRunning: jobRunning,
		Done:       make(chan bool),
	}
}
NewJobQueue is another constructor function, similar to NewWorker. It creates and initializes a JobQueue struct with a given number of workers and the channels needed for job processing and control. Step by step:
- Function Signature: func NewJobQueue(numOfWorker int) JobQueue takes one parameter, numOfWorker, an integer that says how many workers the job queue should have. It returns a JobQueue value.
- Initializing Workers: workers := make([]*Worker, numOfWorker, numOfWorker) creates a slice of worker pointers whose length and capacity both equal numOfWorker. This slice holds the queue's workers.
- Creating the JobRunning Channel: jobRunning := make(chan Job) creates a new unbuffered channel of type chan Job. It is used to hand jobs to the workers.
- Creating and Assigning Workers:
  - The for loop (for i := 0; i < numOfWorker; i++ { ... }) iterates from 0 to numOfWorker - 1.
  - In each iteration, workers[i] = NewWorker(i, jobRunning) creates a worker with a unique ID (i) and the same jobRunning channel. All workers therefore receive from one channel, and each job goes to exactly one of them.
- Returning a New JobQueue: return JobQueue{...} returns a JobQueue with the initialized workers, the JobRunning channel, and a new Done channel (make(chan bool)). The Done channel is used to signal that the job queue should stop processing.
Purpose of the NewJobQueue Function:
- Setting Up the Queue: It sets up a job queue with a specified number of workers, ready to process jobs.
- Sharing the Job Channel: All workers share the same JobRunning channel, so each job is taken by whichever worker is ready to receive it. This spreads the work across workers, although the split is not guaranteed to be even.
- Flexibility in Scaling: You can adjust the number of workers by changing the numOfWorker argument.
- Centralized Control: The Done channel gives one place from which to stop all workers and the job processing in the queue.
In simple terms, NewJobQueue is like setting up a work team and a task list. You decide how many team members you want (numOfWorker) and set up a system where tasks (jobs) can be assigned to them. Once set up, the team is ready to start working on the tasks assigned to them through a shared task list (JobRunning channel).
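
To see the shared-channel idea on its own, the sketch below starts several goroutines that all receive from one channel and counts how many jobs each one handled. It uses integers instead of the article's Job type, and fanOut is a hypothetical name. The per-worker counts vary from run to run, but their sum always equals the number of jobs because each value sent is received exactly once.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut sends numJobs values into one shared channel read by numWorkers
// goroutines and returns how many jobs each worker handled.
func fanOut(numWorkers, numJobs int) []int {
	jobs := make(chan int)
	counts := make([]int, numWorkers)
	var wg sync.WaitGroup

	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for range jobs { // ends when jobs is closed
				counts[id]++ // each goroutine writes only its own slot
			}
		}(w)
	}

	for j := 0; j < numJobs; j++ {
		jobs <- j // received by exactly one worker
	}
	close(jobs)
	wg.Wait()
	return counts
}

func main() {
	counts := fanOut(3, 10)
	total := 0
	for _, c := range counts {
		total += c
	}
	fmt.Println("per worker:", counts, "total:", total)
}
```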
Pushing jobs to the JobQueue and stopping it
// Push sends a job into the JobRunning channel (pointer receiver).
func (jq *JobQueue) Push(job Job) {
	jq.JobRunning <- job
}

func (jq *JobQueue) Stop() {
	jq.Done <- true
}
- Push Method: Adds a new job to the queue. It is like telling the job queue, "Here's a new task; please have one of the workers handle it."
- Stop Method: Signals that the job queue and all its workers should stop. It is like saying, "Work is over; everyone can stop now."
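
One detail worth knowing: JobRunning is created with make(chan Job), which is unbuffered, so Push blocks until some worker is ready to receive the job. The sketch below illustrates the difference between an unbuffered and a buffered channel using a non-blocking send. tryPush is a hypothetical helper written for this example, and strings stand in for jobs.

```go
package main

import "fmt"

// tryPush attempts a non-blocking send and reports whether the job was
// accepted. With a plain send (queue <- job) the caller would block instead.
func tryPush(queue chan string, job string) bool {
	select {
	case queue <- job:
		return true
	default: // nobody is receiving and there is no free buffer slot
		return false
	}
}

func main() {
	unbuffered := make(chan string)
	fmt.Println(tryPush(unbuffered, "a@gmail.com")) // false: no worker is receiving

	buffered := make(chan string, 2)
	fmt.Println(tryPush(buffered, "a@gmail.com")) // true
	fmt.Println(tryPush(buffered, "b@gmail.com")) // true
	fmt.Println(tryPush(buffered, "c@gmail.com")) // false: the buffer is full
}
```

Whether blocking in Push is desirable depends on the application: it provides natural back-pressure, while a buffer lets producers run ahead of the workers.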
func (jq *JobQueue) Start() {
	// Start every worker from a separate goroutine.
	go func() {
		for i := 0; i < len(jq.Worker); i++ {
			jq.Worker[i].Run()
		}
	}()
	// The goroutines run independently; none of them waits for another.
	go func() {
		for { // loop forever, listening for the queue's done signal
			select {
			case <-jq.Done: // done signal received
				for i := 0; i < len(jq.Worker); i++ {
					jq.Worker[i].StopWorker()
				}
				return
			}
		}
	}()
}
1. Starting Workers:
- Method Signature: func (jq *JobQueue) Start() { ... } is a method of the JobQueue struct with a pointer receiver, so it operates on the JobQueue instance it is called on rather than on a copy.
- Starting All Workers: go func() { for i := 0; i < len(jq.Worker); i++ { jq.Worker[i].Run() } }() starts a new goroutine that loops over all workers in the JobQueue and calls Run on each. Run in turn launches the worker's own goroutine, so the workers operate concurrently; each one listens on the JobRunning channel and processes jobs as they come in.
2. Listening for a Stop Signal:
- Monitoring for the Stop Command: go func() { ... for { select { case <-jq.Done: ... } } }() starts another goroutine, which waits for a signal on the Done channel.
  - select { case <-jq.Done: ... } blocks until a value arrives on Done, which means the job queue should stop.
  - for i := 0; i < len(jq.Worker); i++ { jq.Worker[i].StopWorker() } then iterates over all workers and calls StopWorker on each, sending every worker its stop signal. Each call waits until that worker has received the signal, so a worker that is busy with a job is stopped only after the job finishes.
Next is the part where we implement the processing we want:
// Sender is a Job: it implements the Process method of the Job interface.
type Sender struct {
	Email string
}

func (s Sender) Process() {
	// send the mail (here we only print the address)
	fmt.Println(s.Email)
}

func main() {
	emails := []string{ // a slice of addresses
		"a@gmail.com",
		"b@gmail.com",
		"c@gmail.com",
		"d@gmail.com",
		"e@gmail.com",
	}
	JobQueue := NewJobQueue(100) // 100 workers
	JobQueue.Start()
	// Loop over the emails and push one job per address.
	for _, email := range emails {
		sender := Sender{Email: email} // this is a job
		JobQueue.Push(sender)
	}
	// After 2 seconds, stop the queue.
	time.AfterFunc(time.Second*2, func() {
		JobQueue.Stop()
	})
	// Exit after 6 seconds; returning from main ends every goroutine.
	time.Sleep(time.Second * 6)
}
The Sender Struct and Process Method:
- Sender Struct:
  - type Sender struct { Email string } represents a job type. It has one field, Email, which stores an email address.
  - The comment indicates that Sender is meant to be a Job, so it must implement the Job interface.
- Process Method:
  - func (s Sender) Process() { fmt.Println(s.Email) } is the implementation of the Process method required by the Job interface.
  - Here Process simply prints the email address. In a real-world scenario, this is where the code that actually sends the email would go.
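
Go checks interface satisfaction implicitly, so nothing in the Sender declaration says "implements Job". If you want the compiler to verify it explicitly, one common idiom is a blank-identifier assignment, sketched below. The isJob helper is hypothetical and exists only to show the same check at run time.

```go
package main

import "fmt"

type Job interface {
	Process()
}

type Sender struct {
	Email string
}

func (s Sender) Process() {
	fmt.Println(s.Email)
}

// Compile-time assertion: the build fails here if Sender stops implementing Job.
var _ Job = Sender{}

// isJob reports at run time whether v satisfies Job (illustration only).
func isJob(v interface{}) bool {
	_, ok := v.(Job)
	return ok
}

func main() {
	fmt.Println(isJob(Sender{Email: "a@gmail.com"})) // true
	fmt.Println(isJob("a@gmail.com"))                // false: a string has no Process method
}
```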
The main Function:
- Creating a Job Queue:
  - JobQueue := NewJobQueue(100) creates a new job queue with 100 workers. This queue is responsible for managing and processing jobs.
  - JobQueue.Start() starts all the workers and readies the queue to process jobs.
- Queueing Jobs:
  - The loop for _, email := range emails { ... } iterates through a slice of email addresses.
  - Inside the loop, sender := Sender{Email: email} creates a Sender instance (a job) for each email address.
  - JobQueue.Push(sender) then adds each Sender instance to the job queue for processing.
- Stopping the Job Queue:
  - time.AfterFunc(time.Second*2, func() { JobQueue.Stop() }) schedules the job queue to stop after 2 seconds. time.AfterFunc waits for the given duration and then calls the provided function in its own goroutine; here that function sends the stop signal to the job queue.
- Ending the Program:
  - time.Sleep(time.Second * 6) makes the main function wait 6 seconds before exiting. A Go program ends when main returns, even if other goroutines are still running, so this wait gives the workers time to process the jobs and print their stop messages.
Summary:
- The Sender struct represents a simple job that, when processed, prints an email address. In a practical application, this could be replaced with code that actually performs a task such as sending an email.
- The main function demonstrates how to create and use a job queue with multiple workers. It starts the job processing, adds jobs to the queue, and then stops the queue after a set duration.
- This code is a basic example of concurrent job processing in Go, where multiple workers can process different jobs (in this case, "sending" emails) at the same time.
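
The fixed time.Sleep in main keeps the example short, but it waits the full 6 seconds no matter how quickly the jobs finish. As a rough sketch of another option, and not a change to the code above, sync.WaitGroup from the standard library lets the caller wait exactly until all workers have returned. processAll is a hypothetical function, and plain strings replace the Sender jobs to keep the sketch small.

```go
package main

import (
	"fmt"
	"sync"
)

// processAll runs numWorkers goroutines over the given emails and returns
// only after every email has been processed.
func processAll(emails []string, numWorkers int) []string {
	jobs := make(chan string)
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		sent []string
	)

	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for email := range jobs { // loop ends when jobs is closed
				mu.Lock()
				sent = append(sent, email) // stand-in for sending the mail
				mu.Unlock()
			}
		}()
	}

	for _, e := range emails {
		jobs <- e
	}
	close(jobs) // no more work: every worker leaves its range loop
	wg.Wait()   // blocks until all workers have returned
	return sent
}

func main() {
	sent := processAll([]string{"a@gmail.com", "b@gmail.com", "c@gmail.com"}, 2)
	fmt.Println("processed", len(sent), "emails") // prints "processed 3 emails"
}
```

The order of the entries in sent depends on scheduling, so only the count is stable between runs.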