feat(phase-2): workstream 2 — Python SDK (sentryagent-idp)

Sync (requests) and async (httpx) clients with identical API surface
to the Node.js SDK.

Delivered:
- pyproject.toml — python>=3.9, hatchling build, mypy strict config
- types.py — all 14-endpoint request/response dataclasses
- errors.py — AgentIdPError with from_api_error, from_oauth2_error, network_error
- token_manager.py — thread-safe sync TokenManager, 60s refresh buffer
- async_token_manager.py — asyncio-safe AsyncTokenManager (httpx)
- _request.py — shared sync/async request helper (DRY)
- services/agents.py — AgentRegistryClient + AsyncAgentRegistryClient (5 methods each)
- services/credentials.py — CredentialClient + AsyncCredentialClient (4 methods each)
- services/token.py — TokenClient + AsyncTokenClient (introspect + revoke)
- services/audit.py — AuditClient + AsyncAuditClient (query + get)
- client.py — AgentIdPClient + AsyncAgentIdPClient
- __init__.py — barrel exports
- README.md — installation, quick start, full API reference

QA gates:
- mypy --strict: 0 errors (12 source files)
- pytest: 57/57 passed
- Coverage: 90.83% (required >= 80%)
- All 14 endpoints covered (sync + async)
- AgentIdPError raised on all failure paths

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
SentryAgent.ai Developer
2026-03-28 15:11:27 +00:00
parent 90a4addb21
commit c93562e685
38 changed files with 2645 additions and 13 deletions

View File

@@ -0,0 +1,82 @@
"""
SentryAgent.ai AgentIdP Python SDK.
Provides synchronous and asynchronous clients for the AgentIdP API.
Example (sync)::
from sentryagent_idp import AgentIdPClient
client = AgentIdPClient(
base_url="http://localhost:3000",
client_id="your-agent-id",
client_secret="your-client-secret",
)
result = client.agents.list_agents()
Example (async)::
from sentryagent_idp import AsyncAgentIdPClient
client = AsyncAgentIdPClient(
base_url="http://localhost:3000",
client_id="your-agent-id",
client_secret="your-client-secret",
)
result = await client.agents.list_agents()
"""
from .client import AgentIdPClient, AsyncAgentIdPClient
from .errors import AgentIdPError
from .token_manager import TokenManager
from .async_token_manager import AsyncTokenManager
from .types import (
Agent,
AgentStatus,
AgentType,
AuditAction,
AuditEvent,
AuditOutcome,
Credential,
CredentialStatus,
CredentialWithSecret,
DeploymentEnv,
IntrospectResponse,
OAuthScope,
PaginatedAgents,
PaginatedAuditEvents,
PaginatedCredentials,
RegisterAgentRequest,
TokenResponse,
UpdateAgentRequest,
)
__all__ = [
# Clients
"AgentIdPClient",
"AsyncAgentIdPClient",
# Errors
"AgentIdPError",
# Token managers (for advanced use)
"TokenManager",
"AsyncTokenManager",
# Types
"Agent",
"AgentStatus",
"AgentType",
"AuditAction",
"AuditEvent",
"AuditOutcome",
"Credential",
"CredentialStatus",
"CredentialWithSecret",
"DeploymentEnv",
"IntrospectResponse",
"OAuthScope",
"PaginatedAgents",
"PaginatedAuditEvents",
"PaginatedCredentials",
"RegisterAgentRequest",
"TokenResponse",
"UpdateAgentRequest",
]

View File

@@ -0,0 +1,127 @@
"""
Internal HTTP request helpers shared by all service clients.
Not part of the public API.
"""
from __future__ import annotations
from typing import Any, Callable, Dict, Optional
import requests
import httpx
from .errors import AgentIdPError
def sync_request(
method: str,
base_url: str,
path: str,
token: str,
body: Optional[Any] = None,
params: Optional[Dict[str, Any]] = None,
) -> Any:
"""
Make a synchronous authenticated JSON request to the AgentIdP API.
Args:
method: HTTP method (GET, POST, PATCH, DELETE).
base_url: AgentIdP base URL.
path: API path (e.g. ``/api/v1/agents``).
token: Bearer access token.
body: Optional request body (serialised as JSON).
params: Optional query parameters (None values are excluded).
Returns:
Parsed JSON response body, or None for 204 responses.
Raises:
AgentIdPError: On any API or network failure.
"""
url = base_url.rstrip("/") + path
headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/json",
}
filtered_params: Optional[Dict[str, str]] = (
{k: str(v) for k, v in params.items() if v is not None}
if params
else None
)
try:
response = requests.request(
method=method,
url=url,
headers=headers,
json=body,
params=filtered_params,
timeout=30,
)
except requests.RequestException as exc:
raise AgentIdPError.network_error(exc) from exc
if response.status_code == 204:
return None
resp_body = response.json() if response.content else {}
if not response.ok:
raise AgentIdPError.from_api_error(resp_body, response.status_code)
return resp_body
async def async_request(
method: str,
base_url: str,
path: str,
token: str,
body: Optional[Any] = None,
params: Optional[Dict[str, Any]] = None,
) -> Any:
"""
Make an asynchronous authenticated JSON request to the AgentIdP API.
Args:
method: HTTP method (GET, POST, PATCH, DELETE).
base_url: AgentIdP base URL.
path: API path.
token: Bearer access token.
body: Optional request body (serialised as JSON).
params: Optional query parameters (None values are excluded).
Returns:
Parsed JSON response body, or None for 204 responses.
Raises:
AgentIdPError: On any API or network failure.
"""
url = base_url.rstrip("/") + path
headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/json",
}
filtered_params: Optional[Dict[str, str]] = (
{k: str(v) for k, v in params.items() if v is not None}
if params
else None
)
try:
async with httpx.AsyncClient(timeout=30) as client:
response = await client.request(
method=method,
url=url,
headers=headers,
json=body,
params=filtered_params,
)
except httpx.RequestError as exc:
raise AgentIdPError.network_error(exc) from exc
if response.status_code == 204:
return None
resp_body = response.json() if response.content else {}
if not response.is_success:
raise AgentIdPError.from_api_error(resp_body, response.status_code)
return resp_body

