カテゴリー
【Angular活用講座】Rxjsでエラーレスポンスを上手にリトライするためのサービスを作る
※ 当ページには【広告/PR】を含む場合があります。
2022/05/10
2022/10/05
不定期で紹介しているAngularの実装に便利な
今回はタイトルのように
Rxjsで定期実行スケジューラーの作成する
いくつか実装のポイントがありますので、順番に解説していきます。
intervalオペレーターとtimerオペレーター
Rxjsの一定時間間隔のインターバル処理で簡単に使えるのは、主に
intervalオペレーターはJavascriptでいうところの、setInterval関数に当たるものです。
setInterval(() => {
//...定期実行したい処理を記述
}, 1000);
intervalオペレーターで定期実行する際に悩ましい点として、ストリームを流し始めの最初の値も待機してから実行されます。 つまり、1秒間隔なら最初の値も1秒待ってしまうのです。
これだとストリームを開始したら即時処理が始まってほしい時に困ります。
これを解決するのが(引数2つを渡す時の)timerオペレータです。 最初の引数で実行開始時間を、2つ目でそれ以降の定期実行される時間間隔を指定します。
例えば以下は1ミリ秒経過後に最初の値を流して、それから5秒ごとに値を流しています。
const { timer } = require('rxjs');
const timer$ = timer(1, 5000);
timer$.subscribe((val) => console.log(val));
定期実行させる中身はswitchMapなどのMap系オペレータを利用し、別のストリームから利用します。
Map系オペレーターの微妙な作用の違いは以前の記事で取り上げていましたので、詳しいことはそちらでご確認ください。
例えば、HTTPモジュールのgetメソッドから、どこかのWebAPIエンドポイントを定期間隔で非同期で叩きたい場合には、
const { timer, switchMap } = require('rxjs');
//...中略
this.myInterval$ = timer(1, 5000).pipe(
switchMap(_ => http.get('https://hoge.com/api'))
);
と言うような感じで使います。
ストリームをフェイルセーフにする
必ずしもそうではないのですが、処理がコケる可能性のある不安がある場合には、保険として
先ほどの例だと、
const { timer, switchMap, retry } = require('rxjs');
//...中略
this.myInterval$ = timer(1, 5000).pipe(
switchMap(_ => http.get('https://hoge.com/api')),
retry()
);
という感じに後段にretryを挟むと、上流のストリームで処理エラーを検知した場合にretryオペレータが再度処理を行います。
shareで全てのオブザーバーへブロードキャストする
shareオペレーターで
const { timer, switchMap, retry, share } = require('rxjs');
//...中略
this.myInterval$ = timer(1, 5000).pipe(
switchMap(_ => http.get('https://hoge.com/api')),
retry(),
share()
);
とすると、ホットストリーム化できて、流された値を常に共有することができます。
定期実行を止める
通常intervalもtimerも流しっぱなしですので、ストリームを止めるには
unsubscribe()
とはいえ、
unsubscribe()
不要になった場合は即座に
unsubscribe()
takeUntilはノーティファイア(notifier)と呼ばれる別のストリームからの値を受けとることがトリガーとなり、メインのストリームが終了します。
Angularサービスとして考えたときに、
ngOnDestroy
const { timer, switchMap, retry, share, Subject } = require('rxjs');
//...中略
private stopPolling = new Subject();
ngOnDestroy() {
this.stopPolling.next(null);
}
これでサービスが消滅した時にサブジェクトにnullが送られて、それがnotifierとなって定期実行のストリームを止めてくれます。
で、肝心のtakeUntilオペレーターの仕込みですが、パイプの最後に付けて利用します。
const { timer, switchMap, retry, share, Subject, takeUntil } = require('rxjs');
//...中略
private stopPolling = new Subject();
this.myInterval$ = timer(1, 5000).pipe(
switchMap(_ => http.get('https://hoge.com/api')),
retry(),
share(),
takeUntil(this.stopPolling)
);
これでnotifierが呼び出された任意のタイミングでタイマーを終了することが出来ます。
以上からAngularサービスとしての実装を一通りまとめてみると、
import { Injectable, OnDestroy } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import {
Observable, timer, Subject,
switchMap, share, retry, takeUntil
} from 'rxjs';
@Injectable({
providedIn: 'root'
})
export class HogeService implements OnDestroy {
private myInterval$: Observable<any>;
private stopNotifier = new Subject();
constructor(private http: HttpClient) {
this.myInterval$ = timer(1, 5000).pipe(
switchMap(_ => http.get<any>('https://hoge.com/api')),
retry(),
share(),
takeUntil(this.stopNotifier)
);
}
hogeInfo(): Observable<any> {
return this.myInterval$;
}
ngOnDestroy() {
this.stopNotifier.next(null);
}
}
という感じに書き出せます。 めでたしめでたし。
参考サイト
記事を書いた人
ナンデモ系エンジニア
主にAngularでフロントエンド開発することが多いです。 開発環境はLinuxメインで進めているので、シェルコマンドも多用しております。 コツコツとプログラミングするのが好きな人間です。
カテゴリー