[Rxjs] publish().refCount() と share() の微妙な違い


2020/07/10

publish().refCount()share()と似たような挙動をします。

ですがこの双方は決して置き換え可能ではありません。

今回はどのように違うのかのワンポイント講座です。


TL;DR

まずは結論から。

publish().refCount()は、HOTなストリームで分配させた支流のうちのどれか1つでも処理完了(すなわちCompleteが処理)されたタイミングで、他に分岐させたストリームも開始・処理中の状態もキャンセルされて、HOTストリーム全てで処理完了させます。

対して、
share()は、HOTなストリームを複数に分配した後、どこかの支流で処理完了したとしても、その他の支流は干渉を受けずに開始・処理が継続することができます。

文面だけだと少しイメージが付きにくいと思いますので、以降では具体例を交えつつこの特性を説明していきます。


share()

shareメソッドに関しては前回の内容でshareオペレーターによるHOTストリームの開始の項目においても説明しました。

おさらいになりますが、shareオペレーターはColdなストリームをHotなストリームに分配する関数です。

publishオペレーターはconnectメソッドを使ってHOTストリームを開始するタイミングを明確にコードの中に実装する必要があります。

shareではconnectメソッドがなくてもsubscribeを受け取るとストリームが即時発行される仕様です。

            
            import { Observable } from 'rxjs';
import { of } from 'rxjs';
import { switchMap, share } from 'rxjs/operators';

function hot$() : Observable<any> {
    console.log(`👇Hot stream is getting started.`);
    return of(1).pipe(
            switchMap(_ => of(new Date())
        ),
        share() // HOTストリームで分配 + 支流は即時処理を開始
    );
}

const share$ = hot$();

share$.pipe(
    tap(_ => console.log('Sub stream #1 :'))
).subscribe(
    res => console.log(res),
    err => console.log(err),
    () => console.log('Sub stream #1 is Finished!')
);

share$.pipe(
    tap(_ => console.log('Sub stream #2 :'))
).subscribe(
    res => console.log(res),
    err => console.log(err),
    () => console.log('Sub stream #2 is Finished!')
);
        
このコードをビルドして、nodeで実行してみます。

            
            $ node dist/index.js
👇Hot stream is getting started.
Sub stream #1 :
2020-07-09T16:54:26.568Z
Sub stream #1 is Finished!
Sub stream #2 :
2020-07-09T16:54:26.575Z
Sub stream #2 is Finished!
        
これは前回の内容と同様です。

Coldなストリームが
shareでHotなストリームに変換・分配され、Sub stream #1Sub stream #2が独立して処理されています。

つまり、
shareで分配された支流のストリームは独立した非同期処理を実現したい時に利用することが分かります。


publish().refCount()

それでは次にpublish().refCount()のパターンを使ったHOTなObservableの挙動を確かめてみます。

            
            import { Observable } from 'rxjs';
import { of } from 'rxjs';
import { switchMap, publish, refCount } from 'rxjs/operators';

function hot$() : Observable<any> {
    console.log(`👇Hot Stream is getting started.`);
    return of(1).pipe(
            switchMap(_ => of(new Date())
        ),
        publish(), // HOTストリームとして分配するだけ
        refCount() // 分岐したストリームは任意のタイミングで開始
    );
}

const publishRefCount$ = hot$();
publishRefCount$.pipe(
    tap(_ => console.log('Sub stream #1 :'))
).subscribe(
    res => console.log(res),
    err => console.log(err),
    () => console.log('Sub stream #1 is Finished!')
);
publishRefCount$.pipe(
    tap(_ => console.log('Sub stream #2 :'))
).subscribe(
    res => console.log(res),
    err => console.log(err),
    () => console.log('Sub stream #2 is Finished!')
);
        
このコードもビルドして、実行してみましょう。

            
            $ node dist/index.js
👇Hot stream is getting started.
Sub stream #1 :
2020-07-09T16:54:26.578Z
Sub stream #1 is Finished!
Sub stream #2 is Finished!
        
こちらはshareと比較して随分と結果が異なります。

refCountメソッドは、支流のストリームをsubscribeを受け取り次第即時開始させることは、shareと似ていると言えますが、処理される結果は別物です。

このコード例の場合、まずHotに分岐された支流のうち、先に
Sub stream #1が先に処理開始され、その後Sub stream #2もちょっと遅れで処理開始されます。

同じ処理をしているので当然
Sub stream #1が先に処理完了してしまう訳ですが、処理中であったSub stream #2も問答無用で処理を止めて途中で完了してしまっています。

すなわち、
publish().refCount()は支流のどこか1つでも完了すると、他の支流も全て完了させてしまう同期的な処理を作り出すことができるのです。


まとめ

では今回のポイントを復唱しておきましょう。

publish().refCount()は、HOTなストリームで分配させた支流のうちのどれか1つでも処理完了(すなわちCompleteが処理)されたタイミングで、他に分岐させたストリームも開始・処理中の状態もキャンセルされて、HOTストリーム全てで処理完了させます。

対して、
share()は、HOTなストリームを複数に分配した後、どこかの支流で処理完了したとしても、その他の支流は干渉を受けずに開始・処理が継続することができます。


参考サイト

Rxjs Observable publish refcount vs share

記事を書いた人

記事の担当:taconocat

ナンデモ系エンジニア

主にAngularでフロントエンド開発することが多いです。 開発環境はLinuxメインで進めているので、シェルコマンドも多用しております。 コツコツとプログラミングするのが好きな人間です。