View File

@@ -0,0 +1,117 @@
"""
Asynchronous TokenManager — handles OAuth 2.0 token acquisition, caching, and refresh.
Uses httpx for async HTTP. Tokens are re-issued automatically when within 60 seconds of expiry.
"""
from __future__ import annotations
import asyncio
import time
from dataclasses import dataclass
from typing import Optional
import httpx
from .errors import AgentIdPError
from .types import TokenResponse
#: Seconds before expiry at which a token refresh is triggered.
REFRESH_BUFFER_SECONDS = 60
@dataclass
class _CachedToken:
access_token: str
expires_at: float # Unix timestamp (seconds)
class AsyncTokenManager:
"""
Asyncio-safe asynchronous token manager.
Acquires and caches OAuth 2.0 access tokens. Automatically refreshes
the token when it is within :data:`REFRESH_BUFFER_SECONDS` of expiry.
Args:
base_url: AgentIdP server base URL (e.g. ``http://localhost:3000``).
client_id: The agent's ``agentId`` (UUID).
client_secret: The agent's credential secret.
scopes: Space-separated OAuth 2.0 scopes to request.
"""
def __init__(
self,
base_url: str,
client_id: str,
client_secret: str,
scopes: str,
) -> None:
self._base_url = base_url.rstrip("/")
self._client_id = client_id
self._client_secret = client_secret
self._scopes = scopes
self._cached: Optional[_CachedToken] = None
self._lock: Optional[asyncio.Lock] = None
def _get_lock(self) -> asyncio.Lock:
"""Lazily create the asyncio.Lock on first use (supports different event loops)."""
if self._lock is None:
self._lock = asyncio.Lock()
return self._lock
async def get_token(self) -> str:
"""
Return a valid access token, refreshing if necessary.
Returns:
A valid JWT access token string.
Raises:
AgentIdPError: If token acquisition fails.
"""
async with self._get_lock():
now = time.time()
if (
self._cached is not None
and self._cached.expires_at - now > REFRESH_BUFFER_SECONDS
):
return self._cached.access_token
token_response = await self._issue_token()
self._cached = _CachedToken(
access_token=token_response.access_token,
expires_at=now + token_response.expires_in,
)
return self._cached.access_token
def clear_cache(self) -> None:
"""Clear the cached token, forcing re-acquisition on the next call."""
self._cached = None
async def _issue_token(self) -> TokenResponse:
"""
POST /api/v1/token to obtain a new access token.
Returns:
TokenResponse from the API.
Raises:
AgentIdPError: On authentication failure or network error.
"""
url = f"{self._base_url}/api/v1/token"
data = {
"grant_type": "client_credentials",
"client_id": self._client_id,
"client_secret": self._client_secret,
"scope": self._scopes,
}
try:
async with httpx.AsyncClient(timeout=30) as client:
response = await client.post(url, data=data)
except httpx.RequestError as exc:
raise AgentIdPError.network_error(exc) from exc
body = response.json()
if not response.is_success:
raise AgentIdPError.from_oauth2_error(body, response.status_code)
return TokenResponse.from_dict(body)

View File

