カテゴリー
【Rxjs活用講座】SubjectでMulticastさせて使う場合の勘所
※ 当ページには【広告/PR】を含む場合があります。
2020/07/11
2022/10/05
Cold
Hot
Subject
Unicast
Multicast
Unicastとは
Cold
Unicast
ColdなSubject
Unicast
multicastオペレーター
multicastオペレーター
publishオペレーター
index.ts
import { Subject } from 'rxjs';
function unicast$() : Subject<any> {
console.log(`👇Cold stream is getting started.`);
return new Subject<any>();
}
const unicast = unicast$();
unicast.pipe(tap(_ => console.log('Stream #1 :'))).subscribe(res => console.log(res));
unicast.pipe(tap(_ => console.log('Stream #2 :'))).subscribe(res => console.log(res));
unicast.pipe(tap(_ => console.log('Stream #3 :'))).subscribe(res => console.log(res));
unicast.pipe(tap(_ => console.log('Stream #4 :'))).subscribe(res => console.log(res));
unicast.pipe(tap(_ => console.log('Stream #5 :'))).subscribe(res => console.log(res));
unicast.pipe(tap(_ => console.log('Stream #6 :'))).subscribe(res => console.log(res));
unicast.next(new Date());
$ node dist/index.js
👇Cold stream is getting started.
Stream #1 :
2020-07-10T07:14:06.925Z
Stream #2 :
2020-07-10T07:14:06.925Z
Stream #3 :
2020-07-10T07:14:06.925Z
Stream #4 :
2020-07-10T07:14:06.925Z
Stream #5 :
2020-07-10T07:14:06.925Z
Stream #6 :
2020-07-10T07:14:06.925Z
next
multicastオペレーター
Multicast
multicastオペレーター
publishオペレーター
Cold
Hot
import { Subject, Observable, ConnectableObservable } from 'rxjs';
import { of } from 'rxjs';
import { multicast } from 'rxjs/operators';
function multicast$() : Observable<any> {
console.log(`👇Hot stream is getting started.`);
return of(new Date()).pipe(
multicast(new Subject<any>()) // SubjectからHotなObservableへ分配
);
}
const multicast = multicast$();
multicast.pipe(
tap(_ => console.log('Sub stream #1 :'))
).subscribe(
res => console.log(res),
err => console.log(err),
() => console.log('Sub stream #1 is Finished!')
);
multicast.pipe(
tap(_ => console.log('Sub stream #2 :'))
).subscribe(
res => console.log(res),
err => console.log(err),
() => console.log('Sub stream #2 is Finished!')
);
// TypescriptではConnectableObservable型へキャストが必要
(multicast as ConnectableObservable<any>).connect();
$ node dist/index.js
👇Hot stream is getting started.
Sub stream #1 :
2020-07-10T08:05:29.412Z
Sub stream #2 :
2020-07-10T08:05:29.412Z
Sub stream #1 is Finished!
Sub stream #2 is Finished!
publish() ~ connect()
multicastオペレーター
publish().refCount()相当への書き換え
publish() ~ connect()
import { Subject, Observable } from 'rxjs';
import { of } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';
function multicast$() : Observable<any> {
console.log(`👇Hot stream is getting started.`);
return of(new Date()).pipe(
multicast(new Subject<any>()),
refCount() // HotなObservableはsubscript次第即時開始される
);
}
const multicast = multicast$();
multicast.pipe(
tap(_ => console.log('Sub stream #1 :'))
).subscribe(
res => console.log(res),
err => console.log(err),
() => console.log('Sub stream #1 is Finished!')
);
multicast.pipe(
tap(_ => console.log('Sub stream #2 :'))
).subscribe(
res => console.log(res),
err => console.log(err),
() => console.log('Sub stream #2 is Finished!')
);
$ node dist/index.js
👇Hot stream is getting started.
Sub stream #1 :
2020-07-10T08:17:50.057Z
Sub stream #1 is Finished!
Sub stream #2 is Finished!
Sub stream #1
publish().refCount()
まとめ
Observableでpublishオペレーター
Subjectでmulticastオペレーター
記事を書いた人
ナンデモ系エンジニア
主にAngularでフロントエンド開発することが多いです。 開発環境はLinuxメインで進めているので、シェルコマンドも多用しております。 コツコツとプログラミングするのが好きな人間です。
カテゴリー