Files
sentryagent-idp/src/services/AgentService.ts
SentryAgent.ai Developer 272b69f18d feat(phase-3): workstream 5 — Webhooks & Event Streaming
- DB migrations 016/017: webhook_subscriptions and webhook_deliveries tables
- WebhookService: CRUD for subscriptions, Vault-backed secret storage, delivery history
- WebhookDeliveryWorker: Bull queue, HMAC-SHA256 signatures, exponential backoff,
  SSRF protection (RFC 1918 + loopback + link-local rejection), dead-letter handling
- EventPublisher: publishes 10 event types (agent/credential/token lifecycle);
  optional Kafka adapter activated via KAFKA_BROKERS env var
- AgentService, CredentialService, OAuth2Service: wired to EventPublisher
- WebhookController + routes: 6 endpoints with webhooks:read / webhooks:write scope guards
- KafkaAdapter: optional Kafka producer (kafkajs), no-op when KAFKA_BROKERS unset
- OAuthScope extended: webhooks:read, webhooks:write
- AuditAction extended: webhook.created, webhook.updated, webhook.deleted
- Metrics: agentidp_webhook_dead_letters_total counter added to registry
- 523 unit tests passing; TypeScript strict throughout, zero `any`

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-31 00:07:41 +00:00

274 lines
8.6 KiB
TypeScript

/**
* Agent Registry Service for SentryAgent.ai AgentIdP.
* Business logic for agent lifecycle management.
*/
import { AgentRepository } from '../repositories/AgentRepository.js';
import { CredentialRepository } from '../repositories/CredentialRepository.js';
import { AuditService } from './AuditService.js';
import { DIDService } from './DIDService.js';
import { EventPublisher } from './EventPublisher.js';
import {
IAgent,
ICreateAgentRequest,
IUpdateAgentRequest,
IAgentListFilters,
IPaginatedAgentsResponse,
} from '../types/index.js';
import {
AgentNotFoundError,
AgentAlreadyExistsError,
AgentAlreadyDecommissionedError,
FreeTierLimitError,
} from '../utils/errors.js';
import { agentsRegisteredTotal } from '../metrics/registry.js';
/**
 * Maximum number of active agents permitted on the free tier.
 * `AgentService.registerAgent` compares this against `agentRepository.countActive()`
 * and rejects the registration with a `FreeTierLimitError` once the count reaches it.
 */
const FREE_TIER_MAX_AGENTS = 100;
/**
* Service for agent registration and lifecycle management.
* Enforces free-tier limits and coordinates with AuditService.
*/
export class AgentService {
  /**
   * @param agentRepository - The agent data repository.
   * @param credentialRepository - The credential repository (used to revoke credentials
   *   whenever an agent is decommissioned).
   * @param auditService - The audit log service.
   * @param didService - Optional DIDService. When provided, a W3C DID is generated for each
   *   newly registered agent. When null (the default), DID generation is skipped.
   * @param eventPublisher - Optional EventPublisher. When provided, lifecycle events are
   *   published fire-and-forget: the publish is never awaited, and a failed publish is
   *   logged instead of being propagated to the caller.
   */
  constructor(
    private readonly agentRepository: AgentRepository,
    private readonly credentialRepository: CredentialRepository,
    private readonly auditService: AuditService,
    private readonly didService: DIDService | null = null,
    private readonly eventPublisher: EventPublisher | null = null,
  ) {}

  /**
   * Registers a new AI agent identity.
   * Enforces the free-tier agent limit and the unique email constraint, then creates the
   * agent, optionally generates its DID, writes the audit record, bumps the registration
   * metric and publishes an `agent.created` event.
   *
   * @param data - The agent registration data.
   * @param ipAddress - Client IP for audit logging.
   * @param userAgent - Client User-Agent for audit logging.
   * @returns The newly created agent record.
   * @throws FreeTierLimitError if the free-tier agent limit is reached.
   * @throws AgentAlreadyExistsError if the email is already registered.
   */
  async registerAgent(
    data: ICreateAgentRequest,
    ipAddress: string,
    userAgent: string,
  ): Promise<IAgent> {
    // Enforce the free-tier agent count limit.
    const activeCount = await this.agentRepository.countActive();
    if (activeCount >= FREE_TIER_MAX_AGENTS) {
      throw new FreeTierLimitError(
        // Interpolated so the message can never drift from the enforced constant.
        `Free tier limit of ${FREE_TIER_MAX_AGENTS} registered agents has been reached.`,
        { limit: FREE_TIER_MAX_AGENTS, current: activeCount },
      );
    }

    // Check email uniqueness.
    const existing = await this.agentRepository.findByEmail(data.email);
    if (existing !== null) {
      throw new AgentAlreadyExistsError(data.email);
    }

    const agent = await this.agentRepository.create(data);

    // Generate a W3C DID for the new agent when a DIDService is available.
    if (this.didService !== null) {
      const organizationId = data.organizationId ?? 'org_system';
      await this.didService.generateDIDForAgent(agent.agentId, organizationId);
    }

    // The audit insert is awaited so a failure surfaces to the caller.
    await this.auditService.logEvent(
      agent.agentId,
      'agent.created',
      'success',
      ipAddress,
      userAgent,
      { agentType: agent.agentType, owner: agent.owner },
    );

    // Instrument: count successful agent registrations.
    agentsRegisteredTotal.inc({ deployment_env: data.deploymentEnv });

    this.publishLifecycleEvent(
      agent.organizationId,
      'agent.created',
      { agentId: agent.agentId, email: agent.email, name: agent.owner },
    );

    return agent;
  }

