[Rxjs] GeneratorをObservableへ変換する方法


2020/07/17

以前の
記事 | deferでasync/await関数からObservableへ変換するでは、async/awaitをObservableに変えてから処理を行うことが分かりました。

もっとニッチなところで、jsの
generator関数もObservableへ変換できるのか、という疑問が出てきます。

今回は
generator関数をObserbavleで実行するやり方を検証します。


Generatorの簡単なおさらい

javascript generatorでウェブ検索すると沢山の説明記事がヒットします。

なのでここでは深く説明しませんが、ECMAScript2015(ES6)から盛り込まれた機能で、ES7から正式に追加された
async/await構文からすると先輩にあたるようです。

ジェネレーターは一見すると従来のイタレーターの拡張版に見えます。

しかし単なるイタレーターとは違って、遅延評価・同期処理・関数外部からの制御・無限シークエンス表現...などのテクニックを提供してくれる便利なものです。

typescriptではジェネレーターは
Generator<T>型で扱います。

例えば簡単な実装で遊んでみますと、

            
            function* generator(): Generator<any> {
    yield 0;
    let i = 4;
    yield 1;
    yield 'HOGE';
    yield 2 + i;
    i++;
    yield 'PIYO';
    yield 1.0e-3 * i;
    yield false;
    yield 'FUGA';
    yield i - 3;
    yield true;
}

const iterator = generator();

console.log(iterator.next());
console.log(iterator.next());
console.log(iterator.next());
console.log(iterator.next());
console.log(iterator.next());
console.log(iterator.next());
console.log(iterator.next());
console.log(iterator.next());
console.log(iterator.next());
console.log(iterator.next());
console.log(iterator.next());
console.log(iterator.next());
        
これをビルドして実行すると、

            
            $ node dist/index.js
{ value: 0, done: false }
{ value: 1, done: false }
{ value: 'HOGE', done: false }
{ value: 6, done: false }
{ value: 'PIYO', done: false }
{ value: 0.005, done: false }
{ value: false, done: false }
{ value: 'FUGA', done: false }
{ value: 2, done: false }
{ value: true, done: false }
{ value: undefined, done: true }
{ value: undefined, done: true } # 処理が終わってもGeneratorは消滅せずに残る
        
というような出力を得ます。

見てお分かりのように、ジェネレーター内部で
yieldされた値が、外部からのnext()の呼び出しによってその都度処理されて引き出されています。

そのときの返り値は
{value: any, done: boolean}型のオブジェクトで、valueで処理された値を参照でき、doneでジェネレーターの処理完了を受け取ることができます。


GeneratorからObservableへの変換

それでは早速、先程のジェネレーターを元にして、Observableに変換した処理を行ってみます。

検証① ~ fromオペレーターから

別のブログ記事で以前からfromオペレーターでPromise --> Observableへの変換async/await --> Observableへの変換を取り上げました。

実のところ、
fromオペレーターはGenerator型のオブジェクトからもObservableへ変換してくれるので、ジェネレーターも以下のように簡単に変換できます。

            
            import { Observable, from } from 'rxjs';
import { take } from 'rxjs/operators';

function* generator(): Generator<any> {
    yield 0;
    let i = 4;
    yield 1;
    yield 'HOGE';
    yield 2 + i;
    i++;
    yield 'PIYO';
    yield 1.0e-3 * i;
    yield false;
    yield 'FUGA';
    yield i - 3;
    yield true;
}

// 👇fromオペレーターからジェネレーターもObservableへ一発変換
const iterator$ = from(generator()).pipe(
    take(11)
);

iterator$.subscribe(
    res => console.log(res),
    err => console.log(err),
    () => console.log('DONE!')
);
        
これをビルドして実行すると、

            
            $ node dist/index.js
0
1
HOGE
6
PIYO
0.005
false
FUGA
2
true
DONE! # Generatorの処理が終わったらtakeでUnsubscribe
        
このようにジェネレーターの返り値の内valueの値が、Observableのsubscribeのnext()へ流れてきているようです。

また、ジェネレーターが
donetrueに達したときと連動して、Observableのsubscribeもcomplete()が着火するように設計されています。

検証② ~ deferオペレーターから

Fromオペレーターが使えるのなら、deferオペレーターも使えそうです。

以下試してみましょう。

            
            import { Observable, defer } from 'rxjs';
import { take } from 'rxjs/operators';

function* generator(): Generator<any> {
    yield 0;
    let i = 4;
    yield 1;
    yield 'HOGE';
    yield 2 + i;
    i++;
    yield 'PIYO';
    yield 1.0e-3 * i;
    yield false;
    yield 'FUGA';
    yield i - 3;
    yield true;
}

