【Rxjsのすゝめ】ObservableのCOLDとHOTって結局なんなの?


※ 当ページには【広告/PR】を含む場合があります。
2020/07/07
2022/10/05
【Rxjs基礎講座】Typescript & Babelでrxjsを手軽に試せる砂場(sandbox)を構築してみよう
『Rxjs』 を使いこなしていく過程でAPIリファレンス等を読み込んでいくと、 ColdHot なObservableという単語をしばしば目にすることがあります。

参考|RxJS - Reactive Extensions Library for JavaScript

事前の説明のないままこの単語に出会すと、Rxjsに慣れ親しむ前の方にとっては少し身構えてしまうかも知れません。
つまずく前に最初に一読いただきたい『Cold & Hot』に関する内容をまとめてみました。


合同会社タコスキングダム|蛸壺の技術ブログ【効果的学習法レポート・2025年最新】Angular(JSフレームワーク)をこれから学びたい人のためのオススメ書籍&教材特集

TL;DR



さて、この記事の要点だけまとめると、

            COLD(通常のObservableの持つ性質):
    + subscribe()で明示にストリーミングが開始される
    + 1つのストリームには1つのオブザーバー(1対1)
    + ストリームの分配はできない
    + 他からの割り込みも受けない
    + つまりはObservableクラスそのもの

HOT(通常の用途とは異なる性質):
    + ストリーミングにsubscribe()は不要(垂れ流し)
    + ストリームを分配し、同一の値を複数のオブザーバーへ
        送信することができる(1対多)
    + ストリームの値を共有できる
    + COLDからHOTへのストリーム分配にはpublish()を使う
    + 正体はConnectableObservableという派生クラス

        

結局のところ、
HOT とか COLD とかいう言葉じりに囚われてしまうと、本質が良くわからなくなって来ます。
普通に使う分には、Observableから生成された購読(Subscription)は、対象Observerが1人であり、そのストリームは分配されません。

subscribe() を呼ばなけれは、値をストリーミングもできません。
実際のところ、こう言った通常のObservableの持つ性質を
COLD と呼んで、通常の利用法でないObservableの性質を HOT と区別するために、熱い/冷たいという言葉を当てたようです。
区別がつけば、
甲種と乙種 とか、 A面・B面 とか...など言葉はなんでもよかったと思います。
ともあれ、考案者の方のセンスにしっくりきたのが
COLD/HOT だったのでは...というエピソードも含めて、以降で深掘り・検証してまいりましょう。


合同会社タコスキングダム|蛸壺の技術ブログ【効果的学習法レポート・2025年最新】Angular(JSフレームワーク)をこれから学びたい人のためのオススメ書籍&教材特集

Hot/Coldの源流を探る



HOT/COLDの言葉で一番古く遡れるのは2009年頃のRx Dev Videoの中の
Rx API in depth: Hot and Cold observables の公開講座チャンネルで、登壇者がホワイトボードに徐に書いた内容が元になっている資料が一番古そうです。
抜粋しますと、その登壇した方曰く、質問投稿者の
hotとかcoldとかいうのは、つまりはstaticとdynamic、もしくはimmutableやmutableとかみたいなもの? という問いに、

            Hot = Running / Cold = Not Running

Static/Dynamic would be misleading.
Immutable/Mutable is also misleading.
Perhaps, the closest terms are Deferred / Not-Deferred or Latent / Active.

The terms Hot and Cold came from whiteboarding and
design sessions where we needed a term to describe the difference
between something that running versus something that represents a thing that can be run.
We definitely didn't sit down and figure out a vocabulary, these things just happen.

I'm sure that at some point in time people will think about standard terminology,
but we usually can't wait for that to happen.

        

と返されています。
当時としてはRxも盛んに新しい技術が議論されており、適当な
terminology(専門用語) が見つからなかったので、どうもその場限りの説明のためにホワイトボードに書かれた言葉が、現行のRxで標準用語として利用されていたようです。
実際、
Deferred / Not-Deferred とかだと、あまりバズってないので流行らなかったかも知れません。
良くも悪くも
Hot/Cold の歴史がホワイトボードの走り書きから生まれた、というのはRxユーザーにとってはちょっとしたトリビアなのかも知れません。


合同会社タコスキングダム|蛸壺の技術ブログ【効果的学習法レポート・2025年最新】Angular(JSフレームワーク)をこれから学びたい人のためのオススメ書籍&教材特集

Cold ObservableとHot Observableの比較



ここからは実際のコードの挙動からColdとHotの違いを見ていきます。

Cold Observable



ColdなObservableは、言ってしまえば
普通のObservable です。
おそらくRxプログラミングを最初に学習する時、HotなObservableから始めた方はいないと思います。
通常使うObservableで何かの値をストリームで流す性質を
Cold と呼んでいるにすぎません。
以下のコードでは、日付を内部で取得して返すObservableを返す関数で、
Cold なストリーミングをやっているプログラムです。

            import { Observable, of } from 'rxjs';
