【Rxjs基礎講座】concatMapを使った同期処理を行う


2019/11/24

色々とお仕事の方(主にプログラミング)が立て込んでまいりまして、ブログの更新をすっかり疎かにしておりました。

今回のお題は表題の通り、AngularでrxjsのconcatMapを用いて、同期処理の応用をちょっとだけ特集してきます。

なお、本記事で動作確認した時のAngular環境は、
package.json上で、~8.2.0です。

ちなみに以前のブログ記事、
RxJSのMap系メソッドをコーディングしながら具体的にどう違うか考えてみるには、主なMap系のメソッドの違いを簡易的にまとめておりますので、そちらも興味があればご参考までに。


初段目のObservable

まずはconcatMapを利用して同期処理をさせる前に、今回の基礎になるストリームを作成します。

最初は単純なもので、
0 > 1 > 2 > 3を逐次流すだけです。

また、Observable発火後の結果を、ストリームの中身に1加えた物を出力させます。

            
            const base_obs = from([0, 1, 2, 3]);
const firststage_obs = base_obs.pipe(
    // x: 0 > 1 > 2 > 3
    tap(x => console.log(`Input: [x: ${x}]`)),
    map(x => x + 1)
);

console.log('============== 1st stage ==============');
firststage_obs.subscribe(val => console.log(`Result: ${val}`));
        
このコードを実行すると、

            
            ============== 1st stage ==============
Input: [x: 0]
Result: 1
Input: [x: 1]
Result: 2
Input: [x: 2]
Result: 3
Input: [x: 3]
Result: 4
        
と出力されます。


同期処理 ・ ストリームの2段目をつける場合

では、concatMapを使って、初段のストリームから後段のストリームを発生させ、最も単純な同期処理をさせてみましょう。

テストコードは以下のように変更・追加します。

            
            const base_obs = from([0, 1, 2, 3]);
const sub_obs = from([0, 1, 2, 3, 4]);
const secondstage_obs = base_obs.pipe(
    // x: 0 > 1 > 2 > 3
    concatMap(x => sub_obs.pipe(
        // y: 0 > 1 > 2 > 3 > 4
        tap(y => console.log(`Input: [x: ${x}, y: ${y}]`)),
        map(y => 5 * x + y)
    ))
);

console.log('============== 2nd stage ==============');
secondstage_obs.subscribe(val => console.log(`Result: ${val}`));
        
実行しますと、

            
            ============== 2nd stage ==============
Input: [x: 0, y: 0]
Result: 0
Input: [x: 0, y: 1]
Result: 1
Input: [x: 0, y: 2]
Result: 2
Input: [x: 0, y: 3]
Result: 3
Input: [x: 0, y: 4]
Result: 4
#....中略
Result: 17
Input: [x: 3, y: 3]
Result: 18
Input: [x: 3, y: 4]
Result: 19
        
が出力されているかと思います。

concatMapでは、ベースとなっているストリームから派生させたストリームに服従関係を付け、ストリームを同期させることができます。

派生ストリームが動作している時には、ベースストリームは動きを止めているようになります。

よって、httpリクエストで、サーバーからのなんからのレスポンスを受け取って、その後、なんからの処理をしたデータを別のサーバーへ送り出す...という同期的なユースケースに活用できます。


同期処理 ・ ストリーム3段目をつける場合

もっと欲を出して、基底のストームから派生させたストリームから、さらにもう一段ストリームを生やすとどうなるのでしょうか。

テストコードを以下のように、変更・追加します。

            
            const base_obs = from([0, 1, 2, 3]);
const sub_obs = from([0, 1, 2, 3, 4]);
const subsub_obs = from([0, 1, 2, 3, 4, 5]);

const thirdstage_obs = base_obs.pipe(
    // x: 0 > 1 > 2 > 3
    concatMap(x => sub_obs.pipe(
        // y: 0 > 1 > 2 > 3 > 4
        concatMap(y => subsub_obs.pipe(
            // z: 0 > 1 > 2 > 3 > 4 > 5
            tap(z => console.log(`Input: [x: ${x}, y: ${y}, z: ${z}]`)),
            map(z => 6 * 5 * x + 6 * y + z)
        ))
    ))
);

console.log('============== 3rd stage ==============');
thirdstage_obs.subscribe(val => console.log(`Result: ${val}`));
        
実行させると以下のように吐き出します(ストリーム3段だけあって長いですが...)。

            
            ============== 3rd stage ==============
Input: [x: 0, y: 0, z: 0]
Result: 0
Input: [x: 0, y: 0, z: 1]
Result: 1
Input: [x: 0, y: 0, z: 2]
Result: 2
Input: [x: 0, y: 0, z: 3]
Result: 3
Input: [x: 0, y: 0, z: 4]
Result: 4
#....中略
Input: [x: 3, y: 4, z: 0]
Result: 114
Input: [x: 3, y: 4, z: 1]
Result: 115
Input: [x: 3, y: 4, z: 2]
Result: 116
Input: [x: 3, y: 4, z: 3]
Result: 117
Input: [x: 3, y: 4, z: 4]
Result: 118
Input: [x: 3, y: 4, z: 5]
Result: 119
        
きちんと3つのストリームで同期できているようです。


まとめ

同期処理を2ストリーム以上で行いたい場合には、concatMapでパイプさせていくと良いと思います。

ただし、あまりにも多段でストリームを発生してしまうと、意図とせず処理パフォーマンス落としてしまったり、複雑な処理になりすぎてコードの可読性を低下させることになるかもしれません。

オペレーターの濫用は程々にしといた方がいいかもしれません。

記事を書いた人

記事の担当:taconocat

ナンデモ系エンジニア

主にAngularでフロントエンド開発することが多いです。 開発環境はLinuxメインで進めているので、シェルコマンドも多用しております。 コツコツとプログラミングするのが好きな人間です。