コンテンツにスキップ

Sync Facade over Async Core|非同期コアの同期ファサード

一言で(TL;DR)

内部は常に非同期パイプラインで処理し、外向きAPIは閾値時間内に完了すれば同期レスポンスを返し、超過すればジョブIDを返して非同期に昇格します。呼び出し側はレイテンシの二峰性を意識せず、統一エンドポイントを叩くことができます。

解決する問題

エージェント処理は所要時間が長く(F1)、かつレイテンシのばらつきが大きい(F12)という特性があります。キャッシュヒットや軽量タスクなら数百ミリ秒で返せますが、複雑な推論やツール連鎖を伴うと数十秒以上に伸びます。さらにプロバイダ可用性の揺れ(F7)がテールレイテンシを増幅します。

この二峰分布に対して「常に同期」か「常に非同期」の二択を取ると問題が起きます。常に同期ならタイムアウトやコネクション占有が発生し、常に非同期なら数百ミリ秒で返せるケースでも呼び出し側にポーリングを強制してしまいます。本パターンは内部を非同期に統一しつつ、外向きインタフェースを閾値で自動切替することで、速いときは速く返し、遅いときは壊さないという両立を実現します。

選定条件(When to use / When NOT)

  • 使う条件
    • 処理時間の分布が二峰的です(高速完了と長時間完了が混在します)。
    • 呼び出し側が同期APIを期待する既存クライアントを含みます。
    • [latency_budget] がリクエストごとに大きく異なるユースケースです(チャットUI+バッチ等)。
  • 使わない条件(=代替に倒す)
    • 処理が常に数秒以内で完了し、非同期昇格が実質発生しない場合 → A1 同期エッジ で十分です。
    • 処理が常に30秒超で、同期で返せるケースがほぼ無い場合 → 最初から A2 耐久非同期 にし、UIにはジョブIDを即返す方が単純です。
    • クライアントがWebSocket/SSEで常時接続する前提の場合 → A4 ストリーミング のみで足り、ファサードの切替は不要です。

駆動変数とチューニング(程度)

目盛り 効かなすぎ ⇔ 効きすぎ 決め方 [駆動変数] 目安(出発点)
同期待機閾値 閾値が短すぎ:ほぼ全件が非同期昇格しファサードの意味なし ⇔ 長すぎ:コネクション占有・タイムアウト 観測P95を基準に、[latency_budget] が厳しいほど短く設定 Web API 5-10秒 / 内部RPC 30秒 / Webhook 60秒
ポーリング間隔 / 通知方式 間隔が長い:完了に気づくまでの遅延 ⇔ 短すぎ:ポーリング負荷 [latency_budget] が短いほどプッシュ寄りに SSE/WebSocket推奨、フォールバックとしてポーリング3-5秒
非同期昇格時のリトライ リトライなし:一過性障害で失敗 ⇔ 過多:暴走コスト A6と連動 [latency_budget] 2-3回、指数バックオフ

値は定数でなく駆動変数の関数です。[latency_budget] が厳しい(短い)ほど同期待機閾値を下げ、早めに非同期へ昇格させてクライアントを解放します。

相反における立ち位置(相反)

  • F-1 同期 vs 非同期 → ハイブリッド。処理時間が待機耐性を超えるか否かを実行時に判定し、同一エンドポイントで自動切替します。これがA3の本質であり、A1(常に同期)とA2(常に非同期)の中間に位置します。
  • F-13 プッシュ vs プル → ハイブリッド。同期応答はプッシュ(即座に結果を返す)、非同期昇格後の進捗通知はSSE/Webhookでプッシュしつつ、ポーリングもフォールバックとして残します。

構造

flowchart LR
  C[Client] -->|POST /run| GW[API Gateway]
  GW --> D{閾値内に完了?}
  D -- Yes --> C2[200 + result]
  D -- No --> C3[202 + job_id]
  GW --> AQ[Async Queue]
  AQ --> W[Agent Worker]
  W --> RS[(Result Store)]
  RS -->|SSE / poll| C
  C3 -.->|GET /jobs/id| RS

実装メモ

ファサード層の擬似コード(同期待機閾値で分岐):

async def run_agent(request):
    job = await enqueue(request)          # 内部は常にキュー経由
    try:
        result = await asyncio.wait_for(
            job.result_future(),
            timeout=SYNC_THRESHOLD_SEC,   # 駆動変数から導出
        )
        return JSONResponse(result, status_code=200)
    except asyncio.TimeoutError:
        return JSONResponse(
            {"job_id": job.id, "poll": f"/jobs/{job.id}"},
            status_code=202,
        )

落とし穴:

  • 閾値を静的に決めないでください。P95/P99の推移をG2 トレースで観測し、適応的に調整する仕組みを入れておきます。
  • 202応答のクライアント対応を忘れがちです。OpenAPI仕様に202レスポンススキーマを明示し、SDK生成時にジョブIDハンドリングが漏れないようにしてください。
  • 同期待機中にワーカーが落ちた場合、ファサードは202に切り替えてジョブIDを返す必要があります。ワーカー障害と単なるタイムアウトを区別し、障害時は即座に昇格してください。
  • 非同期コアはA2のチェックポイント・再開機構をそのまま活用します。ファサードは薄いアダプタに留め、ビジネスロジックを持たせないようにしてください。

効かせる力学(forces)

  • F1(長時間):長くなるリクエストを非同期に逃がし、同期境界のタイムアウトを回避します。
  • F12(レイテンシばらつき):二峰分布をファサード層で吸収し、呼び出し側にはfast-path/slow-pathの統一インタフェースを提供します。
  • F7(可用性不安定):プロバイダ遅延時に同期待機で詰まらず202に昇格するため、ゲートウェイのコネクション枯渇を防ぎます。

関連・代替

  • 関連:A1(同期fast-pathの実体)、A2(非同期コアの実体 -- A3のバックエンド)、A4(進捗をストリーミングで返す通知チャネル)、A6(タイムアウト・リトライの実装)、G2(同期→非同期昇格をトレースで追跡)。
  • 代替:A1(常に短時間なら同期だけでよい)、A2(常に長時間なら最初から非同期でよい)。

コーディングエージェント向け指示(machine-actionable)

このパターンを人間に提案するなら、同時に以下を提案/確認してください:

  • [ ] 同期待機閾値を [latency_budget] から導き、なぜその値かを添えて提示したか
  • [ ] 非同期コアとして A2 耐久非同期 を併置し、チェックポイント方針を決めたか
  • [ ] 非同期昇格後の通知方式(SSE / Webhook / ポーリング)を F-13 の判定基準に沿って選んだか
  • [ ] 202応答のスキーマをAPI仕様に明示し、クライアントSDKの対応を確認したか
  • [ ] G2 トレース で同期完了率・昇格率・P95を可視化する計画を含めたか
  • [ ] 処理が常に短い/常に長いなら、A3でなく A1 / A2 に倒す判断を示したか