@@ -0,0 +1,128 @@
"""
Top-level client for the SentryAgent.ai AgentIdP API.
Provides both synchronous (AgentIdPClient) and asynchronous (AsyncAgentIdPClient) variants.
"""
from __future__ import annotations
from typing import List, Optional
from .token_manager import TokenManager
from .async_token_manager import AsyncTokenManager
from .services.agents import AgentRegistryClient, AsyncAgentRegistryClient
from .services.credentials import CredentialClient, AsyncCredentialClient
from .services.token import TokenClient, AsyncTokenClient
from .services.audit import AuditClient, AsyncAuditClient
from .types import OAuthScope
_DEFAULT_SCOPES: List[OAuthScope] = [
"agents:read",
"agents:write",
"tokens:read",
"audit:read",
]
class AgentIdPClient:
"""
Synchronous client for the SentryAgent.ai AgentIdP API.
Composes all service clients under a single entry point. Handles token
acquisition and caching automatically via :class:`~.token_manager.TokenManager`.
Args:
base_url: Base URL of the AgentIdP server (e.g. ``http://localhost:3000``).
client_id: The agent's ``agentId`` (UUID).
client_secret: The credential secret.
scopes: OAuth 2.0 scopes to request. Defaults to all four scopes.
Example::
from sentryagent_idp import AgentIdPClient, RegisterAgentRequest
client = AgentIdPClient(
base_url="http://localhost:3000",
client_id="your-agent-id",
client_secret="your-client-secret",
)
agents = client.agents.list_agents()
"""
def __init__(
self,
base_url: str,
client_id: str,
client_secret: str,
scopes: Optional[List[OAuthScope]] = None,
) -> None:
scope_str = " ".join(scopes if scopes is not None else _DEFAULT_SCOPES)
self._token_manager = TokenManager(base_url, client_id, client_secret, scope_str)
get_token = self._token_manager.get_token
#: Agent Registry operations: register, list, get, update, decommission.
self.agents = AgentRegistryClient(base_url, get_token)
#: Credential operations: generate, list, rotate, revoke.
self.credentials = CredentialClient(base_url, get_token)
#: Token operations: introspect, revoke.
self.tokens = TokenClient(base_url, get_token)
#: Audit log operations: query, get event.
self.audit = AuditClient(base_url, get_token)
def clear_token_cache(self) -> None:
"""
Clear the cached access token.
The next API call will request a new token. Use this after rotating credentials.
"""
self._token_manager.clear_cache()
class AsyncAgentIdPClient:
"""
Asynchronous client for the SentryAgent.ai AgentIdP API.
All methods are coroutines and must be awaited. Token acquisition and caching
are handled automatically via :class:`~.async_token_manager.AsyncTokenManager`.
Args:
base_url: Base URL of the AgentIdP server.
client_id: The agent's ``agentId`` (UUID).
client_secret: The credential secret.
scopes: OAuth 2.0 scopes to request. Defaults to all four scopes.
Example::
from sentryagent_idp import AsyncAgentIdPClient
client = AsyncAgentIdPClient(
base_url="http://localhost:3000",
client_id="your-agent-id",
client_secret="your-client-secret",
)
agents = await client.agents.list_agents()
"""
def __init__(
self,
base_url: str,
client_id: str,
client_secret: str,
scopes: Optional[List[OAuthScope]] = None,
) -> None:
scope_str = " ".join(scopes if scopes is not None else _DEFAULT_SCOPES)
self._token_manager = AsyncTokenManager(base_url, client_id, client_secret, scope_str)
get_token = self._token_manager.get_token
#: Agent Registry operations (async).
self.agents = AsyncAgentRegistryClient(base_url, get_token)
#: Credential operations (async).
self.credentials = AsyncCredentialClient(base_url, get_token)
#: Token operations (async).
self.tokens = AsyncTokenClient(base_url, get_token)
#: Audit log operations (async).
self.audit = AsyncAuditClient(base_url, get_token)
def clear_token_cache(self) -> None:
"""Clear the cached access token."""
self._token_manager.clear_cache()

View File

@@ -0,0 +1,108 @@
"""
Error types for the SentryAgent.ai AgentIdP Python SDK.
All API failures are raised as AgentIdPError — never as raw requests/httpx exceptions.
"""
from __future__ import annotations
from typing import Any, Dict, Optional
class AgentIdPError(Exception):
"""
Typed exception raised for all AgentIdP API failures.
Attributes:
code: Machine-readable error code from the API (e.g. ``AgentNotFoundError``).
http_status: HTTP status code of the failed response.
details: Optional structured details from the API error response.
"""
def __init__(
self,
code: str,
message: str,
http_status: int,
details: Optional[Dict[str, Any]] = None,
) -> None:
super().__init__(message)
self.code = code
self.http_status = http_status
self.details = details
def __repr__(self) -> str:
return (
f"AgentIdPError(code={self.code!r}, "
f"http_status={self.http_status}, "
f"message={str(self)!r})"
)
@classmethod
def from_api_error(
cls, body: Any, http_status: int
) -> "AgentIdPError":
"""
Create an AgentIdPError from a standard API error response body.
Args:
body: Parsed response body (dict or unknown).
http_status: HTTP status code.
Returns:
AgentIdPError instance.
"""
if isinstance(body, dict) and "code" in body and "message" in body:
return cls(
code=str(body["code"]),
message=str(body["message"]),
http_status=http_status,
details=body.get("details"),
)
return cls(
code="UNKNOWN_ERROR",
message=str(body),
http_status=http_status,
)
@classmethod
def from_oauth2_error(
cls, body: Any, http_status: int
) -> "AgentIdPError":
"""
Create an AgentIdPError from an OAuth 2.0 error response body.
Args:
body: Parsed response body.
http_status: HTTP status code.
Returns:
AgentIdPError instance.
"""
if isinstance(body, dict):
return cls(
code=str(body.get("error", "unknown_error")),
message=str(body.get("error_description", "Token request failed.")),
http_status=http_status,
)
return cls(
code="unknown_error",
message=str(body),
http_status=http_status,
)
@classmethod
def network_error(cls, cause: Exception) -> "AgentIdPError":
"""
Create an AgentIdPError for a network-level failure (no HTTP response).
Args:
cause: The underlying exception.
Returns:
AgentIdPError with http_status=0.
"""
return cls(
code="NETWORK_ERROR",
message=f"Network error: {cause}",
http_status=0,
)

View File

@@ -0,0 +1 @@
# Services package

View File