import { switchMap } from 'rxjs/operators';

function cold$() : Observable<any> {
    console.log(`👇Stream is getting started.`);
    return of(1).pipe(
        switchMap(_ => of(new Date()))
    );
}

cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));

        

これをビルドして実行します。

            $ node dist/index.js
👇Stream is getting started.
2020-07-06T08:33:42.390Z
👇Stream is getting started.
2020-07-06T08:33:42.398Z
👇Stream is getting started.
2020-07-06T08:33:42.399Z
👇Stream is getting started.
2020-07-06T08:33:42.402Z
👇Stream is getting started.
2020-07-06T08:33:42.403Z
👇Stream is getting started.
2020-07-06T08:33:42.404Z

        

このプログラムから理解できることとして、各ストリームごとに日付を取得したタイミングが違います。

subscribe メソッドで Observableのインスタンスから1つのストリーム が個別に開始されているということがわかります。

cold$ 関数によって、計6つのObserableのインスタンスから subscribe でそれぞれの1つのストリームが作成され、処理が開始されています。
では、同一のObservableインスタンスから
subscribe を複数回呼び出せば、ストリームは共有されるのでしょうか。
検証するため、以下のように書き換えてみます。

            import { Observable, of } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';

function cold$() : Observable<any> {
    console.log(`👇Stream is getting started.`);
    return of(1).pipe(
        switchMap(_ => of(new Date()))
    );
}

const coldObj = cold$(); // Observableをインスタンス化

// 同一のObservableから3回のsubscribeしてみる
coldObj.pipe(
    tap(_ => console.log('HOGE')),
).subscribe(res => console.log(res));
coldObj.pipe(
    tap(_ => console.log('PIYO')),
).subscribe(res => console.log(res));
coldObj.pipe(
    tap(_ => console.log('FUGA')),
).subscribe(res => console.log(res));

        

ビルド後に実行すると、

            $ node dist/index.js
👇Stream is getting started.
HOGE
2020-07-06T08:44:46.075Z
PIYO
2020-07-06T08:44:46.080Z
FUGA
2020-07-06T08:44:46.081Z

        

となります。
結果が示す通り、同一のObserbavleインスタンスだったとしても、
subscribe してしまえば、別のストリームとして開始されています。
ということで、同一のObservableインスタンスからいくらでもストリーム(Subscription)を生成・開始でき、一旦
subscribe してしまうと、どこかで unsubscribe しない限り外部からストリームに干渉したり、値の共有もできません。
Rxjsプログラマーにとっては、最初に触れる仕組みがこの
Cold の性質になるので、特に違和感がない作法だと思います。


合同会社タコスキングダム|蛸壺の技術ブログ【効果的学習法レポート・2025年最新】Angular(JSフレームワーク)をこれから学びたい人のためのオススメ書籍&教材特集

Hot Observable



先ほどの
Cold なObserbableの説明にあった通り、原則ストリームに流れる内部の値に干渉したり共有したりすることはできません。
当然、同期的に値を共有したりしなければならない時もあったりするときには、Observableの別の特性というべき、
Hot な状態で利用することになります。
イメージとして、ストリームを複数の購読者(Subscriber)で共有するということは、1つだった流れを複数に分波させることとして擬えることができます。
それを可能とするのが、
publishオペレーター と呼ばれるパイプ関数です。
上のソースコードを試しに何も考慮なしに、
publish パイプを追加してストリーム分配してみましょう。

            import { Observable, of } from 'rxjs';
import { switchMap, publish } from 'rxjs/operators';

function hot$() : Observable<any> {
    console.log(`👇Stream is getting started.`);
    return of(1).pipe(
        switchMap(_ => of(new Date())),
        publish() // 👈publish追加。これでHOTになる...?
    );
}

hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));

        

これをビルドして実行します。

            $ node dist/index.js
👇Stream is getting started.
👇Stream is getting started.
👇Stream is getting started.
👇Stream is getting started.
👇Stream is getting started.
👇Stream is getting started.

        

...何も起きません。
上記の例で、ColdなObservableから生じたストリームを
publish に通すことで、 HOTなObservable(ConnectableObservable) にすることができましたが、 Hot なストリームは subscribe では流れを生成・開始できないのです。
HOTなストリームを流し始める方法は
connect メソッドを呼び出す方法、 shareオペレーター を呼び出す方法などがあります。

connectによるHOTストリームの開始



手始めに、
connect メソッドによって、HOTストリームを生成・開始してみましょう。

            import { Observable, ConnectableObservable, of } from 'rxjs';
import { switchMap, publish } from 'rxjs/operators';

function hot$() : Observable<any> {
    console.log(`👇Stream is getting started.`);
    return of(1).pipe(
        switchMap(_ => of(new Date())),
        publish() // 👈publishでHOTストリームに分配にされる
    );
}

