Coroutines SharedFlow解説
Coroutines 1.4.0-M1 に SharedFlow
が入ったので、使い方を紹介します。
SharedFlow?
SharedFlow
は名前の通り Flow
ですが、普通の Flow は Cold Stream に対して、こちらは Hot Stream になります。Flow と異なり Complete することはないのです。
また、キャッシュとバッファリングに対応することができます。
SharedFlow
で発行された値は collect
(サブスクライブ)してる箇所すべてに通知されるます。
ユースケース
以下のIssueのdescriptionを見てみてください。
同期的なイベントリスナーとか、イベントをキャッシュとかリプレイするとか、みたいな感じですかね。
個人的に、Androidでは LiveData
を使ってイベント通知(SingleLiveEvent
)などやってたと思いますが、それの代わりになりそうだなぁって思っています。
(これは後日まとめたいと思います)
簡単な使い方
Androidやってる方は LiveData と同じような使い方に見えると思いますが、だいたいそんな感じです。
MutableSharedFlow
という値を発行可能なものを用意して SharedFlow
でそれを外部に公開する。公開された SharedFlow
を collect
して値を受信できるようにする感じです。
値を発行するときは emit
という suspend
関数を呼び出します。suspend
関数なので注意してください。
キャッシュ と バッファリング
まずは MutableSharedFlow
の生成するときの引数を見てみます。デフォルト引数は次のような感じになっています。
private val _event = MutableSharedFlow<Event>(
replay = 0,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
)
これらの引数を使って、キャッシュとバッファリングの制御を行います。
キャッシュ
ドキュメントでは Replay cache
と呼んでるみたいです
最初の replay
引数は発行された値を保存するキャッシュの件数になります。デフォルトは 0
でキャッシュされません。
private val _event = MutableSharedFlow<Event>(replay = 2)
val event: SharedFlow<Event> = _event_event.emit( ... )
_event.emit( ... )
_event.emit( ... )
上の例では、3回 emit
した場合に、最新の2件を保持してくれます。
保持した値を参照するには replayCacne
を使うことでListで取得することができます。
sample.event.replayCache // [Event, Event]
バッファリング
SharedFlow
では collect
の処理が時間がかかってる場合に何度も emit
にて値が発行された場合にバッファリングしてくれる仕組みがあります。
まず、大事になるのが、 onBufferOverflow
のパラメータです。SUSPEND
, DROP_OLDEST
, DROP_LATEST
の3つを指定することができます。
BufferOverflow.SUSPEND
まず SUSPEND
ですが、こちらの場合は、バッファリングはせずに emit
したときに処理中のものがあれば終わるまで待ちます。これがデフォルトの挙動になります。
private val _event = MutableSharedFlow<Event>() // すべてデフォルト
val event: SharedFlow<Event> = _eventevent.collect { event ->
// 時間のかかる処理
}_event.emit( ... )
_event.emit( ... ) // 最初のcollectの処理が終わるまで待機
図にするとこんな感じです。

BufferOverflow.DROP_OLDEST
次に DROP_OLDEST
です。この設定のときは replay
または extraBufferCapacity
のパラメータを 0
より大きい値にする必要があります。
この設定の場合は、短時間に emit
が多く発生した場合はバッファーに入らない古いものが破棄されていく感じになります。
また、バッファーのサイズは replay
と extraBufferCapacity
の値を足したものになります。以下の例だと 2
になります。
private val _event = MutableSharedFlow<Event>(
replay = 1,
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
val event: SharedFlow<Event> = _eventevent.collect { event ->
// 時間のかかる処理
}_event.emit( ... ) // 短時間にemitした場合、古いのが破棄される
_event.emit( ... )
_event.emit( ... )
図にすると以下の感じです。

BufferOverflow.DROP_LATEST
最後に DROP_LATEST
ですが、これは先程の DROP_OLDEST
と逆にバッファーに入らない最新のものが破棄されます。
似たような感じなので図だけ載せておきます。

tryEmit
普通の emit
は suspend
関数になっていますが、 tryEmit
というのもあり、こちらは通常の関数になっています。tryEmit
は戻り値があり、trueのときは正常に値を発行できて、falseは失敗したことになります。
これはバッファーを使用した仕組みになり、 BufferOverflow.SUSPEND
の場合のみ false が発生可能性があります。バッファーの容量がないときにfalseになります。
以下の単純な例です。キャッシュは不要だけどバッファリングする感じにしています。
private val _event = MutableSharedFlow<Event>(
extraBufferCapacity = 1
)
val event: SharedFlow<Event> = _event_event.tryEmit(...) // true
_event.tryEmit(...) // false
_event.tryEmit(...) // true
ただし、以下の例ではバッファーの容量がないので、 tryEmit
はすべて失敗してfalseを返します。
private val _event = MutableSharedFlow<Event>()
val event: SharedFlow<Event> = _event_event.tryEmit(...) // false
_event.tryEmit(...) // false
_event.tryEmit(...) // false