第2の人生の構築ログ

自分の好きなことをやりつつ、インカムもしっかりと。FIRA60 (Financial Independence, Retire Around 60) の実現を目指します。SE を生業としていますが、自分でプログラミングしながら自分が欲しいと思うアプリケーションを作ることが楽しみです。旅行と温泉、音楽と読書は欠かすことができません。

【Go言語】channel(チャネル)をキューとして使う / 固定数のgoroutine(ゴルーチン)を起動して処理を分散させて平行処理する

f:id:dr_taka_n:20200210104739p:plain:w300

Go言語には平行処理を行うために goroutine (ゴルーチン) というものが用意されており、非常に便利です。この goroutine とセットで使うことになる channel は goroutine 間の値の受け渡しに使われます。非同期で実行される goroutine 間でデータの受け渡しを行う目的で用意された仕組み(データ構造)です。

go をさわり始めた頃は、channelはただの値の受け渡しだけの役割として意識していたのですが、キューのデータ構造としての仕組みが便利に利用できると最近気付きました。

処理を平行処理したい時にその処理を効率的に(待ち時間の無駄を無くし使えるリソースをフル活用)、分散させようとするといろいろ頭を悩ませます。例えば、ここではその解決にchannelのキューの仕組みを利用します。キューに処理させたいデータを貯め込んでおき、ワーカーである各goroutineにそのキューから手が空いた人からデータを取り出して処理してもらい、キューからデータがなくなったところで処理終了、というようなやり方を試してみます。

キューしての役割の channel(チャネル)

まず、channel はバッファを持てるということを意識しておきます。 channel の作成には make を使いますが、その第2引数に指定するのがバッファの数です。省略した場合には、バッファ無しの channel が作られています。値をため込みませんので、誰かがバッファから値を取り出すまでは値を入れることができません。

特に平行処理などは動作させてみないと実際の動きがピンとこないことが多いです。実際に動かして何を言っているのか確認してみます。

package main

import (
    "log"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup

    log.Println("main start")
    queue := make(chan string) // 1)

    wg.Add(1)
    go func() {
        defer wg.Done()
        log.Println("goroutine: start")
        // 受け取り側が受信操作をするまでは停止
        queue <- "hello"  // 2)
        log.Println("goroutine: Put 'hello'")
        log.Println("goroutine: end")
    }()

    time.Sleep(3 * time.Second)  // 3)
    ret := <-queue               // 4)
    log.Printf("main: Got '%s'\n", ret)
    wg.Wait()
    log.Println("main: completed")
}

1) にあるように、第2引数を指定していませんので、バッファ無しのキューになっています。

キューに値を詰めているのは、goroutine の 2) の部分で、取り出しているのは、メインスレッドの 4) の部分です。 キューから値を取り出す直前に 3) で3秒間のスリープを入れています。

channel がキューになっていて、バッファ無しとなっていることの意味を正しく把握していないと、起動後、2) が実施されて、キューに値が追加され、4) の取り出しの直前で 3 秒間待たされる動作になるように想像してしまいます。

実際の上記のソースの実行結果は以下のようになります。

2020/02/10 08:16:31 main start
2020/02/10 08:16:31 goroutine: start   <- A
2020/02/10 08:16:34 main: Got 'hello'  <- B
2020/02/10 08:16:34 goroutine: Put 'hello'
2020/02/10 08:16:34 goroutine: end
2020/02/10 08:16:34 main: completed

A と B の処理時間を見るとわかりますが、2) のキューに値を追加する処理自体が 3秒間待たされています。処理の順番としては、3秒間のスリープを待たずに実施されるはずなのですが、キューにバッファリングできないため、取り出しとセットでないと動けない状態なんですね。

値を保持するためにキューのサイズを増やしてあげると、値を詰める処理はブロックされずに、先に値を入れておくことができるようになります。以下のように先程のソースコードのキューのサイズを1にしておきます。

