Python SDK Reference
Complete API reference for the hippocortex Python package.
Installation
pip install hippocortex
Requires Python 3.10+. Uses httpx for async HTTP.
Optional Dependencies
For framework adapters, install the optional dependencies:
# OpenAI Agents adapter
pip install hippocortex[openai]
# LangGraph adapter
pip install hippocortex[langgraph]
# CrewAI adapter
pip install hippocortex[crewai]
# AutoGen adapter
pip install hippocortex[autogen]
# OpenClaw adapter
pip install hippocortex[openclaw]
# All adapters
pip install hippocortex[all]
Integration Methods
The SDK provides three ways to add memory, from simplest to most granular:
Auto-Instrumentation (Easiest, 1 Line)
Import once at the top of your app. Every OpenAI and Anthropic SDK call automatically gets memory context injection and conversation capture.
import hippocortex.auto
# That's it. All OpenAI/Anthropic calls now have memory.
from openai import OpenAI
client = OpenAI()
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Deploy payments to staging"}]
)
# Memory context was injected, conversation was captured
How it works: On import, the module patches Completions.create (OpenAI) and
Messages.create (Anthropic). Each call synthesizes relevant context, prepends it as a
system message, calls the original method, then captures the conversation.
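The patching pattern described above can be sketched as follows. This is an illustrative reconstruction using a stand-in client class, not the SDK's actual implementation:

```python
class FakeCompletions:
    """Stand-in for the SDK class whose create() gets patched."""
    def create(self, model, messages):
        return {"model": model, "messages": messages}

captured = []  # conversations recorded after each call

def install_memory(cls):
    original = cls.create  # keep a reference to the unpatched method
    def patched(self, model, messages):
        # Synthesize relevant context and prepend it as a system message
        context = {"role": "system", "content": "<synthesized memory context>"}
        response = original(self, model, [context, *messages])
        # Capture the conversation after the original call returns
        captured.append(list(messages))
        return response
    cls.create = patched

install_memory(FakeCompletions)
response = FakeCompletions().create(
    "gpt-4o", [{"role": "user", "content": "Deploy payments to staging"}]
)
```

Because only the method on the class is replaced, existing client instances pick up memory injection too, and your calling code is unchanged.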
Configuration: Resolves credentials from environment variables (HIPPOCORTEX_API_KEY,
HIPPOCORTEX_BASE_URL) or a .hippocortex.json file. Set HIPPOCORTEX_SILENT=1 to suppress
console output.
Streaming support: Auto-instrumentation wraps streaming responses to collect chunks transparently. The stream iterator passes through unchanged to your code.
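The pass-through idea can be illustrated with a small generator that records chunks while yielding them unchanged (a sketch of the concept, not the SDK's internals):

```python
def tee_stream(stream, collected):
    """Yield each chunk unchanged while recording it for later capture."""
    for chunk in stream:
        collected.append(chunk)
        yield chunk

chunks = []
# The consumer sees the full stream; `chunks` holds a copy for capture.
text = "".join(tee_stream(iter(["Hel", "lo", "!"]), chunks))
```

Because the wrapper is itself a generator, the consumer's iteration drives the underlying stream exactly as before.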
wrap() (Recommended)
Transparently wrap an OpenAI or Anthropic client instance. Explicit, per-client control.
from hippocortex import wrap
from openai import OpenAI
client = wrap(OpenAI())
# Use exactly as before. Memory is transparent.
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Deploy payments to staging"}]
)
Options:
wrap(
    client,
    api_key=None,     # Falls back to env var or .hippocortex.json
    base_url=None,    # Hippocortex API URL
    session_id=None,  # Explicit session ID (auto-generated if omitted)
)
Works with both SDKs:
from anthropic import Anthropic
client = wrap(Anthropic())
response = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Deploy payments to staging"}]
)
Fault tolerance: If Hippocortex is unreachable, all calls pass through to the original client unchanged. Your application never breaks because of memory.
Manual Client (Advanced)
Full control over capture, learn, and synthesize. Best for custom agent loops or non-OpenAI/Anthropic LLMs.
from hippocortex import Hippocortex
# Async client (recommended)
hx = Hippocortex(api_key="hx_live_your_key_here")
# With options
hx = Hippocortex(
    api_key="hx_live_your_key_here",
    base_url="https://api.hippocortex.dev/v1",  # optional
    timeout=30.0,                               # optional, seconds
)
Context Manager
async with Hippocortex(api_key="hx_live_...") as hx:
    result = await hx.capture(event)
# Client closes automatically
Cleanup
hx = Hippocortex(api_key="hx_live_...")
try:
    await hx.capture(event)
finally:
    await hx.close()
Zero-Config
Both auto and wrap() resolve configuration automatically using the following priority:
- Explicit options passed to wrap() or Hippocortex()
- Environment variables: HIPPOCORTEX_API_KEY, HIPPOCORTEX_BASE_URL
- A .hippocortex.json file (searched from cwd upward to the filesystem root)
.hippocortex.json
{
  "apiKey": "hx_live_your_key_here",
  "baseUrl": "https://api.hippocortex.dev/v1"
}
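The resolution order above roughly corresponds to logic like this (an illustrative sketch, not the SDK's actual code):

```python
import json
import os
from pathlib import Path

def resolve_api_key(explicit=None):
    # 1. Explicit option wins
    if explicit is not None:
        return explicit
    # 2. Environment variable
    env = os.environ.get("HIPPOCORTEX_API_KEY")
    if env:
        return env
    # 3. .hippocortex.json, searched from cwd upward to the filesystem root
    for directory in (Path.cwd(), *Path.cwd().parents):
        config = directory / ".hippocortex.json"
        if config.is_file():
            return json.loads(config.read_text()).get("apiKey")
    return None
```

Searching upward from the working directory lets a single config file at a repo root serve every subdirectory, the same convention tools like git use.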
Core Methods
capture(event)
Capture a single agent event.
Signature:
async def capture(self, event: CaptureEvent) -> CaptureResult
Parameters:
from hippocortex.types import CaptureEvent
event = CaptureEvent(
    type="tool_call",              # Required: event type
    session_id="sess-42",          # Required: session identifier
    payload={                      # Required: event data
        "tool": "deploy",
        "args": {"service": "api"}
    },
    metadata={                     # Optional: additional context
        "agent_id": "agent-1"
    },
    idempotency_key="idem-123"     # Optional: deduplication key
)
Returns:
@dataclass
class CaptureResult:
    event_id: str
    status: str                      # "ingested" or "duplicate"
    salience_score: Optional[float]  # 0.0 to 1.0
    trace_id: Optional[str]          # Processing trace ID
    reason: Optional[str]            # Duplicate reason
Example:
from hippocortex import Hippocortex, CaptureEvent
hx = Hippocortex(api_key="hx_live_...")
result = await hx.capture(CaptureEvent(
    type="tool_call",
    session_id="sess-42",
    payload={"tool": "deploy", "args": {"env": "staging"}}
))
print(f"Event {result.event_id}: {result.status}")
capture_batch(events)
Capture multiple events in a single request.
Signature:
async def capture_batch(self, events: List[CaptureEvent]) -> BatchCaptureResult
Returns:
@dataclass
class BatchCaptureResult:
    results: List[CaptureResult]
    summary: BatchSummary

@dataclass
class BatchSummary:
    total: int
    ingested: int
    duplicates: int
    errors: int
Example:
result = await hx.capture_batch([
    CaptureEvent(type="message", session_id="s1", payload={"role": "user", "content": "Hello"}),
    CaptureEvent(type="tool_call", session_id="s1", payload={"tool": "search", "args": {}}),
    CaptureEvent(type="tool_result", session_id="s1", payload={"tool": "search", "result": "found"}),
])
print(f"Ingested: {result.summary.ingested}/{result.summary.total}")
learn(options?)
Trigger the Memory Compiler.
Signature:
async def learn(self, options: Optional[LearnOptions] = None) -> LearnResult
Parameters:
@dataclass
class LearnOptions:
    scope: str = "incremental"                    # "full" or "incremental"
    min_pattern_strength: Optional[float] = None  # 0.0 to 1.0
    artifact_types: Optional[List[str]] = None    # ["task_schema", "failure_playbook", ...]
Returns:
@dataclass
class LearnResult:
    run_id: str
    status: str  # "completed", "partial", "failed"
    artifacts: ArtifactStats
    stats: CompilationStats

@dataclass
class ArtifactStats:
    created: int
    updated: int
    unchanged: int
    by_type: Dict[str, int]

@dataclass
class CompilationStats:
    memories_processed: int
    patterns_found: int
    compilation_ms: int
Example:
result = await hx.learn()
print(f"Created {result.artifacts.created} artifacts")
print(f"Processed {result.stats.memories_processed} memories")
# Full recompilation
result = await hx.learn(LearnOptions(
    scope="full",
    min_pattern_strength=0.7,
    artifact_types=["task_schema", "failure_playbook"]
))
synthesize(query, options?)
Synthesize compressed context for a query.
Signature:
async def synthesize(
    self, query: str, options: Optional[SynthesizeOptions] = None
) -> SynthesizeResult
Parameters:
@dataclass
class SynthesizeOptions:
    max_tokens: int = 4000
    sections: Optional[List[str]] = None  # ["procedures", "failures", ...]
    min_confidence: float = 0.3
    include_provenance: bool = True
Returns:
@dataclass
class SynthesizeResult:
    pack_id: str
    entries: List[SynthesisEntry]
    budget: BudgetInfo

@dataclass
class SynthesisEntry:
    section: str
    content: str
    confidence: float
    provenance: Optional[List[ProvenanceRef]]

@dataclass
class BudgetInfo:
    limit: int
    used: int
    compression_ratio: float
    entries_included: int
    entries_dropped: int
Example:
context = await hx.synthesize(
    "deploy payment service to production",
    SynthesizeOptions(
        max_tokens=8000,
        sections=["procedures", "failures"],
        min_confidence=0.5
    )
)
for entry in context.entries:
    print(f"[{entry.section}] ({entry.confidence:.2f}): {entry.content}")
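A common next step is folding the synthesized entries into a system prompt for your LLM call. A minimal sketch, with entries represented as plain dicts matching the SynthesisEntry shape (the formatting convention here is illustrative, not prescribed by the API):

```python
def build_system_prompt(entries):
    """Format synthesis entries into a single system-prompt string."""
    lines = ["Relevant memory:"]
    for entry in entries:
        lines.append(f"[{entry['section']}] ({entry['confidence']:.2f}) {entry['content']}")
    return "\n".join(lines)

prompt = build_system_prompt([
    {"section": "procedures", "confidence": 0.9, "content": "Deploy via CI pipeline."},
    {"section": "failures", "confidence": 0.6, "content": "Staging DB migrations once timed out."},
])
```

The resulting string can be prepended as a system message, which is essentially what wrap() and auto-instrumentation do for you.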
list_artifacts(...)
List compiled knowledge artifacts.
Signature:
async def list_artifacts(
    self,
    type: Optional[ArtifactType] = None,
    status: Optional[ArtifactStatus] = None,
    sort: Optional[str] = None,
    order: Optional[str] = None,
    limit: Optional[int] = None,
    cursor: Optional[str] = None,
) -> ArtifactListResult
Example:
result = await hx.list_artifacts(
    type="failure_playbook",
    status="active",
    sort="confidence",
    order="desc",
    limit=10
)
for a in result.artifacts:
    print(f"{a.title} ({a.confidence:.2f})")
if result.pagination.has_more:
    next_page = await hx.list_artifacts(cursor=result.pagination.cursor)
get_artifact(artifact_id)
Get a single artifact by ID.
Signature:
async def get_artifact(self, artifact_id: str) -> Artifact
Example:
artifact = await hx.get_artifact("art-001")
print(artifact.title)
print(artifact.content)
get_metrics(...)
Get usage and performance metrics.
Signature:
async def get_metrics(
    self,
    period: Optional[str] = None,
    granularity: Optional[str] = None,
) -> MetricsResult
Example:
metrics = await hx.get_metrics(period="24h")
print(f"Events today: {metrics.usage.events.total}")
print(f"Quota: {metrics.quota.events_used}/{metrics.quota.events_limit}")
Error Handling
import asyncio

from hippocortex import Hippocortex, HippocortexError

try:
    await hx.capture(event)
except HippocortexError as e:
    if e.code == "rate_limited":
        await asyncio.sleep(1)
        # Retry
    elif e.code == "validation_error":
        print(f"Fix request: {e.details}")
    elif e.code == "unauthorized":
        print("Check API key")
    else:
        raise
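For the rate-limited case, a retry helper with exponential backoff is a common pattern. A sketch using a stand-in error class so it runs standalone; real code would catch HippocortexError and check e.code as shown above:

```python
import asyncio

class RateLimitedError(Exception):
    """Stand-in for a HippocortexError with code == "rate_limited"."""

async def with_retry(call, max_attempts=4, base_delay=0.01):
    """Retry an async call on rate limiting, doubling the delay each time."""
    for attempt in range(max_attempts):
        try:
            return await call()
        except RateLimitedError:
            if attempt == max_attempts - 1:
                raise
            # Exponential backoff: base, 2x base, 4x base, ...
            await asyncio.sleep(base_delay * 2 ** attempt)

# Demo: a call that is rate-limited twice before succeeding
attempts = {"count": 0}

async def flaky_capture():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RateLimitedError()
    return "ingested"

status = asyncio.run(with_retry(flaky_capture))
```

In production you would also cap the total delay and add jitter so concurrent clients don't retry in lockstep.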
Type Hints
All types are importable from hippocortex.types:
from hippocortex.types import (
    Artifact,
    ArtifactListResult,
    ArtifactStatus,
    ArtifactType,
    BatchCaptureResult,
    CaptureEvent,
    CaptureResult,
    LearnOptions,
    LearnResult,
    MetricsResult,
    SynthesizeOptions,
    SynthesizeResult,
)
Adapters
See the Adapters Guide for framework-specific integration examples.