Brei

a minimal workflow system

 
  • GitHub Org’s stars
  • Python package
  • PyPI - Version
  • Entangled badge

Implementation

This architecture documentation is very much a work in progress.

brei --version
Brei 0.2.3, Copyright (c) 2023 Netherlands eScience Center.
Licensed under the Apache License, Version 2.0.

Test coverage

Name Stmts Miss Cover
brei/__init__.py 7 0 100%
brei/async_timer.py 12 0 100%
brei/cli.py 87 55 37%
brei/construct.py 86 16 81%
brei/errors.py 28 5 82%
brei/lazy.py 102 11 89%
brei/logging.py 13 6 54%
brei/program.py 116 16 86%
brei/result.py 28 2 93%
brei/runner.py 6 0 100%
brei/task.py 260 15 94%
brei/template_strings.py 38 0 100%
brei/utility.py 21 0 100%
brei/version.py 2 0 100%
TOTAL 806 126 84%

Python module

file:brei/version.py
from importlib import metadata


# Version string of the installed `brei` distribution, read from its package
# metadata via `importlib.metadata` (so it matches what was installed).
__version__ = metadata.version("brei")
file:brei/__init__.py
"""
Welcome to Brei's API documentation. There are two ways to use Brei: from the
command-line (in which case we refer to the homepage for documentation), or
straight from Python. The easiest function to work with is `brei()`, which
links to the command-line app one-to-one.

## Program
If you want to read the `Program` yourself, there are two ways to do so:

1. Use `Program.read()`. You give it a `Path` to a TOML or JSON file and a
section, this last bit giving an object path into the data. For instance:
`Program.read(Path("pyproject.toml"), "tool.brei")`.
2. Read your own data format into JSON compatible data, then
`construct(Program, data)`. The `construct` function uses the type annotations
in dataclasses to validate the input data.

After reading the data, you'll want to resolve all tasks, i.e. perform includes
and run any tasks that are needed to resolve the targets of all other tasks.
Note that the task database must be passed on to `run` as the `db` keyword:

    program = Program.read(Path("brei.toml"))
    db: TaskDB = await resolve_tasks(program)
    await db.run(Phony("all"), db=db)

There are three kinds of targets: `pathlib.Path`, `Phony` and `Variable`.

## API
"""

from .program import Program, resolve_tasks, TemplateCall
from .construct import construct
from .lazy import Lazy, LazyDB
from .task import Task, TaskDB, Phony, Variable, TaskProxy, Template
from .runner import Runner
from .cli import brei

# Public names re-exported by the package.
__all__ = [
    # Python entry point that mirrors the command-line app.
    "brei",

    # Building blocks for reading, resolving and running programs.
    "Lazy", "LazyDB", "Phony", "Program", "Runner", "Task", "TaskDB",
    "TaskProxy", "Template", "TemplateCall", "Variable", "construct",
    "resolve_tasks",
]
Logging

Logging output is formatted with the `rich` module.

file:brei/logging.py
import logging
import sys
from rich.highlighter import RegexHighlighter
from rich.logging import RichHandler

def logger():
    """Return the logger shared by all Brei modules (named ``"brei"``)."""
    brei_logger = logging.getLogger("brei")
    return brei_logger

def configure_logger(debug: bool, rich: bool = True):
    """Configure the root logger through `logging.basicConfig`.

    Args:
        debug: log at DEBUG level when true, at INFO level otherwise. With
            `rich`, it also makes the handler show the source location.
        rich: format records with `rich`'s `RichHandler`, highlighting text
            between backticks (regex group named `bold`). When false, plain
            records are written to standard output.
    """
    class BackTickHighlighter(RegexHighlighter):
        highlights = [r"`(?P<bold>[^`]*)`"]

    level = logging.DEBUG if debug else logging.INFO

    if not rich:
        logging.basicConfig(
            level=level,
            handlers=[logging.StreamHandler(sys.stdout)]
        )
        return

    rich_handler = RichHandler(show_path=debug, highlighter=BackTickHighlighter())
    logging.basicConfig(
        level=level,
        format="%(message)s",
        datefmt="[%X]",
        handlers=[rich_handler],
    )
Command-line interface
file:brei/cli.py
from argparse import ArgumentParser
from pathlib import Path
import re
import sys
import textwrap
import tomllib
from typing import Optional
import argh  # type: ignore
import asyncio
from rich.console import Console

from rich_argparse import RichHelpFormatter
from rich.table import Table

from .runner import DEFAULT_RUNNERS
from .errors import HelpfulUserError, UserError
from .lazy import Phony
from .utility import construct, read_from_file
from .program import Program, resolve_tasks
from .logging import logger, configure_logger
from .version import __version__


log = logger()


async def main(
    program: Program, target_strs: list[str], force_run: bool, throttle: Optional[int]
):
    """Resolve `program` into a task database and run the requested targets.

    Args:
        program: the program to run.
        target_strs: names of the (phony) targets to build; they run
            concurrently.
        force_run: rerun tasks even when their targets are up to date.
        throttle: if truthy, the maximum number of concurrently running jobs.

    Failed results are logged rather than raised. Exceptions raised while
    running still propagate.
    """
    db = await resolve_tasks(program)
    for task in db.tasks:
        log.debug(str(task))

    if throttle:
        db.throttle = asyncio.Semaphore(throttle)
    db.force_run = force_run

    jobs = [db.run(Phony(name), db=db) for name in target_strs]
    results = await asyncio.gather(*jobs)

    failures = [r for r in results if not r]
    if not failures:
        return

    log.error("Some jobs have failed:")
    for failure in failures:
        log.error(textwrap.indent(str(failure), "| "))


