カテゴリー
【Rxjs基礎講座】RxJSのMap系メソッドをコーディングしながら具体的にどう違うか考えてみる
※ 当ページには【広告/PR】を含む場合があります。
2019/07/07
2022/10/05
「RxJS」
とはいえ、
tap
map
mergeMap
switchMap
concatMap
今回は主題の通り、map系のメソッドがどう違いのかを自分の理解度を深めるためにまとめております。
リアクティブな関数型プログラミングの基礎 関数型リアクティブプログラミング (Programmer's SELECTION)
RxjsでのmapとmergeMapの違いを理解する
まずはmap系のオペレーターでもっとも基礎的な
map
mergeMap
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
//👇オブザーバブルストリームを作成
const obs = of(0, 1, 2, 3, 4, 5, 6, 7, 8);
//※なお配列からストリームを作成する場合、fromオペレーターのほうが簡単
//const obs = from([0, 1, 2, 3, 4, 5, 6, 7, 8]);
//※ES6以降ならスプレッド構文を使って配列とofでストリーム作成が可能
//const obs = of(...[0, 1, 2, 3, 4, 5, 6, 7, 8]);
//👇オブザーバブルにパイプからmapオペレーターを仕込む
const add_one = obs.pipe(
map(x => x+1)
);
//👇ストリームの開始
add_one.subscribe(val => console.log(val));
単純に0~8を数字の中身をもつ
Observable
Observable
これを実行すると、
1
2
3
4
5
6
7
8
9
と結果が出力されます。
このストリームを把握するために図解すると、

のようになっています。
ここでのポイントとしては、mapオペレーターは基本的に1つのオブザーバブルに作用するもので、結果としてそのオブザーバブルの中身を単純に変形するような働きをします。
この意味で、mapがもっとも単純な
これに対して
mergeMap
以下の例では2つのオブザーバブルに対してmergeMapを行なってみましょう。
import { of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';
// Create an Obervable
const base_obs = of(0, 3, 6);
const sub_obs = of(1, 2, 3);
// The first base Observable, base_obs, is overlayed with the second subsidiary
// Observable, sub_obs and then create new 'merged' Observable.
const merged_obs = base_obs.pipe(
mergeMap(x => sub_obs.pipe(
map(i => x+i)
)
)
);
merged_obs.subscribe(x => console.log(x));
実行結果は先程のmapの場合と同じになります。
1
2
3
4
5
6
7
8
9
先程の説明したように、mapオペレーターがストリームの中身自体を変形して流していたのに対して、

ここでの例でいうと、baseストリームの各要素が下流のsubストリームの要素へ合流するように処理が進みます。
基本的に一つのストリームは非同期処理ですので、ストリームの中の要素がどのタイミングで消費されるのかは制御されずにそのストリームの好き勝手に処理されていきます。
このため、場合によっては上の実行結果ように
「1 > 2 > 3 > ...」
リアクティブな関数型プログラミングの基礎 関数型リアクティブプログラミング (Programmer's SELECTION)
switchMapとconcatMap
mergeMap
switchMap
concatMap
このswitchMapとconcatMapがどういったものなのかも調査してみましょう。
まず
switchMap
mergeMap
switchMap
import { of } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';
// Create an Obervable
const base_obs = of(0, 3, 6);
const sub_obs = of(1, 2, 3);
// The first base Observable, base_obs, is overlayed with the second subsidiary
// Observable, sub_obs and then create new 'switched' Observable.
const switched_obs = base_obs.pipe(
switchMap(x => sub_obs.pipe(
map(i => x+i)
)
)
);
switched_obs.subscribe(x => console.log(x));
実行すると…
1
2
3
4
5
6
7
8
9
これは
mergeMap
ならば、
concatMap
import { of } from 'rxjs';
import { map, concatMap } from 'rxjs/operators';
// Create an Obervable
const base_obs = of(0, 3, 6);
const sub_obs = of(1, 2, 3);
// The first base Observable, base_obs, is overlayed with the second subsidiary
// Observable, sub_obs and then create new 'concatenated' Observable.
const concat_obs = base_obs.pipe(
concatMap(x => sub_obs.pipe(
map(i => x+i)
)
)
);
concat_obs.subscribe(x => console.log(x));
実行してみますと、
1
2
3
4
5
6
7
8
9
こちらも全く一緒です。
つまり通常の非同期処理を行うオブザーバブルストリーム同士でストリーム合成しても
mergeMap
switchMap
concatMap

この挙動をRxjsの入門者が最初に目にされたときには、「なんじゃこれは...」と感じられはずです。
これは以降で説明するように
同期処理
リアクティブな関数型プログラミングの基礎 関数型リアクティブプログラミング (Programmer's SELECTION)
mergeMapとswitchMapとconcatMapの違いを同期処理でみてみる
今度は、ベースのオブザーバブルのストリームはそのまま自然に処理を流して(つまり非同期)、横からちょっかいを出すサブのオブザーバブルには100msのインターバルを与えて(つまり同期的な流れにする)、サブストリームからベースストリームを遮ってみるようにコードを変えてみましょう。
なお100msのインターバルを与えるために
interval(100)
mergeMapを使う場合
まずはmergeMapで使ったコードを以下のように修正します。
import { of, interval } from 'rxjs';
import { map, mergeMap, zip } from 'rxjs/operators';
const base_obs = of(0, 3, 6);
const sub_obs = of(1, 2, 3);
const merged_obs = base_obs.pipe(
mergeMap(x => sub_obs.pipe(
map(i => x+i),
zip(interval(100)) //👈追記。100msおきにsubの要素が流れる
)
)
);
merged_obs.subscribe(x => console.log(x));
実行結果は
[ 1, 0 ]
[ 4, 0 ]
[ 7, 0 ]
[ 2, 1 ]
[ 5, 1 ]
[ 8, 1 ]
[ 3, 2 ]
[ 6, 2 ]
[ 9, 2 ]
となります。
出力はzipで2つの出力が配列形式で紐付けで吐き出されており、第一要素が
ベース > サブ

この実行結果は、サブのストリームが100ms間隔で間隔を保って同期的に処理される間も、ベースのストリームは思うがままにそのまま止まることなく流れているのが分かります。
このため「mergeMap」は下流の合成させるオブザーバブルのストリームを一切考慮しない(考慮する必要がない)ときに利用する単純(フラット)なストリーム合成オペレーターです。
元々mergeMapという名前になる前には
何がどうフラットなのか?という割と面倒な話を暗に避けたい意図があったのかどうかは知りませんが、flatという言葉は用いられなくなりました。
switchMapを使う場合
ではswitchMapの場合を見ていきます。
import { of, interval } from 'rxjs';
import { map, switchMap, zip } from 'rxjs/operators';
const base_obs = of(0, 3, 6);
const sub_obs = of(1, 2, 3);
const switched_obs = base_obs.pipe(
switchMap(x => sub_obs.pipe(
map(i => x+i),
zip(interval(100))
)
)
);
switched_obs.subscribe(x => console.log(x));
実行しますと、
[ 7, 0 ]
[ 8, 1 ]
[ 9, 2 ]
という結果が得られます。
これはサブストリームがインターバル間隔中にベースのストリームで処理が完了した場合、ベースのオブザーバブルでの最新の処理(ここでは
Of(6)
Of(0,3)

switchMapの最新の処理以外はキャンセルするこの性質は非常に便利で、例えば代表的なユースケースとして、
またウェブページをスクロールするときにページ座標を常時監視して下流のストリームへ流して別の処理する場合などにも、switchMapの利用が検討できます。
concatMapを使う場合
最後にconcatMapを使ってみます。
import { of, interval } from 'rxjs';
import { map, concatMap, zip } from 'rxjs/operators';
const base_obs = of(0, 3, 6);
const sub_obs = of(1, 2, 3);
const concat_obs = base_obs.pipe(
concatMap(x => sub_obs.pipe(
map(i => x+i),
zip(interval(100))
)
)
);
concat_obs.subscribe(x => console.log(x));
これを実行すると、
[ 1, 0 ]
[ 2, 1 ]
[ 3, 2 ]
[ 4, 0 ]
[ 5, 1 ]
[ 6, 2 ]
[ 7, 0 ]
[ 8, 1 ]
[ 9, 2 ]
となります。
こちらはサブのオブザーバブルが間隔待ちしている間、ベースとなるメインのオブザーバブルの処理にも介入して同期処理をさせている感じになります。

concatMapを使うと、完全な同期処理をストリーム全体に強制することでできます。
非同期処理だったストリームも同期処理にさせられるので、処理時間が遅くなるのがデメリットですが、全ての要素が同期的に処理させるという保証が欲しい時につかうのがconcatMapです。
例えばRestAPIなどでサーバーへ複数のクエリリストを用意しておいて、そこから順次リクエストしたい場合に一斉に同時多発的なリクエストを投げると困るので、concatMapで順番にリクエストさせるなどに使えます。
リアクティブな関数型プログラミングの基礎 関数型リアクティブプログラミング (Programmer's SELECTION)
Takeaway〜まとめ
以上まとめますと、
- map:
1つのObservableの処理の中身を変形させたい場合に使いましょう。
- mergeMap:
2つ以上のObservableを非同期処理させたい場合に使いましょう。
- switchMap:
非同期処理したい場合で、一連の処理の過程は重要視されず、
もっとも新しい処理以外は棄却されても良い場合にはこっちを使いましょう。
- concatMap:
2つ以上のObservableで同期処理させたい場合に使いましょう。
参考
記事を書いた人
ナンデモ系エンジニア
主にAngularでフロントエンド開発することが多いです。 開発環境はLinuxメインで進めているので、シェルコマンドも多用しております。 コツコツとプログラミングするのが好きな人間です。
カテゴリー