2.3K Views
January 23, 23
スライド概要
Go言語での並行処理を実装するうえで気にすると良いこと
LIFULL HOME'Sを運営する株式会社LIFULLのアカウントです。 LIFULLが主催するエンジニア向けイベント「Ltech」等で公開されたスライド等をこちらで共有しております。
Go言語で並行処理 社内技術勉強会 テクノロジー本部事業基盤UプラットフォームG 宮崎泰輔 1 © LIFULL Co.,Ltd. 本書の無断転載、複製を固く禁じます。
並行処理はなぜ難しいか 競合状態 var data int go func() { data++ }() if data == 0 { fmt.Printf(“the value is %v.\n”, data) } これはどうなる? 1. 何も表示されない 2. the value is 0. が表示される 3. the value is 1. が表示される
並行処理はなぜ難しいか アトミック性 異なるコンテキストで、アトミックであることが保証できないと、 平行なコンテキストで安全なプログラムにならない。 i++ => i = i + 1 1. iの値を取得する 2. iの値を1増やす 3. iの値を保存する
並行処理はなぜ難しいか
デッドロック
- 2つのgoroutineがそれぞれ相手の
type value struct {
mu sync.Mutex
value int
}
var wg sync.WaitGroup
printSum := func(v1, v2 *value) {
defer wg.Done()
v1.mu.Lock()
完了を待っている
defer v1.mu.Unlock()
time.Sleep(2 * time.Second)
v2.mu.Lock()
defer v2.mu.Unlock()
そこから進むことはない
fmt.Printf("sum = %v\n", v1.value + v2.value)
}
var a, b value
wg.Add(2)
go printSum(&a, &b)
go printSum(&b, &a)
wg.Wait()
並行処理はなぜ難しいか - Goを使っても、並行処理は依然難しいもの - ただ、Goの並行処理のプリミティブを使えば、簡潔に書ける そのパターンや、注意点についていくつか紹介
Goの並行プリミティブ go 文: go に続く関数を別のgoroutineで動かす channel: 異なるgoroutine間でメッセージをやり取りするためのもの select文: 複数のchannelからメッセージを待ち受けるためのもの sync.WaitGroup: 安全なカウンターみたいなもの sync.Mutex, sync.RWMutex: クリティカルセクションを保護するもの sync.Once: 平行であろうと、1度しかコードを実行しない sync.Pool: オブジェクトプールを並行に使えるようにしたもの
sync.WaitGroup goroutine起動時に、Addしておいて、goroutineの中で defer Done() するのが定番 var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() fmt.Println("1st goroutine sleeping...") time.Sleep(1) }() wg.Add(1) go func() { defer wg.Done() fmt.Println("2nd goroutine sleeping...") time.Sleep(2) }() wg.Wait() fmt.Println("All goroutines complete.")
sync.WaitGroup
forで起動する場合はこんな感じ
const numGreeters = 5
var wg sync.WaitGroup
wg.Add(numGreeters)
for i := 0; i < numGreeters; i++ {
// i := i
go func(i int) {
defer wg.Done()
fmt.Printf("Hello from %v!\n", i)
}(i)
}
wg.Wait()
i := i としてるのは、ループのiを参照していると、goroutine起動後に
書き換わることがあるため
channelのパターン
generator
関数の中でchannelを作成して
generator := func(ctx context.Context, integers ...int) <-chan int {
stream := make(chan int, len(integers))
go func() {
defer close(stream)
for _, i := range integers {
select {
case <-ctx.Done():
return
case stream <- i:
}
}
goroutineの中でchannelに値を入れる
defer で、goroutineを抜けるときに
channelをcloseすることで、
チャンネルからの読み取りが終了する
}()
return stream
}
for i := range generator(context.Background(), 1, 2, 3, 4, 5) {
fmt.Println(i)
}
sync.Once
sync.Once#Do は、何回呼ばれても一度しか実行されない
var count int
increment := func() {
count++
}
var once sync.Once
var increments sync.WaitGroup
increments.Add(100)
for i := 0; i < 100; i++ {
go func() {
defer increments.Done()
once.Do(increment)
}()
}
increments.Wait()
fmt.Printf("Count is %v!\n", count)
=> Count is 1!
sync.Pool
使うものを決まった数だけ作る方法
workerが1024*1024動いているが、
実際のメモリ確保は、数KBのみ
Poolを使っていない場合、
最悪1024B * 1024 * 1024 = 1GB
var numCalcsCreated int
calcPool := &sync.Pool{
New: func() interface{} {
numCalcsCreated += 1
mem := make([]byte, 1024)
return &mem
},
}
calcPool.Put(calcPool.New())
calcPool.Put(calcPool.New())
calcPool.Put(calcPool.New())
calcPool.Put(calcPool.New())
const numWorkers = 1024 * 1024
var wg sync.WaitGroup
wg.Add(numWorkers)
for i := numWorkers; i > 0; i-- {
go func() {
defer wg.Done()
mem := calcPool.Get().(*[]byte)
defer calcPool.Put(mem)
}()
オブジェクトを使い回せる場合に便利
}
wg.Wait()
fmt.Printf("%d calculators were created.", numCalcsCreated)
channelのパターン
for-selectループ
無限ループしつつ、doneに
値が入ったらbreakするようなコード
100[ms]毎に終了すべきか
チェックしているとも言える
done := make(chan struct{})
go func() {
time.Sleep(1 * time.Second)
done <- struct{}{}
}()
Loop:
for {
select {
case <-done:
break Loop
default:
}
fmt.Println("hi")
time.Sleep(100 * time.Millisecond)
}
channelのパターン
for-selectループ
time.Afterを使う場合はこんな感じ
default節を書かないと、
timeoutChから値が来るまでずっと
ブロックし続けるので注意
(ミスりやすい)
time.NewTimerを使う
timeoutCh := time.After(1 * time.Second)
Loop:
for {
select {
case <-timeoutCh: // 内部のgoroutineがリークする
break Loop
case <-ctx.Done():
return
default:
}
fmt.Println("hi")
time.Sleep(100 * time.Millisecond)
}
fmt.Println("finished")
channelのパターン
便利な関数
これがあると、
func orDone[T any](ctx context.Context, c <-chan T) <-chan T {
stream := make(chan T)
go func() {
defer close(stream)
for val := range orDone(ctx, myChan) {
// valに対してなにかする
}
for {
select {
case <-ctx.Done():
return nil
case v, ok := <-c:
if !ok {
return
}
select {
case stream <- v:
case <-ctx.Done():
}
}
}
こう書くだけでcontextが終了するか
myChanがcloseされるまでのループを
安全にできる。
}()
return stream
}
channelのパターン
fan-in
複数のチャンネルをまとめて
1つのチャンネルにする
パイプラインパターンのときに
よく出てくる
var wg sync.WaitGroup
multiplexed := make(chan T)
multiplex := func(c <-chan T) {
defer wg.Done()
for i := range c {
{
select {
case <-ctx.Done():
return
case multiplexed <- i:
}
}
}
}
wg.Add(len(channels))
for _, c := range channels {
go multiplex(c)
}
go func() {
wg.Wait()
close(multiplexed)
}()
return multiplexed
channelのパターン パイプライン processB input processA processC processB それぞれのステージは、goroutine それらをつないでいるのがchannel つまり、ステージはchannelを受け取ってchannelを返す output
channelのパターン
だいたい全部こんな感じになる
それらのステージを
func processA(ctx context.Context, input <-chan int) <-chan int {
ret := make(chan int)
go func() {
defer close(ret)
つなぎ合わせて処理する
for i := range input {
// ここでなんか処理する
select {
case <-ctx.Done():
return
case ret <- i:
}
}
}()
return ret
}
個人的に良くないと思ってるパターン 都度goroutineを起動する - forでループのたびにgoroutineを起動すると、goroutineの起動数に制限を かけておかないとgoroutineが起動しすぎてしまう。 制限をかけていれば問題ない(webサーバーとかはこのパターン) よく考えずchannelのバッファーを1以上にする - バッファを設定しても、それぞれのステージがブロック状態になる 時間を短くできるだけ 全体としての処理時間が早くなるわけではない
注意点 goroutineがリークしないように注意 - 関数が終わることで、channelから読み込まれるものが無い状態にならな いことを注意したい
他にも有用なパターン - ハートビート - 流量制限 - errgroup