@argh.arg("targets", nargs="*", help="names of tasks to run")
@argh.arg(
    "-i",
    "--input-file",
    help="Brei TOML or JSON file, use a `[...]` suffix to indicate a subsection.",
)
@argh.arg("-B", "--force-run", help="rebuild all dependencies")
@argh.arg("-j", "--jobs", help="limit number of concurrent jobs")
@argh.arg("-v", "--version", help="print version number and exit")
@argh.arg("--list-runners", help="show default configured runners")
@argh.arg("--debug", help="more verbose logging")
def brei(
    targets: list[str],
    *,
    input_file: Optional[str] = None,
    force_run: bool = False,
    jobs: Optional[int] = None,
    version: bool = False,
    list_runners: bool = False,
    debug: bool = False
):
    """Build one of the configured targets."""
    # NOTE: the docstring above is kept to one line because argh may show it as
    # the command's help text; further documentation lives in these comments.
    #
    # `--version` and `--list-runners` print information and exit. Otherwise
    # the program is loaded from, in order of preference:
    #   1. `-i FILE` or `-i FILE[section]`,
    #   2. `brei.toml` in the working directory,
    #   3. the `[tool.brei]` section of `pyproject.toml`.
    # A `HelpfulUserError` is raised when none of these is available. Errors of
    # type `UserError` raised while running are logged instead of raised.
    if version:
        print(f"Brei {__version__}, Copyright (c) 2023 Netherlands eScience Center.")
        print("Licensed under the Apache License, Version 2.0.")
        sys.exit(0)

    if list_runners:
        t = Table(title="Default Runners", header_style="italic green", show_edge=False)
        t.add_column("runner", style="bold yellow")
        t.add_column("executable")
        t.add_column("arguments")
        for r, c in DEFAULT_RUNNERS.items():
            t.add_row(r, c.command, f"{c.args}")
        console = Console()
        console.print(t)
        sys.exit(0)

    if input_file is not None:
        # `path[section]` selects a subsection. `fullmatch` ensures trailing
        # text after the closing bracket is not silently ignored.
        if m := re.fullmatch(r"([^\[\]]+)\[([^\[\]\s]+)\]", input_file):
            input_path = Path(m.group(1))
            section = m.group(2)
        else:
            input_path = Path(input_file)
            section = None

        program = read_from_file(Program, input_path, section)

    elif Path("brei.toml").exists():
        program = read_from_file(Program, Path("brei.toml"))

    elif Path("pyproject.toml").exists():
        with open("pyproject.toml", "rb") as f_in:
            data = tomllib.load(f_in)
        try:
            for s in ["tool", "brei"]:
                data = data[s]
        except KeyError as e:
            raise HelpfulUserError(
                "Without the `-i` argument, Brei looks for `brei.toml` first, then for "
                "a `[tool.brei]` section in `pyproject.toml`. A `pyproject.toml` file was "
                "found, but contained no `[tool.brei]` section."
            ) from e

        program = construct(Program, data)
    else:
        raise HelpfulUserError(
            "No input file given, no `brei.toml` found and no `pyproject.toml` found."
        )

    jobs = int(jobs) if jobs else None
    configure_logger(debug)
    try:
        asyncio.run(main(program, targets, force_run, jobs))
    except UserError as e:
        log.error(f"Failed: {e}")


def cli():
    """Console entry point: parse the command line and dispatch it to `brei()`."""
    arg_parser = ArgumentParser(formatter_class=RichHelpFormatter)
    argh.set_default_command(arg_parser, brei)
    argh.dispatch(arg_parser)


if __name__ == "__main__":
    cli()

Laziness

Brei executes workflows with asyncio, through lazy evaluation and memoization. The `Lazy` class contains an `asyncio.Lock` and a `Result` object. When multiple tasks ask for the result of the same dependency, the lock makes sure the computation is performed only once. Once the lock is free, all future requests immediately return the memoized result.

Results

One common problem when using workflow systems in Python is that errors are not traceable to their source. It is quite common to get a stack trace that leads to the internals of the workflow system, but not to the cause of the error. The way around this is not to use Python’s exception system (at least not in the outer layer), but rather to signal errors by returning `Failure` objects.

file:test/test_result.py
from brei.result import Failure, TaskFailure, Ok
from hypothesis import given
from hypothesis.strategies import builds, booleans, text, integers


def _make_result(is_ok, message, value):
    """Build either a successful `Ok(value)` or a `TaskFailure(message)`."""
    return Ok(value) if is_ok else TaskFailure(message)


results = builds(_make_result, booleans(), text(min_size=1), integers())


@given(results)
def test_result(r):
    # Truthy results carry a value; falsy results are `Failure`s.
    if r:
        assert hasattr(r, "value")
    else:
        assert isinstance(r, Failure)
file:brei/result.py
from typing import TypeVar, Generic
from dataclasses import dataclass


T = TypeVar("T")
R = TypeVar("R")


class Failure(Exception):
    def __bool__(self):
        return False


@dataclass
class MissingFailure(Failure, Generic[T]):
    target: T

    def __str__(self):
        return f"Missing dependency: {self.target}"


@dataclass
class TaskFailure(Failure):
    message: str

    def __post_init__(self):
        Exception.__init__(self, self.message)


@dataclass
class DependencyFailure(Failure, Generic[T]):
    dependencies: dict[T, Failure]

    def __str__(self):
        return "\n".join(f"{key} -> {fail}" for key, fail in self.dependencies.items())

@dataclass
class Ok(Generic[R]):
    value: R

    def __bool__(self):
        return True


Result = Failure | Ok[R]

Lazy evaluation

file:brei/lazy.py
from __future__ import annotations
from copy import copy
from dataclasses import dataclass, field, fields
from typing import Generic, Iterable, Optional, Self, TypeVar, cast
import asyncio

from .errors import CyclicWorkflowError, HelpfulUserError
from .utility import FromStr
from .logging import logger
from .result import Failure, Result, Ok, DependencyFailure, TaskFailure, MissingFailure

T = TypeVar("T")
R = TypeVar("R")

log = logger()


@dataclass
class Phony(FromStr):
    """A named target that is not a file, written as `#name`.

    Hashing and string conversion both use the `#name` form, so phony
    targets can be used as dictionary keys (e.g. in a task index).
    """

    name: str

    @classmethod
    def from_str(cls, s: str) -> Phony:
        """Parse `#name` into `Phony("name")`.

        Raises:
            ValueError: if `s` does not start with `#` (including when `s`
                is empty).
        """
        # `startswith` instead of `s[0]`: an empty string must raise the
        # documented `ValueError`, not an `IndexError`.
        if s.startswith("#"):
            return Phony(s[1:])
        raise ValueError("A phony target should start with a `#` character.")

    def __str__(self):
        return f"#{self.name}"

    def __hash__(self):
        return hash(str(self))


