【Rxjs活用講座】SubjectでMulticastさせて使う場合の勘所


※ 当ページには【広告/PR】を含む場合があります。
2020/07/11
2022/10/05
【Rxjs活用講座】publish().refCount() と share() の微妙な違い
【Rxjs基礎講座】rxjs#ajaxでCookie付きのログイン認証を行う
Subject は、値を流すことに特化した特殊なObservableです。
以前の解説記事・
ObservableのCOLDとHOTって結局なんなの? ではObservalbeの ColdHot の性質の違いに触れました。

合同会社タコスキングダム|蛸壺の技術ブログ
【Rxjsのすゝめ】ObservableのCOLDとHOTって結局なんなの?

Rxjsを使いこなしていく中で一度は疑問に思うObservableのColdとHotの違いをまとめてみました。



今回は丁度いい機会ですので、
Subject を利用する場合に遭遇するであろう、 UnicastMulticast の違いに関して考察してみましょう。


合同会社タコスキングダム|蛸壺の技術ブログ【効果的学習法レポート・2025年最新】Angular(JSフレームワーク)をこれから学びたい人のためのオススメ書籍&教材特集

Unicastとは



Subjectを利用する場合、
Cold なObservableの性質を、 Unicast と呼んでいます。
ObserbavleのColdな性質に関しては、
前回の記事 で解説した通りです。

合同会社タコスキングダム|蛸壺の技術ブログ
【Rxjsのすゝめ】ObservableのCOLDとHOTって結局なんなの?

Rxjsを使いこなしていく中で一度は疑問に思うObservableのColdとHotの違いをまとめてみました。



Subjectは特別扱いされたObservableとはいえ、普通に使えば単なるObservableです。
ここで
ColdなSubject と言わずに、 Unicast と呼んで区別しているのは、後述に説明するSubject専用の multicastオペレーター の存在があるためではないかと思われます。
ちなみに
multicastオペレーター はHOTなストリームへ分配してくれる機能をもってる、Subject専用の publishオペレーター と言えます。
例えば以下のように
index.ts のコードを一例にとって、SubjectをUnicastして使ってみましょう。

            import { Subject } from 'rxjs';

function unicast$() : Subject<any> {
    console.log(`👇Cold stream is getting started.`);
    return new Subject<any>();
}

const unicast = unicast$();

unicast.pipe(tap(_ => console.log('Stream #1 :'))).subscribe(res => console.log(res));
unicast.pipe(tap(_ => console.log('Stream #2 :'))).subscribe(res => console.log(res));
unicast.pipe(tap(_ => console.log('Stream #3 :'))).subscribe(res => console.log(res));
unicast.pipe(tap(_ => console.log('Stream #4 :'))).subscribe(res => console.log(res));
unicast.pipe(tap(_ => console.log('Stream #5 :'))).subscribe(res => console.log(res));
unicast.pipe(tap(_ => console.log('Stream #6 :'))).subscribe(res => console.log(res));

unicast.next(new Date());

        

このコードをビルド後に実行すると、

            $ node dist/index.js
👇Cold stream is getting started.
Stream #1 :
2020-07-10T07:14:06.925Z
Stream #2 :
2020-07-10T07:14:06.925Z
Stream #3 :
2020-07-10T07:14:06.925Z
Stream #4 :
2020-07-10T07:14:06.925Z
Stream #5 :
2020-07-10T07:14:06.925Z
Stream #6 :
2020-07-10T07:14:06.925Z

        

となります。
まずSubjectの特徴としてObservableとの違いとして、普通のObservableではsubscribeを呼び出すと即時ストリームが開始されています。
そして各ストリームで処理が実行されていきますが、Subjectの場合は
next メソッドを呼び出した時にsubscribe登録していたストリームの処理が開始され、引数にとった値が流れ始めます。
一見流れている値が同じです。
ストリーム自体も共有されているような錯覚に陥りますが、同じ値を個々のストリームに流しているに過ぎず、1つずつのストリーム自体は独立しています(ColdなObservableの性質)。
このプログラムの例は単純です。
より複雑なストリーム上で処理の同期やストリーミング値の共有などを実現したいときに、subscribeするたびに独立した新しいストリームが乱立していては思いどおりの挙動とはならないはずです。
このような場合には、
multicastオペレーター を使って実装する必要が出てきます。


合同会社タコスキングダム|蛸壺の技術ブログ【効果的学習法レポート・2025年最新】Angular(JSフレームワーク)をこれから学びたい人のためのオススメ書籍&教材特集

Multicast



先程も述べましたが、
multicastオペレーター は、Subject版 publishオペレーター ともいう位置づけの関数です。
これによって、
Cold なSubjectから Hot なストリームへ分配できます。

            import { Subject, Observable, ConnectableObservable } from 'rxjs';
import { of } from 'rxjs';
import { multicast } from 'rxjs/operators';

function multicast$() : Observable<any> {
    console.log(`👇Hot stream is getting started.`);
    return of(new Date()).pipe(
        multicast(new Subject<any>()) // SubjectからHotなObservableへ分配
    );
}

const multicast = multicast$();

multicast.pipe(
    tap(_ => console.log('Sub stream #1 :'))
).subscribe(
    res => console.log(res),
    err => console.log(err),
    () => console.log('Sub stream #1 is Finished!')
);
multicast.pipe(
    tap(_ => console.log('Sub stream #2 :'))
).subscribe(
    res => console.log(res),
    err => console.log(err),
    () => console.log('Sub stream #2 is Finished!')
);

// TypescriptではConnectableObservable型へキャストが必要
(multicast as ConnectableObservable<any>).connect();

        

このコードをビルド後に実行すると、

            $ node dist/index.js
👇Hot stream is getting started.
Sub stream #1 :
2020-07-10T08:05:29.412Z
Sub stream #2 :
2020-07-10T08:05:29.412Z
Sub stream #1 is Finished!
Sub stream #2 is Finished!

        

これは前回のHotなObservableで説明した
publish() ~ connect() 相当の結果になっています。
使い方だけみると、
multicastオペレーター はObservableのpipe関数のなかで利用する仕様です。
また、引数にSubjectインタンスを取るのですが、Subjectは随分と脇役的な扱われ方になっているので、多少はSubjectの特別なObservable感は一気に無くなっている気はします。
とりあえずSubjectをMulticast(HotなObservable化)することができるようになりました。


合同会社タコスキングダム|蛸壺の技術ブログ【効果的学習法レポート・2025年最新】Angular(JSフレームワーク)をこれから学びたい人のためのオススメ書籍&教材特集

publish().refCount()相当への書き換え

publish() ~ connect() 相当の結果にできるということは、当然前回の記事で説明したような 「publish().refCount()」 相当の結果にすることも可能です。


合同会社タコスキングダム|蛸壺の技術ブログ
【Rxjs活用講座】publish().refCount() と share() の微妙な違い

Rxjsで良くみるパターンである「publish().refCount()」と「share()」と微妙な作用の違いを具体的なコードで検証してみます。

            import { Subject, Observable } from 'rxjs';
import { of } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';

function multicast$() : Observable<any> {
    console.log(`👇Hot stream is getting started.`);
    return of(new Date()).pipe(
        multicast(new Subject<any>()),
        refCount() // HotなObservableはsubscript次第即時開始される
    );
}

const multicast = multicast$();

multicast.pipe(
    tap(_ => console.log('Sub stream #1 :'))
).subscribe(
    res => console.log(res),
    err => console.log(err),
    () => console.log('Sub stream #1 is Finished!')
);
multicast.pipe(
    tap(_ => console.log('Sub stream #2 :'))
).subscribe(
    res => console.log(res),
    err => console.log(err),
    () => console.log('Sub stream #2 is Finished!')
);

        

このコードをビルド後に実行すると、

            $ node dist/index.js
👇Hot stream is getting started.
Sub stream #1 :
2020-07-10T08:17:50.057Z
Sub stream #1 is Finished!
Sub stream #2 is Finished!

        

となり、先に処理された
Sub stream #1 が処理完了した時点で、他の支流で処理中だったストリームも処理完了しています。
これは正しく、
publish().refCount() 相当の挙動となっていることが分かります。


合同会社タコスキングダム|蛸壺の技術ブログ【効果的学習法レポート・2025年最新】Angular(JSフレームワーク)をこれから学びたい人のためのオススメ書籍&教材特集

まとめ



Subjectは元々Multicasted Observablesを意識して設計されたクラスです.
特に値を流して分配するようなストリーミングにおいてmulticastオペレーターを使って、Hotなストリームを作る、というのが定石の作法のようです。
ただ、本家のObservableでHotなストリームを実装しても大して手間は変わりません。
混ぜて使うと後々訳が分からなくなりそうなので、
Observableでpublishオペレーター で攻めるか、 Subjectでmulticastオペレーター で攻めるかは、予めチームで統一しておきたいところです。
記事を書いた人

記事の担当:taconocat

ナンデモ系エンジニア

主にAngularでフロントエンド開発することが多いです。 開発環境はLinuxメインで進めているので、シェルコマンドも多用しております。 コツコツとプログラミングするのが好きな人間です。

合同会社タコスキングダム|蛸壺の技術ブログ【効果的学習法レポート・2025年最新】Angular(JSフレームワーク)をこれから学びたい人のためのオススメ書籍&教材特集