/** * Unit tests for src/services/EventPublisher.ts */ import { EventPublisher } from '../../../src/services/EventPublisher'; import { WebhookDeliveryWorker } from '../../../src/workers/WebhookDeliveryWorker'; import { Pool } from 'pg'; jest.mock('../../../src/workers/WebhookDeliveryWorker'); jest.mock('pg'); const MockPool = Pool as jest.MockedClass; const MockWorker = WebhookDeliveryWorker as jest.MockedClass; function makePool(queryImpl?: jest.Mock): jest.Mocked { const pool = new MockPool() as jest.Mocked; pool.query = queryImpl ?? jest.fn(); return pool; } function makeWorker(): jest.Mocked { const worker = new MockWorker({} as never) as jest.Mocked; worker.enqueue = jest.fn().mockResolvedValue(undefined); return worker; } const ORG_ID = 'org-abc-123'; const EVENT_TYPE = 'agent.created' as const; const DATA = { agentId: 'agent-001' }; describe('EventPublisher', () => { let pool: jest.Mocked; let worker: jest.Mocked; beforeEach(() => { jest.clearAllMocks(); pool = makePool(); worker = makeWorker(); }); describe('publishEvent() — webhook fanout', () => { it('should query for active subscriptions and create a delivery record', async () => { const subscriptionRows = [{ id: 'sub-001', organization_id: ORG_ID }]; const deliveryRow = [{ id: 'del-001' }]; pool.query = jest.fn() .mockResolvedValueOnce({ rows: subscriptionRows, rowCount: 1 }) .mockResolvedValueOnce({ rows: deliveryRow, rowCount: 1 }); const publisher = new EventPublisher(worker, pool, null); await publisher.publishEvent(ORG_ID, EVENT_TYPE, DATA); expect(pool.query).toHaveBeenCalledTimes(2); expect(pool.query).toHaveBeenNthCalledWith( 1, expect.stringContaining('webhook_subscriptions'), [ORG_ID, JSON.stringify([EVENT_TYPE])], ); expect(pool.query).toHaveBeenNthCalledWith( 2, expect.stringContaining('webhook_deliveries'), expect.arrayContaining(['sub-001', EVENT_TYPE]), ); }); it('should enqueue a Bull delivery job for each matching subscription', async () => { const subscriptionRows = [{ id: 'sub-001', organization_id: ORG_ID }]; pool.query = jest.fn() .mockResolvedValueOnce({ rows: subscriptionRows, rowCount: 1 }) .mockResolvedValueOnce({ rows: [{ id: 'del-001' }], rowCount: 1 }); const publisher = new EventPublisher(worker, pool, null); await publisher.publishEvent(ORG_ID, EVENT_TYPE, DATA); expect(worker.enqueue).toHaveBeenCalledTimes(1); expect(worker.enqueue).toHaveBeenCalledWith( expect.objectContaining({ deliveryId: 'del-001', subscriptionId: 'sub-001', organizationId: ORG_ID, }), ); }); it('should fan out to multiple subscriptions', async () => { const subscriptionRows = [ { id: 'sub-001', organization_id: ORG_ID }, { id: 'sub-002', organization_id: ORG_ID }, ]; pool.query = jest.fn() .mockResolvedValueOnce({ rows: subscriptionRows, rowCount: 2 }) .mockResolvedValueOnce({ rows: [{ id: 'del-001' }], rowCount: 1 }) .mockResolvedValueOnce({ rows: [{ id: 'del-002' }], rowCount: 1 }); const publisher = new EventPublisher(worker, pool, null); await publisher.publishEvent(ORG_ID, EVENT_TYPE, DATA); expect(worker.enqueue).toHaveBeenCalledTimes(2); }); it('should not enqueue any jobs when no matching subscriptions exist', async () => { pool.query = jest.fn().mockResolvedValueOnce({ rows: [], rowCount: 0 }); const publisher = new EventPublisher(worker, pool, null); await publisher.publishEvent(ORG_ID, EVENT_TYPE, DATA); expect(worker.enqueue).not.toHaveBeenCalled(); }); it('should not throw when subscription DB query fails', async () => { pool.query = jest.fn().mockRejectedValueOnce(new Error('DB down')); const publisher = new EventPublisher(worker, pool, null); await expect(publisher.publishEvent(ORG_ID, EVENT_TYPE, DATA)).resolves.toBeUndefined(); }); it('should not throw when delivery insert fails for a subscription', async () => { const subscriptionRows = [{ id: 'sub-001', organization_id: ORG_ID }]; pool.query = jest.fn() .mockResolvedValueOnce({ rows: subscriptionRows, rowCount: 1 }) .mockRejectedValueOnce(new Error('Insert failed')); const publisher = new EventPublisher(worker, pool, null); await expect(publisher.publishEvent(ORG_ID, EVENT_TYPE, DATA)).resolves.toBeUndefined(); }); }); describe('publishEvent() — Kafka fanout', () => { it('should produce to Kafka when kafkaProducer is provided', async () => { pool.query = jest.fn().mockResolvedValueOnce({ rows: [], rowCount: 0 }); const kafkaProducer = { send: jest.fn().mockResolvedValue(undefined) }; const publisher = new EventPublisher(worker, pool, kafkaProducer as never); await publisher.publishEvent(ORG_ID, EVENT_TYPE, DATA); expect(kafkaProducer.send).toHaveBeenCalledWith( expect.objectContaining({ topic: 'agentidp-events', messages: expect.arrayContaining([ expect.objectContaining({ key: ORG_ID }), ]), }), ); }); it('should not call Kafka when kafkaProducer is null', async () => { pool.query = jest.fn().mockResolvedValueOnce({ rows: [], rowCount: 0 }); const kafkaProducer = { send: jest.fn() }; const publisher = new EventPublisher(worker, pool, null); await publisher.publishEvent(ORG_ID, EVENT_TYPE, DATA); expect(kafkaProducer.send).not.toHaveBeenCalled(); }); it('should not throw when Kafka produce fails', async () => { pool.query = jest.fn().mockResolvedValueOnce({ rows: [], rowCount: 0 }); const kafkaProducer = { send: jest.fn().mockRejectedValue(new Error('Kafka error')) }; const publisher = new EventPublisher(worker, pool, kafkaProducer as never); await expect(publisher.publishEvent(ORG_ID, EVENT_TYPE, DATA)).resolves.toBeUndefined(); }); }); });