[Rxjs] ObservableのCOLDとHOTって結局なんなの?


2020/07/07

Rxjs使いこなしていく過程でAPIリファレンス等を読み込んでいくと、
ColdHotなObservableという単語をしばしば目にすることがあります。

事前の説明のないままこの単語に出会すと、Rxjsに慣れ親しむ前の方にとっては少し身構えてしまうかも知れません。

つまずく前に一読いただきたいCold&Hotに関する内容をまとめてみました。


TL;DR

さて、この記事の要点だけまとめると、

            
            COLD(通常のObservableの持つ性質):
    + subscribe()で明示にストリーミングが開始される
    + 1つのストリームには1つのオブザーバー(1対1)
    + ストリームの分配はできない
    + 他からの割り込みも受けない
    + つまりはObservableクラスそのもの

HOT(通常の用途とは異なる性質):
    + ストリーミングにsubscribe()は不要(垂れ流し)
    + ストリームを分配し、同一の値を複数のオブザーバーへ
        送信することができる(1対多)
    + ストリームの値を共有できる
    + COLDからHOTへのストリーム分配にはpublish()を使う
    + 正体はConnectableObservableという派生クラス
        
結局のところ、HOTとかCOLDとかいう言葉じりに囚われてしまうと、本質が良くわからなくなって来ます。

普通に使う分には、Observableから生成された購読(Subscription)は、対象Observerが1人であり、そのストリームは分配されません。

subscribe()を呼ばなけれは、値をストリーミングもできません。

実際のところ、こう言った通常のObservableの持つ性質を
COLDと呼んで、通常の利用法でないObservableの性質をHOTと区別するために、熱い/冷たいという言葉を当てたようです。

区別がつけば、
甲種と乙種とか、A面・B面とか...など言葉はなんでもよかったと思います。

ともあれ、考案者の方のセンスにしっくりきたのが
COLD/HOTだったのでは...というエピソードも含めて、以降で深掘り・検証してまいりましょう。


Hot/Coldの源流を探る

HOT/COLDの言葉で一番古く遡れるのは2009年頃のRx Dev Videoの中のRx API in depth: Hot and Cold observablesの公開講座チャンネルで、登壇者がホワイトボードに徐に書いた内容が元になっている資料が一番古そうです。

抜粋しますと、その登壇した方曰く、質問投稿者の
hotとかcoldとかいうのは、つまりはstaticとdynamic、もしくはimmutableやmutableとかみたいなもの?という問いに、

            
            Hot = Running / Cold = Not Running

Static/Dynamic would be misleading.
Immutable/Mutable is also misleading.
Perhaps, the closest terms are Deferred / Not-Deferred or Latent / Active.

The terms Hot and Cold came from whiteboarding and
design sessions where we needed a term to describe the difference
between something that running versus something that represents a thing that can be run.
We definitely didn't sit down and figure out a vocabulary, these things just happen.

I'm sure that at some point in time people will think about standard terminology,
but we usually can't wait for that to happen.
        
と返されています。

当時としてはRxも盛んに新しい技術が議論されており、適当な
terminology(専門用語)が見つからなかったので、どうもその場限りの説明のためにホワイトボードに書かれた言葉が、現行のRxで標準用語として利用されていたようです。

実際、
Deferred / Not-Deferredとかだと、あまりバズってないので流行らなかったかも知れません。

良くも悪くも
Hot/Coldの歴史がホワイトボードの走り書きから生まれた、というのはRxユーザーにとってはちょっとしたトリビアなのかも知れません。


Cold ObservableとHot Observableの比較

ここからは実際のコードの挙動からColdとHotの違いを見ていきます。

Cold Observable

ColdなObservableは、言ってしまえば普通のObservableです。

おそらくRxプログラミングを最初に学習する時、HotなObservableから始めた方はいないと思います。

通常使うObservableで何かの値をストリームで流す性質を
Coldと呼んでいるにすぎません。

以下のコードでは、日付を内部で取得して返すObservableを返す関数で、
Coldなストリーミングをやっているプログラムです。

            
            import { Observable, of } from 'rxjs';
import { switchMap } from 'rxjs/operators';

function cold$() : Observable<any> {
    console.log(`👇Stream is getting started.`);
    return of(1).pipe(
        switchMap(_ => of(new Date()))
    );
}

cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
cold$().subscribe(res => console.log(res));
        
これをビルドして実行します。

            
            $ node dist/index.js
