標籤: Agent Orchestrator

  • A2A 多代理協議實戰與踩坑筆記

    A2A 多代理協議實戰與踩坑筆記

    📌 本文重點

    • A2A 讓 agents 變成可重用服務,而非每案重寫
    • 協議核心是註冊發現、任務路由與狀態冪等設計
    • 穩定的 A2A 契約可讓框架與部署自由演進

    一旦你有第二個客戶、第二條業務線,原本那套「單體 agent + 一大坨編排器」就會開始失控:每個客戶複製一份 agent 系統、工具無法共享、編排器越寫越巨大。A2A(Agent to Agent)協議的目標就是:用一個標準化通訊協議,把 agent 變成可重用服務,而不是每個專案重造一輪輪子。


    重點說明:A2A 的核心設計

    1. 代理註冊與發現:從「硬編碼 URL」到「可發現的服務」

    單體時代常見寫法:

    // 單體編排器內部硬呼叫
    const result = await routingAgent.handle(request);
    

    一旦你要把同一個 routingAgent 給 N 個客戶用,就需要:

    1. 註冊機制:每個 agent 在啟動時向一個 Registry / Discovery Service 報到:
    2. idshipment-validator
    3. capabilities:支援的任務類型(schema / tags)
    4. endpoint:如 https://agents.mycorp.com/shipment-validator
    5. 發現機制:編排器不再硬編 URL,而是呼叫 Registry API
    GET /agents?task=shipment.validate
    

    得到 agent 清單後再去調用具體 endpoint

    好處:同一組 agent 服務可以給多個客戶編排器共用;換掉實作(LangChain ➝ 自研框架)也只要更新註冊資料。

    💡 關鍵: 透過註冊 / 發現機制,同一個 agent 可被多客戶共用,大幅減少重複實作與維護成本。


    2. 任務路由:編排器既是 server 也是 client

    在多代理場景中,編排器不只是 HTTP server,也必須是 HTTP client

    • 收到來自外部系統、前端的請求:server 身份
    • 需要委派子任務給其他 agents / 其他編排器:client 身份

    一個最小 A2A 任務請求格式可以長這樣(HTTP + JSON):

    POST /invoke HTTP/1.1
    Content-Type: application/json
    X-Trace-Id: 9f2e0a1c-...
    
    {
      "task": "shipment.validate",
      "input": { "shipmentId": "S123" },
      "context": {
        "tenantId": "customer-a",
        "locale": "zh-TW"
      },
      "caller": {
        "agentId": "orchestrator-logistics",
        "replyUrl": "https://orch-a.mycorp.com/a2a/callback"
      }
    }
    

    返回結果:

    {
      "task": "shipment.validate",
      "status": "success",
      "output": {
        "isValid": true,
        "warnings": []
      },
      "error": null,
      "meta": {
        "traceId": "9f2e0a1c-...",
        "agentId": "shipment-validator",
        "durationMs": 324
      }
    }
    

    關鍵點

    • task:描述抽象任務,而不是具體路由 URL,方便 routing / 升級
    • caller.replyUrl:允許非同步回調(長耗時任務、跨 VPC 任務)
    • X-Trace-Id + meta.traceId:貫穿多跳 agent 的追蹤

    3. 狀態、錯誤、冪等:分散式 agent 的生存三寶

    多 agent 跨服務邊界後,你必須清楚回答三件事:

    1. 狀態放哪裡?

    2. 不要把 workflow state 塞在 LLM context 裡。

    3. 建議:

      • 長期業務狀態:外部 DB / 狀態機服務(如 Statewright 類型)
      • 短期呼叫狀態:每個 A2A 請求帶 taskRunId,在 DB 以 event-sourcing 或簡單 JSON blob 存歷程。
    4. 錯誤怎麼傳遞?

    {
      "status": "error",
      "error": {
        "type": "VALIDATION_ERROR",
        "message": "Unknown shipmentId S123",
        "retryable": false,
        "details": { "field": "shipmentId" }
      }
    }
    
    • retryable 很重要:編排器根據這個決定是否自動重試或改走 fallback。

    • 冪等怎麼做?

    • 每個任務呼叫帶 requestId

    {
      "task": "shipment.create",
      "requestId": "create-S123-20240501T100000Z",
      ...
    }
    
    • agent 收到相同 requestId 時,重複回傳同一結果而不是再次寫 DB
    • 真實踩坑:物流「建立訂單 + 發通知」若沒冪等,在重試時會重複發貨或發兩封簡訊。

    💡 關鍵: 為每個任務設計 requestId + retryable,是避免重複扣款、重複發貨等災難級錯誤的關鍵保險絲。


    實作範例:從單體 Agent app 演進到 A2A 系統

    下面是一個極簡版 A2A 實作,基於 Node.js + Express + HTTP + JSON,示範:

    • agent 如何註冊
    • orchestrator 如何發現 + 呼叫
    • 如何保留 traceId、處理重試

    1. Agent 啟動與註冊

    // agent/shipment-validator.ts
    import express from 'express';
    import fetch from 'node-fetch';
    
    const app = express();
    app.use(express.json());
    
    const AGENT_ID = 'shipment-validator';
    const REGISTRY_URL = process.env.REGISTRY_URL!;
    
    async function register() {
      await fetch(`${REGISTRY_URL}/agents/register`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          id: AGENT_ID,
          endpoint: process.env.PUBLIC_URL,
          capabilities: ['shipment.validate'],
          version: '1.0.0'
        })
      });
    }
    
    app.post('/invoke', async (req, res) => {
      const traceId = req.header('X-Trace-Id') || crypto.randomUUID();
      const { task, input, requestId } = req.body;
    
      if (task !== 'shipment.validate') {
        return res.status(400).json({ status: 'error', error: { type: 'UNKNOWN_TASK' }});
      }
    
      // TODO: 查 DB 或外部 API
      const exists = input.shipmentId?.startsWith('S');
    
      res.json({
        task,
        status: 'success',
        output: { isValid: !!exists },
        meta: { traceId, agentId: AGENT_ID }
      });
    });
    
    app.listen(3001, async () => {
      await register();
      console.log('shipment-validator listening on 3001');
    });
    

    2. Registry:最小可用版本

    // registry/index.ts
    import express from 'express';
    const app = express();
    app.use(express.json());
    
    const agents = new Map<string, any>();
    
    app.post('/agents/register', (req, res) => {
      const { id, endpoint, capabilities, version } = req.body;
      agents.set(id, { id, endpoint, capabilities, version });
      res.json({ ok: true });
    });
    
    app.get('/agents', (req, res) => {
      const task = req.query.task as string;
      const matches = [...agents.values()].filter(a =>
        a.capabilities.includes(task)
      );
      res.json(matches);
    });
    
    app.listen(3000, () => console.log('registry on 3000'));
    

    3. Orchestrator:既當 server 又當 client

    // orchestrator/logistics.ts
    import express from 'express';
    import fetch from 'node-fetch';
    
    const app = express();
    app.use(express.json());
    
    const REGISTRY_URL = process.env.REGISTRY_URL!;
    
    async function findAgentForTask(task: string) {
      const res = await fetch(`${REGISTRY_URL}/agents?task=${encodeURIComponent(task)}`);
      const list = await res.json();
      if (!list.length) throw new Error(`No agent for task ${task}`);
      return list[0]; // naive: take first
    }
    
    app.post('/shipment/check', async (req, res) => {
      const traceId = req.header('X-Trace-Id') || crypto.randomUUID();
      const { shipmentId } = req.body;
    
      const agent = await findAgentForTask('shipment.validate');
    
      const requestBody = {
        task: 'shipment.validate',
        requestId: `validate-${shipmentId}`,
        input: { shipmentId },
        context: { tenantId: 'customer-a' },
        caller: {
          agentId: 'orchestrator-logistics',
          replyUrl: null
        }
      };
    
      const resp = await fetch(`${agent.endpoint}/invoke`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'X-Trace-Id': traceId
        },
        body: JSON.stringify(requestBody)
      });
    
      const data = await resp.json();
    
      // 簡化處理,實際上應依 data.status 分支
      res.setHeader('X-Trace-Id', data.meta?.traceId || traceId);
      res.json(data);
    });
    
    app.listen(4000, () => console.log('orchestrator on 4000'));
    

    這樣,你就從原本的「單體 app 內部函式呼叫」,演進到:

    • 多個客戶可以共用 shipment-validator agent
    • 想新增 pricing-agent,只要註冊到 Registry,不改 orchestrator 主程式

    進一步要上 gRPC、使用隊列系統(Kafka / SQS)時,只是把 /invoke 的 transport 換掉,協議 payload 大致可以不變。

    💡 關鍵: 先穩定 JSON 協議,再替換 HTTP、gRPC、Queue 等傳輸層,可以減少大規模重構帶來的風險。


    建議與注意事項:真實踩坑整理

    1. 編排器雙角色帶來的競態條件

    當 orchestrator 既是 server 又是 client 時,常見問題:

    • 同步與非同步混用:一部分任務是同步 HTTP 呼叫,一部分透過 queue 非同步回調,結果狀態管理爆炸。

    建議:

    • 明確區分 請求-回應 vs fire-and-forget 任務 的 API。
    • 在邏輯架構層(可參考 AI Agent Logical Architecture 那篇思路)先畫出 state machine,再實作;工具如 Statewright 類型可以讓流程更可視化。

    2. 訊息格式與 schema 演進

    多客戶、多 agent 之後,改一個欄位就是全網恐慌

    • 務必使用明確的 version 欄位:
    {
      "protocolVersion": "a2a-1.1",
      "task": "shipment.validate",
      ...
    }
    
    • 新增欄位 ➝ 預設 optional,舊 agent 收到可忽略。
    • 刪除 / 改名欄位 ➝ 用 Deprecation window(先標註 deprecated: true 一段時間)。

    3. 重試與冪等:不要指望下游「應該沒事」

    在多代理系統裡,任何一跳都可能:

    • timeout
    • 500
    • 只執行了一半邏輯

    建議:

    • 所有會造成 side effect 的任務必須有 requestId
    • 在 DB 以 (requestId, task) 做唯一索引
    • 重試時以 requestId 查找舊結果,若存在則直接回傳

    4. 日誌與追蹤:traceId 一定要往外帶

    常見錯誤是只在 API gateway 或第一個 service 打 log。多 agent 下:

    • 每個 A2A 呼叫都要帶 X-Trace-Id header。
    • 每個 agent 回傳結果時,把自己的 agentIdtaskdurationMs 打到 log。
    • 方便之後在 log system(如 ELK / OpenTelemetry)中串連整條 workflow。

    5. 和 MCP、serverless 編排器整合時的性能與隔離

    • MCP / LangChain / LlamaIndex 等框架
    • 不要把所有工具都塞進同一個 process;可以把重型工具封裝為獨立 agent,透過 A2A 呼叫,避免單一框架 process 變成巨石。
    • Serverless 編排器(如 Step FunctionsTemporal
    • 用 A2A 把「呼叫 agent」當成一個 task type。
    • 注意冷啟動 + LLM 啟動成本:盡量把 agent 部署為長跑服務,serverless 只負責 orchestration。
    • 隊列系統整合(Kafka / SQS / RabbitMQ
    • A2A 協議層仍用同一份 JSON schema,只是 transport 從 HTTP 換成 message。
    • 確保 message 中也有 traceIdrequestIdtaskprotocolVersion,否則 debug 會極其痛苦。

    總結

    • A2A 的本質不是某個框架,而是一套清楚的多代理通訊契約
    • 一旦協議穩定,你可以自由更換 LLM、agent framework、部署方式,但還能:
    • 在多客戶間重用同一組 agents
    • 控制編排器複雜度
    • 在真實生產環境中可觀測、可回溯、可演進。

    如果你已經有一個「會動但很醜」的單體 agent app,建議先抽出最常用的 1–2 個子任務,照上面的 minimal A2A 協議拆出去,從那裡開始演進,而不是一次重寫全系統。

    🚀 你現在可以做的事

    • 把現有單體 agent app 中 1–2 個常用任務,先按文中 JSON 協議拆成獨立 /invoke 服務
    • 為這些任務統一增加 requestIdX-Trace-Id,並在日誌中打出 agentIdtaskdurationMs
    • 實作一個最小版 Registry(照文中 registry/index.ts),讓 orchestrator 透過發現機制而不是硬編 URL 呼叫 agents