@dataclass
class Lazy(Generic[T, R]):
    """Base class for memoized units of work.

    A `Lazy` is tagged with targets of type `T` (usually `Path`, `Phony` or
    `Variable`) and produces a value of type `R`.

    To implement a specific task, override the asynchronous `run` method,
    which should return a value of type `R` or raise `TaskFailure`.

    Attributes:
        creates: list of target identifiers, for instance paths that are
            generated by running this task.
        requires: list of dependency identifiers. All of these need to
            be realized before the task can run.
        result (property): value of the result, once the task has run.
            Raises `ValueError` if accessed before the task has run, or
            after it has failed. If the value is itself a `Lazy`, that
            object's result is returned instead.
    """

    creates: list[T]
    requires: list[T]

    # Held while the result is computed, so that concurrent requests for the
    # same task wait for, and then reuse, a single computation.
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False)
    # `None` until the task has run; afterwards the memoized `Ok` or `Failure`.
    _result: Optional[Result[R]] = field(default=None, init=False)

    @property
    def real_requirements(self) -> list[T]:
        """Requirements that are not `Phony` targets."""
        return [d for d in self.requires if not isinstance(d, Phony)]

    def __bool__(self):
        # Truthy only after a successful run.
        return self._result is not None and bool(self._result)

    @property
    def result(self) -> R:
        if self._result is None:
            raise ValueError("Task has not run yet.")
        if not self._result:
            raise ValueError("Task has failed.")
        assert isinstance(self._result, Ok)
        # A task may produce another task (see `LazyDB.run`); report the
        # value of the task it produced.
        if isinstance(self._result.value, Lazy):
            return self._result.value.result
        return self._result.value

    async def run(self, *, db) -> R:
        """Compute this task's value; subclasses must override this."""
        raise NotImplementedError()

    async def run_after_deps(self, recurse, visited: dict[T, None], **kwargs) -> Result[R]:
        """Realize all requirements concurrently, then run this task.

        Args:
            recurse: coroutine function used to realize a requirement, called
                as `recurse(dep, visited, **kwargs)` (normally `LazyDB.run`).
            visited: targets on the current dependency path. Each requirement
                gets its own copy, so sibling branches that share a
                dependency are not mistaken for a cycle.
            kwargs: forwarded to `recurse` and to `run`.

        Returns:
            `DependencyFailure` mapping each failed requirement to its
            failure; otherwise `Ok(value)`, or the `TaskFailure` raised by
            `run`.
        """
        dep_res = await asyncio.gather(
            *(recurse(dep, copy(visited), **kwargs) for dep in self.requires)
        )
        failed = {k: v for (k, v) in zip(self.requires, dep_res) if not v}
        if failed:
            return DependencyFailure(failed)
        try:
            return Ok(await self.run(**kwargs))
        except TaskFailure as f:
            return f

    async def run_cached(self, recurse, visited: dict[T, None], **kwargs) -> Result[R]:
        """Like `run_after_deps`, but compute at most once and memoize the result."""
        async with self._lock:
            if self._result is not None:
                return self._result
            self._result = await self.run_after_deps(recurse, visited, **kwargs)
            return self._result

    def reset(self):
        """Forget the memoized result, so the next `run_cached` recomputes it."""
        self._result = None

    def fields(self):
        """Return the public (non-underscore) dataclass fields as a dict."""
        # `fields` below is `dataclasses.fields`: a method body does not see
        # names defined in the class namespace.
        return {f.name: getattr(self, f.name) for f in fields(self) if f.name[0] != "_"}


# Type of the tasks stored in a `LazyDB`: any `Lazy` subclass.
TaskT = TypeVar("TaskT", bound=Lazy)


class MissingDependency(Exception):
    """Raised by `LazyDB.on_missing` when no task exists or can be made for a target."""

@dataclass
class LazyDB(Generic[T, TaskT]):
    """Registry of lazy tasks, indexed by the targets they create.

    `run(t)` finds the task creating `t` and evaluates it, together with its
    requirements, memoizing every result.
    """

    tasks: list[TaskT] = field(default_factory=list)
    index: dict[T, TaskT] = field(default_factory=dict)

    async def run(self, t: T, visited: dict[T, None] | None = None, **kwargs) -> Result[R]:
        """Evaluate the task that creates `t`.

        Args:
            t: target to realize.
            visited: targets on the current dependency path (cycle detection).
            kwargs: forwarded to the tasks' `run` methods.

        Returns:
            The task's result, or `MissingFailure(t)` when no task is known
            and `on_missing` cannot provide one.

        Raises:
            CyclicWorkflowError: if `t` is already on the dependency path.
        """
        if not visited:
            visited = {}
        if t in visited:
            raise CyclicWorkflowError(list(visited.keys()))
        visited[t] = None

        try:
            task = self.index[t]
        except KeyError:
            try:
                task = self.on_missing(t)
            except MissingDependency:
                return MissingFailure(t)

        while True:
            outcome = await task.run_cached(self.run, visited, **kwargs)
            if not (isinstance(outcome, Ok) and isinstance(outcome.value, Lazy)):
                return outcome
            # The task produced another task (e.g. a template producing a
            # concrete task): evaluate that one as well.
            task = cast(TaskT, outcome.value)

    def on_missing(self, _: T) -> TaskT:
        """Hook for targets absent from the index; by default there is no fallback."""
        raise MissingDependency()

    def add(self, task: TaskT):
        """Add a task to the DB."""
        log.debug(f"adding task ===\n{task}")
        self.tasks.append(task)
        self.index.update((target, task) for target in task.creates)

    def clean(self):
        """Remove all tasks."""
        self.tasks, self.index = [], {}

    def reset(self):
        """Forget the memoized results of all tasks."""
        for task in self.tasks:
            task.reset()

Template strings

The goal is to have more flexible templates in Brei. I’ve looked into Jinja as a way to expand templates, but rejected it because of its large, mostly unneeded complexity and its equally unneeded dependencies. A better alternative is to use Python’s `string.Template` to do variable substitution. Evaluation needs to be lazy, and I would like to be able to pipe the standard output of a task into the contents of a variable. Considering this last point, we want to differentiate between writing output to the contents of a variable, and writing output to the file pointed to by the variable.

To assign the output of a command to a variable, we can have the following:

[[task]]
stdout = "var(commit)"
runner = "bash"
script = "git rev-parse HEAD"

To use it, refer to the variable using Python’s `string.Template` syntax, which is similar to that of Bash, Make, Perl, etc., i.e. either `$commit` or `${commit}`.

[[task]]
creates = ["output/${commit}/run.h5"]
requires = ["build/bin/model", "data/input.csv"]
runner = "bash"
script = "model run"

The system should infer that the use of `$commit` creates an implicit dependency on running `git rev-parse HEAD`. However, there may be steps in between:

[environment]
output_path = "output/${commit}"

[[task]]
creates = ["${output_path}/run.h5"]
... etc ...

We can trace these variable substitutions using the same lazy evaluation strategy as the workflow itself.

file:brei/template_strings.py
from dataclasses import dataclass, is_dataclass, fields
from string import Template
from typing import Any, Generic, Mapping, TypeVar, cast
from functools import singledispatch


