A2A 多代理協議實戰與踩坑筆記

A2A 多代理協議實戰與踩坑筆記

📌 本文重點

  • A2A 讓 agents 變成可重用服務,而非每案重寫
  • 協議核心是註冊發現、任務路由與狀態冪等設計
  • 穩定的 A2A 契約可讓框架與部署自由演進

一旦你有第二個客戶、第二條業務線,原本那套「單體 agent + 一大坨編排器」就會開始失控:每個客戶複製一份 agent 系統、工具無法共享、編排器越寫越巨大。A2A(Agent to Agent)協議的目標就是:用一個標準化通訊協議,把 agent 變成可重用服務,而不是每個專案重造一輪輪子。


重點說明:A2A 的核心設計

1. 代理註冊與發現:從「硬編碼 URL」到「可發現的服務」

單體時代常見寫法:

// 單體編排器內部硬呼叫
const result = await routingAgent.handle(request);

一旦你要把同一個 routingAgent 給 N 個客戶用,就需要:

  1. 註冊機制:每個 agent 在啟動時向一個 Registry / Discovery Service 報到:
  2. idshipment-validator
  3. capabilities:支援的任務類型(schema / tags)
  4. endpoint:如 https://agents.mycorp.com/shipment-validator
  5. 發現機制:編排器不再硬編 URL,而是呼叫 Registry API
GET /agents?task=shipment.validate

得到 agent 清單後再去調用具體 endpoint

好處:同一組 agent 服務可以給多個客戶編排器共用;換掉實作(LangChain ➝ 自研框架)也只要更新註冊資料。

💡 關鍵: 透過註冊 / 發現機制,同一個 agent 可被多客戶共用,大幅減少重複實作與維護成本。


2. 任務路由:編排器既是 server 也是 client

在多代理場景中,編排器不只是 HTTP server,也必須是 HTTP client

  • 收到來自外部系統、前端的請求:server 身份
  • 需要委派子任務給其他 agents / 其他編排器:client 身份

一個最小 A2A 任務請求格式可以長這樣(HTTP + JSON):

POST /invoke HTTP/1.1
Content-Type: application/json
X-Trace-Id: 9f2e0a1c-...

{
  "task": "shipment.validate",
  "input": { "shipmentId": "S123" },
  "context": {
    "tenantId": "customer-a",
    "locale": "zh-TW"
  },
  "caller": {
    "agentId": "orchestrator-logistics",
    "replyUrl": "https://orch-a.mycorp.com/a2a/callback"
  }
}

返回結果:

{
  "task": "shipment.validate",
  "status": "success",
  "output": {
    "isValid": true,
    "warnings": []
  },
  "error": null,
  "meta": {
    "traceId": "9f2e0a1c-...",
    "agentId": "shipment-validator",
    "durationMs": 324
  }
}

關鍵點

  • task:描述抽象任務,而不是具體路由 URL,方便 routing / 升級
  • caller.replyUrl:允許非同步回調(長耗時任務、跨 VPC 任務)
  • X-Trace-Id + meta.traceId:貫穿多跳 agent 的追蹤

3. 狀態、錯誤、冪等:分散式 agent 的生存三寶

多 agent 跨服務邊界後,你必須清楚回答三件事:

  1. 狀態放哪裡?

  2. 不要把 workflow state 塞在 LLM context 裡。

  3. 建議:

    • 長期業務狀態:外部 DB / 狀態機服務(如 Statewright 類型)
    • 短期呼叫狀態:每個 A2A 請求帶 taskRunId,在 DB 以 event-sourcing 或簡單 JSON blob 存歷程。
  4. 錯誤怎麼傳遞?

{
  "status": "error",
  "error": {
    "type": "VALIDATION_ERROR",
    "message": "Unknown shipmentId S123",
    "retryable": false,
    "details": { "field": "shipmentId" }
  }
}
  • retryable 很重要:編排器根據這個決定是否自動重試或改走 fallback。

  • 冪等怎麼做?

  • 每個任務呼叫帶 requestId

