A2A v1.0 を Python/FastAPI で実装した話
chatweb.ai では39の専門AIエージェントが協調してタスクを実行しています。この記事では、Google が提唱する A2A (Agent-to-Agent) v1.0 プロトコルをどのように実装したかを解説します。
TL;DR
- A2AはJSON-RPC 2.0ベースのエージェント間通信プロトコル
- 各エージェントはAgent Cardで能力を宣言
- SSE (Server-Sent Events) でリアルタイムストリーミング
- Python/FastAPI での実装コードを公開
- 6時間ごとのフィードバックループで自動改善
アーキテクチャ概要
chatweb.ai のアーキテクチャは3層構成です。
┌─────────────────────────────────────────────────┐
│ Client Layer │
│ Web UI / LINE / Telegram / API │
└──────────────────────┬──────────────────────────┘
│ HTTP/SSE
┌──────────────────────▼──────────────────────────┐
│ Orchestrator (Router) │
│ - タスク解析・分類 │
│ - Agent Card ベースのエージェント選択 │
│ - JSON-RPC 2.0 ディスパッチ │
│ - フィードバック集約 │
└───┬──────┬──────┬──────┬──────┬─────────────────┘
│ │ │ │ │
┌───▼─┐┌───▼─┐┌───▼─┐┌───▼─┐┌───▼─┐
│Gmail││Web ││Code ││Brow-││ ... │ x39 Agents
│Agent││Pub. ││Dep. ││ser ││ │
└─────┘└─────┘└─────┘└─────┘└─────┘
│ │ │ │ │
┌───▼──────▼──────▼──────▼──────▼─────────────────┐
│ 12 AI Models (LLM Layer) │
│ Llama 4 | Claude | GPT-4o | Gemini | Qwen ... │
└─────────────────────────────────────────────────┘
Agent Card の設計
A2Aプロトコルでは、各エージェントが Agent Card を公開し、自身の能力を宣言します。これにより、Orchestratorはタスクに最適なエージェントを動的に選択できます。
agent_card.py
from pydantic import BaseModel
from typing import Optional
class AgentCard(BaseModel):
"""A2A Agent Card - エージェントの能力宣言"""
name: str
description: str
version: str = "1.0.0"
protocol: str = "a2a/1.0"
capabilities: list[str]
input_schema: dict
output_schema: dict
constraints: Optional[dict] = None
quality_target: float = 0.7 # 0.0 - 1.0
max_latency_ms: int = 30000
# GmailAgent の Agent Card 例
gmail_agent_card = AgentCard(
name="gmail-agent",
description="Gmail操作エージェント。メールの送信、返信、検索、整理を実行。",
capabilities=[
"email.send",
"email.reply",
"email.search",
"email.organize",
"email.draft"
],
input_schema={
"type": "object",
"properties": {
"action": {"type": "string", "enum": ["send", "reply", "search", "organize"]},
"to": {"type": "array", "items": {"type": "string", "format": "email"}},
"subject": {"type": "string"},
"body": {"type": "string"},
"thread_id": {"type": "string"}
},
"required": ["action"]
},
output_schema={
"type": "object",
"properties": {
"status": {"type": "string", "enum": ["success", "failure", "partial"]},
"message_id": {"type": "string"},
"details": {"type": "string"}
}
},
constraints={
"rate_limit": "100/hour",
"max_recipients": 50,
"requires_oauth": True
},
quality_target=0.9,
max_latency_ms=10000
)
JSON-RPC 2.0 の実装
A2Aプロトコルの通信層は JSON-RPC 2.0 で統一しています。FastAPI でのエンドポイント実装は以下の通りです。
a2a_server.py
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Any, Optional
import json
import asyncio
import uuid
app = FastAPI()
class JsonRpcRequest(BaseModel):
jsonrpc: str = "2.0"
method: str
params: Optional[dict[str, Any]] = None
id: Optional[str | int] = None
class JsonRpcResponse(BaseModel):
jsonrpc: str = "2.0"
result: Optional[Any] = None
error: Optional[dict] = None
id: Optional[str | int] = None
# エージェントレジストリ
agent_registry: dict[str, AgentCard] = {}
@app.post("/a2a/rpc")
async def handle_rpc(request: JsonRpcRequest):
"""JSON-RPC 2.0 エンドポイント"""
handlers = {
"agent.discover": handle_discover,
"agent.execute": handle_execute,
"agent.status": handle_status,
"agent.cancel": handle_cancel,
}
handler = handlers.get(request.method)
if not handler:
return JsonRpcResponse(
error={"code": -32601, "message": f"Method not found: {request.method}"},
id=request.id
)
try:
result = await handler(request.params or {})
return JsonRpcResponse(result=result, id=request.id)
except Exception as e:
return JsonRpcResponse(
error={"code": -32603, "message": str(e)},
id=request.id
)
async def handle_discover(params: dict) -> list[dict]:
"""登録済みエージェントのAgent Cardを返す"""
capability = params.get("capability")
if capability:
return [
card.model_dump()
for card in agent_registry.values()
if capability in card.capabilities
]
return [card.model_dump() for card in agent_registry.values()]
async def handle_execute(params: dict) -> dict:
"""エージェントにタスクを実行させる"""
agent_name = params.get("agent")
task = params.get("task")
task_id = str(uuid.uuid4())
agent = agent_registry.get(agent_name)
if not agent:
raise ValueError(f"Agent not found: {agent_name}")
# 非同期でタスクを開始
asyncio.create_task(run_agent_task(task_id, agent_name, task))
return {"task_id": task_id, "status": "accepted"}
SSE ストリーミング
エージェントの実行状況をリアルタイムでクライアントに伝えるため、Server-Sent Events (SSE) を使用しています。
sse_streaming.py
from fastapi.responses import StreamingResponse
import asyncio
import json
# タスクの進捗を保持するストア
task_events: dict[str, asyncio.Queue] = {}
@app.get("/a2a/stream/{task_id}")
async def stream_task(task_id: str):
"""SSE でタスクの進捗をストリーミング"""
if task_id not in task_events:
task_events[task_id] = asyncio.Queue()
async def event_generator():
queue = task_events[task_id]
while True:
event = await queue.get()
yield f"event: {event['type']}\n"
yield f"data: {json.dumps(event['data'], ensure_ascii=False)}\n\n"
if event["type"] in ("complete", "error"):
break
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
)
async def run_agent_task(task_id: str, agent_name: str, task: dict):
"""エージェントタスクを実行し、進捗をSSEで通知"""
queue = task_events.setdefault(task_id, asyncio.Queue())
# 開始通知
await queue.put({
"type": "started",
"data": {"task_id": task_id, "agent": agent_name}
})
try:
# エージェント固有の処理を実行
agent_impl = get_agent_implementation(agent_name)
# 進捗コールバック付きで実行
async for progress in agent_impl.execute(task):
await queue.put({
"type": "progress",
"data": {
"task_id": task_id,
"step": progress.step,
"message": progress.message,
"percent": progress.percent
}
})
# 完了通知
await queue.put({
"type": "complete",
"data": {"task_id": task_id, "result": progress.result}
})
except Exception as e:
await queue.put({
"type": "error",
"data": {"task_id": task_id, "error": str(e)}
})
フィードバックループの仕組み
chatweb.ai の最も特徴的な仕組みが、6時間ごとの自己改善フィードバックループです。
┌─────────────────────────────────────────┐
│ 6-Hour Feedback Cycle │
│ │
│ 1. Collect ──▶ 2. Evaluate │
│ 実行結果を収集 品質スコアリング │
│ │
│ 4. Deploy ◀── 3. Optimize │
│ 改善版をデプロイ プロンプト最適化 │
│ │
│ ↻ 最大5回/サイクル (Pro) │
└─────────────────────────────────────────┘
feedback_loop.py
from dataclasses import dataclass
from datetime import datetime, timedelta
import statistics
@dataclass
class ExecutionRecord:
task_id: str
agent_name: str
success: bool
quality_score: float # 0.0 - 1.0
latency_ms: int
user_feedback: float | None # ユーザーからのフィードバック
timestamp: datetime
class FeedbackLoop:
def __init__(self, cycle_hours: int = 6, max_iterations: int = 5):
self.cycle_hours = cycle_hours
self.max_iterations = max_iterations
async def run_cycle(self):
"""6時間サイクルのメインループ"""
# 1. 直近サイクルの実行結果を収集
since = datetime.utcnow() - timedelta(hours=self.cycle_hours)
records = await self.collect_records(since)
# 2. エージェントごとに品質を評価
agent_scores = self.evaluate(records)
for agent_name, score_data in agent_scores.items():
target = self.get_quality_target(agent_name)
if score_data["avg_score"] >= target:
continue # 品質目標達成済み
# 3. プロンプト最適化(最大5回反復)
for iteration in range(self.max_iterations):
improved_prompt = await self.optimize_prompt(
agent_name,
score_data,
iteration
)
# テストセットで検証
test_score = await self.test_prompt(
agent_name, improved_prompt
)
if test_score >= target:
# 4. 改善版をデプロイ
await self.deploy_prompt(agent_name, improved_prompt)
break
def evaluate(self, records: list[ExecutionRecord]) -> dict:
"""エージェントごとの品質評価"""
agent_groups: dict[str, list] = {}
for r in records:
agent_groups.setdefault(r.agent_name, []).append(r)
results = {}
for agent_name, agent_records in agent_groups.items():
scores = [r.quality_score for r in agent_records]
results[agent_name] = {
"avg_score": statistics.mean(scores),
"min_score": min(scores),
"success_rate": sum(1 for r in agent_records if r.success) / len(agent_records),
"avg_latency_ms": statistics.mean(r.latency_ms for r in agent_records),
"sample_size": len(agent_records),
"failure_patterns": self.extract_failure_patterns(agent_records)
}
return results
async def optimize_prompt(self, agent_name: str, score_data: dict, iteration: int) -> str:
"""メタエージェントによるプロンプト最適化"""
current_prompt = await self.get_current_prompt(agent_name)
failure_patterns = score_data["failure_patterns"]
optimization_prompt = f"""
以下のエージェントのプロンプトを改善してください。
エージェント: {agent_name}
現在の品質スコア: {score_data['avg_score']:.2f}
成功率: {score_data['success_rate']:.1%}
イテレーション: {iteration + 1}
失敗パターン:
{json.dumps(failure_patterns, ensure_ascii=False, indent=2)}
現在のプロンプト:
{current_prompt}
改善方針:
- 失敗パターンを具体的に対処する指示を追加
- 既存の成功パターンを壊さない
- 簡潔さを保つ
"""
# メタエージェント(Claude Opus推奨)で最適化
improved = await call_llm("claude-opus", optimization_prompt)
return improved
マルチモデルオーケストレーション
12のAIモデルをタスク特性に応じて動的に割り当てています。
model_router.py
MODEL_CONFIG = {
"llama-4-scout": {"cost": 0.1, "quality": 0.7, "latency": "fast", "best_for": ["classification", "simple_qa"]},
"claude-opus": {"cost": 1.0, "quality": 0.95, "latency": "slow", "best_for": ["code_gen", "complex_reasoning"]},
"gpt-4o": {"cost": 0.8, "quality": 0.9, "latency": "medium", "best_for": ["general", "multimodal"]},
"gemini-pro": {"cost": 0.5, "quality": 0.85, "latency": "medium", "best_for": ["long_context", "analysis"]},
# ... 8 more models
}
class ModelRouter:
def select_model(self, task_type: str, quality_target: float, budget: str) -> str:
"""タスクとプランに応じて最適モデルを選択"""
candidates = []
for model, config in MODEL_CONFIG.items():
if config["quality"] >= quality_target:
if task_type in config["best_for"] or "general" in config["best_for"]:
candidates.append((model, config))
if budget == "free":
# コスト最小化
candidates.sort(key=lambda x: x[1]["cost"])
else:
# 品質最大化(コストは二次要因)
candidates.sort(key=lambda x: (-x[1]["quality"], x[1]["cost"]))
return candidates[0][0] if candidates else "gpt-4o"
まとめ
A2A v1.0 プロトコルを実装して感じたことをまとめます。
- Agent Card による宣言的設計が、エージェント間の結合度を下げる。新しいエージェントの追加が容易になった。
- JSON-RPC 2.0 は十分にシンプルで、エージェント間通信の標準として適している。
- SSE ストリーミングは、ユーザー体験の観点で必須。エージェントが何をしているか見えないと不安になる。
- フィードバックループは最も効果があった。Proプランの品質9/10は、これなしでは達成できなかった。
- マルチモデルは運用が複雑になるが、コストと品質の最適化には不可欠。
chatweb.ai のコア実装はオープンソース化を予定しています。興味がある方は https://chatweb.ai からサインアップして試してみてください。
この記事は chatweb.ai の実装を元に、Python/FastAPI で再実装した例です。本番環境では Rust (axum) + AWS Lambda で動作しています。