feat(phase-3): workstream 4 — AGNTCY Federation
Implements cross-IdP token verification for the AGNTCY ecosystem: - Migration 015: federation_partners table (issuer, jwks_uri, allowed_organizations JSONB, status, expires_at) - FederationService: registerPartner (JWKS validation at registration), listPartners, getPartner, updatePartner, deletePartner, verifyFederatedToken (alg:none rejected, RS256/ES256 only, allowedOrganizations filter, expiry enforcement) - JWKS caching in Redis (TTL: FEDERATION_JWKS_CACHE_TTL_SECONDS); cache invalidated on partner delete and jwks_uri change - FederationController + routes: 5 admin:orgs endpoints + POST /federation/verify (agents:read) - OPA policy: 5 federation admin endpoint → admin:orgs mappings - 499 unit tests passing; 94.69% statement coverage on FederationService Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
550
src/services/FederationService.ts
Normal file
550
src/services/FederationService.ts
Normal file
@@ -0,0 +1,550 @@
|
||||
/**
|
||||
* FederationService — manages trusted federation partners and cross-IdP token verification.
|
||||
*
|
||||
* Trust registration: partner's JWKS are fetched and validated at registration time.
|
||||
* Token verification: fetches (or uses cached) partner JWKS, verifies signature and claims.
|
||||
* Organization filtering: if allowed_organizations is non-empty, rejects tokens whose
|
||||
* organization_id claim is not in the allow list.
|
||||
* Expiry: partners past expires_at are treated as status 'expired' and their tokens rejected.
|
||||
*/
|
||||
|
||||
import { Pool, QueryResult } from 'pg';
|
||||
import jwt from 'jsonwebtoken';
|
||||
import { createPublicKey, JsonWebKey } from 'crypto';
|
||||
import { RedisClientType } from 'redis';
|
||||
|
||||
import {
|
||||
IFederationPartner,
|
||||
FederationPartnerStatus,
|
||||
ICreatePartnerRequest,
|
||||
IUpdatePartnerRequest,
|
||||
IFederationVerifyRequest,
|
||||
IFederationVerifyResult,
|
||||
IFederatedTokenClaims,
|
||||
} from '../types/federation.js';
|
||||
import { IJWKSKey } from '../types/oidc.js';
|
||||
import { SentryAgentError } from '../utils/errors.js';
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Error classes
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* 400 — Federation partner operation failed (e.g. unreachable JWKS endpoint, invalid JWKS).
|
||||
*/
|
||||
export class FederationPartnerError extends SentryAgentError {
|
||||
constructor(message: string) {
|
||||
super(message, 'FEDERATION_PARTNER_ERROR', 400);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 404 — The requested federation partner was not found.
|
||||
*/
|
||||
export class FederationPartnerNotFoundError extends SentryAgentError {
|
||||
constructor(id?: string) {
|
||||
super(
|
||||
'Federation partner not found.',
|
||||
'FEDERATION_PARTNER_NOT_FOUND',
|
||||
404,
|
||||
id ? { id } : undefined,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 401 — Federated token verification failed.
|
||||
*/
|
||||
export class FederationVerificationError extends SentryAgentError {
|
||||
constructor(reason: string) {
|
||||
super(reason, 'FEDERATION_VERIFICATION_ERROR', 401);
|
||||
}
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Internal DB row shape
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
/** Raw database row for federation_partners. */
|
||||
interface FederationPartnerRow {
|
||||
id: string;
|
||||
name: string;
|
||||
issuer: string;
|
||||
jwks_uri: string;
|
||||
allowed_organizations: string[];
|
||||
status: string;
|
||||
created_at: Date;
|
||||
updated_at: Date;
|
||||
expires_at: Date | null;
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Helpers
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Maps a raw database row to the IFederationPartner domain model.
|
||||
*
|
||||
* @param row - Raw row from the federation_partners table.
|
||||
* @returns Typed IFederationPartner object.
|
||||
*/
|
||||
function mapRowToPartner(row: FederationPartnerRow): IFederationPartner {
|
||||
return {
|
||||
id: row.id,
|
||||
name: row.name,
|
||||
issuer: row.issuer,
|
||||
jwks_uri: row.jwks_uri,
|
||||
allowed_organizations: row.allowed_organizations ?? [],
|
||||
status: row.status as FederationPartnerStatus,
|
||||
created_at: row.created_at,
|
||||
updated_at: row.updated_at,
|
||||
expires_at: row.expires_at ?? null,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a JWK object to a PEM public key string using Node.js crypto.
|
||||
*
|
||||
* @param jwk - The IJWKSKey to convert.
|
||||
* @returns A PEM-encoded public key string.
|
||||
*/
|
||||
function jwkToPem(jwk: IJWKSKey): string {
|
||||
const keyObj = createPublicKey({ key: jwk as unknown as JsonWebKey, format: 'jwk' });
|
||||
return keyObj.export({ type: 'spki', format: 'pem' }) as string;
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Service
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Service that manages federation partners and verifies cross-IdP tokens.
|
||||
* Integrates with PostgreSQL for partner persistence and Redis for JWKS caching.
|
||||
*/
|
||||
export class FederationService {
|
||||
/**
|
||||
* @param pool - PostgreSQL connection pool.
|
||||
* @param redis - Redis client for JWKS caching.
|
||||
*/
|
||||
constructor(
|
||||
private readonly pool: Pool,
|
||||
private readonly redis: RedisClientType,
|
||||
) {}
|
||||
|
||||
// ───────────────────────────────────────────────────────────────────────────
|
||||
// Public API — Partner Management
|
||||
// ───────────────────────────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Registers a new federation partner.
|
||||
*
|
||||
* Validates the JWKS endpoint is reachable and returns a valid JWKS before persisting.
|
||||
* The fetched JWKS is cached in Redis on successful registration.
|
||||
*
|
||||
* @param req - The partner registration request.
|
||||
* @returns The newly created IFederationPartner.
|
||||
* @throws FederationPartnerError (400) if the JWKS endpoint is unreachable or returns invalid JWKS.
|
||||
*/
|
||||
async registerPartner(req: ICreatePartnerRequest): Promise<IFederationPartner> {
|
||||
// Validate the JWKS endpoint is reachable and returns valid JWKS
|
||||
const jwks = await this.fetchJWKS(req.jwks_uri);
|
||||
|
||||
const allowedOrgs = req.allowed_organizations ?? [];
|
||||
const expiresAt = req.expires_at ? new Date(req.expires_at) : null;
|
||||
|
||||
const result: QueryResult<FederationPartnerRow> = await this.pool.query(
|
||||
`INSERT INTO federation_partners
|
||||
(name, issuer, jwks_uri, allowed_organizations, expires_at)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
RETURNING *`,
|
||||
[req.name, req.issuer, req.jwks_uri, JSON.stringify(allowedOrgs), expiresAt],
|
||||
);
|
||||
|
||||
const partner = mapRowToPartner(result.rows[0]);
|
||||
|
||||
// Cache the fetched JWKS for future token verifications
|
||||
await this.cacheJWKS(partner.issuer, jwks);
|
||||
|
||||
return partner;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns all registered federation partners.
|
||||
*
|
||||
* Partners past their expires_at are automatically updated to status 'expired' in the DB
|
||||
* before the list is returned.
|
||||
*
|
||||
* @returns Array of IFederationPartner objects.
|
||||
*/
|
||||
async listPartners(): Promise<IFederationPartner[]> {
|
||||
// Update any partners that have passed their expiry date
|
||||
await this.pool.query(
|
||||
`UPDATE federation_partners
|
||||
SET status = 'expired', updated_at = NOW()
|
||||
WHERE expires_at IS NOT NULL
|
||||
AND expires_at <= NOW()
|
||||
AND status != 'expired'`,
|
||||
);
|
||||
|
||||
const result: QueryResult<FederationPartnerRow> = await this.pool.query(
|
||||
`SELECT * FROM federation_partners ORDER BY created_at DESC`,
|
||||
);
|
||||
|
||||
return result.rows.map(mapRowToPartner);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a single federation partner by its UUID.
|
||||
*
|
||||
* Applies the same expiry update logic as listPartners.
|
||||
*
|
||||
* @param id - The UUID of the federation partner.
|
||||
* @returns The matching IFederationPartner.
|
||||
* @throws FederationPartnerNotFoundError (404) if no partner with that id exists.
|
||||
*/
|
||||
async getPartner(id: string): Promise<IFederationPartner> {
|
||||
// Update expiry status if needed
|
||||
await this.pool.query(
|
||||
`UPDATE federation_partners
|
||||
SET status = 'expired', updated_at = NOW()
|
||||
WHERE id = $1
|
||||
AND expires_at IS NOT NULL
|
||||
AND expires_at <= NOW()
|
||||
AND status != 'expired'`,
|
||||
[id],
|
||||
);
|
||||
|
||||
const result: QueryResult<FederationPartnerRow> = await this.pool.query(
|
||||
`SELECT * FROM federation_partners WHERE id = $1`,
|
||||
[id],
|
||||
);
|
||||
|
||||
if (result.rows.length === 0) {
|
||||
throw new FederationPartnerNotFoundError(id);
|
||||
}
|
||||
|
||||
return mapRowToPartner(result.rows[0]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates an existing federation partner.
|
||||
*
|
||||
* Only the provided fields are updated. The updated_at column is always refreshed.
|
||||
* If jwks_uri changes, the cached JWKS for the old issuer is invalidated.
|
||||
*
|
||||
* @param id - The UUID of the federation partner to update.
|
||||
* @param req - Fields to update.
|
||||
* @returns The updated IFederationPartner.
|
||||
* @throws FederationPartnerNotFoundError (404) if the partner does not exist.
|
||||
*/
|
||||
async updatePartner(id: string, req: IUpdatePartnerRequest): Promise<IFederationPartner> {
|
||||
// Fetch current partner to get old issuer for cache invalidation
|
||||
const current = await this.getPartner(id);
|
||||
|
||||
const setClauses: string[] = [];
|
||||
const values: unknown[] = [];
|
||||
let idx = 1;
|
||||
|
||||
if (req.name !== undefined) {
|
||||
setClauses.push(`name = $${idx++}`);
|
||||
values.push(req.name);
|
||||
}
|
||||
if (req.jwks_uri !== undefined) {
|
||||
setClauses.push(`jwks_uri = $${idx++}`);
|
||||
values.push(req.jwks_uri);
|
||||
}
|
||||
if (req.allowed_organizations !== undefined) {
|
||||
setClauses.push(`allowed_organizations = $${idx++}`);
|
||||
values.push(JSON.stringify(req.allowed_organizations));
|
||||
}
|
||||
if (req.status !== undefined) {
|
||||
setClauses.push(`status = $${idx++}`);
|
||||
values.push(req.status);
|
||||
}
|
||||
if (req.expires_at !== undefined) {
|
||||
setClauses.push(`expires_at = $${idx++}`);
|
||||
values.push(req.expires_at === null ? null : new Date(req.expires_at));
|
||||
}
|
||||
|
||||
// Always update updated_at
|
||||
setClauses.push(`updated_at = NOW()`);
|
||||
values.push(id);
|
||||
|
||||
const result: QueryResult<FederationPartnerRow> = await this.pool.query(
|
||||
`UPDATE federation_partners SET ${setClauses.join(', ')} WHERE id = $${idx} RETURNING *`,
|
||||
values,
|
||||
);
|
||||
|
||||
if (result.rows.length === 0) {
|
||||
throw new FederationPartnerNotFoundError(id);
|
||||
}
|
||||
|
||||
// Invalidate JWKS cache if jwks_uri changed
|
||||
if (req.jwks_uri !== undefined && req.jwks_uri !== current.jwks_uri) {
|
||||
await this.invalidateJWKSCache(current.issuer);
|
||||
}
|
||||
|
||||
return mapRowToPartner(result.rows[0]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes a federation partner by its UUID.
|
||||
*
|
||||
* Invalidates the partner's JWKS cache entry on successful deletion.
|
||||
*
|
||||
* @param id - The UUID of the federation partner to delete.
|
||||
* @throws FederationPartnerNotFoundError (404) if the partner does not exist.
|
||||
*/
|
||||
async deletePartner(id: string): Promise<void> {
|
||||
// Fetch to get issuer before deletion (for cache invalidation)
|
||||
const partner = await this.getPartner(id);
|
||||
|
||||
const result: QueryResult<FederationPartnerRow> = await this.pool.query(
|
||||
`DELETE FROM federation_partners WHERE id = $1 RETURNING id`,
|
||||
[id],
|
||||
);
|
||||
|
||||
if (result.rows.length === 0) {
|
||||
throw new FederationPartnerNotFoundError(id);
|
||||
}
|
||||
|
||||
await this.invalidateJWKSCache(partner.issuer);
|
||||
}
|
||||
|
||||
// ───────────────────────────────────────────────────────────────────────────
|
||||
// Public API — Token Verification
|
||||
// ───────────────────────────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Verifies a federated JWT token from a registered partner IdP.
|
||||
*
|
||||
* Performs the following checks in order:
|
||||
* 1. Token is structurally valid (decodable header + payload).
|
||||
* 2. `alg: none` is explicitly rejected.
|
||||
* 3. `iss` matches a known, active, non-expired federation partner.
|
||||
* 4. If expected_issuer is provided, `iss` must match exactly.
|
||||
* 5. Partner JWKS is fetched (cache-first) and the matching key used for signature verification.
|
||||
* 6. JWT signature and `exp` claim are verified by jsonwebtoken.
|
||||
* 7. If the partner has a non-empty allowed_organizations list, the token's
|
||||
* `organization_id` claim must be in that list.
|
||||
*
|
||||
* @param req - The verification request containing the token and optional issuer hint.
|
||||
* @returns IFederationVerifyResult with verified claims.
|
||||
* @throws FederationVerificationError (401) if any verification step fails.
|
||||
*/
|
||||
async verifyFederatedToken(req: IFederationVerifyRequest): Promise<IFederationVerifyResult> {
|
||||
// Decode the token header without verification to extract kid and alg
|
||||
const decoded = jwt.decode(req.token, { complete: true });
|
||||
if (!decoded || typeof decoded === 'string') {
|
||||
throw new FederationVerificationError('Token is malformed or not a valid JWT');
|
||||
}
|
||||
|
||||
const header = decoded.header;
|
||||
|
||||
// Explicitly reject alg:none
|
||||
if (!header.alg || header.alg.toLowerCase() === 'none') {
|
||||
throw new FederationVerificationError('alg:none tokens are not accepted');
|
||||
}
|
||||
|
||||
// Extract issuer from the payload
|
||||
const payload = decoded.payload as IFederatedTokenClaims;
|
||||
const iss = payload.iss;
|
||||
|
||||
if (!iss) {
|
||||
throw new FederationVerificationError('Token is missing the iss claim');
|
||||
}
|
||||
|
||||
// Check expected_issuer hint
|
||||
if (req.expected_issuer !== undefined && iss !== req.expected_issuer) {
|
||||
throw new FederationVerificationError(
|
||||
'Token issuer does not match expected issuer',
|
||||
);
|
||||
}
|
||||
|
||||
// Look up partner by issuer
|
||||
const partnerResult: QueryResult<FederationPartnerRow> = await this.pool.query(
|
||||
`SELECT * FROM federation_partners WHERE issuer = $1`,
|
||||
[iss],
|
||||
);
|
||||
|
||||
if (partnerResult.rows.length === 0) {
|
||||
throw new FederationVerificationError(`Unknown federation partner: ${iss}`);
|
||||
}
|
||||
|
||||
const partner = mapRowToPartner(partnerResult.rows[0]);
|
||||
|
||||
// Check partner status
|
||||
if (partner.status === 'suspended') {
|
||||
throw new FederationVerificationError('Federation partner is suspended');
|
||||
}
|
||||
|
||||
// Check partner expiry
|
||||
if (partner.expires_at !== null && partner.expires_at <= new Date()) {
|
||||
throw new FederationVerificationError('Federation partner has expired');
|
||||
}
|
||||
|
||||
// Fetch JWKS (cache-first)
|
||||
const jwksKeys = await this.fetchAndCacheJWKS(partner);
|
||||
|
||||
// Find the JWK matching the token's kid
|
||||
const kid = header.kid;
|
||||
const matchingKey = kid
|
||||
? jwksKeys.find((k) => k.kid === kid)
|
||||
: jwksKeys[0]; // fall back to first key if no kid in header
|
||||
|
||||
if (!matchingKey) {
|
||||
const kidLabel = kid ?? '(none)';
|
||||
throw new FederationVerificationError(
|
||||
`No matching key in partner JWKS for kid: ${kidLabel}`,
|
||||
);
|
||||
}
|
||||
|
||||
// Convert JWK to PEM
|
||||
const publicKeyPem = jwkToPem(matchingKey);
|
||||
|
||||
// Verify signature and exp claim
|
||||
let verifiedPayload: IFederatedTokenClaims;
|
||||
try {
|
||||
const result = jwt.verify(req.token, publicKeyPem, {
|
||||
algorithms: ['RS256', 'ES256'],
|
||||
});
|
||||
if (typeof result === 'string') {
|
||||
throw new FederationVerificationError('Unexpected string payload after verification');
|
||||
}
|
||||
verifiedPayload = result as IFederatedTokenClaims;
|
||||
} catch (err) {
|
||||
if (err instanceof FederationVerificationError) {
|
||||
throw err;
|
||||
}
|
||||
const message = err instanceof Error ? err.message : 'Unknown verification error';
|
||||
throw new FederationVerificationError(message);
|
||||
}
|
||||
|
||||
// Enforce allowed_organizations filter
|
||||
if (partner.allowed_organizations.length > 0) {
|
||||
const tokenOrgId = verifiedPayload['organization_id'];
|
||||
if (
|
||||
typeof tokenOrgId !== 'string' ||
|
||||
!partner.allowed_organizations.includes(tokenOrgId)
|
||||
) {
|
||||
throw new FederationVerificationError(
|
||||
"Token organization_id is not in the partner's allowed list",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Build and return the verification result
|
||||
const { iss: verifiedIss, sub, organization_id, ...restClaims } = verifiedPayload;
|
||||
|
||||
return {
|
||||
valid: true,
|
||||
issuer: verifiedIss,
|
||||
subject: sub,
|
||||
organization_id,
|
||||
claims: {
|
||||
iss: verifiedIss,
|
||||
sub,
|
||||
...(organization_id !== undefined ? { organization_id } : {}),
|
||||
...restClaims,
|
||||
} as Record<string, unknown>,
|
||||
};
|
||||
}
|
||||
|
||||
// ───────────────────────────────────────────────────────────────────────────
|
||||
// Private helpers
|
||||
// ───────────────────────────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Fetches JWKS from the given URL with a 5-second timeout.
|
||||
* Validates that the response contains a `keys` array.
|
||||
*
|
||||
* @param jwksUri - The JWKS endpoint URL.
|
||||
* @returns Array of IJWKSKey objects from the partner.
|
||||
* @throws FederationPartnerError (400) if the endpoint is unreachable or returns invalid JWKS.
|
||||
*/
|
||||
private async fetchJWKS(jwksUri: string): Promise<IJWKSKey[]> {
|
||||
let response: Response;
|
||||
|
||||
try {
|
||||
const controller = new AbortController();
|
||||
const timeout = setTimeout(() => controller.abort(), 5000);
|
||||
try {
|
||||
response = await fetch(jwksUri, { signal: controller.signal });
|
||||
} finally {
|
||||
clearTimeout(timeout);
|
||||
}
|
||||
} catch {
|
||||
throw new FederationPartnerError(`Failed to reach JWKS endpoint: ${jwksUri}`);
|
||||
}
|
||||
|
||||
if (!response.ok) {
|
||||
throw new FederationPartnerError(`Failed to reach JWKS endpoint: ${jwksUri}`);
|
||||
}
|
||||
|
||||
let body: unknown;
|
||||
try {
|
||||
body = await response.json();
|
||||
} catch {
|
||||
throw new FederationPartnerError(
|
||||
`JWKS endpoint returned non-JSON response: ${jwksUri}`,
|
||||
);
|
||||
}
|
||||
|
||||
if (
|
||||
typeof body !== 'object' ||
|
||||
body === null ||
|
||||
!Array.isArray((body as Record<string, unknown>)['keys'])
|
||||
) {
|
||||
throw new FederationPartnerError(
|
||||
`JWKS endpoint returned invalid JWKS (missing keys array): ${jwksUri}`,
|
||||
);
|
||||
}
|
||||
|
||||
return (body as { keys: IJWKSKey[] }).keys;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches JWKS for a partner — tries Redis cache first, falls back to HTTP.
|
||||
* Caches the result in Redis on a cache miss.
|
||||
*
|
||||
* @param partner - The federation partner whose JWKS to fetch.
|
||||
* @returns Array of IJWKSKey objects.
|
||||
*/
|
||||
private async fetchAndCacheJWKS(partner: IFederationPartner): Promise<IJWKSKey[]> {
|
||||
const cacheKey = `federation:jwks:${partner.issuer}`;
|
||||
const cached = await this.redis.get(cacheKey);
|
||||
|
||||
if (cached !== null) {
|
||||
return JSON.parse(cached) as IJWKSKey[];
|
||||
}
|
||||
|
||||
const keys = await this.fetchJWKS(partner.jwks_uri);
|
||||
await this.cacheJWKS(partner.issuer, keys);
|
||||
|
||||
return keys;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores JWKS keys in Redis with the configured TTL.
|
||||
*
|
||||
* @param issuer - The partner issuer URL (used as cache key suffix).
|
||||
* @param keys - The JWKS keys to cache.
|
||||
*/
|
||||
private async cacheJWKS(issuer: string, keys: IJWKSKey[]): Promise<void> {
|
||||
const cacheKey = `federation:jwks:${issuer}`;
|
||||
const ttl = parseInt(
|
||||
process.env['FEDERATION_JWKS_CACHE_TTL_SECONDS'] ?? '3600',
|
||||
10,
|
||||
);
|
||||
await this.redis.set(cacheKey, JSON.stringify(keys), { EX: ttl });
|
||||
}
|
||||
|
||||
/**
|
||||
* Invalidates the Redis JWKS cache entry for a given issuer.
|
||||
*
|
||||
* @param issuer - The partner issuer URL whose cache entry to remove.
|
||||
*/
|
||||
private async invalidateJWKSCache(issuer: string): Promise<void> {
|
||||
await this.redis.del(`federation:jwks:${issuer}`);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user