Python SDK Reference

Complete API reference for the hippocortex Python package.


Installation

pip install hippocortex

Requires Python 3.10+. Uses httpx for async HTTP.

Optional Dependencies

For framework adapters, install the optional dependencies:

# OpenAI Agents adapter
pip install hippocortex[openai]

# LangGraph adapter
pip install hippocortex[langgraph]

# CrewAI adapter
pip install hippocortex[crewai]

# AutoGen adapter
pip install hippocortex[autogen]

# OpenClaw adapter
pip install hippocortex[openclaw]

# All adapters
pip install hippocortex[all]

Integration Methods

The SDK provides three ways to add memory, from simplest to most granular:

Auto-Instrumentation (Easiest, 1 Line)

Import once at the top of your app. Every OpenAI and Anthropic SDK call automatically gets memory context injection and conversation capture.

import hippocortex.auto

# That's it. All OpenAI/Anthropic calls now have memory.
from openai import OpenAI
client = OpenAI()

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Deploy payments to staging"}]
)
# Memory context was injected, conversation was captured

How it works: On import, the module patches Completions.create (OpenAI) and Messages.create (Anthropic). Each call synthesizes relevant context, prepends it as a system message, calls the original method, then captures the conversation.
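The patch follows the usual wrap-and-delegate pattern. A minimal sketch with a stand-in client class (the names and internals here are illustrative, not the SDK's source):

```python
import functools

class Completions:
    """Stand-in for an SDK's completions resource."""
    def create(self, **kwargs):
        return {"content": "hello", "sent": kwargs["messages"]}

def install_memory_patch(cls):
    original = cls.create

    @functools.wraps(original)
    def patched(self, **kwargs):
        # 1. Synthesize relevant context and prepend it as a system message.
        context = {"role": "system", "content": "<memory context>"}
        kwargs["messages"] = [context] + list(kwargs.get("messages", []))
        # 2. Delegate to the original, unpatched method.
        response = original(self, **kwargs)
        # 3. A real implementation would capture the exchange here.
        return response

    cls.create = patched

install_memory_patch(Completions)
resp = Completions().create(messages=[{"role": "user", "content": "hi"}])
print(resp["sent"][0]["role"])  # "system" — context was injected first
```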

Configuration: Resolves credentials from environment variables (HIPPOCORTEX_API_KEY, HIPPOCORTEX_BASE_URL) or a .hippocortex.json file. Set HIPPOCORTEX_SILENT=1 to suppress console output.

Streaming support: Auto-instrumentation wraps streaming responses to collect chunks transparently. The stream iterator passes through unchanged to your code.
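Passing a stream through unchanged while collecting chunks amounts to a tee generator; roughly (an illustrative sketch, not the SDK's source):

```python
def tee_stream(stream, collected):
    """Yield each chunk unchanged while appending it to `collected`."""
    for chunk in stream:
        collected.append(chunk)
        yield chunk

# Simulated stream of text chunks, standing in for an SDK stream object.
chunks = iter(["Deploy", " to", " staging"])
captured = []
for piece in tee_stream(chunks, captured):
    pass  # your code consumes the stream exactly as before

full_text = "".join(captured)
print(full_text)  # "Deploy to staging"
```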

wrap() (Recommended)

Transparently wrap an OpenAI or Anthropic client instance. Explicit, per-client control.

from hippocortex import wrap
from openai import OpenAI

client = wrap(OpenAI())

# Use exactly as before. Memory is transparent.
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Deploy payments to staging"}]
)

Options:

wrap(
    client,
    api_key=None,      # Falls back to env var or .hippocortex.json
    base_url=None,     # Hippocortex API URL
    session_id=None,   # Explicit session ID (auto-generated if omitted)
)

Works with both SDKs:

from anthropic import Anthropic
client = wrap(Anthropic())

response = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Deploy payments to staging"}]
)

Fault tolerance: If Hippocortex is unreachable, all calls pass through to the original client unchanged. Your application never breaks because of memory.
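The pass-through behavior boils down to guarding the memory steps with broad exception handlers; a minimal sketch of the pattern (function names here are illustrative):

```python
def call_with_memory(original_call, enrich, capture, **kwargs):
    """Attempt memory enrichment; on any failure, fall back to the raw call."""
    try:
        kwargs = enrich(kwargs)          # may raise if the memory service is down
    except Exception:
        return original_call(**kwargs)   # degrade gracefully: no context injected
    response = original_call(**kwargs)
    try:
        capture(kwargs, response)        # capture failures are also swallowed
    except Exception:
        pass
    return response

def flaky_enrich(kwargs):
    raise ConnectionError("memory service unreachable")

result = call_with_memory(
    lambda **kw: {"echo": kw["messages"]},   # stand-in for the wrapped client call
    flaky_enrich,
    lambda *args: None,
    messages=[{"role": "user", "content": "hi"}],
)
print(result["echo"][0]["content"])  # "hi" — the call still succeeded
```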

Manual Client (Advanced)

Full control over capture, learn, and synthesize. Best for custom agent loops or non-OpenAI/Anthropic LLMs.

from hippocortex import Hippocortex

# Async client (recommended)
hx = Hippocortex(api_key="hx_live_your_key_here")

# With options
hx = Hippocortex(
    api_key="hx_live_your_key_here",
    base_url="https://api.hippocortex.dev/v1",  # optional
    timeout=30.0,                                  # optional, seconds
)

Context Manager

async with Hippocortex(api_key="hx_live_...") as hx:
    result = await hx.capture(event)
    # Client closes automatically

Cleanup

hx = Hippocortex(api_key="hx_live_...")
try:
    await hx.capture(event)
finally:
    await hx.close()
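A typical custom agent loop ties the core methods together: synthesize context before the model call, then capture both sides of the exchange. Sketched below with a stub in place of the real client and LLM, so only the loop shape is shown:

```python
import asyncio

class StubMemory:
    """Stand-in for a Hippocortex client; records what would be captured."""
    def __init__(self):
        self.captured = []
    async def synthesize(self, query):
        return f"<context for: {query}>"
    async def capture(self, event):
        self.captured.append(event)

async def agent_step(hx, llm, session_id, user_input):
    """One turn: synthesize context, call the model, capture the exchange."""
    context = await hx.synthesize(user_input)   # compressed memory for this input
    reply = await llm(context, user_input)      # your own LLM call
    await hx.capture({"type": "message", "session_id": session_id,
                      "payload": {"role": "user", "content": user_input}})
    await hx.capture({"type": "message", "session_id": session_id,
                      "payload": {"role": "assistant", "content": reply}})
    return reply

async def echo_llm(context, message):
    return f"ack: {message}"

async def main():
    hx = StubMemory()
    reply = await agent_step(hx, echo_llm, "sess-1", "deploy to staging")
    return reply, hx.captured

reply, captured = asyncio.run(main())
print(reply, len(captured))
```

Swap `StubMemory` for a real `Hippocortex` instance and `echo_llm` for your model call, and add a periodic `await hx.learn()` once enough events have accumulated.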

Zero-Config

Both auto and wrap() resolve configuration automatically using the following priority:

  1. Explicit options passed to wrap() or Hippocortex()
  2. Environment variables: HIPPOCORTEX_API_KEY, HIPPOCORTEX_BASE_URL
  3. .hippocortex.json file (searched from cwd upward to filesystem root)
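The resolution order can be sketched as a small lookup chain (illustrative only; the SDK's internal helpers may differ):

```python
import json
import os
from pathlib import Path

def resolve_api_key(explicit=None, start=None):
    """Resolve an API key: explicit argument, then env var, then config file."""
    if explicit:
        return explicit
    env = os.environ.get("HIPPOCORTEX_API_KEY")
    if env:
        return env
    here = Path(start or Path.cwd())
    for directory in [here, *here.parents]:  # walk upward to the filesystem root
        config = directory / ".hippocortex.json"
        if config.is_file():
            return json.loads(config.read_text()).get("apiKey")
    return None

print(resolve_api_key(explicit="hx_live_from_arg"))  # explicit option wins
```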

.hippocortex.json

{
  "apiKey": "hx_live_your_key_here",
  "baseUrl": "https://api.hippocortex.dev/v1"
}

Core Methods

capture(event)

Capture a single agent event.

Signature:

async def capture(self, event: CaptureEvent) -> CaptureResult

Parameters:

from hippocortex.types import CaptureEvent

event = CaptureEvent(
    type="tool_call",          # Required: event type
    session_id="sess-42",      # Required: session identifier
    payload={                  # Required: event data
        "tool": "deploy",
        "args": {"service": "api"}
    },
    metadata={                 # Optional: additional context
        "agent_id": "agent-1"
    },
    idempotency_key="idem-123" # Optional: deduplication key
)

Returns:

@dataclass
class CaptureResult:
    event_id: str
    status: str                             # "ingested" or "duplicate"
    salience_score: Optional[float] = None  # 0.0 to 1.0
    trace_id: Optional[str] = None          # Processing trace ID
    reason: Optional[str] = None            # Set when status is "duplicate"

Example:

from hippocortex import Hippocortex, CaptureEvent

hx = Hippocortex(api_key="hx_live_...")

result = await hx.capture(CaptureEvent(
    type="tool_call",
    session_id="sess-42",
    payload={"tool": "deploy", "args": {"env": "staging"}}
))
print(f"Event {result.event_id}: {result.status}")

capture_batch(events)

Capture multiple events in a single request.

Signature:

async def capture_batch(self, events: List[CaptureEvent]) -> BatchCaptureResult

Returns:

@dataclass
class BatchCaptureResult:
    results: List[CaptureResult]
    summary: BatchSummary

@dataclass
class BatchSummary:
    total: int
    ingested: int
    duplicates: int
    errors: int

Example:

result = await hx.capture_batch([
    CaptureEvent(type="message", session_id="s1", payload={"role": "user", "content": "Hello"}),
    CaptureEvent(type="tool_call", session_id="s1", payload={"tool": "search", "args": {}}),
    CaptureEvent(type="tool_result", session_id="s1", payload={"tool": "search", "result": "found"}),
])
print(f"Ingested: {result.summary.ingested}/{result.summary.total}")
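For large event streams, it is common to chunk the list before sending. The per-request limit is plan-dependent; 100 below is an illustrative value, not a documented maximum:

```python
def chunked(items, size=100):
    """Split a list into consecutive chunks of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]

events = list(range(250))  # stand-ins for CaptureEvent objects
batches = chunked(events)
print([len(b) for b in batches])  # [100, 100, 50]

# Then: for batch in batches: await hx.capture_batch(batch)
```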

learn(options=None)

Trigger the Memory Compiler.

Signature:

async def learn(self, options: Optional[LearnOptions] = None) -> LearnResult

Parameters:

@dataclass
class LearnOptions:
    scope: str = "incremental"                    # "full" or "incremental"
    min_pattern_strength: Optional[float] = None  # 0.0 to 1.0
    artifact_types: Optional[List[str]] = None    # ["task_schema", "failure_playbook", ...]

Returns:

@dataclass
class LearnResult:
    run_id: str
    status: str          # "completed", "partial", "failed"
    artifacts: ArtifactStats
    stats: CompilationStats

@dataclass
class ArtifactStats:
    created: int
    updated: int
    unchanged: int
    by_type: Dict[str, int]

@dataclass
class CompilationStats:
    memories_processed: int
    patterns_found: int
    compilation_ms: int

Example:

result = await hx.learn()
print(f"Created {result.artifacts.created} artifacts")
print(f"Processed {result.stats.memories_processed} memories")

# Full recompilation
result = await hx.learn(LearnOptions(
    scope="full",
    min_pattern_strength=0.7,
    artifact_types=["task_schema", "failure_playbook"]
))

synthesize(query, options=None)

Synthesize compressed context for a query.

Signature:

async def synthesize(
    self, query: str, options: Optional[SynthesizeOptions] = None
) -> SynthesizeResult

Parameters:

@dataclass
class SynthesizeOptions:
    max_tokens: int = 4000
    sections: Optional[List[str]] = None  # ["procedures", "failures", ...]
    min_confidence: float = 0.3
    include_provenance: bool = True

Returns:

@dataclass
class SynthesizeResult:
    pack_id: str
    entries: List[SynthesisEntry]
    budget: BudgetInfo

@dataclass
class SynthesisEntry:
    section: str
    content: str
    confidence: float
    provenance: Optional[List[ProvenanceRef]] = None

@dataclass
class BudgetInfo:
    limit: int
    used: int
    compression_ratio: float
    entries_included: int
    entries_dropped: int

Example:

context = await hx.synthesize(
    "deploy payment service to production",
    SynthesizeOptions(
        max_tokens=8000,
        sections=["procedures", "failures"],
        min_confidence=0.5
    )
)

for entry in context.entries:
    print(f"[{entry.section}] ({entry.confidence:.2f}): {entry.content}")

list_artifacts(...)

List compiled knowledge artifacts.

Signature:

async def list_artifacts(
    self,
    type: Optional[ArtifactType] = None,
    status: Optional[ArtifactStatus] = None,
    sort: Optional[str] = None,
    order: Optional[str] = None,
    limit: Optional[int] = None,
    cursor: Optional[str] = None,
) -> ArtifactListResult

Example:

result = await hx.list_artifacts(
    type="failure_playbook",
    status="active",
    sort="confidence",
    order="desc",
    limit=10
)

for a in result.artifacts:
    print(f"{a.title} ({a.confidence:.2f})")

if result.pagination.has_more:
    next_page = await hx.list_artifacts(cursor=result.pagination.cursor)

get_artifact(artifact_id)

Get a single artifact by ID.

Signature:

async def get_artifact(self, artifact_id: str) -> Artifact

Example:

artifact = await hx.get_artifact("art-001")
print(artifact.title)
print(artifact.content)

get_metrics(...)

Get usage and performance metrics.

Signature:

async def get_metrics(
    self,
    period: Optional[str] = None,
    granularity: Optional[str] = None,
) -> MetricsResult

Example:

metrics = await hx.get_metrics(period="24h")
print(f"Events today: {metrics.usage.events.total}")
print(f"Quota: {metrics.quota.events_used}/{metrics.quota.events_limit}")

Error Handling

import asyncio

from hippocortex import Hippocortex, HippocortexError

try:
    await hx.capture(event)
except HippocortexError as e:
    if e.code == "rate_limited":
        await asyncio.sleep(1)
        # Retry
    elif e.code == "validation_error":
        print(f"Fix request: {e.details}")
    elif e.code == "unauthorized":
        print("Check API key")
    else:
        raise
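For rate limits, an exponential backoff wrapper is a common pattern. A sketch with a stand-in error class (the real `HippocortexError` comes from the SDK; `with_retry` is not part of it):

```python
import asyncio

class HippocortexError(Exception):
    """Stand-in with the same shape as the SDK's error (a `code` attribute)."""
    def __init__(self, code):
        super().__init__(code)
        self.code = code

async def with_retry(op, retries=3, base_delay=0.01):
    """Retry `op` on rate_limited errors, backing off exponentially."""
    for attempt in range(retries):
        try:
            return await op()
        except HippocortexError as e:
            if e.code != "rate_limited" or attempt == retries - 1:
                raise
            await asyncio.sleep(base_delay * 2 ** attempt)

calls = {"n": 0}

async def flaky_capture():
    calls["n"] += 1
    if calls["n"] < 3:
        raise HippocortexError("rate_limited")
    return "ingested"

status = asyncio.run(with_retry(flaky_capture))
print(status)  # "ingested" after two backoffs
```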

Type Hints

All types are importable from hippocortex.types:

from hippocortex.types import (
    Artifact,
    ArtifactListResult,
    ArtifactStatus,
    ArtifactType,
    BatchCaptureResult,
    CaptureEvent,
    CaptureResult,
    LearnOptions,
    LearnResult,
    MetricsResult,
    SynthesizeOptions,
    SynthesizeResult,
)

Adapters

See the Adapters Guide for framework-specific integration examples.