Skip to main content

Core Concepts

Understand the architecture, event system, and design patterns of the Estuary Python SDK.

Architecture

The EstuaryClient is the main entry point. It composes several internal modules:

┌───────────────────────────────────────────────────────────┐
│ EstuaryClient │
│ │
│ ┌─────────────────┐ ┌──────────────┐ ┌─────────────┐ │
│ │ SocketManager │ │ VoiceManager │ │ REST Clients│ │
│ │ (WebSocket) │ │ (Audio I/O) │ │ (HTTP API) │ │
│ └────────┬────────┘ └──────┬───────┘ └─────┬───────┘ │
│ │ │ │ │
└───────────┼──────────────────┼────────────────┼───────────┘
│ │ │
▼ ▼ ▼
Socket.IO v4 Microphone / REST /api/
WebSocket LiveKit WebRTC endpoints
│ │ │
└──────────┬───────┘ │
▼ │
┌────────────────┐ │
│ Estuary Server │◄───────────────┘
└────────────────┘
  • SocketManager handles the WebSocket connection, authentication, reconnection logic, and event routing.
  • VoiceManager streams audio to the server. Two implementations exist: WebSocketVoiceManager and LiveKitVoiceManager.
  • REST Clients provide access to HTTP APIs: MemoryClient, PlayersClient, CharactersClient, and GenerateClient.

Async-Only Design

The SDK is built entirely on asyncio. All I/O operations are async:

import asyncio
from estuary_sdk import EstuaryClient, EstuaryConfig

async def main():
async with EstuaryClient(EstuaryConfig(...)) as client:
session = await client.connect()
await client.start_voice()
memories = await client.memory.get_memories()

asyncio.run(main())

The EstuaryClient supports the async with context manager, which automatically calls disconnect() on exit. This ensures connections and resources are cleaned up even if an exception occurs.

Event-Driven Design

The SDK uses an AsyncEventEmitter pattern. Register async callback functions for events:

async def on_response(response):
# response is a BotResponse dataclass
print(response.text)

async def on_connected(session):
# session is a SessionInfo dataclass
print(session.session_id)

client.on("bot_response", on_response)
client.on("connected", on_connected)

You can subscribe with on(), unsubscribe with off(), and use once() for one-time listeners:

# Listen until removed
client.on("bot_response", handler)
client.off("bot_response", handler)

# Listen once
async def on_first_connect(session):
print("First connection:", session.session_id)

client.once("connected", on_first_connect)

Connection Lifecycle

The client tracks its connection through five states:

┌──────────────┐
│ Disconnected │
└──────┬───────┘
│ connect()

┌──────────────┐ ┌───────┐
│ Connecting │────────►│ Error │
└──────┬───────┘ └───────┘
│ authenticated ▲
▼ │
┌──────────────┐ │
│ Connected │──────────────┘
└──────┬───────┘ connection lost


┌──────────────┐
│ Reconnecting │ (if auto_reconnect is True)
└──────────────┘
from estuary_sdk import ConnectionState

async def on_state_changed(state):
if state == ConnectionState.CONNECTING:
print("Connecting...")
elif state == ConnectionState.CONNECTED:
print("Connected!")
elif state == ConnectionState.RECONNECTING:
print("Reconnecting...")
elif state == ConnectionState.ERROR:
print("Connection error")

client.on("connection_state_changed", on_state_changed)

When auto_reconnect is enabled (the default), the client automatically attempts to reconnect on disconnect, with increasing delay between attempts up to max_reconnect_attempts (default: 5).

Error Handling

All SDK errors are instances of EstuaryError, which extends Exception with a typed code property:

from estuary_sdk import EstuaryError, ErrorCode

async def on_error(err):
if isinstance(err, EstuaryError):
if err.code == ErrorCode.AUTH_FAILED:
print("Invalid API key")
elif err.code == ErrorCode.QUOTA_EXCEEDED:
print("Usage limit reached")
elif err.code == ErrorCode.VOICE_NOT_SUPPORTED:
print("Voice not available")
else:
print("Error:", err)

client.on("error", on_error)

Methods like connect(), start_voice(), and send_text() raise EstuaryError on failure. Always wrap connect() in a try/except:

try:
await client.connect()
except EstuaryError as err:
if err.code == ErrorCode.AUTH_FAILED:
print("Invalid credentials")

Configuration

The EstuaryConfig dataclass controls all client behavior. Only server_url, api_key, character_id, and player_id are required:

from estuary_sdk import EstuaryConfig

config = EstuaryConfig(
# Required
server_url="https://api.estuary-ai.com",
api_key="est_your_api_key",
character_id="your-character-uuid",
player_id="user-123",

# Optional
voice_transport="websocket", # "websocket" | "livekit" | "auto"
audio_sample_rate=16000, # Hz
auto_reconnect=True,
max_reconnect_attempts=5,
reconnect_delay=1.0, # seconds
debug=False,
realtime_memory=False, # Enable real-time memory extraction events
)

See the Configuration Reference for details on every option.

Next Steps