Advanced Async Patterns

Advanced concurrency patterns for high-performance async Python applications.

What This Section Covers

Concurrent operations, FastAPI integration, background tasks, rate limiting with semaphores, and structured concurrency with TaskGroups.

Prerequisites

This page covers advanced patterns. For basic async client setup, see Configuration. For basic usage examples, see the Quick Start guide.

Concurrent Operations

Run multiple operations concurrently using asyncio.gather():

import asyncio
from vecu_custody.aio import AsyncCustodyClient

async def main():
    async with AsyncCustodyClient.sandbox(token="your-jwt-token") as client:
        vins = [
            "9HGBH41JXMN999999",
            "1HGCM82633A123456",
            "2HGCM82633A789012"
        ]

        # Check releasability for multiple VINs concurrently
        tasks = [
            client.releasability.check_vin(vin=vin)
            for vin in vins
        ]

        results = await asyncio.gather(*tasks)

        for vin, result in zip(vins, results):
            print(f"{vin}: {'Releasable' if result.is_releasable else 'Not releasable'}")

asyncio.run(main())

Handling Errors in Concurrent Operations

Use return_exceptions=True to handle errors without failing the entire batch:

import asyncio
from vecu_custody.aio import AsyncCustodyClient

async def main():
    async with AsyncCustodyClient.sandbox(token="your-jwt-token") as client:
        vins = ["9HGBH41JXMN999999", "INVALID_VIN", "1HGCM82633A123456"]

        tasks = [
            client.releasability.check_vin(vin=vin)
            for vin in vins
        ]

        # Gather with exception handling - won't fail on first error
        results = await asyncio.gather(*tasks, return_exceptions=True)

        for vin, result in zip(vins, results):
            if isinstance(result, Exception):
                print(f"{vin}: Error - {result}")
            else:
                print(f"{vin}: {'Releasable' if result.is_releasable else 'Not releasable'}")

asyncio.run(main())

Task Groups (Python 3.11+)

Use task groups for structured concurrency with automatic cancellation on failure:

import asyncio
from vecu_custody.aio import AsyncCustodyClient

async def main():
    async with AsyncCustodyClient.sandbox(token="your-jwt-token") as client:
        async with asyncio.TaskGroup() as tg:
            # Create multiple tasks - if any fails, all are cancelled
            task1 = tg.create_task(
                client.releasability.check_vin(vin="9HGBH41JXMN999999")
            )
            task2 = tg.create_task(
                client.authorizations.list(vin="9HGBH41JXMN999999", auto_paginate=False)
            )
            task3 = tg.create_task(
                client.transfers.get_status(vin="9HGBH41JXMN999999")
            )

        # All tasks complete when exiting TaskGroup
        print(f"Releasability: {task1.result().is_releasable}")
        print(f"Total authorizations: {task2.result().total_count}")
        print(f"Current custodian: {task3.result().current_custodian}")

asyncio.run(main())

FastAPI Integration

Basic Setup with Lifespan

from fastapi import FastAPI, HTTPException
from vecu_custody.aio import AsyncCustodyClient
from contextlib import asynccontextmanager

# Global client instance
client: AsyncCustodyClient | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup - initialize client
    global client
    client = AsyncCustodyClient.sandbox(token="your-jwt-token")
    yield
    # Shutdown - cleanup
    await client.close()

app = FastAPI(lifespan=lifespan)

