[Rxjs] RxJSのMap系メソッドをコーディングしながら具体的にどう違うか考えてみる


2019/07/07

RxJSを使うとjavascriptの非同期処理が見通しよく構築出来て、個人的にも非常にお世話になっております。

とはいえ、
tapmapなどの頻度が高いメソッドは使うものの、mergeMapは滅多に使わないし、switchMapconcatMapに至っては何んぞや、くらいに使う頻度は皆無です。

今回は主題の通り、map系のメソッドがどう違いのかを自分の理解度を深めるためにまとめております。


map vs. mergeMap

早速違いをみて行きましょう。最初は基本型となるmapmergeMapとの比較です。

            
            import { of } from 'rxjs';
import { map } from 'rxjs/operators';

// Create Observable
const obs = of(0, 1, 2, 3, 4, 5, 6, 7, 8)

// Set a pipeline
const add_one = obs.pipe(
    map(x => x+1)
);

// Let the observable perform the pipeline proccess
add_one.subscribe( val => console.log(val));
        
単純に0~8を数字の中身をもつObservableのストリームに中身に1を足したObservableに変形させております。

これを実行すると、

            
            1
2
3
4
5
6
7
8
9
        
と結果が出力されます。

ここでのポイントは、
mapは基本的に1つのObservableを対象として、そのObservable対象を変形に加えています。