// 👇deferオペレーターからジェネレーターもObservableへ一発変換
const iterator$ = defer(generator).pipe(
    take(11)
);

iterator$.subscribe(
    res => console.log(res),
    err => console.log(err),
    () => console.log('DONE!')
);
        
とすることで、先程の結果と同じ結果を得ます。

fromオペレーターとの違いは、fromのほうが引数に
Generator<any>型のインスタンス(実体)を指定するのに対して、deferはジェネレーター関数そのものを指定します。


応用編 ~ mergeMap(flatMap)のresultSelector(第二引数)を理解する

上記まででfromオペレーターやdeferオペレーターがGenerator型の引数をカバーしてることを示したました。

map系のオペレーターも第一引数にGenerator関数を指定することで利用可能です。

ここではmap系も種類が多いのですべてのオペレーターを個別に使用例をご紹介することができません。

丁度いい機会ですので、とりわけmap系オペレーターの持つ謎多き第二引数
resultSelectorの挙動の詳細に迫ってみます。

resultSelector

公式のAPIリファレンスにはこうあります。

関数の定義の部分だけ抜粋すると、

            
            Projects each source value to an Observable which is merged in the output Observable.

mergeMap<T, R, O extends ObservableInput<any>>(
    project: (value: T, index: number) => O,
    resultSelector?: number |
        ((outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number) => R),
    concurrent: number = Number.POSITIVE_INFINITY
): OperatorFunction<T, ObservedValueOf<O> | R>

Parameters:
    project:
        A function that, when applied to an item emitted by the source Observable,
        returns an Observable.

    resultSelector:
        Optional. Default is undefined.
        Type:
            number |
            ((outerValue: T, innerValue: ObservedValueOf, outerIndex: number, innerIndex: number) => R).

    concurrent:
        Optional.
        Default is Number.POSITIVE_INFINITY.
        Maximum number of input Observables being subscribed to concurrently.

Returns:
    OperatorFunction<T, ObservedValueOf<O> | R>:
        An Observable that emits the result of applying the projection function
        (and the optional deprecated resultSelector) to each item emitted
        by the source Observable and merging the results of the Observables
        obtained from this transformation.
        
そもそも単にmergeMap等を利用する分には、resultSelectorを使うほどの必要性はないかも知れません。

もし必要に迫られ、mergeMapの複数の内部処理を可能な限りカスタマイズする場合には、
resultSelectorの挙動を理解しておく必要があります。

