カテゴリー
【Rxjsのすゝめ】ObservableのCOLDとHOTって結局なんなの?
※ 当ページには【広告/PR】を含む場合があります。
2020/07/07
2022/10/05
Cold
Hot
TL;DR
COLD(通常のObservableの持つ性質):
+ subscribe()で明示にストリーミングが開始される
+ 1つのストリームには1つのオブザーバー(1対1)
+ ストリームの分配はできない
+ 他からの割り込みも受けない
+ つまりはObservableクラスそのもの
HOT(通常の用途とは異なる性質):
+ ストリーミングにsubscribe()は不要(垂れ流し)
+ ストリームを分配し、同一の値を複数のオブザーバーへ
送信することができる(1対多)
+ ストリームの値を共有できる
+ COLDからHOTへのストリーム分配にはpublish()を使う
+ 正体はConnectableObservableという派生クラス
HOT
COLD
subscribe()
COLD
HOT
甲種と乙種
A面・B面
COLD/HOT
Hot/Coldの源流を探る
hotとかcoldとかいうのは、つまりはstaticとdynamic、もしくはimmutableやmutableとかみたいなもの?
Hot = Running / Cold = Not Running
Static/Dynamic would be misleading.
Immutable/Mutable is also misleading.
Perhaps, the closest terms are Deferred / Not-Deferred or Latent / Active.
The terms Hot and Cold came from whiteboarding and
design sessions where we needed a term to describe the difference
between something that running versus something that represents a thing that can be run.
We definitely didn't sit down and figure out a vocabulary, these things just happen.
I'm sure that at some point in time people will think about standard terminology,
but we usually can't wait for that to happen.
terminology(専門用語)
Deferred / Not-Deferred
Hot/Cold
Cold ObservableとHot Observableの比較
Cold Observable
普通のObservable
Cold
Cold
import { Observable, of } from 'rxjs';
import { switchMap } from 'rxjs/operators';
function cold$() : Observable<any> {
console.log(`👇Stream is getting started.`);
return of(1).pipe(
switchMap(_ => of(new Date()))
);
}
cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
$ node dist/index.js
👇Stream is getting started.
2020-07-06T08:33:42.390Z
👇Stream is getting started.
2020-07-06T08:33:42.398Z
👇Stream is getting started.
2020-07-06T08:33:42.399Z
👇Stream is getting started.
2020-07-06T08:33:42.402Z
👇Stream is getting started.
2020-07-06T08:33:42.403Z
👇Stream is getting started.
2020-07-06T08:33:42.404Z
subscribe
Observableのインスタンスから1つのストリーム
cold$
subscribe
subscribe
import { Observable, of } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';
function cold$() : Observable<any> {
console.log(`👇Stream is getting started.`);
return of(1).pipe(
switchMap(_ => of(new Date()))
);
}
const coldObj = cold$(); // Observableをインスタンス化
// 同一のObservableから3回のsubscribeしてみる
coldObj.pipe(
tap(_ => console.log('HOGE')),
).subscribe(res => console.log(res));
coldObj.pipe(
tap(_ => console.log('PIYO')),
).subscribe(res => console.log(res));
coldObj.pipe(
tap(_ => console.log('FUGA')),
).subscribe(res => console.log(res));
$ node dist/index.js
👇Stream is getting started.
HOGE
2020-07-06T08:44:46.075Z
PIYO
2020-07-06T08:44:46.080Z
FUGA
2020-07-06T08:44:46.081Z
subscribe
subscribe
unsubscribe
Cold
Hot Observable
Cold
Hot
publish
import { Observable, of } from 'rxjs';
import { switchMap, publish } from 'rxjs/operators';
function hot$() : Observable<any> {
console.log(`👇Stream is getting started.`);
return of(1).pipe(
switchMap(_ => of(new Date())),
publish() // 👈publish追加。これでHOTになる...?
);
}
hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
$ node dist/index.js
👇Stream is getting started.
👇Stream is getting started.
👇Stream is getting started.
👇Stream is getting started.
👇Stream is getting started.
👇Stream is getting started.
publish
HOTなObservable(ConnectableObservable)
Hot
subscribe
connect
connectによるHOTストリームの開始
connect
import { Observable, ConnectableObservable, of } from 'rxjs';
import { switchMap, publish } from 'rxjs/operators';
function hot$() : Observable<any> {
console.log(`👇Stream is getting started.`);
return of(1).pipe(
switchMap(_ => of(new Date())),
publish() // 👈publishでHOTストリームに分配にされる
);
}
// Typescriptの場合、ConnectableObservable(Hot Observable)型にキャスト
const hot = hot$() as ConnectableObservable<any>;
// 先に分岐先を登録。6つの支流に分岐
hot.pipe(tap(_ => console.log('Stream #1 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #2 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #3 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #4 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #5 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #6 :'))).subscribe(res => console.log(res));
// ストリーミング開始(タイミングは同時にストリーム開放)
hot.connect();
$ node dist/index.js
👇Stream is getting started.
Stream #1 :
2020-07-06T16:09:59.816Z
Stream #2 :
2020-07-06T16:09:59.816Z
Stream #3 :
2020-07-06T16:09:59.816Z
Stream #4 :
2020-07-06T16:09:59.816Z
Stream #5 :
2020-07-06T16:09:59.816Z
Stream #6 :
2020-07-06T16:09:59.816Z
shareオペレーターによるHOTストリームの開始
publishオペレーター+refCountオペレーター
shareオペレーター
publishオペレーター
publish
connect
share
connectいらず
import { Observable, ConnectableObservable, of } from 'rxjs';
import { switchMap, share } from 'rxjs/operators';
function hot$() : Observable<any> {
console.log(`👇Stream is getting started.`);
return of(1).pipe(
switchMap(_ => of(new Date())
),
share() // 👈shareでもHOTストリームに分配にされる
);
}
const hot = hot$() as ConnectableObservable<any>;
// 先に分岐先を登録・分配したら即時新しいストリーム開始
hot.pipe(tap(_ => console.log('Stream #1 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #2 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #3 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #4 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #5 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #6 :'))).subscribe(res => console.log(res));
$ node dist/index.js
👇Stream is getting started.
Stream #1 :
2020-07-06T16:12:27.121Z
Stream #2 :
2020-07-06T16:12:27.130Z
Stream #3 :
2020-07-06T16:12:27.132Z
Stream #4 :
2020-07-06T16:12:27.133Z
Stream #5 :
2020-07-06T16:12:27.134Z
Stream #6 :
2020-07-06T16:12:27.135Z
share
publish & connect
share
まとめ
ColdとHot
ColdなObservable
Observable
HotなObservable
ConnectableObservable
Cold
Hot
参照サイト
記事を書いた人
ナンデモ系エンジニア
主にAngularでフロントエンド開発することが多いです。 開発環境はLinuxメインで進めているので、シェルコマンドも多用しております。 コツコツとプログラミングするのが好きな人間です。
カテゴリー