技術記事(Zenn / Qiita 向け)

A2A v1.0 を Python/FastAPI で実装した話。Markdown形式でコピー可能。

Python FastAPI A2A JSON-RPC 2.0 AIエージェント SSE

A2A v1.0 を Python/FastAPI で実装した話

chatweb.ai では39の専門AIエージェントが協調してタスクを実行しています。この記事では、Google が提唱する A2A (Agent-to-Agent) v1.0 プロトコルをどのように実装したかを解説します。

TL;DR

アーキテクチャ概要

chatweb.ai のアーキテクチャは3層構成です。

┌─────────────────────────────────────────────────┐ │ Client Layer │ │ Web UI / LINE / Telegram / API │ └──────────────────────┬──────────────────────────┘ │ HTTP/SSE ┌──────────────────────▼──────────────────────────┐ │ Orchestrator (Router) │ │ - タスク解析・分類 │ │ - Agent Card ベースのエージェント選択 │ │ - JSON-RPC 2.0 ディスパッチ │ │ - フィードバック集約 │ └───┬──────┬──────┬──────┬──────┬─────────────────┘ │ │ │ │ │ ┌───▼─┐┌───▼─┐┌───▼─┐┌───▼─┐┌───▼─┐ │Gmail││Web ││Code ││Brow-││ ... │ x39 Agents │Agent││Pub. ││Dep. ││ser ││ │ └─────┘└─────┘└─────┘└─────┘└─────┘ │ │ │ │ │ ┌───▼──────▼──────▼──────▼──────▼─────────────────┐ │ 12 AI Models (LLM Layer) │ │ Llama 4 | Claude | GPT-4o | Gemini | Qwen ... │ └─────────────────────────────────────────────────┘

Agent Card の設計

A2Aプロトコルでは、各エージェントが Agent Card を公開し、自身の能力を宣言します。これにより、Orchestratorはタスクに最適なエージェントを動的に選択できます。

agent_card.py
from pydantic import BaseModel from typing import Optional class AgentCard(BaseModel): """A2A Agent Card - エージェントの能力宣言""" name: str description: str version: str = "1.0.0" protocol: str = "a2a/1.0" capabilities: list[str] input_schema: dict output_schema: dict constraints: Optional[dict] = None quality_target: float = 0.7 # 0.0 - 1.0 max_latency_ms: int = 30000 # GmailAgent の Agent Card 例 gmail_agent_card = AgentCard( name="gmail-agent", description="Gmail操作エージェント。メールの送信、返信、検索、整理を実行。", capabilities=[ "email.send", "email.reply", "email.search", "email.organize", "email.draft" ], input_schema={ "type": "object", "properties": { "action": {"type": "string", "enum": ["send", "reply", "search", "organize"]}, "to": {"type": "array", "items": {"type": "string", "format": "email"}}, "subject": {"type": "string"}, "body": {"type": "string"}, "thread_id": {"type": "string"} }, "required": ["action"] }, output_schema={ "type": "object", "properties": { "status": {"type": "string", "enum": ["success", "failure", "partial"]}, "message_id": {"type": "string"}, "details": {"type": "string"} } }, constraints={ "rate_limit": "100/hour", "max_recipients": 50, "requires_oauth": True }, quality_target=0.9, max_latency_ms=10000 )

JSON-RPC 2.0 の実装

A2Aプロトコルの通信層は JSON-RPC 2.0 で統一しています。FastAPI でのエンドポイント実装は以下の通りです。

