Batch processing
bca.analyze_batch(paths) runs the same analysis as bca.analyze
over every path in an iterable and never raises on per-file
errors: each result slot is either an analysis dict or a
bca.AnalysisError describing the failure. The list has the same
length as the input and preserves order one-to-one, so callers
can zip(inputs, results) without losing the pairing.
from collections.abc import Iterable
from pathlib import Path

import bca


def run(paths: Iterable[Path]) -> dict[str, int]:
    """Analyse ``paths`` as a batch and bucket successes vs failures.

    Returns a small summary dict (`ok`, `errors`, `total`) so the
    accompanying test can assert on it without re-parsing.
    """
    # Materialise first so the inputs can be paired with the results below.
    materialised = [str(p) for p in paths]
    results = bca.analyze_batch(materialised)
    ok = 0
    errors = 0
    for path, result in zip(materialised, results, strict=True):
        if isinstance(result, bca.AnalysisError):
            errors += 1
            print(f" skip {path}: ({result.error_kind}) {result.error}")
        else:
            ok += 1
            sloc = result["metrics"]["loc"]["sloc"]
            print(f" ok {path}: sloc = {sloc:.0f}")
    return {"ok": ok, "errors": errors, "total": len(materialised)}
A few key contracts:
- AnalysisError is returned, not raised. It is not an Exception subclass, so isinstance(slot, bca.AnalysisError) is the discriminator.
- The result list is the same length as the input.
- paths is consumed lazily, so generators work, but if you want to keep the input around for zip, materialise it into a list first.
- analyze_batch runs with the is_generated walker filter off: every input position yields either a dict or an AnalysisError, never None. Call bca.analyze(path) per-file with the default skip_generated=True if you need the CLI walker's skip behaviour.
Parallel execution
There is no built-in concurrency inside analyze_batch — it is a
sequential sweep. For parallelism, fan the per-file analyze
call out across a thread pool:
from collections.abc import Iterable
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any

import bca

def run_parallel(paths: Iterable[Path], *, workers: int = 4) -> list[dict[str, Any] | None]:
    """Fan ``analyze`` out across a thread pool.

    PyO3 releases the GIL across each file's read + parse, so a
    thread pool actually parallelises the heavy work. Use this when
    you need per-file exceptions instead of ``AnalysisError`` slots.
    """
    def _analyze(p: Path) -> dict[str, Any] | None:
        return bca.analyze(str(p))

    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(_analyze, paths))
PyO3's Python::detach releases the GIL across each file's read +
tree-sitter parse, so the threads do not serialise on the
interpreter lock — this is real parallelism, not contended
co-operation.
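One caveat with pool.map is that it re-raises the first worker exception as soon as that result is consumed, so the remaining outcomes are lost. If every file's outcome is wanted, a variation along these lines may help. It is a sketch only: analyze_collecting and fake_analyze are hypothetical names, and the analyze callable is passed in as a parameter (presumably bca.analyze in real use) so the example runs without the library.

```python
from collections.abc import Callable, Iterable
from concurrent.futures import ThreadPoolExecutor
from typing import Any


def analyze_collecting(
    paths: Iterable[str],
    analyze: Callable[[str], Any],
    *,
    workers: int = 4,
) -> list[tuple[str, Any]]:
    """Return (path, result-or-exception) pairs in input order."""
    items = list(paths)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(analyze, p) for p in items]
    # Leaving the ``with`` block waits for every submitted call to finish.
    outcomes: list[tuple[str, Any]] = []
    for path, future in zip(items, futures):
        exc = future.exception()
        outcomes.append((path, exc if exc is not None else future.result()))
    return outcomes


def fake_analyze(path: str) -> dict[str, Any]:
    """Stub analyser: raises for names that start with 'missing'."""
    if path.startswith("missing"):
        raise FileNotFoundError(path)
    return {"name": path}


outcomes = analyze_collecting(["a.py", "missing.py", "b.py"], fake_analyze)
for path, outcome in outcomes:
    label = "error" if isinstance(outcome, BaseException) else "ok"
    print(label, path)
```

Keeping the futures in a list, rather than iterating as_completed, is what preserves the input order here.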
AnalysisError taxonomy
error_kind is a closed Literal:
| error_kind | Triggered by |
|---|---|
| "UnsupportedLanguage" | Unknown extension + no shebang / emacs-mode hit |
| "ParseError" | tree-sitter rejected the source, or a rare internal serialisation failure (internal: serialization error: …) |
| "IoError" | std::fs::read failed or the path was not valid UTF-8 |
AnalysisError is frozen and implements __eq__ / __hash__ /
__repr__ over all three fields, so callers can put errors in a
set to deduplicate failures across runs. For retry
classification, the errno is preserved in the error string via
Rust's default formatting:
import re
match = re.search(r"\(os error (\d+)\)$", slot.error)
errno = int(match.group(1)) if match else None
If you need typed dispatch (FileNotFoundError,
PermissionError, …) call bca.analyze(path) per-file instead
of analyze_batch — single-file analyze raises the
canonical OSError subclass. See Error handling.
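Typed dispatch on the per-file path could then be sketched as follows. The classify helper, its string labels, and fake_analyze are hypothetical; the analyze callable is injected (presumably bca.analyze in real use) so the example runs without the library. It relies only on the behaviour stated above, that single-file analyze raises the canonical OSError subclass.

```python
from collections.abc import Callable
from typing import Any


def classify(path: str, analyze: Callable[[str], Any]) -> tuple[str, Any]:
    """Run one analysis and map OSError subclasses to coarse labels."""
    try:
        return "ok", analyze(path)
    except FileNotFoundError:
        return "missing", None
    except PermissionError:
        return "denied", None
    except OSError as exc:
        # Any other I/O failure: surface the errno for the caller to inspect.
        return "io", exc.errno


def fake_analyze(path: str) -> dict[str, Any]:
    """Stub analyser that raises based on the file name."""
    if path == "gone.py":
        raise FileNotFoundError(path)
    if path == "locked.py":
        raise PermissionError(path)
    return {"name": path}


labels = [classify(p, fake_analyze)[0] for p in ["a.py", "gone.py", "locked.py"]]
print(labels)  # ['ok', 'missing', 'denied']
```

The specific subclasses are listed before the generic OSError clause because Python picks the first matching except clause.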