resultSelectorの挙動を探るため、mergeMapにジェネレーターを噛ませた以下のようなテストコードでその挙動を探ってみましょう。

            
            import { Observable, of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

function genWithMergemap$(): Observable<any> {
    return of(1,5,8).pipe(
        mergeMap(
            (x: any, i: number) => (function* () {
                yield x;
                yield i;
            })(),
            // (outerValue: T, innerValue: ObservedValueOf, outerIndex: number, innerIndex: number)
            // (outerValue) oV: 1 --> 5 --> 8 (member in array of "of")
            // (innerValue) iV: return a member in array as the the generator has observed.
            // (outerIndex) oI: 0 --> 1 --> 2 (index of member in the array in "of").
            // (innerIndex) iI: 0 --> 1 (index that the generator rolls inside).
            (oV: any, iV: any, oI: number, iI: number) => {
                return `oV: ${oV}, iV: ${iV}, oI: ${oI}, iI: ${iI}`;
            }
        )
    );
}

const source = genWithMergemap$();
source.subscribe(
    x => console.log('Next: | %s |', x),
    e => console.log('Error: %s', e),
    () => console.log('Completed')
);
        
これをビルドして実行してみますと、

            
            $ node dist/index.js
Next: | oV: 1, iV: 1, oI: 0, iI: 0 |
Next: | oV: 1, iV: 0, oI: 0, iI: 1 |
Next: | oV: 5, iV: 5, oI: 1, iI: 0 |
Next: | oV: 5, iV: 1, oI: 1, iI: 1 |
Next: | oV: 8, iV: 8, oI: 2, iI: 0 |
Next: | oV: 8, iV: 2, oI: 2, iI: 1 |
Completed
        
...とresultSelectorを初見の方には、ちんぷんかんぷんな数字が4つ跳ね返ってきているように感じでしまうでしょう。

mergeMap(flatMap)オペレーターは1つの主ストリームから、また別のサブストリーム一つと合成して、新しいストリームを生成して流す仕組みです。

合同会社タコスキングダム|蛸壺の技術ブログ

※出典: Rxjs/mergeMap APIリファレンスページ
https://rxjs-dev.firebaseapp.com/api/operators/mergeMapより抜粋

公式ページの説明図でいうと、
1 --> 3 --> 5 -->という主ストリームが、10 --> 10 --> 10 -->というサブストリームに合成され、新しいストリームとして下流に流されるイメージが模してあります。

初学者向けの説明とはいえこの図だと、1つのストリームがもう一つ別のストリームと掛合わさって、新しい1つのストリームに合流するような印象を与えそうな気がします。

ですが実際は、
前回の記事 | ObservableのCOLDとHOTって結局なんなの?でも詳しく取り上げたように、通常のColdな性質を持つストリーム同士が合流することはありません。

この図でいうと、
1 --> 3 --> 5 -->と流される3つのColdなメインストリームの流れが、10 --> 10 --> 10 -->と流される3つのColdなサブストリームとぶつかって、新しい9つのColdなストリームとなって下流に流されるのです。

つまりは下流に流されるストリームの数は
3×3=93 \times 3 = 9です。

再び上記のソースコードに目を向けると、
ofオペレーターで1 --> 5 --> 8 -->として3つのOuterストリーム(主ストリーム)が流され、それをジェネレーターで流したInnerストリーム(サブストリーム)とぶつかります。

今回はmergeMapの第一引数をジェネレーター関数で指定して利用します。

この場合、
yeildが2回吐き出されている部分で、2つのストリームが新たに生成されていることになります。

これは、Innerストリームは2回相当となります。

ストリーム総数だけでいうと、
3×2=63 \times 2 = 6回処理が行われています。

Innerストリーム

コードの中身を詳しく見ていきます。

まず、mergeMapの第一引数(
projectと呼ばれる引数)に指定しているジェネレーター関数です。

ジェネレーター指定する場合には、
(value: T, index: number) => Oという形で利用する必要があります。

ここでの
Oに当たる部分にジェネレーター関数を指定しますので、

            
            //....
(x: any, i: number) => (function* () {
    yield x;
    yield i;
})()
//...
        
という形になっています。

こうすると、
yieldの数だけObservableが生成されるように働きます。

また、
projectで指定したObservableのストリームを、公式ではInnerストリームと呼んでいるようですので、以降では本記事内でもInnerストリームと呼びます。

Outerストリーム

元となったベースのObservableから生成された流れです。

今回のコードでは
of(1,5,8)から生成しています。

resultSelectorのマトリックス表記

resultSelectorの4つの引数である(outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number)をマトリックス表記でまとめてみます。

outerValueはOuterストリームから流されている値1,5,8であり、outerIndexでそれぞれのストリームに番号が与えられています。

対して
innerValueは、Innerストリームから流されている値です。

ここでの
innerIndexとは、yieldで押し出される順番であり、最初のyield x;0で、次のyield i;1の番号として押し出されます。

innerValueが特に混乱しやすいのですが、ジェネレーター関数が処理時点でyieldして押し出そうとしているオブザーブ値です。

今回でいうと
yieldしているのがxiのタイミングで出力される値が異なります。

では、
outerValueの方からマトリックス表記すると、以下のような式になります。

0    1Inner index012[115588]Outer index\begin{smallmatrix} & 0 \ \ \ \ 1 & \rightarrow \mathrm{Inner\ index} \\ & & \\ \begin{matrix} {\scriptsize 0} \\ {\scriptsize 1} \\ {\scriptsize 2} \end{matrix} & \begin{bmatrix} 1 & 1 \\ 5 & 5 \\ 8 & 8 \end{bmatrix} \\ \downarrow & & \\ \mathrm{Outer\ index} & & \end{smallmatrix}Eq. (1)

outerValueからすると、innerIndexがどうあれ値としてはOuterストリームの方の値が常に参照されます。

innerValueから値を取りたい場合には、以下のマトリックス表記のようになります。

0    1Inner index012[105182]Outer index\begin{smallmatrix} & 0 \ \ \ \ 1 & \rightarrow \mathrm{Inner\ index} \\ & & \\ \begin{matrix} {\scriptsize 0} \\ {\scriptsize 1} \\ {\scriptsize 2} \end{matrix} & \begin{bmatrix} 1 & 0 \\ 5 & 1 \\ 8 & 2 \end{bmatrix} \\ \downarrow & & \\ \mathrm{Outer\ index} & & \end{smallmatrix}Eq. (2)

こちらはouterValueの時と違い、
innerIndex0の時にyield x;が処理されたときの返り値の列、1の時にyield i;が処理されたときの返り値の列でそれぞれ構成される行列となります。


まとめ

以上までで、mergeMapでのresultSelectorを使う際の挙動をジェネレーターで説明しました。

これは
projectに通常のObservableや、Promiseや、async/awaitを入れたときにも応用の効く知識ですので、ぜひここで理解していただけたらと思います。


参考サイト

イテレーターとジェネレーター | MDN web docs

Generators and Observable Sequences