Go言語には平行処理を行うために goroutine (ゴルーチン) というものが用意されており、非常に便利です。この goroutine とセットで使うことになる channel
は goroutine 間の値の受け渡しに使われます。非同期で実行される goroutine 間でデータの受け渡しを行う目的で用意された仕組み(データ構造)です。
go をさわり始めた頃は、channel
はただの値の受け渡しだけの役割として意識していたのですが、キューのデータ構造としての仕組みが便利に利用できると最近気付きました。
処理を平行処理したい時にその処理を効率的に(待ち時間の無駄を無くし使えるリソースをフル活用)、分散させようとするといろいろ頭を悩ませます。例えば、ここではその解決にchannel
のキューの仕組みを利用します。キューに処理させたいデータを貯め込んでおき、ワーカーである各goroutineにそのキューから手が空いた人からデータを取り出して処理してもらい、キューからデータがなくなったところで処理終了、というようなやり方を試してみます。
キューしての役割の channel
(チャネル)
まず、channel
はバッファを持てるということを意識しておきます。
channel
の作成には make
を使いますが、その第2引数に指定するのがバッファの数です。省略した場合には、バッファ無しの channel
が作られています。値をため込みませんので、誰かがバッファから値を取り出すまでは値を入れることができません。
特に平行処理などは動作させてみないと実際の動きがピンとこないことが多いです。実際に動かして何を言っているのか確認してみます。
package main import ( "log" "sync" "time" ) func main() { var wg sync.WaitGroup log.Println("main start") queue := make(chan string) // 1) wg.Add(1) go func() { defer wg.Done() log.Println("goroutine: start") // 受け取り側が受信操作をするまでは停止 queue <- "hello" // 2) log.Println("goroutine: Put 'hello'") log.Println("goroutine: end") }() time.Sleep(3 * time.Second) // 3) ret := <-queue // 4) log.Printf("main: Got '%s'\n", ret) wg.Wait() log.Println("main: completed") }
1) にあるように、第2引数を指定していませんので、バッファ無しのキューになっています。
キューに値を詰めているのは、goroutine の 2) の部分で、取り出しているのは、メインスレッドの 4) の部分です。 キューから値を取り出す直前に 3) で3秒間のスリープを入れています。
channel
がキューになっていて、バッファ無しとなっていることの意味を正しく把握していないと、起動後、2) が実施されて、キューに値が追加され、4) の取り出しの直前で 3 秒間待たされる動作になるように想像してしまいます。
実際の上記のソースの実行結果は以下のようになります。
2020/02/10 08:16:31 main start 2020/02/10 08:16:31 goroutine: start <- A 2020/02/10 08:16:34 main: Got 'hello' <- B 2020/02/10 08:16:34 goroutine: Put 'hello' 2020/02/10 08:16:34 goroutine: end 2020/02/10 08:16:34 main: completed
A と B の処理時間を見るとわかりますが、2) のキューに値を追加する処理自体が 3秒間待たされています。処理の順番としては、3秒間のスリープを待たずに実施されるはずなのですが、キューにバッファリングできないため、取り出しとセットでないと動けない状態なんですね。
値を保持するためにキューのサイズを増やしてあげると、値を詰める処理はブロックされずに、先に値を入れておくことができるようになります。以下のように先程のソースコードのキューのサイズを1にしておきます。
diff -u 02-08-094901.go 02-08-094901-2.go --- 02-08-094901.go 2020-02-10 08:16:28.000000000 +0900 +++ 02-08-094901-2.go 2020-02-10 08:35:25.000000000 +0900 @@ -10,7 +10,7 @@ var wg sync.WaitGroup log.Println("main start") - queue := make(chan string) // 1) + queue := make(chan string, 1) // 1) wg.Add(1) go func() {
実行してみます。
2020/02/10 08:37:26 main start 2020/02/10 08:37:26 goroutine: start 2020/02/10 08:37:26 goroutine: Put 'hello' 2020/02/10 08:37:26 goroutine: end <- A 2020/02/10 08:37:29 main: Got 'hello' <- B 2020/02/10 08:37:29 main: completed
値を1つバッファリングできるようになったため、3秒のスリープ前に値が詰められ、スリープ後取り出されていることがわかります。
channel
(チャネル)を活用して負荷分散 / 固定数のgoroutineを起動して処理を分散させて平行処理する
では、実際にchannel
(チャネル)を活用して負荷分散のイメージを実装します。
package main import ( "log" "sync" "time" ) const ( MaxGoRoutine = 3 QueueSize = 2 ) /** 起動する Go ルーチンの数を固定とし、 空いている Go ルーチンからキューに入っている URL の処理を行ってもらう */ func main() { var wg sync.WaitGroup urls := []string{ "https://www.example.com", "https://www.example.net", "https://www.example.com/foo", "https://www.example.com/bar", "https://www.example.com/?1", "https://www.example.com/?2", "https://www.example.com/?3", } queue := make(chan string, QueueSize) for i := 0; i < MaxGoRoutine; i++ { wg.Add(1) go fetch(&wg, queue, i) } for _, url := range urls { queue <- url } close(queue) wg.Wait() } func fetch(wg *sync.WaitGroup, queue chan string, i int) { defer wg.Done() for { // queue が close されると more は false になる。 url, more := <-queue if more { log.Printf("[%d]: fetching %s\n", i, url) time.Sleep(1 * time.Second) } else { log.Printf("[%d]: worker exit\n", i) return } } }
何をしているかと言いますと、リクエストを行う URL のリストを queue に貯め込み、処理を行うワーカーさんたちに手が空いたワーカーさんからその queue から値を取り出してもらい、処理を実施してもらいます。ワーカーさんは、上記では3つ起動しています。
詰め込む側で詰め込み終わったら、queue をクローズしています(close(queue)
)。これにより、ワーカー側では、queue から値を取り出す時の戻り値で知ることができますので、全て処理が行われたことがわかり、終了することができます。
実行結果は以下の通りです。この量、処理ですと、うまく分散されているかよくわかりませんが、、、とりあえず分散されています。。
2020/02/10 08:54:21 [2]: fetching https://www.example.com 2020/02/10 08:54:21 [0]: fetching https://www.example.net 2020/02/10 08:54:21 [1]: fetching https://www.example.com/foo 2020/02/10 08:54:22 [2]: fetching https://www.example.com/bar 2020/02/10 08:54:22 [0]: fetching https://www.example.com/?2 2020/02/10 08:54:22 [1]: fetching https://www.example.com/?1 2020/02/10 08:54:23 [2]: worker exit 2020/02/10 08:54:23 [1]: fetching https://www.example.com/?3 2020/02/10 08:54:23 [0]: worker exit 2020/02/10 08:54:24 [1]: worker exit
あとは、HTTP リクエストのレスポンスも channel
を使って受け渡しをし、処理するところまでやる感じでしょうか。
とりあえずここではこれで以上です。
ここで書いている内容は、mattn さんの書かれている以下のあれこれのうちの「ワーカー」の部分にあたりますね。