対して
mergeMapは複数のObservableに変形を加えるためのもので、以下の例では2つのObservableに対してmergeを行なっています。

            
            import { of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

// Create an Obervable
const base_obs = of(0, 3, 6);
const sub_obs = of(1, 2, 3);

// The first base Observable, base_obs, is overlayed with the second subsidiary
// Observable, sub_obs and then create new 'merged' Observable.
const merged_obs = base_obs.pipe(
    mergeMap(x => sub_obs.pipe(
            map(i => x+i)
        )
    )
);

merged_obs.subscribe(x => console.log(x));
        
実行結果は先程のmapと同じです。

            
            1
2
3
4
5
6
7
8
9
        


switchMapとconcatMap

それでは、mergeMapと同じように複数のObservableに作用するswitchMapconcatMapはどういったものなのか調査してみましょう。

まず
switchMapですが、先程のmergeMapのコーディング例をそのまま何も考えずswitchMapに入れ替えてみます。

            
            import { of } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

// Create an Obervable
const base_obs = of(0, 3, 6);
const sub_obs = of(1, 2, 3);

// The first base Observable, base_obs, is overlayed with the second subsidiary
// Observable, sub_obs and then create new 'switched' Observable.
const switched_obs = base_obs.pipe(
    switchMap(x => sub_obs.pipe(
            map(i => x+i)
        )
    )
);

switched_obs.subscribe(x => console.log(x));
        
実行すると…

            
            1
2
3
4
5
6
7
8
9
        
これはmergeMapと全く一緒です。

ならば、
concatMapはどうでしょうか。

            
            import { of } from 'rxjs';
import { map, concatMap } from 'rxjs/operators';

// Create an Obervable
const base_obs = of(0, 3, 6);
const sub_obs = of(1, 2, 3);

// The first base Observable, base_obs, is overlayed with the second subsidiary
// Observable, sub_obs and then create new 'concatenated' Observable.
const concat_obs = base_obs.pipe(
    concatMap(x => sub_obs.pipe(
            map(i => x+i)
        )
    )
);

concat_obs.subscribe(x => console.log(x));
        
実行してみますと、

            
            1
2
3
4
5
6
7
8
9
        
こちらも全く一緒です。

つまりごく単純な例では
mergeMap / switchMap / concatMap 共に違いがありません…

おそらく
非同期処理をさせることではっきりとした違いを出せる感じです。


非同期処理で違いをみる

今度は、ベースとなるObservableのストリームはそのまま自然に処理を流して、横からちょっかいを出すサブのObservableには100msのインターバルを与えて、サブストリームを流れを遮ってみるようにコードを変えてみましょう。

なお100msのインターバルを与えるために、
interval(100)Observablezipメソッドで付随させております。

mergeMap

            
            import { of, interval } from 'rxjs';
import { map, mergeMap, zip } from 'rxjs/operators';

// Create an Obervable
const base_obs = of(0, 3, 6);
const sub_obs = of(1, 2, 3);

// The first base Observable, base_obs, is overlayed with the second subsidiary
// Observable, sub_obs and then create new 'merged' Observable.
const merged_obs = base_obs.pipe(
    mergeMap(x => sub_obs.pipe(
            map(i => x+i),
            zip(interval(100)) // <-- Inserted
        )
    )
);

merged_obs.subscribe(x => console.log(x));
        
実行結果は

            
            [ 1, 0 ]
[ 4, 0 ]
[ 7, 0 ]
[ 2, 1 ]
[ 5, 1 ]
[ 8, 1 ]
[ 3, 2 ]
[ 6, 2 ]
[ 9, 2 ]
        
結果は、サブのストリームが100ms間隔で 0 -> 1 -> 2 と間隔を保って停まって処理される間も、ベースのストリームは思うがままにそのまま流れて行っております。

switchMap

            
            import { of, interval } from 'rxjs';
import { map, switchMap, zip } from 'rxjs/operators';

// Create an Obervable
const base_obs = of(0, 3, 6);
const sub_obs = of(1, 2, 3);

// The first base Observable, base_obs, is overlayed with the second subsidiary
// Observable, sub_obs and then create new 'switched' Observable.
const switched_obs = base_obs.pipe(
    switchMap(x => sub_obs.pipe(
            map(i => x+i),
            zip(interval(100))
        )
    )
);

switched_obs.subscribe(x => console.log(x));
        
実行しますと、

            
            [ 7, 0 ]
[ 8, 1 ]
[ 9, 2 ]
        
という結果が得られます。

これはサブストリームがインターバル間隔中で、ベースのストリームで処理が完了した場合、ベースの
Observableで最後に入った処理(ここではOf(6))以外のそれ以前に生じていて処理完了したもの(Of(0,3))は無かったものとしてキャンセルされているようです。

concatMap

            
            import { of, interval } from 'rxjs';
import { map, concatMap, zip } from 'rxjs/operators';

// Create an Obervable
const base_obs = of(0, 3, 6);
const sub_obs = of(1, 2, 3);

// The first base Observable, base_obs, is overlayed with the second subsidiary
// Observable, sub_obs and then create new 'concatenated' Observable.
const concat_obs = base_obs.pipe(
    concatMap(x => sub_obs.pipe(
            map(i => x+i),
            zip(interval(100))
        )
    )
);

concat_obs.subscribe(x => console.log(x));
        
これを実行すると、

            
            [ 1, 0 ]
[ 2, 1 ]
[ 3, 2 ]
[ 4, 0 ]
[ 5, 1 ]
[ 6, 2 ]
[ 7, 0 ]
[ 8, 1 ]
[ 9, 2 ]
        
となります。

こちらはサブの
Observableが間隔待ちしている間も、ベースとなるメインのObservableの処理にも介入して同期処理をさせている感じになります。


Takeaway

以上まとめますと、

            
            - map:
    1つのObservableの処理の中身を変形させたい場合に使いましょう。

- mergeMap:
    2つ以上のObservableを非同期処理させたい場合に使いましょう。

- switchMap:
    非同期処理したい場合で、一連の処理の過程は重要視されず、
    もっとも新しい処理以外は棄却されても良い場合にはこっちを使いましょう。

- concatMap:
    2つ以上のObservableで同期処理させたい場合に使いましょう。
        

参考

RxJS - Reactive Extensions Library for JavaScript

ANGULAR - RxJS ライブラリ

RxJS を学ぼう #1 - これからはじめる人のための導入編

RxJSのconcatMap, mergeMap, switchMapの違いを理解する(中級者向け)

記事を書いた人

記事の担当:taconocat

ナンデモ系エンジニア

主にAngularでフロントエンド開発することが多いです。 開発環境はLinuxメインで進めているので、シェルコマンドも多用しております。 コツコツとプログラミングするのが好きな人間です。