@@ -0,0 +1,202 @@
"""
Agent Registry service clients — sync and async.
Covers all five agent endpoints: register, list, get, update, decommission.
"""
from __future__ import annotations
from typing import Any, Callable, Coroutine, Dict, Optional
from .._request import sync_request, async_request
from ..types import (
Agent,
AgentStatus,
AgentType,
PaginatedAgents,
RegisterAgentRequest,
UpdateAgentRequest,
)
class AgentRegistryClient:
"""
Synchronous client for the Agent Registry service.
Args:
base_url: AgentIdP server base URL.
get_token: Callable that returns a valid Bearer token.
"""
def __init__(
self,
base_url: str,
get_token: Callable[[], str],
) -> None:
self._base_url = base_url
self._get_token = get_token
def register_agent(self, request: RegisterAgentRequest) -> Agent:
"""
Register a new AI agent.
Args:
request: Agent registration parameters.
Returns:
The created Agent record.
Raises:
AgentIdPError: On API or network failure.
"""
data = sync_request(
"POST", self._base_url, "/api/v1/agents",
self._get_token(), body=request.to_dict(),
)
return Agent.from_dict(data)
def list_agents(
self,
status: Optional[AgentStatus] = None,
agent_type: Optional[AgentType] = None,
page: int = 1,
limit: int = 20,
) -> PaginatedAgents:
"""
List all registered agents with optional filters.
Args:
status: Filter by lifecycle status.
agent_type: Filter by agent type.
page: Page number (1-based).
limit: Results per page.
Returns:
PaginatedAgents response.
Raises:
AgentIdPError: On API or network failure.
"""
data = sync_request(
"GET", self._base_url, "/api/v1/agents",
self._get_token(),
params={"status": status, "agentType": agent_type, "page": page, "limit": limit},
)
return PaginatedAgents.from_dict(data)
def get_agent(self, agent_id: str) -> Agent:
"""
Get a single agent by its agentId.
Args:
agent_id: The agent UUID.
Returns:
Agent record.
Raises:
AgentIdPError: If agent not found or network failure.
"""
data = sync_request(
"GET", self._base_url, f"/api/v1/agents/{agent_id}",
self._get_token(),
)
return Agent.from_dict(data)
def update_agent(self, agent_id: str, request: UpdateAgentRequest) -> Agent:
"""
Update mutable fields on an existing agent.
Args:
agent_id: The agent UUID.
request: Fields to update.
Returns:
Updated Agent record.
Raises:
AgentIdPError: On API or network failure.
"""
data = sync_request(
"PATCH", self._base_url, f"/api/v1/agents/{agent_id}",
self._get_token(), body=request.to_dict(),
)
return Agent.from_dict(data)
def decommission_agent(self, agent_id: str) -> None:
"""
Decommission an agent. This is irreversible.
Args:
agent_id: The agent UUID.
Raises:
AgentIdPError: On API or network failure.
"""
sync_request(
"DELETE", self._base_url, f"/api/v1/agents/{agent_id}",
self._get_token(),
)
class AsyncAgentRegistryClient:
"""
Asynchronous client for the Agent Registry service.
Args:
base_url: AgentIdP server base URL.
get_token: Async callable that returns a valid Bearer token.
"""
def __init__(
self,
base_url: str,
get_token: Callable[[], Coroutine[Any, Any, str]],
) -> None:
self._base_url = base_url
self._get_token = get_token
async def register_agent(self, request: RegisterAgentRequest) -> Agent:
"""Register a new AI agent (async)."""
data = await async_request(
"POST", self._base_url, "/api/v1/agents",
await self._get_token(), body=request.to_dict(),
)
return Agent.from_dict(data)
async def list_agents(
self,
status: Optional[AgentStatus] = None,
agent_type: Optional[AgentType] = None,
page: int = 1,
limit: int = 20,
) -> PaginatedAgents:
"""List all registered agents with optional filters (async)."""
data = await async_request(
"GET", self._base_url, "/api/v1/agents",
await self._get_token(),
params={"status": status, "agentType": agent_type, "page": page, "limit": limit},
)
return PaginatedAgents.from_dict(data)
async def get_agent(self, agent_id: str) -> Agent:
"""Get a single agent by its agentId (async)."""
data = await async_request(
"GET", self._base_url, f"/api/v1/agents/{agent_id}",
await self._get_token(),
)
return Agent.from_dict(data)
async def update_agent(self, agent_id: str, request: UpdateAgentRequest) -> Agent:
"""Update mutable fields on an existing agent (async)."""
data = await async_request(
"PATCH", self._base_url, f"/api/v1/agents/{agent_id}",
await self._get_token(), body=request.to_dict(),
)
return Agent.from_dict(data)
async def decommission_agent(self, agent_id: str) -> None:
"""Decommission an agent — irreversible (async)."""
await async_request(
"DELETE", self._base_url, f"/api/v1/agents/{agent_id}",
await self._get_token(),
)

View File

