from kallima import WebhookVerifier, WebhookSignatureError
verifier = WebhookVerifier(secret="whsec_...")
# In your request handler (framework-agnostic):
def handle_webhook(headers: dict, raw_body: bytes) -> None:
try:
event = verifier.verify(
payload=raw_body,
signature=headers["X-Kallima-Signature"],
timestamp=headers["X-Kallima-Timestamp"],
)
except WebhookSignatureError:
return # reject — invalid or replayed request
if event["type"] == "job.completed":
job_id = event["data"]["id"]
print(f"Job {job_id} finished")