カテゴリー
【Rxjsワンポイント講座】mergeMapのconcurrentパラメータでプロセスの同時実行数を制限する
※ 当ページには【広告/PR】を含む場合があります。
2025/03/19
Rxjsにおいて、まとめて非同期処理をさせたい場合には、
しかし、何でもかんでもforkJoinすればいいわけでもなく、サーバー側の処理の制限があるようなユースケースでは、適切ではないときがあります。
というのもFetch APIを含んだ処理をforkJoinさせると、好き放題非同期処理(マイクロタスク)が走ったあげく、サーバー側のリクエスト数の制限に引っかかり、処理がスタックしたことがありました。
これを上手く処理させるために、forkJoinではなく、
concurrent
bufferCountではダメなのか?
Rxjsにおいて、実行制限を制限するケースに利用できる「buffer系」の関数オペレーターが存在します。
例えば今回の狙いからいうと、名前のイメージからして実行数を制限したい相当のものを
簡単なサンプルでbufferCountの作用を見てみると、
import { interval, bufferCount } from 'rxjs';
//👇1秒間ごとにカウント数を流す
interval(1000).pipe(
//👇指定した数だけ、流れてきた値をとり貯め、フルになったら逐次下流に流す
bufferCount(3)
).subscribe(val =>
console.log(val)
);
これを実行すると、
[0,1,2] --> [3,4,5] --> ...
というように処理されます。
しかし、bufferCountの場合、後段で値が流れてくるのを待ち構えているだけですので、前段の処理が好き勝手に処理を走らせても何かしら制御をしてくれるわけではありません。
結局、buffer系では、前段にいるforkJoinやMap系のオペレーターの"暴走"を止めることができないのです。
mergeMapのconcurrentパラメータ
では、どのようにストリームの実行数を制御するのか、というと、
concurrent
通常、これはデフォルトで
Infty
一例で見てみると、
import { of, delay, mergeMap } from 'rxjs';
//👇数値を流す
const a$ = of(1,2,3,4,5,6,7,8,9,10).pipe(
//👇何かしらの処理を想定...100ms遅延程度
delay(100)
).subscribe(val =>
console.log(val)
);
const b$ = of(1,2,3,4,5,6,7,8,9,10).pipe(
//👇同時実行数を最大3に設定
mergeMap(v => of(v), 3),
).subscribe(val =>
console.log(val)
);
というように同時実行数を絞らない
a$
b$
これを実行すると、結果は
1 -> 2 -> ... -> 10
違いは内部の処理であり、まずは
a$
701x527

言わずもがな、普通のrxjsストリームの挙動です。
これが、concurrent指定のmergeMapを介在することで以下のような処理に変わります。
687x570

見てのように、処理がconcurrents数以上は走らせることができず、処理中のストリームのどれかが完了するまで、残りのストリームは待機させることができるようになります。
これにより同時アクセス数に制限のあるようなリモートのAPIサービスでも、安全に利用することが可能となるでしょう。
まとめ
以上、concurrent付きmergeMapを覚えておくといざというときに活用できるシーンがあるかもしれません。
forkJoinで太刀打ちできない課題に直面した際には、一度mergeMapでの置き換えができないか検討してみてはいかがでしょうか。
記事を書いた人
ナンデモ系エンジニア
主にAngularでフロントエンド開発することが多いです。 開発環境はLinuxメインで進めているので、シェルコマンドも多用しております。 コツコツとプログラミングするのが好きな人間です。
カテゴリー