a2a_server.py
from fastapi import FastAPI, Request from fastapi.responses import StreamingResponse from pydantic import BaseModel from typing import Any, Optional import json import asyncio import uuid app = FastAPI() class JsonRpcRequest(BaseModel): jsonrpc: str = "2.0" method: str params: Optional[dict[str, Any]] = None id: Optional[str | int] = None class JsonRpcResponse(BaseModel): jsonrpc: str = "2.0" result: Optional[Any] = None error: Optional[dict] = None id: Optional[str | int] = None # エージェントレジストリ agent_registry: dict[str, AgentCard] = {} @app.post("/a2a/rpc") async def handle_rpc(request: JsonRpcRequest): """JSON-RPC 2.0 エンドポイント""" handlers = { "agent.discover": handle_discover, "agent.execute": handle_execute, "agent.status": handle_status, "agent.cancel": handle_cancel, } handler = handlers.get(request.method) if not handler: return JsonRpcResponse( error={"code": -32601, "message": f"Method not found: {request.method}"}, id=request.id ) try: result = await handler(request.params or {}) return JsonRpcResponse(result=result, id=request.id) except Exception as e: return JsonRpcResponse( error={"code": -32603, "message": str(e)}, id=request.id ) async def handle_discover(params: dict) -> list[dict]: """登録済みエージェントのAgent Cardを返す""" capability = params.get("capability") if capability: return [ card.model_dump() for card in agent_registry.values() if capability in card.capabilities ] return [card.model_dump() for card in agent_registry.values()] async def handle_execute(params: dict) -> dict: """エージェントにタスクを実行させる""" agent_name = params.get("agent") task = params.get("task") task_id = str(uuid.uuid4()) agent = agent_registry.get(agent_name) if not agent: raise ValueError(f"Agent not found: {agent_name}") # 非同期でタスクを開始 asyncio.create_task(run_agent_task(task_id, agent_name, task)) return {"task_id": task_id, "status": "accepted"}

SSE ストリーミング

エージェントの実行状況をリアルタイムでクライアントに伝えるため、Server-Sent Events (SSE) を使用しています。

sse_streaming.py
from fastapi.responses import StreamingResponse import asyncio import json # タスクの進捗を保持するストア task_events: dict[str, asyncio.Queue] = {} @app.get("/a2a/stream/{task_id}") async def stream_task(task_id: str): """SSE でタスクの進捗をストリーミング""" if task_id not in task_events: task_events[task_id] = asyncio.Queue() async def event_generator(): queue = task_events[task_id] while True: event = await queue.get() yield f"event: {event['type']}\n" yield f"data: {json.dumps(event['data'], ensure_ascii=False)}\n\n" if event["type"] in ("complete", "error"): break return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", } ) async def run_agent_task(task_id: str, agent_name: str, task: dict): """エージェントタスクを実行し、進捗をSSEで通知""" queue = task_events.setdefault(task_id, asyncio.Queue()) # 開始通知 await queue.put({ "type": "started", "data": {"task_id": task_id, "agent": agent_name} }) try: # エージェント固有の処理を実行 agent_impl = get_agent_implementation(agent_name) # 進捗コールバック付きで実行 async for progress in agent_impl.execute(task): await queue.put({ "type": "progress", "data": { "task_id": task_id, "step": progress.step, "message": progress.message, "percent": progress.percent } }) # 完了通知 await queue.put({ "type": "complete", "data": {"task_id": task_id, "result": progress.result} }) except Exception as e: await queue.put({ "type": "error", "data": {"task_id": task_id, "error": str(e)} })

フィードバックループの仕組み

chatweb.ai の最も特徴的な仕組みが、6時間ごとの自己改善フィードバックループです。

