【Rxjs基礎講座】GeneratorをObservableへ変換する方法


※ 当ページには【広告/PR】を含む場合があります。
2020/07/17
2022/10/05
【Rxjs基礎講座】deferでasync/await関数からObservableへ変換する
【Rxjs活用講座】deferで作るステイトフルでPipeableなカスタムオペレーターの作り方



以前の
記事 | deferでasync/await関数からObservableへ変換する では、 async/await をObservableに変えてから処理を行うことが分かりました。

合同会社タコスキングダム|蛸壺の技術ブログ
【Rxjs基礎講座】deferでasync/await関数からObservableへ変換する

asycn/awaitで実装された関数をRxjs#deferオペレーターでObservableに変換する方法を解説します。



もっとニッチなところで、
「javascriptのgenerator関数もObservableへ変換できるのか」 、という疑問が出てきます。
今回は
generator 関数をObserbavleで実行するやり方を検証します。


合同会社タコスキングダム|蛸壺の技術ブログ【効果的学習法レポート・2025年最新】Angular(JSフレームワーク)をこれから学びたい人のためのオススメ書籍&教材特集

Generatorの簡単なおさらい



javascript generator 」でウェブ検索すると沢山の説明記事がヒットします。
なのでここでは深く説明しませんが、ECMAScript2015(ES6)から盛り込まれた機能で、ES7から正式に追加された
async/await 構文からすると先輩にあたるようです。
ジェネレーターは一見すると従来のイタレーターの拡張版に見えます。
しかし単なるイタレーターとは違って、遅延評価・同期処理・関数外部からの制御・無限シークエンス表現...などのテクニックを提供してくれる便利なものです。
typescriptではジェネレーターは
Generator<T> 型で扱います。
例えば簡単な実装で遊んでみますと、

            function* generator(): Generator<any> {
    yield 0;
    let i = 4;
    yield 1;
    yield 'HOGE';
    yield 2 + i;
    i++;
    yield 'PIYO';
    yield 1.0e-3 * i;
    yield false;
    yield 'FUGA';
    yield i - 3;
    yield true;
}

const iterator = generator();

console.log(iterator.next());
console.log(iterator.next());
console.log(iterator.next());
console.log(iterator.next());
console.log(iterator.next());
console.log(iterator.next());
console.log(iterator.next());
console.log(iterator.next());
console.log(iterator.next());
console.log(iterator.next());
console.log(iterator.next());
console.log(iterator.next());

        

これをビルドして実行すると、

            $ node dist/index.js
{ value: 0, done: false }
{ value: 1, done: false }
{ value: 'HOGE', done: false }
{ value: 6, done: false }
{ value: 'PIYO', done: false }
{ value: 0.005, done: false }
{ value: false, done: false }
{ value: 'FUGA', done: false }
{ value: 2, done: false }
{ value: true, done: false }
{ value: undefined, done: true }
{ value: undefined, done: true } # 処理が終わってもGeneratorは消滅せずに残る

        

というような出力を得ます。
見てお分かりのように、ジェネレーター内部で
yield された値が、外部からの next() の呼び出しによってその都度処理されて引き出されています。
そのときの返り値は
{value: any, done: boolean} 型のオブジェクトで、 value で処理された値を参照でき、 done でジェネレーターの処理完了を受け取ることができます。


合同会社タコスキングダム|蛸壺の技術ブログ【効果的学習法レポート・2025年最新】Angular(JSフレームワーク)をこれから学びたい人のためのオススメ書籍&教材特集

GeneratorからObservableへの変換



それでは早速、先程のジェネレーターを元にして、Observableに変換した処理を行ってみます。

検証① ~ fromオペレーターから



別のブログ記事で以前からfromオペレーターで
Promise --> Observableへの変換 を取り上げました。
実のところ、
fromオペレーター はGenerator型のオブジェクトからもObservableへ変換してくれるので、ジェネレーターも以下のように簡単に変換できます。

            import { Observable, from } from 'rxjs';
