【Angular活用講座】Rxjsで一定時間間隔でストリーム実行するためのサービスを作る


2022/05/10

不定期で紹介しているAngularの実装に便利な
Rxjsネタです。

今回はタイトルのように
「一定時間間隔でイベントをトリガーするようなストリームを持つサービスを作成する」ということをやってみます。


Rxjsで定期実行スケジューラーの作成する

いくつか実装のポイントがありますので、順番に解説していきます。

intervalオペレーターとtimerオペレーター

Rxjsの一定時間間隔のインターバル処理で簡単に使えるのは、主にintervalオペレーターtimerオペレーターがあります。

intervalオペレーターはJavascriptでいうところの、setInterval関数に当たるものです。

            
            setInterval(() => {
    //...定期実行したい処理を記述
}, 1000);
        

intervalオペレーターで定期実行する際に悩ましい点として、ストリームを流し始めの最初の値も待機してから実行されます。つまり、1秒間隔なら最初の値も1秒待ってしまうのです。

これだとストリームを開始したら即時処理が始まってほしい時に困ります。

これを解決するのが(引数2つを渡す時の)timerオペレータです。最初の引数で実行開始時間を、2つ目でそれ以降の定期実行される時間間隔を指定します。

例えば以下は1ミリ秒経過後に最初の値を流して、それから5秒ごとに値を流しています。

            
            const { timer } = require('rxjs');

const timer$ = timer(1, 5000);

timer$.subscribe((val) => console.log(val));
        
定期実行させる中身はswitchMapなどのMap系オペレータを利用し、別のストリームから利用します。

Map系オペレーターの微妙な作用の違いは以前の記事で取り上げていましたので、詳しいことはそちらでご確認ください。

例えば、HTTPモジュールのgetメソッドから、どこかのWebAPIエンドポイントを定期間隔で非同期で叩きたい場合には、

            
            const { timer, switchMap } = require('rxjs');

//...中略

this.myInterval$ = timer(1, 5000).pipe(
    switchMap(_ => http.get('https://hoge.com/api'))
);
        
と言うような感じで使います。

ストリームをフェイルセーフにする

必ずしもそうではないのですが、処理がコケる可能性のある不安がある場合には、保険としてretryオペレータでフェイルセーフな処理を挟むことも可能です。

先ほどの例だと、

            
            const { timer, switchMap, retry } = require('rxjs');

//...中略

this.myInterval$ = timer(1, 5000).pipe(
    switchMap(_ => http.get('https://hoge.com/api')),
    retry()
);
        
という感じに後段にretryを挟むと、上流のストリームで処理エラーを検知した場合にretryオペレータが再度処理を行います。

shareで全てのオブザーバーへブロードキャストする

shareオペレーターでホットストリームにして、全てのサブスクライバーにストリームの値をブロードキャスト(共有)することもできます。

            
            const { timer, switchMap, retry, share } = require('rxjs');

//...中略

this.myInterval$ = timer(1, 5000).pipe(
    switchMap(_ => http.get('https://hoge.com/api')),
    retry(),
    share()
);
        
とすると、ホットストリーム化できて、流された値を常に共有することができます。

定期実行を止める

通常intervalもtimerも流しっぱなしですので、ストリームを止めるにはunsubscribe()することが望ましい止め方です。

とはいえ、
unsubscribe()し忘れると、不要になった後もAngularアプリが起動し続けるまでストリームは実行され続けます。

不要になった場合は即座に
unsubscribe()する仕組みとして、takeUntilオペレーターを組み合わせた方法が定石のテクニックがあります。

takeUntilはノーティファイア(notifier)と呼ばれる別のストリームからの値を受けとることがトリガーとなり、メインのストリームが終了します。

Angularサービスとして考えたときに、
ngOnDestroyメソッドにnotifierを仕込んでおくとサービス終了時にストリームの消し忘れ防止になります。

            
            const { timer, switchMap, retry, share, Subject } = require('rxjs');

//...中略

private stopPolling = new Subject();

ngOnDestroy() {
    this.stopPolling.next(null);
}
        

これでサービスが消滅した時にサブジェクトにnullが送られて、それがnotifierとなって定期実行のストリームを止めてくれます。

で、肝心のtakeUntilオペレーターの仕込みですが、パイプの最後に付けて利用します。

            
            const { timer, switchMap, retry, share, Subject, takeUntil } = require('rxjs');

//...中略

private stopPolling = new Subject();

this.myInterval$ = timer(1, 5000).pipe(
    switchMap(_ => http.get('https://hoge.com/api')),
    retry(),
    share(),
    takeUntil(this.stopPolling)
);
        

これでnotifierが呼び出された任意のタイミングでタイマーを終了することが出来ます。

以上からAngularサービスとしての実装を一通りまとめてみると、

            
            import { Injectable, OnDestroy } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import {
    Observable, timer, Subject,
    switchMap, share, retry, takeUntil
} from 'rxjs';

@Injectable({
    providedIn: 'root'
})
export class HogeService implements OnDestroy {
    private myInterval$: Observable<any>;
    private stopNotifier = new Subject();

    constructor(private http: HttpClient) {
        this.myInterval$ = timer(1, 5000).pipe(
            switchMap(_ => http.get<any>('https://hoge.com/api')),
            retry(),
            share(),
            takeUntil(this.stopNotifier)
        );
    }

    hogeInfo(): Observable<any> {
        return this.myInterval$;
    }

    ngOnDestroy() {
        this.stopNotifier.next(null);
    }
}
        
という感じに書き出せます。めでたしめでたし。


参考サイト

How to do polling with RxJs and Angular?

記事を書いた人

記事の担当:taconocat

ナンデモ系エンジニア

主にAngularでフロントエンド開発することが多いです。 開発環境はLinuxメインで進めているので、シェルコマンドも多用しております。 コツコツとプログラミングするのが好きな人間です。