    package main

    import (
        "fmt"
        "time"
    )

    // Job is anything a worker can execute. It has a single method, Process.
    type Job interface {
        Process()
    }

    type Worker struct {
        WorkerId   int       // identifier of the worker
        Done       chan bool // receives a value when the worker should stop
        JobRunning chan Job  // channel the worker receives jobs from
    }

    // NewWorker creates a worker that reads jobs from jobChan.
    func NewWorker(workerID int, jobChan chan Job) *Worker {
        return &Worker{
            WorkerId:   workerID,
            Done:       make(chan bool), // create the stop channel
            JobRunning: jobChan,
        }
    }

    // Run starts the worker loop concurrently, in its own goroutine.
    func (w *Worker) Run() {
        fmt.Println("Run worker id", w.WorkerId)
        go func() {
            for { // loop forever, waiting for events
                select {
                case job := <-w.JobRunning: // a job arrived
                    fmt.Println("Job running", w.WorkerId)
                    job.Process() // each job type defines its own logic in Process
                case <-w.Done: // stop signal received
                    fmt.Println("Stop worker", w.WorkerId)
                    return
                }
            }
        }()
    }

    func (w *Worker) StopWorker() {
        w.Done <- true
    }

    // JobQueue holds the workers and the job channel they share.
    type JobQueue struct {
        Worker     []*Worker
        JobRunning chan Job
        Done       chan bool
    }

    // NewJobQueue creates a queue that uses numOfWorker workers.
    func NewJobQueue(numOfWorker int) JobQueue {
        // Slice of workers whose length and capacity are both numOfWorker.
        workers := make([]*Worker, numOfWorker, numOfWorker)
        // jobRunning belongs to the JobQueue and is handed to every worker,
        // so it is the link between the queue and its workers.
        jobRunning := make(chan Job)
        for i := 0; i < numOfWorker; i++ {
            workers[i] = NewWorker(i, jobRunning)
        }
        return JobQueue{
            Worker:     workers,
            JobRunning: jobRunning,
            Done:       make(chan bool),
        }
    }

    // Push sends a job into the JobRunning channel (pointer receiver).
    func (jq *JobQueue) Push(job Job) {
        jq.JobRunning <- job
    }

    func (jq *JobQueue) Stop() {
        jq.Done <- true
    }

    func (jq *JobQueue) Start() {
        // Start every worker from a goroutine.
        go func() {
            for i := 0; i < len(jq.Worker); i++ {
                jq.Worker[i].Run()
            }
        }()
        // The goroutines run independently of each other.
        // This one waits for the queue's stop signal.
        go func() {
            for {
                select {
                case <-jq.Done: // stop signal: stop every worker
                    for i := 0; i < len(jq.Worker); i++ {
                        jq.Worker[i].StopWorker()
                    }
                    return
                }
            }
        }()
    }

    // Sender is a Job: it implements the Process method of the Job interface.
    type Sender struct {
        Email string
    }

    func (s Sender) Process() {
        // send mail
        fmt.Println(s.Email)
    }

    func main() {
        emails := []string{ // a slice of addresses
            "a@gmail.com",
            "b@gmail.com",
            "c@gmail.com",
            "d@gmail.com",
            "e@gmail.com",
        }
        JobQueue := NewJobQueue(100) // 100 workers
        JobQueue.Start()
        // Loop over every email and push one job per address.
        for _, email := range emails {
            sender := Sender{Email: email} // this is a job
            JobQueue.Push(sender)
        }
        // Stop the queue after 2 seconds.
        time.AfterFunc(time.Second*2, func() {
            JobQueue.Stop()
        })
        // Keep main alive for 6 seconds so the goroutines can finish.
        time.Sleep(time.Second * 6)
    }
First, we need an interface that declares the Process method.

    type Job interface {
        Process() // the single method every job must provide
    }

We use an interface here because we want flexibility in how a job is implemented, rather than tying the queue to one specific case.
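To make that flexibility concrete, here is a minimal sketch in which two unrelated job types go through the same code path. `EmailJob`, `ResizeJob`, and `runAll` are hypothetical names used only for illustration; they are not part of the program above.

```go
package main

import "fmt"

// Job mirrors the interface from the article.
type Job interface {
	Process()
}

// EmailJob is a hypothetical job type used only for illustration.
type EmailJob struct{ To string }

func (e EmailJob) Process() { fmt.Println("sending mail to", e.To) }

// ResizeJob is a second hypothetical job type with unrelated fields.
type ResizeJob struct{ Width, Height int }

func (r ResizeJob) Process() { fmt.Println("resizing to", r.Width, "x", r.Height) }

// runAll processes every job in order and reports how many it ran.
// It only knows about the Job interface, not the concrete types.
func runAll(jobs []Job) int {
	for _, j := range jobs {
		j.Process()
	}
	return len(jobs)
}

func main() {
	n := runAll([]Job{
		EmailJob{To: "a@gmail.com"},
		ResizeJob{Width: 800, Height: 600},
	})
	fmt.Println("jobs processed:", n)
}
```

Because `runAll` depends only on `Job`, adding a third job type would not require changing it. The worker in this article relies on the same property.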
Next, we need to define the worker.

    type Worker struct {
        WorkerId   int       // identifier of the worker
        Done       chan bool // receives a value when the worker should stop
        JobRunning chan Job  // channel the worker receives jobs from
    }
- `WorkerId` (`int`):
  - Purpose: This is like an identification number for each worker. Just as each employee in a company has a unique ID, `WorkerId` identifies which worker is which.
  - Type: It is an `int`, a basic data type in Go used for storing whole numbers.
- `Done` (`chan bool`):
  - Purpose: This channel tells the worker to stop. Think of it as a notification sent to the worker rather than a status report coming from it.
  - Type: `chan bool` is a channel that transmits boolean values (`true` or `false`). In concurrent programming, channels are used for communication between goroutines (Go's lightweight threads).
  - Usage: When a value arrives on `Done`, the worker leaves its loop and its goroutine ends. This program always sends `true`, but the worker reacts to the arrival of the message, not to its content.
- `JobRunning` (`chan Job`):
  - Purpose: This channel is used to assign a new job to the worker. It is like a task list or a queue from which the worker picks up the next job to process.
  - Type: `chan Job` means it is a channel through which `Job` values are passed. Remember, `Job` is an interface, so any type that implements the `Job` interface can be passed through this channel.
  - Usage: The worker listens on this channel, and whenever a new job is sent to it, the worker picks the job up and starts processing it.
Next, we need to create some methods for the worker.

    // NewWorker creates a worker that reads jobs from jobChan.
    func NewWorker(workerID int, jobChan chan Job) *Worker {
        return &Worker{
            WorkerId:   workerID,
            Done:       make(chan bool), // create the stop channel
            JobRunning: jobChan,
        }
    }
Creating New Workers: `NewWorker` is a constructor. It creates a worker with the given ID and a channel to receive jobs from.
- Function Signature:
  - `func NewWorker(workerID int, jobChan chan Job) *Worker`: `NewWorker` is a function that takes two parameters:
    - `workerID int`: An integer used to set the `WorkerId` of the new worker.
    - `jobChan chan Job`: A channel that will be used to send jobs to the worker. It is of type `chan Job`, meaning it can send and receive values of the `Job` interface type.
  - The function returns a pointer to a `Worker` (`*Worker`).
- Function Body:
  - `return &Worker{...}`: This line creates a new `Worker` struct with the given parameters and returns a pointer to it.
  - Inside the `Worker` struct initialization:
    - `WorkerId: workerID`: Sets the `WorkerId` of the new worker to the `workerID` parameter.
    - `Done: make(chan bool)`: Initializes `Done` as a new channel of boolean values. This channel is used to tell the worker to stop, and each worker gets its own.
    - `JobRunning: jobChan`: Sets the worker's `JobRunning` channel to the `jobChan` parameter. This is the channel the worker receives jobs from.
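The constructor described above gives each worker its own `Done` channel but stores whatever job channel the caller passes in. The following sketch checks that directly by comparing channel values; the variable names `jobs`, `a`, and `b` are illustrative only.

```go
package main

import "fmt"

// Job, Worker and NewWorker are copied from the article so the example
// compiles on its own.
type Job interface {
	Process()
}

type Worker struct {
	WorkerId   int
	Done       chan bool
	JobRunning chan Job
}

func NewWorker(workerID int, jobChan chan Job) *Worker {
	return &Worker{
		WorkerId:   workerID,
		Done:       make(chan bool),
		JobRunning: jobChan,
	}
}

func main() {
	jobs := make(chan Job)
	a, b := NewWorker(0, jobs), NewWorker(1, jobs)

	// Channel values compare equal only if they came from the same make call.
	fmt.Println("same job channel:", a.JobRunning == b.JobRunning) // prints true
	fmt.Println("same done channel:", a.Done == b.Done)            // prints false
}
```

Sharing the job channel is what lets any idle worker take the next job, while the separate `Done` channels let each worker be stopped individually.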
Next is running the worker:

    // Run starts the worker loop concurrently, in its own goroutine.
    func (w *Worker) Run() {
        fmt.Println("Run worker id", w.WorkerId)
        go func() {
            for { // loop forever, waiting for events
                select {
                case job := <-w.JobRunning: // a job arrived
                    fmt.Println("Job running", w.WorkerId)
                    job.Process() // each job type defines its own logic in Process
                case <-w.Done: // stop signal received
                    fmt.Println("Stop worker", w.WorkerId)
                    return
                }
            }
        }()
    }
- Method Signature:
  - `func (w *Worker) Run() { ... }`: This is a method of the `Worker` struct. The `(w *Worker)` part indicates that the method is associated with `Worker` and that `w` is a pointer to a `Worker` instance, so the method has access to the data and channels (`Done` and `JobRunning`) in the struct.
- Starting the Worker:
  - `fmt.Println("Run worker id", w.WorkerId)`: When `Run` is called, it first prints a message indicating that the worker with this `WorkerId` is starting.
- Concurrency with Goroutines:
  - `go func() { ... }()`: This line starts a new goroutine, a lightweight thread managed by the Go runtime. Goroutines allow concurrent operations; here they let each worker run independently and handle jobs concurrently.
- Infinite Loop for Job Processing:
  - `for { ... }`: Inside the goroutine there is an infinite loop. It keeps running, waiting for jobs to process or for a signal to stop the worker.
- Select Statement for Handling Channels:
  - The `select` statement waits on multiple channel operations. It blocks until one of its cases can run, then executes that case.
  - `case job := <-w.JobRunning:`: This case waits for a job on the `JobRunning` channel. When a job is received, it prints a message indicating that the worker is running a job and then calls the job's `Process()` method, which is where the actual job logic is executed.
  - `case <-w.Done:`: This case waits for a signal on the `Done` channel. When a value is received, it prints a message indicating that the worker is stopping and then returns from the goroutine, which ends the loop and stops the worker.
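The loop described above can be exercised in isolation. The sketch below is an assumption-laden miniature, not the article's code: it uses plain strings instead of `Job` values, a hypothetical helper named `processWithWorker`, and a `sync.WaitGroup` so the caller knows when the worker goroutine has really exited.

```go
package main

import (
	"fmt"
	"sync"
)

// processWithWorker starts one worker goroutine that selects on a job
// channel and a done channel, feeds it the given items, then tells it to
// stop. It returns the items in the order the worker handled them.
func processWithWorker(items []string) []string {
	jobs := make(chan string)
	done := make(chan bool)
	var handled []string
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case item := <-jobs: // a job arrived
				handled = append(handled, item)
			case <-done: // stop signal
				return
			}
		}
	}()

	for _, it := range items {
		jobs <- it // unbuffered: blocks until the worker receives the item
	}
	done <- true // blocks until the worker is back in select
	wg.Wait()    // after this, reading handled is safe
	return handled
}

func main() {
	fmt.Println(processWithWorker([]string{"a", "b", "c"})) // prints [a b c]
}
```

With a single worker and unbuffered channels, every item is handled before the stop signal can be delivered, which is why the order is preserved here. With several workers the order would no longer be guaranteed.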
Now we need to define how to stop a worker:

    func (w *Worker) StopWorker() {
        w.Done <- true
    }
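`StopWorker` sends one value to one worker, and the send blocks until that worker is waiting in its `select`. A different technique, which the article does not use, is to close a shared channel: a closed channel unblocks every receiver at once. The sketch below shows that alternative; `stopAll` and the `struct{}` element type are choices made for this example only.

```go
package main

import (
	"fmt"
	"sync"
)

// stopAll starts n goroutines that wait on one shared done channel and
// stops all of them with a single close. It returns how many exited.
func stopAll(n int) int {
	done := make(chan struct{})
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		stopped int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done // unblocks for every goroutine once done is closed
			mu.Lock()
			stopped++
			mu.Unlock()
		}()
	}
	close(done) // one call reaches all waiting goroutines
	wg.Wait()
	return stopped
}

func main() {
	fmt.Println("workers stopped:", stopAll(5)) // prints workers stopped: 5
}
```

The trade-off is that a closed channel cannot be reused, so this suits a one-time shutdown rather than stopping and restarting individual workers.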
Next is the JobQueue, which holds multiple workers and controls how many of them there are.

    // JobQueue holds the workers and the job channel they share.
    type JobQueue struct {
        Worker     []*Worker
        JobRunning chan Job
        Done       chan bool
    }
- `Worker` (`[]*Worker`):
  - Purpose: This is a slice of pointers to `Worker` instances. It holds the list of workers that are available to process jobs. Each worker in this slice can run jobs concurrently.
  - Type: `[]*Worker` is a slice of `Worker` pointers. A slice can grow as needed, and each element is a reference to a `Worker` instance.
- `JobRunning` (`chan Job`):
  - Purpose: This channel is used to send jobs to the workers. Jobs sent to this channel are picked up by available workers for processing.
  - Type: `chan Job` is a channel that can send and receive values of the `Job` interface type. It is the medium through which jobs are distributed to the workers.
- `Done` (`chan bool`):
  - Purpose: This channel is used to signal when the job queue should stop processing. It can be used to shut down the job queue and all its workers.
  - Type: `chan bool` is a channel for transmitting boolean values (`true` or `false`). When `true` is sent to this channel, it indicates that the job queue should stop its operations.
NewJobQueue
    // NewJobQueue creates a queue that uses numOfWorker workers.
    func NewJobQueue(numOfWorker int) JobQueue {
        // Slice of workers whose length and capacity are both numOfWorker.
        workers := make([]*Worker, numOfWorker, numOfWorker)
        // jobRunning belongs to the JobQueue and is handed to every worker,
        // so it is the link between the queue and its workers.
        jobRunning := make(chan Job)
        for i := 0; i < numOfWorker; i++ {
            workers[i] = NewWorker(i, jobRunning)
        }
        return JobQueue{
            Worker:     workers,
            JobRunning: jobRunning,
            Done:       make(chan bool),
        }
    }
The `NewJobQueue` function is another constructor, similar to `NewWorker`. Its purpose is to create and initialize a new `JobQueue` struct with a specified number of workers and the channels needed for job processing and control. Let's break it down:
- Function Signature:
  - `func NewJobQueue(numOfWorker int) JobQueue`: This function takes one parameter, `numOfWorker`, an integer specifying how many workers the job queue should have. It returns an instance of the `JobQueue` struct.
- Initializing Workers:
  - `workers := make([]*Worker, numOfWorker, numOfWorker)`: This line creates a slice of worker pointers whose length and capacity both equal `numOfWorker`. This slice holds the workers for the job queue.
- Creating the `JobRunning` Channel:
  - `jobRunning := make(chan Job)`: Here, a new unbuffered channel of type `chan Job` is created. It is used to distribute jobs to the workers.
- Creating and Assigning Workers:
  - The `for` loop (`for i := 0; i < numOfWorker; i++ { ... }`) iterates from 0 to `numOfWorker - 1`.
  - In each iteration, `workers[i] = NewWorker(i, jobRunning)` creates a new worker with a unique ID (`i`) and the same `jobRunning` channel. All workers therefore receive jobs from the same channel, and each job goes to exactly one of them.
- Returning a New `JobQueue`:
  - `return JobQueue{...}`: Finally, a `JobQueue` struct is returned with the initialized workers, the `JobRunning` channel, and a new `Done` channel (`make(chan bool)`). The `Done` channel is used to signal when the job queue should stop processing.

Purpose of the `NewJobQueue` Function:
- Setting Up the Queue: It sets up a job queue with a specified number of workers, ready to process jobs.
- Sharing Job Channel: All workers share the same `JobRunning` channel, so whichever worker is ready picks up the next job. Go does not guarantee that the jobs are split evenly.
- Flexibility in Scaling: You can adjust the number of workers in the job queue by changing the `numOfWorker` parameter.
- Centralized Control: The `Done` channel provides a single place from which to stop all workers and the job processing in the queue.

In simple terms, `NewJobQueue` is like setting up a work team and a task list. You decide how many team members you want (`numOfWorker`) and set up a system where tasks (jobs) can be assigned to them. Once set up, the team is ready to work on the tasks assigned through a shared task list (the `JobRunning` channel).
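The effect of sharing one job channel can be observed with a small experiment. This is a sketch under simplifying assumptions: jobs are plain integers, the hypothetical helper `distribute` closes the channel to end the workers instead of using `Done`, and each worker only counts what it receives.

```go
package main

import (
	"fmt"
	"sync"
)

// distribute starts numWorkers goroutines reading from one shared channel,
// sends numJobs jobs, and returns how many jobs each worker received.
func distribute(numWorkers, numJobs int) []int {
	jobs := make(chan int)
	counts := make([]int, numWorkers)
	var wg sync.WaitGroup

	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for range jobs { // ends when jobs is closed
				counts[id]++ // each worker writes only its own slot
			}
		}(w)
	}

	for j := 0; j < numJobs; j++ {
		jobs <- j // delivered to exactly one worker
	}
	close(jobs)
	wg.Wait()
	return counts
}

func main() {
	counts := distribute(4, 20)
	total := 0
	for _, c := range counts {
		total += c
	}
	// The per-worker split varies from run to run; only the total is fixed.
	fmt.Println("per worker:", counts, "total:", total)
}
```

Every job is received exactly once, so the counts always add up to the number of jobs sent, but how they are split between workers depends on scheduling.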
Pushing jobs to the JobQueue and stopping it

    // Push sends a job into the JobRunning channel (pointer receiver).
    func (jq *JobQueue) Push(job Job) {
        jq.JobRunning <- job
    }

    func (jq *JobQueue) Stop() {
        jq.Done <- true
    }
- `Push` Method: It is used for adding new jobs to the queue. It is like telling the job queue, “Here’s a new task; please have one of the workers handle it.” Because `JobRunning` is unbuffered, `Push` blocks until a worker is ready to receive the job.
- `Stop` Method: It is used to signal that the job queue and all its workers should stop their operations. It is like saying, “Work is over; everyone can stop now.”
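Whether a push blocks depends on how the channel was created. The sketch below uses a hypothetical helper, `tryPush`, that attempts a send without blocking, to contrast an unbuffered channel (as in this article) with a buffered one. The buffer size of 2 is an arbitrary choice for the demonstration.

```go
package main

import "fmt"

// tryPush attempts a non-blocking send and reports whether it succeeded.
func tryPush(ch chan string, v string) bool {
	select {
	case ch <- v:
		return true
	default: // nobody is receiving and there is no free buffer slot
		return false
	}
}

func main() {
	unbuffered := make(chan string)
	buffered := make(chan string, 2)

	fmt.Println(tryPush(unbuffered, "a")) // prints false: no receiver is waiting
	fmt.Println(tryPush(buffered, "a"))   // prints true
	fmt.Println(tryPush(buffered, "b"))   // prints true
	fmt.Println(tryPush(buffered, "c"))   // prints false: the buffer is full
}
```

With the unbuffered channel used by `JobQueue`, a plain send simply waits for a worker instead of failing, which gives natural back-pressure. A buffered channel would let the producer run ahead of the workers by up to the buffer size.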
    func (jq *JobQueue) Start() {
        // Start every worker from a goroutine.
        go func() {
            for i := 0; i < len(jq.Worker); i++ {
                jq.Worker[i].Run()
            }
        }()
        // The goroutines run independently of each other.
        // This one waits for the queue's stop signal.
        go func() {
            for {
                select {
                case <-jq.Done: // stop signal: stop every worker
                    for i := 0; i < len(jq.Worker); i++ {
                        jq.Worker[i].StopWorker()
                    }
                    return
                }
            }
        }()
    }
1. Starting Workers:
- Method Signature:
  - `func (jq *JobQueue) Start() { ... }`: This method is associated with the `JobQueue` struct and uses a pointer receiver, meaning it can modify the state of the `JobQueue` instance it is called on.
- Starting All Workers:
  - `go func() { for i := 0; i < len(jq.Worker); i++ { jq.Worker[i].Run() } }()`: This section starts a new goroutine (a lightweight thread). Within it, a loop iterates over all the workers in the `JobQueue`.
  - `jq.Worker[i].Run()`: For each worker in the queue, the `Run` method is called. `Run` starts the worker in its own goroutine, so the workers operate concurrently. Each worker starts listening for jobs on the `JobRunning` channel and processes them as they come in.

2. Listening for a Stop Signal:
- Monitoring for Stop Command:
  - `go func() { ... for { select { case <-jq.Done: ... } } }()`: Another goroutine is started here. It waits for a signal on the `Done` channel.
  - `select { case <-jq.Done: ... }`: The `select` statement waits for a signal on the `Done` channel. When `true` is received on this channel, the job queue should stop.
  - `for i := 0; i < len(jq.Worker); i++ { jq.Worker[i].StopWorker() }`: Once a stop signal is received, this loop iterates over all workers and calls `StopWorker` on each. That method sends a signal telling the worker to stop processing.
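The stop path described above turns one queue-level signal into one signal per worker. Here is a reduced sketch of that fan-out, assuming workers that do nothing except wait to be stopped. `fanOutStop` and the `sync.WaitGroup` used to confirm that everything has exited are additions for this example and are not in the article's code.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOutStop mirrors the second goroutine in Start: a coordinator waits
// for one queue-level signal and then sends a stop signal to each worker.
// It returns how many workers observed their stop signal.
func fanOutStop(numWorkers int) int {
	queueDone := make(chan bool)
	workerDone := make([]chan bool, numWorkers)
	stopped := make([]bool, numWorkers)
	var wg sync.WaitGroup

	for i := range workerDone {
		workerDone[i] = make(chan bool)
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-workerDone[id] // block until told to stop
			stopped[id] = true
		}(i)
	}

	wg.Add(1)
	go func() { // coordinator
		defer wg.Done()
		<-queueDone
		for _, d := range workerDone {
			d <- true // same role as StopWorker
		}
	}()

	queueDone <- true // same role as JobQueue.Stop
	wg.Wait()

	count := 0
	for _, s := range stopped {
		if s {
			count++
		}
	}
	return count
}

func main() {
	fmt.Println("stopped:", fanOutStop(3)) // prints stopped: 3
}
```

Note that the coordinator stops workers one after another, and each send waits until that worker is ready to receive, so a worker that is busy inside a long job delays the ones after it.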
Next, we implement the Process logic that we actually want:

    // Sender is a Job: it implements the Process method of the Job interface.
    type Sender struct {
        Email string
    }

    func (s Sender) Process() {
        // send mail
        fmt.Println(s.Email)
    }

    func main() {
        emails := []string{ // a slice of addresses
            "a@gmail.com",
            "b@gmail.com",
            "c@gmail.com",
            "d@gmail.com",
            "e@gmail.com",
        }
        JobQueue := NewJobQueue(100) // 100 workers
        JobQueue.Start()
        // Loop over every email and push one job per address.
        for _, email := range emails {
            sender := Sender{Email: email} // this is a job
            JobQueue.Push(sender)
        }
        // Stop the queue after 2 seconds.
        time.AfterFunc(time.Second*2, func() {
            JobQueue.Stop()
        })
        // Keep main alive for 6 seconds so the goroutines can finish.
        time.Sleep(time.Second * 6)
    }
The `Sender` Struct and `Process` Method:
- Sender Struct:
  - `type Sender struct { Email string }`: This struct represents a job type. It has one field, `Email`, which stores an email address.
  - The comment indicates that `Sender` is intended to be treated as a `Job`, meaning it should implement the `Job` interface.
- Process Method:
  - `func (s Sender) Process() { fmt.Println(s.Email) }`: This method is the implementation of the `Process` method required by the `Job` interface.
  - In this case, `Process` simply prints the email address. In a real-world scenario, this is where the code that actually sends an email would go.

The `main` Function:
- Creating a Job Queue:
  - `JobQueue := NewJobQueue(100)`: Here, a new job queue is created with 100 workers. This queue is responsible for managing and processing jobs.
  - `JobQueue.Start()`: This starts all the workers and readies the job queue to process jobs.
- Queueing Jobs:
  - The loop `for _, email := range emails { ... }` iterates through a slice of email addresses.
  - Inside the loop, `sender := Sender{Email: email}` creates a `Sender` instance (a job) for each email address.
  - `JobQueue.Push(sender)`: Each `Sender` instance is then pushed to the job queue for processing.
- Stopping the Job Queue:
  - `time.AfterFunc(time.Second*2, func() { JobQueue.Stop() })`: This schedules the job queue to stop after 2 seconds. `time.AfterFunc` waits for the specified duration, then calls the provided function in its own goroutine, which sends the stop signal to the job queue.
- Ending the Program:
  - `time.Sleep(time.Second * 6)`: This line makes the `main` function wait for 6 seconds before exiting. A Go program ends as soon as `main` returns, so this wait keeps the worker goroutines alive long enough to process the jobs and to print their stop messages.
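Sleeping for a fixed time works for a demonstration, but it either waits too long or risks cutting work short. A common alternative, which the article's program does not use, is to wait on a `sync.WaitGroup` until every worker has finished. The sketch below is a compact variant built on that idea: `sendAll` is a hypothetical helper, workers end when the job channel is closed, and "sending" just records the address.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Sender matches the job type from the article.
type Sender struct {
	Email string
}

// sendAll processes every email with numWorkers goroutines and returns
// the processed addresses, sorted so the result is deterministic.
func sendAll(emails []string, numWorkers int) []string {
	jobs := make(chan Sender)
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		sent []string
	)

	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for s := range jobs { // ends when jobs is closed
				mu.Lock()
				sent = append(sent, s.Email) // stand-in for sending mail
				mu.Unlock()
			}
		}()
	}

	for _, e := range emails {
		jobs <- Sender{Email: e}
	}
	close(jobs) // no more jobs: lets the workers exit their loops
	wg.Wait()   // returns as soon as all workers are done, no fixed sleep

	sort.Strings(sent)
	return sent
}

func main() {
	emails := []string{"c@gmail.com", "a@gmail.com", "b@gmail.com"}
	fmt.Println(sendAll(emails, 2)) // prints [a@gmail.com b@gmail.com c@gmail.com]
}
```

Here the program finishes as soon as the last job is done, and no job can be lost to an early exit.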
Summary:
- The `Sender` struct represents a simple job that, when processed, prints an email address. In a practical application, this could be replaced with code that actually performs a task such as sending an email.
- The `main` function demonstrates how to create and use a job queue with multiple workers. It shows starting the job processing, adding jobs to the queue, and then stopping the queue after a set duration.
- This code is a basic example of concurrent job processing in Go, where multiple workers can process different jobs (in this case, “sending” emails) simultaneously.