import { take } from 'rxjs/operators';

function* generator(): Generator<any> {
    yield 0;
    let i = 4;
    yield 1;
    yield 'HOGE';
    yield 2 + i;
    i++;
    yield 'PIYO';
    yield 1.0e-3 * i;
    yield false;
    yield 'FUGA';
    yield i - 3;
    yield true;
}

// 👇fromオペレーターからジェネレーターもObservableへ一発変換
const iterator$ = from(generator()).pipe(
    take(11)
);

iterator$.subscribe(
    res => console.log(res),
    err => console.log(err),
    () => console.log('DONE!')
);

        

これをビルドして実行すると、

            $ node dist/index.js
0
1
HOGE
6
PIYO
0.005
false
FUGA
2
true
DONE! # Generatorの処理が終わったらtakeでUnsubscribe

        

このようにジェネレーターの返り値の内
value の値が、Observableのsubscribeの next() へ流れてきているようです。
また、ジェネレーターが
donetrue に達したときと連動して、Observableのsubscribeも complete() が着火するように設計されています。

検証② ~ deferオペレーターから



Fromオペレーターが使えるのなら、
defer オペレーターも使えそうです。
以下試してみましょう。

            import { Observable, defer } from 'rxjs';
import { take } from 'rxjs/operators';

function* generator(): Generator<any> {
    yield 0;
    let i = 4;
    yield 1;
    yield 'HOGE';
    yield 2 + i;
    i++;
    yield 'PIYO';
    yield 1.0e-3 * i;
    yield false;
    yield 'FUGA';
    yield i - 3;
    yield true;
}

// 👇deferオペレーターからジェネレーターもObservableへ一発変換
const iterator$ = defer(generator).pipe(
    take(11)
);

iterator$.subscribe(
    res => console.log(res),
    err => console.log(err),
    () => console.log('DONE!')
);

        

とすることで、先程の結果と同じ結果を得ます。
fromオペレーターとの違いは、fromのほうが引数に
Generator<any> 型のインスタンス(実体)を指定するのに対して、deferは ジェネレーター関数 そのものを指定します。


合同会社タコスキングダム|蛸壺の技術ブログ【効果的学習法レポート・2025年最新】Angular(JSフレームワーク)をこれから学びたい人のためのオススメ書籍&教材特集

応用編 ~ mergeMap(flatMap)のresultSelector(第二引数)を理解する



上記まででfromオペレーターやdeferオペレーターがGenerator型の引数をカバーしてることを示したました。
map系のオペレーターも第一引数にGenerator関数を指定することで利用可能です。
ここではmap系も種類が多いのですべてのオペレーターを個別に使用例をご紹介することができません。
丁度いい機会ですので、とりわけmap系オペレーターの持つ謎多き第二引数
resultSelector の挙動の詳細に迫ってみます。

resultSelector

公式のAPIリファレンス にはこうあります。
関数の定義の部分だけ抜粋すると、

            Projects each source value to an Observable which is merged in the output Observable.

mergeMap<T, R, O extends ObservableInput<any>>(
    project: (value: T, index: number) => O,
    resultSelector?: number |
        ((outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number) => R),
    concurrent: number = Number.POSITIVE_INFINITY
): OperatorFunction<T, ObservedValueOf<O> | R>

Parameters:
    project:
        A function that, when applied to an item emitted by the source Observable,
        returns an Observable.

    resultSelector:
        Optional. Default is undefined.
        Type:
            number |
            ((outerValue: T, innerValue: ObservedValueOf, outerIndex: number, innerIndex: number) => R).

    concurrent:
        Optional.
        Default is Number.POSITIVE_INFINITY.
        Maximum number of input Observables being subscribed to concurrently.

Returns:
    OperatorFunction<T, ObservedValueOf<O> | R>:
        An Observable that emits the result of applying the projection function
        (and the optional deprecated resultSelector) to each item emitted
        by the source Observable and merging the results of the Observables
        obtained from this transformation.

        