@@ -0,0 +1,144 @@
"""
Audit Log service clients — sync and async.
Covers query (list) and get-by-id operations.
"""
from __future__ import annotations
from typing import Any, Callable, Coroutine, Optional
from .._request import sync_request, async_request
from ..types import AuditAction, AuditEvent, AuditOutcome, PaginatedAuditEvents
class AuditClient:
"""
Synchronous client for the Audit Log service.
Args:
base_url: AgentIdP server base URL.
get_token: Callable that returns a valid Bearer token.
"""
def __init__(
self,
base_url: str,
get_token: Callable[[], str],
) -> None:
self._base_url = base_url
self._get_token = get_token
def query_audit_log(
self,
agent_id: Optional[str] = None,
action: Optional[AuditAction] = None,
outcome: Optional[AuditOutcome] = None,
from_date: Optional[str] = None,
to_date: Optional[str] = None,
page: int = 1,
limit: int = 20,
) -> PaginatedAuditEvents:
"""
Query audit log events with optional filters. Requires ``audit:read`` scope.
Events are retained for 90 days.
Args:
agent_id: Filter by agent UUID.
action: Filter by audit action type.
outcome: Filter by outcome (success or failure).
from_date: ISO 8601 start datetime (inclusive).
to_date: ISO 8601 end datetime (inclusive).
page: Page number (1-based).
limit: Results per page.
Returns:
PaginatedAuditEvents response.
Raises:
AgentIdPError: On API or network failure.
"""
data = sync_request(
"GET", self._base_url, "/api/v1/audit",
self._get_token(),
params={
"agentId": agent_id,
"action": action,
"outcome": outcome,
"fromDate": from_date,
"toDate": to_date,
"page": page,
"limit": limit,
},
)
return PaginatedAuditEvents.from_dict(data)
def get_audit_event(self, event_id: str) -> AuditEvent:
"""
Get a single audit event by its eventId. Requires ``audit:read`` scope.
Args:
event_id: The audit event UUID.
Returns:
AuditEvent record.
Raises:
AgentIdPError: On API or network failure.
"""
data = sync_request(
"GET", self._base_url, f"/api/v1/audit/{event_id}",
self._get_token(),
)
return AuditEvent.from_dict(data)
class AsyncAuditClient:
"""
Asynchronous client for the Audit Log service.
Args:
base_url: AgentIdP server base URL.
get_token: Async callable that returns a valid Bearer token.
"""
def __init__(
self,
base_url: str,
get_token: Callable[[], Coroutine[Any, Any, str]],
) -> None:
self._base_url = base_url
self._get_token = get_token
async def query_audit_log(
self,
agent_id: Optional[str] = None,
action: Optional[AuditAction] = None,
outcome: Optional[AuditOutcome] = None,
from_date: Optional[str] = None,
to_date: Optional[str] = None,
page: int = 1,
limit: int = 20,
) -> PaginatedAuditEvents:
"""Query audit log events with optional filters (async)."""
data = await async_request(
"GET", self._base_url, "/api/v1/audit",
await self._get_token(),
params={
"agentId": agent_id,
"action": action,
"outcome": outcome,
"fromDate": from_date,
"toDate": to_date,
"page": page,
"limit": limit,
},
)
return PaginatedAuditEvents.from_dict(data)
async def get_audit_event(self, event_id: str) -> AuditEvent:
"""Get a single audit event by its eventId (async)."""
data = await async_request(
"GET", self._base_url, f"/api/v1/audit/{event_id}",
await self._get_token(),
)
return AuditEvent.from_dict(data)

View File

@@ -0,0 +1,209 @@
"""
Credential Management service clients — sync and async.
Covers generate, list, rotate, and revoke operations.
"""
from __future__ import annotations
from typing import Any, Callable, Coroutine, Optional
from .._request import sync_request, async_request
from ..types import (
Credential,
CredentialStatus,
CredentialWithSecret,
PaginatedCredentials,
)
class CredentialClient:
"""
Synchronous client for the Credential Management service.
Args:
base_url: AgentIdP server base URL.
get_token: Callable that returns a valid Bearer token.
"""
def __init__(
self,
base_url: str,
get_token: Callable[[], str],
) -> None:
self._base_url = base_url
self._get_token = get_token
def generate_credential(
self,
agent_id: str,
expires_at: Optional[str] = None,
) -> CredentialWithSecret:
"""
Generate a new credential for an agent.
The ``client_secret`` is shown **once** — store it securely immediately.
Args:
agent_id: The agent UUID.
expires_at: Optional ISO 8601 expiry date string.
Returns:
CredentialWithSecret including the one-time plain-text secret.
Raises:
AgentIdPError: On API or network failure.
"""
body = {"expiresAt": expires_at} if expires_at is not None else None
data = sync_request(
"POST", self._base_url, f"/api/v1/agents/{agent_id}/credentials",
self._get_token(), body=body,
)
return CredentialWithSecret.from_dict(data)
def list_credentials(
self,
agent_id: str,
status: Optional[CredentialStatus] = None,
page: int = 1,
limit: int = 20,
) -> PaginatedCredentials:
"""
List credentials for an agent. Secrets are never returned in list responses.
Args:
agent_id: The agent UUID.
status: Filter by credential status.
page: Page number (1-based).
limit: Results per page.
Returns:
PaginatedCredentials response.
Raises:
AgentIdPError: On API or network failure.
"""
data = sync_request(
"GET", self._base_url, f"/api/v1/agents/{agent_id}/credentials",
self._get_token(),
params={"status": status, "page": page, "limit": limit},
)
return PaginatedCredentials.from_dict(data)
def rotate_credential(
self, agent_id: str, credential_id: str
) -> CredentialWithSecret:
"""
Rotate a credential. The same ``credential_id`` is retained; a new secret is issued.
The old secret is immediately invalidated.
The new ``client_secret`` is shown **once** — store it securely immediately.
Args:
agent_id: The agent UUID.
credential_id: The credential UUID to rotate.
Returns:
CredentialWithSecret with the new one-time secret.
Raises:
AgentIdPError: On API or network failure.
"""
data = sync_request(
"POST",
self._base_url,
f"/api/v1/agents/{agent_id}/credentials/{credential_id}/rotate",
self._get_token(),
)
return CredentialWithSecret.from_dict(data)
def revoke_credential(
self, agent_id: str, credential_id: str
) -> Credential:
"""
Revoke a credential permanently.
Args:
agent_id: The agent UUID.
credential_id: The credential UUID to revoke.
Returns:
The revoked Credential record.
Raises:
AgentIdPError: On API or network failure.
"""
data = sync_request(
"DELETE",
self._base_url,
f"/api/v1/agents/{agent_id}/credentials/{credential_id}",
self._get_token(),
)
return Credential.from_dict(data)
class AsyncCredentialClient:
"""
Asynchronous client for the Credential Management service.
Args:
base_url: AgentIdP server base URL.
get_token: Async callable that returns a valid Bearer token.
"""
def __init__(
self,
base_url: str,
get_token: Callable[[], Coroutine[Any, Any, str]],
) -> None:
self._base_url = base_url
self._get_token = get_token
async def generate_credential(
self,
agent_id: str,
expires_at: Optional[str] = None,
) -> CredentialWithSecret:
"""Generate a new credential for an agent (async)."""
body = {"expiresAt": expires_at} if expires_at is not None else None
data = await async_request(
"POST", self._base_url, f"/api/v1/agents/{agent_id}/credentials",
await self._get_token(), body=body,
)
return CredentialWithSecret.from_dict(data)
async def list_credentials(
self,
agent_id: str,
status: Optional[CredentialStatus] = None,
page: int = 1,
limit: int = 20,
) -> PaginatedCredentials:
"""List credentials for an agent (async)."""
data = await async_request(
"GET", self._base_url, f"/api/v1/agents/{agent_id}/credentials",
await self._get_token(),
params={"status": status, "page": page, "limit": limit},
)
return PaginatedCredentials.from_dict(data)
async def rotate_credential(
self, agent_id: str, credential_id: str
) -> CredentialWithSecret:
"""Rotate a credential (async)."""
data = await async_request(
"POST",
self._base_url,
f"/api/v1/agents/{agent_id}/credentials/{credential_id}/rotate",
await self._get_token(),
)
return CredentialWithSecret.from_dict(data)
async def revoke_credential(
self, agent_id: str, credential_id: str
) -> Credential:
"""Revoke a credential permanently (async)."""
data = await async_request(
"DELETE",
self._base_url,
f"/api/v1/agents/{agent_id}/credentials/{credential_id}",
await self._get_token(),
)
return Credential.from_dict(data)

