Asynchronous Connection Template
The asynchronous connection template provides a high-performance way to connect your agent to Snowglobe for testing. This template is ideal when your application needs to handle multiple concurrent requests efficiently or uses async APIs.
When to Use
Use the asynchronous template when:
- Your application needs to handle high volumes of concurrent requests
- You’re using async LLM clients (like AsyncOpenAI)
- Your agent performs multiple I/O operations that can be parallelized
- You want optimal performance and resource utilization
- Your existing codebase is already async-based
Template Code
When you run snowglobe-connect init and select the asynchronous template, Snowglobe generates this code:
```python
from snowglobe.client import CompletionRequest, CompletionFunctionOutputs
from openai import AsyncOpenAI
import os

client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    """
    Process a scenario request from Snowglobe.

    This function is called by the Snowglobe client to process test requests. It should
    return a CompletionFunctionOutputs object with the response content.

    Args:
        request (CompletionRequest): The request object containing messages for the test.

    Returns:
        CompletionFunctionOutputs: The response object with the generated content.
    """
    # Process the request using the messages. Example using OpenAI:
    messages = request.to_openai_messages(system_prompt="You are a helpful assistant.")
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages
    )
    return CompletionFunctionOutputs(response=response.choices[0].message.content)
```
Code Walkthrough
1. Imports and Setup
```python
from snowglobe.client import CompletionRequest, CompletionFunctionOutputs
from openai import AsyncOpenAI
import os

client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
```
- AsyncOpenAI: The asynchronous version of OpenAI’s client for non-blocking API calls
- Same Snowglobe imports: Uses the same request/response objects as the sync template
- Environment variable: Safely loads your OpenAI API key from the environment
2. Main Async Function
```python
async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
```
The acompletion function is the async entry point that Snowglobe calls. Key differences from the sync template:
- The function is named `acompletion` (not `completion`)
- It is defined with the `async` keyword
- It can use `await` for non-blocking operations
- Snowglobe automatically handles the async execution
3. Message Processing (Same as Sync)
```python
messages = request.to_openai_messages(system_prompt="You are a helpful assistant.")
```
Message processing works identically to the synchronous version. The CompletionRequest object provides the same methods and data.
4. Async API Call
```python
response = await client.chat.completions.create(
    model="gpt-4o-mini",
    messages=messages
)
```
The key difference: using await for the API call. This allows other requests to be processed concurrently while waiting for the LLM response.
```python
return CompletionFunctionOutputs(response=response.choices[0].message.content)
```
Response formatting is identical to the synchronous version.
The asynchronous template provides several advantages:
- Concurrent Processing: Handle multiple requests simultaneously
- Better Resource Utilization: CPU isn’t blocked during I/O operations
- Scalability: Handle higher request volumes with the same hardware
- Reduced Latency: Other requests don’t wait when one request is slow
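The concurrency benefit can be seen in a small stdlib-only sketch. The `fake_llm_call` coroutine below is a stand-in for an awaited API call (it just sleeps, no real Snowglobe or OpenAI call): two simulated calls awaited together via `asyncio.gather` finish in roughly the time of one.

```python
import asyncio
import time

async def fake_llm_call(name: str, delay: float) -> str:
    # Stand-in for an awaited LLM API call; sleeps instead of doing real I/O.
    await asyncio.sleep(delay)
    return f"response from {name}"

async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    # Both "calls" are in flight at once, so total wall time ~= max(delays),
    # not their sum.
    results = await asyncio.gather(
        fake_llm_call("a", 0.1),
        fake_llm_call("b", 0.1),
    )
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
print(results, round(elapsed, 2))
```

Run sequentially, the same two calls would take about 0.2 seconds; awaited together they take about 0.1.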
Advanced Customization Examples
Multiple Concurrent API Calls
```python
import asyncio

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    messages = request.to_openai_messages(system_prompt="You are a helpful assistant.")

    # Make multiple API calls concurrently
    tasks = []
    for i in range(3):  # Generate 3 responses
        task = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            temperature=0.7 + (i * 0.1)  # Different temperatures
        )
        tasks.append(task)

    # Wait for all responses
    responses = await asyncio.gather(*tasks)

    # Use the best response (or combine them)
    best_response = max(responses, key=lambda r: len(r.choices[0].message.content))
    return CompletionFunctionOutputs(response=best_response.choices[0].message.content)
```
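The fan-out-and-select pattern above can be exercised without any API by stubbing the calls. This sketch (the `candidate` stub and length-based scoring are illustrative stand-ins, not Snowglobe APIs) mirrors the `gather` plus `max(..., key=...)` selection:

```python
import asyncio

async def candidate(text: str) -> str:
    # Stub standing in for one chat.completions.create call.
    await asyncio.sleep(0)
    return text

async def best_of(candidates: list[str]) -> str:
    # Fire all "calls" concurrently, then keep the longest response,
    # mirroring the max(..., key=len) selection above.
    responses = await asyncio.gather(*(candidate(c) for c in candidates))
    return max(responses, key=len)

best = asyncio.run(best_of(["short", "a much longer answer", "mid-sized"]))
print(best)  # "a much longer answer"
```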
Async Database Operations
```python
import asyncpg
import asyncio

# Database connection pool (initialize once)
db_pool = None

async def init_db_pool():
    global db_pool
    if db_pool is None:
        db_pool = await asyncpg.create_pool("postgresql://...")

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    await init_db_pool()

    # Get conversation history from the database concurrently with the LLM call
    conversation_id = request.get_conversation_id()

    # Start both operations concurrently
    history_task = get_conversation_history(conversation_id)
    llm_task = generate_response(request)

    # Wait for both to complete
    history, response = await asyncio.gather(history_task, llm_task)

    # Store the interaction
    await store_interaction(conversation_id, request, response)

    return CompletionFunctionOutputs(response=response)

async def get_conversation_history(conversation_id: str):
    async with db_pool.acquire() as conn:
        return await conn.fetch(
            "SELECT * FROM conversations WHERE id = $1",
            conversation_id
        )

async def generate_response(request: CompletionRequest):
    messages = request.to_openai_messages(system_prompt="You are a helpful assistant.")
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages
    )
    return response.choices[0].message.content

async def store_interaction(conversation_id: str, request: CompletionRequest, response: str):
    async with db_pool.acquire() as conn:
        await conn.execute(
            "INSERT INTO interactions (conversation_id, request, response) VALUES ($1, $2, $3)",
            conversation_id, str(request.messages), response
        )
Async External API Integration
```python
import aiohttp
import asyncio

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    conversation_id = request.get_conversation_id()

    # Concurrently fetch user context and generate the response
    async with aiohttp.ClientSession() as session:
        context_task = fetch_user_context(session, conversation_id)
        response_task = generate_llm_response(request)

        context, llm_response = await asyncio.gather(context_task, response_task)

    # Enhance the response with context
    enhanced_response = f"Based on your history: {context}\n\n{llm_response}"
    return CompletionFunctionOutputs(response=enhanced_response)

async def fetch_user_context(session, conversation_id):
    async with session.get(f"https://api.example.com/users/{conversation_id}") as resp:
        if resp.status == 200:
            data = await resp.json()
            return data.get("context", "No additional context")
        return "Context unavailable"

async def generate_llm_response(request):
    messages = request.to_openai_messages(system_prompt="You are a helpful assistant.")
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages
    )
    return response.choices[0].message.content
```
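When the context comes from an external service, it is worth capping how long you wait for it so one slow dependency doesn't stall the whole completion. A stdlib-only sketch using `asyncio.wait_for` (the slow fetch is a stand-in for the aiohttp call above; the timeout value is illustrative):

```python
import asyncio

async def fetch_user_context_slow() -> str:
    # Stand-in for a slow external API call (e.g. the aiohttp fetch above).
    await asyncio.sleep(1.0)
    return "full context"

async def context_with_timeout(timeout: float) -> str:
    # Cap the wait for context; fall back rather than stalling the
    # whole completion on one slow dependency.
    try:
        return await asyncio.wait_for(fetch_user_context_slow(), timeout)
    except asyncio.TimeoutError:
        return "Context unavailable"

context = asyncio.run(context_with_timeout(0.05))
print(context)  # "Context unavailable"
```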
Error Handling with Retries
```python
import asyncio

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    max_retries = 3
    for attempt in range(max_retries):
        try:
            messages = request.to_openai_messages(system_prompt="You are a helpful assistant.")
            response = await client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages
            )
            return CompletionFunctionOutputs(response=response.choices[0].message.content)
        except Exception as e:
            if attempt == max_retries - 1:
                # Last attempt failed
                return CompletionFunctionOutputs(
                    response=f"Service temporarily unavailable: {str(e)}"
                )
            # Wait before retrying (exponential backoff)
            await asyncio.sleep(2 ** attempt)
```
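The retry loop can also be factored into a reusable helper, which makes it easy to test against a deliberately flaky stub. A hedged sketch (the `with_retries` helper and `flaky` stub are illustrative, not part of the Snowglobe API; the base delay is shortened so the example runs fast):

```python
import asyncio

async def with_retries(fn, max_retries: int = 3, base_delay: float = 0.01):
    # Retry an async callable with exponential backoff, re-raising
    # only after the final attempt fails.
    for attempt in range(max_retries):
        try:
            return await fn()
        except Exception:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(base_delay * (2 ** attempt))

calls = 0

async def flaky() -> str:
    # Fails twice, then succeeds - simulates transient API errors.
    global calls
    calls += 1
    if calls < 3:
        raise RuntimeError("transient error")
    return "ok"

result = asyncio.run(with_retries(flaky))
print(result, calls)
```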
Testing Your Async Implementation
- Test the connection:
- Start the client:
- Monitor performance: The async template will show better performance under concurrent load.
| Feature | Synchronous | Asynchronous |
|---|---|---|
| Concurrent Requests | Limited | High |
| Memory Usage | Higher per request | Lower overall |
| CPU Utilization | Blocked during I/O | Efficient |
| Implementation Complexity | Simple | Moderate |
| Best For | Low-medium traffic | High traffic, complex workflows |
Common Pitfalls
1. Blocking Operations
```python
# ❌ Don't do this - blocks the event loop
def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    time.sleep(1)  # Blocks everything!

# ✅ Do this instead
async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    await asyncio.sleep(1)  # Non-blocking
```
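If you must call something genuinely blocking (a sync-only SDK, say), `asyncio.to_thread` (Python 3.9+) moves it off the event loop so other tasks keep running. A stdlib-only sketch (`blocking_work` stands in for the unavoidable blocking call):

```python
import asyncio
import time

def blocking_work() -> str:
    # A genuinely blocking call (e.g. a sync-only SDK) you can't rewrite.
    time.sleep(0.1)
    return "done"

async def main() -> tuple[str, str]:
    # to_thread runs the blocking call in a worker thread, so the short
    # async task below finishes while blocking_work() is still running.
    blocking = asyncio.create_task(asyncio.to_thread(blocking_work))
    quick = await asyncio.sleep(0.01, result="loop stayed responsive")
    return await blocking, quick

done, quick = asyncio.run(main())
print(done, "/", quick)
```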
2. Forgetting await
```python
# ❌ This won't work - returns a coroutine object
async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    response = client.chat.completions.create(...)  # Missing await!

# ✅ Always await async operations
async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    response = await client.chat.completions.create(...)
```
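You can see this pitfall directly in plain Python: calling an async function without `await` hands back a coroutine object, not a result. A small sketch (the `create_completion` stub is an illustrative stand-in for `client.chat.completions.create`):

```python
import asyncio
import inspect

async def create_completion() -> str:
    # Stand-in for client.chat.completions.create(...).
    return "response"

async def main() -> tuple[bool, str]:
    missing = create_completion()          # ❌ a coroutine object, not a result
    is_coro = inspect.iscoroutine(missing)
    awaited = await missing                # ✅ awaiting yields the value
    return is_coro, awaited

is_coro, awaited = asyncio.run(main())
print(is_coro, awaited)
```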
3. Mixing Sync and Async Incorrectly
```python
# ❌ Don't mix a sync client into an async function
from openai import OpenAI  # Sync client

client = OpenAI()

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    response = client.chat.completions.create(...)  # Blocks the event loop

# ✅ Use the async client
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def acompletion(request: CompletionRequest) -> CompletionFunctionOutputs:
    response = await client.chat.completions.create(...)
```
Next Steps