Source code for chorelib.deprunner

"""Asynchronous build runner with parallel execution support.

Executes build targets based on the dependency graph, comparing mtimes
to skip up-to-date targets. Builder functions run in a ThreadPoolExecutor
for parallelism.
"""

import asyncio
import errno
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor
from threading import get_ident, local
from typing import Any, Sequence

from .depgraph import BuildInfo, DepGraph
from .errors import RuleNotFoundError, TargetNotFoundError
from .ruledef import MTimeFunc, RuleSet
from .utils import message, to_timestamp

# Module-level logger; the Runner emits its debug-level traces through it.
logger = logging.getLogger(__name__)


class Runner:
    """Asynchronous build runner that manages building targets based on dependencies.

    Coordinates the build process: resolves the dependency graph, compares
    mtimes to determine what needs rebuilding, and executes builder functions
    either inline or in a ThreadPoolExecutor.

    Attributes:
        rules: The RuleSet containing all rule, task, and mtime definitions.
        graph: The DepGraph tracking registered targets.
        build_tasks: Dict mapping target names to their asyncio Tasks.
        skipped_targets: Set of targets that were up-to-date and skipped.
        build_always: If True, rebuild all targets regardless of mtime.
        num_workers: Requested worker count. ``None`` or a value greater
            than 1 selects a thread pool; anything else runs builders inline.
        executor: The thread pool used while a build is in progress, or
            ``None`` when builders run inline or no build is active.
        running: True only while ``run()`` is executing and has not failed.
        loop: Event loop that owns the build tasks (rebound by ``run()``).
        threadid: Identifier of the thread driving ``loop`` (rebound by ``run()``).
    """

    def __init__(
        self, rules: RuleSet, num_workers: int | None = 0, build_always: bool = False
    ) -> None:
        # Provisional values: run() rebinds both to the loop/thread that
        # actually drives the build, which may differ from the constructor's.
        self.loop = asyncio.get_event_loop()
        self.threadid = get_ident()
        self.running = False
        self.num_workers = num_workers
        self.build_always = build_always

        self.rules = rules
        self.graph = DepGraph()
        self.build_tasks: dict[str, asyncio.Task[Any]] = {}
        self.skipped_targets: set[str] = set()

        logger.debug(
            "Initializing Runner with num_workers=%s, build_always=%s",
            self.num_workers,
            self.build_always,
        )

        # Use a thread pool for parallel builds when multiple workers requested
        self.executor: ThreadPoolExecutor | None = self._make_executor()

    def _make_executor(self) -> ThreadPoolExecutor | None:
        """Create the worker pool for this runner, or None for inline execution."""
        if self.num_workers is None or self.num_workers > 1:
            return ThreadPoolExecutor(max_workers=self.num_workers)
        return None

    async def _run_in_executor(self, func: Any, *args: Any, **kwargs: Any) -> Any:
        """Run a function in the thread pool executor, or inline if single-threaded.

        When a pool is used, the call is wrapped so that the worker thread's
        thread-local state points at this runner for the duration of the
        call; that is how ``schedule()`` finds the active runner from within
        builder functions. The inline path calls ``func`` directly on the
        current thread, whose thread-local state is set by ``run()``.

        Raises:
            RuntimeError: If the worker thread is already executing a call
                made through this method (nested use on the same thread).
        """

        def wrapper() -> Any:
            if getattr(_runner, "running", None):
                raise RuntimeError(f"Nested calls to _run_in_executor are not allowed: {func}")

            _runner.running = self
            try:
                logger.debug("Running %s in executor", func)
                return func(*args, **kwargs)
            finally:
                # Pool threads are reused, so never leave a stale runner behind.
                _runner.running = None

        if self.executor:
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(self.executor, wrapper)
        else:
            return func(*args, **kwargs)

    async def _get_mtime(self, mtimefunc: MTimeFunc, target: str) -> Any:
        """Retrieve the mtime of a target using its mtime function.

        The raw value returned by ``mtimefunc`` is passed through
        ``to_timestamp()`` before being returned.

        Raises:
            TargetNotFoundError: If the converted mtime is None
                (target does not exist).
        """
        message(f"Running mtime function {mtimefunc} for target: {target}", 2)
        ret = await self._run_in_executor(mtimefunc, target)
        ret = to_timestamp(ret)
        if ret is None:
            raise TargetNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), target)
        return ret

    async def _run_build_deps(self, node: BuildInfo, build_always: bool) -> Any | None:
        """Build dependencies of the node and return the latest timestamp among them.

        ``node.depends`` entries inherit ``build_always``; ``node.needs``
        entries are never forced and do not contribute to the returned
        timestamp. Returns None when the node has no ``depends`` entries.
        """
        # Create every task up front so depends and needs build concurrently.
        deps = [self._ensure_target(t, build_always) for t in node.depends]
        needs = [self._ensure_target(t, False) for t in node.needs]
        dep_timestamps: list[Any] = list(await asyncio.gather(*deps))
        await asyncio.gather(*needs)
        if dep_timestamps:
            return max(dep_timestamps)
        else:
            # No dependencies
            return None

    async def _run_builder(self, node: BuildInfo) -> Any:
        """Run the builder function for the node and return the resulting timestamp.

        The builder's return value is converted with ``to_timestamp()``; if
        that yields None, the current time is used instead.
        """
        logger.debug("Starting %s for `%s`", node.run_builder, node.target)
        ts = await self._run_in_executor(node.run_builder)
        timestamp = to_timestamp(ts)
        if timestamp is None:
            timestamp = time.time()
        return timestamp

    @staticmethod
    def default_get_file_mtime(name: str) -> Any:
        """Return the modification time of ``name`` on the local filesystem."""
        return os.path.getmtime(name)

    async def _build_target(self, target: str, build_always: bool) -> Any:
        """Build a single target and return its timestamp.

        For tasks or forced rebuilds, the builder runs unconditionally.
        For rules, the target's mtime is compared against its dependencies'
        mtimes to decide whether a rebuild is needed.

        Returns None without doing any work if the runner is no longer
        running (for example after another target failed).

        Raises:
            RuleNotFoundError: If the target does not exist and the graph
                has no node (and therefore no builder) for it.
        """
        message(f"Building target: {target}", 1)

        if not self.running:
            return None

        node = self.graph.get(target)

        # Tasks and forced rebuilds always execute the builder
        if node and (build_always or node.is_task()):
            await self._run_build_deps(node, build_always)
            timestamp = await self._run_builder(node)
            return timestamp

        # Check the current mtime of the target
        mtimefunc = self.rules.get_mtime_func(target)
        timestamp_or_none: Any
        try:
            timestamp_or_none = await self._get_mtime(mtimefunc, target)
        except (FileNotFoundError, TargetNotFoundError) as e:
            message(f"File not found for target '{target}': {e!s}", 2)
            if not node:
                # No builder defined for this target
                raise RuleNotFoundError(target) from e
            timestamp_or_none = None

        if node:
            # Build dependencies and compare mtimes
            dep_timestamp = await self._run_build_deps(node, build_always)
            if (timestamp_or_none is None) or (
                (dep_timestamp is not None) and (timestamp_or_none < dep_timestamp)
            ):
                # Target is missing or older than dependencies — rebuild
                message(f"Start building '{target}'", 2)
                timestamp_or_none = await self._run_builder(node)
            else:
                self.skipped_targets.add(target)
                message(f"Skipping build for target '{target}'; up to date.", 1)

        return timestamp_or_none

    def _ensure_target(self, target: str, build_always: bool) -> asyncio.Task[Any]:
        """Return the build task for ``target``, creating it on first request.

        A target is built at most once per runner: if a task already exists
        it is returned as-is and ``build_always`` is ignored.
        """
        task = self.build_tasks.get(target)
        if task is None:
            task = asyncio.create_task(self._build_target(target, build_always))
            task.set_name(target)
            self.build_tasks[target] = task
        return task

    def _addtargets(self, targets: Sequence[str]) -> list[str]:
        """Add targets to the build graph and start build tasks for the new ones.

        Returns the subset of ``targets`` for which ``graph.addtarget()``
        reported a truthy result.
        """
        added: list[str] = [
            target for target in targets if self.graph.addtarget(self.rules, target)
        ]

        if added:
            # Check the graph before any task for the new targets is created.
            self.graph.detectloop()

        for target in added:
            self._ensure_target(target, self.build_always)

        return added

    async def wait_until_done(self) -> None:
        """Wait until all registered build tasks have completed.

        Tasks registered while waiting (dependencies, ``schedule()`` calls)
        are picked up on the next pass of the loop. On any failure or
        cancellation the runner is marked as stopped, every registered task
        is cancelled, and the original exception is re-raised.
        """
        seen: set[str] = set()
        try:
            while True:
                pending = {
                    target: task for target, task in self.build_tasks.items() if target not in seen
                }
                if not pending:
                    break
                seen.update(pending)
                await asyncio.gather(*pending.values())
        except BaseException:
            # BaseException rather than Exception: asyncio.CancelledError is a
            # BaseException, and cancelling the caller must stop the build too.
            self.running = False
            for task in self.build_tasks.values():
                task.cancel()
            raise

    async def run(self, targets: Sequence[str]) -> None:
        """Run the build for the specified targets.

        Binds the runner to the current event loop and thread, publishes it
        in thread-local state so ``schedule()`` can find it, builds the
        targets, and finally restores the thread-local state and releases
        the worker pool regardless of the outcome.
        """
        # Bind to the loop/thread actually running the build so that
        # schedule() hands work to a live loop even when the Runner was
        # constructed elsewhere.
        self.loop = asyncio.get_running_loop()
        self.threadid = get_ident()
        if self.executor is None:
            # A previous run() released the pool; recreate it if one is wanted.
            self.executor = self._make_executor()

        previous = getattr(_runner, "running", None)
        self.running = True
        _runner.running = self
        self.skipped_targets = set()

        try:
            # start building
            self._addtargets(targets)
            await self.wait_until_done()
        finally:
            self.running = False
            _runner.running = previous
            executor, self.executor = self.executor, None
            if executor is not None:
                # Do not block the event loop on builders that are still
                # running after a failure; just drop whatever is queued.
                executor.shutdown(wait=False, cancel_futures=True)

        for target in targets:
            if target in self.skipped_targets:
                message(f"{target!r} is up to date.")