from .lazy import Lazy


T = TypeVar("T")


@singledispatch
def substitute(template, env: Mapping[str, str]):
    """Recursively substitute `$name` / `${name}` placeholders in `template`.

    - `str`: substituted with `string.Template.safe_substitute`, so
      placeholders missing from `env` are left as they are.
    - `list`: substituted element-wise (a new list is returned).
    - dataclass instances: rebuilt from their substituted fields. Only fields
      accepted by the constructor (`init=True`) whose name does not start
      with `_` are passed on; other fields are initialized by the class itself
      (default values or `__post_init__`).
    - `None` and any other object: returned unchanged.
    """
    dtype = type(template)
    if is_dataclass(dtype):
        # `init=False` fields cannot be passed to the constructor, so they
        # are skipped rather than causing a `TypeError`.
        args = {
            f.name: substitute(getattr(template, f.name), env)
            for f in fields(dtype)
            if f.init and not f.name.startswith("_")
        }
        return dtype(**args)

    return template


@substitute.register
def _(template: str, env: Mapping[str, str]) -> str:
    return Template(template).safe_substitute(env)


@substitute.register
def _(template: list, env: Mapping[str, str]) -> list:
    return [substitute(x, env) for x in template]

@substitute.register
def _(_template: None, _) -> None:
    return None


@singledispatch
def gather_args(template: Any) -> set[str]:
    dtype = type(template)
    if is_dataclass(dtype):
        args = (
            gather_args(getattr(template, f.name))
            for f in fields(dtype)
            if f.name[0] != "_"
        )
        return set().union(*args)

    return set()
    # raise TypeError(f"Can't perform string substitution on object of type: {dtype}")


@gather_args.register
def _(template: str) -> set[str]:
    return set(Template(template).get_identifiers())


@gather_args.register
def _(template: list) -> set[str]:
    return set().union(*map(gather_args, template))


@gather_args.register
def _(_template: None) -> set[str]:
    return set()
file:test/test_template_strings.py
from dataclasses import dataclass
from typing import Iterable, Optional
import pytest
from brei.task import TemplateVariable, Variable
from brei.template_strings import gather_args, substitute
from brei.lazy import LazyDB


class Environment(LazyDB[Variable, TemplateVariable]):
    """Minimal variable store for tests.

    A `LazyDB` of `TemplateVariable`s that also serves as the mapping used
    for template substitution.
    """

    def __setitem__(self, k: str, v: str):
        variable = TemplateVariable([Variable(k)], [], v)
        self.add(variable)

    def __getitem__(self, k: str) -> str:
        task = self.index[Variable(k)]
        return task.result

    def __contains__(self, k: str) -> bool:
        return Variable(k) in self.index

    def items(self) -> Iterable[str]:
        # Yields variable names only, mirroring `brei.task.Environment.items`.
        return (key.name for key in self.index if isinstance(key, Variable))

    @property
    def environment(self):
        # `TemplateVariable.run` substitutes against `db.environment`; here
        # the DB is its own environment.
        return self


@pytest.mark.asyncio
async def test_template_string():
    """Variables referring to other variables are resolved lazily, on demand."""
    env = Environment()
    definitions = {
        "x": "Hello, ${y}!",
        "y": "World",
        "z": "print('${x}')",
    }
    for key, value in definitions.items():
        env[key] = value

    await env.run(Variable("z"), db=env)

    assert env["x"] == "Hello, World!"
    assert env["z"] == "print('Hello, World!')"


@dataclass
class MyData:
    some_list: list[str]
    some_prop: str
    some_none: Optional[str] = None


def test_template_dtype():
    """`gather_args` and `substitute` recurse into dataclass fields (lists, strings, None)."""
    data = MyData(
        some_list=["${x} bar", "bar ${x} bar"],
        some_prop="bar ${x}",
    )

    # A set literal: `set("x")` only equals `{"x"}` because the name happens
    # to be a single character.
    assert gather_args(data) == {"x"}
    subst = substitute(data, {"x": "foo"})
    assert subst.some_list == ["foo bar", "bar foo bar"]
    assert subst.some_prop == "bar foo"
    assert subst.some_none is None

Tasks

file:brei/runner.py
from dataclasses import dataclass


@dataclass
class Runner:
    """An external command used to execute a task's script.

    Attributes:
        command: executable to invoke.
        args: argument templates; `${script}` is replaced by the script path.
    """

    command: str
    args: list[str]


# Built-in runners: each invokes the executable of the same name on the script.
DEFAULT_RUNNERS: dict[str, Runner] = {
    name: Runner(name, ["${script}"]) for name in ("python", "bash")
}
file:brei/task.py
from __future__ import annotations
import asyncio
from contextlib import contextmanager, nullcontext
from copy import copy
from dataclasses import dataclass, field
from pathlib import Path
import re
import string
from tempfile import NamedTemporaryFile
from typing import Any, Optional, TextIO
from asyncio import create_subprocess_exec
from textwrap import indent
import shlex

from .result import TaskFailure
from .lazy import MissingDependency, Lazy, LazyDB, Phony
from .utility import stat
from .logging import logger
from .template_strings import gather_args, substitute
from .runner import Runner, DEFAULT_RUNNERS


log = logger()


@dataclass
class Variable:
    """A target naming a variable, written `var(name)` in task definitions.

    It hashes like its string form, so it can serve as a dictionary key
    alongside `Path` and `Phony` targets.
    """

    name: str

    def __str__(self):
        return f"var({self.name})"

    def __hash__(self):
        return hash(str(self))


def str_to_target(s: str) -> Path | Phony | Variable:
    """Interpret a target string from a task definition.

    - `#name` becomes `Phony("name")`,
    - `var(name)` (the whole string) becomes `Variable("name")`,
    - anything else becomes a file `Path`.

    Raises:
        ValueError: if `s` is empty.
    """
    if not s:
        raise ValueError("A target cannot be an empty string.")
    if s.startswith("#"):
        return Phony(s[1:])
    # `fullmatch`: a path such as `var(x)/out.txt` must remain a `Path`.
    if m := re.fullmatch(r"var\(([^\s\(\)]+)\)", s):
        return Variable(m.group(1))
    return Path(s)


def is_oneliner(s: str) -> bool:
    return len(s.splitlines()) == 1


