feat(phase-3): workstream 5 — Webhooks & Event Streaming

- DB migrations 016/017: webhook_subscriptions and webhook_deliveries tables
- WebhookService: CRUD for subscriptions, Vault-backed secret storage, delivery history
- WebhookDeliveryWorker: Bull queue, HMAC-SHA256 signatures, exponential backoff,
  SSRF protection (RFC 1918 + loopback + link-local rejection), dead-letter handling
- EventPublisher: publishes 10 event types (agent/credential/token lifecycle);
  optional Kafka adapter activated via KAFKA_BROKERS env var
- AgentService, CredentialService, OAuth2Service: wired to EventPublisher
- WebhookController + routes: 6 endpoints with webhooks:read / webhooks:write scope guards
- KafkaAdapter: optional Kafka producer (kafkajs), no-op when KAFKA_BROKERS unset
- OAuthScope extended: webhooks:read, webhooks:write
- AuditAction extended: webhook.created, webhook.updated, webhook.deleted
- Metrics: agentidp_webhook_dead_letters_total counter added to registry
- 523 unit tests passing; TypeScript strict throughout, zero `any`

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
SentryAgent.ai Developer
2026-03-31 00:07:41 +00:00
parent 03b5de300c
commit 272b69f18d
20 changed files with 1994 additions and 25 deletions

View File

@@ -0,0 +1,29 @@
/**
* KafkaAdapter — optional Kafka producer activated when KAFKA_BROKERS env var is set.
* Returns null if KAFKA_BROKERS is not configured.
*
* The producer connects at application startup. When KAFKA_BROKERS is absent the
* EventPublisher operates in webhook-only mode and Kafka delivery is silently skipped.
*/
import { Kafka, Producer as KafkaProducer } from 'kafkajs';
/**
* Creates and connects a KafkaJS producer.
* Returns null when the KAFKA_BROKERS environment variable is not set.
*
* @returns A connected KafkaProducer, or null if Kafka is not configured.
*/
export async function createKafkaProducer(): Promise<KafkaProducer | null> {
const brokers = process.env['KAFKA_BROKERS'];
if (!brokers) return null;
const kafka = new Kafka({
clientId: 'agentidp',
brokers: brokers.split(','),
});
const producer = kafka.producer();
await producer.connect();
return producer;
}

View File