  /**
   * Retrieves a single agent by its UUID.
   *
   * @param agentId - The agent UUID.
   * @returns The agent record.
   * @throws AgentNotFoundError if the agent does not exist.
   */
  async getAgentById(agentId: string): Promise<IAgent> {
    const agent = await this.agentRepository.findById(agentId);
    if (!agent) {
      throw new AgentNotFoundError(agentId);
    }
    return agent;
  }

  /**
   * Returns a paginated, optionally filtered list of agents.
   *
   * @param filters - Pagination and filter criteria.
   * @returns The matching agents plus the total count, echoing the requested page and limit.
   */
  async listAgents(filters: IAgentListFilters): Promise<IPaginatedAgentsResponse> {
    const { agents, total } = await this.agentRepository.findAll(filters);
    return {
      data: agents,
      total,
      page: filters.page,
      limit: filters.limit,
    };
  }

  /**
   * Partially updates an agent's metadata.
   * Decommissioned agents cannot be updated. A status transition is audited and published
   * under its specific action (`agent.suspended`, `agent.reactivated`,
   * `agent.decommissioned`); any other change is recorded as `agent.updated`.
   * A transition to `decommissioned` also revokes all of the agent's credentials,
   * matching `decommissionAgent`.
   *
   * NOTE(review): this method does not itself reject immutable fields
   * (agentId, email, createdAt); presumably that is enforced by the caller or the
   * repository — confirm.
   *
   * @param agentId - The agent UUID to update.
   * @param data - The fields to update.
   * @param ipAddress - Client IP for audit logging.
   * @param userAgent - Client User-Agent for audit logging.
   * @returns The updated agent record.
   * @throws AgentNotFoundError if the agent does not exist (or disappears before the update).
   * @throws AgentAlreadyDecommissionedError if the agent is decommissioned.
   */
  async updateAgent(
    agentId: string,
    data: IUpdateAgentRequest,
    ipAddress: string,
    userAgent: string,
  ): Promise<IAgent> {
    const agent = await this.agentRepository.findById(agentId);
    if (!agent) {
      throw new AgentNotFoundError(agentId);
    }
    if (agent.status === 'decommissioned') {
      throw new AgentAlreadyDecommissionedError(agentId);
    }

    const statusChanged = data.status !== undefined && data.status !== agent.status;

    // Decommissioning through an update must not leave credentials usable. Revoke them
    // before the status flips, in the same order decommissionAgent uses.
    if (statusChanged && data.status === 'decommissioned') {
      await this.credentialRepository.revokeAllForAgent(agentId);
    }

    const updated = await this.agentRepository.update(agentId, data);
    if (!updated) {
      throw new AgentNotFoundError(agentId);
    }

    // Determine which audit action (and lifecycle event) describes this change.
    let auditAction: 'agent.updated' | 'agent.suspended' | 'agent.reactivated' | 'agent.decommissioned' =
      'agent.updated';
    if (statusChanged) {
      if (data.status === 'suspended') auditAction = 'agent.suspended';
      else if (data.status === 'active') auditAction = 'agent.reactivated';
      else if (data.status === 'decommissioned') auditAction = 'agent.decommissioned';
    }

    await this.auditService.logEvent(
      agentId,
      auditAction,
      'success',
      ipAddress,
      userAgent,
      { updatedFields: Object.keys(data) },
    );

    // Only the generic update event carries the changed fields; status events carry the id.
    if (auditAction === 'agent.updated') {
      this.publishLifecycleEvent(updated.organizationId, 'agent.updated', { agentId, changes: data });
    } else {
      this.publishLifecycleEvent(updated.organizationId, auditAction, { agentId });
    }

    return updated;
  }

  /**
   * Permanently decommissions an agent (soft delete).
   * Revokes all active credentials for the agent, marks it decommissioned, writes the
   * audit record and publishes an `agent.decommissioned` event.
   *
   * @param agentId - The agent UUID to decommission.
   * @param ipAddress - Client IP for audit logging.
   * @param userAgent - Client User-Agent for audit logging.
   * @throws AgentNotFoundError if the agent does not exist.
   * @throws AgentAlreadyDecommissionedError if already decommissioned.
   */
  async decommissionAgent(
    agentId: string,
    ipAddress: string,
    userAgent: string,
  ): Promise<void> {
    const agent = await this.agentRepository.findById(agentId);
    if (!agent) {
      throw new AgentNotFoundError(agentId);
    }
    if (agent.status === 'decommissioned') {
      throw new AgentAlreadyDecommissionedError(agentId);
    }

    // Revoke credentials first so they are unusable before the status changes.
    await this.credentialRepository.revokeAllForAgent(agentId);
    await this.agentRepository.decommission(agentId);

    await this.auditService.logEvent(
      agentId,
      'agent.decommissioned',
      'success',
      ipAddress,
      userAgent,
      {},
    );

    this.publishLifecycleEvent(agent.organizationId, 'agent.decommissioned', { agentId });
  }

  /**
   * Publishes a lifecycle event without awaiting it (fire-and-forget).
   * Does nothing when no EventPublisher was supplied. A synchronous throw or an
   * asynchronous rejection from the publisher is logged and swallowed, so a publishing
   * failure can neither fail the calling operation nor surface as an unhandled rejection.
   *
   * @param args - Exactly the arguments accepted by `EventPublisher.publishEvent`.
   */
  private publishLifecycleEvent(...args: Parameters<EventPublisher['publishEvent']>): void {
    const publisher = this.eventPublisher;
    if (!publisher) {
      return;
    }

    const reportFailure = (error: unknown): void => {
      console.error('AgentService: failed to publish lifecycle event', args[1], error);
    };

    try {
      // Promise.resolve tolerates both promise and non-promise return values.
      void Promise.resolve(publisher.publishEvent(...args)).catch(reportFailure);
    } catch (error: unknown) {
      reportFailure(error);
    }
  }
}