Пример sync и async worker

В этом примере мы собираемся создать рабочий пакет, который будет использоваться для синхронной и асинхронной обработки заданий.

Упаковка

Хотя этот пакет делает то, что обещает, ему нужны еще три бонусные функции.

  • Возможность завершать длительные задания.
  • Возможность закрытия каналов.
  • Возможность убить всех рабочих.

В этом примере для заданий используются «небуферизованные» каналы. Однако, если вы хотите ограничить количество заданий одновременно, вы можете обновить NewWorker()функцию, чтобы использовать «буферизованные» каналы, как показано ниже.

1
2
3
4
5
6
7
func NewWorker(workerTotal, jobTotal int) Worker {
return Worker{
total: workerTotal,
jobChan: make(chan Job, jobTotal),
resultChan: make(chan interface{}, jobTotal),
}
}

код

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package worker

import (
"fmt"
"math/rand"
"time"
)

// -----------------------------------------------------------------------------

type Job struct {
// id represents the job identifier.
// id представляет идентификатор задания.
id interface{}
}

// NewJob returns a `Job` instance.
// NewJob возвращает экземпляр Job.
func NewJob(id interface{}) Job {
return Job{
id: id,
}
}

// -----------------------------------------------------------------------------

type Worker struct {
// total represents the amount of workers to be run at startup.
// total представляет количество воркеров, которые будут запущены при запуске.
total int
// jobChan represents a two-way "unbuffered" channel that has unlimited
// capacity for the jobs.
// jobChan представляет собой двусторонний «небуферизованный» канал с неограниченной
// пропускной способностью для заданий.
jobChan chan Job
// resultChan represents a two-way "unbuffered" channel that has unlimited
// capacity for the job results.
// resultChan представляет собой двусторонний «небуферизованный» канал, который имеет неограниченную
// емкость для результатов задания.
resultChan chan interface{}
}

// NewWorker returns a `Worker` instance.
// NewWorker возвращает экземпляр Worker.
func NewWorker(workerTotal int) Worker {
return Worker{
total: workerTotal,
jobChan: make(chan Job),
resultChan: make(chan interface{}),
}
}

// Start brings up certain amount of worker(s) so that they can pick up and work
// on the job(s).
// Start вызывает определенное количество рабочих(воркеров), чтобы они могли взять и работать
// на задании (джобе(ах)).
func (w Worker) Start() {
for i := 1; i <= w.total; i++ {
go w.run(i)
}
}

// Add adds a job to a channel so that it could be picked up and worked on by
// the running worker(s).
// Add добавляет задание(джоб) в канал, чтобы его мог взять и обработать
// работающий работник(и)(воркер(ы)).
func (w Worker) Add(job Job) {
w.jobChan <- job
}

// Result returns a channel so that it could be ranged over in order to fetch
// job results from the running worker(s).
// Result возвращает канал, чтобы его можно было ранжировать для получения
// результатов задания от работающего работника(ов)(воркера).
func (w Worker) Result() <-chan interface{} {
return w.resultChan
}

// run runs a worker and works on the job(s).
// run запускает работника и работает над заданием(ями) джобами.
func (w Worker) run(id int) {
fmt.Println(id, "running...")

for {
select {
case job := <- w.jobChan:
fmt.Printf("%d выбрал задание %v @ %s\n", id, job.id, time.Now().UTC())
//fmt.Printf("%d picked up job %v @ %s\n", id, job.id, time.Now().UTC())

// Pretend like doing something.
// Делаем вид, будто что-то делаем.
rand.Seed(time.Now().UnixNano())
time.Sleep(time.Duration(rand.Intn(len([]int{0, 1, 2, 3, 4}))) * time.Second)
// Done. (сделано)

fmt.Printf("%d завершил задание %v @ %s\n", id, job.id, time.Now().UTC())
//fmt.Printf("%d completed job %v @ %s\n", id, job.id, time.Now().UTC())
w.resultChan <- job.id
default:
time.Sleep(1 * time.Second)
//fmt.Println(id, "waiting...")
fmt.Println(id, "ожидание...")
}
}
}

Применение

Асинхронный

Это асинхронный (неблокирующий) пример, поэтому задания обрабатываются случайным образом и в произвольном порядке. Программа никогда не выходит. Результаты заданий печатаются независимо друг от друга. Важно иметь несколько работников.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import (
"fmt"

"internal/worker"
)

func main() {
// Create new worker(s) and start.
// Создаем новых рабочих и запускаем.
w := worker.NewWorker(3)
w.Start()

go func() {
// Add jobs.
// Добовляем задания
for i := 1; i <= 5; i++ {
w.Add(worker.NewJob(i))
}
}()

// Print results.
// Печать результатов.
for v := range w.Result() {
fmt.Println("Result:", v)
}
}

синхронный

Это синхронный (блокирующий) пример, поэтому задания обрабатываются одно за другим по порядку. Программа завершает работу после завершения заданий. Результаты заданий печатаются один за другим по порядку. Иметь несколько работников бессмысленно, поскольку одновременно обрабатывается одно задание.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
"fmt"
"internal/worker"
)

func main() {
// Create new worker(s) and start.
// Создаем новых рабочих и запускаем.
w := worker.NewWorker(3)
w.Start()

// Add jobs.
// Добавляем задания.
w.Add(worker.NewJob(1))
// Print results.
// Распечатываем результаты.
v := <- w.Result()
//fmt.Println("Result:", v)
fmt.Println("Результат:", v)

// Add jobs.
// Добавляем задания.
w.Add(worker.NewJob(2))
// Print results.
v = <- w.Result()
fmt.Println("Result:", v)

// Add jobs.
w.Add(worker.NewJob(3))
// Print results.
v = <- w.Result()
fmt.Println("Result:", v)

// Add jobs.
w.Add(worker.NewJob(4))
// Print results.
v = <- w.Result()
fmt.Println("Result:", v)

// Add jobs.
w.Add(worker.NewJob(5))
// Print results.
v = <- w.Result()
fmt.Println("Result:", v)
}

Вот вам и пример синхронного и асинхронного воркера с Golang

Поделиться