[Rxjs] deferでasync/await関数からObservableへ変換する


2020/07/01
2021/01/26

以前、
deferとfromでPromiseをObservableへ変換するときの注意点と題して、Promiseを返す関数からdeferfromを使ってObservableに変換する方法を解説したことがあります。

今回は続編として、
asycn/awaitで実装された関数をdeferでObservableに変換する方法をやってみます。


TL;DR

まずは結論からいきますと、

            
            // hoge()がasyncで実装された何かの関数
const obs$ = defer(async () => await hoge());

// obs$はObserbable<any>として、Promise内部で処理された
// awaitした変数を返す
obs$.subscribe(response => console.log(response));
        
としてObservableに変換することが可能です。

また引数付きのasync関数をObservable化したい場合には、

            
            // hoge(_arg)がasyncで実装された引数_arg付きの何かの関数
const obs$ = (_arg: any) => defer(async () => await hoge(_arg));

// 引数付きobs$はObserbable<any>として、Promise内部で処理された
// awaitした変数を返す
const hoge_arg = {name: 'hogehoge'};
obs$(hoge_arg).subscribe(response => console.log(response));
        
という風にも使うことができます。

以降では例を挙げながら、aync関数からObserbavleに変換できるまでの仕組みを解説していきます。


async関数をdeferを使ってObservableに書き換えるまでの出来るだけ略さない実装

まずは簡単な例をとり、出来るだけアロー関数などの楽ちんなショートハンドテクニックを使わないように実装からやってみます。

            
            import { Observable, defer } from 'rxjs';

function deferFromAsync$(): Observable<any> {

    // 内部のasync関数
    async function currentTime() {
        return new Date();
    };

    async function futureDate() {
        // async関数であるcurrentTime()の返り値が解決されるまで待機
        const localdate = await currentTime();
        return `The date after 100 years will be ${localdate.getFullYear() + 100}/${localdate.getMonth() + 1}/${localdate.getDate()}`;
    }

    // deferは更にasync関数であるfutureDate()の返り値が解決されるまで待機する
    // ようなObservableを作成して返す
    return defer(async function() {
        return await futureDate();
    });
    //👆return defer(async () => await futureDate()); と同じ
}

const date = new Date();
console.log(`Now the date is ${date.getFullYear()}/${date.getMonth() + 1}/${date.getDate()}`);

deferFromAsync$().subscribe(res => console.log(res));
        
このコードをビルド後に実行すると、

            
            $ node dist/index.js
Now the date is 2020/7/1
The date after 100 years will be 2120/7/1
        
というように、async/awaitされたObservableが正しく動作していると思います。

実は上のコードは少し助長で、用法的に回りくどく書きました。

ここで仕組みを理解するのに重要なことは、deferの引数に指定できるのは
Promise型のオブジェクトを返す関数であることです。

つまり、暗黙的にPromise型のインスタンスを返す
async関数はdeferの引数とすることができるので、以下のようにasync修飾した関数の参照を取れば、

            
            function deferFromAsync$() : Observable<any> {

    async function currentTime() {
        return new Date();
    };

    async function futureDate() {
        const localdate = await currentTime();
        return `The date after 100 years will be ${localdate.getFullYear() + 100}/${localdate.getMonth() + 1}/${localdate.getDate()}`;
    }

    // 👇deferの引数にasync関数の参照を指定
    return defer(futureDate);
}
        
とすることでもasync () => await futureDate()という関数の再定義のような引数にしてあげなくても正常に動作させることも可能です。

...ということを踏まえると、もう既にここまででdeferに入れる引数の中身を理解された方には、もはや仲介する関数(上のコードでいうところの
futureDate)をわざわざ明示に書く必要すら無いことに気づかれると思います。

つまりは、以下のdeferの中に中間関数を定義してもOKです。

            
            function deferFromAsync$() : Observable<any> {

    async function currentTime() {
        return new Date();
    };

    return defer(async () => {
        const localdate = await currentTime();
        return `The date after 100 years will be ${localdate.getFullYear() + 100}/${localdate.getMonth() + 1}/${localdate.getDate()}`;
    });

}
        
という風にも書けます。

もっとモダンな
rxjs的な書き方をすると、

            
            import { map } from 'rxjs/operators';

function deferFromAsync$() : Observable<any> {

    async function currentTime() {
        return new Date();
    };

    return defer(async () => await currentTime()).pipe(
        map(localdate => `The date after 100 years will be ${localdate.getFullYear() + 100}/${localdate.getMonth() + 1}/${localdate.getDate()}`)
    );
}
        
のように、deferの後にパイプオペレーターを使って処理を続けることもできます。


引数ありのasync関数をdefer変換する方法

先ほどの例で、引数の無いasync関数(Promise型を返す関数)の場合にObservable型に変換する方法を説明しました。

実際には、引数をとるasync関数の方が多いと思いますので、上の実装例だけですと不十分です。

上の例から
deferで引数ありのasync関数をObservable型に変換しようと思うと、以下のように修正します。

            
            import { Observable, defer } from 'rxjs';

// 👇引数をもつasync関数
async function futureTime(localdate: any) {
    return `The date after 100 years will be ${localdate.getFullYear() + 100}/${localdate.getMonth() + 1}/${localdate.getDate()}`;
};

function deferFromAsync$(arg: any) : Observable<any> {
    // 👇引数argは外部async関数futureTimeに関数内で渡される
    return defer(async function() {
        return await futureTime(arg);
    });
}

const date = new Date();
console.log(`Now the date is ${date.getFullYear()}/${date.getMonth() + 1}/${date.getDate()}`);

deferFromAsync$(date).subscribe(res => console.log(res));
        
これをビルドして実行しても正常に動作します。