@app.get("/releasability/{vin}")
async def check_releasability(vin: str):
    try:
        result = await client.releasability.check_vin(vin=vin)
        return {
            "vin": vin,
            "is_releasable": result.is_releasable,
            "blockers": result.blockers
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/authorizations")
async def create_authorization(
    vin: str,
    origin: str,
    destination: str,
    person_identity_key: str
):
    authorization = await client.authorizations.create(
        vin=vin,
        origin=origin,
        destination=destination,
        person_identity_key=person_identity_key,
        authorized_by="api-integration",
        make_model="Unknown"
    )
    return {"authorization_id": authorization.authorization_id}

Dependency Injection Pattern

from fastapi import FastAPI, Depends, HTTPException
from vecu_custody.aio import AsyncCustodyClient
from contextlib import asynccontextmanager
from typing import Annotated

client: AsyncCustodyClient | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global client
    client = AsyncCustodyClient.sandbox(token="your-jwt-token")
    yield
    await client.close()

app = FastAPI(lifespan=lifespan)

async def get_custody_client() -> AsyncCustodyClient:
    if client is None:
        raise HTTPException(status_code=503, detail="Client not initialized")
    return client

CustodyClientDep = Annotated[AsyncCustodyClient, Depends(get_custody_client)]

@app.get("/releasability/{vin}")
async def check_releasability(vin: str, custody: CustodyClientDep):
    result = await custody.releasability.check_vin(vin=vin)
    return {"vin": vin, "is_releasable": result.is_releasable}

Background Tasks

Polling Pattern

Process operations in the background with a polling loop:

import asyncio
from vecu_custody.aio import AsyncCustodyClient

async def process_pending_authorizations(client: AsyncCustodyClient):
    """Background task to process pending authorizations."""
    while True:
        try:
            page = await client.authorizations.list(
                status="PENDING",
                limit=50,
                auto_paginate=False
            )

            for auth in page.authorizations:
                print(f"Processing: {auth.authorization_id}")
                # Process authorization...

        except Exception as e:
            print(f"Error in background task: {e}")

        await asyncio.sleep(60)  # Poll every 60 seconds

async def main():
    async with AsyncCustodyClient.sandbox(token="your-jwt-token") as client:
        # Start background task
        task = asyncio.create_task(process_pending_authorizations(client))

        # Do other work...
        await asyncio.sleep(300)

        # Graceful shutdown
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            print("Background task cancelled")

asyncio.run(main())

FastAPI Background Tasks

from fastapi import FastAPI, BackgroundTasks
from vecu_custody.aio import AsyncCustodyClient

app = FastAPI()
client: AsyncCustodyClient | None = None

async def process_transfer_async(vin: str, transfer_type: str):
    """Background task to process a transfer."""
    transfer = await client.transfers.create(
        vin=vin,
        transfer_type=transfer_type,
        credential_id="AUTH-12345678",
        location_id="LOC-AUCTION-MANHEIM-ATLANTA"
    )
    print(f"Transfer completed: {transfer.transfer_id}")

@app.post("/transfers/{vin}")
async def initiate_transfer(
    vin: str,
    transfer_type: str,
    background_tasks: BackgroundTasks
):
    # Queue the transfer for background processing
    background_tasks.add_task(process_transfer_async, vin, transfer_type)
    return {"status": "Transfer queued", "vin": vin}

Rate Limiting with Semaphores

Control concurrency to avoid overwhelming the API:

import asyncio
from vecu_custody.aio import AsyncCustodyClient

async def main():
    sem = asyncio.Semaphore(10)  # Max 10 concurrent requests

    async def check_with_limit(client: AsyncCustodyClient, vin: str):
        async with sem:
            return await client.releasability.check_vin(vin=vin)

    async with AsyncCustodyClient.sandbox(token="your-jwt-token") as client:
        vins = [f"VIN{i:012d}" for i in range(100)]  # 100 VINs

        tasks = [check_with_limit(client, vin) for vin in vins]
        results = await asyncio.gather(*tasks)

        print(f"Processed {len(results)} VINs")

asyncio.run(main())

Timeouts

Set timeouts for individual operations:

import asyncio
from vecu_custody.aio import AsyncCustodyClient

async def main():
    async with AsyncCustodyClient.sandbox(token="your-jwt-token") as client:
        try:
            result = await asyncio.wait_for(
                client.releasability.check_vin(vin="9HGBH41JXMN999999"),
                timeout=5.0  # 5 seconds
            )
            print(f"Releasable: {result.is_releasable}")
        except asyncio.TimeoutError:
            print("Operation timed out")

asyncio.run(main())

Timeout with Fallback

import asyncio
from vecu_custody.aio import AsyncCustodyClient
from vecu_custody.exceptions import NotFoundError

async def check_with_timeout_and_fallback(
    client: AsyncCustodyClient,
    vin: str,
    timeout: float = 5.0
):
    """Check releasability with timeout and fallback to cached data."""
    try:
        return await asyncio.wait_for(
            client.releasability.check_vin(vin=vin),
            timeout=timeout
        )
    except asyncio.TimeoutError:
        print(f"Timeout for {vin}, trying cached data...")
        try:
            return await client.releasability.get_cached(vin=vin)
        except NotFoundError:
            return None

Async Pagination

Iterate through paginated results asynchronously:

import asyncio
from vecu_custody.aio import AsyncCustodyClient

async def main():
    async with AsyncCustodyClient.sandbox(token="your-jwt-token") as client:
        # Auto-paginated async iteration
        count = 0
        async for authorization in client.authorizations.list(
            vin="9HGBH41JXMN999999",
            limit=50
        ):
            print(f"{authorization.authorization_id}: {authorization.status}")
            count += 1

        print(f"Total: {count} authorizations")

asyncio.run(main())

Best Practices

1. Reuse Client Instances

# Good - single client for all operations
async with AsyncCustodyClient.sandbox(token="...") as client:
    for vin in vins:
        await client.releasability.check_vin(vin=vin)

# Avoid - creating new client per request
for vin in vins:
    async with AsyncCustodyClient.sandbox(token="...") as client:
        await client.releasability.check_vin(vin=vin)

2. Use Structured Concurrency

# Good - TaskGroup ensures all tasks complete or cancel together
async with asyncio.TaskGroup() as tg:
    task1 = tg.create_task(operation1())
    task2 = tg.create_task(operation2())

# Acceptable - gather for independent operations
results = await asyncio.gather(operation1(), operation2())

3. Handle Cancellation Gracefully

async def long_running_task(client: AsyncCustodyClient):
    try:
        while True:
            await process_batch(client)
            await asyncio.sleep(60)
    except asyncio.CancelledError:
        # Cleanup on cancellation
        print("Task cancelled, cleaning up...")
        raise  # Re-raise to propagate cancellation

4. Bound Concurrency for Batch Operations

async def process_batch(vins: list[str], max_concurrent: int = 10):
    sem = asyncio.Semaphore(max_concurrent)

    async def bounded_check(vin: str):
        async with sem:
            return await client.releasability.check_vin(vin=vin)

    return await asyncio.gather(*[bounded_check(vin) for vin in vins])

Next Steps