👇Stream is getting started.
2020-07-06T08:33:42.390Z
👇Stream is getting started.
2020-07-06T08:33:42.398Z
👇Stream is getting started.
2020-07-06T08:33:42.399Z
👇Stream is getting started.
2020-07-06T08:33:42.402Z
👇Stream is getting started.
2020-07-06T08:33:42.403Z
👇Stream is getting started.
2020-07-06T08:33:42.404Z
        
このプログラムから理解できることとして、各ストリームごとに日付を取得したタイミングが違います。

subscribeメソッドでObservableのインスタンスから1つのストリームが個別に開始されているということがわかります。

cold$関数によって、計6つのObserableのインスタンスからsubscribeでそれぞれの1つのストリームが作成され、処理が開始されています。

では、同一のObservableインスタンスから
subscribeを複数回呼び出せば、ストリームは共有されるのでしょうか。

検証するため、以下のように書き換えてみます。

            
            import { Observable, of } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';

function cold$() : Observable<any> {
    console.log(`👇Stream is getting started.`);
    return of(1).pipe(
        switchMap(_ => of(new Date()))
    );
}

const coldObj = cold$(); // Observableをインスタンス化

// 同一のObservableから3回のsubscribeしてみる
coldObj.pipe(
    tap(_ => console.log('HOGE')),
).subscribe(res => console.log(res));
coldObj.pipe(
    tap(_ => console.log('PIYO')),
).subscribe(res => console.log(res));
coldObj.pipe(
    tap(_ => console.log('FUGA')),
).subscribe(res => console.log(res));
        
ビルド後に実行すると、

            
            $ node dist/index.js
👇Stream is getting started.
HOGE
2020-07-06T08:44:46.075Z
PIYO
2020-07-06T08:44:46.080Z
FUGA
2020-07-06T08:44:46.081Z
        
となります。

結果が示す通り、同一のObserbavleインスタンスだったとしても、
subscribeしてしまえば、別のストリームとして開始されています。

ということで、同一のObservableインスタンスからいくらでもストリーム(Subscription)を生成・開始でき、一旦
subscribeしてしまうと、どこかでunsubscribeしない限り外部からストリームに干渉したり、値の共有もできません。

Rxjsプログラマーにとっては、最初に触れる仕組みがこの
Coldの性質になるので、特に違和感がない作法だと思います。


Hot Observable

先ほどのColdなObserbableの説明にあった通り、原則ストリームに流れる内部の値に干渉したり共有したりすることはできません。

当然、同期的に値を共有したりしなければならない時もあったりするときには、Observableの別の特性というべき、
Hotな状態で利用することになります。

イメージとして、ストリームを複数の購読者(Subscriber)で共有するということは、1つだった流れを複数に分波させることとして擬えることができます。

それを可能とするのが、
publishオペレーターと呼ばれるパイプ関数です。

上のソースコードを試しに何も考慮なしに、
publishパイプを追加してストリーム分配してみましょう。

            
            import { Observable, of } from 'rxjs';
import { switchMap, publish } from 'rxjs/operators';

function hot$() : Observable<any> {
    console.log(`👇Stream is getting started.`);
    return of(1).pipe(
        switchMap(_ => of(new Date())),
        publish() // 👈publish追加。これでHOTになる...?
    );
}

hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
hot$().subscribe(res => console.log(res));
        
これをビルドして実行します。

            
            $ node dist/index.js
👇Stream is getting started.
👇Stream is getting started.
👇Stream is getting started.
👇Stream is getting started.
👇Stream is getting started.
👇Stream is getting started.
        
...何も起きません。

上記の例で、ColdなObservableから生じたストリームを
publishに通すことで、HOTなObservable(ConnectableObservable)にすることができましたが、Hotなストリームはsubscribeでは流れを生成・開始できないのです。

HOTなストリームを流し始める方法は
connectメソッドを呼び出す方法、shareオペレーターを呼び出す方法などがあります。

connectによるHOTストリームの開始

手始めに、connectメソッドによって、HOTストリームを生成・開始してみましょう。

            
            import { Observable, ConnectableObservable, of } from 'rxjs';
import { switchMap, publish } from 'rxjs/operators';

function hot$() : Observable<any> {
    console.log(`👇Stream is getting started.`);
    return of(1).pipe(
        switchMap(_ => of(new Date())),
        publish() // 👈publishでHOTストリームに分配にされる
    );
}

