【Angular活用講座】Rxjs:repeatオペレーターで一定時間間隔の処理(再帰的ループ)を行わせてみる


※ 当ページには【広告/PR】を含む場合があります。
2022/05/11
2022/10/06

不定期で紹介しているAngularの実装に便利な
Rxjsネタです。

前回はtimerオペレーターを使って一定時間間隔でイベント処理を行わせてみました。

合同会社タコスキングダム|蛸壺の技術ブログ
【Angular活用講座】Rxjsでエラーレスポンスを上手にリトライするためのサービスを作る

Angularプロジェクト開発で、通信エラー発生時のリトライを実装するためのtimerオペレーターを使った定期実行処理のやり方を検討します。

今回はrepeatオペレーターでも同じようなストリーム処理を行わせてみます。


2つのタイマー 〜 setIntervalと再帰setTimeoutの違いを知る

前回Rxjs:timerオペレーターで実装したタイマーは、Javascriptでいうところの「setInterval関数」の置き換えになっていると説明しまいました。

Javascripで一定時間関数で定期処理を行うタイマーのテクニックとして、
「再帰したsetTimeout関数」を使う方法もあります。

            
            let timerId = setTimeout(function tick() {

    //...定期実行したい処理を記述

    timerId = setTimeout(tick, 1000);
}, 1000);
        

これはsetTimeoutのコールバック関数の中で、更に新しいsetTimeout関数を再帰的に生成している使い方で、setIntervalよりも少し理解するのが複雑になりますが、これも一定の時間間隔で処理が繰り返し実行されることになります。

では、setIntervalで作った定期実行タイマーと、再帰setTimeoutで作ったタイマーは全く同じ機能になるかというと、中身はかなり違ってきます。

setIntervalベースのタイマーは、一定時間ごとにイベントがトリガーされる仕組みですが、再帰setTimeoutベースの場合、コールバック関数の中の処理が完了した後でさらに次の処理がスケジュールされる仕組みです。

つまり前者のsetIntervalベースのタイマーは、一定時間ごとの処理が終了しているか否か全く無関係に淡々とイベントを発生するタイマーですので、厳密な時間間隔での処理が行いたいときに使います。

他方で、後者の再帰setTimeoutベースのタイマーは、タイマーで発生した処理が完了を待ってから、更に一定時間間隔を開けて、次のサイクルを繰り返すタイプの処理です。

このため、コールバックの処理によってはサイクルごとの時間間隔は実質的に変化し、一定間隔は保証されませんが、内部処理の完了やエラーハンドリングなどより複雑な処理をしたい場合に向いています。

合同会社タコスキングダム|蛸壺の技術ブログ

こうして見ると、2つのタイマーの効能はかなり違うことが分かります。

では、
「Rxjs版の再帰setTimeoutタイマー」も実装したい時にどのようにしたら良いのかを次の節で考えます。


repeatオペレーターで作るRxjs版の再帰ループ処理

Rxjsで繰り返しの処理を行わせるオペレーターは何通りかありますが、再帰setTimeoutタイマーとして相性の良いものに
repeatオペレーターがあります。

ただし、intervalオペレーターやtimerオペレーターと違って、repeatオペレーターはパイプ内で利用するタイプのフィルターの一種ですので、それ自体は任意のタイミングでオブザーバブルを生成出来ないのが難点です。

そこでrepeatと併せて、以前利用方法を詳しく取り上げた
deferオペレーターを使う必要があります。

合同会社タコスキングダム|蛸壺の技術ブログ
【Rxjs基礎講座】deferとfromでPromiseをObservableへ変換するときの注意点

Promiseベースで書かれた外部ライブラリをRxjsでも利用したい場合にdeferとfromでObservableへ変換する時の注意点を考えていきます。

なお、ここでは
「RxJS v7以降」の書き方にしております。

v7より前だとライブラリ内のオペレーターは
rxjs/operatorsに収録されていたものが、v7からはrxjsのルートから呼び出せるように仕様が変更されています。

再帰ループ処理の基本形

Angularサービスのどこかのソースコードに実装するものとして、基本系な利用パターンとしては以下のようにして利用できると思います。

            
            import { defer, repeat, share, Subject, takeUntil } from 'rxjs';

//...中略

@Injectable({
    providedIn: 'any'
})
export class HogeTimerService {
    private stopPolling = new Subject();
    const source = defer(() => http.get('https://hoge.com/api'));
    private myInterval$ = source.pipe(
        repeat({ delay: 5000 }),
        share(),
        takeUntil(this.stopPolling)
    );