diff -u 02-08-094901.go 02-08-094901-2.go                                                                                     
--- 02-08-094901.go     2020-02-10 08:16:28.000000000 +0900
+++ 02-08-094901-2.go   2020-02-10 08:35:25.000000000 +0900
@@ -10,7 +10,7 @@
        var wg sync.WaitGroup
 
        log.Println("main start")
-       queue := make(chan string) // 1)
+       queue := make(chan string, 1) // 1)
 
        wg.Add(1)
        go func() {

実行してみます。

2020/02/10 08:37:26 main start
2020/02/10 08:37:26 goroutine: start
2020/02/10 08:37:26 goroutine: Put 'hello'
2020/02/10 08:37:26 goroutine: end    <- A
2020/02/10 08:37:29 main: Got 'hello' <- B
2020/02/10 08:37:29 main: completed

値を1つバッファリングできるようになったため、3秒のスリープ前に値が詰められ、スリープ後取り出されていることがわかります。

channel(チャネル)を活用して負荷分散 / 固定数のgoroutineを起動して処理を分散させて平行処理する

では、実際にchannel(チャネル)を活用して負荷分散のイメージを実装します。

package main

import (
    "log"
    "sync"
    "time"
)

const (
    MaxGoRoutine = 3
    QueueSize = 2
)

/**
起動する Go ルーチンの数を固定とし、
空いている Go ルーチンからキューに入っている URL の処理を行ってもらう
*/
func main() {
    var wg sync.WaitGroup

    urls := []string{
        "https://www.example.com",
        "https://www.example.net",
        "https://www.example.com/foo",
        "https://www.example.com/bar",
        "https://www.example.com/?1",
        "https://www.example.com/?2",
        "https://www.example.com/?3",
    }

    queue := make(chan string, QueueSize)
    for i := 0; i < MaxGoRoutine; i++ {
        wg.Add(1)
        go fetch(&wg, queue, i)
    }

    for _, url := range urls {
        queue <- url
    }
    close(queue)
    wg.Wait()
}

func fetch(wg *sync.WaitGroup, queue chan string, i int) {
    defer wg.Done()
    for {
        // queue が close されると more は false になる。
        url, more := <-queue
        if more {
            log.Printf("[%d]: fetching %s\n", i, url)
            time.Sleep(1 * time.Second)
        } else {
            log.Printf("[%d]: worker exit\n", i)
            return
        }
    }
}

何をしているかと言いますと、リクエストを行う URL のリストを queue に貯め込み、処理を行うワーカーさんたちに手が空いたワーカーさんからその queue から値を取り出してもらい、処理を実施してもらいます。ワーカーさんは、上記では3つ起動しています。

詰め込む側で詰め込み終わったら、queue をクローズしています(close(queue))。これにより、ワーカー側では、queue から値を取り出す時の戻り値で知ることができますので、全て処理が行われたことがわかり、終了することができます。

実行結果は以下の通りです。この量、処理ですと、うまく分散されているかよくわかりませんが、、、とりあえず分散されています。。

2020/02/10 08:54:21 [2]: fetching https://www.example.com
2020/02/10 08:54:21 [0]: fetching https://www.example.net
2020/02/10 08:54:21 [1]: fetching https://www.example.com/foo
2020/02/10 08:54:22 [2]: fetching https://www.example.com/bar
2020/02/10 08:54:22 [0]: fetching https://www.example.com/?2
2020/02/10 08:54:22 [1]: fetching https://www.example.com/?1
2020/02/10 08:54:23 [2]: worker exit
2020/02/10 08:54:23 [1]: fetching https://www.example.com/?3
2020/02/10 08:54:23 [0]: worker exit
2020/02/10 08:54:24 [1]: worker exit

あとは、HTTP リクエストのレスポンスも channel を使って受け渡しをし、処理するところまでやる感じでしょうか。

とりあえずここではこれで以上です。

ここで書いている内容は、mattn さんの書かれている以下のあれこれのうちの「ワーカー」の部分にあたりますね。

mattn.kaoriya.net