@dataclass
class Task(Lazy[Path | Phony | Variable, str | None]):
    name: Optional[str] = None
    runner: Optional[str] = None
    path: Optional[Path] = None
    script: Optional[str] = None
    stdin: Optional[Path | Variable] = None
    stdout: Optional[Path | Variable] = None
    description: Optional[str] = None
    force: bool = False

    @property
    def target_paths(self):
        return (p for p in self.creates if isinstance(p, Path))

    @property
    def dependency_paths(self):
        return (p for p in self.requires if isinstance(p, Path))

    def __str__(self):
        tgts = ", ".join(str(t) for t in self.creates)
        deps = ", ".join(str(t) for t in self.requires)
        if self.script is not None:
            src = indent(self.script, prefix=" ▎ ", predicate=lambda _: True)
        elif self.path is not None:
            src = str(self.path)
        else:
            src = " - "
        name = f"{self.name}: " if self.name else ""
        return name + f"[{tgts}] <- [{deps}]\n" + src

    def __post_init__(self):
        if self.name and Phony(self.name) not in self.creates:
            self.creates.append(Phony(self.name))
        if self.stdin and self.stdin not in self.requires:
            self.requires.append(self.stdin)
        if self.path and self.path not in self.requires:
            self.requires.append(self.path)
        if self.stdout and self.stdout not in self.creates:
            self.creates.append(self.stdout)

    def always_run(self) -> bool:
        return self.force or len(list(self.target_paths)) == 0

    def needs_run(self) -> bool:
        if any(not p.exists() for p in self.target_paths):
            return True
        target_stats = [stat(p) for p in self.target_paths]
        dep_stats = [stat(p) for p in self.dependency_paths]
        if any(t < d for t in target_stats for d in dep_stats):
            return True
        return False

    @contextmanager
    def get_script_path(self):
        if self.path is not None:
            tmpfile = None
            path = self.path
        elif self.script is not None:
            tmpfile = NamedTemporaryFile("w")
            tmpfile.write(self.script)
            tmpfile.flush()
            path = Path(tmpfile.name)
        else:
            raise ValueError("A `Rule` can have either `path` or `script` defined.")

        yield path

        if tmpfile is not None:
            tmpfile.close()

    @contextmanager
    def get_stdout(self):
        match self.stdout:
            case Variable(x):
                yield asyncio.subprocess.PIPE
            case x if isinstance(x, Path):
                stdout = open(x, "w")
                yield stdout
                stdout.close()
            case _:
                yield None

    async def run(self, *, db: TaskDB):
        if not self.always_run() and not self.needs_run() and not db.force_run:
            tgts = " ".join(f"`{t}`" for t in self.target_paths)
            log.info(f"Targets {tgts} already up-to-date.")
            return

        log.debug(f"{self}")
        if (self.path is None and self.script is None):
            return

        targets = " ".join(f"`{t}`" for t in self.creates)
        short_note = self.description or (f"#{self.name}" if self.name else None) \
            or f"creating {targets}"
        log.info(f"[green]{short_note}[/]", extra={'markup': True})

        stdin: TextIO | int | None = None
        match self.stdin:
            case Variable(x):
                stdin = asyncio.subprocess.PIPE
                input_data = db.environment[x].encode()
            case x if isinstance(x, Path):
                stdin = open(x, "r")
                input_data = None
            case _:
                stdin = None
                input_data = None

        if self.runner is None and self.script is not None:
            if not is_oneliner(self.script):
                assert self.stdin is None
            with self.get_stdout() as stdout:
                stdout_data = b""
                for line in self.script.splitlines():
                    async with db.throttle or nullcontext():
                        proc = await create_subprocess_exec(
                            *shlex.split(line),
                            stdin=stdin,
                            stdout=stdout,
                            stderr=asyncio.subprocess.PIPE,
                        )
                        stdout_data_part, stderr_data = await proc.communicate(input_data)
                        log.debug(f"return-code {proc.returncode}")
                    if stdout_data_part:
                        stdout_data += stdout_data_part
                    if stderr_data:
                        log.info(f"[gold1]{short_note}[/] %s", stderr_data.decode().rstrip(), extra={"markup": True})

        elif self.runner is not None:
            with self.get_script_path() as path, self.get_stdout() as stdout:
                runner = db.runners[self.runner]
                args = [string.Template(arg).substitute(script=path) for arg in runner.args]
                async with db.throttle or nullcontext():
                    proc = await create_subprocess_exec(
                        runner.command,
                        *args,
                        stdin=stdin,
                        stdout=stdout,
                        stderr=asyncio.subprocess.PIPE,
                    )
                    stdout_data, stderr_data = await proc.communicate(input_data)
                    log.debug(f"return-code {proc.returncode}")

            if stderr_data:
                log.info(f"[gold1]{short_note}[/] %s", stderr_data.decode().rstrip(), extra={"markup": True})

        else:
            return

        if self.needs_run():
            raise TaskFailure("Task didn't achieve goals.")

        return stdout_data.decode().strip() if stdout_data else None


@dataclass
class TaskProxy:
    """Raw task definition, used as input to create an actual `Task`.

    All fields are plain strings that may still contain template variables.
    An actual `Task` has no template variables remaining, and its untyped
    strings are replaced with `Path`, `Variable` or `Phony` objects.
    """
    creates: list[str] = field(default_factory=list)
    requires: list[str] = field(default_factory=list)
    name: Optional[str] = None
    runner: Optional[str] = None
    path: Optional[str] = None
    script: Optional[str] = None
    stdin: Optional[str] = None
    stdout: Optional[str] = None
    description: Optional[str] = None
    force: bool = False

    @property
    def all_targets(self):
        """`creates`, plus `stdout` and `#name` when set (as a new list)."""
        implicit: list[str] = []
        if self.stdout:
            implicit.append(self.stdout)
        if self.name:
            implicit.append(f"#{self.name}")
        return self.creates + implicit

    @property
    def all_dependencies(self):
        """`requires`, plus `stdin` and `path` when set (as a new list)."""
        implicit: list[str] = []
        if self.stdin:
            implicit.append(self.stdin)
        if self.path:
            implicit.append(self.path)
        return self.requires + implicit


@dataclass
class TemplateVariable(Lazy[Variable, str]):
    """A variable defined by a template string.

    Every placeholder in `template` becomes a requirement. Running the task
    yields the template with those variables substituted.
    """
    template: str

    def __post_init__(self):
        self.requires.extend(Variable(name) for name in gather_args(self.template))

    async def run(self, *, db) -> str:
        env = db.environment
        return substitute(self.template, env)


@dataclass
class TemplateTask(Lazy[Path | Phony | Variable, Task]):
    """A task definition whose fields may still contain template variables.

    Its targets are known up front, and every variable used in the template
    becomes a requirement. Running it substitutes those variables and returns
    a concrete `Task`, which `LazyDB.run` then evaluates in turn.
    """
    template: TaskProxy

    def __post_init__(self):
        # The explicit `creates` list must not contain template variables.
        assert not gather_args(self.template.creates)
        self.creates.extend(str_to_target(t) for t in self.template.all_targets)
        self.requires.extend(Variable(arg) for arg in gather_args(self.template))

    async def run(self, *, db):
        proxy = substitute(self.template, db.environment)
        creates = [str_to_target(t) for t in proxy.all_targets]
        requires = [str_to_target(t) for t in proxy.all_dependencies]
        script_path = Path(proxy.path) if proxy.path else None
        stdin = str_to_target(proxy.stdin) if proxy.stdin else None
        stdout = str_to_target(proxy.stdout) if proxy.stdout else None
        assert not isinstance(stdin, Phony) and not isinstance(stdout, Phony)
        return Task(
            creates,
            requires,
            name=proxy.name,
            runner=proxy.runner,
            path=script_path,
            script=proxy.script,
            stdin=stdin,
            stdout=stdout,
            description=proxy.description,
            force=proxy.force,
        )


@dataclass
class TaskDB(LazyDB[Path | Variable | Phony, Task | TemplateTask | TemplateVariable]):
    """Task database for Brei programs.

    Extends `LazyDB` with the configured runners, an optional concurrency
    limit (`throttle`) and a `force_run` flag read by `Task.run`.
    """
    runners: dict[str, Runner] = field(default_factory=lambda: copy(DEFAULT_RUNNERS))
    throttle: Optional[asyncio.Semaphore] = None
    force_run: bool = False

    def on_missing(self, t: Path | Phony | Variable):
        """Treat an existing file without a task as a trivial source task."""
        if not (isinstance(t, Path) and t.exists()):
            raise MissingDependency()
        return Task([t], [])

    def is_resolvable(self, s: Any) -> bool:
        """True when every variable used in `s` is defined in this DB."""
        return all(Variable(name) in self.index for name in gather_args(s))

    async def resolve_object(self, s: Any) -> Any:
        """Run all variables used in `s`, then return `s` with them substituted."""
        var_names = gather_args(s)
        await asyncio.gather(*(self.run(Variable(name), db=self) for name in var_names))
        result = substitute(s, self.environment)
        log.debug(f"substituting {s} => {result}")
        return result

    @property
    def environment(self):
        """Mapping view of the variables in this DB, used for substitution."""
        return Environment(self)


class Environment:
    """Read-only mapping view of the variables stored in a task DB.

    Looking up a name returns the result of the corresponding variable task,
    which must already have run successfully.
    """

    def __init__(self, db):
        self.db = db

    def __contains__(self, k: str):
        return Variable(k) in self.db.index

    def items(self):
        # Yields variable names only (not `(name, value)` pairs).
        variables = filter(lambda key: isinstance(key, Variable), self.db.index)
        return (variable.name for variable in variables)

    def __getitem__(self, k: str):
        task = self.db.index[Variable(k)]
        return task.result


class Template(TaskProxy):
    """A task definition with free template variables.

    A `Template` accepts the same fields as a `TaskProxy`; its variables are
    filled in by a `TemplateCall`.
    """

    def call(self, args: dict[str, Any]) -> TaskProxy:
        """Return a copy with `$name` placeholders replaced by `args[name]`.

        Placeholders not present in `args` are left in place.
        """
        instantiated = substitute(self, args)
        return instantiated

Programs

file:brei/program.py
from __future__ import annotations
import asyncio
from copy import copy
from itertools import chain, product, repeat
from dataclasses import dataclass, field
from enum import Enum
import json
from pathlib import Path

import tomllib


from .template_strings import gather_args
from .logging import logger
from .errors import HelpfulUserError, UserError

from .utility import construct, read_from_file
from .task import (
    Variable,
    TaskDB,
    Template,
    Runner,
    TaskProxy,
    TemplateTask,
    TemplateVariable,
)


log = logger()


@dataclass
class MissingInclude(UserError):
    """User error: the included file `path` could not be found."""

    path: Path

    def __str__(self) -> str:
        return f"Include `{self.path}` not found."


@dataclass
class MissingTemplate(UserError):
    """User error: no template called `name` is defined."""

    name: str

    def __str__(self) -> str:
        return f"Template `{self.name}` not found."


class Join(Enum):
    INNER = 1
    OUTER = 2


@dataclass
class TemplateCall:
    """Calls a template with a set of arguments.

    Members:

      - template: name of the template.
      - args: arguments to the call.
      - collect: name of the phony target by which to collect all generated targets.
      - join: `inner` or `outer` join.
    """
    template: str
    args: dict[str, str | list[str]]
    collect: str | None = None
    join: Join = Join.INNER

    @property
    def all_args(self):
        if all(isinstance(v, str) for v in self.args.values()):
            yield self.args
            return

        if self.join == Join.INNER:
            for v in zip(
                *map(
                    lambda x: repeat(x) if isinstance(x, str) else x,
                    self.args.values(),
                )
            ):
                yield dict(zip(self.args.keys(), v))

        else:  # cartesian product
            for v in product(
                *map(lambda x: [x] if isinstance(x, str) else x, self.args.values())
            ):
                yield dict(zip(self.args.keys(), v))


@dataclass
class Program:
    """A Brei program.

    Members:

      - task: list of tasks.
      - environment: variables, mapping names to (possibly templated) values.
      - template: templates, by name.
      - call: list of calls to templates.
      - include: list of includes.
      - runner: extra configured task runners, by name.
    """
    task: list[TaskProxy] = field(default_factory=list)
    environment: dict[str, str] = field(default_factory=dict)
    template: dict[str, Template] = field(default_factory=dict)
    call: list[TemplateCall] = field(default_factory=list)
    include: list[str] = field(default_factory=list)
    runner: dict[str, Runner] = field(default_factory=dict)

    @staticmethod
    def read(path: Path, section: str | None = None) -> Program:
        """Read a program from the TOML or JSON file at `path`.

        `section`, if given, selects a subsection of the data, e.g.
        `"tool.brei"` for a `pyproject.toml`.
        """
        return read_from_file(Program, path, section)


def tasks_from_call(template: Template, call: TemplateCall) -> list[TaskProxy]:
    """Instantiate `template` once for every argument set of `call`.

    If `call.collect` is set, an extra task is appended. It is named
    `call.collect`, has no dependencies, and has the targets of all generated
    tasks as its own targets.
    """
    generated = [template.call(args) for args in call.all_args]
    if not call.collect:
        return generated

    collected = [target for task in generated for target in task.all_targets]
    return [*generated, TaskProxy([], collected, name=call.collect)]


