Socket-Based Connection Template
The socket-based connection template provides a real-time way to connect your agent to Snowglobe for testing. This template is designed for applications that maintain persistent connections and require conversational state, such as real-time chat applications or streaming LLM services.
When to Use
Use the socket-based template when:
- Your application uses WebSocket or persistent connections
- You need to maintain conversation state across multiple messages
- You’re working with real-time streaming APIs (like OpenAI’s Realtime API)
- Your agent requires session persistence between requests
- You want to test conversational flows that depend on connection state
Template Code
When you run snowglobe-connect init and select the socket template, Snowglobe generates this code:
from snowglobe.client import CompletionRequest, CompletionFunctionOutputs
import logging
import websockets
import json
from openai import AsyncOpenAI

LOGGER = logging.getLogger(__name__)

socket_cache = {}
openai_client = AsyncOpenAI()


async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    """
    When dealing with a realtime socket, we need to create a socket for each conversation.
    We store the socket in a cache and reuse it for the same conversation_id so that we
    can maintain the conversation context. Swap out the websocket client for your
    preferred realtime client.

    Args:
        request (CompletionRequest): The request object containing messages for the test.

    Returns:
        CompletionFunctionOutputs: The response object with the generated content.
    """
    conversation_id = request.get_conversation_id()
    if conversation_id not in socket_cache:
        socket = await websockets.connect(
            "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01&modalities=text",
            additional_headers={
                "Authorization": f"Bearer {openai_client.api_key}",
                "OpenAI-Beta": "realtime=v1"
            }
        )
        socket_cache[conversation_id] = socket
    else:
        socket = socket_cache[conversation_id]

    # Send user message
    messages = request.to_openai_messages()
    user_message = messages[-1]["content"]
    await socket.send(json.dumps({
        "type": "conversation.item.create",
        "session": {
            "modalities": ["text"],  # Only text, no audio
        },
        "item": {
            "type": "message",
            "role": "user",
            "content": [{"type": "input_text", "text": user_message}]
        }
    }))
    await socket.send(json.dumps({"type": "response.create"}))

    # Get response
    response_content = ""
    async for message in socket:
        data = json.loads(message)
        if data.get("type") == "response.audio_transcript.delta":
            response_content += data.get("delta", "")
        elif data.get("type") == "response.done":
            break

    return CompletionFunctionOutputs(response=response_content)
Code Walkthrough
1. Imports and Setup
from snowglobe.client import CompletionRequest, CompletionFunctionOutputs
import logging
import websockets
import json
from openai import AsyncOpenAI
LOGGER = logging.getLogger(__name__)
socket_cache = {}
openai_client = AsyncOpenAI()
- websockets: Python library for WebSocket connections
- json: For encoding/decoding WebSocket messages
- socket_cache: Dictionary to store persistent connections per conversation
- LOGGER: For debugging WebSocket interactions
2. Conversation-Based Socket Management
conversation_id = request.get_conversation_id()
if conversation_id not in socket_cache:
    # Create new socket for this conversation
    socket = await websockets.connect(...)
    socket_cache[conversation_id] = socket
else:
    # Reuse existing socket
    socket = socket_cache[conversation_id]
Key Concept: Each conversation gets its own persistent WebSocket connection. This allows:
- Maintaining conversation context across multiple messages
- Session state preservation
- More efficient resource usage for ongoing conversations
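The caching behavior can be exercised in isolation. The sketch below is hypothetical: a stubbed `fake_connect` stands in for `websockets.connect` so the reuse logic itself is observable without a server:

```python
import asyncio

socket_cache = {}

async def fake_connect(url):
    # Stand-in for websockets.connect: returns a distinct object per call
    return object()

async def get_socket(conversation_id):
    # Reuse the cached connection for a known conversation,
    # otherwise open (and cache) a new one
    if conversation_id not in socket_cache:
        socket_cache[conversation_id] = await fake_connect("ws://example")
    return socket_cache[conversation_id]

async def main():
    a1 = await get_socket("conv-a")
    a2 = await get_socket("conv-a")  # same conversation -> same socket
    b = await get_socket("conv-b")   # new conversation -> new socket
    return a1 is a2, a1 is b

same, different = asyncio.run(main())
print(same, different)  # True False
```

Swapping `fake_connect` for a real client gives back the template's behavior unchanged.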
3. WebSocket Connection Setup
socket = await websockets.connect(
    "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01&modalities=text",
    additional_headers={
        "Authorization": f"Bearer {openai_client.api_key}",
        "OpenAI-Beta": "realtime=v1"
    }
)
This connects to OpenAI’s Realtime API, but you can replace this with:
- Your own WebSocket server
- Other real-time LLM services
- Custom streaming endpoints
4. Message Sending
await socket.send(json.dumps({
    "type": "conversation.item.create",
    "session": {
        "modalities": ["text"],
    },
    "item": {
        "type": "message",
        "role": "user",
        "content": [{"type": "input_text", "text": user_message}]
    }
}))
await socket.send(json.dumps({"type": "response.create"}))
The template sends structured JSON messages over the WebSocket. The format depends on your specific real-time API.
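For illustration, the two payloads could be factored into a small helper. This is a sketch: the field names simply mirror the template above, and your own real-time API's schema will likely differ:

```python
import json

def build_realtime_payloads(user_message: str):
    """Build the item-create and response-create events as JSON strings."""
    item_event = json.dumps({
        "type": "conversation.item.create",
        "session": {"modalities": ["text"]},  # text only, as in the template
        "item": {
            "type": "message",
            "role": "user",
            "content": [{"type": "input_text", "text": user_message}],
        },
    })
    response_event = json.dumps({"type": "response.create"})
    return item_event, response_event

item_event, response_event = build_realtime_payloads("Hello!")
print(json.loads(item_event)["item"]["content"][0]["text"])  # Hello!
```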
5. Response Handling
response_content = ""
async for message in socket:
    data = json.loads(message)
    if data.get("type") == "response.audio_transcript.delta":
        response_content += data.get("delta", "")
    elif data.get("type") == "response.done":
        break
This listens for streaming response chunks and assembles the complete response.
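The accumulation logic can be verified without a live socket. The sketch below replays a list of already-received event strings through the same loop:

```python
import json

def assemble_response(raw_messages):
    """Accumulate delta events until a response.done event arrives."""
    response_content = ""
    for message in raw_messages:
        data = json.loads(message)
        if data.get("type") == "response.audio_transcript.delta":
            response_content += data.get("delta", "")
        elif data.get("type") == "response.done":
            break
    return response_content

events = [
    json.dumps({"type": "response.audio_transcript.delta", "delta": "Hello"}),
    json.dumps({"type": "response.audio_transcript.delta", "delta": ", world"}),
    json.dumps({"type": "response.done"}),
    json.dumps({"type": "response.audio_transcript.delta", "delta": "ignored"}),
]
print(assemble_response(events))  # Hello, world
```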
Customization Examples
Generic WebSocket Client
import websockets
import json

socket_cache = {}

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    conversation_id = request.get_conversation_id()

    # Connect to your custom WebSocket server
    if conversation_id not in socket_cache:
        socket = await websockets.connect("ws://your-server.com/chat")
        socket_cache[conversation_id] = socket
    else:
        socket = socket_cache[conversation_id]

    # Send message in your custom format
    message = {
        "conversation_id": conversation_id,
        "messages": [{"role": msg.role, "content": msg.content} for msg in request.messages],
        "timestamp": request.messages[-1].snowglobe_data.timestamp if request.messages else None
    }
    await socket.send(json.dumps(message))

    # Wait for response
    response_data = await socket.recv()
    response = json.loads(response_data)

    return CompletionFunctionOutputs(response=response.get("content", ""))
Socket Cleanup and Error Handling
import asyncio
import logging
import weakref

LOGGER = logging.getLogger(__name__)

# Use WeakValueDictionary for automatic cleanup: entries are dropped once
# no strong reference to the managed socket remains
socket_cache = weakref.WeakValueDictionary()

class ManagedSocket:
    def __init__(self, socket, conversation_id):
        self.socket = socket
        self.conversation_id = conversation_id
        self.last_used = asyncio.get_event_loop().time()

    async def send(self, data):
        self.last_used = asyncio.get_event_loop().time()
        await self.socket.send(data)

    async def recv(self):
        self.last_used = asyncio.get_event_loop().time()
        return await self.socket.recv()

    async def close(self):
        await self.socket.close()

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    conversation_id = request.get_conversation_id()
    try:
        # Get or create managed socket
        if conversation_id not in socket_cache:
            raw_socket = await websockets.connect("ws://your-server.com/chat")
            managed_socket = ManagedSocket(raw_socket, conversation_id)
            socket_cache[conversation_id] = managed_socket
        else:
            managed_socket = socket_cache[conversation_id]

        # Send message
        message = json.dumps({
            "conversation_id": conversation_id,
            "content": request.messages[-1].content
        })
        await managed_socket.send(message)

        # Receive response
        response_data = await managed_socket.recv()
        response = json.loads(response_data)
        return CompletionFunctionOutputs(response=response.get("content", ""))
    except websockets.ConnectionClosed:
        # Remove from cache and retry once
        if conversation_id in socket_cache:
            del socket_cache[conversation_id]
        if not hasattr(request, '_retry_attempted'):
            request._retry_attempted = True
            return await acompletion(request)
        else:
            return CompletionFunctionOutputs(response="Connection error - please try again")
    except Exception as e:
        LOGGER.error(f"Socket error for conversation {conversation_id}: {e}")
        return CompletionFunctionOutputs(response=f"Error: {str(e)}")
Server-Sent Events (SSE) Alternative
import aiohttp
import json

sse_clients = {}

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    conversation_id = request.get_conversation_id()

    # Create or reuse SSE connection
    if conversation_id not in sse_clients:
        session = aiohttp.ClientSession()
        sse_clients[conversation_id] = session
    else:
        session = sse_clients[conversation_id]

    # Send message via POST
    async with session.post('http://your-server.com/chat', json={
        'conversation_id': conversation_id,
        'message': request.messages[-1].content
    }) as resp:
        if resp.status != 200:
            return CompletionFunctionOutputs(response="Server error")

    # Listen to SSE stream
    response_content = ""
    async with session.get(f'http://your-server.com/stream/{conversation_id}') as resp:
        async for line in resp.content:
            if line.startswith(b'data: '):
                data = json.loads(line[6:].decode())
                if data.get('type') == 'content':
                    response_content += data.get('content', '')
                elif data.get('type') == 'done':
                    break

    return CompletionFunctionOutputs(response=response_content)
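The line-parsing half of this loop is easy to check offline. The sketch below feeds raw byte lines in the same `data: ...` framing through the same logic:

```python
import json

def parse_sse_lines(lines):
    """Accumulate 'content' events from raw SSE byte lines until 'done'."""
    response_content = ""
    for line in lines:
        if line.startswith(b'data: '):
            data = json.loads(line[6:].decode())
            if data.get('type') == 'content':
                response_content += data.get('content', '')
            elif data.get('type') == 'done':
                break
    return response_content

lines = [
    b'data: {"type": "content", "content": "Hi "}',
    b': keep-alive comment, ignored',
    b'data: {"type": "content", "content": "there"}',
    b'data: {"type": "done"}',
]
print(parse_sse_lines(lines))  # Hi there
```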
Socket Pool Management
import asyncio
from collections import defaultdict
import time

class SocketPool:
    def __init__(self, max_sockets_per_conversation=3, cleanup_interval=300):
        self.sockets = defaultdict(list)
        self.max_sockets = max_sockets_per_conversation
        self.cleanup_interval = cleanup_interval
        asyncio.create_task(self._cleanup_loop())

    async def get_socket(self, conversation_id: str):
        """Get an available socket or create a new one"""
        available_sockets = self.sockets[conversation_id]

        # Find an available socket
        for socket_info in available_sockets:
            if not socket_info['in_use']:
                socket_info['in_use'] = True
                socket_info['last_used'] = time.time()
                return socket_info['socket']

        # Create new socket if under limit
        if len(available_sockets) < self.max_sockets:
            socket = await websockets.connect("ws://your-server.com/chat")
            socket_info = {
                'socket': socket,
                'in_use': True,
                'created': time.time(),
                'last_used': time.time()
            }
            available_sockets.append(socket_info)
            return socket

        # Wait for an available socket
        while True:
            for socket_info in available_sockets:
                if not socket_info['in_use']:
                    socket_info['in_use'] = True
                    socket_info['last_used'] = time.time()
                    return socket_info['socket']
            await asyncio.sleep(0.1)

    async def release_socket(self, conversation_id: str, socket):
        """Mark socket as available"""
        for socket_info in self.sockets[conversation_id]:
            if socket_info['socket'] == socket:
                socket_info['in_use'] = False
                break

    async def _cleanup_loop(self):
        """Clean up old unused sockets"""
        while True:
            await asyncio.sleep(self.cleanup_interval)
            current_time = time.time()
            for conversation_id, sockets in list(self.sockets.items()):
                sockets_to_remove = []
                for i, socket_info in enumerate(sockets):
                    # Remove sockets unused for 30 minutes
                    if (not socket_info['in_use'] and
                            current_time - socket_info['last_used'] > 1800):
                        await socket_info['socket'].close()
                        sockets_to_remove.append(i)
                # Remove closed sockets
                for i in reversed(sockets_to_remove):
                    sockets.pop(i)
                # Clean up empty conversations
                if not sockets:
                    del self.sockets[conversation_id]

# Global socket pool
socket_pool = SocketPool()

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    conversation_id = request.get_conversation_id()
    socket = await socket_pool.get_socket(conversation_id)
    try:
        # Use the socket
        await socket.send(json.dumps({
            "conversation_id": conversation_id,
            "message": request.messages[-1].content
        }))
        response_data = await socket.recv()
        response = json.loads(response_data)
        return CompletionFunctionOutputs(response=response.get("content", ""))
    finally:
        # Always release the socket back to the pool
        await socket_pool.release_socket(conversation_id, socket)
Benefits of Socket-Based Connections
Advantages
- State Persistence: Maintain conversation context across messages
- Lower Latency: Persistent connections eliminate handshake overhead
- Real-time Communication: Support for streaming and bi-directional communication
- Resource Efficiency: Reuse connections for multiple messages
- Enhanced Features: Support for typing indicators, presence, etc.
Use Cases
- Conversational AI: Chatbots that need to remember context
- Real-time Collaboration: Multi-user applications
- Streaming Responses: Applications that stream LLM responses
- Gaming or Interactive Apps: Applications requiring low-latency communication
Testing Your Socket Implementation
1. Test the connection:
2. Monitor socket connections: Use logging to track connection lifecycle:

import logging
logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger(__name__)

# Add logging to your socket operations
LOGGER.info(f"Created new socket for conversation {conversation_id}")
LOGGER.debug(f"Sending message: {message}")
LOGGER.info(f"Socket cache size: {len(socket_cache)}")

3. Start the client:
Memory Management
- Implement socket cleanup to prevent memory leaks
- Use weak references for automatic garbage collection
- Set reasonable limits on concurrent connections
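The weak-reference approach has a catch worth seeing directly: a WeakValueDictionary entry disappears as soon as the last strong reference to the value is gone, so something must hold the socket while it is in use. A minimal demonstration with a placeholder object standing in for a connection:

```python
import gc
import weakref

class FakeSocket:
    """Placeholder standing in for a live connection object."""
    pass

socket_cache = weakref.WeakValueDictionary()

strong_ref = FakeSocket()
socket_cache["conv-1"] = strong_ref
in_cache_before = "conv-1" in socket_cache  # True: a strong reference exists

del strong_ref
gc.collect()  # make collection deterministic across interpreters
in_cache_after = "conv-1" in socket_cache   # False: the entry was evicted

print(in_cache_before, in_cache_after)  # True False
```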
Connection Limits
- Most services limit WebSocket connections per client
- Implement connection pooling for high-volume scenarios
- Monitor and log connection statistics
Error Recovery
- Handle network interruptions gracefully
- Implement exponential backoff for reconnection
- Provide fallback mechanisms for connection failures
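The exponential-backoff suggestion can be sketched as a small reconnect loop. This is a hypothetical helper: the connect and sleep callables are injectable so the backoff schedule itself can be tested without a network or real delays:

```python
import asyncio

async def connect_with_backoff(connect, max_attempts=5, base_delay=0.5,
                               sleep=asyncio.sleep):
    """Retry connect(), doubling the delay after each failure."""
    delays = []
    for attempt in range(max_attempts):
        try:
            return await connect(), delays
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise  # give up after the final attempt
            delay = base_delay * (2 ** attempt)
            delays.append(delay)
            await sleep(delay)

async def main():
    attempts = {"n": 0}

    async def flaky_connect():
        # Fails twice, then succeeds
        attempts["n"] += 1
        if attempts["n"] < 3:
            raise ConnectionError("refused")
        return "socket"

    async def no_sleep(_):
        pass  # skip real waiting in the demo

    return await connect_with_backoff(flaky_connect, sleep=no_sleep)

sock, delays = asyncio.run(main())
print(sock, delays)  # socket [0.5, 1.0]
```

In production, also cap the delay and add jitter so many clients don't reconnect in lockstep.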
Common Pitfalls
1. Socket Leaks
# ❌ Sockets never get cleaned up
socket_cache = {}
# ✅ Use cleanup mechanisms
import weakref
socket_cache = weakref.WeakValueDictionary()
2. Blocking Operations
# ❌ Don't use sync operations in async context
response = socket.recv() # This will block!
# ✅ Always use async operations
response = await socket.recv()
3. No Error Handling
# ❌ Network errors will crash your agent
await socket.send(message)
# ✅ Handle connection errors
try:
    await socket.send(message)
except websockets.ConnectionClosed:
    # Handle reconnection
    del socket_cache[conversation_id]
    return await acompletion(request)  # Retry
Next Steps