deferで引数ありのasync関数をObservable変換する際の注意としては、deferの引数の関数の引数(asyncの直後の
())に指定してはいけません。

ということを踏まえた上で、上記の
deferFromAsync$の部分をもう少しモダンなES6的用法だとワンライナーで、

            
            export const deferFromAsync$ = (arg: any) : Observable<any> => defer(async () => await futureTime(arg));
        
と書くことができます。


利用例① ~ switchMapを利用する

switchMapでも、PromiseからObservableにストリーム変換することができます。

            
            import { Observable, of } from 'rxjs';
import { switchMap } from 'rxjs/operators';

async function futureLocalTime(localdate: any) {
    return `The date after 100 years will be ${localdate.getFullYear() + 100}/${localdate.getMonth() + 1}/${localdate.getDate()}`;
};

function switchMapFromPromise$(arg: any) : Observable<any> {
    return of(arg).pipe(switchMap(arg_ => futureLocalTime(arg_)));
    // 👇もしくは関数の参照を利用する場合でもOK
    // return of(arg).pipe(switchMap(futureLocalTime));
}

const date = new Date();
console.log(`Now the date is ${date.getFullYear()}/${date.getMonth() + 1}/${date.getDate()}`);
switchMapFromPromise$(date).subscribe(res => console.log(res));
        
でビルド後に実行しても正常に動作します。

deferを利用した実装と比べて、switchMap単体ではObservableストリームを生成出来ないので、ofなどのcreate系メソッドでpipeする必要があるため多少助長気味な書き方かもしれません。

switchMapに限らず、Map系のオペレーターの一部、take系の一部でもPromiseをObservableに読み変えてくれるメソッドがありますので、詳しくは公式APIリファレンスを参照してください。


利用例② ~ puppeteerのページ遷移をrxjsで制御する

これはpuppeteerなどに限りませんが、async/awaitチェーンで高度な同期をとる事をベースに設計されているjsライブラリをrxjsで操作する際に有効なテクニックとなります。

以下の例では、ウェブサイトにまたがった複数ページをリンクを辿ってなんらかの処理をさせているコードになります。

            
            import { Observable, forkJoin, from, defer } from 'rxjs';
import { map, concatMap, delay } from 'rxjs/operators';

import puppeteer from 'puppeteer';

function multiNavigate$(page: puppeteer.Page, url: string): Observable<any> {

    const nav$ = (url_: string) => defer(async () => await page.goto(url_));

    const wait$ = from(page.waitForNavigation({timeout: 5000, waitUntil: "networkidle0"}));

    const do_something$ = defer(async () => await page.$eval('[セレクタ]', e => e.innerHTML)).pipe(
        map((res: string) => {
            if (res) {
                /// 現在のページにあるセレクタから次の遷移先のリンクを取得し、後段にそのurlを流す
                return '取得したURLアドレス';
            } else {
                return '';
            }
        })
    );

    const capture$ = defer(() => page.screenshot({path: `./capture_page.png`}));

    //👇①同期処理しながら指定先のリンク先のページへ3回遷移して画面をキャプチャさせる
    return forkJoin([wait$, nav$(url)]).pipe(
        delay(1000),
        concatMap(_ => do_something$.pipe(
                concatMap(url2 => forkJoin([wait$, nav$(url2)]).pipe(
                    delay(1000),
                    concatMap(_ => do_something$.pipe(
                        concatMap(url3 => forkJoin([wait$, nav$$(url3)]).pipe(
                            concatMap(_ => capture$)
                        ))
                    ))
                ))
            )
        ),
    );
}

(async () => {
    const browser = await puppeteer.launch({
        executablePath: '/usr/bin/chromium-browser',
        args: ['--disable-dev-shm-usage', '--no-sandbox']
    });

    try {
        const page: puppeteer.Page = await browser.newPage();
        multiNavigate$(page, 'https://www.hoge.piyo.co.jp').subscribe();
    } catch (e) {
        throw e;
    } finally {
        await browser.close();
    }
})();
        
上記のコードの①の部分で、async関数からObserbavleに変換したnav$(URL)を再利用している箇所で今回のテクニックを使っています。


ObservableからPromiseを行う逆変換 ~ toPromise

どうしてもObservableからPromiseを返したいケースもあるかもしれません。

そんなときは
toPromiseを利用することで、ObservableのストリームからPromiseへ値を受け渡すことが可能です。

            
            import { of } from 'rxjs';

function toPromise$() : Promise<any> {
    return of(new Date()).toPromise();
}

toPromise$().then(res => console.log(res));
        
ビルド後に実行すると、

            
            % node dist/index.js
2020-07-01T10:28:31.567Z
        
現行のRxjs6では、toPromiseはObservableクラスのメンバユーティリティーメソッドになっています。deferと同様にtoPromiseもいざと言う時に役に立つと思います。


fromではasync/awaitの変換はダメなのか

APIリファレンスでもあるように、fromは引数として実体のあるPromiseのインスタンスをとり、他方、deferはPromiseを返す関数を引数でとります。

以前の記事で
fromとdeferの違いの考察で解説したように、fromだと内部のPromiseで値がawaitされて返ってくる前に処理されてしまう恐れがあります。async/awaitを使った非同期処理している関数をObservableへ変換したい場合には、deferを利用する方が適切です。


まとめ

以上、今回のお話のキモはasync/awaitをObservableに変換する場合にはdeferを利用すると良い、という内容でした。

RxjsでFetch APIやTensorflowjsのコードを系統的に実装したい場合、実用度の高いテクニックになると思います。
記事を書いた人

記事の担当:taconocat

ナンデモ系エンジニア

主にAngularでフロントエンド開発することが多いです。 開発環境はLinuxメインで進めているので、シェルコマンドも多用しております。 コツコツとプログラミングするのが好きな人間です。