{
  "task": "shipment.create",
  "requestId": "create-S123-20240501T100000Z",
  ...
}
  • agent 收到相同 requestId 時,重複回傳同一結果而不是再次寫 DB
  • 真實踩坑:物流「建立訂單 + 發通知」若沒冪等,在重試時會重複發貨或發兩封簡訊。

💡 關鍵: 為每個任務設計 requestId + retryable,是避免重複扣款、重複發貨等災難級錯誤的關鍵保險絲。


實作範例:從單體 Agent app 演進到 A2A 系統

下面是一個極簡版 A2A 實作,基於 Node.js + Express + HTTP + JSON,示範:

  • agent 如何註冊
  • orchestrator 如何發現 + 呼叫
  • 如何保留 traceId、處理重試

1. Agent 啟動與註冊

// agent/shipment-validator.ts
import express from 'express';
import fetch from 'node-fetch';

const app = express();
app.use(express.json());

const AGENT_ID = 'shipment-validator';
const REGISTRY_URL = process.env.REGISTRY_URL!;

async function register() {
  await fetch(`${REGISTRY_URL}/agents/register`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      id: AGENT_ID,
      endpoint: process.env.PUBLIC_URL,
      capabilities: ['shipment.validate'],
      version: '1.0.0'
    })
  });
}

app.post('/invoke', async (req, res) => {
  const traceId = req.header('X-Trace-Id') || crypto.randomUUID();
  const { task, input, requestId } = req.body;

  if (task !== 'shipment.validate') {
    return res.status(400).json({ status: 'error', error: { type: 'UNKNOWN_TASK' }});
  }

  // TODO: 查 DB 或外部 API
  const exists = input.shipmentId?.startsWith('S');

  res.json({
    task,
    status: 'success',
    output: { isValid: !!exists },
    meta: { traceId, agentId: AGENT_ID }
  });
});

app.listen(3001, async () => {
  await register();
  console.log('shipment-validator listening on 3001');
});

2. Registry:最小可用版本

// registry/index.ts
import express from 'express';
const app = express();
app.use(express.json());

const agents = new Map<string, any>();

app.post('/agents/register', (req, res) => {
  const { id, endpoint, capabilities, version } = req.body;
  agents.set(id, { id, endpoint, capabilities, version });
  res.json({ ok: true });
});

app.get('/agents', (req, res) => {
  const task = req.query.task as string;
  const matches = [...agents.values()].filter(a =>
    a.capabilities.includes(task)
  );
  res.json(matches);
});

app.listen(3000, () => console.log('registry on 3000'));

3. Orchestrator:既當 server 又當 client

// orchestrator/logistics.ts
import express from 'express';
import fetch from 'node-fetch';

const app = express();
app.use(express.json());

const REGISTRY_URL = process.env.REGISTRY_URL!;

async function findAgentForTask(task: string) {
  const res = await fetch(`${REGISTRY_URL}/agents?task=${encodeURIComponent(task)}`);
  const list = await res.json();
  if (!list.length) throw new Error(`No agent for task ${task}`);
  return list[0]; // naive: take first
}

app.post('/shipment/check', async (req, res) => {
  const traceId = req.header('X-Trace-Id') || crypto.randomUUID();
  const { shipmentId } = req.body;

  const agent = await findAgentForTask('shipment.validate');

  const requestBody = {
    task: 'shipment.validate',
    requestId: `validate-${shipmentId}`,
    input: { shipmentId },
    context: { tenantId: 'customer-a' },
    caller: {
      agentId: 'orchestrator-logistics',
      replyUrl: null
    }
  };

  const resp = await fetch(`${agent.endpoint}/invoke`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'X-Trace-Id': traceId
    },
    body: JSON.stringify(requestBody)
  });

  const data = await resp.json();

  // 簡化處理,實際上應依 data.status 分支
  res.setHeader('X-Trace-Id', data.meta?.traceId || traceId);
  res.json(data);
});

app.listen(4000, () => console.log('orchestrator on 4000'));

這樣,你就從原本的「單體 app 內部函式呼叫」,演進到:

  • 多個客戶可以共用 shipment-validator agent
  • 想新增 pricing-agent,只要註冊到 Registry,不改 orchestrator 主程式