┌─────────────────────────────────────────┐ │ 6-Hour Feedback Cycle │ │ │ │ 1. Collect ──▶ 2. Evaluate │ │ 実行結果を収集 品質スコアリング │ │ │ │ 4. Deploy ◀── 3. Optimize │ │ 改善版をデプロイ プロンプト最適化 │ │ │ │ ↻ 最大5回/サイクル (Pro) │ └─────────────────────────────────────────┘
feedback_loop.py
from dataclasses import dataclass from datetime import datetime, timedelta import statistics @dataclass class ExecutionRecord: task_id: str agent_name: str success: bool quality_score: float # 0.0 - 1.0 latency_ms: int user_feedback: float | None # ユーザーからのフィードバック timestamp: datetime class FeedbackLoop: def __init__(self, cycle_hours: int = 6, max_iterations: int = 5): self.cycle_hours = cycle_hours self.max_iterations = max_iterations async def run_cycle(self): """6時間サイクルのメインループ""" # 1. 直近サイクルの実行結果を収集 since = datetime.utcnow() - timedelta(hours=self.cycle_hours) records = await self.collect_records(since) # 2. エージェントごとに品質を評価 agent_scores = self.evaluate(records) for agent_name, score_data in agent_scores.items(): target = self.get_quality_target(agent_name) if score_data["avg_score"] >= target: continue # 品質目標達成済み # 3. プロンプト最適化(最大5回反復) for iteration in range(self.max_iterations): improved_prompt = await self.optimize_prompt( agent_name, score_data, iteration ) # テストセットで検証 test_score = await self.test_prompt( agent_name, improved_prompt ) if test_score >= target: # 4. 改善版をデプロイ await self.deploy_prompt(agent_name, improved_prompt) break def evaluate(self, records: list[ExecutionRecord]) -> dict: """エージェントごとの品質評価""" agent_groups: dict[str, list] = {} for r in records: agent_groups.setdefault(r.agent_name, []).append(r) results = {} for agent_name, agent_records in agent_groups.items(): scores = [r.quality_score for r in agent_records] results[agent_name] = { "avg_score": statistics.mean(scores), "min_score": min(scores), "success_rate": sum(1 for r in agent_records if r.success) / len(agent_records), "avg_latency_ms": statistics.mean(r.latency_ms for r in agent_records), "sample_size": len(agent_records), "failure_patterns": self.extract_failure_patterns(agent_records) } return results async def optimize_prompt(self, agent_name: str, score_data: dict, iteration: int) -> str: """メタエージェントによるプロンプト最適化""" current_prompt = await self.get_current_prompt(agent_name) failure_patterns = score_data["failure_patterns"] optimization_prompt = f""" 以下のエージェントのプロンプトを改善してください。 エージェント: {agent_name} 現在の品質スコア: {score_data['avg_score']:.2f} 成功率: {score_data['success_rate']:.1%} イテレーション: {iteration + 1} 失敗パターン: {json.dumps(failure_patterns, ensure_ascii=False, indent=2)} 現在のプロンプト: {current_prompt} 改善方針: - 失敗パターンを具体的に対処する指示を追加 - 既存の成功パターンを壊さない - 簡潔さを保つ """ # メタエージェント(Claude Opus推奨)で最適化 improved = await call_llm("claude-opus", optimization_prompt) return improved

マルチモデルオーケストレーション

12のAIモデルをタスク特性に応じて動的に割り当てています。

model_router.py
MODEL_CONFIG = { "llama-4-scout": {"cost": 0.1, "quality": 0.7, "latency": "fast", "best_for": ["classification", "simple_qa"]}, "claude-opus": {"cost": 1.0, "quality": 0.95, "latency": "slow", "best_for": ["code_gen", "complex_reasoning"]}, "gpt-4o": {"cost": 0.8, "quality": 0.9, "latency": "medium", "best_for": ["general", "multimodal"]}, "gemini-pro": {"cost": 0.5, "quality": 0.85, "latency": "medium", "best_for": ["long_context", "analysis"]}, # ... 8 more models } class ModelRouter: def select_model(self, task_type: str, quality_target: float, budget: str) -> str: """タスクとプランに応じて最適モデルを選択""" candidates = [] for model, config in MODEL_CONFIG.items(): if config["quality"] >= quality_target: if task_type in config["best_for"] or "general" in config["best_for"]: candidates.append((model, config)) if budget == "free": # コスト最小化 candidates.sort(key=lambda x: x[1]["cost"]) else: # 品質最大化(コストは二次要因) candidates.sort(key=lambda x: (-x[1]["quality"], x[1]["cost"])) return candidates[0][0] if candidates else "gpt-4o"

まとめ

A2A v1.0 プロトコルを実装して感じたことをまとめます。

  1. Agent Card による宣言的設計が、エージェント間の結合度を下げる。新しいエージェントの追加が容易になった。
  2. JSON-RPC 2.0 は十分にシンプルで、エージェント間通信の標準として適している。
  3. SSE ストリーミングは、ユーザー体験の観点で必須。エージェントが何をしているか見えないと不安になる。
  4. フィードバックループは最も効果があった。Proプランの品質9/10は、これなしでは達成できなかった。
  5. マルチモデルは運用が複雑になるが、コストと品質の最適化には不可欠。

chatweb.ai のコア実装はオープンソース化を予定しています。興味がある方は https://chatweb.ai からサインアップして試してみてください。

この記事は chatweb.ai の実装を元に、Python/FastAPI で再実装した例です。本番環境では Rust (axum) + AWS Lambda で動作しています。
Zennの場合: 「技術記事」カテゴリ、トピック「Python」「FastAPI」「AI」で投稿。Qiitaの場合: タグ「Python」「FastAPI」「AIエージェント」「A2A」「LLM」。いずれも記事の最後に chatweb.ai へのリンクを含める。