// Typescriptの場合、ConnectableObservable(Hot Observable)型にキャスト
const hot = hot$() as ConnectableObservable<any>;

// 先に分岐先を登録。6つの支流に分岐
hot.pipe(tap(_ => console.log('Stream #1 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #2 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #3 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #4 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #5 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #6 :'))).subscribe(res => console.log(res));

// ストリーミング開始(タイミングは同時にストリーム開放)
hot.connect();

        

これをビルドして実行します。

            $ node dist/index.js
👇Stream is getting started.
Stream #1 :
2020-07-06T16:09:59.816Z
Stream #2 :
2020-07-06T16:09:59.816Z
Stream #3 :
2020-07-06T16:09:59.816Z
Stream #4 :
2020-07-06T16:09:59.816Z
Stream #5 :
2020-07-06T16:09:59.816Z
Stream #6 :
2020-07-06T16:09:59.816Z

        

きちんと1つのストリームで流れてきた値が、登録済みの6つの流れに同じタイミングで分岐され、内部の処理が共有されてることが解ります。

shareオペレーターによるHOTストリームの開始



最後に、Hotストリームをshareオペレーターで分配・開始する方法を説明します。
なお、shareオペレーターに似た方法で
publishオペレーター+refCountオペレーター を利用するという方法ありますが、こちらについては取り扱い注意なので、 次回のブログ記事 で比較しています。

shareオペレーターpublishオペレーター 同様に、ColdなストリームをHotなストリームに分配する関数です。
異なる点は、
publishconnect を呼び出して、ストリームを開始するタイミングをプログラマーが明示に手動で呼び出さなければならないのに対し、 share は自動的に任意のタイミングで開始してくれる、 connectいらず な点です。
まずは以下のソースコードを見ていただくと、

            import { Observable, ConnectableObservable, of } from 'rxjs';
import { switchMap, share } from 'rxjs/operators';

function hot$() : Observable<any> {
    console.log(`👇Stream is getting started.`);
    return of(1).pipe(
            switchMap(_ => of(new Date())
        ),
        share() // 👈shareでもHOTストリームに分配にされる
    );
}

const hot = hot$() as ConnectableObservable<any>;

// 先に分岐先を登録・分配したら即時新しいストリーム開始
hot.pipe(tap(_ => console.log('Stream #1 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #2 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #3 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #4 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #5 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #6 :'))).subscribe(res => console.log(res));

        

これをビルドして実行してみると、

            $ node dist/index.js
👇Stream is getting started.
Stream #1 :
2020-07-06T16:12:27.121Z
Stream #2 :
2020-07-06T16:12:27.130Z
Stream #3 :
2020-07-06T16:12:27.132Z
Stream #4 :
2020-07-06T16:12:27.133Z
Stream #5 :
2020-07-06T16:12:27.134Z
Stream #6 :
2020-07-06T16:12:27.135Z

        

こちらは、内部の処理(今回は日付時刻の取得)自体は共有されているものの、
share に分岐させたストリームを開始させるタイミングをおまかせにしているため、値としてはバラバラに出ています。
Hotなストリームを分岐して、データを共有したい場合においても、同期的な結果を期待したい場合には
publish & connect を利用し、ストリーム分配後の内部処理のタイミングに気を使わないで良いのであれば share を利用するなどの使い分けは重要です。


合同会社タコスキングダム|蛸壺の技術ブログ【効果的学習法レポート・2025年最新】Angular(JSフレームワーク)をこれから学びたい人のためのオススメ書籍&教材特集

まとめ



以上、Rxライブラリを使いこなす上で、一度は疑問に思ってしまうであろう
ColdとHot の微妙な違い問題を特集しました。
ここまで呼んでいただくと分かると思いますが、
ColdなObservable とは Observable そのもので、 HotなObservable と呼んでいるの ConnectableObservable です。
Javascriptには型が見えにくいために、
Cold ってなんだ、 Hot とどう違うんだという議論があったのですが、Typescriptで型付けありのプログラミングで実装してみると、スッキリと理解できるのではないかと思います。

参照サイト

Hot vs Cold ObservablesRxのHotとColdについて【Reactive Extensions】 Hot変換はどういう時に必要なのか?RxJS を学ぼう #4 - COLD と HOT について学ぶ / ConnectableObservable
記事を書いた人

記事の担当:taconocat

ナンデモ系エンジニア

主にAngularでフロントエンド開発することが多いです。 開発環境はLinuxメインで進めているので、シェルコマンドも多用しております。 コツコツとプログラミングするのが好きな人間です。

合同会社タコスキングダム|蛸壺の技術ブログ【効果的学習法レポート・2025年最新】Angular(JSフレームワーク)をこれから学びたい人のためのオススメ書籍&教材特集