進一步要上 gRPC、使用隊列系統(Kafka / SQS)時,只是把 /invoke 的 transport 換掉,協議 payload 大致可以不變。

💡 關鍵: 先穩定 JSON 協議,再替換 HTTP、gRPC、Queue 等傳輸層,可以減少大規模重構帶來的風險。


建議與注意事項:真實踩坑整理

1. 編排器雙角色帶來的競態條件

當 orchestrator 既是 server 又是 client 時,常見問題:

  • 同步與非同步混用:一部分任務是同步 HTTP 呼叫,一部分透過 queue 非同步回調,結果狀態管理爆炸。

建議:

  • 明確區分 請求-回應 vs fire-and-forget 任務 的 API。
  • 在邏輯架構層(可參考 AI Agent Logical Architecture 那篇思路)先畫出 state machine,再實作;工具如 Statewright 類型可以讓流程更可視化。

2. 訊息格式與 schema 演進

多客戶、多 agent 之後,改一個欄位就是全網恐慌

  • 務必使用明確的 version 欄位:
{
  "protocolVersion": "a2a-1.1",
  "task": "shipment.validate",
  ...
}
  • 新增欄位 ➝ 預設 optional,舊 agent 收到可忽略。
  • 刪除 / 改名欄位 ➝ 用 Deprecation window(先標註 deprecated: true 一段時間)。

3. 重試與冪等:不要指望下游「應該沒事」

在多代理系統裡,任何一跳都可能:

  • timeout
  • 500
  • 只執行了一半邏輯

建議:

  • 所有會造成 side effect 的任務必須有 requestId
  • 在 DB 以 (requestId, task) 做唯一索引
  • 重試時以 requestId 查找舊結果,若存在則直接回傳

4. 日誌與追蹤:traceId 一定要往外帶

常見錯誤是只在 API gateway 或第一個 service 打 log。多 agent 下:

  • 每個 A2A 呼叫都要帶 X-Trace-Id header。
  • 每個 agent 回傳結果時,把自己的 agentIdtaskdurationMs 打到 log。
  • 方便之後在 log system(如 ELK / OpenTelemetry)中串連整條 workflow。

5. 和 MCP、serverless 編排器整合時的性能與隔離

  • MCP / LangChain / LlamaIndex 等框架
  • 不要把所有工具都塞進同一個 process;可以把重型工具封裝為獨立 agent,透過 A2A 呼叫,避免單一框架 process 變成巨石。
  • Serverless 編排器(如 Step FunctionsTemporal
  • 用 A2A 把「呼叫 agent」當成一個 task type。
  • 注意冷啟動 + LLM 啟動成本:盡量把 agent 部署為長跑服務,serverless 只負責 orchestration。
  • 隊列系統整合(Kafka / SQS / RabbitMQ
  • A2A 協議層仍用同一份 JSON schema,只是 transport 從 HTTP 換成 message。
  • 確保 message 中也有 traceIdrequestIdtaskprotocolVersion,否則 debug 會極其痛苦。

總結

  • A2A 的本質不是某個框架,而是一套清楚的多代理通訊契約
  • 一旦協議穩定,你可以自由更換 LLM、agent framework、部署方式,但還能:
  • 在多客戶間重用同一組 agents
  • 控制編排器複雜度
  • 在真實生產環境中可觀測、可回溯、可演進。

如果你已經有一個「會動但很醜」的單體 agent app,建議先抽出最常用的 1–2 個子任務,照上面的 minimal A2A 協議拆出去,從那裡開始演進,而不是一次重寫全系統。

🚀 你現在可以做的事

  • 把現有單體 agent app 中 1–2 個常用任務,先按文中 JSON 協議拆成獨立 /invoke 服務
  • 為這些任務統一增加 requestIdX-Trace-Id,並在日誌中打出 agentIdtaskdurationMs
  • 實作一個最小版 Registry(照文中 registry/index.ts),讓 orchestrator 透過發現機制而不是硬編 URL 呼叫 agents

留言

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *