В этом примере мы собираемся создать рабочий пакет, который будет использоваться для синхронной и асинхронной обработки заданий.
Упаковка
Хотя этот пакет делает то, что обещает, ему нужны еще три бонусные функции.
Возможность завершать длительные задания.
Возможность закрытия каналов.
Возможность убить всех рабочих.
В этом примере для заданий используются «небуферизованные» каналы. Однако, если вы хотите ограничить количество заданий одновременно, вы можете обновить NewWorker()функцию, чтобы использовать «буферизованные» каналы, как показано ниже.
package worker import ( "fmt" "math/rand" "time" ) // ----------------------------------------------------------------------------- type Job struct { // id represents the job identifier. // id представляет идентификатор задания. id interface{} } // NewJob returns a `Job` instance. // NewJob возвращает экземпляр Job. funcNewJob(id interface{})Job { return Job{ id: id, } } // ----------------------------------------------------------------------------- type Worker struct { // total represents the amount of workers to be run at startup. // total представляет количество воркеров, которые будут запущены при запуске. total int // jobChan represents a two-way "unbuffered" channel that has unlimited // capacity for the jobs. // jobChan представляет собой двусторонний «небуферизованный» канал с неограниченной // пропускной способностью для заданий. jobChan chan Job // resultChan represents a two-way "unbuffered" channel that has unlimited // capacity for the job results. // resultChan представляет собой двусторонний «небуферизованный» канал, который имеет неограниченную // емкость для результатов задания. resultChan chaninterface{} } // NewWorker returns a `Worker` instance. // NewWorker возвращает экземпляр Worker. funcNewWorker(workerTotal int)Worker { return Worker{ total: workerTotal, jobChan: make(chan Job), resultChan: make(chaninterface{}), } } // Start brings up certain amount of worker(s) so that they can pick up and work // on the job(s). // Start вызывает определенное количество рабочих(воркеров), чтобы они могли взять и работать // на задании (джобе(ах)). func(w Worker)Start() { for i := 1; i <= w.total; i++ { go w.run(i) } } // Add adds a job to a channel so that it could be picked up and worked on by // the running worker(s). // Add добавляет задание(джоб) в канал, чтобы его мог взять и обработать // работающий работник(и)(воркер(ы)). func(w Worker)Add(job Job) { w.jobChan <- job } // Result returns a channel so that it could be ranged over in order to fetch // job results from the running worker(s). // Result возвращает канал, чтобы его можно было ранжировать для получения // результатов задания от работающего работника(ов)(воркера). func(w Worker)Result() <-chaninterface{} { return w.resultChan } // run runs a worker and works on the job(s). // run запускает работника и работает над заданием(ями) джобами. func(w Worker)run(id int) { fmt.Println(id, "running...")
Это асинхронный (неблокирующий) пример, поэтому задания обрабатываются случайным образом и в произвольном порядке. Программа никогда не выходит. Результаты заданий печатаются независимо друг от друга. Важно иметь несколько работников.
package main import ( "fmt" "internal/worker" ) funcmain() { // Create new worker(s) and start. // Создаем новых рабочих и запускаем. w := worker.NewWorker(3) w.Start() gofunc() { // Add jobs. // Добовляем задания for i := 1; i <= 5; i++ { w.Add(worker.NewJob(i)) } }() // Print results. // Печать результатов. for v := range w.Result() { fmt.Println("Result:", v) } }
синхронный
Это синхронный (блокирующий) пример, поэтому задания обрабатываются одно за другим по порядку. Программа завершает работу после завершения заданий. Результаты заданий печатаются один за другим по порядку. Иметь несколько работников бессмысленно, поскольку одновременно обрабатывается одно задание.