async def resolve_delayed(db: TaskDB, tasks: list[TaskProxy]) -> list[TaskProxy]:
    """Resolve and register every task in `tasks` whose targets are resolvable.

    For each task, `db.is_resolvable(task.all_targets)` decides whether its
    templated targets can be substituted yet. If so, the task is resolved with
    `db.resolve_object` and added to `db` as a `TemplateTask`. All tasks are
    processed concurrently with `asyncio.gather`.

    Returns: the tasks that could not be resolved yet, in their original
    order, so the caller can retry them later or report them.
    """
    async def resolve(task: TaskProxy) -> TaskProxy | None:
        if not db.is_resolvable(task.all_targets):
            return task
        resolved = await db.resolve_object(task)
        db.add(TemplateTask([], [], resolved))
        return None

    results = await asyncio.gather(*map(resolve, tasks))
    # Compare against None explicitly: a truthiness test would silently drop
    # an unresolved task if TaskProxy ever became falsy (e.g. via __len__).
    return [task for task in results if task is not None]


def _schedule_task(db: TaskDB, task: TaskProxy, delayed: list[TaskProxy]) -> None:
    """Add `task` to `db` now, or queue it in `delayed` if its targets are templated.

    Direct tasks and tasks produced by (possibly delayed) template calls all
    go through here. That way the same test, `gather_args(task.all_targets)`,
    decides whether a task must wait for variable resolution.
    """
    # we could check for resolvability here, but I don't like the
    # idea that order then matters. this way the rule is:
    # > if a task has a templated target, those variables should be
    # > resolvable after all other tasks were added, seeing that the
    # > task to resolve these variables can't have templated targets
    # > themselves.
    if gather_args(task.all_targets):
        delayed.append(task)
    else:
        db.add(TemplateTask([], [], task))


async def resolve_tasks(program: Program) -> TaskDB:
    """Resolve a program into a `TaskDB`.

    A resolved program has all of its includes and template calls done, so
    that only tasks remain. In order to resolve a program, some tasks may
    need to be run (e.g. to generate an included file). Variables that
    appear in the targets of a task are resolved eagerly.

    Includes are processed recursively. Each included program's templated
    tasks must become resolvable by the end of its own pass.

    Returns: TaskDB instance.

    Raises:
        MissingInclude: an included file does not exist, even after running
            the task registered for it in the database (if any).
        MissingTemplate: a call refers to a template that is defined neither
            in the program nor in any of its includes.
        UserError: some task still has unresolvable targets.
    """
    db = TaskDB()
    # Shared across the program and all of its includes, so that a call may
    # use a template that is defined in an included file.
    template_index: dict[str, Template] = {}

    async def go(program: Program) -> TaskDB:
        for var, template in program.environment.items():
            db.add(TemplateVariable([Variable(var)], [], template))

        task_templates = copy(program.task)
        template_index.update(program.template)
        delayed_calls: list[TemplateCall] = []
        delayed_templates: list[TaskProxy] = []

        db.runners.update(program.runner)

        for c in program.call:
            if c.template not in template_index:
                log.debug(
                    "template `%s` not available, waiting for includes to resolve",
                    c.template,
                )
                delayed_calls.append(c)
                continue

            task_templates.extend(tasks_from_call(template_index[c.template], c))

        for tt in task_templates:
            _schedule_task(db, tt, delayed_templates)

        delayed_templates = await resolve_delayed(db, delayed_templates)

        for inc in program.include:
            incp = Path(await db.resolve_object(inc))
            # If a task is registered under this path, run it first so the
            # include can be generated before we try to read it.
            if incp in db.index:
                await db.run(incp, db=db)
            if not incp.exists():
                raise MissingInclude(incp)

            await go(Program.read(incp))

        for c in delayed_calls:
            if c.template not in template_index:
                log.debug(
                    "template `%s` still not available, now this is an error", c.template
                )
                raise MissingTemplate(c.template)

            for tt in tasks_from_call(template_index[c.template], c):
                _schedule_task(db, tt, delayed_templates)

        delayed_templates = await resolve_delayed(db, delayed_templates)
        if delayed_templates:
            unresolvable = [
                p
                for t in delayed_templates
                for p in t.all_targets
                if not db.is_resolvable(p)
            ]
            raise UserError(f"Task has unresolvable targets: {unresolvable}")

        return db

    return await go(program)

Utils

file:brei/utility.py
from __future__ import annotations

from dataclasses import dataclass

from pathlib import Path
from datetime import datetime

import os

from .construct import construct, FromStr, read_from_file 


def normal_relative(path: Path) -> Path:
    """Return `path` as an absolute path with symlinks resolved (`Path.resolve()`).

    NOTE(review): despite the name, the result is *not* made relative to the
    current working directory. It is the fully resolved absolute path.
    """
    return path.resolve()


@dataclass
class FileStat:
    """A file path together with its last-modification time.

    Ordering (`<`) compares `modified` only; the path does not take part.
    """

    path: Path
    modified: datetime

    @staticmethod
    def from_path(path: Path):
        """Stat `path` and record its `st_mtime` as a naive local `datetime`."""
        mtime = os.stat(path).st_mtime
        return FileStat(path=path, modified=datetime.fromtimestamp(mtime))

    def __lt__(self, other: FileStat) -> bool:
        return other.modified > self.modified


def stat(path: Path) -> FileStat:
    """Return the `FileStat` for `path` after normalising it with `normal_relative`."""
    return FileStat.from_path(normal_relative(path))
file:brei/async_timer.py
from dataclasses import dataclass
import time
from contextlib import asynccontextmanager

@dataclass
class Elapsed:
    elapsed: float | None = None

@asynccontextmanager
async def timer():
    """Measure wall-clock time spent inside an ``async with`` body.

    Yields an `Elapsed` record. Its ``elapsed`` field is set to the seconds
    spent in the body, as measured with `time.perf_counter`, once the body
    exits.

    The measurement is recorded in a ``finally`` clause, so it is also
    available when the body raises. Callers can therefore still report how
    long a failed step took.
    """
    record = Elapsed()
    start = time.perf_counter()
    try:
        yield record
    finally:
        record.elapsed = time.perf_counter() - start
file:brei/construct.py
import typing
from typing import Any, Self, Union, TypeVar, TypeGuard, Type, Optional, cast
import types

import tomllib
import json

from dataclasses import is_dataclass
from enum import Enum
from pathlib import Path

from .errors import HelpfulUserError, InputError

