NHN Cloud Meetup 編集部
Redux-Saga:Channel
2018.03.28
839
今回は、Redux-SagaのChannel(以下チャネル)について紹介します。チャネルは、Redux-Sagaの初期のバージョンから検討していたAPIではありません。ところが、Stackoverflowで外部イベントをSagaとどのように接続するかという質問から、チャネルの必要性が表面的になり始めました。核になるのは、Push基盤とPull基盤の動作です。
WebSocketのような外部イベントは、一般的にリスナーを登録するon(type, listener) 形態のPush基盤でロジックを作成します。しかし、Redux-Sagaはtake(pattern)形態のアクションをドラッグするPull基盤のロジックを作成します。
チャネルは、このように異なるの2つの方法をうまく接続してくれます。
PushとPull
PushとPullは非常に密接で、日常生活にも自然に溶け込んでいます。携帯電話を例にあげよう。じっとていても携帯電話には、天気予報、ニュース、メールなどの通知が絶え間なく流れ込みます。まさにPush基盤の動作です。反対の例としてはウェブサーフィンがあげられます。Webブラウザを起動して、ほしい情報をサーバーに要請し、サーバーは要請された情報を返します。Pull基盤の動作である。違いは情報の流れを誰が制御するかにあります。
SagaのWebSocketのような外部イベントを接続するには、それらのPush動作をPull動作に変更することで可能です。まず簡単に以下のようなコードが考えられます。
Push処理
以下は、一般的によく使用されるリスナー型のイベント処理コードです。
function pushHandle(message) { // ... } socket.on('message', pushHandle);
Push – > Pull処理
このイベント処理コードをPull動作と同じ形に変更します。ポイントは、メッセージを保存して必要なときに参照する点です。
let receivedMessage; socket.on('message', (message) => { receivedMessage = message; }); function pullHandle() { // receivedMessageを保存して処理 } // ... // 必要なときにpullHandleを呼び出す
単純なPull処理には問題がある
一度保存すると、後で必要になったとき参照して(Pull)使用できます。しかし問題があります。メッセージがいつ到着するか分かりません。Pushに該当するpushHandleには、常にメッセージが存在することの保証がありますが、pushHandleでは、メッセージがある場合とない場合があります。
function pullHandle() { if (!receivedMessage) { // ... メッセージがまだ存在しない。失敗 } else { // receivedMessageを保存して処理 } }
このように、常に失敗処理を含めるのは非効率的で、メンテナンスが難しいでしょう。より良い方法を探す必要があります。
Deferredを活用したPull message
上記問題を解決するのに、Deferredが鍵となります。DeferredはPromiseと非常に緊密なオブジェクトです。Deferredが何かよく分からない場合は、jQuery 1.5から登場したDeferredを見てみよう。簡単な概念だけ説明すると、Promiseは約束を作る者が約束を守ります。一方でDeferredは、単に契約書だけを作成します。Deferredの契約を守る義務は他人が持てます。
DeferredをPull処理に適用すると、次のようなPromiseを持つことができます。
let deferred; socket.on('message', (message) => { if (deferred) { deferred.resolve(message); deferred = null; } }); function pullMessage() { if (!deferred) { deferred = {}; deferred.promise = new Promise((resolve) => { deferred.resolve = resolve; }); } return deferred.promise; } function pullHandle() { pullMessage().then((message) => { // メッセージを保存して処理 }); }
これでメッセージの到着をpullMessage関数で常に保証することができます。非同期的なコードであるものは変わっていませんが、Sagaのエフェクトと一緒に使用する場合は違ってきます。pullMessage関数は、SagaのCallエフェクトと使用する場合、Push基盤の非同期的なロジックをPull基盤の同期的なロジックで処理することができます。
function* pullHandle() { const message = yield call(pullMessage); // メッセージ処理 };
チャネル
このようにPush動作をPull動作に変えてみると、一般化することができないかという疑問が残ります。Redux-Sagaを使用する様々な開発者は、CSP(Communicating Sequential Processes)のチャネルを提案しました。CSPにはコアとなるプロセスとチャネルという2つの抽象化があります。これはSagaによく合う抽象化です。
- プロセス:同時的(concurrently)に実行される独立した作業である。各コードは順次的(sequential)に処理されます。
- チャネル:FIFOのキュー。各プロセスはチャネルを介してデータをやりとりして通信する。チャネルにput演算でデータを追加し、take演算でデータを持ってくる。
上記のプロセスは、各Sagaに符合します。メンテナであるyelouafiは、このようなCSPを部分的に借用してRedux-SagaのチャネルAPIを設計しました。(最も初期のチャネル実装は、v0.10.0のchannel.jsで確認できます。)
チャネルを簡単に表示:Upload progress
サービス開発においてRedux-Sagaを使ってファイルをアップロードする機能があると仮定しよう。おそらくアップロードのプログレス表示に悩むことになるでしょう。次のようにstore.dispatchを直接呼び出すことも考えられます。
function* uploadSaga() { const xhr = new XMLHttpRequest(); xhr.addEventListener('progress', ev => { // ... store.dispatch(update({progress: ev.loaded / ev.total})); }); //... }
ストアへの依存性を追加してstore.dispatchを直接呼び出します。ところが大きな問題があります。Sagaではなく外部のスコープで関数が分離されました。ジェネレーター(正確にはRedux-Sagaミドルウェア)の処理フローから外れるため、様々なエフェクトと組み合わせて豊富なロジックが構成できません。
ここで、チャネルを利用してみよう。
function* uploadSaga() { const progressChan = yield call(channel); const xhr = new XMLHttpRequest(); const putProgressToChannel = ({loaded, total}) => progressChan.put(loaded/total)); xhr.addEventListener('progress', putProgressToChannel); yield fork(function* () { const progress = yield take(progressChan); // ストアへの直接的な依存性がなくなった // take, callなど様々なエフェクトとロジックを実装できる yield put(update({progress})); }); }
外部イベントの接続:WebSocket
WebSocketとの接続については、以前の記事で紹介したことがあります。そこではチャネル自体の説明は詳細に行わず、チャネルのAPIを活用する方法を中心に説明しました。今回の記事を読んでから、もう一度前の記事を読むと理解しやすいでしょう。
同時作業:Load-balancing
チャネルは必ず外部イベントにのみ接続させるということはありません。チャネルは基本的に、各プロセス間の通信のために抽象化された概念です。そのためチャネルを外部イベントに接続するのではなく、同じタスクを同時に複数実行させながら、それらのロードバランシングを目的に使用することもできます。
例えば、画像のサムネイルを要請する作業を考えてみよう。
- サムネイルを作成する画像は、数百/数千個ある
- 可能な限り迅速にユーザーに見せたいが、一度に数百/数千の要請を処理するには無理がある
- 3つ程度のWorkerを作り、競って画像のサムネイルを要請するようにする
簡単に考えると、バッファとカウント変数を持って処理することもできそうです。しかし実装してみると分かるが、あまりきれいではありません。現在動作中のWorker数、バッファへの追加要請の有無を毎回確認して処理する必要があります。もし条件チェックを間違えたり、カウントを誤って計算すれば、多くのバグを誘発しかねません。
このようにすると、チャネルを通じて安全でより素晴らしい実装ができます。
const NUMBER_OF_WORKERS = 3; function* requestWatcher() { const chan = yield call(channel); for (let i = 0; i < NUMBER_OF_WORKERS; i += 1) { yield fork(requestWorker, chan); } while (true) { const {payload} = yield take(REQUEST_THUMBNAIL); yield put(chan, payload); } } function* requestWoker(chan) { while (true) { const payload = yield take(chan); // サーバーにサムネイル要請、応答処理 } }
各Workerは独立的に実行されるでしょう。それぞれの処理速度は重要ではありません。休んでいるWorkerたちはすぐに次の要請を処理します。Workerを動的に生成しなくてもよいのです。
さらに詳しく – API(v0.16.0)
Reudx-Sagaが提供するチャネルとバッファはAPIドキュメントによく書かれています。以下の説明は、0.16.0バージョンを基準とします。
- Channel Interface
- channel:チャネル・インターフェースを実装したファクトリ関数。チャネルは一般的なメッセージをput、takeすることができ、バッファを適用できる。
- eventChannel:イベントチャネルファクトリー関数。イベントチャネルは外部イベントとの接続を担う。Subscribe / Unsubscribeができ、バッファやMatcher(=フィルタ)を適用できる。
- エフェクト- actionChannel:アクションチャネルエフェクト。アクションチャネルは、特定のアクションだけを受けるチャネルに制限し、バッファを適用できる。
- Buffer Interface:チャネルに適用されるバッファインターフェースでRedux-Sagaは5つのバッファを提供する。
さらに詳しく – v1.0.0チャネルプレビュー
おそらく、1.0.0バージョンからチャネルはmulticast、unicastの2つがあるでしょう。multicastが新たに追加されたチャネルです。
- multicast:バッファがなく、Matcherを処理できる。-> 待機中のtakerはすべてのメッセージを受信するため、バッファは必要ない。
- unicast:バッファがあり、Matcherを処理できない。既存のchannelやeventChannelは、いずれもMatcherが消える。
そのため前述した「外部イベントの接続:WebSocket」のうち、Matcherを処理する部分は、現在使用してはいけません。multicastのチャネルのMatcherや、一般的なeventChannel
のSubscribe関数内部で十分に処理できます。
v1.0.0のチャネルは、Redux-Saga#PR 824とリリースノートで、詳しい内容を確認できます。
おわりに
Redux-Sagaのチャネルは外部イベントとの接続のために開発を始めました。そしてその内容はCSPのチャネルと非常に似ています。それでもBlocking-Putのような動作は正式には対応しません。理由としては、この機能が必要な場合があまりなく、デッドロックを引き起こす可能性があるためです。また実装がより複雑になり、APIを理解する難易度がより高くなります。