View File

@@ -0,0 +1,154 @@
"""
Token service clients (introspect and revoke) — sync and async.
Token issuance is handled by TokenManager / AsyncTokenManager.
"""
from __future__ import annotations
from typing import Any, Callable, Coroutine
import requests
import httpx
from ..errors import AgentIdPError
from ..types import IntrospectResponse
class TokenClient:
"""
Synchronous client for token introspection and revocation.
Args:
base_url: AgentIdP server base URL.
get_token: Callable that returns a valid Bearer token.
"""
def __init__(
self,
base_url: str,
get_token: Callable[[], str],
) -> None:
self._base_url = base_url.rstrip("/")
self._get_token = get_token
def introspect_token(self, token_to_check: str) -> IntrospectResponse:
"""
Check whether a token is currently active.
Always returns successfully — check ``response.active`` for validity.
Args:
token_to_check: The JWT string to introspect.
Returns:
IntrospectResponse with ``active`` field set.
Raises:
AgentIdPError: On API or network failure.
"""
url = f"{self._base_url}/api/v1/token/introspect"
try:
response = requests.post(
url,
data={"token": token_to_check},
headers={
"Authorization": f"Bearer {self._get_token()}",
"Content-Type": "application/x-www-form-urlencoded",
},
timeout=30,
)
except requests.RequestException as exc:
raise AgentIdPError.network_error(exc) from exc
body = response.json()
if not response.ok:
raise AgentIdPError.from_api_error(body, response.status_code)
return IntrospectResponse.from_dict(body)
def revoke_token(self, token_to_revoke: str) -> None:
"""
Revoke a token immediately. Idempotent (RFC 7009).
Args:
token_to_revoke: The JWT string to revoke.
Raises:
AgentIdPError: On API or network failure.
"""
url = f"{self._base_url}/api/v1/token/revoke"
try:
response = requests.post(
url,
data={"token": token_to_revoke},
headers={
"Authorization": f"Bearer {self._get_token()}",
"Content-Type": "application/x-www-form-urlencoded",
},
timeout=30,
)
except requests.RequestException as exc:
raise AgentIdPError.network_error(exc) from exc
if not response.ok:
body = response.json() if response.content else {}
raise AgentIdPError.from_api_error(body, response.status_code)
class AsyncTokenClient:
"""
Asynchronous client for token introspection and revocation.
Args:
base_url: AgentIdP server base URL.
get_token: Async callable that returns a valid Bearer token.
"""
def __init__(
self,
base_url: str,
get_token: Callable[[], Coroutine[Any, Any, str]],
) -> None:
self._base_url = base_url.rstrip("/")
self._get_token = get_token
async def introspect_token(self, token_to_check: str) -> IntrospectResponse:
"""Check whether a token is currently active (async)."""
url = f"{self._base_url}/api/v1/token/introspect"
token = await self._get_token()
try:
async with httpx.AsyncClient(timeout=30) as client:
response = await client.post(
url,
data={"token": token_to_check},
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/x-www-form-urlencoded",
},
)
except httpx.RequestError as exc:
raise AgentIdPError.network_error(exc) from exc
body = response.json()
if not response.is_success:
raise AgentIdPError.from_api_error(body, response.status_code)
return IntrospectResponse.from_dict(body)
async def revoke_token(self, token_to_revoke: str) -> None:
"""Revoke a token immediately — idempotent (async)."""
url = f"{self._base_url}/api/v1/token/revoke"
token = await self._get_token()
try:
async with httpx.AsyncClient(timeout=30) as client:
response = await client.post(
url,
data={"token": token_to_revoke},
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/x-www-form-urlencoded",
},
)
except httpx.RequestError as exc:
raise AgentIdPError.network_error(exc) from exc
if not response.is_success:
body = response.json() if response.content else {}
raise AgentIdPError.from_api_error(body, response.status_code)

