Async patterns

bca.analyze is CPU-bound: the work is a tree-sitter parse plus the metric passes, both of which release the GIL on the Rust side via PyO3's Python::detach. The canonical async pattern is therefore asyncio.to_thread:

async def analyze_async(path: Path) -> dict[str, Any] | None:
    """Run ``bca.analyze(path)`` on the default thread executor."""
    return await asyncio.to_thread(bca.analyze, str(path))


async def analyze_all(
    paths: Iterable[Path],
) -> list[dict[str, Any] | BaseException | None]:
    """Fan ``analyze_async`` out across ``paths`` with ``asyncio.gather``.

    ``return_exceptions=True`` matters here: ``bca.analyze`` runs
    inside ``asyncio.to_thread`` and Python threads cannot be
    cancelled. If one call raises and gather re-raises with
    ``return_exceptions=False``, the surviving threads keep running
    in the default executor, producing results that are silently
    discarded. With ``return_exceptions=True`` every thread's
    result (success OR exception) lands in the returned list so
    the caller can dispatch per-file.
    """
    return await asyncio.gather(
        *(analyze_async(p) for p in paths),
        return_exceptions=True,
    )

Why to_thread, not native async

bca.analyze is a synchronous Python function backed by synchronous Rust code — there is no await boundary inside it. Wrapping it in asyncio.to_thread:

  1. Schedules the call on the default thread pool.
  2. Lets other coroutines progress while the parse + metric pass runs.
  3. Returns the result back to the calling coroutine when done.

Because the Rust side releases the GIL across the heavy work, several to_thread(bca.analyze, ...) calls genuinely run in parallel — this is not co-operative I/O multiplexing, it is real multi-core utilisation gated on the thread pool's size.

Custom executors

For a tighter cap on the worker count, hand to_thread a purpose-built executor:

import asyncio
from concurrent.futures import ThreadPoolExecutor

import big_code_analysis as bca

async def analyze_many(paths: list[str]) -> list[object]:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=8) as pool:
        return await asyncio.gather(
            *(loop.run_in_executor(pool, bca.analyze, p) for p in paths)
        )

Eight workers on an 8-core machine is the comfortable upper bound for purely CPU-bound work; raising it further oversubscribes the machine and trades throughput for context-switch overhead.

Streaming results

asyncio.as_completed lets you start consuming results as soon as the first analysis finishes — useful when the per-file work varies wildly in cost (a 5 KB file vs a 500 KB generated bundle):

import asyncio
import big_code_analysis as bca

async def first_failure(paths: list[str]) -> str | None:
    """Return the path of the first file with cyclomatic > 50."""
    tasks = [asyncio.create_task(asyncio.to_thread(bca.analyze, p)) for p in paths]
    try:
        for coro in asyncio.as_completed(tasks):
            result = await coro
            if result is None:
                continue
            if result["metrics"]["cyclomatic"]["sum"] > 50:
                return result["name"]
    finally:
        for t in tasks:
            t.cancel()
    return None

The finally-block cancellation matters: as_completed does not auto-cancel pending tasks when the caller returns early, so a leaked task can keep running on the thread pool well after the async function returns.

Anti-pattern: calling bca.analyze directly in a coroutine

# Don't do this.
async def bad(path: str) -> dict | None:
    return bca.analyze(path)  # blocks the event loop on every call

async def does not make the body asynchronous. Without to_thread or an explicit executor, every coroutine that calls bca.analyze stalls the event loop for the full duration of the parse — other tasks waiting on I/O, timers, or queues all freeze until the parse returns. The to_thread wrapper is one line and makes the difference between a responsive server and a single-threaded one.

When analyze_batch is the better fit

If you are processing a static, finite list of paths and do not need streaming results, bca.analyze_batch is simpler than gather(*to_thread(...)): it runs sequentially on the calling thread but never raises on per-file errors. Wrap the whole analyze_batch call in asyncio.to_thread to keep the event loop responsive:

import asyncio
import big_code_analysis as bca

async def batch(paths: list[str]) -> list[object]:
    return await asyncio.to_thread(bca.analyze_batch, paths)

This trades the per-file parallelism of gather for the simpler error model of analyze_batch. Pick gather when you want both parallelism and typed OSError dispatch; pick to_thread(analyze_batch, paths) when you want one async call and the never-raise contract.