Advanced Async Patterns
Advanced concurrency patterns for high-performance async Python applications.
What This Section Covers
Concurrent operations, FastAPI integration, background tasks, rate limiting with semaphores, and structured concurrency with TaskGroups.
Prerequisites
This page covers advanced patterns. For basic async client setup, see Configuration. For basic usage examples, see the Quick Start guide.
Concurrent Operations
Run multiple operations concurrently using asyncio.gather():
import asyncio
from vecu_custody.aio import AsyncCustodyClient
async def main():
async with AsyncCustodyClient.sandbox(token="your-jwt-token") as client:
vins = [
"9HGBH41JXMN999999",
"1HGCM82633A123456",
"2HGCM82633A789012"
]
# Check releasability for multiple VINs concurrently
tasks = [
client.releasability.check_vin(vin=vin)
for vin in vins
]
results = await asyncio.gather(*tasks)
for vin, result in zip(vins, results):
print(f"{vin}: {'Releasable' if result.is_releasable else 'Not releasable'}")
asyncio.run(main())
Handling Errors in Concurrent Operations
Use return_exceptions=True to handle errors without failing the entire batch:
import asyncio
from vecu_custody.aio import AsyncCustodyClient
async def main():
async with AsyncCustodyClient.sandbox(token="your-jwt-token") as client:
vins = ["9HGBH41JXMN999999", "INVALID_VIN", "1HGCM82633A123456"]
tasks = [
client.releasability.check_vin(vin=vin)
for vin in vins
]
# Gather with exception handling - won't fail on first error
results = await asyncio.gather(*tasks, return_exceptions=True)
for vin, result in zip(vins, results):
if isinstance(result, Exception):
print(f"{vin}: Error - {result}")
else:
print(f"{vin}: {'Releasable' if result.is_releasable else 'Not releasable'}")
asyncio.run(main())
Task Groups (Python 3.11+)
Use task groups for structured concurrency with automatic cancellation on failure:
import asyncio
from vecu_custody.aio import AsyncCustodyClient
async def main():
async with AsyncCustodyClient.sandbox(token="your-jwt-token") as client:
async with asyncio.TaskGroup() as tg:
# Create multiple tasks - if any fails, all are cancelled
task1 = tg.create_task(
client.releasability.check_vin(vin="9HGBH41JXMN999999")
)
task2 = tg.create_task(
client.authorizations.list(vin="9HGBH41JXMN999999", auto_paginate=False)
)
task3 = tg.create_task(
client.transfers.get_status(vin="9HGBH41JXMN999999")
)
# All tasks complete when exiting TaskGroup
print(f"Releasability: {task1.result().is_releasable}")
print(f"Total authorizations: {task2.result().total_count}")
print(f"Current custodian: {task3.result().current_custodian}")
asyncio.run(main())
FastAPI Integration
Basic Setup with Lifespan
from fastapi import FastAPI, HTTPException
from vecu_custody.aio import AsyncCustodyClient
from contextlib import asynccontextmanager
# Global client instance
client: AsyncCustodyClient | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup - initialize client
global client
client = AsyncCustodyClient.sandbox(token="your-jwt-token")
yield
# Shutdown - cleanup
await client.close()
app = FastAPI(lifespan=lifespan)
@app.get("/releasability/{vin}")
async def check_releasability(vin: str):
try:
result = await client.releasability.check_vin(vin=vin)
return {
"vin": vin,
"is_releasable": result.is_releasable,
"blockers": result.blockers
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/authorizations")
async def create_authorization(
vin: str,
origin: str,
destination: str,
person_identity_key: str
):
authorization = await client.authorizations.create(
vin=vin,
origin=origin,
destination=destination,
person_identity_key=person_identity_key,
authorized_by="api-integration",
make_model="Unknown"
)
return {"authorization_id": authorization.authorization_id}
Dependency Injection Pattern
from fastapi import FastAPI, Depends, HTTPException
from vecu_custody.aio import AsyncCustodyClient
from contextlib import asynccontextmanager
from typing import Annotated
client: AsyncCustodyClient | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global client
client = AsyncCustodyClient.sandbox(token="your-jwt-token")
yield
await client.close()
app = FastAPI(lifespan=lifespan)
async def get_custody_client() -> AsyncCustodyClient:
if client is None:
raise HTTPException(status_code=503, detail="Client not initialized")
return client
CustodyClientDep = Annotated[AsyncCustodyClient, Depends(get_custody_client)]
@app.get("/releasability/{vin}")
async def check_releasability(vin: str, custody: CustodyClientDep):
result = await custody.releasability.check_vin(vin=vin)
return {"vin": vin, "is_releasable": result.is_releasable}
Background Tasks
Polling Pattern
Process operations in the background with a polling loop:
import asyncio
from vecu_custody.aio import AsyncCustodyClient
async def process_pending_authorizations(client: AsyncCustodyClient):
"""Background task to process pending authorizations."""
while True:
try:
page = await client.authorizations.list(
status="PENDING",
limit=50,
auto_paginate=False
)
for auth in page.authorizations:
print(f"Processing: {auth.authorization_id}")
# Process authorization...
except Exception as e:
print(f"Error in background task: {e}")
await asyncio.sleep(60) # Poll every 60 seconds
async def main():
async with AsyncCustodyClient.sandbox(token="your-jwt-token") as client:
# Start background task
task = asyncio.create_task(process_pending_authorizations(client))
# Do other work...
await asyncio.sleep(300)
# Graceful shutdown
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Background task cancelled")
asyncio.run(main())
FastAPI Background Tasks
from fastapi import FastAPI, BackgroundTasks
from vecu_custody.aio import AsyncCustodyClient
app = FastAPI()
client: AsyncCustodyClient | None = None
async def process_transfer_async(vin: str, transfer_type: str):
"""Background task to process a transfer."""
transfer = await client.transfers.create(
vin=vin,
transfer_type=transfer_type,
credential_id="AUTH-12345678",
location_id="LOC-AUCTION-MANHEIM-ATLANTA"
)
print(f"Transfer completed: {transfer.transfer_id}")
@app.post("/transfers/{vin}")
async def initiate_transfer(
vin: str,
transfer_type: str,
background_tasks: BackgroundTasks
):
# Queue the transfer for background processing
background_tasks.add_task(process_transfer_async, vin, transfer_type)
return {"status": "Transfer queued", "vin": vin}
Rate Limiting with Semaphores
Control concurrency to avoid overwhelming the API:
import asyncio
from vecu_custody.aio import AsyncCustodyClient
async def main():
sem = asyncio.Semaphore(10) # Max 10 concurrent requests
async def check_with_limit(client: AsyncCustodyClient, vin: str):
async with sem:
return await client.releasability.check_vin(vin=vin)
async with AsyncCustodyClient.sandbox(token="your-jwt-token") as client:
vins = [f"VIN{i:012d}" for i in range(100)] # 100 VINs
tasks = [check_with_limit(client, vin) for vin in vins]
results = await asyncio.gather(*tasks)
print(f"Processed {len(results)} VINs")
asyncio.run(main())
Timeouts
Set timeouts for individual operations:
import asyncio
from vecu_custody.aio import AsyncCustodyClient
async def main():
async with AsyncCustodyClient.sandbox(token="your-jwt-token") as client:
try:
result = await asyncio.wait_for(
client.releasability.check_vin(vin="9HGBH41JXMN999999"),
timeout=5.0 # 5 seconds
)
print(f"Releasable: {result.is_releasable}")
except asyncio.TimeoutError:
print("Operation timed out")
asyncio.run(main())
Timeout with Fallback
import asyncio
from vecu_custody.aio import AsyncCustodyClient
from vecu_custody.exceptions import NotFoundError
async def check_with_timeout_and_fallback(
client: AsyncCustodyClient,
vin: str,
timeout: float = 5.0
):
"""Check releasability with timeout and fallback to cached data."""
try:
return await asyncio.wait_for(
client.releasability.check_vin(vin=vin),
timeout=timeout
)
except asyncio.TimeoutError:
print(f"Timeout for {vin}, trying cached data...")
try:
return await client.releasability.get_cached(vin=vin)
except NotFoundError:
return None
Async Pagination
Iterate through paginated results asynchronously:
import asyncio
from vecu_custody.aio import AsyncCustodyClient
async def main():
async with AsyncCustodyClient.sandbox(token="your-jwt-token") as client:
# Auto-paginated async iteration
count = 0
async for authorization in client.authorizations.list(
vin="9HGBH41JXMN999999",
limit=50
):
print(f"{authorization.authorization_id}: {authorization.status}")
count += 1
print(f"Total: {count} authorizations")
asyncio.run(main())
Best Practices
1. Reuse Client Instances
# Good - single client for all operations
async with AsyncCustodyClient.sandbox(token="...") as client:
for vin in vins:
await client.releasability.check_vin(vin=vin)
# Avoid - creating new client per request
for vin in vins:
async with AsyncCustodyClient.sandbox(token="...") as client:
await client.releasability.check_vin(vin=vin)
2. Use Structured Concurrency
# Good - TaskGroup ensures all tasks complete or cancel together
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(operation1())
task2 = tg.create_task(operation2())
# Acceptable - gather for independent operations
results = await asyncio.gather(operation1(), operation2())
3. Handle Cancellation Gracefully
async def long_running_task(client: AsyncCustodyClient):
try:
while True:
await process_batch(client)
await asyncio.sleep(60)
except asyncio.CancelledError:
# Cleanup on cancellation
print("Task cancelled, cleaning up...")
raise # Re-raise to propagate cancellation
4. Bound Concurrency for Batch Operations
async def process_batch(vins: list[str], max_concurrent: int = 10):
sem = asyncio.Semaphore(max_concurrent)
async def bounded_check(vin: str):
async with sem:
return await client.releasability.check_vin(vin=vin)
return await asyncio.gather(*[bounded_check(vin) for vin in vins])
Next Steps
- Configuration - Client setup and initialization
- Error Handling - Exception handling patterns
- Testing - Test async code
- API Reference - Complete API documentation