    //...略
        

例えばこのコードパターンを利用して、前回の
timerオペレーターのタイマー・サービスクラスを置き換えてみます。

タイマーを止めるのに
takeUntilを使ったり、ホットストリーム化で値を共有するのにshareを使ったりするのは共通です。

ただこの場合、エラーが起こったときに利用する
retryが必要なくなった分、エラーハンドリング処理は別に考慮する必要があります。

具体的な実装例は以下のようになるでしょう。

            
            import { Injectable, OnDestroy } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable, defer,repeat, share, Subject, takeUntil } from 'rxjs';

@Injectable({
    providedIn: 'root'
})
export class HogeService implements OnDestroy {
    private myInterval$: Observable<any>;
    private stopNotifier = new Subject();

    constructor(private http: HttpClient) {
        this.myInterval$ = defer(() => http.get('https://hoge.com/api')).pipe(
            repeat({ delay: 5000 }),
            share(),
            takeUntil(this.stopNotifier)
        );
    }

    hogeInfo(): Observable<any> {
        return this.myInterval$;
    }

    ngOnDestroy() {
        this.stopNotifier.next(null);
    }
}
        
主にdeferrepeatだけで、Rxjs版再帰setTimeoutタイマーに書き換えることが出来ました。

repeatオペレーターの処理をもっとよく理解する

先程の内容の通り、再帰的なsetTimeoutタイマー相当の実装はRxjsだといとも簡単に出来てしまいます。

少し呆気なく感じてしまうかも知れませんので、もう少し動作周りを深堀してみましょう。

先程までの話で、再帰的な処理を以下のソースコードで確認してみます。

            
            let myInterval$: Observable<any>;
let stopNotifier = new Subject();

