カテゴリー
【Rxjsのすゝめ】ObservableのCOLDとHOTって結局なんなの?
※ 当ページには【広告/PR】を含む場合があります。
2020/07/07
2022/10/05
Cold
Hot
事前の説明のないままこの単語に出会すと、Rxjsに慣れ親しむ前の方にとっては少し身構えてしまうかも知れません。
つまずく前に最初に一読いただきたい『Cold & Hot』に関する内容をまとめてみました。
TL;DR
さて、この記事の要点だけまとめると、
COLD(通常のObservableの持つ性質):
+ subscribe()で明示にストリーミングが開始される
+ 1つのストリームには1つのオブザーバー(1対1)
+ ストリームの分配はできない
+ 他からの割り込みも受けない
+ つまりはObservableクラスそのもの
HOT(通常の用途とは異なる性質):
+ ストリーミングにsubscribe()は不要(垂れ流し)
+ ストリームを分配し、同一の値を複数のオブザーバーへ
送信することができる(1対多)
+ ストリームの値を共有できる
+ COLDからHOTへのストリーム分配にはpublish()を使う
+ 正体はConnectableObservableという派生クラス
結局のところ、
HOT
COLD
普通に使う分には、Observableから生成された購読(Subscription)は、対象Observerが1人であり、そのストリームは分配されません。
subscribe()
実際のところ、こう言った通常のObservableの持つ性質を
COLD
HOT
区別がつけば、
甲種と乙種
A面・B面
ともあれ、考案者の方のセンスにしっくりきたのが
COLD/HOT
Hot/Coldの源流を探る
HOT/COLDの言葉で一番古く遡れるのは2009年頃のRx Dev Videoの中の
抜粋しますと、その登壇した方曰く、質問投稿者の
hotとかcoldとかいうのは、つまりはstaticとdynamic、もしくはimmutableやmutableとかみたいなもの?
Hot = Running / Cold = Not Running
Static/Dynamic would be misleading.
Immutable/Mutable is also misleading.
Perhaps, the closest terms are Deferred / Not-Deferred or Latent / Active.
The terms Hot and Cold came from whiteboarding and
design sessions where we needed a term to describe the difference
between something that running versus something that represents a thing that can be run.
We definitely didn't sit down and figure out a vocabulary, these things just happen.
I'm sure that at some point in time people will think about standard terminology,
but we usually can't wait for that to happen.
と返されています。
当時としてはRxも盛んに新しい技術が議論されており、適当な
terminology(専門用語)
実際、
Deferred / Not-Deferred
良くも悪くも
Hot/Cold
Cold ObservableとHot Observableの比較
ここからは実際のコードの挙動からColdとHotの違いを見ていきます。
Cold Observable
ColdなObservableは、言ってしまえば
普通のObservable
おそらくRxプログラミングを最初に学習する時、HotなObservableから始めた方はいないと思います。
通常使うObservableで何かの値をストリームで流す性質を
Cold
以下のコードでは、日付を内部で取得して返すObservableを返す関数で、
Cold
import { Observable, of } from 'rxjs';
import { switchMap } from 'rxjs/operators';
function cold$() : Observable<any> {
console.log(`👇Stream is getting started.`);
return of(1).pipe(
switchMap(_ => of(new Date()))
);
}
cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
これをビルドして実行します。
$ node dist/index.js
👇Stream is getting started.
2020-07-06T08:33:42.390Z
👇Stream is getting started.
2020-07-06T08:33:42.398Z
👇Stream is getting started.
2020-07-06T08:33:42.399Z
👇Stream is getting started.
2020-07-06T08:33:42.402Z
👇Stream is getting started.
2020-07-06T08:33:42.403Z
👇Stream is getting started.
2020-07-06T08:33:42.404Z
このプログラムから理解できることとして、各ストリームごとに日付を取得したタイミングが違います。
subscribe
Observableのインスタンスから1つのストリーム
cold$
subscribe
では、同一のObservableインスタンスから
subscribe
検証するため、以下のように書き換えてみます。
import { Observable, of } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';
function cold$() : Observable<any> {
console.log(`👇Stream is getting started.`);
return of(1).pipe(
switchMap(_ => of(new Date()))
);
}
const coldObj = cold$(); // Observableをインスタンス化
// 同一のObservableから3回のsubscribeしてみる
coldObj.pipe(
tap(_ => console.log('HOGE')),
).subscribe(res => console.log(res));
coldObj.pipe(
tap(_ => console.log('PIYO')),
).subscribe(res => console.log(res));
coldObj.pipe(
tap(_ => console.log('FUGA')),
).subscribe(res => console.log(res));
ビルド後に実行すると、
$ node dist/index.js
👇Stream is getting started.
HOGE
2020-07-06T08:44:46.075Z
PIYO
2020-07-06T08:44:46.080Z
FUGA
2020-07-06T08:44:46.081Z
となります。
結果が示す通り、同一のObserbavleインスタンスだったとしても、
subscribe
ということで、同一のObservableインスタンスからいくらでもストリーム(Subscription)を生成・開始でき、一旦
subscribe
unsubscribe
Rxjsプログラマーにとっては、最初に触れる仕組みがこの
Cold
Hot Observable
先ほどの
Cold
当然、同期的に値を共有したりしなければならない時もあったりするときには、Observableの別の特性というべき、
Hot
イメージとして、ストリームを複数の購読者(Subscriber)で共有するということは、1つだった流れを複数に分波させることとして擬えることができます。
それを可能とするのが、
上のソースコードを試しに何も考慮なしに、
publish
import { Observable, of } from 'rxjs';
import { switchMap, publish } from 'rxjs/operators';
function hot$() : Observable<any> {
console.log(`👇Stream is getting started.`);
return of(1).pipe(
switchMap(_ => of(new Date())),
publish() // 👈publish追加。これでHOTになる...?
);
}
hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
これをビルドして実行します。
$ node dist/index.js
👇Stream is getting started.
👇Stream is getting started.
👇Stream is getting started.
👇Stream is getting started.
👇Stream is getting started.
👇Stream is getting started.
...何も起きません。
上記の例で、ColdなObservableから生じたストリームを
publish
HOTなObservable(ConnectableObservable)
Hot
subscribe
HOTなストリームを流し始める方法は
connect
connectによるHOTストリームの開始
手始めに、
connect
import { Observable, ConnectableObservable, of } from 'rxjs';
import { switchMap, publish } from 'rxjs/operators';
function hot$() : Observable<any> {
console.log(`👇Stream is getting started.`);
return of(1).pipe(
switchMap(_ => of(new Date())),
publish() // 👈publishでHOTストリームに分配にされる
);
}
// Typescriptの場合、ConnectableObservable(Hot Observable)型にキャスト
const hot = hot$() as ConnectableObservable<any>;
// 先に分岐先を登録。6つの支流に分岐
hot.pipe(tap(_ => console.log('Stream #1 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #2 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #3 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #4 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #5 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #6 :'))).subscribe(res => console.log(res));
// ストリーミング開始(タイミングは同時にストリーム開放)
hot.connect();
これをビルドして実行します。
$ node dist/index.js
👇Stream is getting started.
Stream #1 :
2020-07-06T16:09:59.816Z
Stream #2 :
2020-07-06T16:09:59.816Z
Stream #3 :
2020-07-06T16:09:59.816Z
Stream #4 :
2020-07-06T16:09:59.816Z
Stream #5 :
2020-07-06T16:09:59.816Z
Stream #6 :
2020-07-06T16:09:59.816Z
きちんと1つのストリームで流れてきた値が、登録済みの6つの流れに同じタイミングで分岐され、内部の処理が共有されてることが解ります。
shareオペレーターによるHOTストリームの開始
最後に、Hotストリームをshareオペレーターで分配・開始する方法を説明します。
なお、shareオペレーターに似た方法で
publishオペレーター+refCountオペレーター
shareオペレーター
publishオペレーター
異なる点は、
publish
connect
share
connectいらず
まずは以下のソースコードを見ていただくと、
import { Observable, ConnectableObservable, of } from 'rxjs';
import { switchMap, share } from 'rxjs/operators';
function hot$() : Observable<any> {
console.log(`👇Stream is getting started.`);
return of(1).pipe(
switchMap(_ => of(new Date())
),
share() // 👈shareでもHOTストリームに分配にされる
);
}
const hot = hot$() as ConnectableObservable<any>;
// 先に分岐先を登録・分配したら即時新しいストリーム開始
hot.pipe(tap(_ => console.log('Stream #1 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #2 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #3 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #4 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #5 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #6 :'))).subscribe(res => console.log(res));
これをビルドして実行してみると、
$ node dist/index.js
👇Stream is getting started.
Stream #1 :
2020-07-06T16:12:27.121Z
Stream #2 :
2020-07-06T16:12:27.130Z
Stream #3 :
2020-07-06T16:12:27.132Z
Stream #4 :
2020-07-06T16:12:27.133Z
Stream #5 :
2020-07-06T16:12:27.134Z
Stream #6 :
2020-07-06T16:12:27.135Z
こちらは、内部の処理(今回は日付時刻の取得)自体は共有されているものの、
share
Hotなストリームを分岐して、データを共有したい場合においても、同期的な結果を期待したい場合には
publish & connect
share
まとめ
以上、Rxライブラリを使いこなす上で、一度は疑問に思ってしまうであろう
ColdとHot
ここまで呼んでいただくと分かると思いますが、
ColdなObservable
Observable
HotなObservable
ConnectableObservable
Javascriptには型が見えにくいために、
Cold
Hot
参照サイト
記事を書いた人
ナンデモ系エンジニア
主にAngularでフロントエンド開発することが多いです。 開発環境はLinuxメインで進めているので、シェルコマンドも多用しております。 コツコツとプログラミングするのが好きな人間です。
カテゴリー