【Rxjsワンポイント講座】mergeMapのconcurrentパラメータでプロセスの同時実行数を制限する


※ 当ページには【広告/PR】を含む場合があります。
2025/03/19
【Angular活用講座】Rxjs:repeatオペレーターで一定時間間隔の処理(再帰的ループ)を行わせてみる

Rxjsにおいて、まとめて非同期処理をさせたい場合には、
forkJoinを使うケースも多いと思います。

しかし、何でもかんでもforkJoinすればいいわけでもなく、サーバー側の処理の制限があるようなユースケースでは、適切ではないときがあります。

というのもFetch APIを含んだ処理をforkJoinさせると、好き放題非同期処理(マイクロタスク)が走ったあげく、サーバー側のリクエスト数の制限に引っかかり、処理がスタックしたことがありました。

これを上手く処理させるために、forkJoinではなく、
concurrent付きのmergeMapを使おうというお話です。


合同会社タコスキングダム|蛸壺の技術ブログ【効果的学習法レポート・2025年最新】Angular(JSフレームワーク)をこれから学びたい人のためのオススメ書籍&教材特集

bufferCountではダメなのか?

Rxjsにおいて、実行制限を制限するケースに利用できる「buffer系」の関数オペレーターが存在します。

例えば今回の狙いからいうと、名前のイメージからして実行数を制限したい相当のものを
bufferCountで、実現できるような気もしそうです。

簡単なサンプルでbufferCountの作用を見てみると、

            import { interval, bufferCount } from 'rxjs';

//👇1秒間ごとにカウント数を流す
interval(1000).pipe(
   //👇指定した数だけ、流れてきた値をとり貯め、フルになったら逐次下流に流す
   bufferCount(3)
).subscribe(val =>
   console.log(val)
);
        
これを実行すると、

            [0,1,2] --> [3,4,5]  --> ...
        
というように処理されます。

しかし、bufferCountの場合、後段で値が流れてくるのを待ち構えているだけですので、前段の処理が好き勝手に処理を走らせても何かしら制御をしてくれるわけではありません。

結局、buffer系では、前段にいるforkJoinやMap系のオペレーターの"暴走"を止めることができないのです。


合同会社タコスキングダム|蛸壺の技術ブログ【効果的学習法レポート・2025年最新】Angular(JSフレームワーク)をこれから学びたい人のためのオススメ書籍&教材特集

mergeMapのconcurrentパラメータ

では、どのようにストリームの実行数を制御するのか、というと、mergeMapのオプションであるconcurrentを使うことになります。

通常、これはデフォルトで
Infty(無制限)ですが、具体的な数値を指定することで、同時実行数を制限することができます。

一例で見てみると、

            import { of, delay, mergeMap } from 'rxjs';

//👇数値を流す
const a$ = of(1,2,3,4,5,6,7,8,9,10).pipe(
   //👇何かしらの処理を想定...100ms遅延程度
   delay(100)
).subscribe(val =>
   console.log(val)
);

const b$ = of(1,2,3,4,5,6,7,8,9,10).pipe(
   //👇同時実行数を最大3に設定
   mergeMap(v => of(v), 3),
).subscribe(val =>
  console.log(val)
);
        
というように同時実行数を絞らないa$とmergeMapで制限したb$での違いを比較しましょう。

これを実行すると、結果は
1 -> 2 -> ... -> 10でどちらのストリームの処理も同じです。

違いは内部の処理であり、まずは
a$のストリームからみると、ofオペレーターから各値を流し始めた直後から処理が順次開始され、すべての処理の完了次第、ストリームが終わります。

合同会社タコスキングダム|蛸壺の技術ブログ

言わずもがな、普通のrxjsストリームの挙動です。

これが、concurrent指定のmergeMapを介在することで以下のような処理に変わります。

合同会社タコスキングダム|蛸壺の技術ブログ

見てのように、処理がconcurrents数以上は走らせることができず、処理中のストリームのどれかが完了するまで、残りのストリームは待機させることができるようになります。

これにより同時アクセス数に制限のあるようなリモートのAPIサービスでも、安全に利用することが可能となるでしょう。


合同会社タコスキングダム|蛸壺の技術ブログ【効果的学習法レポート・2025年最新】Angular(JSフレームワーク)をこれから学びたい人のためのオススメ書籍&教材特集

まとめ

以上、concurrent付きmergeMapを覚えておくといざというときに活用できるシーンがあるかもしれません。

forkJoinで太刀打ちできない課題に直面した際には、一度mergeMapでの置き換えができないか検討してみてはいかがでしょうか。

記事を書いた人

記事の担当:taconocat

ナンデモ系エンジニア

主にAngularでフロントエンド開発することが多いです。 開発環境はLinuxメインで進めているので、シェルコマンドも多用しております。 コツコツとプログラミングするのが好きな人間です。

合同会社タコスキングダム|蛸壺の技術ブログ【効果的学習法レポート・2025年最新】Angular(JSフレームワーク)をこれから学びたい人のためのオススメ書籍&教材特集