//👇待機するだけのsleepメソッド
const sleep = async (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

//👇指定時間だけ遅れて実行されるだけのObservableを返す関数
function deferFromAsync$(ms: number) : Observable<any> {
    return defer(async () => {
        //👇指定時間(ミリ秒)だけ待機
        await sleep(ms);
        return ms;
    });
}

const baseTime = Date.now();
let counter = 0;

//👇内部処理時間として500ms待機
myInterval$ = deferFromAsync$(500).pipe(
    tap((res: number) => {
        console.log(`[ループ#${++counter}]遅延時間:${~~res}[ms] -> 積算経過時間:${Date.now() - baseTime}[ms]`);
    }),
    repeat({ delay: 1000 }),
    takeUntil(stopNotifier)
);

console.log(`ループ処理タイマーを開始します`);
myInterval$.subscribe();

//👇10秒後にタイマー終了する
setTimeout(() => {
    console.log(`10秒経過したのでタイマーを終了します`);
    stopNotifier.next(null);
}, 10000);
        
これを実行すると、以下のような動作が確認できます。

            
            ループ処理タイマーを開始します
[ループ#1]遅延時間:500[ms] -> 積算経過時間:507[ms]
[ループ#2]遅延時間:500[ms] -> 積算経過時間:2011[ms]
[ループ#3]遅延時間:500[ms] -> 積算経過時間:3513[ms]
[ループ#4]遅延時間:500[ms] -> 積算経過時間:5015[ms]
[ループ#5]遅延時間:500[ms] -> 積算経過時間:6517[ms]
[ループ#6]遅延時間:500[ms] -> 積算経過時間:8019[ms]
[ループ#7]遅延時間:500[ms] -> 積算経過時間:9521[ms]
10秒経過したのでタイマーを終了します
        
これは期待通り、ループごとに内部処理時間で500ms遅延した後に、repeatオペレーターが1000ms追加で間隔を空けて処理が繰り返されています。

メインプロセスで10秒経つと、
takeUntilオペレーターが終了通知を受け取って、Observableのストリームが全て終了します。

ちなみに
repeatオペレーターは、一度流れたストリームを再生するものなので、この場合、pipeのどこに置いても結果は同じです。

            
            //...
myInterval$ = deferFromAsync$(500).pipe(
    //👈repeat(...)はココでも良い
    tap(...),
    //👈repeat(...)はココでも良い
    tap(...),
    //👈repeat(...)はココでも良い
    tap(...),
    repeat({ delay: 1000 }),
    tap(...),
    //👈repeat(...)はココでも良い
    takeUntil(stopNotifier)
);
//...
        
ただし、takeUntilオペレーターは必ずpipeの最後に記述するものなので、以下は不可です。

            
            //...
myInterval$ = deferFromAsync$(500).pipe(
    tap(...),
    tap(...),
    tap(...),
    takeUntil(stopNotifier),
    repeat({ delay: 1000 }), //✖repeat(...)は機能しない
);
//...
        
このrepeatオペレーターの記述位置が任意なため、ストリームをパイプするときのrepeatの前後のオペレーターには一切干渉することがありません。

例えば先程の
loop.tsのObservableのパイプ処理を以下のように変えてみましょう。

            
            //...中略

myInterval$ = deferFromAsync$(500).pipe(
    tap((res: number) => {
        console.log(`[ループ#${++counter}]遅延時間:${~~res}[ms] -> 積算経過時間:${Date.now() - baseTime}[ms]`);
    }),
    repeat({ delay: 1000 }),
    tap((res: number) => {
        console.log(`[ループ#${++counter}]repeatの後ろはdelayしない -> 積算経過時間:${Date.now() - baseTime}[ms]`);
    }),
    takeUntil(stopNotifier)
);

//...以下略
        
これを実行しても、repeatオペレーターを挟んで前後のtapオペレーターは遅延しないことが分かります。

            
            ループ処理タイマーを開始します
[ループ#1]遅延時間:500[ms] -> 積算経過時間:507[ms]
[ループ#1]repeatの後ろはdelayしない -> 積算経過時間:507[ms]
[ループ#2]遅延時間:500[ms] -> 積算経過時間:2010[ms]
[ループ#2]repeatの後ろはdelayしない -> 積算経過時間:2011[ms]
[ループ#3]遅延時間:500[ms] -> 積算経過時間:3513[ms]
[ループ#3]repeatの後ろはdelayしない -> 積算経過時間:3513[ms]
[ループ#4]遅延時間:500[ms] -> 積算経過時間:5015[ms]
[ループ#4]repeatの後ろはdelayしない -> 積算経過時間:5015[ms]
[ループ#5]遅延時間:500[ms] -> 積算経過時間:6517[ms]
[ループ#5]repeatの後ろはdelayしない -> 積算経過時間:6517[ms]
[ループ#6]遅延時間:500[ms] -> 積算経過時間:8019[ms]
[ループ#6]repeatの後ろはdelayしない -> 積算経過時間:8020[ms]
[ループ#7]遅延時間:500[ms] -> 積算経過時間:9521[ms]
[ループ#7]repeatの後ろはdelayしない -> 積算経過時間:9521[ms]
10秒経過したのでタイマーを終了します
        
プログラムの目的によっては、パイプの各段で細かくdelay処理を制御を入れたいときに、このやり方だと少し悩ましいときがあります。

そこまでのニーズがあるかはわかりませんが、遅延時間の細かな制御までを考慮した、ある意味、Rxjsの再帰ループ処理の最終形態が、
「repeatとdelayを分離させて使う」ようなものになります。

            
            //...中略

myInterval$ = deferFromAsync$(500).pipe(
    tap((res: number) => {
        console.log(`[ループ#${++counter}]遅延時間:${~~res}[ms] -> 積算経過時間:${Date.now() - baseTime}[ms]`);
    }),
    delay(1000),
    tap((res: number) => {
        console.log(`[ループ#${counter}]1000msのdelayあり -> 積算経過時間:${Date.now() - baseTime}[ms]`);
    }),
    repeat(),
    takeUntil(stopNotifier)
);

//...以下略
        
これを実行すると、

            
            ループ処理タイマーを開始します
[ループ#1]遅延時間:500[ms] -> 積算経過時間:505[ms]
[ループ#1]1000msのdelayあり -> 積算経過時間:1507[ms]
[ループ#2]遅延時間:500[ms] -> 積算経過時間:2009[ms]
[ループ#2]1000msのdelayあり -> 積算経過時間:3010[ms]
[ループ#3]遅延時間:500[ms] -> 積算経過時間:3511[ms]
[ループ#3]1000msのdelayあり -> 積算経過時間:4513[ms]
[ループ#4]遅延時間:500[ms] -> 積算経過時間:5014[ms]
[ループ#4]1000msのdelayあり -> 積算経過時間:6015[ms]
[ループ#5]遅延時間:500[ms] -> 積算経過時間:6516[ms]
[ループ#5]1000msのdelayあり -> 積算経過時間:7516[ms]
[ループ#6]遅延時間:500[ms] -> 積算経過時間:8017[ms]
[ループ#6]1000msのdelayあり -> 積算経過時間:9026[ms]
[ループ#7]遅延時間:500[ms] -> 積算経過時間:9527[ms]
10秒経過したのでタイマーを終了します
        
のように動作します。

ここでの
delayオペレーターは単なる遅延なので、説明することはないでしょう。

ポイントは、引数なしの
repeatオペレーターは、「ストリームが終わり次第、即時ストリームを再生する」という意味です。

repeatからdelayの機能を分離することで、自由度の高い遅延処理を実装可能です。

以上ここまでで、
Rxjsでの再帰ループは「delay〜repeat〜takeUntil」パターン、と覚えておけば、もう迷うことはないでしょう。

めでたしめでたし。


参考サイト

スケジューリング: setTimeout と setInterval