標籤: Google Gemini

  • 用 Gemini Webhooks 正確打開長任務 LLM

    用 Gemini Webhooks 正確打開長任務 LLM

    📌 本文重點

    • 用 Webhook 取代 polling,解決長任務阻塞
    • Webhook handler 要做到驗簽、冪等、極瘦
    • 任務抽象成 job+事件+worker,易於擴充與控管

    長任務 LLM(大檔案 RAG 摘要、批量程式碼分析、批次生成報表)現在普遍體驗很爛,核心原因通常只有兩個字:阻塞

    傳統做法是:前端打一個同步 API,後端卡在那裡等 LLM 完成,或者先回 job_id,然後一直 polling 查狀態。前者直接拖垮後端連線與 gateway timeout,後者則是浪費資源+狀態更新延遲。Gemini Webhooks 正在解這個痛:把長任務變成事件驅動的推送流程,讓你的系統只在「有事發生」時醒來,而不是每 5 秒在那邊瞎問「好了沒?」。


    重點說明:為什麼要用 Gemini Webhooks

    💡 關鍵: 用事件驅動模式取代頻繁 polling,可同時避免 gateway timeout 和後端資源被無意義查詢拖垮

    1. 事件驅動:用任務狀態變更取代 polling

    在 Gemini API 中,長任務(例如 videos:asyncGenerate、大型批次推理)完成時,會透過 Webhooks 主動打到你註冊的 URL。概念上會有類似以下事件:

    • job.created:任務建立成功
    • job.running:模型開始處理
    • job.succeeded:任務完成,可讀取結果
    • job.failed:任務失敗,附錯誤碼

    你不再需要 setInterval 去拉 GET /jobs/{id},而是:

    1. 後端呼叫 Gemini 創建 job,拿到 job_id
    2. 立刻回傳給前端
    3. 等 Gemini 的 Webhook 打回來時,再更新 DB / 推訊息給前端

    這跟 Stripe、GitHub 的 Webhook 類似,但 LLM 任務的執行時間級距更大(秒 → 分鐘),而且往往輸出結果非常大,所以事件節點設計與重試策略尤其關鍵。

    💡 關鍵: LLM 任務可從「秒級」到「分鐘級」,用 Webhook 讓前端先回應、結果再補推,是改善體驗與穩定性的關鍵設計

    2. 可靠傳遞:重試機制與簽名驗證

    Gemini Webhooks 一般會具備:

    • 重試機制:如果你的 Webhook handler 回傳非 2xx,Google 會在一段時間內重試。你必須設計 handler 為冪等:同一個 event_id 打多次也不會重複處理。
    • 簽名驗證:Header 中會帶類似 X-Goog-Signature 的簽名,內容由 timestamp + body 使用 Google 公鑰/金鑰驗證。你必須:
    • 驗證 timestamp(避免重放攻擊)
    • 使用官方提供的 public key / JWKS 驗簽

    與 Stripe/GitHub 的差異在於:事件 payload 裡通常會內嵌部分結果或結果位置(例如 GCS 路徑、job result id),你要能快速把這些資訊導到後續 pipeline(儲存、後處理、通知)。

    3. 典型架構:前端提交通用長任務 + Webhook 回寫結果

    一個實際可落地的架構可以長這樣:

    1. 前端
    2. 呼叫你的後端 POST /tasks,附上:

      • 檔案位置(GCS / S3 URL)或上傳檔案 ID
      • 任務類型(例如 rag_summarycode_batch_review
    3. 後端 API(同步路徑)

    4. 在 DB 建一筆 tasks 紀錄(狀態 pending
    5. 呼叫 Gemini Async API(例如:POST /v1/videos:asyncGenerate)並設定 webhook_config
    6. job_id 寫回 DB,回應前端:

      json
      { "task_id": "t_123", "job_id": "g_job_abc", "status": "pending" }

    7. Gemini Webhook → 你的 Webhook handler

    8. 收到 job.succeededjob.failed 事件
    9. 驗簽、判斷是否已處理
    10. 更新 DB 的 tasks.status,並把結果丟進 message queue(例如 Pub/Sub / Kafka / SQS

    11. 背景 worker

    12. 從 queue 拉到「任務完成」事件
    13. 下載/讀取 LLM 結果,做後處理(切 chunk、寫向量 DB、生成報表 PDF 等)
    14. 通知前端(WebSocket / SSE / push)

    重點是:Webhook handler 要極瘦,只負責驗證 + enqueue,不做 heavy work,避免阻塞導致重試暴增。


    實作範例:簽名驗證、冪等與背景 worker

    以下用 Node.js / Go / Python 各給一個實作骨架,重點在「怎麼安全又穩定地吃 Webhook」。

    Node.js(Express)示意

    import express from 'express';
    import crypto from 'crypto';
    
    const app = express();
    app.use(express.raw({ type: 'application/json' })); // 保留原始 body
    
    function verifyGoogleSignature(req) {
      const signature = req.header('X-Goog-Signature');
      const timestamp = req.header('X-Goog-Timestamp');
      const body = req.body; // Buffer
    
      if (!signature || !timestamp) return false;
    
      const now = Math.floor(Date.now() / 1000);
      if (Math.abs(now - Number(timestamp)) > 300) return false; // 5 分鐘窗口
    
      const expected = crypto
        .createHmac('sha256', process.env.GOOGLE_WEBHOOK_SECRET)
        .update(timestamp + '.' + body.toString('utf8'))
        .digest('base64');
    
      return crypto.timingSafeEqual(Buffer.from(signature), Buffer.from(expected));
    }
    
    app.post('/webhooks/gemini', async (req, res) => {
      if (!verifyGoogleSignature(req)) {
        return res.status(400).send('invalid signature');
      }
    
      const event = JSON.parse(req.body.toString('utf8'));
      const { id: eventId, type, data } = event;
    
      // 冪等:如果 eventId 已處理過,直接回 200
      const exists = await hasEventProcessed(eventId);
      if (exists) return res.status(200).send('ok');
    
      await markEventProcessed(eventId);
    
      // 僅 enqueue,不做 heavy work
      await enqueueToQueue({ type, data });
    
      res.status(200).send('ok');
    });
    
    app.listen(3000);
    

    關鍵:

    • express.raw 保留原始 body,驗簽才不會失真
    • 使用 timingSafeEqual 避免時間側信道
    • hasEventProcessed / markEventProcessed 通常用 DB / Redis 實作 event_id 去重

    Go(net/http)示意

    func verifyGoogleSignature(r *http.Request, body []byte) bool {
      sig := r.Header.Get("X-Goog-Signature")
      ts := r.Header.Get("X-Goog-Timestamp")
      if sig == "" || ts == "" {
        return false
      }
    
      // 5 分鐘窗口
      tsInt, _ := strconv.ParseInt(ts, 10, 64)
      if math.Abs(float64(time.Now().Unix()-tsInt)) > 300 {
        return false
      }
    
      mac := hmac.New(sha256.New, []byte(os.Getenv("GOOGLE_WEBHOOK_SECRET")))
      mac.Write([]byte(ts + "." + string(body)))
      expected := base64.StdEncoding.EncodeToString(mac.Sum(nil))
    
      return hmac.Equal([]byte(sig), []byte(expected))
    }
    
    func geminiWebhookHandler(w http.ResponseWriter, r *http.Request) {
      body, _ := io.ReadAll(r.Body)
      defer r.Body.Close()
    
      if !verifyGoogleSignature(r, body) {
        w.WriteHeader(http.StatusBadRequest)
        return
      }
    
      var event GeminiEvent
      if err := json.Unmarshal(body, &event); err != nil {
        w.WriteHeader(http.StatusBadRequest)
        return
      }
    
      if alreadyProcessed(event.ID) {
        w.WriteHeader(http.StatusOK)
        return
      }
      markProcessed(event.ID)
      enqueue(event)
    
      w.WriteHeader(http.StatusOK)
    }
    

    Python(FastAPI)示意

    from fastapi import FastAPI, Request, Header, HTTPException
    import hmac, hashlib, base64, time, json
    
    app = FastAPI()
    
    SECRET = b"your-google-webhook-secret"
    
    def verify_signature(sig: str, ts: str, body: bytes) -> bool:
      if not sig or not ts:
        return False
      now = int(time.time())
      if abs(now - int(ts)) > 300:
        return False
      mac = hmac.new(SECRET, f"{ts}.".encode() + body, hashlib.sha256)
      expected = base64.b64encode(mac.digest()).decode()
      return hmac.compare_digest(sig, expected)
    
    @app.post("/webhooks/gemini")
    async def gemini_webhook(request: Request,
                             x_goog_signature: str = Header(None),
                             x_goog_timestamp: str = Header(None)):
      body = await request.body()
      if not verify_signature(x_goog_signature, x_goog_timestamp, body):
        raise HTTPException(status_code=400, detail="invalid signature")
    
      event = json.loads(body)
      event_id = event["id"]
    
      if await is_processed(event_id):
        return {"status": "ok"}
    
      await mark_processed(event_id)
      await enqueue_event(event)
      return {"status": "ok"}
    

    這三個例子都符合同樣原則:Webhook handler = 驗簽 + 去重 + enqueue,真正重的事丟給 worker。


    建議與注意事項:把坑踩在實驗環境就好

    1. 超時與錯誤恢復策略

    • Webhook handler 的處理時間要控制在幾百毫秒內,超時會觸發 Google 重試。
    • 重試代表同一事件可能會打多次,你必須:
    • event_id + 唯一索引,確保不會重複更新任務
    • 在 worker 中也做去重(例如用 task 的狀態機 pending → processing → done

    2. 任務去重與「鬼任務」

    典型問題:

    • 前端連點送出,後端重複創建 job → 成本爆炸
    • Webhook 僅回傳 job 狀態,但你找不到對應的 tenant / 任務

    建議:

    • 在 DB 給 client_request_id 加 unique constraint,後端收到同一個 client_request_id 時直接回已存在的 task_id / job_id
    • tenant_iduser_idtrace_id 放進你自己的 tasks table,收到 Webhook 時用 job_id join 回 tasks 而不是在 payload 裡亂猜

    3. 安全:防止偽造 Webhook 與濫用

    • 一律使用 簽名驗證,不要只靠 IP 白名單
    • 驗證 timestamp,避免攻擊者重放舊事件
    • Webhook URL 單獨使用 domain / path,不與前台共用,方便 WAF 規則收斂
    • 若採多租戶 SaaS:
    • tasks 表一定要有 tenant_id,所有查詢都要帶 tenant scope
    • 配額計算(token、任務次數、併發 job 數)也要按 tenant_id 統計
    • 錯誤事件(job.failed)要能映射回 tenant,避免 debug 時完全看不懂是誰的錯

    4. 與現有框架整合:RAG、工作流、Serverless

    • 自建 RAG pipeline
    • 把「文件上傳 → 向量化 → 寫入 vector DB」拆成多個任務
    • Gemini Webhook 只負責第一段(例如大檔整理+摘要),之後再觸發你既有的向量化 pipeline

    • 工作流引擎(Airflow、Temporal、Prefect)

    • Webhook handler enqueue 到 workflow engine queue
    • 在 workflow 裡設一個「等待 LLM 任務完成」的 node,node 的觸發由 Webhook 完成,而不是 cron / polling

    • Serverless(Cloud Run / Cloud Functions / Lambda)

    • Webhook handler 非常適合跑在 Serverless 上,因為多數時間是 idle
    • heavy worker 則可以獨立部署在 long-running service(K8s)或另一個 queue consumer 上

    結論:

    如果你的 LLM 產品裡已經出現「API 會卡 1 分鐘」「前端一直轉圈」「後端一堆 polling job 消耗資源」這些症狀,Gemini Webhooks 是目前最合理的重構切入點。把長任務抽象成 job + 事件 + worker 三件事,你可以:

    • 提升使用者體驗(提交即回應、狀態即時更新)
    • 降低後端長連線壓力與無意義 polling 成本
    • 更容易在多租戶 SaaS 中做配額控制與隔離

    從先把 Webhook handler 寫瘦、寫穩開始,你的長任務 LLM 系統就會開始好用很多。

    🚀 你現在可以做的事

    • 在現有後端加一個 /webhooks/gemini endpoint,實作驗簽+冪等邏輯
    • 把目前同步長任務改成「建立 job → 透過 Webhook 回寫結果」的流程
    • 選一個 queue(例如 Pub/SubKafkaSQS)接 Webhook 事件,啟動獨立背景 worker 處理重任務