// Typescriptの場合、ConnectableObservable(Hot Observable)型にキャスト
const hot = hot$() as ConnectableObservable<any>;

// 先に分岐先を登録。6つの支流に分岐
hot.pipe(tap(_ => console.log('Stream #1 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #2 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #3 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #4 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #5 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #6 :'))).subscribe(res => console.log(res));

// ストリーミング開始(タイミングは同時にストリーム開放)
hot.connect();
        
これをビルドして実行します。

            
            $ node dist/index.js
👇Stream is getting started.
Stream #1 :
2020-07-06T16:09:59.816Z
Stream #2 :
2020-07-06T16:09:59.816Z
Stream #3 :
2020-07-06T16:09:59.816Z
Stream #4 :
2020-07-06T16:09:59.816Z
Stream #5 :
2020-07-06T16:09:59.816Z
Stream #6 :
2020-07-06T16:09:59.816Z
        
きちんと1つのストリームで流れてきた値が、登録済みの6つの流れに同じタイミングで分岐され、内部の処理が共有されてることが解ります。

shareオペレーターによるHOTストリームの開始

最後に、Hotストリームをshareオペレーターで分配・開始する方法を説明します。

なお、shareオペレーターに似た方法で
publishオペレーター+refCountオペレーターを利用するという方法ありますが、こちらについては取り扱い注意なので、次回のブログ記事で比較しています。

shareオペレーターpublishオペレーター同様に、ColdなストリームをHotなストリームに分配する関数です。

異なる点は、
publishconnectを呼び出して、ストリームを開始するタイミングをプログラマーが明示に手動で呼び出さなければならないのに対し、shareは自動的に任意のタイミングで開始してくれる、connectいらずな点です。

まずは以下のソースコードを見ていただくと、

            
            import { Observable, ConnectableObservable, of } from 'rxjs';
import { switchMap, share } from 'rxjs/operators';

function hot$() : Observable<any> {
    console.log(`👇Stream is getting started.`);
    return of(1).pipe(
            switchMap(_ => of(new Date())
        ),
        share() // 👈shareでもHOTストリームに分配にされる
    );
}

const hot = hot$() as ConnectableObservable<any>;

// 先に分岐先を登録・分配したら即時新しいストリーム開始
hot.pipe(tap(_ => console.log('Stream #1 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #2 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #3 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #4 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #5 :'))).subscribe(res => console.log(res));
hot.pipe(tap(_ => console.log('Stream #6 :'))).subscribe(res => console.log(res));
        
これをビルドして実行してみると、

            
            $ node dist/index.js
👇Stream is getting started.
Stream #1 :
2020-07-06T16:12:27.121Z
Stream #2 :
2020-07-06T16:12:27.130Z
Stream #3 :
2020-07-06T16:12:27.132Z
Stream #4 :
2020-07-06T16:12:27.133Z
Stream #5 :
2020-07-06T16:12:27.134Z
Stream #6 :
2020-07-06T16:12:27.135Z
        
こちらは、内部の処理(今回は日付時刻の取得)自体は共有されているものの、shareに分岐させたストリームを開始させるタイミングをおまかせにしているため、値としてはバラバラに出ています。

Hotなストリームを分岐して、データを共有したい場合においても、同期的な結果を期待したい場合には
publish & connectを利用し、ストリーム分配後の内部処理のタイミングに気を使わないで良いのであればshareを利用するなどの使い分けは重要です。


まとめ

以上、Rxライブラリを使いこなす上で、一度は疑問に思ってしまうであろうColdとHotの微妙な違い問題を特集しました。

ここまで呼んでいただくと分かると思いますが、
ColdなObservableとはObservableそのもので、HotなObservableと呼んでいるのConnectableObservableです。

Javascriptには型が見えにくいために、
Coldってなんだ、Hotとどう違うんだという議論があったのですが、Typescriptで型付けありのプログラミングで実装してみると、スッキリと理解できるのではないかと思います。


参照サイト

Hot vs Cold Observables

RxのHotとColdについて

【Reactive Extensions】 Hot変換はどういう時に必要なのか?

RxJS を学ぼう #4 - COLD と HOT について学ぶ / ConnectableObservable

記事を書いた人

記事の担当:taconocat

ナンデモ系エンジニア

主にAngularでフロントエンド開発することが多いです。 開発環境はLinuxメインで進めているので、シェルコマンドも多用しております。 コツコツとプログラミングするのが好きな人間です。