# Generic type variable: `_construct` and `read_from_file` return a value of
# the same type they were asked to build.
T = TypeVar("T")


def isgeneric(annot):
    return typing.get_origin(annot) and hasattr(annot, "__args__")


class FromStr:
    @classmethod
    def from_str(cls, _: str) -> Self:
        raise NotImplementedError()


def construct(annot: Any, json: Any) -> Any:
    """Build a value of type `annot` from JSON-compatible data `json`.

    Any `ValueError` or `AssertionError` raised while matching the data
    against the type is converted into an `InputError(annot, json)`. The
    original exception is chained as its cause.
    """
    try:
        value = _construct(annot, json)
    except (ValueError, AssertionError) as err:
        raise InputError(annot, json) from err
    return value


def is_object_type(dtype: Type[Any]) -> TypeGuard[Type[dict[str, Any]]]:
    """Tell whether `dtype` is a `dict` type keyed by `str`, e.g. `dict[str, int]`."""
    generic = isgeneric(dtype)
    if not generic:
        return generic
    if typing.get_origin(dtype) is not dict:
        return False
    return typing.get_args(dtype)[0] is str


def is_optional_type(dtype: Type[Any]) -> TypeGuard[Type[Optional[Any]]]:
    """Tell whether `dtype` is a `typing.Union` whose second member is `None`.

    This is the shape produced by `Optional[X]` / `Union[X, None]`.
    NOTE(review): whether PEP 604 `X | None` also matches depends on whether
    the running Python reports its origin as `typing.Union`.
    """
    generic = isgeneric(dtype)
    if not generic:
        return generic
    if typing.get_origin(dtype) is not Union:
        return False
    return typing.get_args(dtype)[1] is types.NoneType


def _expect(condition: bool, annot: Any, json: Any) -> None:
    """Raise `ValueError` unless `condition` holds.

    Used instead of `assert`: asserts are stripped when Python runs with
    `-O`, which would let malformed input through unchecked.
    """
    if not condition:
        raise ValueError(f"Couldn't construct {annot} from {repr(json)}")


def _construct(annot: Type[T], json: Any) -> T:
    """Construct an object of a given type from a JSON stream.

    Supported `annot` types: bool, str, int, NoneType, `dict[str, V]`, Any,
    Path (from a string), `list[V]`, `Optional[V]`, `X | Y` unions (each
    alternative tried in order), `FromStr` subclasses (from a string),
    dataclasses, and Enums (matched case-insensitively on member name).
    For dataclasses, the JSON object may only contain keys that are fields
    of the dataclass.

    Raises: ValueError when the data does not match the type.
    """
    if annot is bool:
        _expect(isinstance(json, bool), annot, json)
        return cast(T, json)
    if annot is str:
        _expect(isinstance(json, str), annot, json)
        return cast(T, json)
    if annot is int:
        _expect(isinstance(json, int), annot, json)
        return cast(T, json)
    if annot is types.NoneType:
        # Reached from `X | None` unions, whose members are tried one by one.
        _expect(json is None, annot, json)
        return cast(T, None)
    if is_object_type(annot):
        _expect(isinstance(json, dict), annot, json)
        value_type = typing.get_args(annot)[1]
        return cast(T, {k: construct(value_type, v) for k, v in json.items()})
    if annot is Any:
        return cast(T, json)
    if annot is Path and isinstance(json, str):
        return cast(T, Path(json))
    if isgeneric(annot) and typing.get_origin(annot) is list:
        _expect(isinstance(json, list), annot, json)
        item_type = typing.get_args(annot)[0]
        return cast(T, [construct(item_type, item) for item in json])
    if is_optional_type(annot):
        if json is None:
            return cast(T, None)
        return cast(T, construct(typing.get_args(annot)[0], json))
    if isgeneric(annot) and typing.get_origin(annot) is types.UnionType:
        for dtype in typing.get_args(annot):
            try:
                return cast(T, _construct(dtype, json))
            except (ValueError, AssertionError):
                continue
        raise ValueError("None of the choices in type union match data.")
    if type(annot) is type and issubclass(annot, FromStr) and isinstance(json, str):
        return cast(T, annot.from_str(json))
    if is_dataclass(annot):
        _expect(isinstance(json, dict), annot, json)
        arg_annot = typing.get_type_hints(annot)
        # Report unknown keys as a data error instead of a bare KeyError.
        unknown = [k for k in json if k not in arg_annot]
        if unknown:
            raise ValueError(f"Unknown field(s) {unknown} for {annot}")
        args = {k: construct(arg_annot[k], json[k]) for k in json}
        return cast(T, annot(**args))
    if isinstance(json, str) and isinstance(annot, type) and issubclass(annot, Enum):
        options = {opt.name.lower(): opt for opt in annot}
        _expect(json.lower() in options, annot, json)
        return cast(T, options[json.lower()])
    raise ValueError(f"Couldn't construct {annot} from {repr(json)}")


def read_from_file(data_type: Type[T], path: Path, section: Optional[str] = None) -> T:
    """Read a value of type `data_type` from the TOML or JSON file at `path`.

    The format is chosen by file suffix (`.toml` or `.json`). If `section` is
    given, only that part of the decoded data is used. Periods in `section`
    select nested tables, e.g. `"tool.brei"` selects `data["tool"]["brei"]`.
    The selected data is converted with `construct(data_type, ...)`.

    Example:

    ```python
    read_from_file(Program, Path("./pyproject.toml"), "tool.brei")
    ```

    Raises:
        HelpfulUserError: if the file does not exist, has an unrecognized
            suffix, cannot be decoded, or does not contain `section`.
        Whatever `construct` raises when the data does not match `data_type`.
    """
    if not path.exists():
        raise HelpfulUserError(f"File not found: {path}")
    with open(path, "rb") as f:
        try:
            if path.suffix == ".toml":
                data: Any = tomllib.load(f)
            elif path.suffix == ".json":
                data = json.load(f)
            else:
                raise HelpfulUserError(f"Unrecognized file format: {path}")
        except (tomllib.TOMLDecodeError, json.JSONDecodeError, UnicodeDecodeError) as e:
            raise HelpfulUserError(f"Could not parse `{path}`: {e}") from e

    if section is not None:
        for s in section.split("."):
            try:
                data = data[s]
            # KeyError: key missing; TypeError: an intermediate value is not
            # a table (e.g. a string or list), so it cannot be indexed by name.
            except (KeyError, TypeError) as e:
                raise HelpfulUserError(
                    f"Data file `{path}` should contain section `{section}`."
                ) from e

    return construct(data_type, data)