Coroutines SharedFlow解説

Coroutines 1.4.0-M1SharedFlow が入ったので、使い方を紹介します。

Kenji Abe

--

Photo by Fidel Fernando on Unsplash

SharedFlow?

SharedFlow は名前の通り Flow ですが、普通の Flow は Cold Stream に対して、こちらは Hot Stream になります。Flow と異なり Complete することはないのです。

また、キャッシュとバッファリングに対応することができます。

SharedFlow で発行された値は collect (サブスクライブ)してる箇所すべてに通知されるます。

ユースケース

以下のIssueのdescriptionを見てみてください。

同期的なイベントリスナーとか、イベントをキャッシュとかリプレイするとか、みたいな感じですかね。

個人的に、Androidでは LiveData を使ってイベント通知(SingleLiveEvent)などやってたと思いますが、それの代わりになりそうだなぁって思っています。
(これは後日まとめたいと思います)

簡単な使い方

Androidやってる方は LiveData と同じような使い方に見えると思いますが、だいたいそんな感じです。

MutableSharedFlow という値を発行可能なものを用意して SharedFlow でそれを外部に公開する。公開された SharedFlowcollect して値を受信できるようにする感じです。

値を発行するときは emit という suspend 関数を呼び出します。suspend関数なので注意してください。

キャッシュ と バッファリング

まずは MutableSharedFlow の生成するときの引数を見てみます。デフォルト引数は次のような感じになっています。

private val _event = MutableSharedFlow<Event>(
replay = 0,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND

)

これらの引数を使って、キャッシュとバッファリングの制御を行います。

キャッシュ

ドキュメントでは Replay cache と呼んでるみたいです

最初の replay 引数は発行された値を保存するキャッシュの件数になります。デフォルトは 0 でキャッシュされません。

private val _event = MutableSharedFlow<Event>(replay = 2)
val event: SharedFlow<Event> = _event
_event.emit( ... )
_event.emit( ... )
_event.emit( ... )

上の例では、3回 emit した場合に、最新の2件を保持してくれます。

保持した値を参照するには replayCacne を使うことでListで取得することができます。

sample.event.replayCache // [Event, Event]

バッファリング

SharedFlow では collect の処理が時間がかかってる場合に何度も emit にて値が発行された場合にバッファリングしてくれる仕組みがあります。

まず、大事になるのが、 onBufferOverflow のパラメータです。SUSPEND , DROP_OLDEST , DROP_LATEST の3つを指定することができます。

BufferOverflow.SUSPEND

まず SUSPEND ですが、こちらの場合は、バッファリングはせずに emit したときに処理中のものがあれば終わるまで待ちます。これがデフォルトの挙動になります。

private val _event = MutableSharedFlow<Event>() // すべてデフォルト
val event: SharedFlow<Event> = _event
event.collect { event ->
// 時間のかかる処理
}
_event.emit( ... )
_event.emit( ... ) // 最初のcollectの処理が終わるまで待機

図にするとこんな感じです。

BufferOverflow.SUSPEND

BufferOverflow.DROP_OLDEST

次に DROP_OLDEST です。この設定のときは replay または extraBufferCapacity のパラメータを 0 より大きい値にする必要があります。

この設定の場合は、短時間に emit が多く発生した場合はバッファーに入らない古いものが破棄されていく感じになります。

また、バッファーのサイズは replayextraBufferCapacity の値を足したものになります。以下の例だと 2 になります。

private val _event = MutableSharedFlow<Event>(
replay = 1,
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST

)
val event: SharedFlow<Event> = _event
event.collect { event ->
// 時間のかかる処理
}
_event.emit( ... ) // 短時間にemitした場合、古いのが破棄される
_event.emit( ... )
_event.emit( ... )

図にすると以下の感じです。

BufferOverflow.DROP_OLDEST

BufferOverflow.DROP_LATEST

最後に DROP_LATEST ですが、これは先程の DROP_OLDEST と逆にバッファーに入らない最新のものが破棄されます。

似たような感じなので図だけ載せておきます。

BufferOverflow.DROP_LATEST

tryEmit

普通の emitsuspend 関数になっていますが、 tryEmit というのもあり、こちらは通常の関数になっています。
tryEmit は戻り値があり、trueのときは正常に値を発行できて、falseは失敗したことになります。

これはバッファーを使用した仕組みになり、 BufferOverflow.SUSPEND の場合のみ false が発生可能性があります。バッファーの容量がないときにfalseになります。

以下の単純な例です。キャッシュは不要だけどバッファリングする感じにしています。

private val _event = MutableSharedFlow<Event>(
extraBufferCapacity = 1

)
val event: SharedFlow<Event> = _event
_event.tryEmit(...) // true
_event.tryEmit(...) // false
_event.tryEmit(...) // true

ただし、以下の例ではバッファーの容量がないので、 tryEmit はすべて失敗してfalseを返します。

private val _event = MutableSharedFlow<Event>()
val event: SharedFlow<Event> = _event
_event.tryEmit(...) // false
_event.tryEmit(...) // false
_event.tryEmit(...) // false

--

--

Kenji Abe

Programmer / Gamer / Google Developers Expert for Android, Kotlin / @STAR_ZERO