用 Gemini Webhooks 正確打開長任務 LLM

用 Gemini Webhooks 正確打開長任務 LLM

📌 本文重點

  • 用 Webhook 取代 polling,解決長任務阻塞
  • Webhook handler 要做到驗簽、冪等、極瘦
  • 任務抽象成 job+事件+worker,易於擴充與控管

長任務 LLM(大檔案 RAG 摘要、批量程式碼分析、批次生成報表)現在普遍體驗很爛,核心原因通常只有兩個字:阻塞

傳統做法是:前端打一個同步 API,後端卡在那裡等 LLM 完成,或者先回 job_id,然後一直 polling 查狀態。前者直接拖垮後端連線與 gateway timeout,後者則是浪費資源+狀態更新延遲。Gemini Webhooks 正在解這個痛:把長任務變成事件驅動的推送流程,讓你的系統只在「有事發生」時醒來,而不是每 5 秒在那邊瞎問「好了沒?」。


重點說明:為什麼要用 Gemini Webhooks

💡 關鍵: 用事件驅動模式取代頻繁 polling,可同時避免 gateway timeout 和後端資源被無意義查詢拖垮

1. 事件驅動:用任務狀態變更取代 polling

在 Gemini API 中,長任務(例如 videos:asyncGenerate、大型批次推理)完成時,會透過 Webhooks 主動打到你註冊的 URL。概念上會有類似以下事件:

  • job.created:任務建立成功
  • job.running:模型開始處理
  • job.succeeded:任務完成,可讀取結果
  • job.failed:任務失敗,附錯誤碼

你不再需要 setInterval 去拉 GET /jobs/{id},而是:

  1. 後端呼叫 Gemini 創建 job,拿到 job_id
  2. 立刻回傳給前端
  3. 等 Gemini 的 Webhook 打回來時,再更新 DB / 推訊息給前端

這跟 Stripe、GitHub 的 Webhook 類似,但 LLM 任務的執行時間級距更大(秒 → 分鐘),而且往往輸出結果非常大,所以事件節點設計與重試策略尤其關鍵。

💡 關鍵: LLM 任務可從「秒級」到「分鐘級」,用 Webhook 讓前端先回應、結果再補推,是改善體驗與穩定性的關鍵設計

2. 可靠傳遞:重試機制與簽名驗證

Gemini Webhooks 一般會具備:

  • 重試機制:如果你的 Webhook handler 回傳非 2xx,Google 會在一段時間內重試。你必須設計 handler 為冪等:同一個 event_id 打多次也不會重複處理。
  • 簽名驗證:Header 中會帶類似 X-Goog-Signature 的簽名,內容由 timestamp + body 使用 Google 公鑰/金鑰驗證。你必須:
  • 驗證 timestamp(避免重放攻擊)
  • 使用官方提供的 public key / JWKS 驗簽

與 Stripe/GitHub 的差異在於:事件 payload 裡通常會內嵌部分結果或結果位置(例如 GCS 路徑、job result id),你要能快速把這些資訊導到後續 pipeline(儲存、後處理、通知)。

3. 典型架構:前端提交通用長任務 + Webhook 回寫結果