そもそも単に
mergeMap 等を利用する分には、 resultSelector を使うほどの必要性はないかも知れません。
もし必要に迫られ、mergeMapの複数の内部処理を可能な限りカスタマイズする場合には、
resultSelector の挙動を理解しておく必要があります。
resultSelectorの挙動を探るため、mergeMapにジェネレーターを噛ませた以下のようなテストコードでその挙動を探ってみましょう。

            import { Observable, of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

function genWithMergemap$(): Observable<any> {
    return of(1,5,8).pipe(
        mergeMap(
            (x: any, i: number) => (function* () {
                yield x;
                yield i;
            })(),
            // (outerValue: T, innerValue: ObservedValueOf, outerIndex: number, innerIndex: number)
            // (outerValue) oV: 1 --> 5 --> 8 (member in array of "of")
            // (innerValue) iV: return a member in array as the the generator has observed.
            // (outerIndex) oI: 0 --> 1 --> 2 (index of member in the array in "of").
            // (innerIndex) iI: 0 --> 1 (index that the generator rolls inside).
            (oV: any, iV: any, oI: number, iI: number) => {
                return `oV: ${oV}, iV: ${iV}, oI: ${oI}, iI: ${iI}`;
            }
        )
    );
}

const source = genWithMergemap$();
source.subscribe(
    x => console.log('Next: | %s |', x),
    e => console.log('Error: %s', e),
    () => console.log('Completed')
);

        

これをビルドして実行してみますと、

            $ node dist/index.js
Next: | oV: 1, iV: 1, oI: 0, iI: 0 |
Next: | oV: 1, iV: 0, oI: 0, iI: 1 |
Next: | oV: 5, iV: 5, oI: 1, iI: 0 |
Next: | oV: 5, iV: 1, oI: 1, iI: 1 |
Next: | oV: 8, iV: 8, oI: 2, iI: 0 |
Next: | oV: 8, iV: 2, oI: 2, iI: 1 |
Completed

        

...と
resultSelector を初見の方には、ちんぷんかんぷんな数字が4つ跳ね返ってきているように感じでしまうでしょう。
mergeMap(flatMap)オペレーターは1つの主ストリームから、また別のサブストリーム一つと合成して、新しいストリームを生成して流す仕組みです。

合同会社タコスキングダム|蛸壺の技術ブログ


※出典: Rxjs/mergeMap APIリファレンスページ
https://rxjs-dev.firebaseapp.com/api/operators/mergeMap より抜粋
公式ページの説明図でいうと、
「1 --> 3 --> 5 --> ...」 という主ストリームが、 「10 --> 10 --> 10 --> ...」 というサブストリームに合成され、新しいストリームとして下流に流されるイメージが模してあります。
初学者向けの説明とはいえこの図だと、1つのストリームがもう一つ別のストリームと掛合わさって、新しい1つのストリームに合流するような印象を与えそうな気がします。
ですが実際は、前回の記事・
ObservableのCOLDとHOTって結局なんなの? でも詳しく取り上げたように、通常のColdな性質を持つストリーム同士が合流することはありません。

合同会社タコスキングダム|蛸壺の技術ブログ
【Rxjsのすゝめ】ObservableのCOLDとHOTって結局なんなの?

Rxjsを使いこなしていく中で一度は疑問に思うObservableのColdとHotの違いをまとめてみました。



この図でいうと、
「1 --> 3 --> 5 --> ...」 と流される3つのColdなメインストリームの流れが、 「10 --> 10 --> 10 --> ...」 と流される3つのColdなサブストリームとぶつかって、新しい9つのColdなストリームとなって下流に流されるのです。
つまりは下流に流されるストリームの数は$$3 \times 3 = 9$$です。
再び上記のソースコードに目を向けると、
of オペレーターで 「1 --> 5 --> 8 --> ...」 として3つのOuterストリーム(主ストリーム)が流され、それをジェネレーターで流したInnerストリーム(サブストリーム)とぶつかります。
今回はmergeMapの第一引数をジェネレーター関数で指定して利用します。
この場合、
yeild が2回吐き出されている部分で、2つのストリームが新たに生成されていることになります。
これは、Innerストリームは2回相当となります。
ストリーム総数だけでいうと、$$3 \times 2 = 6$$回処理が行われています。

Innerストリーム



コードの中身を詳しく見ていきます。
まず、mergeMapの第一引数(
project と呼ばれる引数)に指定しているジェネレーター関数です。
ジェネレーター指定する場合には、
(value: T, index: number) => O という形で利用する必要があります。
ここでの
O に当たる部分にジェネレーター関数を指定しますので、

            //....
(x: any, i: number) => (function* () {
    yield x;
    yield i;
})()
//...

        

という形になっています。
こうすると、
yield の数だけObservableが生成されるように働きます。
また、
project で指定したObservableのストリームを、公式ではInnerストリームと呼んでいるようですので、以降では本記事内でも Innerストリーム と呼びます。

Outerストリーム



元となったベースのObservableから生成された流れです。
今回のコードでは
of(1,5,8) から生成しています。

resultSelectorのマトリックス表記



resultSelectorの4つの引数である
(outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number) をマトリックス表記でまとめてみます。

outerValue はOuterストリームから流されている値 1,5,8 であり、 outerIndex でそれぞれのストリームに番号が与えられています。
対して
innerValue は、Innerストリームから流されている値です。
ここでの
innerIndex とは、 yield で押し出される順番であり、最初の yield x;0 で、次の yield i;1 の番号として押し出されます。

innerValue が特に混乱しやすいのですが、ジェネレーター関数が処理時点で yield して押し出そうとしているオブザーブ値です。
今回でいうと
yield しているのが xi のタイミングで出力される値が異なります。
では、
outerValue の方からマトリックス表記すると、以下のような式になります。

0    1Inner index012[115588]Outer index \begin{smallmatrix} & 0 \ \ \ \ 1 & \rightarrow \mathrm{Inner\ index} \\ & & \\ \begin{matrix} {\scriptsize 0} \\ {\scriptsize 1} \\ {\scriptsize 2} \end{matrix} & \begin{bmatrix} 1 & 1 \\ 5 & 5 \\ 8 & 8 \end{bmatrix} \\ \downarrow & & \\ \mathrm{Outer\ index} & & \end{smallmatrix} Eq. (1)


outerValueからすると、innerIndexがどうあれ値としてはOuterストリームの方の値が常に参照されます。

innerValue から値を取りたい場合には、以下のマトリックス表記のようになります。

0    1Inner index012[105182]Outer index \begin{smallmatrix} & 0 \ \ \ \ 1 & \rightarrow \mathrm{Inner\ index} \\ & & \\ \begin{matrix} {\scriptsize 0} \\ {\scriptsize 1} \\ {\scriptsize 2} \end{matrix} & \begin{bmatrix} 1 & 0 \\ 5 & 1 \\ 8 & 2 \end{bmatrix} \\ \downarrow & & \\ \mathrm{Outer\ index} & & \end{smallmatrix} Eq. (2)


こちらはouterValueの時と違い、
innerIndex0 の時に yield x; が処理されたときの返り値の列、 1 の時に yield i; が処理されたときの返り値の列でそれぞれ構成される行列となります。


合同会社タコスキングダム|蛸壺の技術ブログ【効果的学習法レポート・2025年最新】Angular(JSフレームワーク)をこれから学びたい人のためのオススメ書籍&教材特集

まとめ



以上までで、mergeMapでの
resultSelector を使う際の挙動をジェネレーターで説明しました。
これは
project に通常のObservableや、Promiseや、async/awaitを入れたときにも応用の効く知識ですので、ぜひここで理解していただけたらと思います。

参考サイト

イテレーターとジェネレーター | MDN web docsGenerators and Observable Sequences