View File

@@ -0,0 +1,116 @@
"""
Synchronous TokenManager — handles OAuth 2.0 token acquisition, caching, and refresh.
Tokens are re-issued automatically when expired or within 60 seconds of expiry.
"""
from __future__ import annotations
import time
import threading
from dataclasses import dataclass
from typing import Optional
import requests
from .errors import AgentIdPError
from .types import TokenResponse
#: Seconds before expiry at which a token refresh is triggered.
REFRESH_BUFFER_SECONDS = 60
@dataclass
class _CachedToken:
access_token: str
expires_at: float # Unix timestamp (seconds)
class TokenManager:
"""
Thread-safe synchronous token manager.
Acquires and caches OAuth 2.0 access tokens. Automatically refreshes
the token when it is within :data:`REFRESH_BUFFER_SECONDS` of expiry.
Args:
base_url: AgentIdP server base URL (e.g. ``http://localhost:3000``).
client_id: The agent's ``agentId`` (UUID).
client_secret: The agent's credential secret.
scopes: Space-separated OAuth 2.0 scopes to request.
"""
def __init__(
self,
base_url: str,
client_id: str,
client_secret: str,
scopes: str,
) -> None:
self._base_url = base_url.rstrip("/")
self._client_id = client_id
self._client_secret = client_secret
self._scopes = scopes
self._cached: Optional[_CachedToken] = None
self._lock = threading.Lock()
def get_token(self) -> str:
"""
Return a valid access token, refreshing if necessary.
Returns:
A valid JWT access token string.
Raises:
AgentIdPError: If token acquisition fails.
"""
with self._lock:
now = time.time()
if (
self._cached is not None
and self._cached.expires_at - now > REFRESH_BUFFER_SECONDS
):
return self._cached.access_token
token_response = self._issue_token()
self._cached = _CachedToken(
access_token=token_response.access_token,
expires_at=now + token_response.expires_in,
)
return self._cached.access_token
def clear_cache(self) -> None:
"""Clear the cached token, forcing re-acquisition on the next call."""
with self._lock:
self._cached = None
def _issue_token(self) -> TokenResponse:
"""
POST /api/v1/token to obtain a new access token.
Returns:
TokenResponse from the API.
Raises:
AgentIdPError: On authentication failure or network error.
"""
url = f"{self._base_url}/api/v1/token"
data = {
"grant_type": "client_credentials",
"client_id": self._client_id,
"client_secret": self._client_secret,
"scope": self._scopes,
}
try:
response = requests.post(
url,
data=data,
headers={"Content-Type": "application/x-www-form-urlencoded"},
timeout=30,
)
except requests.RequestException as exc:
raise AgentIdPError.network_error(exc) from exc
body = response.json()
if not response.ok:
raise AgentIdPError.from_oauth2_error(body, response.status_code)
return TokenResponse.from_dict(body)

View File

