Core Concepts
Understand the architecture, event system, and design patterns of the Estuary Python SDK.
Architecture
The EstuaryClient is the main entry point. It composes several internal modules:
┌───────────────────────────────────────────────────────────┐
│                       EstuaryClient                       │
│                                                           │
│  ┌─────────────────┐  ┌──────────────┐  ┌─────────────┐   │
│  │  SocketManager  │  │ VoiceManager │  │ REST Clients│   │
│  │   (WebSocket)   │  │ (Audio I/O)  │  │ (HTTP API)  │   │
│  └────────┬────────┘  └──────┬───────┘  └─────┬───────┘   │
│           │                  │                │           │
└───────────┼──────────────────┼────────────────┼───────────┘
            │                  │                │
            ▼                  ▼                ▼
      Socket.IO v4       Microphone /      REST /api/
        WebSocket       LiveKit WebRTC      endpoints
            │                  │                │
            └──────────┬───────┘                │
                       ▼                        │
              ┌────────────────┐                │
              │ Estuary Server │◄───────────────┘
              └────────────────┘
- SocketManager handles the WebSocket connection, authentication, reconnection logic, and event routing.
- VoiceManager streams audio to the server. Two implementations exist: WebSocketVoiceManager and LiveKitVoiceManager.
- REST Clients provide access to HTTP APIs: MemoryClient, PlayersClient, CharactersClient, and GenerateClient.
Async-Only Design
The SDK is built entirely on asyncio. All I/O operations are async:
import asyncio
from estuary_sdk import EstuaryClient, EstuaryConfig

async def main():
    async with EstuaryClient(EstuaryConfig(...)) as client:
        session = await client.connect()
        await client.start_voice()
        memories = await client.memory.get_memories()

asyncio.run(main())
The EstuaryClient can be used as an async context manager (async with), which automatically calls disconnect() on exit. This ensures connections and resources are cleaned up even if an exception occurs.
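To illustrate why the context manager guarantees cleanup, here is a minimal sketch using a stand-in class. FakeClient and its counters are hypothetical and only mimic the behavior described above; the SDK's actual implementation may differ.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for EstuaryClient; not the SDK's code."""

    def __init__(self):
        self.connected = False
        self.disconnect_calls = 0

    async def connect(self):
        self.connected = True

    async def disconnect(self):
        self.connected = False
        self.disconnect_calls += 1

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs on normal exit and also when the body raises.
        await self.disconnect()
        return False  # do not swallow the exception


async def demo():
    client = FakeClient()
    try:
        async with client:
            await client.connect()
            raise RuntimeError("simulated failure mid-session")
    except RuntimeError:
        pass  # the error still propagates out of the async with block
    return client


client = asyncio.run(demo())
print(client.connected, client.disconnect_calls)  # False 1
```

Because __aexit__ runs whether or not the body raises, the stand-in ends up disconnected exactly once even though the session failed partway through.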
Event-Driven Design
The SDK uses an AsyncEventEmitter pattern. Register async callback functions for events:
async def on_response(response):
    # response is a BotResponse dataclass
    print(response.text)

async def on_connected(session):
    # session is a SessionInfo dataclass
    print(session.session_id)

client.on("bot_response", on_response)
client.on("connected", on_connected)
You can subscribe with on(), unsubscribe with off(), and use once() for one-time listeners:
# Listen until removed
client.on("bot_response", handler)
client.off("bot_response", handler)

# Listen once
async def on_first_connect(session):
    print("First connection:", session.session_id)

client.once("connected", on_first_connect)
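The on(), off(), and once() semantics can be sketched with a small emitter. MiniEmitter below is a hypothetical illustration of the general pattern, not the SDK's AsyncEventEmitter, whose internals (ordering, error handling, concurrency) are not described here and may differ.

```python
import asyncio
from collections import defaultdict


class MiniEmitter:
    """Illustrative async emitter; the SDK's AsyncEventEmitter may differ."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event, handler):
        self._handlers[event].append(handler)

    def off(self, event, handler):
        # Remove the handler itself or a once() wrapper around it.
        self._handlers[event] = [
            h for h in self._handlers[event]
            if h is not handler and getattr(h, "_wrapped", None) is not handler
        ]

    def once(self, event, handler):
        async def wrapper(*args):
            self.off(event, wrapper)  # unsubscribe before running
            await handler(*args)

        wrapper._wrapped = handler
        self.on(event, wrapper)

    async def emit(self, event, *args):
        # Iterate over a copy so handlers may unsubscribe while running.
        for handler in list(self._handlers[event]):
            await handler(*args)


async def demo():
    emitter = MiniEmitter()
    calls = []

    async def every_time(value):
        calls.append(("on", value))

    async def first_only(value):
        calls.append(("once", value))

    emitter.on("bot_response", every_time)
    emitter.once("bot_response", first_only)
    await emitter.emit("bot_response", "a")  # both handlers run
    await emitter.emit("bot_response", "b")  # only every_time runs
    emitter.off("bot_response", every_time)
    await emitter.emit("bot_response", "c")  # nothing runs
    return calls


calls = asyncio.run(demo())
print(calls)  # [('on', 'a'), ('once', 'a'), ('on', 'b')]
```

This sketch awaits handlers one after another in registration order. That is a design choice of the example; a real emitter might schedule them concurrently instead.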
Connection Lifecycle
The client tracks its connection through five states:
┌──────────────┐
│ Disconnected │
└──────┬───────┘
       │ connect()
       ▼
┌──────────────┐         ┌───────┐
│  Connecting  │────────►│ Error │
└──────┬───────┘         └───────┘
       │ authenticated        ▲
       ▼                      │
┌──────────────┐              │
│  Connected   │──────────────┘
└──────┬───────┘ connection lost
       │
       ▼
┌──────────────┐
│ Reconnecting │ (if auto_reconnect is True)
└──────────────┘
from estuary_sdk import ConnectionState

async def on_state_changed(state):
    if state == ConnectionState.CONNECTING:
        print("Connecting...")
    elif state == ConnectionState.CONNECTED:
        print("Connected!")
    elif state == ConnectionState.RECONNECTING:
        print("Reconnecting...")
    elif state == ConnectionState.ERROR:
        print("Connection error")

client.on("connection_state_changed", on_state_changed)
When auto_reconnect is enabled (the default), the client automatically tries to reconnect after a disconnect, waiting longer between successive attempts and stopping after max_reconnect_attempts attempts (default: 5).
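The text above only says the delay increases between attempts; it does not give the schedule. As a rough sketch, the example below assumes the delay doubles from reconnect_delay on each attempt and is capped at a hypothetical max_delay. The helper names backoff_delays and reconnect are illustrative and are not part of the SDK.

```python
import asyncio


def backoff_delays(reconnect_delay=1.0, max_reconnect_attempts=5, max_delay=30.0):
    """Delay before each attempt: doubles from reconnect_delay, capped at max_delay.

    Doubling and max_delay are assumptions made for this sketch.
    """
    return [
        min(reconnect_delay * 2 ** i, max_delay)
        for i in range(max_reconnect_attempts)
    ]


async def reconnect(try_connect, delays, sleep=asyncio.sleep):
    """Wait, then call try_connect(); return the 1-based attempt that succeeded, or None."""
    for attempt, delay in enumerate(delays, start=1):
        await sleep(delay)
        if await try_connect():
            return attempt
    return None  # every attempt failed


async def demo():
    outcomes = iter([False, False, True])  # simulated: third attempt succeeds
    waited = []

    async def try_connect():
        return next(outcomes)

    async def fake_sleep(seconds):
        waited.append(seconds)  # record the delay instead of actually waiting

    attempt = await reconnect(try_connect, backoff_delays(), sleep=fake_sleep)
    return attempt, waited


attempt, waited = asyncio.run(demo())
print(backoff_delays())  # [1.0, 2.0, 4.0, 8.0, 16.0]
print(attempt, waited)   # 3 [1.0, 2.0, 4.0]
```

Passing the sleep function in as a parameter keeps the demo instant and makes the retry loop easy to test without real waiting.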
Error Handling
All SDK errors are instances of EstuaryError, which extends Exception with a typed code property:
from estuary_sdk import EstuaryError, ErrorCode

async def on_error(err):
    if isinstance(err, EstuaryError):
        if err.code == ErrorCode.AUTH_FAILED:
            print("Invalid API key")
        elif err.code == ErrorCode.QUOTA_EXCEEDED:
            print("Usage limit reached")
        elif err.code == ErrorCode.VOICE_NOT_SUPPORTED:
            print("Voice not available")
    else:
        print("Error:", err)

client.on("error", on_error)
Methods like connect(), start_voice(), and send_text() raise EstuaryError on failure. Always wrap connect() in a try/except:
try:
    await client.connect()
except EstuaryError as err:
    if err.code == ErrorCode.AUTH_FAILED:
        print("Invalid credentials")
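One way to keep error handling consistent between the error event and try/except is to route both through a single lookup keyed by error code. The sketch below uses stand-in types, DemoError and DemoErrorCode, so that it runs on its own. The enum values, the constructor signature, and the describe helper are assumptions for illustration and do not describe the SDK's actual definitions.

```python
from enum import Enum


class DemoErrorCode(Enum):
    # Only the three codes named in this section; values are made up.
    AUTH_FAILED = "auth_failed"
    QUOTA_EXCEEDED = "quota_exceeded"
    VOICE_NOT_SUPPORTED = "voice_not_supported"


class DemoError(Exception):
    """Stand-in for an exception that carries a typed code."""

    def __init__(self, message, code):
        super().__init__(message)
        self.code = code


USER_MESSAGES = {
    DemoErrorCode.AUTH_FAILED: "Invalid API key",
    DemoErrorCode.QUOTA_EXCEEDED: "Usage limit reached",
    DemoErrorCode.VOICE_NOT_SUPPORTED: "Voice not available",
}


def describe(err):
    """Map an exception to a user-facing message."""
    if isinstance(err, DemoError):
        # Fall back to the raw message for codes without a mapping.
        return USER_MESSAGES.get(err.code, f"Error: {err}")
    return f"Unexpected error: {err}"


try:
    raise DemoError("server rejected the key", DemoErrorCode.AUTH_FAILED)
except DemoError as err:
    print(describe(err))  # Invalid API key

print(describe(ValueError("boom")))  # Unexpected error: boom
```

With the real SDK, the same describe-style helper could be called from both the error event callback and the except block, assuming the error object exposes code as described above.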
Configuration
The EstuaryConfig dataclass controls all client behavior. Only server_url, api_key, character_id, and player_id are required:
from estuary_sdk import EstuaryConfig

config = EstuaryConfig(
    # Required
    server_url="https://api.estuary-ai.com",
    api_key="est_your_api_key",
    character_id="your-character-uuid",
    player_id="user-123",
    # Optional
    voice_transport="websocket",  # "websocket" | "livekit" | "auto"
    audio_sample_rate=16000,      # Hz
    auto_reconnect=True,
    max_reconnect_attempts=5,
    reconnect_delay=1.0,          # seconds
    debug=False,
    realtime_memory=False,        # Enable real-time memory extraction events
)
See the Configuration Reference for details on every option.
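To show how required and optional fields behave in a dataclass-based config, here is a small stand-in. DemoConfig is hypothetical: it copies a subset of the field names from the example above, treats the example's optional values as defaults (only auto_reconnect and max_reconnect_attempts are confirmed as defaults in this guide), and adds a validation step that the real EstuaryConfig may or may not perform.

```python
from dataclasses import dataclass

VALID_TRANSPORTS = ("websocket", "livekit", "auto")


@dataclass
class DemoConfig:
    # Required: no default, so omitting any of these raises TypeError.
    server_url: str
    api_key: str
    character_id: str
    player_id: str
    # Optional: values mirror the example above (assumed defaults).
    voice_transport: str = "websocket"
    auto_reconnect: bool = True
    max_reconnect_attempts: int = 5
    reconnect_delay: float = 1.0  # seconds

    def __post_init__(self):
        # Validation added for this sketch only.
        if self.voice_transport not in VALID_TRANSPORTS:
            raise ValueError(f"unknown voice_transport: {self.voice_transport!r}")


cfg = DemoConfig(
    server_url="https://api.estuary-ai.com",
    api_key="est_your_api_key",
    character_id="your-character-uuid",
    player_id="user-123",
)
print(cfg.voice_transport, cfg.max_reconnect_attempts)  # websocket 5

try:
    DemoConfig(server_url="https://api.estuary-ai.com")  # missing required fields
except TypeError as exc:
    print("missing fields:", exc)
```

Failing at construction time means a missing or mistyped setting surfaces before any connection is attempted.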
Next Steps
- Text Chat -- Send messages and handle streaming responses
- Voice (WebSocket) -- Add voice input and output
- API Reference -- Full reference for all classes, events, and types