カテゴリー
【Rxjs基礎講座】deferでasync/await関数からObservableへ変換する
※ 当ページには【広告/PR】を含む場合があります。
2020/07/01
2022/10/05
前回、
defer
from
今回は続編として、
asycn/await
defer
TL;DR
まずは結論からいきますと、
// hoge()がasyncで実装された何かの関数
const obs$ = defer(async () => await hoge());
// obs$はObserbable<any>として、Promise内部で処理された
// awaitした変数を返す
obs$.subscribe(response => console.log(response));
としてObservableに変換することが可能です。
また引数付きのasync関数をObservable化したい場合には、
// hoge(_arg)がasyncで実装された引数_arg付きの何かの関数
const obs$ = (_arg: any) => defer(async () => await hoge(_arg));
// 引数付きobs$はObserbable<any>として、Promise内部で処理された
// awaitした変数を返す
const hoge_arg = {name: 'hogehoge'};
obs$(hoge_arg).subscribe(response => console.log(response));
という風にも使うことができます。
以降では例を挙げながら、aync関数からObserbavleに変換できるまでの仕組みを解説していきます。
async関数をdeferを使ってObservableに書き換えるまでの出来るだけ略さない実装
まずは簡単な例をとり、出来るだけアロー関数などの楽ちんなショートハンドテクニックを使わないように実装からやってみます。
import { Observable, defer } from 'rxjs';
function deferFromAsync$(): Observable<any> {
// 内部のasync関数
async function currentTime() {
return new Date();
};
async function futureDate() {
// async関数であるcurrentTime()の返り値が解決されるまで待機
const localdate = await currentTime();
return `The date after 100 years will be ${localdate.getFullYear() + 100}/${localdate.getMonth() + 1}/${localdate.getDate()}`;
}
// deferは更にasync関数であるfutureDate()の返り値が解決されるまで待機する
// ようなObservableを作成して返す
return defer(async function() {
return await futureDate();
});
//👆return defer(async () => await futureDate()); と同じ
}
const date = new Date();
console.log(`Now the date is ${date.getFullYear()}/${date.getMonth() + 1}/${date.getDate()}`);
deferFromAsync$().subscribe(res => console.log(res));
このコードをビルド後に実行すると、
$ node dist/index.js
Now the date is 2020/7/1
The date after 100 years will be 2120/7/1
というように、
async/await
実は上のコードは少し助長で、用法的に回りくどく書きました。
ここで仕組みを理解するのに重要なことは、deferの引数に指定できるのは
Promise型のオブジェクトを返す関数
つまり、暗黙的にPromise型のインスタンスを返す
async関数
function deferFromAsync$() : Observable<any> {
async function currentTime() {
return new Date();
};
async function futureDate() {
const localdate = await currentTime();
return `The date after 100 years will be ${localdate.getFullYear() + 100}/${localdate.getMonth() + 1}/${localdate.getDate()}`;
}
// 👇deferの引数にasync関数の参照を指定
return defer(futureDate);
}
とすることでも
async () => await futureDate()
...ということを踏まえると、もう既にここまででdeferに入れる引数の中身を理解された方には、もはや仲介する関数(上のコードでいうところの
futureDate
つまりは、以下のdeferの中に中間関数を定義してもOKです。
function deferFromAsync$() : Observable<any> {
async function currentTime() {
return new Date();
};
return defer(async () => {
const localdate = await currentTime();
return `The date after 100 years will be ${localdate.getFullYear() + 100}/${localdate.getMonth() + 1}/${localdate.getDate()}`;
});
}
という風にも書けます。
もっとモダンな
rxjs
import { map } from 'rxjs/operators';
function deferFromAsync$() : Observable<any> {
async function currentTime() {
return new Date();
};
return defer(async () => await currentTime()).pipe(
map(localdate => `The date after 100 years will be ${localdate.getFullYear() + 100}/${localdate.getMonth() + 1}/${localdate.getDate()}`)
);
}
のように、deferの後にパイプオペレーターを使って処理を続けることもできます。
引数ありのasync関数をdefer変換する方法
先ほどの例で、引数の無いasync関数(Promise型を返す関数)の場合にObservable型に変換する方法を説明しました。
実際には、引数をとるasync関数の方が多いと思いますので、上の実装例だけですと不十分です。
上の例から
defer
import { Observable, defer } from 'rxjs';
// 👇引数をもつasync関数
async function futureTime(localdate: any) {
return `The date after 100 years will be ${localdate.getFullYear() + 100}/${localdate.getMonth() + 1}/${localdate.getDate()}`;
};
function deferFromAsync$(arg: any) : Observable<any> {
// 👇引数argは外部async関数futureTimeに関数内で渡される
return defer(async function() {
return await futureTime(arg);
});
}
const date = new Date();
console.log(`Now the date is ${date.getFullYear()}/${date.getMonth() + 1}/${date.getDate()}`);
deferFromAsync$(date).subscribe(res => console.log(res));
これをビルドして実行しても正常に動作します。
deferで引数ありのasync関数をObservable変換する際の注意としては、deferの引数の関数の引数(asyncの直後の
()
ということを踏まえた上で、上記の
deferFromAsync$
export const deferFromAsync$ = (arg: any) : Observable<any> => defer(async () => await futureTime(arg));
と書くことができます。
利用例① ~ switchMapを利用する
switchMapでも、PromiseからObservableにストリーム変換することができます。
import { Observable, of } from 'rxjs';
import { switchMap } from 'rxjs/operators';
async function futureLocalTime(localdate: any) {
return `The date after 100 years will be ${localdate.getFullYear() + 100}/${localdate.getMonth() + 1}/${localdate.getDate()}`;
};
function switchMapFromPromise$(arg: any) : Observable<any> {
return of(arg).pipe(switchMap(arg_ => futureLocalTime(arg_)));
// 👇もしくは関数の参照を利用する場合でもOK
// return of(arg).pipe(switchMap(futureLocalTime));
}
const date = new Date();
console.log(`Now the date is ${date.getFullYear()}/${date.getMonth() + 1}/${date.getDate()}`);
switchMapFromPromise$(date).subscribe(res => console.log(res));
でビルド後に実行しても正常に動作します。
deferを利用した実装と比べて、switchMap単体ではObservableストリームを生成出来ないので、ofなどのcreate系メソッドでpipeする必要があるため多少助長気味な書き方かもしれません。
switchMapに限らず、Map系のオペレーターの一部、take系の一部でもPromiseをObservableに読み変えてくれるメソッドがありますので、詳しくは公式APIリファレンスを参照してください。
利用例② ~ puppeteerのページ遷移をrxjsで制御する
これはpuppeteerなどに限りませんが、async/awaitチェーンで高度な同期をとる事をベースに設計されているjsライブラリをrxjsで操作する際に有効なテクニックとなります。
以下の例では、ウェブサイトにまたがった複数ページをリンクを辿ってなんらかの処理をさせているコードになります。
import { Observable, forkJoin, from, defer } from 'rxjs';
import { map, concatMap, delay } from 'rxjs/operators';
import puppeteer from 'puppeteer';
function multiNavigate$(page: puppeteer.Page, url: string): Observable<any> {
const nav$ = (url_: string) => defer(async () => await page.goto(url_));
const wait$ = from(page.waitForNavigation({timeout: 5000, waitUntil: "networkidle0"}));
const do_something$ = defer(async () => await page.$eval('[セレクタ]', e => e.innerHTML)).pipe(
map((res: string) => {
if (res) {
/// 現在のページにあるセレクタから次の遷移先のリンクを取得し、後段にそのurlを流す
return '取得したURLアドレス';
} else {
return '';
}
})
);
const capture$ = defer(() => page.screenshot({path: `./capture_page.png`}));
//👇①同期処理しながら指定先のリンク先のページへ3回遷移して画面をキャプチャさせる
return forkJoin([wait$, nav$(url)]).pipe(
delay(1000),
concatMap(_ => do_something$.pipe(
concatMap(url2 => forkJoin([wait$, nav$(url2)]).pipe(
delay(1000),
concatMap(_ => do_something$.pipe(
concatMap(url3 => forkJoin([wait$, nav$$(url3)]).pipe(
concatMap(_ => capture$)
))
))
))
)
),
);
}
(async () => {
const browser = await puppeteer.launch({
executablePath: '/usr/bin/chromium-browser',
args: ['--disable-dev-shm-usage', '--no-sandbox']
});
try {
const page: puppeteer.Page = await browser.newPage();
multiNavigate$(page, 'https://www.hoge.piyo.co.jp').subscribe();
} catch (e) {
throw e;
} finally {
await browser.close();
}
})();
上記のコードの①の部分で、async関数からObserbavleに変換した
nav$(URL)
ObservableからPromiseを行う逆変換 ~ toPromise
どうしてもObservableからPromiseを返したいケースもあるかもしれません。
そんなときは
import { of } from 'rxjs';
function toPromise$() : Promise<any> {
return of(new Date()).toPromise();
}
toPromise$().then(res => console.log(res));
ビルド後に実行すると、
% node dist/index.js
2020-07-01T10:28:31.567Z
現行のRxjs6では、
toPromise
fromではasync/awaitの変換はダメなのか
APIリファレンスでもあるように、
以前の記事で
async/await
まとめ
以上、今回のお話のキモは
async/await
defer
RxjsでFetch APIやTensorflowjsのコードを系統的に実装したい場合、実用度の高いテクニックになると思います。
記事を書いた人
ナンデモ系エンジニア
主にAngularでフロントエンド開発することが多いです。 開発環境はLinuxメインで進めているので、シェルコマンドも多用しております。 コツコツとプログラミングするのが好きな人間です。
カテゴリー