@@ -0,0 +1,323 @@
"""
Type definitions for the SentryAgent.ai AgentIdP Python SDK.
All request and response shapes derived from the AgentIdP OpenAPI 3.0 specs.
"""
from __future__ import annotations
from typing import Any, Dict, List, Literal, Optional
from dataclasses import dataclass, field
# ─────────────────────────────────────────────────────────────────────────────
# Enums / Literal types
# ─────────────────────────────────────────────────────────────────────────────
AgentType = Literal[
"screener",
"classifier",
"orchestrator",
"extractor",
"summarizer",
"router",
"monitor",
"custom",
]
AgentStatus = Literal["active", "suspended", "decommissioned"]
DeploymentEnv = Literal["development", "staging", "production"]
CredentialStatus = Literal["active", "revoked"]
OAuthScope = Literal["agents:read", "agents:write", "tokens:read", "audit:read"]
AuditAction = Literal[
"agent.created",
"agent.updated",
"agent.decommissioned",
"agent.suspended",
"agent.reactivated",
"token.issued",
"token.revoked",
"token.introspected",
"credential.generated",
"credential.rotated",
"credential.revoked",
"auth.failed",
]
AuditOutcome = Literal["success", "failure"]
# ─────────────────────────────────────────────────────────────────────────────
# Agent Registry
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class Agent:
"""A registered AI agent identity."""
agent_id: str
email: str
agent_type: AgentType
version: str
capabilities: List[str]
owner: str
deployment_env: DeploymentEnv
status: AgentStatus
created_at: str
updated_at: str
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Agent":
"""Deserialise from an API response dict."""
return cls(
agent_id=data["agentId"],
email=data["email"],
agent_type=data["agentType"],
version=data["version"],
capabilities=data["capabilities"],
owner=data["owner"],
deployment_env=data["deploymentEnv"],
status=data["status"],
created_at=data["createdAt"],
updated_at=data["updatedAt"],
)
@dataclass
class RegisterAgentRequest:
"""Request body for registering a new AI agent."""
email: str
agent_type: AgentType
version: str
capabilities: List[str]
owner: str
deployment_env: DeploymentEnv
def to_dict(self) -> Dict[str, Any]:
"""Serialise to API request dict."""
return {
"email": self.email,
"agentType": self.agent_type,
"version": self.version,
"capabilities": self.capabilities,
"owner": self.owner,
"deploymentEnv": self.deployment_env,
}
@dataclass
class UpdateAgentRequest:
"""Request body for partially updating an agent (all fields optional)."""
agent_type: Optional[AgentType] = None
version: Optional[str] = None
capabilities: Optional[List[str]] = None
owner: Optional[str] = None
deployment_env: Optional[DeploymentEnv] = None
status: Optional[AgentStatus] = None
def to_dict(self) -> Dict[str, Any]:
"""Serialise to API request dict, omitting None fields."""
out: Dict[str, Any] = {}
if self.agent_type is not None:
out["agentType"] = self.agent_type
if self.version is not None:
out["version"] = self.version
if self.capabilities is not None:
out["capabilities"] = self.capabilities
if self.owner is not None:
out["owner"] = self.owner
if self.deployment_env is not None:
out["deploymentEnv"] = self.deployment_env
if self.status is not None:
out["status"] = self.status
return out
@dataclass
class PaginatedAgents:
"""Paginated list of agents."""
data: List[Agent]
total: int
page: int
limit: int
@classmethod
def from_dict(cls, d: Dict[str, Any]) -> "PaginatedAgents":
return cls(
data=[Agent.from_dict(a) for a in d["data"]],
total=d["total"],
page=d["page"],
limit=d["limit"],
)
# ─────────────────────────────────────────────────────────────────────────────
# Credential Management
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class Credential:
"""A credential record (client_secret never included)."""
credential_id: str
client_id: str
status: CredentialStatus
created_at: str
expires_at: Optional[str]
revoked_at: Optional[str]
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Credential":
return cls(
credential_id=data["credentialId"],
client_id=data["clientId"],
status=data["status"],
created_at=data["createdAt"],
expires_at=data.get("expiresAt"),
revoked_at=data.get("revokedAt"),
)
@dataclass
class CredentialWithSecret(Credential):
"""Credential with plain-text secret — returned once only on create/rotate."""
client_secret: str = field(default="")
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "CredentialWithSecret":
base = Credential.from_dict(data)
return cls(
credential_id=base.credential_id,
client_id=base.client_id,
status=base.status,
created_at=base.created_at,
expires_at=base.expires_at,
revoked_at=base.revoked_at,
client_secret=data["clientSecret"],
)
@dataclass
class PaginatedCredentials:
"""Paginated list of credentials."""
data: List[Credential]
total: int
page: int
limit: int
@classmethod
def from_dict(cls, d: Dict[str, Any]) -> "PaginatedCredentials":
return cls(
data=[Credential.from_dict(c) for c in d["data"]],
total=d["total"],
page=d["page"],
limit=d["limit"],
)
# ─────────────────────────────────────────────────────────────────────────────
# OAuth 2.0 Tokens
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class TokenResponse:
"""OAuth 2.0 access token response."""
access_token: str
token_type: str
expires_in: int
scope: str
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "TokenResponse":
return cls(
access_token=data["access_token"],
token_type=data["token_type"],
expires_in=data["expires_in"],
scope=data["scope"],
)
@dataclass
class IntrospectResponse:
"""Token introspection response (RFC 7662)."""
active: bool
sub: Optional[str] = None
client_id: Optional[str] = None
scope: Optional[str] = None
token_type: Optional[str] = None
iat: Optional[int] = None
exp: Optional[int] = None
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "IntrospectResponse":
return cls(
active=data["active"],
sub=data.get("sub"),
client_id=data.get("client_id"),
scope=data.get("scope"),
token_type=data.get("token_type"),
iat=data.get("iat"),
exp=data.get("exp"),
)
# ─────────────────────────────────────────────────────────────────────────────
# Audit Log
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class AuditEvent:
"""An immutable audit event record."""
event_id: str
agent_id: str
action: AuditAction
outcome: AuditOutcome
ip_address: str
user_agent: str
metadata: Dict[str, Any]
timestamp: str
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "AuditEvent":
return cls(
event_id=data["eventId"],
agent_id=data["agentId"],
action=data["action"],
outcome=data["outcome"],
ip_address=data["ipAddress"],
user_agent=data["userAgent"],
metadata=data.get("metadata", {}),
timestamp=data["timestamp"],
)
@dataclass
class PaginatedAuditEvents:
"""Paginated list of audit events."""
data: List[AuditEvent]
total: int
page: int
limit: int
@classmethod
def from_dict(cls, d: Dict[str, Any]) -> "PaginatedAuditEvents":
return cls(
data=[AuditEvent.from_dict(e) for e in d["data"]],
total=d["total"],
page=d["page"],
limit=d["limit"],
)