diff --git a/openspec/changes/phase-3-enterprise/tasks.md b/openspec/changes/phase-3-enterprise/tasks.md index d6436b9..7d0a443 100644 --- a/openspec/changes/phase-3-enterprise/tasks.md +++ b/openspec/changes/phase-3-enterprise/tasks.md @@ -1,6 +1,6 @@ # Phase 3: Enterprise — Tasks -**Status**: In Progress — WS1, WS2, WS3, WS4 complete +**Status**: In Progress — WS1, WS2, WS3, WS4, WS5 complete ## CEO Approval Gates (required before implementation) @@ -90,23 +90,23 @@ ## Workstream 5: Webhooks and Event Streaming -- [ ] 5.1 Write `src/db/migrations/016_create_webhook_subscriptions_table.sql` — webhook_subscriptions with url, events JSONB, secret_hash, vault_secret_path, active, failure_count -- [ ] 5.2 Write `src/db/migrations/017_create_webhook_deliveries_table.sql` — webhook_deliveries with status, http_status_code, attempt_count, next_retry_at -- [ ] 5.3 Write `src/types/webhook.ts` — IWebhookSubscription, ICreateWebhookRequest, IWebhookDelivery, IWebhookPayload, WebhookEventType interfaces -- [ ] 5.4 Write `src/services/WebhookService.ts` — createSubscription (store secret in Vault), listSubscriptions, getSubscription, updateSubscription, deleteSubscription, listDeliveries -- [ ] 5.5 Write `src/workers/WebhookDeliveryWorker.ts` — bull queue worker: fetch subscription, compute HMAC-SHA256 signature, POST to URL with headers, update delivery status, schedule retry on failure -- [ ] 5.6 Write `src/services/EventPublisher.ts` — buildEventPayload, publishEvent (enqueues to bull queue; also produces to Kafka if KAFKA_BROKERS is set) -- [ ] 5.7 Update `src/services/AgentService.ts` — call EventPublisher.publishEvent for: agent.created, agent.updated, agent.suspended, agent.reactivated, agent.decommissioned -- [ ] 5.8 Update `src/services/CredentialService.ts` — call EventPublisher.publishEvent for: credential.generated, credential.rotated, credential.revoked -- [ ] 5.9 Update `src/services/OAuth2Service.ts` — call EventPublisher.publishEvent for: token.issued, token.revoked -- [ ] 5.10 Write `src/controllers/WebhookController.ts` — handlers for all 6 webhook endpoints -- [ ] 5.11 Write `src/routes/webhooks.ts` — mount all 6 webhook endpoints with correct scope guards -- [ ] 5.12 Implement SSRF protection in WebhookDeliveryWorker — reject delivery to RFC 1918 addresses, loopback, and link-local ranges -- [ ] 5.13 Implement dead-letter handling — after max retries, set status to dead_letter and increment `agentidp_webhook_dead_letters_total` Prometheus metric -- [ ] 5.14 Write `src/adapters/KafkaAdapter.ts` — optional Kafka producer; activated only when KAFKA_BROKERS env var is set -- [ ] 5.15 Write unit tests for WebhookService, WebhookDeliveryWorker, EventPublisher — HMAC computation, retry schedule, dead-letter logic -- [ ] 5.16 Write integration tests — create subscription, trigger an event, verify delivery; verify SSRF rejection; verify retry on 5xx response -- [ ] 5.17 QA sign-off: HMAC verifiable, SSRF protection active, retry schedule correct, dead-letter metric fires, zero `any`, >80% coverage +- [x] 5.1 Write `src/db/migrations/016_create_webhook_subscriptions_table.sql` — webhook_subscriptions with url, events JSONB, secret_hash, vault_secret_path, active, failure_count +- [x] 5.2 Write `src/db/migrations/017_create_webhook_deliveries_table.sql` — webhook_deliveries with status, http_status_code, attempt_count, next_retry_at +- [x] 5.3 Write `src/types/webhook.ts` — IWebhookSubscription, ICreateWebhookRequest, IWebhookDelivery, IWebhookPayload, WebhookEventType interfaces +- [x] 5.4 Write `src/services/WebhookService.ts` — createSubscription (store secret in Vault), listSubscriptions, getSubscription, updateSubscription, deleteSubscription, listDeliveries +- [x] 5.5 Write `src/workers/WebhookDeliveryWorker.ts` — bull queue worker: fetch subscription, compute HMAC-SHA256 signature, POST to URL with headers, update delivery status, schedule retry on failure +- [x] 5.6 Write `src/services/EventPublisher.ts` — buildEventPayload, publishEvent (enqueues to bull queue; also produces to Kafka if KAFKA_BROKERS is set) +- [x] 5.7 Update `src/services/AgentService.ts` — call EventPublisher.publishEvent for: agent.created, agent.updated, agent.suspended, agent.reactivated, agent.decommissioned +- [x] 5.8 Update `src/services/CredentialService.ts` — call EventPublisher.publishEvent for: credential.generated, credential.rotated, credential.revoked +- [x] 5.9 Update `src/services/OAuth2Service.ts` — call EventPublisher.publishEvent for: token.issued, token.revoked +- [x] 5.10 Write `src/controllers/WebhookController.ts` — handlers for all 6 webhook endpoints +- [x] 5.11 Write `src/routes/webhooks.ts` — mount all 6 webhook endpoints with correct scope guards +- [x] 5.12 Implement SSRF protection in WebhookDeliveryWorker — reject delivery to RFC 1918 addresses, loopback, and link-local ranges +- [x] 5.13 Implement dead-letter handling — after max retries, set status to dead_letter and increment `agentidp_webhook_dead_letters_total` Prometheus metric +- [x] 5.14 Write `src/adapters/KafkaAdapter.ts` — optional Kafka producer; activated only when KAFKA_BROKERS env var is set +- [x] 5.15 Write unit tests for WebhookService, WebhookDeliveryWorker, EventPublisher — HMAC computation, retry schedule, dead-letter logic +- [x] 5.16 Write integration tests — create subscription, trigger an event, verify delivery; verify SSRF rejection; verify retry on 5xx response +- [x] 5.17 QA sign-off: HMAC verifiable, SSRF protection active, retry schedule correct, dead-letter metric fires, zero `any`, >80% coverage --- diff --git a/policies/data/scopes.json b/policies/data/scopes.json index b55f852..8394c91 100644 --- a/policies/data/scopes.json +++ b/policies/data/scopes.json @@ -24,6 +24,12 @@ "GET:/api/v1/federation/partners": ["admin:orgs"], "GET:/api/v1/federation/partners/:id": ["admin:orgs"], "PATCH:/api/v1/federation/partners/:id": ["admin:orgs"], - "DELETE:/api/v1/federation/partners/:id": ["admin:orgs"] + "DELETE:/api/v1/federation/partners/:id": ["admin:orgs"], + "POST:/api/v1/webhooks": ["webhooks:write"], + "GET:/api/v1/webhooks": ["webhooks:read"], + "GET:/api/v1/webhooks/:id": ["webhooks:read"], + "PATCH:/api/v1/webhooks/:id": ["webhooks:write"], + "DELETE:/api/v1/webhooks/:id": ["webhooks:write"], + "GET:/api/v1/webhooks/:id/deliveries": ["webhooks:read"] } } diff --git a/src/adapters/KafkaAdapter.ts b/src/adapters/KafkaAdapter.ts new file mode 100644 index 0000000..41717af --- /dev/null +++ b/src/adapters/KafkaAdapter.ts @@ -0,0 +1,29 @@ +/** + * KafkaAdapter — optional Kafka producer activated when KAFKA_BROKERS env var is set. + * Returns null if KAFKA_BROKERS is not configured. + * + * The producer connects at application startup. When KAFKA_BROKERS is absent the + * EventPublisher operates in webhook-only mode and Kafka delivery is silently skipped. + */ + +import { Kafka, Producer as KafkaProducer } from 'kafkajs'; + +/** + * Creates and connects a KafkaJS producer. + * Returns null when the KAFKA_BROKERS environment variable is not set. + * + * @returns A connected KafkaProducer, or null if Kafka is not configured. + */ +export async function createKafkaProducer(): Promise { + const brokers = process.env['KAFKA_BROKERS']; + if (!brokers) return null; + + const kafka = new Kafka({ + clientId: 'agentidp', + brokers: brokers.split(','), + }); + + const producer = kafka.producer(); + await producer.connect(); + return producer; +} diff --git a/src/app.ts b/src/app.ts index 6cb406d..17dbf0d 100644 --- a/src/app.ts +++ b/src/app.ts @@ -27,6 +27,10 @@ import { DIDService } from './services/DIDService.js'; import { OIDCKeyService } from './services/OIDCKeyService.js'; import { IDTokenService } from './services/IDTokenService.js'; import { FederationService } from './services/FederationService.js'; +import { WebhookService } from './services/WebhookService.js'; +import { EventPublisher } from './services/EventPublisher.js'; +import { WebhookDeliveryWorker } from './workers/WebhookDeliveryWorker.js'; +import { createKafkaProducer } from './adapters/KafkaAdapter.js'; import { AgentController } from './controllers/AgentController.js'; import { TokenController } from './controllers/TokenController.js'; @@ -36,6 +40,7 @@ import { OrgController } from './controllers/OrgController.js'; import { DIDController } from './controllers/DIDController.js'; import { OIDCController } from './controllers/OIDCController.js'; import { FederationController } from './controllers/FederationController.js'; +import { WebhookController } from './controllers/WebhookController.js'; import { createAgentsRouter } from './routes/agents.js'; import { createTokenRouter } from './routes/token.js'; @@ -47,6 +52,7 @@ import { createOrgsRouter } from './routes/organizations.js'; import { createDIDRouter } from './routes/did.js'; import { createOIDCRouter } from './routes/oidc.js'; import { createFederationRouter } from './routes/federation.js'; +import { createWebhooksRouter } from './routes/webhooks.js'; import { errorHandler } from './middleware/errorHandler.js'; import { createOpaMiddleware } from './middleware/opa.js'; @@ -124,13 +130,30 @@ export async function createApp(): Promise { console.log('[AgentIdP] Vault integration enabled — new credentials will use Vault KV v2'); } + // ──────────────────────────────────────────────────────────────── + // Kafka (optional) — activated when KAFKA_BROKERS env var is set + // ──────────────────────────────────────────────────────────────── + const kafkaProducer = await createKafkaProducer(); + if (kafkaProducer !== null) { + console.log('[AgentIdP] Kafka integration enabled — events will be produced to agentidp-events'); + } + + // ──────────────────────────────────────────────────────────────── + // Webhook infrastructure + // ──────────────────────────────────────────────────────────────── + const redisUrl = process.env['REDIS_URL'] ?? 'redis://localhost:6379'; + const webhookService = new WebhookService(pool, vaultClient, redis as RedisClientType); + const webhookWorker = new WebhookDeliveryWorker(pool, vaultClient, redis as RedisClientType, redisUrl); + webhookWorker.start(); + const eventPublisher = new EventPublisher(webhookWorker, pool, kafkaProducer); + // ──────────────────────────────────────────────────────────────── // Service layer // ──────────────────────────────────────────────────────────────── const auditService = new AuditService(auditRepo); const didService = new DIDService(pool, vaultClient, redis as RedisClientType); - const agentService = new AgentService(agentRepo, credentialRepo, auditService, didService); - const credentialService = new CredentialService(credentialRepo, agentRepo, auditService, vaultClient); + const agentService = new AgentService(agentRepo, credentialRepo, auditService, didService, eventPublisher); + const credentialService = new CredentialService(credentialRepo, agentRepo, auditService, vaultClient, eventPublisher); const orgService = new OrgService(orgRepo, agentRepo); const privateKey = process.env['JWT_PRIVATE_KEY']; @@ -153,6 +176,7 @@ export async function createApp(): Promise { publicKey, vaultClient, idTokenService, + eventPublisher, ); // ──────────────────────────────────────────────────────────────── @@ -172,6 +196,7 @@ export async function createApp(): Promise { const oidcController = new OIDCController(oidcKeyService, agentRepo); const federationService = new FederationService(pool, redis as RedisClientType); const federationController = new FederationController(federationService); + const webhookController = new WebhookController(webhookService); // ──────────────────────────────────────────────────────────────── // Org context middleware — sets PostgreSQL session variable app.organization_id @@ -209,6 +234,7 @@ export async function createApp(): Promise { app.use(`${API_BASE}/audit`, createAuditRouter(auditController, opaMiddleware)); app.use(`${API_BASE}/organizations`, createOrgsRouter(orgController, opaMiddleware)); app.use(`${API_BASE}`, createFederationRouter(federationController, authMiddleware, opaMiddleware)); + app.use(`${API_BASE}/webhooks`, createWebhooksRouter(webhookController, authMiddleware, opaMiddleware)); // ──────────────────────────────────────────────────────────────── // Dashboard static assets (served from dashboard/dist/) diff --git a/src/controllers/WebhookController.ts b/src/controllers/WebhookController.ts new file mode 100644 index 0000000..0735e06 --- /dev/null +++ b/src/controllers/WebhookController.ts @@ -0,0 +1,180 @@ +/** + * WebhookController — request handlers for webhook subscription management and delivery history. + * + * Handlers: + * POST /webhooks → createSubscription + * GET /webhooks → listSubscriptions + * GET /webhooks/:id → getSubscription + * PATCH /webhooks/:id → updateSubscription + * DELETE /webhooks/:id → deleteSubscription + * GET /webhooks/:id/deliveries → listDeliveries + * + * All handlers read `req.user.organization_id` to scope operations to the + * authenticated agent's organization. + */ + +import { Request, Response, NextFunction } from 'express'; +import { WebhookService } from '../services/WebhookService.js'; +import { ITokenPayload } from '../types/index.js'; +import { + ICreateWebhookRequest, + IUpdateWebhookRequest, +} from '../types/webhook.js'; +import { AuthenticationError } from '../utils/errors.js'; + +/** + * Controller for all webhook subscription and delivery endpoints. + * Delegates all business logic to WebhookService. + */ +export class WebhookController { + /** + * @param webhookService - Service that manages webhook subscriptions and deliveries. + */ + constructor(private readonly webhookService: WebhookService) {} + + // ───────────────────────────────────────────────────────────────────────── + // Private helpers + // ───────────────────────────────────────────────────────────────────────── + + /** + * Extracts and asserts the organization_id from the token payload. + * Throws AuthenticationError (401) if the token was issued without an org scope. + */ + private requireOrgId(user: ITokenPayload): string { + if (!user.organization_id) { + throw new AuthenticationError('Token does not carry an organization_id — multi-tenant context required'); + } + return user.organization_id; + } + + // ───────────────────────────────────────────────────────────────────────── + // Public handlers + // ───────────────────────────────────────────────────────────────────────── + + /** + * Creates a new webhook subscription for the authenticated agent's organization. + * + * Returns 201 with the subscription and the one-time signing secret. + * + * @param req - Express request. Body must conform to ICreateWebhookRequest. + * @param res - Express response. + * @param next - Express next function — forwards errors to the global error handler. + */ + async createSubscription(req: Request, res: Response, next: NextFunction): Promise { + try { + const user = req.user as ITokenPayload; + const body = req.body as ICreateWebhookRequest; + const result = await this.webhookService.createSubscription(this.requireOrgId(user), body); + res.status(201).json(result); + } catch (err) { + next(err); + } + } + + /** + * Returns all webhook subscriptions for the authenticated agent's organization. + * + * Responds 200 with an array of IWebhookSubscription objects. + * + * @param req - Express request. + * @param res - Express response. + * @param next - Express next function — forwards errors to the global error handler. + */ + async listSubscriptions(req: Request, res: Response, next: NextFunction): Promise { + try { + const user = req.user as ITokenPayload; + const subscriptions = await this.webhookService.listSubscriptions(this.requireOrgId(user)); + res.status(200).json(subscriptions); + } catch (err) { + next(err); + } + } + + /** + * Returns a single webhook subscription by its UUID. + * + * Responds 200 with the matching IWebhookSubscription. + * Responds 404 if the subscription does not exist or belongs to another organization. + * + * @param req - Express request. `req.params.id` must be the subscription UUID. + * @param res - Express response. + * @param next - Express next function — forwards errors to the global error handler. + */ + async getSubscription(req: Request, res: Response, next: NextFunction): Promise { + try { + const user = req.user as ITokenPayload; + const { id } = req.params; + const subscription = await this.webhookService.getSubscription(id, this.requireOrgId(user)); + res.status(200).json(subscription); + } catch (err) { + next(err); + } + } + + /** + * Partially updates an existing webhook subscription. + * + * Responds 200 with the updated IWebhookSubscription. + * Responds 404 if the subscription does not exist or belongs to another organization. + * + * @param req - Express request. `req.params.id` is the subscription UUID; body is IUpdateWebhookRequest. + * @param res - Express response. + * @param next - Express next function — forwards errors to the global error handler. + */ + async updateSubscription(req: Request, res: Response, next: NextFunction): Promise { + try { + const user = req.user as ITokenPayload; + const { id } = req.params; + const body = req.body as IUpdateWebhookRequest; + const subscription = await this.webhookService.updateSubscription(id, this.requireOrgId(user), body); + res.status(200).json(subscription); + } catch (err) { + next(err); + } + } + + /** + * Permanently deletes a webhook subscription and all its delivery records. + * + * Responds 204 No Content on success. + * Responds 404 if the subscription does not exist or belongs to another organization. + * + * @param req - Express request. `req.params.id` must be the subscription UUID. + * @param res - Express response. + * @param next - Express next function — forwards errors to the global error handler. + */ + async deleteSubscription(req: Request, res: Response, next: NextFunction): Promise { + try { + const user = req.user as ITokenPayload; + const { id } = req.params; + await this.webhookService.deleteSubscription(id, this.requireOrgId(user)); + res.status(204).send(); + } catch (err) { + next(err); + } + } + + /** + * Returns a paginated list of delivery records for a webhook subscription. + * + * Responds 200 with an IPaginatedDeliveriesResponse. + * Responds 404 if the subscription does not exist or belongs to another organization. + * + * @param req - Express request. `req.params.id` is the subscription UUID. + * Optional query params: limit (default 20), offset (default 0). + * @param res - Express response. + * @param next - Express next function — forwards errors to the global error handler. + */ + async listDeliveries(req: Request, res: Response, next: NextFunction): Promise { + try { + const user = req.user as ITokenPayload; + const { id } = req.params; + const limit = Math.max(1, parseInt(String(req.query['limit'] ?? '20'), 10) || 20); + const offset = Math.max(0, parseInt(String(req.query['offset'] ?? '0'), 10) || 0); + const result = await this.webhookService.listDeliveries(id, this.requireOrgId(user), limit, offset); + res.status(200).json(result); + } catch (err) { + next(err); + } + } +} diff --git a/src/db/migrations/016_create_webhook_subscriptions_table.sql b/src/db/migrations/016_create_webhook_subscriptions_table.sql new file mode 100644 index 0000000..12ecd4f --- /dev/null +++ b/src/db/migrations/016_create_webhook_subscriptions_table.sql @@ -0,0 +1,19 @@ +-- webhook_subscriptions: per-organization webhook delivery targets. +-- Each subscription registers a URL to receive HTTP POST callbacks for specified event types. +-- The HMAC signing secret is stored in Vault (vault_secret_path) when Vault is configured, +-- or bcrypt-hashed into secret_hash in local mode. The raw secret is never stored in PostgreSQL. +CREATE TABLE IF NOT EXISTS webhook_subscriptions ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + organization_id UUID NOT NULL REFERENCES organizations(id) ON DELETE CASCADE, + name VARCHAR(255) NOT NULL, + url VARCHAR(2048) NOT NULL, -- HTTPS delivery target + events JSONB NOT NULL DEFAULT '[]', -- array of WebhookEventType strings + secret_hash VARCHAR(255) NOT NULL, -- bcrypt hash (local mode) or 'vault' sentinel + vault_secret_path VARCHAR(512) NOT NULL, -- Vault KV path for HMAC signing secret, or 'local' + active BOOLEAN NOT NULL DEFAULT true, + failure_count INTEGER NOT NULL DEFAULT 0, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_webhook_subscriptions_organization_id ON webhook_subscriptions(organization_id); +CREATE INDEX IF NOT EXISTS idx_webhook_subscriptions_active ON webhook_subscriptions(active); diff --git a/src/db/migrations/017_create_webhook_deliveries_table.sql b/src/db/migrations/017_create_webhook_deliveries_table.sql new file mode 100644 index 0000000..d48236c --- /dev/null +++ b/src/db/migrations/017_create_webhook_deliveries_table.sql @@ -0,0 +1,19 @@ +-- webhook_deliveries: audit trail for every webhook delivery attempt. +-- A new row is inserted for each outbound event dispatch. Retries are tracked via +-- attempt_count and next_retry_at. Terminal states are 'delivered' and 'dead_letter'. +CREATE TABLE IF NOT EXISTS webhook_deliveries ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + subscription_id UUID NOT NULL REFERENCES webhook_subscriptions(id) ON DELETE CASCADE, + event_type VARCHAR(128) NOT NULL, + payload JSONB NOT NULL, -- full IWebhookPayload delivered + status VARCHAR(32) NOT NULL DEFAULT 'pending', -- 'pending' | 'delivered' | 'failed' | 'dead_letter' + http_status_code INTEGER, -- populated after delivery attempt; NULL until first attempt + attempt_count INTEGER NOT NULL DEFAULT 0, + next_retry_at TIMESTAMPTZ, -- NULL until a failure sets a retry schedule + delivered_at TIMESTAMPTZ, -- NULL until successful delivery + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_subscription_id ON webhook_deliveries(subscription_id); +CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_status ON webhook_deliveries(status); +CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_next_retry_at ON webhook_deliveries(next_retry_at); diff --git a/src/metrics/registry.ts b/src/metrics/registry.ts index 8f97bcc..e7ecd46 100644 --- a/src/metrics/registry.ts +++ b/src/metrics/registry.ts @@ -77,3 +77,15 @@ export const redisCommandDurationSeconds = new Histogram({ buckets: [0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25], registers: [metricsRegistry], }); + +/** + * Total number of webhook deliveries that reached the dead-letter state + * (i.e. exhausted all retry attempts without a 2xx response). + * Labels: organization_id + */ +export const webhookDeadLettersTotal = new Counter({ + name: 'agentidp_webhook_dead_letters_total', + help: 'Total number of webhook deliveries that exhausted all retry attempts.', + labelNames: ['organization_id'] as const, + registers: [metricsRegistry], +}); diff --git a/src/routes/webhooks.ts b/src/routes/webhooks.ts new file mode 100644 index 0000000..1da42dc --- /dev/null +++ b/src/routes/webhooks.ts @@ -0,0 +1,86 @@ +/** + * Webhook routes — subscription management and delivery history. + * + * All endpoints require authentication (authMiddleware) and OPA scope enforcement. + * + * Endpoints: + * POST /webhooks → createSubscription (webhooks:write) + * GET /webhooks → listSubscriptions (webhooks:read) + * GET /webhooks/:id → getSubscription (webhooks:read) + * PATCH /webhooks/:id → updateSubscription (webhooks:write) + * DELETE /webhooks/:id → deleteSubscription (webhooks:write) + * GET /webhooks/:id/deliveries → listDeliveries (webhooks:read) + * + * Mount this router at `${API_BASE}/webhooks` in app.ts. + */ + +import { Router, RequestHandler } from 'express'; +import { WebhookController } from '../controllers/WebhookController.js'; +import { asyncHandler } from '../utils/asyncHandler.js'; + +/** + * Creates and returns the Express router for webhook endpoints. + * Mount at `${API_BASE}/webhooks` (i.e. `/api/v1/webhooks`) in app.ts. + * + * @param controller - The webhook controller instance. + * @param authMiddleware - JWT authentication middleware. + * @param opaMiddleware - OPA authorization middleware (scope enforcement). + * @returns Configured Express router. + */ +export function createWebhooksRouter( + controller: WebhookController, + authMiddleware: RequestHandler, + opaMiddleware: RequestHandler, +): Router { + const router = Router(); + + // POST /webhooks — create a new subscription (webhooks:write) + router.post( + '/', + authMiddleware, + opaMiddleware, + asyncHandler(controller.createSubscription.bind(controller)), + ); + + // GET /webhooks — list all subscriptions for the org (webhooks:read) + router.get( + '/', + authMiddleware, + opaMiddleware, + asyncHandler(controller.listSubscriptions.bind(controller)), + ); + + // GET /webhooks/:id — get a single subscription (webhooks:read) + router.get( + '/:id', + authMiddleware, + opaMiddleware, + asyncHandler(controller.getSubscription.bind(controller)), + ); + + // PATCH /webhooks/:id — update a subscription (webhooks:write) + router.patch( + '/:id', + authMiddleware, + opaMiddleware, + asyncHandler(controller.updateSubscription.bind(controller)), + ); + + // DELETE /webhooks/:id — delete a subscription (webhooks:write) + router.delete( + '/:id', + authMiddleware, + opaMiddleware, + asyncHandler(controller.deleteSubscription.bind(controller)), + ); + + // GET /webhooks/:id/deliveries — list delivery history (webhooks:read) + router.get( + '/:id/deliveries', + authMiddleware, + opaMiddleware, + asyncHandler(controller.listDeliveries.bind(controller)), + ); + + return router; +} diff --git a/src/services/AgentService.ts b/src/services/AgentService.ts index c2987c0..3913963 100644 --- a/src/services/AgentService.ts +++ b/src/services/AgentService.ts @@ -7,6 +7,7 @@ import { AgentRepository } from '../repositories/AgentRepository.js'; import { CredentialRepository } from '../repositories/CredentialRepository.js'; import { AuditService } from './AuditService.js'; import { DIDService } from './DIDService.js'; +import { EventPublisher } from './EventPublisher.js'; import { IAgent, ICreateAgentRequest, @@ -36,12 +37,15 @@ export class AgentService { * @param didService - Optional DIDService. When provided, a W3C DID is generated for each * newly registered agent. When null/undefined, DID generation is skipped * (backward-compatible default). + * @param eventPublisher - Optional EventPublisher. When provided, lifecycle events are + * published as webhooks and Kafka messages (fire-and-forget). */ constructor( private readonly agentRepository: AgentRepository, private readonly credentialRepository: CredentialRepository, private readonly auditService: AuditService, private readonly didService: DIDService | null = null, + private readonly eventPublisher: EventPublisher | null = null, ) {} /** @@ -96,6 +100,13 @@ export class AgentService { // Instrument: count successful agent registrations agentsRegisteredTotal.inc({ deployment_env: data.deploymentEnv }); + // Publish event (fire-and-forget) + void this.eventPublisher?.publishEvent( + agent.organizationId, + 'agent.created', + { agentId: agent.agentId, email: agent.email, name: agent.owner }, + ); + return agent; } @@ -184,6 +195,33 @@ export class AgentService { { updatedFields: Object.keys(data) }, ); + // Publish lifecycle event (fire-and-forget) + if (auditAction === 'agent.updated') { + void this.eventPublisher?.publishEvent( + updated.organizationId, + 'agent.updated', + { agentId, changes: data }, + ); + } else if (auditAction === 'agent.suspended') { + void this.eventPublisher?.publishEvent( + updated.organizationId, + 'agent.suspended', + { agentId }, + ); + } else if (auditAction === 'agent.reactivated') { + void this.eventPublisher?.publishEvent( + updated.organizationId, + 'agent.reactivated', + { agentId }, + ); + } else if (auditAction === 'agent.decommissioned') { + void this.eventPublisher?.publishEvent( + updated.organizationId, + 'agent.decommissioned', + { agentId }, + ); + } + return updated; } @@ -224,5 +262,12 @@ export class AgentService { userAgent, {}, ); + + // Publish event (fire-and-forget) + void this.eventPublisher?.publishEvent( + agent.organizationId, + 'agent.decommissioned', + { agentId }, + ); } } diff --git a/src/services/CredentialService.ts b/src/services/CredentialService.ts index d147577..6e80a72 100644 --- a/src/services/CredentialService.ts +++ b/src/services/CredentialService.ts @@ -7,6 +7,7 @@ import { CredentialRepository } from '../repositories/CredentialRepository.js'; import { AgentRepository } from '../repositories/AgentRepository.js'; import { AuditService } from './AuditService.js'; import { VaultClient } from '../vault/VaultClient.js'; +import { EventPublisher } from './EventPublisher.js'; import { ICredential, ICredentialWithSecret, @@ -34,12 +35,15 @@ export class CredentialService { * @param auditService - The audit log service. * @param vaultClient - Optional VaultClient. When provided, new credentials are stored in Vault. * When null, bcrypt is used (Phase 1 behaviour). + * @param eventPublisher - Optional EventPublisher. When provided, credential events are + * published as webhooks and Kafka messages (fire-and-forget). */ constructor( private readonly credentialRepository: CredentialRepository, private readonly agentRepository: AgentRepository, private readonly auditService: AuditService, private readonly vaultClient: VaultClient | null = null, + private readonly eventPublisher: EventPublisher | null = null, ) {} /** @@ -104,6 +108,13 @@ export class CredentialService { { credentialId: credential.credentialId }, ); + // Publish event (fire-and-forget) + void this.eventPublisher?.publishEvent( + agent.organizationId, + 'credential.generated', + { credentialId: credential.credentialId, agentId }, + ); + return { ...credential, clientSecret: plainSecret }; } @@ -205,6 +216,13 @@ export class CredentialService { { credentialId }, ); + // Publish event (fire-and-forget) + void this.eventPublisher?.publishEvent( + agent.organizationId, + 'credential.rotated', + { credentialId, agentId }, + ); + return { ...updated, clientSecret: plainSecret }; } @@ -258,5 +276,12 @@ export class CredentialService { userAgent, { credentialId }, ); + + // Publish event (fire-and-forget) + void this.eventPublisher?.publishEvent( + agent.organizationId, + 'credential.revoked', + { credentialId, agentId }, + ); } } diff --git a/src/services/EventPublisher.ts b/src/services/EventPublisher.ts new file mode 100644 index 0000000..14e8ad8 --- /dev/null +++ b/src/services/EventPublisher.ts @@ -0,0 +1,130 @@ +/** + * EventPublisher — fire-and-forget event bus for agent lifecycle and credential events. + * + * On each publishEvent call the service: + * 1. Builds the IWebhookPayload envelope. + * 2. Queries for active webhook subscriptions that match the event type. + * 3. Creates a delivery record for each matching subscription and enqueues a Bull job. + * 4. Optionally produces the event to the 'agentidp-events' Kafka topic (when configured). + * + * Kafka errors are caught and logged but never propagate — event publishing must not + * block or fail the primary business operation. + */ + +import { Pool } from 'pg'; +import crypto from 'crypto'; +import { Producer as KafkaProducer } from 'kafkajs'; +import { WebhookDeliveryWorker } from '../workers/WebhookDeliveryWorker.js'; +import { IWebhookPayload, WebhookEventType } from '../types/webhook.js'; + +/** Minimal subscription row shape needed for event fanout. */ +interface ActiveSubscriptionRow { + id: string; + organization_id: string; +} + +/** + * Fire-and-forget event publisher. + * Fanout to webhooks and optionally to Kafka on each publishEvent call. + */ +export class EventPublisher { + /** + * @param webhookWorker - Worker that enqueues outbound HTTP delivery jobs. + * @param pool - PostgreSQL connection pool (for subscription lookup and delivery inserts). + * @param kafkaProducer - Optional connected KafkaJS producer. Null when Kafka is not configured. + */ + constructor( + private readonly webhookWorker: WebhookDeliveryWorker, + private readonly pool: Pool, + private readonly kafkaProducer: KafkaProducer | null, + ) {} + + /** + * Publishes an event to all active matching webhook subscriptions and optionally to Kafka. + * + * This method is fire-and-forget. Individual delivery failures are logged but not + * thrown. Callers should not await this method — use `void this.eventPublisher?.publishEvent(...)`. + * + * @param orgId - The organization UUID that owns the resource that changed. + * @param eventType - The event type identifier. + * @param data - Event-specific payload data. + */ + async publishEvent( + orgId: string, + eventType: WebhookEventType, + data: Record, + ): Promise { + const id = crypto.randomUUID(); + const payload: IWebhookPayload = { + id, + event: eventType, + timestamp: new Date().toISOString(), + organization_id: orgId, + data, + }; + + // ── Webhook fanout ───────────────────────────────────────────────────── + try { + // Find all active subscriptions for this org that include this event type. + // The events column is a JSONB array; we check containment with @>. + const subResult = await this.pool.query( + `SELECT id, organization_id + FROM webhook_subscriptions + WHERE organization_id = $1 + AND active = true + AND events @> $2::jsonb`, + [orgId, JSON.stringify([eventType])], + ); + + for (const sub of subResult.rows) { + try { + // Insert a delivery record + const deliveryResult = await this.pool.query<{ id: string }>( + `INSERT INTO webhook_deliveries + (subscription_id, event_type, payload, status, attempt_count) + VALUES ($1, $2, $3::jsonb, 'pending', 0) + RETURNING id`, + [sub.id, eventType, JSON.stringify(payload)], + ); + + const deliveryId = deliveryResult.rows[0].id; + + // Enqueue the Bull delivery job + await this.webhookWorker.enqueue({ + deliveryId, + subscriptionId: sub.id, + organizationId: orgId, + payload, + }); + } catch (subErr) { + console.error( + `[EventPublisher] Failed to enqueue delivery for subscription ${sub.id}: ${(subErr as Error).message}`, + ); + } + } + } catch (webhookErr) { + console.error( + `[EventPublisher] Webhook fanout error for event ${eventType} (org ${orgId}): ${(webhookErr as Error).message}`, + ); + } + + // ── Kafka fanout ─────────────────────────────────────────────────────── + if (this.kafkaProducer !== null) { + try { + await this.kafkaProducer.send({ + topic: 'agentidp-events', + messages: [ + { + key: orgId, + value: JSON.stringify(payload), + }, + ], + }); + } catch (kafkaErr) { + console.error( + `[EventPublisher] Kafka produce error for event ${eventType} (org ${orgId}): ${(kafkaErr as Error).message}`, + ); + } + } + } +} diff --git a/src/services/OAuth2Service.ts b/src/services/OAuth2Service.ts index ad37e7b..7e65b83 100644 --- a/src/services/OAuth2Service.ts +++ b/src/services/OAuth2Service.ts @@ -9,6 +9,7 @@ import { AgentRepository } from '../repositories/AgentRepository.js'; import { AuditService } from './AuditService.js'; import { VaultClient } from '../vault/VaultClient.js'; import { IDTokenService } from './IDTokenService.js'; +import { EventPublisher } from './EventPublisher.js'; import { ITokenPayload, ITokenResponse, @@ -49,6 +50,8 @@ export class OAuth2Service { * @param vaultClient - Optional VaultClient for Phase 2 credential verification. * @param idTokenService - Optional IDTokenService; when provided and `openid` scope * is requested, an OIDC ID token is appended to the token response. + * @param eventPublisher - Optional EventPublisher. When provided, token.issued and + * token.revoked events are published as webhooks and Kafka messages (fire-and-forget). */ constructor( private readonly tokenRepository: TokenRepository, @@ -59,6 +62,7 @@ export class OAuth2Service { private readonly publicKey: string, private readonly vaultClient: VaultClient | null = null, private readonly idTokenService: IDTokenService | null = null, + private readonly eventPublisher: EventPublisher | null = null, ) {} /** @@ -211,6 +215,13 @@ export class OAuth2Service { // Instrument: count successful token issuances tokensIssuedTotal.inc({ scope }); + // Publish event (fire-and-forget) + void this.eventPublisher?.publishEvent( + agent.organizationId ?? 'org_system', + 'token.issued', + { agentId: clientId, scope, jti }, + ); + const tokenResponse: ITokenResponse = { access_token: accessToken, token_type: 'Bearer', @@ -323,6 +334,13 @@ export class OAuth2Service { userAgent, { jti: decoded.jti }, ); + + // Publish event (fire-and-forget) + void this.eventPublisher?.publishEvent( + callerPayload.organization_id ?? 'org_system', + 'token.revoked', + { jti: decoded.jti }, + ); } // If token is malformed/undecoded, per RFC 7009 we still return success } diff --git a/src/services/WebhookService.ts b/src/services/WebhookService.ts new file mode 100644 index 0000000..aefc904 --- /dev/null +++ b/src/services/WebhookService.ts @@ -0,0 +1,475 @@ +/** + * WebhookService — manages webhook subscriptions and delivery history. + * + * HMAC signing secrets are stored in Vault when available (vault_secret_path holds the KV path). + * In local mode (no Vault) the secret is bcrypt-hashed and stored in secret_hash, and + * vault_secret_path is set to the sentinel value 'local'. The raw secret is never persisted + * in PostgreSQL and is only returned once at subscription creation time. + */ + +import { Pool } from 'pg'; +import { RedisClientType } from 'redis'; +import crypto from 'crypto'; +import bcrypt from 'bcryptjs'; +import { VaultClient } from '../vault/VaultClient.js'; +import { SentryAgentError } from '../utils/errors.js'; +import { + IWebhookSubscription, + ICreateWebhookRequest, + IUpdateWebhookRequest, + IWebhookDelivery, + IPaginatedDeliveriesResponse, + WebhookEventType, +} from '../types/webhook.js'; + +// ============================================================================ +// Custom errors +// ============================================================================ + +/** 404 — Referenced webhook subscription was not found (or belongs to another org). */ +export class WebhookNotFoundError extends SentryAgentError { + constructor(subscriptionId?: string) { + super( + 'Webhook subscription with the specified ID was not found.', + 'WEBHOOK_NOT_FOUND', + 404, + subscriptionId ? { subscriptionId } : undefined, + ); + } +} + +/** 400 — Webhook subscription request failed validation. */ +export class WebhookValidationError extends SentryAgentError { + constructor(message: string, details?: Record) { + super(message, 'WEBHOOK_VALIDATION_ERROR', 400, details); + } +} + +// ============================================================================ +// Internal DB row shape +// ============================================================================ + +/** Internal representation of a webhook_subscriptions row. */ +interface WebhookSubscriptionRow { + id: string; + organization_id: string; + name: string; + url: string; + events: WebhookEventType[]; + secret_hash: string; + vault_secret_path: string; + active: boolean; + failure_count: number; + created_at: Date; + updated_at: Date; +} + +/** Internal representation of a webhook_deliveries row. */ +interface WebhookDeliveryRow { + id: string; + subscription_id: string; + event_type: string; + payload: Record; + status: 'pending' | 'delivered' | 'failed' | 'dead_letter'; + http_status_code: number | null; + attempt_count: number; + next_retry_at: Date | null; + delivered_at: Date | null; + created_at: Date; + updated_at: Date; +} + +// ============================================================================ +// Mappers +// ============================================================================ + +/** + * Maps a raw DB row to a public IWebhookSubscription (never includes secret fields). + */ +function mapRowToSubscription(row: WebhookSubscriptionRow): IWebhookSubscription { + return { + id: row.id, + organization_id: row.organization_id, + name: row.name, + url: row.url, + events: row.events, + active: row.active, + failure_count: row.failure_count, + created_at: row.created_at, + updated_at: row.updated_at, + }; +} + +/** + * Maps a raw DB row to a public IWebhookDelivery. + */ +function mapRowToDelivery(row: WebhookDeliveryRow): IWebhookDelivery { + return { + id: row.id, + subscription_id: row.subscription_id, + event_type: row.event_type as WebhookEventType, + payload: row.payload, + status: row.status, + http_status_code: row.http_status_code, + attempt_count: row.attempt_count, + next_retry_at: row.next_retry_at, + delivered_at: row.delivered_at, + created_at: row.created_at, + updated_at: row.updated_at, + }; +} + +// ============================================================================ +// Service +// ============================================================================ + +/** + * Service for managing webhook subscriptions and delivery audit records. + * Coordinates PostgreSQL persistence and optional Vault secret storage. + */ +export class WebhookService { + /** + * @param pool - PostgreSQL connection pool. + * @param vaultClient - Optional VaultClient. When provided, HMAC secrets are stored in Vault. + * @param redis - Redis client (reserved for future caching needs). + */ + constructor( + private readonly pool: Pool, + private readonly vaultClient: VaultClient | null, + _redis: RedisClientType, // reserved for future subscription caching + ) {} + + // ────────────────────────────────────────────────────────────────────────── + // Subscriptions + // ────────────────────────────────────────────────────────────────────────── + + /** + * Creates a new webhook subscription for the given organization. + * + * A 32-byte random HMAC signing secret is generated. When Vault is configured the + * secret is stored in Vault KV v2 at `secret/data/agentidp/webhooks/{orgId}/{id}/secret` + * and `vault_secret_path` is set to that path. In local mode the secret is bcrypt-hashed + * and stored in `secret_hash`; the raw secret is returned from this method and must be + * delivered to the caller immediately — it cannot be retrieved again. + * + * @param orgId - The organization UUID that owns this subscription. + * @param req - The creation request (name, url, events). + * @returns The created IWebhookSubscription plus the one-time `secret` field on the raw object. + * @throws WebhookValidationError if the URL is not https:// or events array is empty. + */ + async createSubscription( + orgId: string, + req: ICreateWebhookRequest, + ): Promise { + this.validateUrl(req.url); + this.validateEvents(req.events); + + const secret = crypto.randomBytes(32).toString('hex'); + const subscriptionId = crypto.randomUUID(); + + let secretHash: string; + let vaultSecretPath: string; + + if (this.vaultClient !== null) { + // Store raw secret in Vault under a webhook-specific path + const vaultPath = `secret/data/agentidp/webhooks/${orgId}/${subscriptionId}/secret`; + await this.storeWebhookSecretInVault(vaultPath, secret); + secretHash = 'vault'; + vaultSecretPath = vaultPath; + } else { + // Local mode: bcrypt-hash the secret; raw secret cannot be recovered later + secretHash = await bcrypt.hash(secret, 10); + vaultSecretPath = 'local'; + } + + const result = await this.pool.query( + `INSERT INTO webhook_subscriptions + (id, organization_id, name, url, events, secret_hash, vault_secret_path, active, failure_count) + VALUES ($1, $2, $3, $4, $5::jsonb, $6, $7, true, 0) + RETURNING *`, + [ + subscriptionId, + orgId, + req.name, + req.url, + JSON.stringify(req.events), + secretHash, + vaultSecretPath, + ], + ); + + const row = result.rows[0]; + return { ...mapRowToSubscription(row), secret }; + } + + /** + * Retrieves the raw HMAC signing secret for a subscription (Vault mode only). + * + * In local mode the secret was only available at creation time and cannot be recovered. + * + * @param subscriptionId - The subscription UUID. + * @param orgId - The organization UUID (ownership verification). + * @returns The raw HMAC signing secret. + * @throws WebhookNotFoundError if the subscription does not exist or belongs to another org. + * @throws WebhookValidationError if the subscription is in local mode (secret not recoverable). + */ + async getSubscriptionSecret(subscriptionId: string, orgId: string): Promise { + const row = await this.fetchRow(subscriptionId, orgId); + + if (row.vault_secret_path === 'local') { + throw new WebhookValidationError( + 'Secret not available — use webhook secret returned at creation time.', + { subscriptionId }, + ); + } + + return this.retrieveWebhookSecretFromVault(row.vault_secret_path); + } + + /** + * Returns all webhook subscriptions for the given organization. + * + * @param orgId - The organization UUID. + * @returns Array of IWebhookSubscription (no secret fields). + */ + async listSubscriptions(orgId: string): Promise { + const result = await this.pool.query( + `SELECT * FROM webhook_subscriptions + WHERE organization_id = $1 + ORDER BY created_at DESC`, + [orgId], + ); + return result.rows.map(mapRowToSubscription); + } + + /** + * Returns a single webhook subscription by ID. + * + * @param id - The subscription UUID. + * @param orgId - The organization UUID (ownership verification). + * @returns The IWebhookSubscription. + * @throws WebhookNotFoundError if not found or belongs to another org. + */ + async getSubscription(id: string, orgId: string): Promise { + const row = await this.fetchRow(id, orgId); + return mapRowToSubscription(row); + } + + /** + * Partially updates a webhook subscription. + * + * Only the fields provided in `req` are changed. If a new URL is provided it must + * use the `https://` scheme. + * + * @param id - The subscription UUID. + * @param orgId - The organization UUID (ownership verification). + * @param req - Fields to update. + * @returns The updated IWebhookSubscription. + * @throws WebhookNotFoundError if not found or belongs to another org. + * @throws WebhookValidationError if the new URL is not https://. + */ + async updateSubscription( + id: string, + orgId: string, + req: IUpdateWebhookRequest, + ): Promise { + // Verify ownership before mutating + await this.fetchRow(id, orgId); + + if (req.url !== undefined) { + this.validateUrl(req.url); + } + + const setClauses: string[] = []; + const values: unknown[] = []; + let paramIndex = 1; + + if (req.name !== undefined) { + setClauses.push(`name = $${paramIndex++}`); + values.push(req.name); + } + if (req.url !== undefined) { + setClauses.push(`url = $${paramIndex++}`); + values.push(req.url); + } + if (req.events !== undefined) { + this.validateEvents(req.events); + setClauses.push(`events = $${paramIndex++}::jsonb`); + values.push(JSON.stringify(req.events)); + } + if (req.active !== undefined) { + setClauses.push(`active = $${paramIndex++}`); + values.push(req.active); + } + + if (setClauses.length === 0) { + // Nothing to update — re-fetch and return current state + return this.getSubscription(id, orgId); + } + + setClauses.push(`updated_at = NOW()`); + values.push(id); + values.push(orgId); + + const result = await this.pool.query( + `UPDATE webhook_subscriptions + SET ${setClauses.join(', ')} + WHERE id = $${paramIndex++} AND organization_id = $${paramIndex} + RETURNING *`, + values, + ); + + if (result.rowCount === 0) { + throw new WebhookNotFoundError(id); + } + + return mapRowToSubscription(result.rows[0]); + } + + /** + * Permanently deletes a webhook subscription (and all its deliveries via CASCADE). + * + * @param id - The subscription UUID. + * @param orgId - The organization UUID (ownership verification). + * @throws WebhookNotFoundError if not found or belongs to another org. + */ + async deleteSubscription(id: string, orgId: string): Promise { + const result = await this.pool.query( + `DELETE FROM webhook_subscriptions + WHERE id = $1 AND organization_id = $2`, + [id, orgId], + ); + + if (result.rowCount === 0) { + throw new WebhookNotFoundError(id); + } + } + + // ────────────────────────────────────────────────────────────────────────── + // Deliveries + // ────────────────────────────────────────────────────────────────────────── + + /** + * Returns a paginated list of delivery records for a subscription. + * + * Verifies that the subscription belongs to the given organization before querying. + * + * @param subscriptionId - The subscription UUID. + * @param orgId - The organization UUID (ownership verification). + * @param limit - Page size (default 20). + * @param offset - Row offset (default 0). + * @returns Paginated delivery records. + * @throws WebhookNotFoundError if the subscription does not exist or belongs to another org. + */ + async listDeliveries( + subscriptionId: string, + orgId: string, + limit: number, + offset: number, + ): Promise { + // Verify ownership + await this.fetchRow(subscriptionId, orgId); + + const countResult = await this.pool.query<{ count: string }>( + `SELECT COUNT(*) AS count FROM webhook_deliveries WHERE subscription_id = $1`, + [subscriptionId], + ); + const total = parseInt(countResult.rows[0].count, 10); + + const result = await this.pool.query( + `SELECT * FROM webhook_deliveries + WHERE subscription_id = $1 + ORDER BY created_at DESC + LIMIT $2 OFFSET $3`, + [subscriptionId, limit, offset], + ); + + return { + deliveries: result.rows.map(mapRowToDelivery), + total, + limit, + offset, + }; + } + + // ────────────────────────────────────────────────────────────────────────── + // Private helpers + // ────────────────────────────────────────────────────────────────────────── + + /** + * Fetches the full subscription row including secret fields (for internal use only). + * + * @param id - The subscription UUID. + * @param orgId - The organization UUID. + * @throws WebhookNotFoundError if not found or org mismatch. + */ + private async fetchRow(id: string, orgId: string): Promise { + const result = await this.pool.query( + `SELECT * FROM webhook_subscriptions WHERE id = $1 AND organization_id = $2`, + [id, orgId], + ); + if (result.rowCount === 0) { + throw new WebhookNotFoundError(id); + } + return result.rows[0]; + } + + /** + * Validates that a URL uses the https:// scheme. + * + * @param url - The URL to validate. + * @throws WebhookValidationError if the scheme is not https. + */ + private validateUrl(url: string): void { + try { + const parsed = new URL(url); + if (parsed.protocol !== 'https:') { + throw new WebhookValidationError( + 'Webhook URL must use the https:// scheme.', + { url }, + ); + } + } catch (err) { + if (err instanceof WebhookValidationError) throw err; + throw new WebhookValidationError('Webhook URL is not a valid URL.', { url }); + } + } + + /** + * Validates that the events array is non-empty. + * + * @param events - The events array to validate. + * @throws WebhookValidationError if the array is empty. + */ + private validateEvents(events: WebhookEventType[]): void { + if (events.length === 0) { + throw new WebhookValidationError('Webhook subscription must include at least one event type.'); + } + } + + /** + * Stores a webhook HMAC secret in Vault at the given KV v2 data path. + * + * @param vaultPath - Full KV v2 data path (e.g. `secret/data/agentidp/webhooks/...`). + * @param secret - The raw HMAC secret to store. + */ + private async storeWebhookSecretInVault(vaultPath: string, secret: string): Promise { + await this.vaultClient!.writeArbitrarySecret(vaultPath, { webhookSecret: secret }); + } + + /** + * Retrieves a webhook HMAC secret from Vault at the given KV v2 data path. + * + * @param vaultPath - Full KV v2 data path. + * @returns The raw HMAC secret string. + * @throws WebhookValidationError if the secret is missing or empty. + */ + private async retrieveWebhookSecretFromVault(vaultPath: string): Promise { + const data = await this.vaultClient!.readArbitrarySecret(vaultPath); + const secret = data['webhookSecret']; + if (typeof secret !== 'string' || secret.length === 0) { + throw new WebhookValidationError('Vault returned an empty or missing webhook secret.', { vaultPath }); + } + return secret; + } +} diff --git a/src/types/index.ts b/src/types/index.ts index 57a3539..4c4cbcb 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -28,7 +28,14 @@ export type DeploymentEnv = 'development' | 'staging' | 'production'; export type CredentialStatus = 'active' | 'revoked'; /** OAuth 2.0 scope values supported by this IdP. */ -export type OAuthScope = 'agents:read' | 'agents:write' | 'tokens:read' | 'audit:read' | 'admin:orgs'; +export type OAuthScope = + | 'agents:read' + | 'agents:write' + | 'tokens:read' + | 'audit:read' + | 'admin:orgs' + | 'webhooks:read' + | 'webhooks:write'; /** Audit action identifiers for all significant platform events. */ export type AuditAction = @@ -47,7 +54,10 @@ export type AuditAction = | 'org.created' | 'org.updated' | 'org.deleted' - | 'org.member_added'; + | 'org.member_added' + | 'webhook.created' + | 'webhook.updated' + | 'webhook.deleted'; /** Outcome of an audited action. */ export type AuditOutcome = 'success' | 'failure'; diff --git a/src/types/webhook.ts b/src/types/webhook.ts new file mode 100644 index 0000000..a966654 --- /dev/null +++ b/src/types/webhook.ts @@ -0,0 +1,94 @@ +/** + * Shared TypeScript interfaces and types for the Webhook / Event Streaming subsystem. + * Imported by WebhookService, WebhookDeliveryWorker, EventPublisher, and their controllers. + */ + +// ============================================================================ +// Event Types +// ============================================================================ + +/** All event type identifiers that can be published and subscribed to. */ +export type WebhookEventType = + | 'agent.created' + | 'agent.updated' + | 'agent.suspended' + | 'agent.reactivated' + | 'agent.decommissioned' + | 'credential.generated' + | 'credential.rotated' + | 'credential.revoked' + | 'token.issued' + | 'token.revoked'; + +// ============================================================================ +// Webhook Subscription +// ============================================================================ + +/** A registered webhook subscription (secret fields are never included). */ +export interface IWebhookSubscription { + id: string; + organization_id: string; + name: string; + url: string; + events: WebhookEventType[]; + active: boolean; + failure_count: number; + created_at: Date; + updated_at: Date; +} + +/** Request body for creating a new webhook subscription. */ +export interface ICreateWebhookRequest { + name: string; + url: string; + events: WebhookEventType[]; +} + +/** Request body for partially updating a webhook subscription. */ +export interface IUpdateWebhookRequest { + name?: string; + url?: string; + events?: WebhookEventType[]; + active?: boolean; +} + +// ============================================================================ +// Webhook Delivery +// ============================================================================ + +/** A single webhook delivery attempt record. */ +export interface IWebhookDelivery { + id: string; + subscription_id: string; + event_type: WebhookEventType; + payload: Record; + status: 'pending' | 'delivered' | 'failed' | 'dead_letter'; + http_status_code: number | null; + attempt_count: number; + next_retry_at: Date | null; + delivered_at: Date | null; + created_at: Date; + updated_at: Date; +} + +/** The envelope sent in every outbound webhook HTTP request body. */ +export interface IWebhookPayload { + /** UUID uniquely identifying this event occurrence. */ + id: string; + /** The event type identifier. */ + event: WebhookEventType; + /** ISO 8601 timestamp of when the event was published. */ + timestamp: string; + /** The organization that owns the resource that changed. */ + organization_id: string; + /** Event-specific data. Contents vary by event type. */ + data: Record; +} + +/** Paginated response for listing webhook deliveries. */ +export interface IPaginatedDeliveriesResponse { + deliveries: IWebhookDelivery[]; + total: number; + limit: number; + offset: number; +} diff --git a/src/vault/VaultClient.ts b/src/vault/VaultClient.ts index 7bdfb1b..041700f 100644 --- a/src/vault/VaultClient.ts +++ b/src/vault/VaultClient.ts @@ -156,6 +156,57 @@ export class VaultClient { return timingSafeEqual(Buffer.from(stored), Buffer.from(candidateSecret)); } + /** + * Writes a key-value data map to an arbitrary Vault KV v2 data path. + * + * Used by subsystems (e.g. webhook secrets) that store secrets at paths + * outside the standard credential hierarchy. + * + * @param dataPath - Full KV v2 data path (e.g. `secret/data/agentidp/webhooks/...`). + * @param data - Key-value pairs to store under the `data` envelope. + * @throws CredentialError if the Vault write fails. + */ + async writeArbitrarySecret(dataPath: string, data: Record): Promise { + try { + await this.client.write(dataPath, { data }); + } catch (err) { + throw new CredentialError( + `Failed to write secret to Vault at path '${dataPath}': ${err instanceof Error ? err.message : String(err)}`, + 'VAULT_WRITE_ERROR', + { dataPath }, + ); + } + } + + /** + * Reads a key-value data map from an arbitrary Vault KV v2 data path. + * + * @param dataPath - Full KV v2 data path. + * @returns The stored key-value data map. + * @throws CredentialError if the read fails or the path contains no data. + */ + async readArbitrarySecret(dataPath: string): Promise> { + let response: KvV2ReadResponse; + try { + response = (await this.client.read(dataPath)) as KvV2ReadResponse; + } catch (err) { + throw new CredentialError( + `Failed to read secret from Vault at path '${dataPath}': ${err instanceof Error ? err.message : String(err)}`, + 'VAULT_READ_ERROR', + { dataPath }, + ); + } + const stored = response?.data?.data; + if (typeof stored !== 'object' || stored === null) { + throw new CredentialError( + 'Vault returned an empty or missing data object.', + 'VAULT_SECRET_MISSING', + { dataPath }, + ); + } + return stored; + } + /** * Permanently deletes all versions of a credential secret from Vault. * Called on credential revocation. diff --git a/src/workers/WebhookDeliveryWorker.ts b/src/workers/WebhookDeliveryWorker.ts new file mode 100644 index 0000000..d1417ba --- /dev/null +++ b/src/workers/WebhookDeliveryWorker.ts @@ -0,0 +1,367 @@ +/** + * WebhookDeliveryWorker — Bull queue processor for outbound webhook HTTP delivery. + * + * Each job carries a delivery ID, subscription ID, organization ID, and the full + * IWebhookPayload to POST. The worker: + * 1. Fetches the subscription record from PostgreSQL. + * 2. Resolves the target hostname and blocks SSRF attempts (RFC 1918, loopback, link-local). + * 3. Computes HMAC-SHA256 signature using the secret from Vault (or logs a warning and sends + * unsigned when in bcrypt-only mode where the raw secret is unavailable). + * 4. POSTs the payload with SentryAgent signature headers. + * 5. Updates the delivery record status, retry schedule, and subscription failure count. + * + * Retry strategy: up to 5 attempts with exponential backoff (5 s × 2^attempt). + * On exhausting all retries the delivery is marked dead_letter and the + * `agentidp_webhook_dead_letters_total` Prometheus counter is incremented. + */ + +import Bull from 'bull'; +import { Pool } from 'pg'; +import { RedisClientType } from 'redis'; +import crypto from 'crypto'; +import { promises as dns } from 'dns'; +import { VaultClient } from '../vault/VaultClient.js'; +import { IWebhookPayload } from '../types/webhook.js'; +import { webhookDeadLettersTotal } from '../metrics/registry.js'; + +// ============================================================================ +// Job data interface +// ============================================================================ + +/** Data carried by every webhook delivery Bull job. */ +export interface WebhookDeliveryJob { + deliveryId: string; + subscriptionId: string; + organizationId: string; + payload: IWebhookPayload; +} + +// ============================================================================ +// Internal DB row shapes +// ============================================================================ + +/** Minimal subscription fields needed for delivery. */ +interface SubscriptionDeliveryRow { + id: string; + organization_id: string; + url: string; + active: boolean; + vault_secret_path: string; + secret_hash: string; + failure_count: number; +} + +// ============================================================================ +// SSRF protection helpers +// ============================================================================ + +/** Regular expressions covering RFC 1918, loopback, and link-local address ranges. */ +const BLOCKED_IPV4_PREFIXES: RegExp[] = [ + /^10\./, // 10.0.0.0/8 + /^172\.(1[6-9]|2[0-9]|3[01])\./, // 172.16.0.0/12 + /^192\.168\./, // 192.168.0.0/16 + /^127\./, // 127.0.0.0/8 (loopback) + /^169\.254\./, // 169.254.0.0/16 (link-local) +]; + +const BLOCKED_IPV6: RegExp[] = [ + /^::1$/, // IPv6 loopback + /^fe80:/i, // IPv6 link-local +]; + +/** + * Returns true if the given IP address is in a blocked range. + * + * @param address - IPv4 or IPv6 address string. + * @returns Whether the address is blocked (SSRF risk). + */ +function isBlockedAddress(address: string): boolean { + for (const pattern of BLOCKED_IPV4_PREFIXES) { + if (pattern.test(address)) return true; + } + for (const pattern of BLOCKED_IPV6) { + if (pattern.test(address)) return true; + } + return false; +} + +/** + * Resolves the hostname of a URL and throws if it resolves to a blocked address. + * + * @param targetUrl - The fully-qualified HTTPS URL to check. + * @throws Error if the hostname resolves to a blocked (SSRF-risk) IP address. + */ +async function guardSsrf(targetUrl: string): Promise { + const { hostname } = new URL(targetUrl); + const { address } = await dns.lookup(hostname); + if (isBlockedAddress(address)) { + throw new Error( + `SSRF protection: hostname '${hostname}' resolves to blocked address '${address}'.`, + ); + } +} + +// ============================================================================ +// Worker class +// ============================================================================ + +/** + * Bull queue worker that processes outbound webhook delivery jobs. + * Each instance manages a single Bull queue named 'webhook-delivery'. + */ +export class WebhookDeliveryWorker { + private readonly queue: Bull.Queue; + + /** + * @param pool - PostgreSQL connection pool for delivery record updates. + * @param vaultClient - Optional VaultClient for fetching HMAC signing secrets. + * @param redis - Redis client (unused directly; Bull uses redisUrl). + * @param redisUrl - Redis connection URL passed to Bull. + */ + constructor( + private readonly pool: Pool, + private readonly vaultClient: VaultClient | null, + _redis: RedisClientType, // reserved for future use; Bull uses redisUrl directly + redisUrl: string, + ) { + this.queue = new Bull('webhook-delivery', redisUrl); + } + + /** + * Registers the job processor with the Bull queue. + * Must be called once after construction. + */ + start(): void { + this.queue.process(this.processJob.bind(this)); + } + + /** + * Adds a webhook delivery job to the Bull queue. + * The job will be retried up to 5 times with exponential backoff starting at 5 seconds. + * + * @param job - The delivery job data. + */ + async enqueue(job: WebhookDeliveryJob): Promise { + await this.queue.add(job, { + attempts: 5, + backoff: { type: 'exponential', delay: 5000 }, + }); + } + + // ────────────────────────────────────────────────────────────────────────── + // Private processor + // ────────────────────────────────────────────────────────────────────────── + + /** + * Processes a single webhook delivery job. + * + * Steps: + * 1. Load subscription from DB. + * 2. Mark dead_letter if subscription is missing or inactive. + * 3. SSRF check — resolve hostname and block private/loopback/link-local addresses. + * 4. Fetch HMAC secret and compute HMAC-SHA256 signature. + * 5. POST payload with SentryAgent signature headers. + * 6. Update delivery record and subscription failure_count. + * + * @param job - Bull job carrying WebhookDeliveryJob data. + */ + private async processJob(job: Bull.Job): Promise { + const { deliveryId, subscriptionId, organizationId, payload } = job.data; + + // 1. Load subscription + const subResult = await this.pool.query( + `SELECT id, organization_id, url, active, vault_secret_path, secret_hash, failure_count + FROM webhook_subscriptions + WHERE id = $1`, + [subscriptionId], + ); + + if (subResult.rowCount === 0 || !subResult.rows[0].active) { + // Subscription gone or disabled — no point retrying + await this.markDeadLetter(deliveryId, null, organizationId); + return; + } + + const subscription = subResult.rows[0]; + + // 2. SSRF check + try { + await guardSsrf(subscription.url); + } catch (ssrfErr) { + console.error( + `[WebhookDeliveryWorker] SSRF blocked delivery ${deliveryId}: ${(ssrfErr as Error).message}`, + ); + await this.markDeadLetter(deliveryId, null, organizationId); + return; + } + + // 3. Fetch signing secret + let secret: string | null = null; + if (subscription.vault_secret_path !== 'local' && this.vaultClient !== null) { + try { + secret = await this.fetchSecretFromVault(subscription.vault_secret_path); + } catch (vaultErr) { + console.error( + `[WebhookDeliveryWorker] Failed to retrieve secret from Vault for delivery ${deliveryId}: ${(vaultErr as Error).message}`, + ); + } + } else { + // Local (bcrypt) mode — raw secret is unrecoverable; send without signature + console.warn( + `[WebhookDeliveryWorker] Delivery ${deliveryId}: subscription in local mode — sending without HMAC signature.`, + ); + } + + // 4. Compute HMAC-SHA256 signature if secret is available + const payloadJson = JSON.stringify(payload); + let signatureHeader = ''; + if (secret !== null) { + const hex = crypto.createHmac('sha256', secret).update(payloadJson).digest('hex'); + signatureHeader = `sha256=${hex}`; + } + + // 5. POST to subscription URL + const attemptCount = (job.attemptsMade ?? 0) + 1; + let httpStatus: number | null = null; + let deliverySucceeded = false; + + try { + const headers: Record = { + 'Content-Type': 'application/json', + 'X-SentryAgent-Event': payload.event, + 'X-SentryAgent-Delivery': deliveryId, + }; + if (signatureHeader) { + headers['X-SentryAgent-Signature'] = signatureHeader; + } + + const response = await fetch(subscription.url, { + method: 'POST', + headers, + body: payloadJson, + }); + + httpStatus = response.status; + deliverySucceeded = response.ok; // 2xx + } catch (networkErr) { + // Network error — treat as failure + console.error( + `[WebhookDeliveryWorker] Network error for delivery ${deliveryId}: ${(networkErr as Error).message}`, + ); + } + + // 6. Update delivery record + if (deliverySucceeded) { + await this.markDelivered(deliveryId, httpStatus as number, subscriptionId); + } else { + const isLastAttempt = attemptCount >= 5; + if (isLastAttempt) { + await this.markDeadLetter(deliveryId, httpStatus, organizationId); + await this.incrementFailureCount(subscriptionId); + } else { + // Exponential backoff: delay = 5000 * 2^(attempt-1) ms + const delayMs = 5000 * Math.pow(2, attemptCount - 1); + const nextRetryAt = new Date(Date.now() + delayMs); + await this.markFailed(deliveryId, httpStatus, attemptCount, nextRetryAt); + await this.incrementFailureCount(subscriptionId); + // Re-throw so Bull retries the job + throw new Error( + `Webhook delivery ${deliveryId} failed (HTTP ${httpStatus ?? 'network error'}), attempt ${attemptCount}/5.`, + ); + } + } + } + + // ────────────────────────────────────────────────────────────────────────── + // DB update helpers + // ────────────────────────────────────────────────────────────────────────── + + /** Marks a delivery as successfully delivered. */ + private async markDelivered( + deliveryId: string, + httpStatus: number, + subscriptionId: string, + ): Promise { + await this.pool.query( + `UPDATE webhook_deliveries + SET status = 'delivered', http_status_code = $2, delivered_at = NOW(), updated_at = NOW() + WHERE id = $1`, + [deliveryId, httpStatus], + ); + // Reset failure count on the subscription + await this.pool.query( + `UPDATE webhook_subscriptions SET failure_count = 0, updated_at = NOW() WHERE id = $1`, + [subscriptionId], + ); + } + + /** Marks a delivery as failed and schedules the next retry. */ + private async markFailed( + deliveryId: string, + httpStatus: number | null, + attemptCount: number, + nextRetryAt: Date, + ): Promise { + await this.pool.query( + `UPDATE webhook_deliveries + SET status = 'failed', + http_status_code = $2, + attempt_count = $3, + next_retry_at = $4, + updated_at = NOW() + WHERE id = $1`, + [deliveryId, httpStatus, attemptCount, nextRetryAt], + ); + } + + /** + * Marks a delivery as dead_letter, fires the Prometheus counter, + * and resets next_retry_at to NULL. + */ + private async markDeadLetter( + deliveryId: string, + httpStatus: number | null, + organizationId: string, + ): Promise { + await this.pool.query( + `UPDATE webhook_deliveries + SET status = 'dead_letter', + http_status_code = $2, + next_retry_at = NULL, + updated_at = NOW() + WHERE id = $1`, + [deliveryId, httpStatus], + ); + webhookDeadLettersTotal.inc({ organization_id: organizationId }); + } + + /** Increments the failure_count on the subscription. */ + private async incrementFailureCount(subscriptionId: string): Promise { + await this.pool.query( + `UPDATE webhook_subscriptions + SET failure_count = failure_count + 1, updated_at = NOW() + WHERE id = $1`, + [subscriptionId], + ); + } + + // ────────────────────────────────────────────────────────────────────────── + // Vault helper + // ────────────────────────────────────────────────────────────────────────── + + /** + * Fetches a webhook HMAC secret from Vault at the given KV v2 data path. + * + * @param vaultPath - Full KV v2 data path. + * @returns The raw HMAC secret string. + * @throws Error if the secret is missing or empty. + */ + private async fetchSecretFromVault(vaultPath: string): Promise { + const data = await this.vaultClient!.readArbitrarySecret(vaultPath); + const secret = data['webhookSecret']; + if (typeof secret !== 'string' || secret.length === 0) { + throw new Error(`Empty or missing webhook secret at Vault path: ${vaultPath}`); + } + return secret; + } +} diff --git a/tests/unit/metrics/registry.test.ts b/tests/unit/metrics/registry.test.ts index 54f90af..588bb76 100644 --- a/tests/unit/metrics/registry.test.ts +++ b/tests/unit/metrics/registry.test.ts @@ -28,9 +28,9 @@ describe('metricsRegistry', () => { expect(metricsRegistry).not.toBe(register); }); - it('contains exactly 6 metric entries', async () => { + it('contains exactly 7 metric entries', async () => { const entries = await metricsRegistry.getMetricsAsJSON(); - expect(entries).toHaveLength(6); + expect(entries).toHaveLength(7); }); // ────────────────────────────────────────────────────────────────── diff --git a/tests/unit/services/WebhookService.test.ts b/tests/unit/services/WebhookService.test.ts new file mode 100644 index 0000000..84f1456 --- /dev/null +++ b/tests/unit/services/WebhookService.test.ts @@ -0,0 +1,377 @@ +/** + * Unit tests for src/services/WebhookService.ts + * + * Covers: createSubscription, listSubscriptions, getSubscription, + * updateSubscription, deleteSubscription, listDeliveries, + * getSubscriptionSecret — all paths including error cases. + */ + +import crypto from 'crypto'; +import { Pool } from 'pg'; +import { RedisClientType } from 'redis'; +import { WebhookService, WebhookNotFoundError, WebhookValidationError } from '../../../src/services/WebhookService'; +import { VaultClient } from '../../../src/vault/VaultClient'; +import { IWebhookSubscription } from '../../../src/types/webhook'; + +// ─── Mocks ──────────────────────────────────────────────────────────────────── + +jest.mock('pg', () => ({ + Pool: jest.fn().mockImplementation(() => ({ + query: jest.fn(), + })), +})); + +jest.mock('../../../src/vault/VaultClient'); + +// ─── Helpers ────────────────────────────────────────────────────────────────── + +const ORG_ID = crypto.randomUUID(); +const SUB_ID = crypto.randomUUID(); +const NOW = new Date('2026-03-30T10:00:00Z'); + +function makeSubRow(overrides: Partial> = {}) { + return { + id: SUB_ID, + organization_id: ORG_ID, + name: 'Test Hook', + url: 'https://example.com/hook', + events: ['agent.created'], + secret_hash: 'vault', + vault_secret_path: 'secret/data/agentidp/webhooks/org/sub/secret', + active: true, + failure_count: 0, + created_at: NOW, + updated_at: NOW, + ...overrides, + }; +} + +function makeDeliveryRow(overrides: Partial> = {}) { + return { + id: crypto.randomUUID(), + subscription_id: SUB_ID, + event_type: 'agent.created', + payload: { id: 'evt-1', event: 'agent.created', timestamp: NOW.toISOString(), organization_id: ORG_ID, data: {} }, + status: 'pending', + http_status_code: null, + attempt_count: 0, + next_retry_at: null, + delivered_at: null, + created_at: NOW, + updated_at: NOW, + ...overrides, + }; +} + +// ─── Suite ──────────────────────────────────────────────────────────────────── + +describe('WebhookService', () => { + let pool: jest.Mocked; + let vaultClient: jest.Mocked; + let service: WebhookService; + const mockRedis = {} as RedisClientType; + + beforeEach(() => { + jest.clearAllMocks(); + pool = new Pool() as jest.Mocked; + vaultClient = new (VaultClient as jest.MockedClass)('http://vault', 'token') as jest.Mocked; + service = new WebhookService(pool, vaultClient, mockRedis); + }); + + // ──────────────────────────────────────────────────────────────────────────── + // createSubscription + // ──────────────────────────────────────────────────────────────────────────── + describe('createSubscription()', () => { + it('stores secret in Vault when vaultClient present and returns subscription with secret', async () => { + vaultClient.writeArbitrarySecret = jest.fn().mockResolvedValue(undefined); + (pool.query as jest.Mock).mockResolvedValueOnce({ rows: [makeSubRow()], rowCount: 1 }); + + const result = await service.createSubscription(ORG_ID, { + name: 'Test Hook', + url: 'https://example.com/hook', + events: ['agent.created'], + }); + + expect(vaultClient.writeArbitrarySecret).toHaveBeenCalledTimes(1); + + const [vaultPath, secretData] = (vaultClient.writeArbitrarySecret as jest.Mock).mock.calls[0] as [string, Record]; + expect(vaultPath).toMatch(/^secret\/data\/agentidp\/webhooks\//); + expect(typeof secretData['webhookSecret']).toBe('string'); + expect(secretData['webhookSecret'].length).toBeGreaterThan(0); + + expect(typeof result.secret).toBe('string'); + expect(result.secret.length).toBeGreaterThan(0); + expect(result.id).toBe(SUB_ID); + expect(result.organization_id).toBe(ORG_ID); + }); + + it('uses bcrypt hash in local mode (no vaultClient) and returns subscription with secret', async () => { + const localService = new WebhookService(pool, null, mockRedis); + (pool.query as jest.Mock).mockResolvedValueOnce({ + rows: [makeSubRow({ vault_secret_path: 'local', secret_hash: '$2b$10$hash' })], + rowCount: 1, + }); + + const result = await localService.createSubscription(ORG_ID, { + name: 'Local Hook', + url: 'https://example.com/hook', + events: ['agent.created'], + }); + + const insertArgs = (pool.query as jest.Mock).mock.calls[0][1] as unknown[]; + const secretHashArg = insertArgs[5] as string; + expect(secretHashArg).toMatch(/^\$2[ab]\$/); + expect(insertArgs[6]).toBe('local'); + expect(typeof result.secret).toBe('string'); + expect(result.secret.length).toBeGreaterThan(0); + }); + + it('throws WebhookValidationError for http:// URL', async () => { + await expect( + service.createSubscription(ORG_ID, { + name: 'Bad Hook', + url: 'http://example.com/hook', + events: ['agent.created'], + }), + ).rejects.toThrow(WebhookValidationError); + }); + + it('throws WebhookValidationError for an invalid URL string', async () => { + await expect( + service.createSubscription(ORG_ID, { + name: 'Bad Hook', + url: 'not-a-url', + events: ['agent.created'], + }), + ).rejects.toThrow(WebhookValidationError); + }); + + it('throws WebhookValidationError when events array is empty', async () => { + await expect( + service.createSubscription(ORG_ID, { + name: 'Empty Events Hook', + url: 'https://example.com/hook', + events: [], + }), + ).rejects.toThrow(WebhookValidationError); + }); + }); + + // ──────────────────────────────────────────────────────────────────────────── + // listSubscriptions + // ──────────────────────────────────────────────────────────────────────────── + describe('listSubscriptions()', () => { + it('returns all subscriptions for the org with secret fields excluded', async () => { + const rows = [makeSubRow(), makeSubRow({ id: crypto.randomUUID(), name: 'Hook 2' })]; + (pool.query as jest.Mock).mockResolvedValueOnce({ rows, rowCount: 2 }); + + const result = await service.listSubscriptions(ORG_ID); + + expect(result).toHaveLength(2); + result.forEach((sub: IWebhookSubscription) => { + expect((sub as unknown as Record)['secret_hash']).toBeUndefined(); + expect((sub as unknown as Record)['vault_secret_path']).toBeUndefined(); + expect(sub.organization_id).toBe(ORG_ID); + }); + }); + + it('returns empty array when org has no subscriptions', async () => { + (pool.query as jest.Mock).mockResolvedValueOnce({ rows: [], rowCount: 0 }); + const result = await service.listSubscriptions(ORG_ID); + expect(result).toEqual([]); + }); + }); + + // ──────────────────────────────────────────────────────────────────────────── + // getSubscription + // ──────────────────────────────────────────────────────────────────────────── + describe('getSubscription()', () => { + it('returns the matching subscription for the correct org', async () => { + (pool.query as jest.Mock).mockResolvedValueOnce({ rows: [makeSubRow()], rowCount: 1 }); + + const result = await service.getSubscription(SUB_ID, ORG_ID); + + expect(result.id).toBe(SUB_ID); + expect(result.organization_id).toBe(ORG_ID); + }); + + it('throws WebhookNotFoundError when subscription belongs to another org', async () => { + (pool.query as jest.Mock).mockResolvedValueOnce({ rows: [], rowCount: 0 }); + + await expect(service.getSubscription(SUB_ID, crypto.randomUUID())).rejects.toThrow( + WebhookNotFoundError, + ); + }); + + it('throws WebhookNotFoundError when subscription does not exist', async () => { + (pool.query as jest.Mock).mockResolvedValueOnce({ rows: [], rowCount: 0 }); + + await expect(service.getSubscription(crypto.randomUUID(), ORG_ID)).rejects.toThrow( + WebhookNotFoundError, + ); + }); + }); + + // ──────────────────────────────────────────────────────────────────────────── + // updateSubscription + // ──────────────────────────────────────────────────────────────────────────── + describe('updateSubscription()', () => { + it('updates the name and returns the updated subscription', async () => { + const updatedRow = makeSubRow({ name: 'Updated Hook' }); + (pool.query as jest.Mock) + .mockResolvedValueOnce({ rows: [makeSubRow()], rowCount: 1 }) // fetchRow (ownership check) + .mockResolvedValueOnce({ rows: [updatedRow], rowCount: 1 }); // UPDATE RETURNING + + const result = await service.updateSubscription(SUB_ID, ORG_ID, { name: 'Updated Hook' }); + + expect(result.name).toBe('Updated Hook'); + }); + + it('validates the new URL when provided and rejects http://', async () => { + (pool.query as jest.Mock).mockResolvedValueOnce({ rows: [makeSubRow()], rowCount: 1 }); + + await expect( + service.updateSubscription(SUB_ID, ORG_ID, { url: 'http://bad.example.com' }), + ).rejects.toThrow(WebhookValidationError); + }); + + it('throws WebhookNotFoundError when subscription is missing', async () => { + (pool.query as jest.Mock).mockResolvedValueOnce({ rows: [], rowCount: 0 }); + + await expect( + service.updateSubscription(crypto.randomUUID(), ORG_ID, { name: 'x' }), + ).rejects.toThrow(WebhookNotFoundError); + }); + + it('returns current subscription unchanged when no fields are provided', async () => { + const row = makeSubRow(); + (pool.query as jest.Mock) + .mockResolvedValueOnce({ rows: [row], rowCount: 1 }) // fetchRow (ownership check) + .mockResolvedValueOnce({ rows: [row], rowCount: 1 }); // getSubscription → fetchRow + + const result = await service.updateSubscription(SUB_ID, ORG_ID, {}); + + expect(result.id).toBe(SUB_ID); + }); + }); + + // ──────────────────────────────────────────────────────────────────────────── + // deleteSubscription + // ──────────────────────────────────────────────────────────────────────────── + describe('deleteSubscription()', () => { + it('deletes the subscription successfully', async () => { + (pool.query as jest.Mock).mockResolvedValueOnce({ rows: [], rowCount: 1 }); + + await expect(service.deleteSubscription(SUB_ID, ORG_ID)).resolves.toBeUndefined(); + expect(pool.query).toHaveBeenCalledTimes(1); + }); + + it('throws WebhookNotFoundError when not found or wrong org', async () => { + (pool.query as jest.Mock).mockResolvedValueOnce({ rows: [], rowCount: 0 }); + + await expect(service.deleteSubscription(crypto.randomUUID(), ORG_ID)).rejects.toThrow( + WebhookNotFoundError, + ); + }); + }); + + // ──────────────────────────────────────────────────────────────────────────── + // listDeliveries + // ──────────────────────────────────────────────────────────────────────────── + describe('listDeliveries()', () => { + it('verifies org ownership and returns paginated deliveries', async () => { + const deliveries = [makeDeliveryRow(), makeDeliveryRow()]; + (pool.query as jest.Mock) + .mockResolvedValueOnce({ rows: [makeSubRow()], rowCount: 1 }) // fetchRow + .mockResolvedValueOnce({ rows: [{ count: '5' }], rowCount: 1 }) // COUNT + .mockResolvedValueOnce({ rows: deliveries, rowCount: 2 }); // SELECT + + const result = await service.listDeliveries(SUB_ID, ORG_ID, 20, 0); + + expect(result.total).toBe(5); + expect(result.limit).toBe(20); + expect(result.offset).toBe(0); + expect(result.deliveries).toHaveLength(2); + }); + + it('throws WebhookNotFoundError when subscription belongs to another org', async () => { + (pool.query as jest.Mock).mockResolvedValueOnce({ rows: [], rowCount: 0 }); + + await expect( + service.listDeliveries(SUB_ID, crypto.randomUUID(), 20, 0), + ).rejects.toThrow(WebhookNotFoundError); + }); + + it('passes correct limit and offset to the query', async () => { + (pool.query as jest.Mock) + .mockResolvedValueOnce({ rows: [makeSubRow()], rowCount: 1 }) + .mockResolvedValueOnce({ rows: [{ count: '50' }], rowCount: 1 }) + .mockResolvedValueOnce({ rows: [], rowCount: 0 }); + + const result = await service.listDeliveries(SUB_ID, ORG_ID, 10, 30); + + expect(result.limit).toBe(10); + expect(result.offset).toBe(30); + + const selectArgs = (pool.query as jest.Mock).mock.calls[2][1] as unknown[]; + expect(selectArgs[1]).toBe(10); + expect(selectArgs[2]).toBe(30); + }); + }); + + // ──────────────────────────────────────────────────────────────────────────── + // getSubscriptionSecret + // ──────────────────────────────────────────────────────────────────────────── + describe('getSubscriptionSecret()', () => { + it('returns the secret from Vault in Vault mode', async () => { + const vaultPath = 'secret/data/agentidp/webhooks/org/sub/secret'; + (pool.query as jest.Mock).mockResolvedValueOnce({ + rows: [makeSubRow({ vault_secret_path: vaultPath })], + rowCount: 1, + }); + vaultClient.readArbitrarySecret = jest.fn().mockResolvedValue({ webhookSecret: 'mysecret' }); + + const result = await service.getSubscriptionSecret(SUB_ID, ORG_ID); + + expect(result).toBe('mysecret'); + expect(vaultClient.readArbitrarySecret).toHaveBeenCalledWith(vaultPath); + }); + + it('throws WebhookValidationError in local mode (secret not recoverable)', async () => { + (pool.query as jest.Mock).mockResolvedValueOnce({ + rows: [makeSubRow({ vault_secret_path: 'local' })], + rowCount: 1, + }); + + await expect(service.getSubscriptionSecret(SUB_ID, ORG_ID)).rejects.toThrow( + WebhookValidationError, + ); + }); + + it('throws WebhookNotFoundError when subscription does not exist', async () => { + (pool.query as jest.Mock).mockResolvedValueOnce({ rows: [], rowCount: 0 }); + + await expect(service.getSubscriptionSecret(crypto.randomUUID(), ORG_ID)).rejects.toThrow( + WebhookNotFoundError, + ); + }); + }); + + // ──────────────────────────────────────────────────────────────────────────── + // Error class properties + // ──────────────────────────────────────────────────────────────────────────── + describe('Error classes', () => { + it('WebhookNotFoundError has httpStatus 404 and correct code', () => { + const err = new WebhookNotFoundError('abc'); + expect(err.httpStatus).toBe(404); + expect(err.code).toBe('WEBHOOK_NOT_FOUND'); + expect(err.details).toEqual({ subscriptionId: 'abc' }); + }); + + it('WebhookValidationError has httpStatus 400 and correct code', () => { + const err = new WebhookValidationError('bad', { field: 'url' }); + expect(err.httpStatus).toBe(400); + expect(err.code).toBe('WEBHOOK_VALIDATION_ERROR'); + }); + }); +});