# Webhooks and Event Streaming - Specification

**Workstream**: 5 of 6
**Phase**: 3 - Enterprise
**Author**: Virtual Architect
**Date**: 2026-03-29

---

## Overview

Real-time event notifications for agent lifecycle events via HTTP webhooks. Operators create webhook subscriptions specifying a target URL, the events they want to receive, and a secret for HMAC-SHA256 signature verification. Delivery is asynchronous via a Redis-backed `bull` queue with exponential backoff retry (max 10 attempts). All deliveries are logged for observability.

Supported events: `agent.created`, `agent.updated`, `agent.suspended`, `agent.reactivated`, `agent.decommissioned`, `credential.generated`, `credential.rotated`, `credential.revoked`, `token.issued`, `token.revoked`.

An optional Kafka/NATS adapter enables high-throughput event streaming alongside webhook delivery.

---

## API Endpoints

### POST /webhooks

Create a new webhook subscription. Requires `agents:write` scope.

```yaml
POST /webhooks
Authorization: Bearer <token>
Content-Type: application/json

Request Body:
  schema:
    type: object
    required: [url, events, secret]
    properties:
      url:
        type: string
        format: uri
        description: HTTPS endpoint to deliver events to
        example: "https://app.example.com/hooks/agentidp"
      events:
        type: array
        items:
          type: string
          enum:
            - agent.created
            - agent.updated
            - agent.suspended
            - agent.reactivated
            - agent.decommissioned
            - credential.generated
            - credential.rotated
            - credential.revoked
            - token.issued
            - token.revoked
            - "*"
        minItems: 1
        description: List of event types to subscribe to. Use ["*"] to subscribe to all events.
        example: ["agent.created", "credential.rotated"]
      secret:
        type: string
        minLength: 16
        description: Secret used to compute HMAC-SHA256 signature. Store securely; it is returned only once.
        example: "whsec_super_secret_value_here"
      description:
        type: string
        maxLength: 255
        description: Optional human-readable description for this subscription
      active:
        type: boolean
        default: true

Responses:
  201 Created:
    schema:
      $ref: '#/components/schemas/WebhookSubscription'
    example:
      subscriptionId: "wh_01HXK7Z9P3FKWABCDEF55555"
      organizationId: "org_01HXK7Z9P3FKWABCDEF12345"
      url: "https://app.example.com/hooks/agentidp"
      events: ["agent.created", "credential.rotated"]
      description: "Production event sink"
      active: true
      createdAt: "2026-03-29T12:00:00Z"
      updatedAt: "2026-03-29T12:00:00Z"
  400 Bad Request:
    schema:
      $ref: '#/components/schemas/ErrorResponse'
    examples:
      invalid_url:
        code: "VALIDATION_ERROR"
        message: "url must be a valid HTTPS URI"
      invalid_event:
        code: "VALIDATION_ERROR"
        message: "Unknown event type: agent.unknown"
  401 Unauthorized:
    schema:
      $ref: '#/components/schemas/ErrorResponse'
  403 Forbidden:
    schema:
      $ref: '#/components/schemas/ErrorResponse'
```

---

### GET /webhooks

List webhook subscriptions for the caller's organization. Requires `agents:read` scope.

```yaml
GET /webhooks
Authorization: Bearer <token>

Query Parameters:
  active:
    type: boolean
    description: Filter by active/inactive subscriptions
  page:
    type: integer
    default: 1
  limit:
    type: integer
    default: 20
    maximum: 100

Responses:
  200 OK:
    schema:
      type: object
      properties:
        data:
          type: array
          items:
            $ref: '#/components/schemas/WebhookSubscription'
        total:
          type: integer
        page:
          type: integer
        limit:
          type: integer
  401 Unauthorized:
    schema:
      $ref: '#/components/schemas/ErrorResponse'
  403 Forbidden:
    schema:
      $ref: '#/components/schemas/ErrorResponse'
```

---

### GET /webhooks/:id

Get a single webhook subscription. Requires `agents:read` scope.
```yaml
GET /webhooks/{subscriptionId}
Authorization: Bearer <token>

Path Parameters:
  subscriptionId:
    type: string

Responses:
  200 OK:
    schema:
      $ref: '#/components/schemas/WebhookSubscription'
  401 Unauthorized:
    schema:
      $ref: '#/components/schemas/ErrorResponse'
  403 Forbidden:
    schema:
      $ref: '#/components/schemas/ErrorResponse'
  404 Not Found:
    schema:
      $ref: '#/components/schemas/ErrorResponse'
    example:
      code: "WEBHOOK_NOT_FOUND"
      message: "Webhook subscription not found"
```

---

### PATCH /webhooks/:id

Update a webhook subscription (e.g., pause/resume, change events). Requires `agents:write` scope.

```yaml
PATCH /webhooks/{subscriptionId}
Authorization: Bearer <token>
Content-Type: application/json

Request Body:
  schema:
    type: object
    properties:
      url:
        type: string
        format: uri
      events:
        type: array
        items:
          type: string
      description:
        type: string
        maxLength: 255
      active:
        type: boolean

Responses:
  200 OK:
    schema:
      $ref: '#/components/schemas/WebhookSubscription'
  400 Bad Request:
    schema:
      $ref: '#/components/schemas/ErrorResponse'
  401 Unauthorized:
    schema:
      $ref: '#/components/schemas/ErrorResponse'
  403 Forbidden:
    schema:
      $ref: '#/components/schemas/ErrorResponse'
  404 Not Found:
    schema:
      $ref: '#/components/schemas/ErrorResponse'
```

---

### DELETE /webhooks/:id

Delete a webhook subscription. Requires `agents:write` scope.

```yaml
DELETE /webhooks/{subscriptionId}
Authorization: Bearer <token>

Responses:
  204 No Content: {}
  401 Unauthorized:
    schema:
      $ref: '#/components/schemas/ErrorResponse'
  403 Forbidden:
    schema:
      $ref: '#/components/schemas/ErrorResponse'
  404 Not Found:
    schema:
      $ref: '#/components/schemas/ErrorResponse'
```

---

### GET /webhooks/:id/deliveries

List delivery attempts for a specific webhook subscription. Requires `agents:read` scope.
```yaml
GET /webhooks/{subscriptionId}/deliveries
Authorization: Bearer <token>

Query Parameters:
  status:
    type: string
    enum: [pending, success, failed, dead_letter]
  eventType:
    type: string
    description: Filter by event type
  fromDate:
    type: string
    format: date-time
  toDate:
    type: string
    format: date-time
  page:
    type: integer
    default: 1
  limit:
    type: integer
    default: 50
    maximum: 200

Responses:
  200 OK:
    schema:
      type: object
      properties:
        data:
          type: array
          items:
            $ref: '#/components/schemas/WebhookDelivery'
        total:
          type: integer
        page:
          type: integer
        limit:
          type: integer
    example:
      data:
        - deliveryId: "del_01HXK7Z9P3FKWABCDEF77777"
          subscriptionId: "wh_01HXK7Z9P3FKWABCDEF55555"
          eventType: "agent.created"
          eventId: "evt_01HXK7Z9P3FKWABCDEF99999"
          status: "success"
          httpStatusCode: 200
          attemptCount: 1
          nextRetryAt: null
          deliveredAt: "2026-03-29T12:00:05Z"
          createdAt: "2026-03-29T12:00:00Z"
      total: 1
      page: 1
      limit: 50
  401 Unauthorized:
    schema:
      $ref: '#/components/schemas/ErrorResponse'
  404 Not Found:
    schema:
      $ref: '#/components/schemas/ErrorResponse'
```

---

## Webhook Payload Format

Every webhook delivery uses this envelope format:

```json
{
  "id": "evt_01HXK7Z9P3FKWABCDEF99999",
  "type": "agent.created",
  "organizationId": "org_01HXK7Z9P3FKWABCDEF12345",
  "timestamp": "2026-03-29T12:00:00Z",
  "data": {
    "agentId": "agt_01HXK7Z9P3FKWABCDEF67890",
    "agentType": "orchestrator",
    "status": "active",
    "owner": "acme-ai",
    "version": "1.0.0",
    "deploymentEnv": "production"
  }
}
```

### HMAC-SHA256 Signature

Every delivery includes the following HTTP headers:

```
X-AgentIdP-Event: agent.created
X-AgentIdP-Delivery-Id: del_01HXK7Z9P3FKWABCDEF77777
X-AgentIdP-Timestamp: 1774785600
X-AgentIdP-Signature-256: sha256=<hex_signature>
```

`X-AgentIdP-Timestamp` is a Unix timestamp in seconds; the value above corresponds to `2026-03-29T12:00:00Z`.

Signature computation:

```
signed_content = timestamp + "." + JSON.stringify(payload)
signature      = HMAC-SHA256(secret, signed_content)
header_value   = "sha256=" + hex(signature)
```

---

## Database Schema Changes

### New Table: webhook_subscriptions

```sql
CREATE TABLE webhook_subscriptions (
  subscription_id   VARCHAR(40)   PRIMARY KEY,
  organization_id   VARCHAR(40)   NOT NULL REFERENCES organizations(organization_id),
  url               VARCHAR(2048) NOT NULL,
  events            JSONB         NOT NULL DEFAULT '[]',
  secret_hash       VARCHAR(255)  NOT NULL, -- bcrypt hash of secret; plain text stored in Vault
  vault_secret_path VARCHAR(255)  NOT NULL,
  description       VARCHAR(255),
  active            BOOLEAN       NOT NULL DEFAULT TRUE,
  failure_count     INTEGER       NOT NULL DEFAULT 0,
  last_delivery_at  TIMESTAMPTZ,
  created_at        TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
  updated_at        TIMESTAMPTZ   NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_webhook_subs_org_id ON webhook_subscriptions(organization_id);
CREATE INDEX idx_webhook_subs_active ON webhook_subscriptions(active) WHERE active = TRUE;
```

### New Table: webhook_deliveries

```sql
CREATE TABLE webhook_deliveries (
  delivery_id      VARCHAR(40)  PRIMARY KEY,
  subscription_id  VARCHAR(40)  NOT NULL REFERENCES webhook_subscriptions(subscription_id),
  organization_id  VARCHAR(40)  NOT NULL REFERENCES organizations(organization_id),
  event_id         VARCHAR(40)  NOT NULL,
  event_type       VARCHAR(100) NOT NULL,
  payload          JSONB        NOT NULL,
  status           VARCHAR(20)  NOT NULL DEFAULT 'pending',
  http_status_code SMALLINT,
  response_body    TEXT,
  attempt_count    SMALLINT     NOT NULL DEFAULT 0,
  next_retry_at    TIMESTAMPTZ,
  delivered_at     TIMESTAMPTZ,
  created_at       TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
  CONSTRAINT webhook_deliveries_status_check
    CHECK (status IN ('pending', 'success', 'failed', 'dead_letter'))
);

CREATE INDEX idx_webhook_deliveries_sub_id  ON webhook_deliveries(subscription_id);
CREATE INDEX idx_webhook_deliveries_status  ON webhook_deliveries(status);
CREATE INDEX idx_webhook_deliveries_org_id  ON webhook_deliveries(organization_id);
CREATE INDEX idx_webhook_deliveries_created ON webhook_deliveries(created_at);
```

---
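Returning to the signature scheme described under "HMAC-SHA256 Signature": the following is a minimal receiver-side sketch, not the service's actual implementation. The function names `signPayload` and `verifyDelivery` and the constant `MAX_SKEW_SECONDS` are hypothetical; the 300-second window is taken from the 5-minute recommendation in this specification. It assumes the receiver has access to the raw request body exactly as sent, since re-serializing parsed JSON may not reproduce the bytes that were signed.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Assumed tolerance: the 5-minute replay window recommended by this spec.
const MAX_SKEW_SECONDS = 300;

/** Builds the header value "sha256=<hex>" over `timestamp + "." + rawBody`. */
function signPayload(secret: string, timestamp: string, rawBody: string): string {
  const digest = createHmac("sha256", secret)
    .update(`${timestamp}.${rawBody}`)
    .digest("hex");
  return `sha256=${digest}`;
}

/**
 * Hypothetical receiver-side check: the timestamp must be recent and the
 * signature must match. Timestamps too far in the future are rejected as
 * well, which is stricter than the spec requires.
 */
function verifyDelivery(
  secret: string,
  timestampHeader: string,
  signatureHeader: string,
  rawBody: string,
  nowSeconds: number = Math.floor(Date.now() / 1000),
): boolean {
  const timestamp = Number(timestampHeader);
  if (!Number.isInteger(timestamp)) return false;
  if (Math.abs(nowSeconds - timestamp) > MAX_SKEW_SECONDS) return false;

  const expected = Buffer.from(signPayload(secret, timestampHeader, rawBody));
  const received = Buffer.from(signatureHeader);
  // timingSafeEqual throws on unequal lengths, so check lengths first.
  return expected.length === received.length && timingSafeEqual(expected, received);
}
```

A receiver would typically call `verifyDelivery` with the values of `X-AgentIdP-Timestamp` and `X-AgentIdP-Signature-256` before parsing the body, and respond with a 4xx status when it returns `false`.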
## Retry Schedule

```
Attempt 1:  immediate
Attempt 2:  1 minute after failure
Attempt 3:  5 minutes after failure
Attempt 4:  15 minutes after failure
Attempt 5:  1 hour after failure
Attempt 6:  4 hours after failure
Attempt 7:  12 hours after failure
Attempt 8:  24 hours after failure
Attempt 9:  48 hours after failure
Attempt 10: 72 hours after failure
After attempt 10: status = dead_letter; operator alerted via Prometheus metric
```

---

## Configuration

| Environment Variable | Description | Default |
|---------------------|-------------|---------|
| `WEBHOOKS_ENABLED` | Enable webhook functionality | `true` |
| `WEBHOOK_DELIVERY_TIMEOUT_MS` | HTTP delivery request timeout | `10000` |
| `WEBHOOK_MAX_RETRIES` | Maximum delivery attempts before dead-letter | `10` |
| `WEBHOOK_WORKER_CONCURRENCY` | Number of concurrent delivery workers | `5` |
| `KAFKA_BROKERS` | Comma-separated Kafka broker list (optional; activates Kafka adapter) | `""` |
| `KAFKA_TOPIC_PREFIX` | Prefix for Kafka topic names | `agentidp` |
| `NATS_URL` | NATS server URL (optional; activates NATS adapter) | `""` |

---

## Dependencies

| Package | Version | Purpose |
|---------|---------|---------|
| `bull` | `^4.16.3` | Redis-backed async job queue for webhook delivery |
| `kafkajs` | `^2.2.4` | Kafka producer adapter (optional) |

---

## Security Considerations

- Webhook secrets are stored in Vault; only a bcrypt hash is in PostgreSQL for in-memory comparison
- All deliveries must be to HTTPS endpoints; HTTP endpoints are rejected at subscription creation
- Private/internal IP ranges (RFC 1918, loopback) are blocked at delivery time to prevent SSRF
- HMAC signature allows the receiving server to verify the delivery is authentic
- Replay attacks are mitigated by including a timestamp in the signed content; receivers should reject deliveries with timestamps older than 5 minutes
- Dead-letter events generate a Prometheus metric `agentidp_webhook_dead_letters_total` for alerting

---

## Acceptance Criteria

- [ ] `POST /webhooks` creates a subscription; secret stored in Vault, not returned after creation
- [ ] Webhook delivery occurs within 30 seconds of event generation for healthy subscribers
- [ ] Delivery includes correct `X-AgentIdP-Signature-256` header verifiable with provided secret
- [ ] Failed delivery is retried per schedule; status updates in `webhook_deliveries` table
- [ ] After max retries, status is `dead_letter` and metric is incremented
- [ ] Delivery to HTTP (non-HTTPS) URL is rejected at subscription creation
- [ ] Delivery to private IP range is rejected (SSRF protection)
- [ ] `GET /webhooks/:id/deliveries` returns accurate delivery history
- [ ] TypeScript strict, zero `any`, >80% test coverage on WebhookService
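
The retry schedule referenced in the criteria above can be expressed as a small lookup, sketched here for illustration only. The names `RETRY_DELAYS_MS`, `MAX_ATTEMPTS`, and `planRetry` are hypothetical. Two assumptions are made that the specification does not state: each delay is measured from the failure of the previous attempt, and a delivery awaiting retry is recorded with status `failed` and a non-null `nextRetryAt`.

```typescript
const MINUTE_MS = 60_000;
const HOUR_MS = 60 * MINUTE_MS;

// Delays transcribed from the Retry Schedule section.
// Index i is the wait before attempt i + 2 (attempt 1 is immediate).
const RETRY_DELAYS_MS: readonly number[] = [
  1 * MINUTE_MS,
  5 * MINUTE_MS,
  15 * MINUTE_MS,
  1 * HOUR_MS,
  4 * HOUR_MS,
  12 * HOUR_MS,
  24 * HOUR_MS,
  48 * HOUR_MS,
  72 * HOUR_MS,
];

// Ten attempts in total: one immediate attempt plus nine scheduled retries.
const MAX_ATTEMPTS = RETRY_DELAYS_MS.length + 1;

type RetryDecision =
  | { status: "failed"; nextRetryAt: Date } // assumed status while awaiting retry
  | { status: "dead_letter"; nextRetryAt: null };

/** Decides what happens after attempt number `failedAttempt` (1-based) fails. */
function planRetry(failedAttempt: number, failedAt: Date): RetryDecision {
  if (!Number.isInteger(failedAttempt) || failedAttempt < 1) {
    throw new RangeError("failedAttempt must be a positive integer");
  }
  if (failedAttempt >= MAX_ATTEMPTS) {
    return { status: "dead_letter", nextRetryAt: null };
  }
  const delayMs = RETRY_DELAYS_MS[failedAttempt - 1];
  return { status: "failed", nextRetryAt: new Date(failedAt.getTime() + delayMs) };
}
```

In a queue-based worker, the returned `nextRetryAt` could be turned into a job delay and written to `webhook_deliveries.next_retry_at`; the `dead_letter` branch is where the `agentidp_webhook_dead_letters_total` metric would be incremented.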