@@ -27,6 +27,10 @@ import { DIDService } from './services/DIDService.js';
import { OIDCKeyService } from './services/OIDCKeyService.js';
import { IDTokenService } from './services/IDTokenService.js';
import { FederationService } from './services/FederationService.js';
import { WebhookService } from './services/WebhookService.js';
import { EventPublisher } from './services/EventPublisher.js';
import { WebhookDeliveryWorker } from './workers/WebhookDeliveryWorker.js';
import { createKafkaProducer } from './adapters/KafkaAdapter.js';
import { AgentController } from './controllers/AgentController.js';
import { TokenController } from './controllers/TokenController.js';
@@ -36,6 +40,7 @@ import { OrgController } from './controllers/OrgController.js';
import { DIDController } from './controllers/DIDController.js';
import { OIDCController } from './controllers/OIDCController.js';
import { FederationController } from './controllers/FederationController.js';
import { WebhookController } from './controllers/WebhookController.js';
import { createAgentsRouter } from './routes/agents.js';
import { createTokenRouter } from './routes/token.js';
@@ -47,6 +52,7 @@ import { createOrgsRouter } from './routes/organizations.js';
import { createDIDRouter } from './routes/did.js';
import { createOIDCRouter } from './routes/oidc.js';
import { createFederationRouter } from './routes/federation.js';
import { createWebhooksRouter } from './routes/webhooks.js';
import { errorHandler } from './middleware/errorHandler.js';
import { createOpaMiddleware } from './middleware/opa.js';
@@ -124,13 +130,30 @@ export async function createApp(): Promise<Application> {
console.log('[AgentIdP] Vault integration enabled — new credentials will use Vault KV v2');
}
// ────────────────────────────────────────────────────────────────
// Kafka (optional) — activated when KAFKA_BROKERS env var is set
// ────────────────────────────────────────────────────────────────
const kafkaProducer = await createKafkaProducer();
if (kafkaProducer !== null) {
console.log('[AgentIdP] Kafka integration enabled — events will be produced to agentidp-events');
}
// ────────────────────────────────────────────────────────────────
// Webhook infrastructure
// ────────────────────────────────────────────────────────────────
const redisUrl = process.env['REDIS_URL'] ?? 'redis://localhost:6379';
const webhookService = new WebhookService(pool, vaultClient, redis as RedisClientType);
const webhookWorker = new WebhookDeliveryWorker(pool, vaultClient, redis as RedisClientType, redisUrl);
webhookWorker.start();
const eventPublisher = new EventPublisher(webhookWorker, pool, kafkaProducer);
// ────────────────────────────────────────────────────────────────
// Service layer
// ────────────────────────────────────────────────────────────────
const auditService = new AuditService(auditRepo);
const didService = new DIDService(pool, vaultClient, redis as RedisClientType);
const agentService = new AgentService(agentRepo, credentialRepo, auditService, didService);
const credentialService = new CredentialService(credentialRepo, agentRepo, auditService, vaultClient);
const agentService = new AgentService(agentRepo, credentialRepo, auditService, didService, eventPublisher);
const credentialService = new CredentialService(credentialRepo, agentRepo, auditService, vaultClient, eventPublisher);
const orgService = new OrgService(orgRepo, agentRepo);
const privateKey = process.env['JWT_PRIVATE_KEY'];
@@ -153,6 +176,7 @@ export async function createApp(): Promise<Application> {
publicKey,
vaultClient,
idTokenService,
eventPublisher,
);
// ────────────────────────────────────────────────────────────────
@@ -172,6 +196,7 @@ export async function createApp(): Promise<Application> {
const oidcController = new OIDCController(oidcKeyService, agentRepo);
const federationService = new FederationService(pool, redis as RedisClientType);
const federationController = new FederationController(federationService);
const webhookController = new WebhookController(webhookService);
// ────────────────────────────────────────────────────────────────
// Org context middleware — sets PostgreSQL session variable app.organization_id
@@ -209,6 +234,7 @@ export async function createApp(): Promise<Application> {
app.use(`${API_BASE}/audit`, createAuditRouter(auditController, opaMiddleware));
app.use(`${API_BASE}/organizations`, createOrgsRouter(orgController, opaMiddleware));
app.use(`${API_BASE}`, createFederationRouter(federationController, authMiddleware, opaMiddleware));
app.use(`${API_BASE}/webhooks`, createWebhooksRouter(webhookController, authMiddleware, opaMiddleware));
// ────────────────────────────────────────────────────────────────
// Dashboard static assets (served from dashboard/dist/)

View File

@@ -0,0 +1,180 @@
/**
* WebhookController — request handlers for webhook subscription management and delivery history.
*
* Handlers:
* POST /webhooks → createSubscription
* GET /webhooks → listSubscriptions
* GET /webhooks/:id → getSubscription
* PATCH /webhooks/:id → updateSubscription
* DELETE /webhooks/:id → deleteSubscription
* GET /webhooks/:id/deliveries → listDeliveries
*
* All handlers read `req.user.organization_id` to scope operations to the
* authenticated agent's organization.
*/
import { Request, Response, NextFunction } from 'express';
import { WebhookService } from '../services/WebhookService.js';
import { ITokenPayload } from '../types/index.js';
import {
ICreateWebhookRequest,
IUpdateWebhookRequest,
} from '../types/webhook.js';
import { AuthenticationError } from '../utils/errors.js';
/**
* Controller for all webhook subscription and delivery endpoints.
* Delegates all business logic to WebhookService.
*/
export class WebhookController {
/**
* @param webhookService - Service that manages webhook subscriptions and deliveries.
*/
constructor(private readonly webhookService: WebhookService) {}
// ─────────────────────────────────────────────────────────────────────────
// Private helpers
// ─────────────────────────────────────────────────────────────────────────
/**
* Extracts and asserts the organization_id from the token payload.
* Throws AuthenticationError (401) if the token was issued without an org scope.
*/
private requireOrgId(user: ITokenPayload): string {
if (!user.organization_id) {
throw new AuthenticationError('Token does not carry an organization_id — multi-tenant context required');
}
return user.organization_id;
}
// ─────────────────────────────────────────────────────────────────────────
// Public handlers
// ─────────────────────────────────────────────────────────────────────────
/**
* Creates a new webhook subscription for the authenticated agent's organization.
*
* Returns 201 with the subscription and the one-time signing secret.
*
* @param req - Express request. Body must conform to ICreateWebhookRequest.
* @param res - Express response.
* @param next - Express next function — forwards errors to the global error handler.
*/
async createSubscription(req: Request, res: Response, next: NextFunction): Promise<void> {
try {
const user = req.user as ITokenPayload;
const body = req.body as ICreateWebhookRequest;
const result = await this.webhookService.createSubscription(this.requireOrgId(user), body);
res.status(201).json(result);
} catch (err) {
next(err);
}
}
/**
* Returns all webhook subscriptions for the authenticated agent's organization.
*
* Responds 200 with an array of IWebhookSubscription objects.
*
* @param req - Express request.
* @param res - Express response.
* @param next - Express next function — forwards errors to the global error handler.
*/
async listSubscriptions(req: Request, res: Response, next: NextFunction): Promise<void> {
try {
const user = req.user as ITokenPayload;
const subscriptions = await this.webhookService.listSubscriptions(this.requireOrgId(user));
res.status(200).json(subscriptions);
} catch (err) {
next(err);
}
}
/**
* Returns a single webhook subscription by its UUID.
*
* Responds 200 with the matching IWebhookSubscription.
* Responds 404 if the subscription does not exist or belongs to another organization.
*
* @param req - Express request. `req.params.id` must be the subscription UUID.
* @param res - Express response.
* @param next - Express next function — forwards errors to the global error handler.
*/
async getSubscription(req: Request, res: Response, next: NextFunction): Promise<void> {
try {
const user = req.user as ITokenPayload;
const { id } = req.params;
const subscription = await this.webhookService.getSubscription(id, this.requireOrgId(user));
res.status(200).json(subscription);
} catch (err) {
next(err);
}
}
/**
* Partially updates an existing webhook subscription.
*
* Responds 200 with the updated IWebhookSubscription.
* Responds 404 if the subscription does not exist or belongs to another organization.
*
* @param req - Express request. `req.params.id` is the subscription UUID; body is IUpdateWebhookRequest.
* @param res - Express response.
* @param next - Express next function — forwards errors to the global error handler.
*/
async updateSubscription(req: Request, res: Response, next: NextFunction): Promise<void> {
try {
const user = req.user as ITokenPayload;
const { id } = req.params;
const body = req.body as IUpdateWebhookRequest;
const subscription = await this.webhookService.updateSubscription(id, this.requireOrgId(user), body);
res.status(200).json(subscription);
} catch (err) {
next(err);
}
}
/**
* Permanently deletes a webhook subscription and all its delivery records.
*
* Responds 204 No Content on success.
* Responds 404 if the subscription does not exist or belongs to another organization.
*
* @param req - Express request. `req.params.id` must be the subscription UUID.
* @param res - Express response.
* @param next - Express next function — forwards errors to the global error handler.
*/
async deleteSubscription(req: Request, res: Response, next: NextFunction): Promise<void> {
try {
const user = req.user as ITokenPayload;
const { id } = req.params;
await this.webhookService.deleteSubscription(id, this.requireOrgId(user));
res.status(204).send();
} catch (err) {
next(err);
}
}
/**
* Returns a paginated list of delivery records for a webhook subscription.
*
* Responds 200 with an IPaginatedDeliveriesResponse.
* Responds 404 if the subscription does not exist or belongs to another organization.
*
* @param req - Express request. `req.params.id` is the subscription UUID.
* Optional query params: limit (default 20), offset (default 0).
* @param res - Express response.
* @param next - Express next function — forwards errors to the global error handler.
*/
async listDeliveries(req: Request, res: Response, next: NextFunction): Promise<void> {
try {
const user = req.user as ITokenPayload;
const { id } = req.params;
const limit = Math.max(1, parseInt(String(req.query['limit'] ?? '20'), 10) || 20);
const offset = Math.max(0, parseInt(String(req.query['offset'] ?? '0'), 10) || 0);
const result = await this.webhookService.listDeliveries(id, this.requireOrgId(user), limit, offset);
res.status(200).json(result);
} catch (err) {
next(err);
}
}
}

View File

@@ -0,0 +1,19 @@
-- webhook_subscriptions: per-organization webhook delivery targets.
-- Each subscription registers a URL to receive HTTP POST callbacks for specified event types.
-- The HMAC signing secret is stored in Vault (vault_secret_path) when Vault is configured,
-- or bcrypt-hashed into secret_hash in local mode. The raw secret is never stored in PostgreSQL.
CREATE TABLE IF NOT EXISTS webhook_subscriptions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
organization_id UUID NOT NULL REFERENCES organizations(id) ON DELETE CASCADE,
name VARCHAR(255) NOT NULL,
url VARCHAR(2048) NOT NULL, -- HTTPS delivery target
events JSONB NOT NULL DEFAULT '[]', -- array of WebhookEventType strings
secret_hash VARCHAR(255) NOT NULL, -- bcrypt hash (local mode) or 'vault' sentinel
vault_secret_path VARCHAR(512) NOT NULL, -- Vault KV path for HMAC signing secret, or 'local'
active BOOLEAN NOT NULL DEFAULT true,
failure_count INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_webhook_subscriptions_organization_id ON webhook_subscriptions(organization_id);
CREATE INDEX IF NOT EXISTS idx_webhook_subscriptions_active ON webhook_subscriptions(active);

View File

@@ -0,0 +1,19 @@
-- webhook_deliveries: audit trail for every webhook delivery attempt.
-- A new row is inserted for each outbound event dispatch. Retries are tracked via
-- attempt_count and next_retry_at. Terminal states are 'delivered' and 'dead_letter'.
CREATE TABLE IF NOT EXISTS webhook_deliveries (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
subscription_id UUID NOT NULL REFERENCES webhook_subscriptions(id) ON DELETE CASCADE,
event_type VARCHAR(128) NOT NULL,
payload JSONB NOT NULL, -- full IWebhookPayload delivered
status VARCHAR(32) NOT NULL DEFAULT 'pending', -- 'pending' | 'delivered' | 'failed' | 'dead_letter'
http_status_code INTEGER, -- populated after delivery attempt; NULL until first attempt
attempt_count INTEGER NOT NULL DEFAULT 0,
next_retry_at TIMESTAMPTZ, -- NULL until a failure sets a retry schedule
delivered_at TIMESTAMPTZ, -- NULL until successful delivery
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_subscription_id ON webhook_deliveries(subscription_id);
CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_status ON webhook_deliveries(status);
CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_next_retry_at ON webhook_deliveries(next_retry_at);

View File

@@ -77,3 +77,15 @@ export const redisCommandDurationSeconds = new Histogram({
buckets: [0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25],
registers: [metricsRegistry],
});
/**
* Total number of webhook deliveries that reached the dead-letter state
* (i.e. exhausted all retry attempts without a 2xx response).
* Labels: organization_id
*/
export const webhookDeadLettersTotal = new Counter({
name: 'agentidp_webhook_dead_letters_total',
help: 'Total number of webhook deliveries that exhausted all retry attempts.',
labelNames: ['organization_id'] as const,
registers: [metricsRegistry],
});

86
src/routes/webhooks.ts Normal file
View File

@@ -0,0 +1,86 @@
/**
* Webhook routes — subscription management and delivery history.
*
* All endpoints require authentication (authMiddleware) and OPA scope enforcement.
*
* Endpoints:
* POST /webhooks → createSubscription (webhooks:write)
* GET /webhooks → listSubscriptions (webhooks:read)
* GET /webhooks/:id → getSubscription (webhooks:read)
* PATCH /webhooks/:id → updateSubscription (webhooks:write)
* DELETE /webhooks/:id → deleteSubscription (webhooks:write)
* GET /webhooks/:id/deliveries → listDeliveries (webhooks:read)
*
* Mount this router at `${API_BASE}/webhooks` in app.ts.
*/
import { Router, RequestHandler } from 'express';
import { WebhookController } from '../controllers/WebhookController.js';
import { asyncHandler } from '../utils/asyncHandler.js';
/**
* Creates and returns the Express router for webhook endpoints.
* Mount at `${API_BASE}/webhooks` (i.e. `/api/v1/webhooks`) in app.ts.
*
* @param controller - The webhook controller instance.
* @param authMiddleware - JWT authentication middleware.
* @param opaMiddleware - OPA authorization middleware (scope enforcement).
* @returns Configured Express router.
*/
export function createWebhooksRouter(
controller: WebhookController,
authMiddleware: RequestHandler,
opaMiddleware: RequestHandler,
): Router {
const router = Router();
// POST /webhooks — create a new subscription (webhooks:write)
router.post(
'/',
authMiddleware,
opaMiddleware,
asyncHandler(controller.createSubscription.bind(controller)),
);
// GET /webhooks — list all subscriptions for the org (webhooks:read)
router.get(
'/',
authMiddleware,
opaMiddleware,
asyncHandler(controller.listSubscriptions.bind(controller)),
);
// GET /webhooks/:id — get a single subscription (webhooks:read)
router.get(
'/:id',
authMiddleware,
opaMiddleware,
asyncHandler(controller.getSubscription.bind(controller)),
);
// PATCH /webhooks/:id — update a subscription (webhooks:write)
router.patch(
'/:id',
authMiddleware,
opaMiddleware,
asyncHandler(controller.updateSubscription.bind(controller)),
);
// DELETE /webhooks/:id — delete a subscription (webhooks:write)
router.delete(
'/:id',
authMiddleware,
opaMiddleware,
asyncHandler(controller.deleteSubscription.bind(controller)),
);
// GET /webhooks/:id/deliveries — list delivery history (webhooks:read)
router.get(
'/:id/deliveries',
authMiddleware,
opaMiddleware,
asyncHandler(controller.listDeliveries.bind(controller)),
);
return router;
}

View File

@@ -7,6 +7,7 @@ import { AgentRepository } from '../repositories/AgentRepository.js';
import { CredentialRepository } from '../repositories/CredentialRepository.js';
import { AuditService } from './AuditService.js';
import { DIDService } from './DIDService.js';
import { EventPublisher } from './EventPublisher.js';
import {
IAgent,
ICreateAgentRequest,
@@ -36,12 +37,15 @@ export class AgentService {
* @param didService - Optional DIDService. When provided, a W3C DID is generated for each
* newly registered agent. When null/undefined, DID generation is skipped
* (backward-compatible default).
* @param eventPublisher - Optional EventPublisher. When provided, lifecycle events are
* published as webhooks and Kafka messages (fire-and-forget).
*/
constructor(
private readonly agentRepository: AgentRepository,
private readonly credentialRepository: CredentialRepository,
private readonly auditService: AuditService,
private readonly didService: DIDService | null = null,
private readonly eventPublisher: EventPublisher | null = null,
) {}
/**
@@ -96,6 +100,13 @@ export class AgentService {
// Instrument: count successful agent registrations
agentsRegisteredTotal.inc({ deployment_env: data.deploymentEnv });
// Publish event (fire-and-forget)
void this.eventPublisher?.publishEvent(
agent.organizationId,
'agent.created',
{ agentId: agent.agentId, email: agent.email, name: agent.owner },
);
return agent;
}
@@ -184,6 +195,33 @@ export class AgentService {
{ updatedFields: Object.keys(data) },
);
// Publish lifecycle event (fire-and-forget)
if (auditAction === 'agent.updated') {
void this.eventPublisher?.publishEvent(
updated.organizationId,
'agent.updated',
{ agentId, changes: data },
);
} else if (auditAction === 'agent.suspended') {
void this.eventPublisher?.publishEvent(
updated.organizationId,
'agent.suspended',
{ agentId },
);
} else if (auditAction === 'agent.reactivated') {
void this.eventPublisher?.publishEvent(
updated.organizationId,
'agent.reactivated',
{ agentId },
);
} else if (auditAction === 'agent.decommissioned') {
void this.eventPublisher?.publishEvent(
updated.organizationId,
'agent.decommissioned',
{ agentId },
);
}
return updated;
}
@@ -224,5 +262,12 @@ export class AgentService {
userAgent,
{},
);
// Publish event (fire-and-forget)
void this.eventPublisher?.publishEvent(
agent.organizationId,
'agent.decommissioned',
{ agentId },
);
}
}

View File

@@ -7,6 +7,7 @@ import { CredentialRepository } from '../repositories/CredentialRepository.js';
import { AgentRepository } from '../repositories/AgentRepository.js';
import { AuditService } from './AuditService.js';
import { VaultClient } from '../vault/VaultClient.js';
import { EventPublisher } from './EventPublisher.js';
import {
ICredential,
ICredentialWithSecret,
@@ -34,12 +35,15 @@ export class CredentialService {
* @param auditService - The audit log service.
* @param vaultClient - Optional VaultClient. When provided, new credentials are stored in Vault.
* When null, bcrypt is used (Phase 1 behaviour).
* @param eventPublisher - Optional EventPublisher. When provided, credential events are
* published as webhooks and Kafka messages (fire-and-forget).
*/
constructor(
private readonly credentialRepository: CredentialRepository,
private readonly agentRepository: AgentRepository,
private readonly auditService: AuditService,
private readonly vaultClient: VaultClient | null = null,
private readonly eventPublisher: EventPublisher | null = null,
) {}
/**
@@ -104,6 +108,13 @@ export class CredentialService {
{ credentialId: credential.credentialId },
);
// Publish event (fire-and-forget)
void this.eventPublisher?.publishEvent(
agent.organizationId,
'credential.generated',
{ credentialId: credential.credentialId, agentId },
);
return { ...credential, clientSecret: plainSecret };
}
@@ -205,6 +216,13 @@ export class CredentialService {
{ credentialId },
);
// Publish event (fire-and-forget)
void this.eventPublisher?.publishEvent(
agent.organizationId,
'credential.rotated',
{ credentialId, agentId },
);
return { ...updated, clientSecret: plainSecret };
}
@@ -258,5 +276,12 @@ export class CredentialService {
userAgent,
{ credentialId },
);
// Publish event (fire-and-forget)
void this.eventPublisher?.publishEvent(
agent.organizationId,
'credential.revoked',
{ credentialId, agentId },
);
}
}

View File

@@ -0,0 +1,130 @@
/**
* EventPublisher — fire-and-forget event bus for agent lifecycle and credential events.
*
* On each publishEvent call the service:
* 1. Builds the IWebhookPayload envelope.
* 2. Queries for active webhook subscriptions that match the event type.
* 3. Creates a delivery record for each matching subscription and enqueues a Bull job.
* 4. Optionally produces the event to the 'agentidp-events' Kafka topic (when configured).
*
* Kafka errors are caught and logged but never propagate — event publishing must not
* block or fail the primary business operation.
*/
import { Pool } from 'pg';
import crypto from 'crypto';
import { Producer as KafkaProducer } from 'kafkajs';
import { WebhookDeliveryWorker } from '../workers/WebhookDeliveryWorker.js';
import { IWebhookPayload, WebhookEventType } from '../types/webhook.js';
/** Minimal subscription row shape needed for event fanout. */
interface ActiveSubscriptionRow {
id: string;
organization_id: string;
}
/**
* Fire-and-forget event publisher.
* Fanout to webhooks and optionally to Kafka on each publishEvent call.
*/
export class EventPublisher {
/**
* @param webhookWorker - Worker that enqueues outbound HTTP delivery jobs.
* @param pool - PostgreSQL connection pool (for subscription lookup and delivery inserts).
* @param kafkaProducer - Optional connected KafkaJS producer. Null when Kafka is not configured.
*/
constructor(
private readonly webhookWorker: WebhookDeliveryWorker,
private readonly pool: Pool,
private readonly kafkaProducer: KafkaProducer | null,
) {}
/**
* Publishes an event to all active matching webhook subscriptions and optionally to Kafka.
*
* This method is fire-and-forget. Individual delivery failures are logged but not
* thrown. Callers should not await this method — use `void this.eventPublisher?.publishEvent(...)`.
*
* @param orgId - The organization UUID that owns the resource that changed.
* @param eventType - The event type identifier.
* @param data - Event-specific payload data.
*/
async publishEvent(
orgId: string,
eventType: WebhookEventType,
data: Record<string, unknown>,
): Promise<void> {
const id = crypto.randomUUID();
const payload: IWebhookPayload = {
id,
event: eventType,
timestamp: new Date().toISOString(),
organization_id: orgId,
data,
};
// ── Webhook fanout ─────────────────────────────────────────────────────
try {
// Find all active subscriptions for this org that include this event type.
// The events column is a JSONB array; we check containment with @>.
const subResult = await this.pool.query<ActiveSubscriptionRow>(
`SELECT id, organization_id
FROM webhook_subscriptions
WHERE organization_id = $1
AND active = true
AND events @> $2::jsonb`,
[orgId, JSON.stringify([eventType])],
);
for (const sub of subResult.rows) {
try {
// Insert a delivery record
const deliveryResult = await this.pool.query<{ id: string }>(
`INSERT INTO webhook_deliveries
(subscription_id, event_type, payload, status, attempt_count)
VALUES ($1, $2, $3::jsonb, 'pending', 0)
RETURNING id`,
[sub.id, eventType, JSON.stringify(payload)],
);
const deliveryId = deliveryResult.rows[0].id;
// Enqueue the Bull delivery job
await this.webhookWorker.enqueue({
deliveryId,
subscriptionId: sub.id,
organizationId: orgId,
payload,
});
} catch (subErr) {
console.error(
`[EventPublisher] Failed to enqueue delivery for subscription ${sub.id}: ${(subErr as Error).message}`,
);
}
}
} catch (webhookErr) {
console.error(
`[EventPublisher] Webhook fanout error for event ${eventType} (org ${orgId}): ${(webhookErr as Error).message}`,
);
}
// ── Kafka fanout ───────────────────────────────────────────────────────
if (this.kafkaProducer !== null) {
try {
await this.kafkaProducer.send({
topic: 'agentidp-events',
messages: [
{
key: orgId,
value: JSON.stringify(payload),
},
],
});
} catch (kafkaErr) {
console.error(
`[EventPublisher] Kafka produce error for event ${eventType} (org ${orgId}): ${(kafkaErr as Error).message}`,
);
}
}
}
}

View File

@@ -9,6 +9,7 @@ import { AgentRepository } from '../repositories/AgentRepository.js';
import { AuditService } from './AuditService.js';
import { VaultClient } from '../vault/VaultClient.js';
import { IDTokenService } from './IDTokenService.js';
import { EventPublisher } from './EventPublisher.js';
import {
ITokenPayload,
ITokenResponse,
@@ -49,6 +50,8 @@ export class OAuth2Service {
* @param vaultClient - Optional VaultClient for Phase 2 credential verification.
* @param idTokenService - Optional IDTokenService; when provided and `openid` scope
* is requested, an OIDC ID token is appended to the token response.
* @param eventPublisher - Optional EventPublisher. When provided, token.issued and
* token.revoked events are published as webhooks and Kafka messages (fire-and-forget).
*/
constructor(
private readonly tokenRepository: TokenRepository,
@@ -59,6 +62,7 @@ export class OAuth2Service {
private readonly publicKey: string,
private readonly vaultClient: VaultClient | null = null,
private readonly idTokenService: IDTokenService | null = null,
private readonly eventPublisher: EventPublisher | null = null,
) {}
/**
@@ -211,6 +215,13 @@ export class OAuth2Service {
// Instrument: count successful token issuances
tokensIssuedTotal.inc({ scope });
// Publish event (fire-and-forget)
void this.eventPublisher?.publishEvent(
agent.organizationId ?? 'org_system',
'token.issued',
{ agentId: clientId, scope, jti },
);
const tokenResponse: ITokenResponse = {
access_token: accessToken,
token_type: 'Bearer',
@@ -323,6 +334,13 @@ export class OAuth2Service {
userAgent,
{ jti: decoded.jti },
);
// Publish event (fire-and-forget)
void this.eventPublisher?.publishEvent(
callerPayload.organization_id ?? 'org_system',
'token.revoked',
{ jti: decoded.jti },
);
}
// If token is malformed/undecoded, per RFC 7009 we still return success
}

View File

@@ -0,0 +1,475 @@
/**
* WebhookService — manages webhook subscriptions and delivery history.
*
* HMAC signing secrets are stored in Vault when available (vault_secret_path holds the KV path).
* In local mode (no Vault) the secret is bcrypt-hashed and stored in secret_hash, and
* vault_secret_path is set to the sentinel value 'local'. The raw secret is never persisted
* in PostgreSQL and is only returned once at subscription creation time.
*/
import { Pool } from 'pg';
import { RedisClientType } from 'redis';
import crypto from 'crypto';
import bcrypt from 'bcryptjs';
import { VaultClient } from '../vault/VaultClient.js';
import { SentryAgentError } from '../utils/errors.js';
import {
IWebhookSubscription,
ICreateWebhookRequest,
IUpdateWebhookRequest,
IWebhookDelivery,
IPaginatedDeliveriesResponse,
WebhookEventType,
} from '../types/webhook.js';
// ============================================================================
// Custom errors
// ============================================================================
/** 404 — Referenced webhook subscription was not found (or belongs to another org). */
export class WebhookNotFoundError extends SentryAgentError {
constructor(subscriptionId?: string) {
super(
'Webhook subscription with the specified ID was not found.',
'WEBHOOK_NOT_FOUND',
404,
subscriptionId ? { subscriptionId } : undefined,
);
}
}
/** 400 — Webhook subscription request failed validation. */
export class WebhookValidationError extends SentryAgentError {
constructor(message: string, details?: Record<string, unknown>) {
super(message, 'WEBHOOK_VALIDATION_ERROR', 400, details);
}
}
// ============================================================================
// Internal DB row shape
// ============================================================================
/** Internal representation of a webhook_subscriptions row. */
interface WebhookSubscriptionRow {
id: string;
organization_id: string;
name: string;
url: string;
events: WebhookEventType[];
secret_hash: string;
vault_secret_path: string;
active: boolean;
failure_count: number;
created_at: Date;
updated_at: Date;
}
/** Internal representation of a webhook_deliveries row. */
interface WebhookDeliveryRow {
id: string;
subscription_id: string;
event_type: string;
payload: Record<string, unknown>;
status: 'pending' | 'delivered' | 'failed' | 'dead_letter';
http_status_code: number | null;
attempt_count: number;
next_retry_at: Date | null;
delivered_at: Date | null;
created_at: Date;
updated_at: Date;
}
// ============================================================================
// Mappers
// ============================================================================
/**
* Maps a raw DB row to a public IWebhookSubscription (never includes secret fields).
*/
function mapRowToSubscription(row: WebhookSubscriptionRow): IWebhookSubscription {
return {
id: row.id,
organization_id: row.organization_id,
name: row.name,
url: row.url,
events: row.events,
active: row.active,
failure_count: row.failure_count,
created_at: row.created_at,
updated_at: row.updated_at,
};
}
/**
* Maps a raw DB row to a public IWebhookDelivery.
*/
function mapRowToDelivery(row: WebhookDeliveryRow): IWebhookDelivery {
return {
id: row.id,
subscription_id: row.subscription_id,
event_type: row.event_type as WebhookEventType,
payload: row.payload,
status: row.status,
http_status_code: row.http_status_code,
attempt_count: row.attempt_count,
next_retry_at: row.next_retry_at,
delivered_at: row.delivered_at,
created_at: row.created_at,
updated_at: row.updated_at,
};
}
// ============================================================================
// Service
// ============================================================================
/**
* Service for managing webhook subscriptions and delivery audit records.
* Coordinates PostgreSQL persistence and optional Vault secret storage.
*/
export class WebhookService {
/**
* @param pool - PostgreSQL connection pool.
* @param vaultClient - Optional VaultClient. When provided, HMAC secrets are stored in Vault.
* @param redis - Redis client (reserved for future caching needs).
*/
constructor(
private readonly pool: Pool,
private readonly vaultClient: VaultClient | null,
_redis: RedisClientType, // reserved for future subscription caching
) {}
// ──────────────────────────────────────────────────────────────────────────
// Subscriptions
// ──────────────────────────────────────────────────────────────────────────
/**
* Creates a new webhook subscription for the given organization.
*
* A 32-byte random HMAC signing secret is generated. When Vault is configured the
* secret is stored in Vault KV v2 at `secret/data/agentidp/webhooks/{orgId}/{id}/secret`
* and `vault_secret_path` is set to that path. In local mode the secret is bcrypt-hashed
* and stored in `secret_hash`; the raw secret is returned from this method and must be
* delivered to the caller immediately — it cannot be retrieved again.
*
* @param orgId - The organization UUID that owns this subscription.
* @param req - The creation request (name, url, events).
* @returns The created IWebhookSubscription plus the one-time `secret` field on the raw object.
* @throws WebhookValidationError if the URL is not https:// or events array is empty.
*/
async createSubscription(
orgId: string,
req: ICreateWebhookRequest,
): Promise<IWebhookSubscription & { secret: string }> {
this.validateUrl(req.url);
this.validateEvents(req.events);
const secret = crypto.randomBytes(32).toString('hex');
const subscriptionId = crypto.randomUUID();
let secretHash: string;
let vaultSecretPath: string;
if (this.vaultClient !== null) {
// Store raw secret in Vault under a webhook-specific path
const vaultPath = `secret/data/agentidp/webhooks/${orgId}/${subscriptionId}/secret`;
await this.storeWebhookSecretInVault(vaultPath, secret);
secretHash = 'vault';
vaultSecretPath = vaultPath;
} else {
// Local mode: bcrypt-hash the secret; raw secret cannot be recovered later
secretHash = await bcrypt.hash(secret, 10);
vaultSecretPath = 'local';
}
const result = await this.pool.query<WebhookSubscriptionRow>(
`INSERT INTO webhook_subscriptions
(id, organization_id, name, url, events, secret_hash, vault_secret_path, active, failure_count)
VALUES ($1, $2, $3, $4, $5::jsonb, $6, $7, true, 0)
RETURNING *`,
[
subscriptionId,
orgId,
req.name,
req.url,
JSON.stringify(req.events),
secretHash,
vaultSecretPath,
],
);
const row = result.rows[0];
return { ...mapRowToSubscription(row), secret };
}
/**
* Retrieves the raw HMAC signing secret for a subscription (Vault mode only).
*
* In local mode the secret was only available at creation time and cannot be recovered.
*
* @param subscriptionId - The subscription UUID.
* @param orgId - The organization UUID (ownership verification).
* @returns The raw HMAC signing secret.
* @throws WebhookNotFoundError if the subscription does not exist or belongs to another org.
* @throws WebhookValidationError if the subscription is in local mode (secret not recoverable).
*/
async getSubscriptionSecret(subscriptionId: string, orgId: string): Promise<string> {
const row = await this.fetchRow(subscriptionId, orgId);
if (row.vault_secret_path === 'local') {
throw new WebhookValidationError(
'Secret not available — use webhook secret returned at creation time.',
{ subscriptionId },
);
}
return this.retrieveWebhookSecretFromVault(row.vault_secret_path);
}
/**
* Returns all webhook subscriptions for the given organization.
*
* @param orgId - The organization UUID.
* @returns Array of IWebhookSubscription (no secret fields).
*/
async listSubscriptions(orgId: string): Promise<IWebhookSubscription[]> {
const result = await this.pool.query<WebhookSubscriptionRow>(
`SELECT * FROM webhook_subscriptions
WHERE organization_id = $1
ORDER BY created_at DESC`,
[orgId],
);
return result.rows.map(mapRowToSubscription);
}
/**
* Returns a single webhook subscription by ID.
*
* @param id - The subscription UUID.
* @param orgId - The organization UUID (ownership verification).
* @returns The IWebhookSubscription.
* @throws WebhookNotFoundError if not found or belongs to another org.
*/
async getSubscription(id: string, orgId: string): Promise<IWebhookSubscription> {
const row = await this.fetchRow(id, orgId);
return mapRowToSubscription(row);
}
/**
* Partially updates a webhook subscription.
*
* Only the fields provided in `req` are changed. If a new URL is provided it must
* use the `https://` scheme.
*
* @param id - The subscription UUID.
* @param orgId - The organization UUID (ownership verification).
* @param req - Fields to update.
* @returns The updated IWebhookSubscription.
* @throws WebhookNotFoundError if not found or belongs to another org.
* @throws WebhookValidationError if the new URL is not https://.
*/
async updateSubscription(
id: string,
orgId: string,
req: IUpdateWebhookRequest,
): Promise<IWebhookSubscription> {
// Verify ownership before mutating
await this.fetchRow(id, orgId);
if (req.url !== undefined) {
this.validateUrl(req.url);
}
const setClauses: string[] = [];
const values: unknown[] = [];
let paramIndex = 1;
if (req.name !== undefined) {
setClauses.push(`name = $${paramIndex++}`);
values.push(req.name);
}
if (req.url !== undefined) {
setClauses.push(`url = $${paramIndex++}`);
values.push(req.url);
}
if (req.events !== undefined) {
this.validateEvents(req.events);
setClauses.push(`events = $${paramIndex++}::jsonb`);
values.push(JSON.stringify(req.events));
}
if (req.active !== undefined) {
setClauses.push(`active = $${paramIndex++}`);
values.push(req.active);
}
if (setClauses.length === 0) {
// Nothing to update — re-fetch and return current state
return this.getSubscription(id, orgId);
}
setClauses.push(`updated_at = NOW()`);
values.push(id);
values.push(orgId);
const result = await this.pool.query<WebhookSubscriptionRow>(
`UPDATE webhook_subscriptions
SET ${setClauses.join(', ')}
WHERE id = $${paramIndex++} AND organization_id = $${paramIndex}
RETURNING *`,
values,
);
if (result.rowCount === 0) {
throw new WebhookNotFoundError(id);
}
return mapRowToSubscription(result.rows[0]);
}
/**
* Permanently deletes a webhook subscription (and all its deliveries via CASCADE).
*
* @param id - The subscription UUID.
* @param orgId - The organization UUID (ownership verification).
* @throws WebhookNotFoundError if not found or belongs to another org.
*/
async deleteSubscription(id: string, orgId: string): Promise<void> {
const result = await this.pool.query(
`DELETE FROM webhook_subscriptions
WHERE id = $1 AND organization_id = $2`,
[id, orgId],
);
if (result.rowCount === 0) {
throw new WebhookNotFoundError(id);
}
}
// ──────────────────────────────────────────────────────────────────────────
// Deliveries
// ──────────────────────────────────────────────────────────────────────────
/**
* Returns a paginated list of delivery records for a subscription.
*
* Verifies that the subscription belongs to the given organization before querying.
*
* @param subscriptionId - The subscription UUID.
* @param orgId - The organization UUID (ownership verification).
* @param limit - Page size (default 20).
* @param offset - Row offset (default 0).
* @returns Paginated delivery records.
* @throws WebhookNotFoundError if the subscription does not exist or belongs to another org.
*/
async listDeliveries(
subscriptionId: string,
orgId: string,
limit: number,
offset: number,
): Promise<IPaginatedDeliveriesResponse> {
// Verify ownership
await this.fetchRow(subscriptionId, orgId);
const countResult = await this.pool.query<{ count: string }>(
`SELECT COUNT(*) AS count FROM webhook_deliveries WHERE subscription_id = $1`,
[subscriptionId],
);
const total = parseInt(countResult.rows[0].count, 10);
const result = await this.pool.query<WebhookDeliveryRow>(
`SELECT * FROM webhook_deliveries
WHERE subscription_id = $1
ORDER BY created_at DESC
LIMIT $2 OFFSET $3`,
[subscriptionId, limit, offset],
);
return {
deliveries: result.rows.map(mapRowToDelivery),
total,
limit,
offset,
};
}
// ──────────────────────────────────────────────────────────────────────────
// Private helpers
// ──────────────────────────────────────────────────────────────────────────
/**
* Fetches the full subscription row including secret fields (for internal use only).
*
* @param id - The subscription UUID.
* @param orgId - The organization UUID.
* @throws WebhookNotFoundError if not found or org mismatch.
*/
private async fetchRow(id: string, orgId: string): Promise<WebhookSubscriptionRow> {
const result = await this.pool.query<WebhookSubscriptionRow>(
`SELECT * FROM webhook_subscriptions WHERE id = $1 AND organization_id = $2`,
[id, orgId],
);
if (result.rowCount === 0) {
throw new WebhookNotFoundError(id);
}
return result.rows[0];
}
/**
* Validates that a URL uses the https:// scheme.
*
* @param url - The URL to validate.
* @throws WebhookValidationError if the scheme is not https.
*/
private validateUrl(url: string): void {
try {
const parsed = new URL(url);
if (parsed.protocol !== 'https:') {
throw new WebhookValidationError(
'Webhook URL must use the https:// scheme.',
{ url },
);
}
} catch (err) {
if (err instanceof WebhookValidationError) throw err;
throw new WebhookValidationError('Webhook URL is not a valid URL.', { url });
}
}
/**
* Validates that the events array is non-empty.
*
* @param events - The events array to validate.
* @throws WebhookValidationError if the array is empty.
*/
private validateEvents(events: WebhookEventType[]): void {
if (events.length === 0) {
throw new WebhookValidationError('Webhook subscription must include at least one event type.');
}
}
/**
* Stores a webhook HMAC secret in Vault at the given KV v2 data path.
*
* @param vaultPath - Full KV v2 data path (e.g. `secret/data/agentidp/webhooks/...`).
* @param secret - The raw HMAC secret to store.
*/
private async storeWebhookSecretInVault(vaultPath: string, secret: string): Promise<void> {
await this.vaultClient!.writeArbitrarySecret(vaultPath, { webhookSecret: secret });
}
/**
* Retrieves a webhook HMAC secret from Vault at the given KV v2 data path.
*
* @param vaultPath - Full KV v2 data path.
* @returns The raw HMAC secret string.
* @throws WebhookValidationError if the secret is missing or empty.
*/
private async retrieveWebhookSecretFromVault(vaultPath: string): Promise<string> {
const data = await this.vaultClient!.readArbitrarySecret(vaultPath);
const secret = data['webhookSecret'];
if (typeof secret !== 'string' || secret.length === 0) {
throw new WebhookValidationError('Vault returned an empty or missing webhook secret.', { vaultPath });
}
return secret;
}
}

View File

@@ -28,7 +28,14 @@ export type DeploymentEnv = 'development' | 'staging' | 'production';
export type CredentialStatus = 'active' | 'revoked';
/** OAuth 2.0 scope values supported by this IdP. */
export type OAuthScope = 'agents:read' | 'agents:write' | 'tokens:read' | 'audit:read' | 'admin:orgs';
export type OAuthScope =
| 'agents:read'
| 'agents:write'
| 'tokens:read'
| 'audit:read'
| 'admin:orgs'
| 'webhooks:read'
| 'webhooks:write';
/** Audit action identifiers for all significant platform events. */
export type AuditAction =
@@ -47,7 +54,10 @@ export type AuditAction =
| 'org.created'
| 'org.updated'
| 'org.deleted'
| 'org.member_added';
| 'org.member_added'
| 'webhook.created'
| 'webhook.updated'
| 'webhook.deleted';
/** Outcome of an audited action. */
export type AuditOutcome = 'success' | 'failure';

94
src/types/webhook.ts Normal file
View File

@@ -0,0 +1,94 @@
/**
* Shared TypeScript interfaces and types for the Webhook / Event Streaming subsystem.
* Imported by WebhookService, WebhookDeliveryWorker, EventPublisher, and their controllers.
*/
// ============================================================================
// Event Types
// ============================================================================
/** All event type identifiers that can be published and subscribed to. */
export type WebhookEventType =
| 'agent.created'
| 'agent.updated'
| 'agent.suspended'
| 'agent.reactivated'
| 'agent.decommissioned'
| 'credential.generated'
| 'credential.rotated'
| 'credential.revoked'
| 'token.issued'
| 'token.revoked';
// ============================================================================
// Webhook Subscription
// ============================================================================
/** A registered webhook subscription (secret fields are never included). */
export interface IWebhookSubscription {
id: string;
organization_id: string;
name: string;
url: string;
events: WebhookEventType[];
active: boolean;
failure_count: number;
created_at: Date;
updated_at: Date;
}
/** Request body for creating a new webhook subscription. */
export interface ICreateWebhookRequest {
name: string;
url: string;
events: WebhookEventType[];
}
/** Request body for partially updating a webhook subscription. */
export interface IUpdateWebhookRequest {
name?: string;
url?: string;
events?: WebhookEventType[];
active?: boolean;
}
// ============================================================================
// Webhook Delivery
// ============================================================================
/** A single webhook delivery attempt record. */
export interface IWebhookDelivery {
id: string;
subscription_id: string;
event_type: WebhookEventType;
payload: Record<string, unknown>;
status: 'pending' | 'delivered' | 'failed' | 'dead_letter';
http_status_code: number | null;
attempt_count: number;
next_retry_at: Date | null;
delivered_at: Date | null;
created_at: Date;
updated_at: Date;
}
/** The envelope sent in every outbound webhook HTTP request body. */
export interface IWebhookPayload {
/** UUID uniquely identifying this event occurrence. */
id: string;
/** The event type identifier. */
event: WebhookEventType;
/** ISO 8601 timestamp of when the event was published. */
timestamp: string;
/** The organization that owns the resource that changed. */
organization_id: string;
/** Event-specific data. Contents vary by event type. */
data: Record<string, unknown>;
}
/** Paginated response for listing webhook deliveries. */
export interface IPaginatedDeliveriesResponse {
deliveries: IWebhookDelivery[];
total: number;
limit: number;
offset: number;
}

View File

@@ -156,6 +156,57 @@ export class VaultClient {
return timingSafeEqual(Buffer.from(stored), Buffer.from(candidateSecret));
}
/**
* Writes a key-value data map to an arbitrary Vault KV v2 data path.
*
* Used by subsystems (e.g. webhook secrets) that store secrets at paths
* outside the standard credential hierarchy.
*
* @param dataPath - Full KV v2 data path (e.g. `secret/data/agentidp/webhooks/...`).
* @param data - Key-value pairs to store under the `data` envelope.
* @throws CredentialError if the Vault write fails.
*/
async writeArbitrarySecret(dataPath: string, data: Record<string, string>): Promise<void> {
try {
await this.client.write(dataPath, { data });
} catch (err) {
throw new CredentialError(
`Failed to write secret to Vault at path '${dataPath}': ${err instanceof Error ? err.message : String(err)}`,
'VAULT_WRITE_ERROR',
{ dataPath },
);
}
}
/**
* Reads a key-value data map from an arbitrary Vault KV v2 data path.
*
* @param dataPath - Full KV v2 data path.
* @returns The stored key-value data map.
* @throws CredentialError if the read fails or the path contains no data.
*/
async readArbitrarySecret(dataPath: string): Promise<Record<string, string>> {
let response: KvV2ReadResponse;
try {
response = (await this.client.read(dataPath)) as KvV2ReadResponse;
} catch (err) {
throw new CredentialError(
`Failed to read secret from Vault at path '${dataPath}': ${err instanceof Error ? err.message : String(err)}`,
'VAULT_READ_ERROR',
{ dataPath },
);
}
const stored = response?.data?.data;
if (typeof stored !== 'object' || stored === null) {
throw new CredentialError(
'Vault returned an empty or missing data object.',
'VAULT_SECRET_MISSING',
{ dataPath },
);
}
return stored;
}
/**
* Permanently deletes all versions of a credential secret from Vault.
* Called on credential revocation.

View File

@@ -0,0 +1,367 @@
/**
* WebhookDeliveryWorker — Bull queue processor for outbound webhook HTTP delivery.
*
* Each job carries a delivery ID, subscription ID, organization ID, and the full
* IWebhookPayload to POST. The worker:
* 1. Fetches the subscription record from PostgreSQL.
* 2. Resolves the target hostname and blocks SSRF attempts (RFC 1918, loopback, link-local).
* 3. Computes HMAC-SHA256 signature using the secret from Vault (or logs a warning and sends
* unsigned when in bcrypt-only mode where the raw secret is unavailable).
* 4. POSTs the payload with SentryAgent signature headers.
* 5. Updates the delivery record status, retry schedule, and subscription failure count.
*
* Retry strategy: up to 5 attempts with exponential backoff (5 s × 2^attempt).
* On exhausting all retries the delivery is marked dead_letter and the
* `agentidp_webhook_dead_letters_total` Prometheus counter is incremented.
*/
import Bull from 'bull';
import { Pool } from 'pg';
import { RedisClientType } from 'redis';
import crypto from 'crypto';
import { promises as dns } from 'dns';
import { VaultClient } from '../vault/VaultClient.js';
import { IWebhookPayload } from '../types/webhook.js';
import { webhookDeadLettersTotal } from '../metrics/registry.js';
// ============================================================================
// Job data interface
// ============================================================================
/** Data carried by every webhook delivery Bull job. */
export interface WebhookDeliveryJob {
deliveryId: string;
subscriptionId: string;
organizationId: string;
payload: IWebhookPayload;
}
// ============================================================================
// Internal DB row shapes
// ============================================================================
/** Minimal subscription fields needed for delivery. */
interface SubscriptionDeliveryRow {
id: string;
organization_id: string;
url: string;
active: boolean;
vault_secret_path: string;
secret_hash: string;
failure_count: number;
}
// ============================================================================
// SSRF protection helpers
// ============================================================================
/** Regular expressions covering RFC 1918, loopback, and link-local address ranges. */
const BLOCKED_IPV4_PREFIXES: RegExp[] = [
/^10\./, // 10.0.0.0/8
/^172\.(1[6-9]|2[0-9]|3[01])\./, // 172.16.0.0/12
/^192\.168\./, // 192.168.0.0/16
/^127\./, // 127.0.0.0/8 (loopback)
/^169\.254\./, // 169.254.0.0/16 (link-local)
];
const BLOCKED_IPV6: RegExp[] = [
/^::1$/, // IPv6 loopback
/^fe80:/i, // IPv6 link-local
];
/**
* Returns true if the given IP address is in a blocked range.
*
* @param address - IPv4 or IPv6 address string.
* @returns Whether the address is blocked (SSRF risk).
*/
function isBlockedAddress(address: string): boolean {
for (const pattern of BLOCKED_IPV4_PREFIXES) {
if (pattern.test(address)) return true;
}
for (const pattern of BLOCKED_IPV6) {
if (pattern.test(address)) return true;
}
return false;
}
/**
* Resolves the hostname of a URL and throws if it resolves to a blocked address.
*
* @param targetUrl - The fully-qualified HTTPS URL to check.
* @throws Error if the hostname resolves to a blocked (SSRF-risk) IP address.
*/
async function guardSsrf(targetUrl: string): Promise<void> {
const { hostname } = new URL(targetUrl);
const { address } = await dns.lookup(hostname);
if (isBlockedAddress(address)) {
throw new Error(
`SSRF protection: hostname '${hostname}' resolves to blocked address '${address}'.`,
);
}
}
// ============================================================================
// Worker class
// ============================================================================
/**
* Bull queue worker that processes outbound webhook delivery jobs.
* Each instance manages a single Bull queue named 'webhook-delivery'.
*/
export class WebhookDeliveryWorker {
private readonly queue: Bull.Queue<WebhookDeliveryJob>;
/**
* @param pool - PostgreSQL connection pool for delivery record updates.
* @param vaultClient - Optional VaultClient for fetching HMAC signing secrets.
* @param redis - Redis client (unused directly; Bull uses redisUrl).
* @param redisUrl - Redis connection URL passed to Bull.
*/
constructor(
private readonly pool: Pool,
private readonly vaultClient: VaultClient | null,
_redis: RedisClientType, // reserved for future use; Bull uses redisUrl directly
redisUrl: string,
) {
this.queue = new Bull<WebhookDeliveryJob>('webhook-delivery', redisUrl);
}
/**
* Registers the job processor with the Bull queue.
* Must be called once after construction.
*/
start(): void {
this.queue.process(this.processJob.bind(this));
}
/**
* Adds a webhook delivery job to the Bull queue.
* The job will be retried up to 5 times with exponential backoff starting at 5 seconds.
*
* @param job - The delivery job data.
*/
async enqueue(job: WebhookDeliveryJob): Promise<void> {
await this.queue.add(job, {
attempts: 5,
backoff: { type: 'exponential', delay: 5000 },
});
}
// ──────────────────────────────────────────────────────────────────────────
// Private processor
// ──────────────────────────────────────────────────────────────────────────
/**
* Processes a single webhook delivery job.
*
* Steps:
* 1. Load subscription from DB.
* 2. Mark dead_letter if subscription is missing or inactive.
* 3. SSRF check — resolve hostname and block private/loopback/link-local addresses.
* 4. Fetch HMAC secret and compute HMAC-SHA256 signature.
* 5. POST payload with SentryAgent signature headers.
* 6. Update delivery record and subscription failure_count.
*
* @param job - Bull job carrying WebhookDeliveryJob data.
*/
private async processJob(job: Bull.Job<WebhookDeliveryJob>): Promise<void> {
const { deliveryId, subscriptionId, organizationId, payload } = job.data;
// 1. Load subscription
const subResult = await this.pool.query<SubscriptionDeliveryRow>(
`SELECT id, organization_id, url, active, vault_secret_path, secret_hash, failure_count
FROM webhook_subscriptions
WHERE id = $1`,
[subscriptionId],
);
if (subResult.rowCount === 0 || !subResult.rows[0].active) {
// Subscription gone or disabled — no point retrying
await this.markDeadLetter(deliveryId, null, organizationId);
return;
}
const subscription = subResult.rows[0];
// 2. SSRF check
try {
await guardSsrf(subscription.url);
} catch (ssrfErr) {
console.error(
`[WebhookDeliveryWorker] SSRF blocked delivery ${deliveryId}: ${(ssrfErr as Error).message}`,
);
await this.markDeadLetter(deliveryId, null, organizationId);
return;
}
// 3. Fetch signing secret
let secret: string | null = null;
if (subscription.vault_secret_path !== 'local' && this.vaultClient !== null) {
try {
secret = await this.fetchSecretFromVault(subscription.vault_secret_path);
} catch (vaultErr) {
console.error(
`[WebhookDeliveryWorker] Failed to retrieve secret from Vault for delivery ${deliveryId}: ${(vaultErr as Error).message}`,
);
}
} else {
// Local (bcrypt) mode — raw secret is unrecoverable; send without signature
console.warn(
`[WebhookDeliveryWorker] Delivery ${deliveryId}: subscription in local mode — sending without HMAC signature.`,
);
}
// 4. Compute HMAC-SHA256 signature if secret is available
const payloadJson = JSON.stringify(payload);
let signatureHeader = '';
if (secret !== null) {
const hex = crypto.createHmac('sha256', secret).update(payloadJson).digest('hex');
signatureHeader = `sha256=${hex}`;
}
// 5. POST to subscription URL
const attemptCount = (job.attemptsMade ?? 0) + 1;
let httpStatus: number | null = null;
let deliverySucceeded = false;
try {
const headers: Record<string, string> = {
'Content-Type': 'application/json',
'X-SentryAgent-Event': payload.event,
'X-SentryAgent-Delivery': deliveryId,
};
if (signatureHeader) {
headers['X-SentryAgent-Signature'] = signatureHeader;
}
const response = await fetch(subscription.url, {
method: 'POST',
headers,
body: payloadJson,
});
httpStatus = response.status;
deliverySucceeded = response.ok; // 2xx
} catch (networkErr) {
// Network error — treat as failure
console.error(
`[WebhookDeliveryWorker] Network error for delivery ${deliveryId}: ${(networkErr as Error).message}`,
);
}
// 6. Update delivery record
if (deliverySucceeded) {
await this.markDelivered(deliveryId, httpStatus as number, subscriptionId);
} else {
const isLastAttempt = attemptCount >= 5;
if (isLastAttempt) {
await this.markDeadLetter(deliveryId, httpStatus, organizationId);
await this.incrementFailureCount(subscriptionId);
} else {
// Exponential backoff: delay = 5000 * 2^(attempt-1) ms
const delayMs = 5000 * Math.pow(2, attemptCount - 1);
const nextRetryAt = new Date(Date.now() + delayMs);
await this.markFailed(deliveryId, httpStatus, attemptCount, nextRetryAt);
await this.incrementFailureCount(subscriptionId);
// Re-throw so Bull retries the job
throw new Error(
`Webhook delivery ${deliveryId} failed (HTTP ${httpStatus ?? 'network error'}), attempt ${attemptCount}/5.`,
);
}
}
}
// ──────────────────────────────────────────────────────────────────────────
// DB update helpers
// ──────────────────────────────────────────────────────────────────────────
/** Marks a delivery as successfully delivered. */
private async markDelivered(
deliveryId: string,
httpStatus: number,
subscriptionId: string,
): Promise<void> {
await this.pool.query(
`UPDATE webhook_deliveries
SET status = 'delivered', http_status_code = $2, delivered_at = NOW(), updated_at = NOW()
WHERE id = $1`,
[deliveryId, httpStatus],
);
// Reset failure count on the subscription
await this.pool.query(
`UPDATE webhook_subscriptions SET failure_count = 0, updated_at = NOW() WHERE id = $1`,
[subscriptionId],
);
}
/** Marks a delivery as failed and schedules the next retry. */
private async markFailed(
deliveryId: string,
httpStatus: number | null,
attemptCount: number,
nextRetryAt: Date,
): Promise<void> {
await this.pool.query(
`UPDATE webhook_deliveries
SET status = 'failed',
http_status_code = $2,
attempt_count = $3,
next_retry_at = $4,
updated_at = NOW()
WHERE id = $1`,
[deliveryId, httpStatus, attemptCount, nextRetryAt],
);
}
/**
* Marks a delivery as dead_letter, fires the Prometheus counter,
* and resets next_retry_at to NULL.
*/
private async markDeadLetter(
deliveryId: string,
httpStatus: number | null,
organizationId: string,
): Promise<void> {
await this.pool.query(
`UPDATE webhook_deliveries
SET status = 'dead_letter',
http_status_code = $2,
next_retry_at = NULL,
updated_at = NOW()
WHERE id = $1`,
[deliveryId, httpStatus],
);
webhookDeadLettersTotal.inc({ organization_id: organizationId });
}
/** Increments the failure_count on the subscription. */
private async incrementFailureCount(subscriptionId: string): Promise<void> {
await this.pool.query(
`UPDATE webhook_subscriptions
SET failure_count = failure_count + 1, updated_at = NOW()
WHERE id = $1`,
[subscriptionId],
);
}
// ──────────────────────────────────────────────────────────────────────────
// Vault helper
// ──────────────────────────────────────────────────────────────────────────
/**
* Fetches a webhook HMAC secret from Vault at the given KV v2 data path.
*
* @param vaultPath - Full KV v2 data path.
* @returns The raw HMAC secret string.
* @throws Error if the secret is missing or empty.
*/
private async fetchSecretFromVault(vaultPath: string): Promise<string> {
const data = await this.vaultClient!.readArbitrarySecret(vaultPath);
const secret = data['webhookSecret'];
if (typeof secret !== 'string' || secret.length === 0) {
throw new Error(`Empty or missing webhook secret at Vault path: ${vaultPath}`);
}
return secret;
}
}