# Thread-local storage for tracking the active Runner instance.
# Its ``running`` attribute is set by Runner.run() on the thread that drives
# the build, and by the executor wrapper on a worker thread for the duration
# of each function it runs there.
# Used by schedule() to find the runner from within builder functions.
_runner = local()


def schedule(*targets: str) -> None:
    """Request additional targets for the build that is currently running.

    Intended to be called from inside a builder function. The active runner
    is looked up in thread-local state. When the caller is on the thread
    recorded on the runner, the targets are added directly; otherwise the
    request is handed to the runner's event loop and this call blocks until
    the loop has processed it.

    Args:
        targets: Names of the targets to schedule for building.

    Raises:
        RuntimeError: If no build runner is active for the calling thread.
    """
    active: Runner | None = getattr(_runner, "running", None)
    if not active:
        raise RuntimeError("No build runner is running.")

    if get_ident() == active.threadid:
        # Already on the event loop thread: register the targets in place.
        active._addtargets(targets)
        return

    # Worker thread: the graph and task table are touched only from the
    # event loop, so marshal the call over and wait for it to finish.
    async def _add_on_loop() -> None:
        active._addtargets(targets)

    pending = asyncio.run_coroutine_threadsafe(_add_on_loop(), active.loop)
    pending.result()
async def run(
    rules: RuleSet,
    targets: Sequence[str],
    num_workers: int | None = 0,
    rebuild: bool = False,
) -> None:
    """Build ``targets`` with a freshly created Runner.

    Args:
        rules: The RuleSet containing rule, task, and mtime definitions.
        targets: Names of the targets to build.
        num_workers: Worker thread count handed to the Runner. With 0 or 1
            the Runner creates no thread pool and runs builders inline;
            with None it creates a ThreadPoolExecutor using that class's
            default worker count.
        rebuild: If True, rebuild all targets regardless of mtime.
    """
    build = Runner(rules, num_workers=num_workers, build_always=rebuild)
    await build.run(targets)