カテゴリー
【Rxjsのすゝめ】ObservableのCOLDとHOTって結局なんなの?
※ 当ページには【広告/PR】を含む場合があります。
2020/07/07
2022/10/05
ColdHot事前の説明のないままこの単語に出会すと、Rxjsに慣れ親しむ前の方にとっては少し身構えてしまうかも知れません。
つまずく前に最初に一読いただきたい『Cold & Hot』に関する内容をまとめてみました。
TL;DR
さて、この記事の要点だけまとめると、
COLD(通常のObservableの持つ性質):
+ subscribe()で明示にストリーミングが開始される
+ 1つのストリームには1つのオブザーバー(1対1)
+ ストリームの分配はできない
+ 他からの割り込みも受けない
+ つまりはObservableクラスそのもの
HOT(通常の用途とは異なる性質):
+ ストリーミングにsubscribe()は不要(垂れ流し)
+ ストリームを分配し、同一の値を複数のオブザーバーへ
送信することができる(1対多)
+ ストリームの値を共有できる
+ COLDからHOTへのストリーム分配にはpublish()を使う
+ 正体はConnectableObservableという派生クラス
結局のところ、
HOTCOLD普通に使う分には、Observableから生成された購読(Subscription)は、対象Observerが1人であり、そのストリームは分配されません。
subscribe()実際のところ、こう言った通常のObservableの持つ性質を
COLDHOT区別がつけば、
甲種と乙種A面・B面ともあれ、考案者の方のセンスにしっくりきたのが
COLD/HOTHot/Coldの源流を探る
HOT/COLDの言葉で一番古く遡れるのは2009年頃のRx Dev Videoの中の
抜粋しますと、その登壇した方曰く、質問投稿者の
hotとかcoldとかいうのは、つまりはstaticとdynamic、もしくはimmutableやmutableとかみたいなもの? Hot = Running / Cold = Not Running
Static/Dynamic would be misleading.
Immutable/Mutable is also misleading.
Perhaps, the closest terms are Deferred / Not-Deferred or Latent / Active.
The terms Hot and Cold came from whiteboarding and
design sessions where we needed a term to describe the difference
between something that running versus something that represents a thing that can be run.
We definitely didn't sit down and figure out a vocabulary, these things just happen.
I'm sure that at some point in time people will think about standard terminology,
but we usually can't wait for that to happen.
と返されています。
当時としてはRxも盛んに新しい技術が議論されており、適当な
terminology(専門用語)実際、
Deferred / Not-Deferred良くも悪くも
Hot/ColdCold ObservableとHot Observableの比較
ここからは実際のコードの挙動からColdとHotの違いを見ていきます。
Cold Observable
ColdなObservableは、言ってしまえば
普通のObservableおそらくRxプログラミングを最初に学習する時、HotなObservableから始めた方はいないと思います。
通常使うObservableで何かの値をストリームで流す性質を
Cold以下のコードでは、日付を内部で取得して返すObservableを返す関数で、
Cold import { Observable, of } from 'rxjs';
import { switchMap } from 'rxjs/operators';
function cold$() : Observable<any> {
console.log(`👇Stream is getting started.`);
return of(1).pipe(
switchMap(_ => of(new Date()))
);
}
cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
これをビルドして実行します。
$ node dist/index.js
👇Stream is getting started.
2020-07-06T08:33:42.390Z
👇Stream is getting started.
2020-07-06T08:33:42.398Z
👇Stream is getting started.
2020-07-06T08:33:42.399Z
👇Stream is getting started.
2020-07-06T08:33:42.402Z
👇Stream is getting started.
2020-07-06T08:33:42.403Z
👇Stream is getting started.
2020-07-06T08:33:42.404Z
このプログラムから理解できることとして、各ストリームごとに日付を取得したタイミングが違います。
subscribeObservableのインスタンスから1つのストリームcold$subscribeでは、同一のObservableインスタンスから
subscribe検証するため、以下のように書き換えてみます。
import { Observable, of } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';
function cold$() : Observable<any> {
console.log(`👇Stream is getting started.`);
return of(1).pipe(
switchMap(_ => of(new Date()))
);
}
const coldObj = cold$(); // Observableをインスタンス化
// 同一のObservableから3回のsubscribeしてみる
coldObj.pipe(
tap(_ => console.log('HOGE')),
).subscribe(res => console.log(res));
coldObj.pipe(
tap(_ => console.log('PIYO')),
).subscribe(res => console.log(res));
coldObj.pipe(
tap(_ => console.log('FUGA')),
).subscribe(res => console.log(res));
ビルド後に実行すると、
$ node dist/index.js
👇Stream is getting started.
HOGE
2020-07-06T08:44:46.075Z
PIYO
2020-07-06T08:44:46.080Z
FUGA
2020-07-06T08:44:46.081Z
となります。
結果が示す通り、同一のObserbavleインスタンスだったとしても、
subscribeということで、同一のObservableインスタンスからいくらでもストリーム(Subscription)を生成・開始でき、一旦
subscribeunsubscribeRxjsプログラマーにとっては、最初に触れる仕組みがこの
ColdHot Observable
先ほどの
Cold当然、同期的に値を共有したりしなければならない時もあったりするときには、Observableの別の特性というべき、
Hotイメージとして、ストリームを複数の購読者(Subscriber)で共有するということは、1つだった流れを複数に分波させることとして擬えることができます。
それを可能とするのが、
上のソースコードを試しに何も考慮なしに、
publish import { Observable, of } from 'rxjs';
import { switchMap, publish } from 'rxjs/operators';
function hot$() : Observable<any> {
console.log(`👇Stream is getting started.`);
return of(1).pipe(
switchMap(_ => of(new Date())),
publish() // 👈publish追加。これでHOTになる...?
);
}
hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
これをビルドして実行します。
$ node dist/index.js
👇Stream is getting started.
👇Stream is getting started.
👇Stream is getting started.
👇Stream is getting started.
👇Stream is getting started.
👇Stream is getting started.
...何も起きません。
上記の例で、ColdなObservableから生じたストリームを
publishHOTなObservable(ConnectableObservable)HotsubscribeHOTなストリームを流し始める方法は
connectconnectによるHOTストリームの開始
手始めに、
connect import { Observable, ConnectableObservable, of } from 'rxjs';
import { switchMap, publish } from 'rxjs/operators';
function hot$() : Observable<any> {
console.log(`👇Stream is getting started.`);
return of(1).pipe(
switchMap(_ => of(new Date())),
publish() // 👈publishでHOTストリームに分配にされる
);
}
// Typescriptの場合、ConnectableObservable(Hot Observable)型にキャスト
const hot = hot$() as ConnectableObservable<any>;
// 先に分岐先を登録。6つの支流に分岐
hot.pipe(tap(_ => console.log('Stream #1 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #2 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #3 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #4 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #5 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #6 :'))).subscribe(res => console.log(res));
// ストリーミング開始(タイミングは同時にストリーム開放)
hot.connect();
これをビルドして実行します。
$ node dist/index.js
👇Stream is getting started.
Stream #1 :
2020-07-06T16:09:59.816Z
Stream #2 :
2020-07-06T16:09:59.816Z
Stream #3 :
2020-07-06T16:09:59.816Z
Stream #4 :
2020-07-06T16:09:59.816Z
Stream #5 :
2020-07-06T16:09:59.816Z
Stream #6 :
2020-07-06T16:09:59.816Z
きちんと1つのストリームで流れてきた値が、登録済みの6つの流れに同じタイミングで分岐され、内部の処理が共有されてることが解ります。
shareオペレーターによるHOTストリームの開始
最後に、Hotストリームをshareオペレーターで分配・開始する方法を説明します。
なお、shareオペレーターに似た方法で
publishオペレーター+refCountオペレーターshareオペレーターpublishオペレーター異なる点は、
publishconnectshareconnectいらずまずは以下のソースコードを見ていただくと、
import { Observable, ConnectableObservable, of } from 'rxjs';
import { switchMap, share } from 'rxjs/operators';
function hot$() : Observable<any> {
console.log(`👇Stream is getting started.`);
return of(1).pipe(
switchMap(_ => of(new Date())
),
share() // 👈shareでもHOTストリームに分配にされる
);
}
const hot = hot$() as ConnectableObservable<any>;
// 先に分岐先を登録・分配したら即時新しいストリーム開始
hot.pipe(tap(_ => console.log('Stream #1 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #2 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #3 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #4 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #5 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #6 :'))).subscribe(res => console.log(res));
これをビルドして実行してみると、
$ node dist/index.js
👇Stream is getting started.
Stream #1 :
2020-07-06T16:12:27.121Z
Stream #2 :
2020-07-06T16:12:27.130Z
Stream #3 :
2020-07-06T16:12:27.132Z
Stream #4 :
2020-07-06T16:12:27.133Z
Stream #5 :
2020-07-06T16:12:27.134Z
Stream #6 :
2020-07-06T16:12:27.135Z
こちらは、内部の処理(今回は日付時刻の取得)自体は共有されているものの、
shareHotなストリームを分岐して、データを共有したい場合においても、同期的な結果を期待したい場合には
publish & connectshareまとめ
以上、Rxライブラリを使いこなす上で、一度は疑問に思ってしまうであろう
ColdとHotここまで呼んでいただくと分かると思いますが、
ColdなObservableObservableHotなObservableConnectableObservableJavascriptには型が見えにくいために、
ColdHot