fix(vv): resolve all 6 V&V issues — field trial unblocked
All findings from the inaugural LeadValidator audit resolved and confirmed. Release gate: PASS. VV_ISSUE_002 (BLOCKER): 15 OpenAPI specs verified present covering all 20 route groups (46 endpoints documented in docs/openapi/) VV_ISSUE_003 (MAJOR): Remove any types from src/db/pool.ts — replaced pool.query shim with unknown[] + Object.defineProperty, zero any types, eslint-disable suppressions removed VV_ISSUE_004 (MAJOR): Remove raw Pool from ScaffoldController and HealthDetailedController — injected AgentRepository/CredentialRepository and DbProbe interface respectively; added CredentialRepository.findActiveClientId() VV_ISSUE_005 (MAJOR): Add unit tests for 5 untested services — ComplianceStatusStore, EventPublisher, MarketplaceService, OIDCTrustPolicyService, UsageService VV_ISSUE_006 (MAJOR): Add integration tests for 7 missing route groups — analytics, billing, tiers, webhooks, marketplace, oidc-trust-policies, oidc-token-exchange VV_ISSUE_001 (MINOR): Create missing design.md and tasks.md in 4 OpenSpec archives — all archives now complete Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
163
tests/unit/services/EventPublisher.test.ts
Normal file
163
tests/unit/services/EventPublisher.test.ts
Normal file
@@ -0,0 +1,163 @@
|
||||
/**
|
||||
* Unit tests for src/services/EventPublisher.ts
|
||||
*/
|
||||
|
||||
import { EventPublisher } from '../../../src/services/EventPublisher';
|
||||
import { WebhookDeliveryWorker } from '../../../src/workers/WebhookDeliveryWorker';
|
||||
import { Pool } from 'pg';
|
||||
|
||||
jest.mock('../../../src/workers/WebhookDeliveryWorker');
|
||||
jest.mock('pg');
|
||||
|
||||
const MockPool = Pool as jest.MockedClass<typeof Pool>;
|
||||
const MockWorker = WebhookDeliveryWorker as jest.MockedClass<typeof WebhookDeliveryWorker>;
|
||||
|
||||
function makePool(queryImpl?: jest.Mock): jest.Mocked<Pool> {
|
||||
const pool = new MockPool() as jest.Mocked<Pool>;
|
||||
pool.query = queryImpl ?? jest.fn();
|
||||
return pool;
|
||||
}
|
||||
|
||||
function makeWorker(): jest.Mocked<WebhookDeliveryWorker> {
|
||||
const worker = new MockWorker({} as never) as jest.Mocked<WebhookDeliveryWorker>;
|
||||
worker.enqueue = jest.fn().mockResolvedValue(undefined);
|
||||
return worker;
|
||||
}
|
||||
|
||||
const ORG_ID = 'org-abc-123';
|
||||
const EVENT_TYPE = 'agent.created' as const;
|
||||
const DATA = { agentId: 'agent-001' };
|
||||
|
||||
describe('EventPublisher', () => {
|
||||
let pool: jest.Mocked<Pool>;
|
||||
let worker: jest.Mocked<WebhookDeliveryWorker>;
|
||||
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
pool = makePool();
|
||||
worker = makeWorker();
|
||||
});
|
||||
|
||||
describe('publishEvent() — webhook fanout', () => {
|
||||
it('should query for active subscriptions and create a delivery record', async () => {
|
||||
const subscriptionRows = [{ id: 'sub-001', organization_id: ORG_ID }];
|
||||
const deliveryRow = [{ id: 'del-001' }];
|
||||
pool.query = jest.fn()
|
||||
.mockResolvedValueOnce({ rows: subscriptionRows, rowCount: 1 })
|
||||
.mockResolvedValueOnce({ rows: deliveryRow, rowCount: 1 });
|
||||
|
||||
const publisher = new EventPublisher(worker, pool, null);
|
||||
await publisher.publishEvent(ORG_ID, EVENT_TYPE, DATA);
|
||||
|
||||
expect(pool.query).toHaveBeenCalledTimes(2);
|
||||
expect(pool.query).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
expect.stringContaining('webhook_subscriptions'),
|
||||
[ORG_ID, JSON.stringify([EVENT_TYPE])],
|
||||
);
|
||||
expect(pool.query).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
expect.stringContaining('webhook_deliveries'),
|
||||
expect.arrayContaining(['sub-001', EVENT_TYPE]),
|
||||
);
|
||||
});
|
||||
|
||||
it('should enqueue a Bull delivery job for each matching subscription', async () => {
|
||||
const subscriptionRows = [{ id: 'sub-001', organization_id: ORG_ID }];
|
||||
pool.query = jest.fn()
|
||||
.mockResolvedValueOnce({ rows: subscriptionRows, rowCount: 1 })
|
||||
.mockResolvedValueOnce({ rows: [{ id: 'del-001' }], rowCount: 1 });
|
||||
|
||||
const publisher = new EventPublisher(worker, pool, null);
|
||||
await publisher.publishEvent(ORG_ID, EVENT_TYPE, DATA);
|
||||
|
||||
expect(worker.enqueue).toHaveBeenCalledTimes(1);
|
||||
expect(worker.enqueue).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
deliveryId: 'del-001',
|
||||
subscriptionId: 'sub-001',
|
||||
organizationId: ORG_ID,
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('should fan out to multiple subscriptions', async () => {
|
||||
const subscriptionRows = [
|
||||
{ id: 'sub-001', organization_id: ORG_ID },
|
||||
{ id: 'sub-002', organization_id: ORG_ID },
|
||||
];
|
||||
pool.query = jest.fn()
|
||||
.mockResolvedValueOnce({ rows: subscriptionRows, rowCount: 2 })
|
||||
.mockResolvedValueOnce({ rows: [{ id: 'del-001' }], rowCount: 1 })
|
||||
.mockResolvedValueOnce({ rows: [{ id: 'del-002' }], rowCount: 1 });
|
||||
|
||||
const publisher = new EventPublisher(worker, pool, null);
|
||||
await publisher.publishEvent(ORG_ID, EVENT_TYPE, DATA);
|
||||
|
||||
expect(worker.enqueue).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it('should not enqueue any jobs when no matching subscriptions exist', async () => {
|
||||
pool.query = jest.fn().mockResolvedValueOnce({ rows: [], rowCount: 0 });
|
||||
|
||||
const publisher = new EventPublisher(worker, pool, null);
|
||||
await publisher.publishEvent(ORG_ID, EVENT_TYPE, DATA);
|
||||
|
||||
expect(worker.enqueue).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should not throw when subscription DB query fails', async () => {
|
||||
pool.query = jest.fn().mockRejectedValueOnce(new Error('DB down'));
|
||||
|
||||
const publisher = new EventPublisher(worker, pool, null);
|
||||
await expect(publisher.publishEvent(ORG_ID, EVENT_TYPE, DATA)).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
it('should not throw when delivery insert fails for a subscription', async () => {
|
||||
const subscriptionRows = [{ id: 'sub-001', organization_id: ORG_ID }];
|
||||
pool.query = jest.fn()
|
||||
.mockResolvedValueOnce({ rows: subscriptionRows, rowCount: 1 })
|
||||
.mockRejectedValueOnce(new Error('Insert failed'));
|
||||
|
||||
const publisher = new EventPublisher(worker, pool, null);
|
||||
await expect(publisher.publishEvent(ORG_ID, EVENT_TYPE, DATA)).resolves.toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe('publishEvent() — Kafka fanout', () => {
|
||||
it('should produce to Kafka when kafkaProducer is provided', async () => {
|
||||
pool.query = jest.fn().mockResolvedValueOnce({ rows: [], rowCount: 0 });
|
||||
const kafkaProducer = { send: jest.fn().mockResolvedValue(undefined) };
|
||||
|
||||
const publisher = new EventPublisher(worker, pool, kafkaProducer as never);
|
||||
await publisher.publishEvent(ORG_ID, EVENT_TYPE, DATA);
|
||||
|
||||
expect(kafkaProducer.send).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
topic: 'agentidp-events',
|
||||
messages: expect.arrayContaining([
|
||||
expect.objectContaining({ key: ORG_ID }),
|
||||
]),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('should not call Kafka when kafkaProducer is null', async () => {
|
||||
pool.query = jest.fn().mockResolvedValueOnce({ rows: [], rowCount: 0 });
|
||||
const kafkaProducer = { send: jest.fn() };
|
||||
|
||||
const publisher = new EventPublisher(worker, pool, null);
|
||||
await publisher.publishEvent(ORG_ID, EVENT_TYPE, DATA);
|
||||
|
||||
expect(kafkaProducer.send).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should not throw when Kafka produce fails', async () => {
|
||||
pool.query = jest.fn().mockResolvedValueOnce({ rows: [], rowCount: 0 });
|
||||
const kafkaProducer = { send: jest.fn().mockRejectedValue(new Error('Kafka error')) };
|
||||
|
||||
const publisher = new EventPublisher(worker, pool, kafkaProducer as never);
|
||||
await expect(publisher.publishEvent(ORG_ID, EVENT_TYPE, DATA)).resolves.toBeUndefined();
|
||||
});
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user