一個實際可落地的架構可以長這樣:

  1. 前端
  2. 呼叫你的後端 POST /tasks,附上:

    • 檔案位置(GCS / S3 URL)或上傳檔案 ID
    • 任務類型(例如 rag_summarycode_batch_review
  3. 後端 API(同步路徑)

  4. 在 DB 建一筆 tasks 紀錄(狀態 pending
  5. 呼叫 Gemini Async API(例如:POST /v1/videos:asyncGenerate)並設定 webhook_config
  6. job_id 寫回 DB,回應前端:

    json
    { "task_id": "t_123", "job_id": "g_job_abc", "status": "pending" }

  7. Gemini Webhook → 你的 Webhook handler

  8. 收到 job.succeededjob.failed 事件
  9. 驗簽、判斷是否已處理
  10. 更新 DB 的 tasks.status,並把結果丟進 message queue(例如 Pub/Sub / Kafka / SQS

  11. 背景 worker

  12. 從 queue 拉到「任務完成」事件
  13. 下載/讀取 LLM 結果,做後處理(切 chunk、寫向量 DB、生成報表 PDF 等)
  14. 通知前端(WebSocket / SSE / push)

重點是:Webhook handler 要極瘦,只負責驗證 + enqueue,不做 heavy work,避免阻塞導致重試暴增。


實作範例:簽名驗證、冪等與背景 worker

以下用 Node.js / Go / Python 各給一個實作骨架,重點在「怎麼安全又穩定地吃 Webhook」。

Node.js(Express)示意

import express from 'express';
import crypto from 'crypto';

const app = express();
app.use(express.raw({ type: 'application/json' })); // 保留原始 body

function verifyGoogleSignature(req) {
  const signature = req.header('X-Goog-Signature');
  const timestamp = req.header('X-Goog-Timestamp');
  const body = req.body; // Buffer

  if (!signature || !timestamp) return false;

  const now = Math.floor(Date.now() / 1000);
  if (Math.abs(now - Number(timestamp)) > 300) return false; // 5 分鐘窗口

  const expected = crypto
    .createHmac('sha256', process.env.GOOGLE_WEBHOOK_SECRET)
    .update(timestamp + '.' + body.toString('utf8'))
    .digest('base64');

  return crypto.timingSafeEqual(Buffer.from(signature), Buffer.from(expected));
}

app.post('/webhooks/gemini', async (req, res) => {
  if (!verifyGoogleSignature(req)) {
    return res.status(400).send('invalid signature');
  }

  const event = JSON.parse(req.body.toString('utf8'));
  const { id: eventId, type, data } = event;

  // 冪等:如果 eventId 已處理過,直接回 200
  const exists = await hasEventProcessed(eventId);
  if (exists) return res.status(200).send('ok');

  await markEventProcessed(eventId);

  // 僅 enqueue,不做 heavy work
  await enqueueToQueue({ type, data });

  res.status(200).send('ok');
});

app.listen(3000);

關鍵:

  • express.raw 保留原始 body,驗簽才不會失真
  • 使用 timingSafeEqual 避免時間側信道
  • hasEventProcessed / markEventProcessed 通常用 DB / Redis 實作 event_id 去重

Go(net/http)示意

func verifyGoogleSignature(r *http.Request, body []byte) bool {
  sig := r.Header.Get("X-Goog-Signature")
  ts := r.Header.Get("X-Goog-Timestamp")
  if sig == "" || ts == "" {
    return false
  }

  // 5 分鐘窗口
  tsInt, _ := strconv.ParseInt(ts, 10, 64)
  if math.Abs(float64(time.Now().Unix()-tsInt)) > 300 {
    return false
  }

  mac := hmac.New(sha256.New, []byte(os.Getenv("GOOGLE_WEBHOOK_SECRET")))
  mac.Write([]byte(ts + "." + string(body)))
  expected := base64.StdEncoding.EncodeToString(mac.Sum(nil))

  return hmac.Equal([]byte(sig), []byte(expected))
}

func geminiWebhookHandler(w http.ResponseWriter, r *http.Request) {
  body, _ := io.ReadAll(r.Body)
  defer r.Body.Close()

  if !verifyGoogleSignature(r, body) {
    w.WriteHeader(http.StatusBadRequest)
    return
  }

  var event GeminiEvent
  if err := json.Unmarshal(body, &event); err != nil {
    w.WriteHeader(http.StatusBadRequest)
    return
  }

  if alreadyProcessed(event.ID) {
    w.WriteHeader(http.StatusOK)
    return
  }
  markProcessed(event.ID)
  enqueue(event)

  w.WriteHeader(http.StatusOK)
}

Python(FastAPI)示意

from fastapi import FastAPI, Request, Header, HTTPException
import hmac, hashlib, base64, time, json

app = FastAPI()

SECRET = b"your-google-webhook-secret"

def verify_signature(sig: str, ts: str, body: bytes) -> bool:
  if not sig or not ts:
    return False
  now = int(time.time())
  if abs(now - int(ts)) > 300:
    return False
  mac = hmac.new(SECRET, f"{ts}.".encode() + body, hashlib.sha256)
  expected = base64.b64encode(mac.digest()).decode()
  return hmac.compare_digest(sig, expected)

@app.post("/webhooks/gemini")
async def gemini_webhook(request: Request,
                         x_goog_signature: str = Header(None),
                         x_goog_timestamp: str = Header(None)):
  body = await request.body()
  if not verify_signature(x_goog_signature, x_goog_timestamp, body):
    raise HTTPException(status_code=400, detail="invalid signature")

  event = json.loads(body)
  event_id = event["id"]

  if await is_processed(event_id):
    return {"status": "ok"}

  await mark_processed(event_id)
  await enqueue_event(event)
  return {"status": "ok"}

這三個例子都符合同樣原則:Webhook handler = 驗簽 + 去重 + enqueue,真正重的事丟給 worker。


建議與注意事項:把坑踩在實驗環境就好

1. 超時與錯誤恢復策略

  • Webhook handler 的處理時間要控制在幾百毫秒內,超時會觸發 Google 重試。
  • 重試代表同一事件可能會打多次,你必須:
  • event_id + 唯一索引,確保不會重複更新任務
  • 在 worker 中也做去重(例如用 task 的狀態機 pending → processing → done

2. 任務去重與「鬼任務」

典型問題:

  • 前端連點送出,後端重複創建 job → 成本爆炸
  • Webhook 僅回傳 job 狀態,但你找不到對應的 tenant / 任務

建議:

  • 在 DB 給 client_request_id 加 unique constraint,後端收到同一個 client_request_id 時直接回已存在的 task_id / job_id
  • tenant_iduser_idtrace_id 放進你自己的 tasks table,收到 Webhook 時用 job_id join 回 tasks 而不是在 payload 裡亂猜

3. 安全:防止偽造 Webhook 與濫用

  • 一律使用 簽名驗證,不要只靠 IP 白名單
  • 驗證 timestamp,避免攻擊者重放舊事件
  • Webhook URL 單獨使用 domain / path,不與前台共用,方便 WAF 規則收斂
  • 若採多租戶 SaaS:
  • tasks 表一定要有 tenant_id,所有查詢都要帶 tenant scope
  • 配額計算(token、任務次數、併發 job 數)也要按 tenant_id 統計
  • 錯誤事件(job.failed)要能映射回 tenant,避免 debug 時完全看不懂是誰的錯

4. 與現有框架整合:RAG、工作流、Serverless

  • 自建 RAG pipeline
  • 把「文件上傳 → 向量化 → 寫入 vector DB」拆成多個任務
  • Gemini Webhook 只負責第一段(例如大檔整理+摘要),之後再觸發你既有的向量化 pipeline

  • 工作流引擎(Airflow、Temporal、Prefect)

  • Webhook handler enqueue 到 workflow engine queue
  • 在 workflow 裡設一個「等待 LLM 任務完成」的 node,node 的觸發由 Webhook 完成,而不是 cron / polling

  • Serverless(Cloud Run / Cloud Functions / Lambda)

  • Webhook handler 非常適合跑在 Serverless 上,因為多數時間是 idle
  • heavy worker 則可以獨立部署在 long-running service(K8s)或另一個 queue consumer 上

結論:

如果你的 LLM 產品裡已經出現「API 會卡 1 分鐘」「前端一直轉圈」「後端一堆 polling job 消耗資源」這些症狀,Gemini Webhooks 是目前最合理的重構切入點。把長任務抽象成 job + 事件 + worker 三件事,你可以:

  • 提升使用者體驗(提交即回應、狀態即時更新)
  • 降低後端長連線壓力與無意義 polling 成本
  • 更容易在多租戶 SaaS 中做配額控制與隔離

從先把 Webhook handler 寫瘦、寫穩開始,你的長任務 LLM 系統就會開始好用很多。

🚀 你現在可以做的事

  • 在現有後端加一個 /webhooks/gemini endpoint,實作驗簽+冪等邏輯
  • 把目前同步長任務改成「建立 job → 透過 Webhook 回寫結果」的流程
  • 選一個 queue(例如 Pub/SubKafkaSQS)接 Webhook 事件,啟動獨立背景 worker 處理重任務

留言

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *