カテゴリー
【Rxjs活用講座】SubjectでMulticastさせて使う場合の勘所
※ 当ページには【広告/PR】を含む場合があります。
2020/07/11
2022/10/05
以前の解説記事・
Cold
Hot
今回は丁度いい機会ですので、
Subject
Unicast
Multicast
Unicastとは
Subjectを利用する場合、
Cold
Unicast
ObserbavleのColdな性質に関しては、
Subjectは特別扱いされたObservableとはいえ、普通に使えば単なるObservableです。
ここで
ColdなSubject
Unicast
multicastオペレーター
ちなみに
multicastオペレーター
publishオペレーター
例えば以下のように
index.ts
import { Subject } from 'rxjs';
function unicast$() : Subject<any> {
console.log(`👇Cold stream is getting started.`);
return new Subject<any>();
}
const unicast = unicast$();
unicast.pipe(tap(_ => console.log('Stream #1 :'))).subscribe(res => console.log(res));
unicast.pipe(tap(_ => console.log('Stream #2 :'))).subscribe(res => console.log(res));
unicast.pipe(tap(_ => console.log('Stream #3 :'))).subscribe(res => console.log(res));
unicast.pipe(tap(_ => console.log('Stream #4 :'))).subscribe(res => console.log(res));
unicast.pipe(tap(_ => console.log('Stream #5 :'))).subscribe(res => console.log(res));
unicast.pipe(tap(_ => console.log('Stream #6 :'))).subscribe(res => console.log(res));
unicast.next(new Date());
このコードをビルド後に実行すると、
$ node dist/index.js
👇Cold stream is getting started.
Stream #1 :
2020-07-10T07:14:06.925Z
Stream #2 :
2020-07-10T07:14:06.925Z
Stream #3 :
2020-07-10T07:14:06.925Z
Stream #4 :
2020-07-10T07:14:06.925Z
Stream #5 :
2020-07-10T07:14:06.925Z
Stream #6 :
2020-07-10T07:14:06.925Z
となります。
まずSubjectの特徴としてObservableとの違いとして、普通のObservableではsubscribeを呼び出すと即時ストリームが開始されています。
そして各ストリームで処理が実行されていきますが、Subjectの場合は
next
一見流れている値が同じです。
ストリーム自体も共有されているような錯覚に陥りますが、同じ値を個々のストリームに流しているに過ぎず、1つずつのストリーム自体は独立しています(ColdなObservableの性質)。
このプログラムの例は単純です。
より複雑なストリーム上で処理の同期やストリーミング値の共有などを実現したいときに、subscribeするたびに独立した新しいストリームが乱立していては思いどおりの挙動とはならないはずです。
このような場合には、
multicastオペレーター
Multicast
先程も述べましたが、
multicastオペレーター
publishオペレーター
これによって、
Cold
Hot
import { Subject, Observable, ConnectableObservable } from 'rxjs';
import { of } from 'rxjs';
import { multicast } from 'rxjs/operators';
function multicast$() : Observable<any> {
console.log(`👇Hot stream is getting started.`);
return of(new Date()).pipe(
multicast(new Subject<any>()) // SubjectからHotなObservableへ分配
);
}
const multicast = multicast$();
multicast.pipe(
tap(_ => console.log('Sub stream #1 :'))
).subscribe(
res => console.log(res),
err => console.log(err),
() => console.log('Sub stream #1 is Finished!')
);
multicast.pipe(
tap(_ => console.log('Sub stream #2 :'))
).subscribe(
res => console.log(res),
err => console.log(err),
() => console.log('Sub stream #2 is Finished!')
);
// TypescriptではConnectableObservable型へキャストが必要
(multicast as ConnectableObservable<any>).connect();
このコードをビルド後に実行すると、
$ node dist/index.js
👇Hot stream is getting started.
Sub stream #1 :
2020-07-10T08:05:29.412Z
Sub stream #2 :
2020-07-10T08:05:29.412Z
Sub stream #1 is Finished!
Sub stream #2 is Finished!
これは前回のHotなObservableで説明した
publish() ~ connect()
使い方だけみると、
multicastオペレーター
また、引数にSubjectインタンスを取るのですが、Subjectは随分と脇役的な扱われ方になっているので、多少はSubjectの特別なObservable感は一気に無くなっている気はします。
とりあえずSubjectをMulticast(HotなObservable化)することができるようになりました。
publish().refCount()相当への書き換え
publish() ~ connect()
import { Subject, Observable } from 'rxjs';
import { of } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';
function multicast$() : Observable<any> {
console.log(`👇Hot stream is getting started.`);
return of(new Date()).pipe(
multicast(new Subject<any>()),
refCount() // HotなObservableはsubscript次第即時開始される
);
}
const multicast = multicast$();
multicast.pipe(
tap(_ => console.log('Sub stream #1 :'))
).subscribe(
res => console.log(res),
err => console.log(err),
() => console.log('Sub stream #1 is Finished!')
);
multicast.pipe(
tap(_ => console.log('Sub stream #2 :'))
).subscribe(
res => console.log(res),
err => console.log(err),
() => console.log('Sub stream #2 is Finished!')
);
このコードをビルド後に実行すると、
$ node dist/index.js
👇Hot stream is getting started.
Sub stream #1 :
2020-07-10T08:17:50.057Z
Sub stream #1 is Finished!
Sub stream #2 is Finished!
となり、先に処理された
Sub stream #1
これは正しく、
publish().refCount()
まとめ
Subjectは元々Multicasted Observablesを意識して設計されたクラスです.
特に値を流して分配するようなストリーミングにおいてmulticastオペレーターを使って、Hotなストリームを作る、というのが定石の作法のようです。
ただ、本家のObservableでHotなストリームを実装しても大して手間は変わりません。
混ぜて使うと後々訳が分からなくなりそうなので、
Observableでpublishオペレーター
Subjectでmulticastオペレーター
記事を書いた人
ナンデモ系エンジニア
主にAngularでフロントエンド開発することが多いです。 開発環境はLinuxメインで進めているので、シェルコマンドも多用しております。 コツコツとプログラミングするのが好きな人間です。
カテゴリー