【Angular活用講座】Rxjs:repeatオペレーターで一定時間間隔の処理を行わせてみる


2022/05/11

不定期で紹介しているAngularの実装に便利な
Rxjsネタです。

前回はtimerオペレーターを使って一定時間間隔でイベント処理を行わせてみましたが、今回はrepeatオペレーターでも同じようなストリーム処理を行わせてみます。


2つのタイマー 〜 setIntervalと再帰setTimeoutの違いを知る

前回Rxjs:timerオペレーターで実装したタイマーは、Javascriptでいうところの「setInterval関数」の置き換えになっていると説明しまいました。

Javascripで一定時間関数で定期処理を行うタイマーのテクニックとして、
「再帰したsetTimeout関数」を使う方法もあります。

            
            let timerId = setTimeout(function tick() {

    //...定期実行したい処理を記述

    timerId = setTimeout(tick, 1000);
}, 1000);
        

これはsetTimeoutのコールバック関数の中で、更に新しいsetTimeout関数を再帰的に生成している使い方で、setIntervalよりも少し理解するのが複雑になりますが、これも一定の時間間隔で処理が繰り返し実行されることになります。

では、setIntervalで作った定期実行タイマーと、再帰setTimeoutで作ったタイマーは全く同じ機能になるかというと、中身はかなり違ってきます。

setIntervalベースのタイマーは、一定時間ごとにイベントがトリガーされる仕組みですが、再帰setTimeoutベースの場合、コールバック関数の中の処理が完了した後でさらに次の処理がスケジュールされる仕組みです。

つまり前者のsetIntervalベースのタイマーは、一定時間ごとの処理が終了しているか否か全く無関係に淡々とイベントを発生するタイマーですので、厳密な時間間隔での処理が行いたいときに使います。

他方で、後者の再帰setTimeoutベースのタイマーは、タイマーで発生した処理が完了を待ってから、更に一定時間間隔を開けて、次のサイクルを繰り返すタイプの処理です。

このため、コールバックの処理によってはサイクルごとの時間間隔は実質的に変化し、一定間隔は保証されませんが、内部処理の完了やエラーハンドリングなどより複雑な処理をしたい場合に向いています。

合同会社タコスキングダム|蛸壺の技術ブログ

こうして見ると、2つのタイマーの効能はかなり違うことが分かります。

では、
「Rxjs版の再帰setTimeoutタイマー」も実装したい時にどのようにしたら良いのかを次の節で考えます。


rpeatオペレーターで作るRxjs版の再帰setTimeoutタイマー

Rxjsで繰り返しの処理を行わせるオペレーターは何通りかありますが、再帰setTimeoutタイマーとして相性の良いものに
repeatオペレーターがあります。

ただし、intervalオペレーターやtimerオペレーターと違って、repeatオペレーターはパイプ内で利用するタイプのフィルターの一種ですので、それ自体は任意のタイミングでオブザーバブルを生成出来ないのが難点です。

そこでrepeatと併せて、
以前利用方法を詳しく取り上げたdeferオペレーターを使う必要があります。

ということでAngularサービスのソースコードに実装する基本系として、

            
            const { defer,repeat, share, Subject, takeUntil } = require('rxjs');

//...中略

private stopPolling = new Subject();

const source = defer(() => http.get('https://hoge.com/api'));

this.myInterval$ = source.pipe(
    repeat({ delay: 5000 }),
    share(),
    takeUntil(this.stopPolling)
);
        
のようにして前回のtimerオペレーターのタイマーを置き換えます。

タイマーを止めるのにtakeUntilを使ったり、ホットストリーム化で値を共有するのにshaeを使ったりするのは共通です。

ただこの場合、エラーが起こったときの保険としてのretryが必要なくなった分、エラーハンドリング処理は別に考慮する必要があります。

主にdeferとrepeatで前回のsetIntervalベースのタイマーを置き換えただけで、Rxjs版再帰setTimeoutタイマーに書き換えることが出来ました。

以上からAngularサービスとしての今回の実装の内容を一通りまとめてみると、

            
            import { Injectable, OnDestroy } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable, defer,repeat, share, Subject, takeUntil } from 'rxjs';

@Injectable({
    providedIn: 'root'
})
export class HogeService implements OnDestroy {
    private myInterval$: Observable<any>;
    private stopNotifier = new Subject();

    constructor(private http: HttpClient) {
        this.myInterval$ = defer(() => http.get('https://hoge.com/api')).pipe(
            repeat({ delay: 5000 }),
            share(),
            takeUntil(this.stopNotifier)
        );
    }

    hogeInfo(): Observable<any> {
        return this.myInterval$;
    }

    ngOnDestroy() {
        this.stopNotifier.next(null);
    }
}
        
という感じに落ち着きます。めでたしめでたし。


参考サイト

スケジューリング: setTimeout と setInterval

記事を書いた人

記事の担当:taconocat

ナンデモ系エンジニア

主にAngularでフロントエンド開発することが多いです。 開発環境はLinuxメインで進めているので、シェルコマンドも多用しております。 コツコツとプログラミングするのが好きな人間です。