[Rxjs] SubjectでMulticastさせて使う場合の勘所


2020/07/11

Subjectは、値を流すことに特化した特殊なObservableです。

前回の解説記事|ObservableのCOLDとHOTって結局なんなの?ではObservalbeのColdHotの性質の違いに触れました。

今回は丁度いい機会ですので、
Subjectを利用する場合に遭遇するであろう、UnicastMulticastの違いに関して考察してみましょう。


Unicastとは

Subjectを利用する場合、ColdなObservableの性質を、Unicastと呼んでいます。

ObserbavleのColdな性質に関しては、
前回の記事で解説した通りです。

Subjectは特別扱いされたObservableとはいえ、普通に使えば単なるObservableです。

ここで
ColdなSubjectと言わずに、Unicastと呼んで区別しているのは、後述に説明するSubject専用のmulticastオペレーターの存在があるためではないかと思われます。

ちなみに
multicastオペレーターはHOTなストリームへ分配してくれる機能をもってる、Subject専用のpublishオペレーターと言えます。

例えば以下のように
index.tsのコードを一例にとって、SubjectをUnicastして使ってみましょう。

            
            import { Subject } from 'rxjs';

function unicast$() : Subject<any> {
    console.log(`👇Cold stream is getting started.`);
    return new Subject<any>();
}

const unicast = unicast$();

unicast.pipe(tap(_ => console.log('Stream #1 :'))).subscribe(res => console.log(res));
unicast.pipe(tap(_ => console.log('Stream #2 :'))).subscribe(res => console.log(res));
unicast.pipe(tap(_ => console.log('Stream #3 :'))).subscribe(res => console.log(res));
unicast.pipe(tap(_ => console.log('Stream #4 :'))).subscribe(res => console.log(res));
unicast.pipe(tap(_ => console.log('Stream #5 :'))).subscribe(res => console.log(res));
unicast.pipe(tap(_ => console.log('Stream #6 :'))).subscribe(res => console.log(res));

unicast.next(new Date());
        
このコードをビルド後に実行すると、

            
            $ node dist/index.js
👇Cold stream is getting started.
Stream #1 :
2020-07-10T07:14:06.925Z
Stream #2 :
2020-07-10T07:14:06.925Z
Stream #3 :
2020-07-10T07:14:06.925Z
Stream #4 :
2020-07-10T07:14:06.925Z
Stream #5 :
2020-07-10T07:14:06.925Z
Stream #6 :
2020-07-10T07:14:06.925Z
        
となります。

まずSubjectの特徴としてObservableとの違いとして、普通のObservableではsubscribeを呼び出すと即時ストリームが開始されています。

そして各ストリームで処理が実行されていきますが、Subjectの場合は
nextメソッドを呼び出した時にsubscribe登録していたストリームの処理が開始され、引数にとった値が流れ始めます。

一見流れている値が同じです。

ストリーム自体も共有されているような錯覚に陥りますが、同じ値を個々のストリームに流しているに過ぎず、1つずつのストリーム自体は独立しています(ColdなObservableの性質)。

このプログラムの例は単純です。

より複雑なストリーム上で処理の同期やストリーミング値の共有などを実現したいときに、subscribeするたびに独立した新しいストリームが乱立していては思いどおりの挙動とはならないはずです。

このような場合には、
multicastオペレーターを使って実装する必要が出てきます。


Multicast

先程も述べましたが、multicastオペレーターは、Subject版publishオペレーターともいう位置づけの関数です。

これによって、
ColdなSubjectからHotなストリームへ分配できます。

            
            import { Subject, Observable, ConnectableObservable } from 'rxjs';
import { of } from 'rxjs';
import { multicast } from 'rxjs/operators';

function multicast$() : Observable<any> {
    console.log(`👇Hot stream is getting started.`);
    return of(new Date()).pipe(
        multicast(new Subject<any>()) // SubjectからHotなObservableへ分配
    );
}

const multicast = multicast$();

multicast.pipe(
    tap(_ => console.log('Sub stream #1 :'))
).subscribe(
    res => console.log(res),
    err => console.log(err),
    () => console.log('Sub stream #1 is Finished!')
);
multicast.pipe(
    tap(_ => console.log('Sub stream #2 :'))
).subscribe(
    res => console.log(res),
    err => console.log(err),
    () => console.log('Sub stream #2 is Finished!')
);

// TypescriptではConnectableObservable型へキャストが必要
(multicast as ConnectableObservable<any>).connect();
        
このコードをビルド後に実行すると、

            
            $ node dist/index.js
👇Hot stream is getting started.
Sub stream #1 :
2020-07-10T08:05:29.412Z
Sub stream #2 :
2020-07-10T08:05:29.412Z
Sub stream #1 is Finished!
Sub stream #2 is Finished!
        
これは前回のHotなObservableで説明したpublish() ~ connect()相当の結果になっています。

使い方だけみると、
multicastオペレーターはObservableのpipe関数のなかで利用する仕様です。

また、引数にSubjectインタンスを取るのですが、Subjectは随分と脇役的な扱われ方になっているので、多少はSubjectの特別なObservable感は一気に無くなっている気はします。

とりあえずSubjectをMulticast(HotなObservable化)することができるようになりました。


publish().refCount()相当

publish() ~ connect()相当の結果にできるということは、当然前回の記事で説明したようなpublish().refCount()相当の結果にすることも可能です。

            
            import { Subject, Observable } from 'rxjs';
import { of } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';

function multicast$() : Observable<any> {
    console.log(`👇Hot stream is getting started.`);
    return of(new Date()).pipe(
        multicast(new Subject<any>()),
        refCount() // HotなObservableはsubscript次第即時開始される
    );
}

const multicast = multicast$();

multicast.pipe(
    tap(_ => console.log('Sub stream #1 :'))
).subscribe(
    res => console.log(res),
    err => console.log(err),
    () => console.log('Sub stream #1 is Finished!')
);
multicast.pipe(
    tap(_ => console.log('Sub stream #2 :'))
).subscribe(
    res => console.log(res),
    err => console.log(err),
    () => console.log('Sub stream #2 is Finished!')
);
        
このコードをビルド後に実行すると、

            
            $ node dist/index.js
👇Hot stream is getting started.
Sub stream #1 :
2020-07-10T08:17:50.057Z
Sub stream #1 is Finished!
Sub stream #2 is Finished!
        
となり、先に処理されたSub stream #1が処理完了した時点で、他の支流で処理中だったストリームも処理完了しています。

これは正しく、
publish().refCount()相当の挙動となっていることが分かります。


まとめ

Subjectは元々Multicasted Observablesを意識して設計されたクラスです.

特に値を流して分配するようなストリーミングにおいてmulticastオペレーターを使って、Hotなストリームを作る、というのが定石の作法のようです。

ただ、本家のObservableでHotなストリームを実装しても大して手間は変わりません。

混ぜて使うと後々訳が分からなくなりそうなので、
Observableでpublishオペレーターで攻めるか、Subjectでmulticastオペレーターで攻めるかは、予めチームで統一しておきたいところです。
記事を書いた人

記事の担当:taconocat

ナンデモ系エンジニア

主にAngularでフロントエンド開発することが多いです。 開発環境はLinuxメインで進めているので、シェルコマンドも多用しております。 コツコツとプログラミングするのが好きな人間です。