Implementation
This architecture documentation is very much a work in progress.
brei --version
Brei 0.2.3, Copyright (c) 2023 Netherlands eScience Center. Licensed under the Apache License, Version 2.0.
Test coverage
Name | Stmts | Miss | Cover |
---|---|---|---|
brei/__init__.py | 7 | 0 | 100% |
brei/async_timer.py | 12 | 0 | 100% |
brei/cli.py | 87 | 55 | 37% |
brei/construct.py | 86 | 16 | 81% |
brei/errors.py | 28 | 5 | 82% |
brei/lazy.py | 102 | 11 | 89% |
brei/logging.py | 13 | 6 | 54% |
brei/program.py | 116 | 16 | 86% |
brei/result.py | 28 | 2 | 93% |
brei/runner.py | 6 | 0 | 100% |
brei/task.py | 260 | 15 | 94% |
brei/template_strings.py | 38 | 0 | 100% |
brei/utility.py | 21 | 0 | 100% |
brei/version.py | 2 | 0 | 100% |
TOTAL | 806 | 126 | 84% |
Python module
from importlib import metadata

try:
    __version__ = metadata.version("brei")
except metadata.PackageNotFoundError:
    # `brei` is not installed (e.g. running from an unpacked source tree):
    # use a placeholder instead of failing at import time.
    __version__ = "0+unknown"
"""
Welcome to Brei's API documentation. There are two ways to use Brei: from the
command-line (in which case we refer to the homepage for documentation), or
straight from Python. The easiest function to work with is `brei()`, which
maps one-to-one onto the command-line app.

## Program

If you want to read the `Program` yourself, there are several ways to do so:

1. Use `Program.read()`. You give it a `Path` to a TOML or JSON file and a
   section, this last bit giving an object path into the data. For instance:
   `Program.read(Path("pyproject.toml"), "tool.brei")`.
2. Read your own data format into JSON compatible data, then
   `construct(Program, data)`. The `construct` function uses the type annotations
   in dataclasses to validate the input data.

After reading the data, you'll want to resolve all tasks, i.e. perform includes
and run any tasks needed to resolve the targets of all other tasks. The task
database has to be passed on to `run` as the `db` keyword:

```python
program = Program.read(Path("brei.toml"))
db: TaskDB = await resolve_tasks(program)
await db.run(Phony("all"), db=db)
```

There are three kinds of targets: `pathlib.Path`, `Phony` and `Variable`.

## API
"""
from .program import Program, resolve_tasks, TemplateCall
from .construct import construct
from .lazy import Lazy, LazyDB
from .task import Task, TaskDB, Phony, Variable, TaskProxy, Template
from .runner import Runner
from .cli import brei

__all__ = [
    "brei",
    "Lazy", "LazyDB", "Phony", "Program", "Runner", "Task", "TaskDB",
    "TaskProxy", "Template", "TemplateCall", "Variable", "construct",
    "resolve_tasks",
]
Logging
Logging is formatted by the `rich` module.
import logging
import sys
from rich.highlighter import RegexHighlighter
from rich.logging import RichHandler
def logger():
    """Return the logger shared by all Brei modules (named ``"brei"``)."""
    brei_logger = logging.getLogger("brei")
    return brei_logger
def configure_logger(debug: bool, rich: bool = True):
    """Set up the root logging configuration via `logging.basicConfig`.

    Args:
        debug: log at DEBUG level when true, otherwise at INFO level. With
            `rich`, this also controls whether source paths are shown.
        rich: log through a `RichHandler` that highlights back-ticked text;
            otherwise log plain records to `sys.stdout`.
    """
    class BackTickHighlighter(RegexHighlighter):
        highlights = [r"`(?P<bold>[^`]*)`"]

    level = logging.DEBUG if debug else logging.INFO

    if not rich:
        logging.basicConfig(
            level=level,
            handlers=[logging.StreamHandler(sys.stdout)],
        )
        return

    logging.basicConfig(
        level=level,
        format="%(message)s",
        datefmt="[%X]",
        handlers=[RichHandler(show_path=debug, highlighter=BackTickHighlighter())],
    )
Command-line interface
from argparse import ArgumentParser
from pathlib import Path
import re
import sys
import textwrap
import tomllib
from typing import Optional
import argh # type: ignore
import asyncio
from rich.console import Console
from rich_argparse import RichHelpFormatter
from rich.table import Table
from .runner import DEFAULT_RUNNERS
from .errors import HelpfulUserError, UserError
from .lazy import Phony
from .utility import construct, read_from_file
from .program import Program, resolve_tasks
from .logging import logger, configure_logger
from .version import __version__
log = logger()


async def main(
    program: Program, target_strs: list[str], force_run: bool, throttle: Optional[int]
):
    """Resolve `program` and build the phony targets named in `target_strs`.

    Each name is run as `Phony(name)`. A positive `throttle` caps the number
    of concurrently running jobs. Failures are reported through the log;
    nothing is returned.
    """
    db = await resolve_tasks(program)
    for task in db.tasks:
        log.debug(str(task))

    if throttle:
        db.throttle = asyncio.Semaphore(throttle)
    db.force_run = force_run

    jobs = (db.run(Phony(name), db=db) for name in target_strs)
    results = await asyncio.gather(*jobs)

    failures = [r for r in results if not r]
    if failures:
        log.error("Some jobs have failed:")
        for failure in failures:
            log.error(textwrap.indent(str(failure), "| "))
@argh.arg("targets", nargs="*", help="names of tasks to run")
@argh.arg(
    "-i",
    "--input-file",
    help="Brei TOML or JSON file, use a `[...]` suffix to indicate a subsection.",
)
@argh.arg("-B", "--force-run", help="rebuild all dependencies")
@argh.arg("-j", "--jobs", help="limit number of concurrent jobs")
@argh.arg("-v", "--version", help="print version number and exit")
@argh.arg("--list-runners", help="show default configured runners")
@argh.arg("--debug", help="more verbose logging")
def brei(
    targets: list[str],
    *,
    input_file: Optional[str] = None,
    force_run: bool = False,
    jobs: Optional[int] = None,
    version: bool = False,
    list_runners: bool = False,
    debug: bool = False,
):
    """Build one of the configured targets."""
    # NB: argh uses the docstring above as the command's help text; keep
    # further documentation in comments.
    if version:
        print(f"Brei {__version__}, Copyright (c) 2023 Netherlands eScience Center.")
        print("Licensed under the Apache License, Version 2.0.")
        sys.exit(0)

    if list_runners:
        t = Table(title="Default Runners", header_style="italic green", show_edge=False)
        t.add_column("runner", style="bold yellow")
        t.add_column("executable")
        t.add_column("arguments")
        for r, c in DEFAULT_RUNNERS.items():
            t.add_row(r, c.command, f"{c.args}")
        console = Console()
        console.print(t)
        sys.exit(0)

    # Input lookup order: explicit `-i` file, then `brei.toml`, then the
    # `[tool.brei]` section of `pyproject.toml`.
    if input_file is not None:
        # `file.toml[section.sub]` selects a (nested) section of the file.
        if m := re.match(r"([^\[\]]+)\[([^\[\]\s]+)\]", input_file):
            input_path = Path(m.group(1))
            section = m.group(2)
        else:
            input_path = Path(input_file)
            section = None

        program = read_from_file(Program, input_path, section)

    elif Path("brei.toml").exists():
        program = read_from_file(Program, Path("brei.toml"))

    elif Path("pyproject.toml").exists():
        with open("pyproject.toml", "rb") as f_in:
            data = tomllib.load(f_in)
        try:
            for s in ["tool", "brei"]:
                data = data[s]
        except KeyError as e:
            raise HelpfulUserError(
                "Without the `-i` argument, Brei looks for `brei.toml` first, then for "
                "a `[tool.brei]` section in `pyproject.toml`. A `pyproject.toml` file was "
                "found, but contained no `[tool.brei]` section."
            ) from e

        program = construct(Program, data)

    else:
        raise HelpfulUserError(
            "No input file given, no `brei.toml` found and no `pyproject.toml` found."
        )

    jobs = int(jobs) if jobs else None
    configure_logger(debug)
    try:
        asyncio.run(main(program, targets, force_run, jobs))
    except UserError as e:
        log.error(f"Failed: {e}")
def cli():
    """Command-line entry point: build the argument parser for `brei()` and
    dispatch the command line to it."""
    arg_parser = ArgumentParser(formatter_class=RichHelpFormatter)
    argh.set_default_command(arg_parser, brei)
    argh.dispatch(arg_parser)


if __name__ == "__main__":
    cli()
Laziness
Brei executes workflows in Asyncio, through lazy evaluation and
memoization. The `Lazy` class contains an `asyncio.Lock` and a `Result`
object. When multiple tasks ask for the result of the same dependent task,
the lock makes sure the computation is performed only once. Once the lock is
free, all future requests immediately return the memoized result.
Results
One common problem when using workflow systems in Python is that
errors are not traceable to their source. It is quite common to get a
stack trace that leads to the internals of the workflow system, but not
to the cause of the error. The way around this is not to use Python’s
exception system (at least not in the outer layer), but rather to signify
errors by returning `Failure` objects.
from brei.result import Failure, TaskFailure, Ok
from hypothesis import given
from hypothesis.strategies import builds, booleans, text, integers
def _ok_or_failure(is_ok, message, value):
    """Build an `Ok(value)` or a `TaskFailure(message)`, chosen by `is_ok`."""
    return Ok(value) if is_ok else TaskFailure(message)


# Random results: either `Ok` with an integer or a failure with a non-empty message.
results = builds(_ok_or_failure, booleans(), text(min_size=1), integers())


@given(results)
def test_result(r):
    """A result is either truthy and carries a `value`, or falsy and a `Failure`."""
    if r:
        assert hasattr(r, "value")
    else:
        assert isinstance(r, Failure)
from typing import TypeVar, Generic
from dataclasses import dataclass
T = TypeVar("T")
R = TypeVar("R")


class Failure(Exception):
    """Base class of all failed results.

    A `Failure` is an exception, but is typically returned as a value rather
    than raised. It is always falsy, so `if result:` tells success from failure.
    """

    def __bool__(self) -> bool:
        return False
@dataclass
class MissingFailure(Failure, Generic[T]):
    """Failure for a target that no known task creates."""

    target: T

    def __str__(self):
        return f"Missing dependency: {self.target}"
@dataclass
class TaskFailure(Failure):
    """Failure of a task itself; `message` says what went wrong."""

    message: str

    def __post_init__(self):
        # `@dataclass` does not call `Exception.__init__`; do so here so that
        # `str(failure)` reports the message.
        Exception.__init__(self, self.message)
@dataclass
class DependencyFailure(Failure, Generic[T]):
    """Failure caused by failed dependencies.

    `dependencies` maps each failed dependency to its own `Failure`.
    """

    dependencies: dict[T, Failure]

    def __str__(self):
        lines = [f"{key} -> {fail}" for key, fail in self.dependencies.items()]
        return "\n".join(lines)
@dataclass
class Ok(Generic[R]):
    """A successful result holding `value`; always truthy."""

    value: R

    def __bool__(self) -> bool:
        return True


# A `Result` is either a (falsy) `Failure` or a (truthy) `Ok`.
Result = Failure | Ok[R]
Lazy evaluation
from __future__ import annotations
from copy import copy
from dataclasses import dataclass, field, fields
from typing import Generic, Iterable, Optional, Self, TypeVar, cast
import asyncio
from .errors import CyclicWorkflowError, HelpfulUserError
from .utility import FromStr
from .logging import logger
from .result import Failure, Result, Ok, DependencyFailure, TaskFailure, MissingFailure
# Tag type of targets (usually `str` or `Path`).
T = TypeVar("T")
# Type of the value a task produces.
R = TypeVar("R")

log = logger()
@dataclass
class Phony(FromStr):
    """A named target that is not a file, written as `#name`."""

    name: str

    @classmethod
    def from_str(cls, s: str) -> Phony:
        """Parse `#name` into `Phony("name")`.

        Raises:
            ValueError: if `s` does not start with `#`. This includes the
                empty string, which used to raise `IndexError`.
                `construct` only translates `ValueError` into an input error.
        """
        if s.startswith("#"):
            return Phony(s[1:])
        raise ValueError("A phony target should start with a `#` character.")

    def __str__(self):
        return f"#{self.name}"

    def __hash__(self):
        # Hash the textual form, consistent with the dataclass equality on `name`.
        return hash(str(self))
@dataclass
class Lazy(Generic[T, R]):
    """Base class for tasks that are tagged with type `T` (usually `str` or
    `Path`) and representing values of type `R`.

    To implement a specific task, you need to implement the asynchronous
    `run` method, which should return a value of `R` or raise `TaskFailure`.

    Attributes:
        creates: list of target identifiers, for instance paths that are
            generated by running a particular task.
        requires: list of dependency identifiers. All of these need to
            be realized before the task can run.
        result (property): value of the result, once the task was run. This
            raises `ValueError` if accessed before the task has run, or if
            the task failed.
    """

    creates: list[T]
    requires: list[T]
    # Serializes `run_cached`: concurrent requests for this task wait here,
    # so the work is done at most once and later callers get the memoized result.
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False)
    # `None` until the task has run; afterwards an `Ok` or a `Failure`.
    _result: Optional[Result[R]] = field(default=None, init=False)

    @property
    def real_requirements(self) -> list[T]:
        """Requirements that are not `Phony` targets."""
        return [d for d in self.requires if not isinstance(d, Phony)]

    def __bool__(self):
        # Truthy only after a successful run.
        return self._result is not None and bool(self._result)

    @property
    def result(self) -> R:
        if self._result is None:
            raise ValueError("Task has not run yet.")
        if not self._result:
            raise ValueError("Task has failed.")
        assert isinstance(self._result, Ok)
        # A task may evaluate to another task (e.g. a `TemplateTask` yields a
        # `Task`); report the inner task's result in that case.
        if isinstance(self._result.value, Lazy):
            return self._result.value.result
        return self._result.value

    async def run(self, *, db) -> R:
        raise NotImplementedError()

    async def run_after_deps(self, recurse, visited: dict[T, None], **kwargs) -> Result[R]:
        """Run all dependencies concurrently through `recurse`, then this task.

        Each dependency gets its own copy of `visited`. Returns a
        `DependencyFailure` listing the failed dependencies, the `TaskFailure`
        raised by `run`, or `Ok` with the value of `run`.
        """
        dep_res = await asyncio.gather(
            *(recurse(dep, copy(visited), **kwargs) for dep in self.requires)
        )
        if not all(dep_res):
            return DependencyFailure(
                {k: v for (k, v) in zip(self.requires, dep_res) if not v}
            )
        try:
            return Ok(await self.run(**kwargs))
        except TaskFailure as f:
            return f

    async def run_cached(self, recurse, visited: dict[T, None], **kwargs) -> Result[R]:
        """Memoized `run_after_deps`: only the first call does the work."""
        async with self._lock:
            if self._result is not None:
                return self._result
            self._result = await self.run_after_deps(recurse, visited, **kwargs)
            return self._result

    def reset(self):
        """Forget the memoized result, so the task can run again."""
        self._result = None

    def fields(self):
        """Public (non-underscore) dataclass fields as a name -> value dict."""
        return {f.name: getattr(self, f.name) for f in fields(self) if f.name[0] != "_"}
# Type of the task objects stored in a `LazyDB`.
TaskT = TypeVar("TaskT", bound=Lazy)


class MissingDependency(Exception):
    """Raised by `LazyDB.on_missing` when no task can be provided for a target."""
@dataclass
class LazyDB(Generic[T, TaskT]):
    """Collect tasks and coordinate running a task from a task identifier."""

    tasks: list[TaskT] = field(default_factory=list)
    index: dict[T, TaskT] = field(default_factory=dict)

    async def run(self, t: T, visited: dict[T, None] | None = None, **kwargs) -> Result[R]:
        """Run the task that creates `t`, after its dependencies.

        `visited` holds the targets on the current dependency path. Meeting
        `t` again raises `CyclicWorkflowError`. A target without a task is
        handed to `on_missing`; if that raises `MissingDependency`, a
        `MissingFailure` is returned. While a task evaluates to another task,
        that task is run in turn.
        """
        visited = visited or dict()
        if t in visited:
            raise CyclicWorkflowError(list(visited.keys()))
        visited[t] = None

        if t in self.index:
            task = self.index[t]
        else:
            try:
                task = self.on_missing(t)
            except MissingDependency:
                return MissingFailure(t)

        while True:
            result = await task.run_cached(self.run, visited, **kwargs)
            if not (isinstance(result, Ok) and isinstance(result.value, Lazy)):
                return result
            task = cast(TaskT, result.value)

    def on_missing(self, _: T) -> TaskT:
        """Hook for targets without a registered task; by default, give up."""
        raise MissingDependency()

    def add(self, task: TaskT):
        """Add a task to the DB."""
        log.debug(f"adding task ===\n{task}")
        self.tasks.append(task)
        self.index.update((target, task) for target in task.creates)

    def clean(self):
        """Remove all tasks."""
        self.tasks = []
        self.index = {}

    def reset(self):
        """Forget the memoized results of all tasks."""
        for task in self.tasks:
            task.reset()
Template strings
The goal is to have more flexible templates in Brei. I’ve looked into
Jinja as a way to expand templates, but rejected it because of its large,
mostly unneeded complexity and its equally heavy dependencies. A better
alternative is to use Python’s `string.Template` to do variable
substitution. Evaluation needs to be lazy, and I would like to be able to
pipe the standard output of a task into the contents of a variable.
Considering this last point, we want to differentiate between writing
output to the contents of a variable, and writing output to the file
pointed to by the variable.
To assign the output of a command to a variable, we can have the following:
[[task]]
stdout = "var(commit)"
language = "Bash"
script = "git rev-parse HEAD"
To use it, refer to the variable using Python’s `string.Template` syntax,
which is similar to Bash, Make, Perl etc., i.e. either `$commit` or `${commit}`.
[[task]]
targets = ["output/${commit}/run.h5"]
dependencies = ["build/bin/model", "data/input.csv"]
language = "Bash"
script = "model run"
The system should infer that the use of `$commit` creates an implicit
dependency on running `git rev-parse HEAD`.
However, there may be steps in between:
[environment]
output_path = "output/${commit}"
[[task]]
targets = ["${output_path}/run.h5"]
... etc ...
We can trace these variable substitutions using the same lazy evaluation strategy as the workflow itself.
from dataclasses import dataclass, is_dataclass, fields
from string import Template
from typing import Any, Generic, Mapping, TypeVar, cast
from functools import singledispatch
from .lazy import Lazy
T = TypeVar("T")


@singledispatch
def substitute(template, env: Mapping[str, str]):
    """Substitute `${var}` / `$var` references in `template` using `env`.

    Strings are expanded with `string.Template.safe_substitute`, so unknown
    variables are left in place. Lists are handled element-wise, and `None`
    passes through. Dataclass instances are rebuilt from their public
    constructor fields, each substituted. Any other object is returned unchanged.
    """
    dtype = type(template)
    if is_dataclass(dtype):
        # Only fields accepted by `__init__` can be passed back to the
        # constructor. `init=False` fields are set by the dataclass itself;
        # passing them would raise `TypeError`.
        args = {
            f.name: substitute(getattr(template, f.name), env)
            for f in fields(dtype)
            if f.init and f.name[0] != "_"
        }
        return dtype(**args)
    return template


@substitute.register
def _(template: str, env: Mapping[str, str]) -> str:
    return Template(template).safe_substitute(env)


@substitute.register
def _(template: list, env: Mapping[str, str]) -> list:
    return [substitute(x, env) for x in template]


@substitute.register
def _(_template: None, _) -> None:
    return None
@singledispatch
def gather_args(template: Any) -> set[str]:
= type(template)
dtype if is_dataclass(dtype):
= (
args getattr(template, f.name))
gather_args(for f in fields(dtype)
if f.name[0] != "_"
)return set().union(*args)
return set()
# raise TypeError(f"Can't perform string substitution on object of type: {dtype}")
@gather_args.register
def _(template: str) -> set[str]:
return set(Template(template).get_identifiers())
@gather_args.register
def _(template: list) -> set[str]:
return set().union(*map(gather_args, template))
@gather_args.register
def _(_template: None) -> set[str]:
return set()
from dataclasses import dataclass
from typing import Iterable, Optional
import pytest
from brei.task import TemplateVariable, Variable
from brei.template_strings import gather_args, substitute
from brei.lazy import LazyDB
class Environment(LazyDB[Variable, TemplateVariable]):
    """Dict-like database of template variables, used to test substitution."""

    def __setitem__(self, k: str, v: str):
        variable_task = TemplateVariable([Variable(k)], [], v)
        self.add(variable_task)

    def __getitem__(self, k: str) -> str:
        task = self.index[Variable(k)]
        return task.result

    def __contains__(self, k: str) -> bool:
        return Variable(k) in self.index

    def items(self) -> Iterable[str]:
        # NB: despite the name, this yields variable names only.
        return (key.name for key in self.index if isinstance(key, Variable))

    @property
    def environment(self):
        # `TemplateVariable.run` reads `db.environment`; this DB is its own.
        return self
@pytest.mark.asyncio
async def test_template_string():
    """Variables referring to other variables are expanded transitively."""
    variables = Environment()
    for name, template in {
        "x": "Hello, ${y}!",
        "y": "World",
        "z": "print('${x}')",
    }.items():
        variables[name] = template

    # Running `z` forces `x`, which in turn forces `y`.
    await variables.run(Variable("z"), db=variables)

    assert variables["x"] == "Hello, World!"
    assert variables["z"] == "print('Hello, World!')"
@dataclass
class MyData:
    some_list: list[str]
    some_prop: str
    some_none: Optional[str] = None


def test_template_dtype():
    """`gather_args` and `substitute` recurse into dataclass fields."""
    data = MyData(
        some_list=["${x} bar", "bar ${x} bar"],
        some_prop="bar ${x}",
    )
    # A set literal: `set("x")` only equals `{"x"}` because the name happens
    # to be a single character.
    assert gather_args(data) == {"x"}

    subst = substitute(data, {"x": "foo"})
    assert subst.some_list == ["foo bar", "bar foo bar"]
    assert subst.some_prop == "bar foo"
    assert subst.some_none is None
Tasks
from dataclasses import dataclass
@dataclass
class Runner:
    """How to execute a script: run `command` with `args`.

    Each entry of `args` is a `string.Template`; `${script}` is replaced by
    the path of the script to run.
    """

    command: str
    args: list[str]


# Runners available without extra configuration, keyed by name.
DEFAULT_RUNNERS: dict[str, Runner] = dict(
    python=Runner("python", ["${script}"]),
    bash=Runner("bash", ["${script}"]),
)
from __future__ import annotations
import asyncio
from contextlib import contextmanager, nullcontext
from copy import copy
from dataclasses import dataclass, field
from pathlib import Path
import re
import string
from tempfile import NamedTemporaryFile
from typing import Any, Optional, TextIO
from asyncio import create_subprocess_exec
from textwrap import indent
import shlex
from .result import TaskFailure
from .lazy import MissingDependency, Lazy, LazyDB, Phony
from .utility import stat
from .logging import logger
from .template_strings import gather_args, substitute
from .runner import Runner, DEFAULT_RUNNERS
# Module logger: the shared "brei" logger.
log = logger()
@dataclass
class Variable:
    """A target that is a named variable, written as `var(name)`."""

    name: str

    def __hash__(self):
        return hash(f"var({self.name})")

    def __str__(self):
        return f"var({self.name})"


def str_to_target(s: str) -> Path | Phony | Variable:
    """Interpret a target string from an input file.

    `#name` becomes a `Phony`, exactly `var(name)` becomes a `Variable`,
    and anything else is a `Path`.

    Raises:
        ValueError: if `s` is empty (previously an `IndexError`).
    """
    if not s:
        raise ValueError("A target cannot be an empty string.")
    if s.startswith("#"):
        return Phony(s[1:])
    # `fullmatch`: `var(x)/out.txt` is a path, not the variable `x` with the
    # rest of the string silently dropped.
    if m := re.fullmatch(r"var\(([^\s\(\)]+)\)", s):
        return Variable(m.group(1))
    return Path(s)


def is_oneliner(s: str) -> bool:
    """True when `s` consists of exactly one line."""
    return len(s.splitlines()) == 1
@dataclass
class Task(Lazy[Path | Phony | Variable, str | None]):
str] = None
name: Optional[str] = None
runner: Optional[= None
path: Optional[Path] str] = None
script: Optional[| Variable] = None
stdin: Optional[Path | Variable] = None
stdout: Optional[Path str] = None
description: Optional[bool = False
force:
@property
def target_paths(self):
return (p for p in self.creates if isinstance(p, Path))
@property
def dependency_paths(self):
return (p for p in self.requires if isinstance(p, Path))
def __str__(self):
= ", ".join(str(t) for t in self.creates)
tgts = ", ".join(str(t) for t in self.requires)
deps if self.script is not None:
= indent(self.script, prefix=" ▎ ", predicate=lambda _: True)
src elif self.path is not None:
= str(self.path)
src else:
= " - "
src = f"{self.name}: " if self.name else ""
name return name + f"[{tgts}] <- [{deps}]\n" + src
def __post_init__(self):
if self.name and Phony(self.name) not in self.creates:
self.creates.append(Phony(self.name))
if self.stdin and self.stdin not in self.requires:
self.requires.append(self.stdin)
if self.path and self.path not in self.requires:
self.requires.append(self.path)
if self.stdout and self.stdout not in self.creates:
self.creates.append(self.stdout)
def always_run(self) -> bool:
return self.force or len(list(self.target_paths)) == 0
def needs_run(self) -> bool:
if any(not p.exists() for p in self.target_paths):
return True
= [stat(p) for p in self.target_paths]
target_stats = [stat(p) for p in self.dependency_paths]
dep_stats if any(t < d for t in target_stats for d in dep_stats):
return True
return False
@contextmanager
def get_script_path(self):
if self.path is not None:
= None
tmpfile = self.path
path elif self.script is not None:
= NamedTemporaryFile("w")
tmpfile self.script)
tmpfile.write(
tmpfile.flush()= Path(tmpfile.name)
path else:
raise ValueError("A `Rule` can have either `path` or `script` defined.")
yield path
if tmpfile is not None:
tmpfile.close()
@contextmanager
def get_stdout(self):
match self.stdout:
case Variable(x):
yield asyncio.subprocess.PIPE
case x if isinstance(x, Path):
= open(x, "w")
stdout yield stdout
stdout.close()case _:
yield None
async def run(self, *, db: TaskDB):
if not self.always_run() and not self.needs_run() and not db.force_run:
= " ".join(f"`{t}`" for t in self.target_paths)
tgts f"Targets {tgts} already up-to-date.")
log.info(return
f"{self}")
log.debug(if (self.path is None and self.script is None):
return
= " ".join(f"`{t}`" for t in self.creates)
targets = self.description or (f"#{self.name}" if self.name else None) \
short_note or f"creating {targets}"
f"[green]{short_note}[/]", extra={'markup': True})
log.info(
| int | None = None
stdin: TextIO match self.stdin:
case Variable(x):
= asyncio.subprocess.PIPE
stdin = db.environment[x].encode()
input_data case x if isinstance(x, Path):
= open(x, "r")
stdin = None
input_data case _:
= None
stdin = None
input_data
if self.runner is None and self.script is not None:
if not is_oneliner(self.script):
assert self.stdin is None
with self.get_stdout() as stdout:
= b""
stdout_data for line in self.script.splitlines():
async with db.throttle or nullcontext():
= await create_subprocess_exec(
proc *shlex.split(line),
=stdin,
stdin=stdout,
stdout=asyncio.subprocess.PIPE,
stderr
)= await proc.communicate(input_data)
stdout_data_part, stderr_data f"return-code {proc.returncode}")
log.debug(if stdout_data_part:
+= stdout_data_part
stdout_data if stderr_data:
f"[gold1]{short_note}[/] %s", stderr_data.decode().rstrip(), extra={"markup": True})
log.info(
elif self.runner is not None:
with self.get_script_path() as path, self.get_stdout() as stdout:
= db.runners[self.runner]
runner = [string.Template(arg).substitute(script=path) for arg in runner.args]
args async with db.throttle or nullcontext():
= await create_subprocess_exec(
proc
runner.command,*args,
=stdin,
stdin=stdout,
stdout=asyncio.subprocess.PIPE,
stderr
)= await proc.communicate(input_data)
stdout_data, stderr_data f"return-code {proc.returncode}")
log.debug(
if stderr_data:
f"[gold1]{short_note}[/] %s", stderr_data.decode().rstrip(), extra={"markup": True})
log.info(
else:
return
if self.needs_run():
raise TaskFailure("Task didn't achieve goals.")
return stdout_data.decode().strip() if stdout_data else None
@dataclass
class TaskProxy:
    """Input to create an actual Task.

    An actual `Task` has no template variables remaining, and its untyped
    strings are replaced with `Path`, `Variable` or `Phony` objects.
    """

    creates: list[str] = field(default_factory=list)
    requires: list[str] = field(default_factory=list)
    name: Optional[str] = None
    runner: Optional[str] = None
    path: Optional[str] = None
    script: Optional[str] = None
    stdin: Optional[str] = None
    stdout: Optional[str] = None
    description: Optional[str] = None
    force: bool = False

    @property
    def all_targets(self):
        """Explicit targets, followed by `stdout` and `#name` when set."""
        extra = []
        if self.stdout:
            extra.append(self.stdout)
        if self.name:
            extra.append(f"#{self.name}")
        return self.creates + extra

    @property
    def all_dependencies(self):
        """Explicit dependencies, followed by `stdin` and `path` when set."""
        extra = [item for item in (self.stdin, self.path) if item]
        return self.requires + extra
@dataclass
class TemplateVariable(Lazy[Variable, str]):
    """A variable whose value is a template string over other variables."""

    template: str

    def __post_init__(self):
        # Every variable referenced in the template becomes a dependency.
        for arg in gather_args(self.template):
            self.requires.append(Variable(arg))

    async def run(self, *, db) -> str:
        return substitute(self.template, db.environment)
@dataclass
class TemplateTask(Lazy[Path | Phony | Variable, Task]):
    """A `TaskProxy` whose `creates` contain no template variables, but whose
    other fields may. Running it substitutes the variables and yields the
    concrete `Task`."""

    template: TaskProxy

    def __post_init__(self):
        assert not gather_args(self.template.creates)
        self.creates += [str_to_target(t) for t in self.template.all_targets]
        self.requires += [Variable(arg) for arg in gather_args(self.template)]

    async def run(self, *, db):
        proxy = substitute(self.template, db.environment)
        created = [str_to_target(t) for t in proxy.all_targets]
        required = [str_to_target(t) for t in proxy.all_dependencies]
        script_path = Path(proxy.path) if proxy.path else None
        stdin = str_to_target(proxy.stdin) if proxy.stdin else None
        stdout = str_to_target(proxy.stdout) if proxy.stdout else None
        assert not isinstance(stdin, Phony) and not isinstance(stdout, Phony)
        return Task(
            created,
            required,
            name=proxy.name,
            runner=proxy.runner,
            path=script_path,
            script=proxy.script,
            stdin=stdin,
            stdout=stdout,
            description=proxy.description,
            force=proxy.force,
        )
@dataclass
class TaskDB(LazyDB[Path | Variable | Phony, Task | TemplateTask | TemplateVariable]):
    """Database of all tasks of a resolved program, with run settings."""

    runners: dict[str, Runner] = field(default_factory=lambda: copy(DEFAULT_RUNNERS))
    throttle: Optional[asyncio.Semaphore] = None
    force_run: bool = False

    def on_missing(self, t: Path | Phony | Variable):
        # An existing file that no task creates stands in as a task without
        # a script.
        if not (isinstance(t, Path) and t.exists()):
            raise MissingDependency()
        return Task([t], [])

    def is_resolvable(self, s: Any) -> bool:
        """True when every template variable in `s` has a task in the DB."""
        return all(Variable(name) in self.index for name in gather_args(s))

    async def resolve_object(self, s: Any) -> Any:
        """Run the tasks for all variables in `s`, then substitute them."""
        names = gather_args(s)
        await asyncio.gather(*(self.run(Variable(name), db=self) for name in names))
        result = substitute(s, self.environment)
        log.debug(f"substituting {s} => {result}")
        return result

    @property
    def environment(self):
        return Environment(self)
class Environment:
    """Mapping-like view of the variables in a task DB, for `substitute`.

    Values are the results of the variable tasks.
    """

    def __init__(self, db):
        self.db = db

    def __contains__(self, k: str):
        return Variable(k) in self.db.index

    def items(self):
        # NB: yields variable *names* only, not (name, value) pairs.
        variables = (key for key in self.db.index if isinstance(key, Variable))
        return (v.name for v in variables)

    def __getitem__(self, k: str):
        task = self.db.index[Variable(k)]
        return task.result
class Template(TaskProxy):
    """A `Template` can receive the same arguments as a TaskProxy. The difference is
    that any template variables in the Template can be substituted with a `TemplateCall`."""

    def call(self, args: dict[str, Any]) -> TaskProxy:
        """Return a new instance of this template with `args` substituted."""
        instantiated = substitute(self, args)
        return instantiated
Programs
from __future__ import annotations
import asyncio
from copy import copy
from itertools import chain, product, repeat
from dataclasses import dataclass, field
from enum import Enum
import json
from pathlib import Path
import tomllib
from .template_strings import gather_args
from .logging import logger
from .errors import HelpfulUserError, UserError
from .utility import construct, read_from_file
from .task import (
Variable,
TaskDB,
Template,
Runner,
TaskProxy,
TemplateTask,
TemplateVariable,
)
log = logger()


@dataclass
class MissingInclude(UserError):
    """An `include` entry refers to a file that does not exist."""

    path: Path

    def __str__(self):
        return f"Include `{self.path}` not found."


@dataclass
class MissingTemplate(UserError):
    """A template call refers to a template that was never defined."""

    name: str

    def __str__(self):
        return f"Template `{self.name}` not found."
class Join(Enum):
= 1
INNER = 2
OUTER
@dataclass
class TemplateCall:
"""Calls a template with a set of arguments.
Members:
- template: name of the template.
- args: arguments to the call.
- collect: name of the phony target by which to collect all generated targets.
- join: `inner` or `outer` join.
"""
str
template: dict[str, str | list[str]]
args: str | None = None
collect: = Join.INNER
join: Join
@property
def all_args(self):
if all(isinstance(v, str) for v in self.args.values()):
yield self.args
return
if self.join == Join.INNER:
for v in zip(
*map(
lambda x: repeat(x) if isinstance(x, str) else x,
self.args.values(),
)
):yield dict(zip(self.args.keys(), v))
else: # cartesian product
for v in product(
*map(lambda x: [x] if isinstance(x, str) else x, self.args.values())
):yield dict(zip(self.args.keys(), v))
@dataclass
class Program:
    """A Brei program.

    Members:
    - task: list of tasks.
    - environment: variables, mapping names to template strings.
    - template: templates, by name.
    - call: list of calls to templates.
    - include: list of files to include (may contain template variables).
    - runner: extra configured task runners, by name.
    """

    task: list[TaskProxy] = field(default_factory=list)
    environment: dict[str, str] = field(default_factory=dict)
    template: dict[str, Template] = field(default_factory=dict)
    call: list[TemplateCall] = field(default_factory=list)
    include: list[str] = field(default_factory=list)
    runner: dict[str, Runner] = field(default_factory=dict)

    @staticmethod
    def read(path: Path, section: str | None = None) -> Program:
        """Read a program from a TOML or JSON file, optionally taking only
        the dotted `section` inside it."""
        return read_from_file(Program, path, section)
def tasks_from_call(template: Template, call: TemplateCall) -> list[TaskProxy]:
    """Instantiate `template` once per argument set of `call`.

    When `call.collect` is set, an extra task with that name is appended.
    It requires all targets of the generated tasks.
    """
    tasks = [template.call(args) for args in call.all_args]
    if not call.collect:
        return tasks
    collected = [target for task in tasks for target in task.all_targets]
    return tasks + [TaskProxy([], collected, name=call.collect)]
async def resolve_delayed(db: TaskDB, tasks: list[TaskProxy]) -> list[TaskProxy]:
    """Resolve `tasks` (substituting variables in targets).

    A task whose targets are resolvable now has its variables substituted
    and is added to `db` as a `TemplateTask`.

    Returns: list of unresolvable tasks.
    """
    async def resolve(task: TaskProxy) -> TaskProxy | None:
        if not db.is_resolvable(task.all_targets):
            return task
        resolved = await db.resolve_object(task)
        db.add(TemplateTask([], [], resolved))
        return None

    outcomes = await asyncio.gather(*(resolve(task) for task in tasks))
    return [task for task in outcomes if task]
async def resolve_tasks(program: Program) -> TaskDB:
    """Resolve a program. A resolved program has all of its includes and
    template calls done, so that only tasks remain. To resolve a program,
    some tasks may need to be run. Variables that appear in the targets of
    a task (`creates`, `stdout` and `name`) are resolved eagerly.

    Returns: TaskDB instance.

    Raises:
        MissingInclude: an included file does not exist, even after running
            the task that creates it (if any).
        MissingTemplate: a call refers to a template that is not defined,
            not even in an include.
        UserError: some task targets remain unresolvable.
    """
    db = TaskDB()
    template_index = dict()

    async def go(program: Program):
        for var, template in program.environment.items():
            db.add(TemplateVariable([Variable(var)], [], template))

        task_templates = copy(program.task)
        template_index.update(program.template)
        delayed_calls: list[TemplateCall] = []
        delayed_templates: list[TaskProxy] = []

        db.runners.update(program.runner)

        for c in program.call:
            if c.template not in template_index:
                log.debug(
                    "template `%s` not available, waiting for includes to resolve",
                    c.template,
                )
                delayed_calls.append(c)
                continue
            task_templates.extend(tasks_from_call(template_index[c.template], c))

        for tt in task_templates:
            # we could check for resolvability here, but I don't like the
            # idea that order then matters. this way the rule is:
            # > if a task has a templated target, those variables should be
            # > resolvable after all other tasks were added, seeing that the
            # > task to resolve these variables can't have templated targets
            # > themselves.
            if gather_args(tt.all_targets):
                delayed_templates.append(tt)
            else:
                db.add(TemplateTask([], [], tt))

        delayed_templates = await resolve_delayed(db, delayed_templates)

        for inc in program.include:
            incp = Path(await db.resolve_object(inc))
            if incp in db.index:
                await db.run(incp, db=db)
            if not incp.exists():
                raise MissingInclude(incp)
            prg = Program.read(incp)
            await go(prg)

        for c in delayed_calls:
            if c.template not in template_index:
                log.debug(
                    "template `%s` still not available, now this is an error",
                    c.template,
                )
                raise MissingTemplate(c.template)
            for tt in tasks_from_call(template_index[c.template], c):
                # Same check as above: all targets, including `stdout` and
                # `#name`, must be free of template variables.
                if gather_args(tt.all_targets):
                    delayed_templates.append(tt)
                else:
                    db.add(TemplateTask([], [], tt))

        delayed_templates = await resolve_delayed(db, delayed_templates)
        if delayed_templates:
            unresolvable = [
                p
                for t in delayed_templates
                for p in t.all_targets
                if not db.is_resolvable(p)
            ]
            raise UserError(f"Task has unresolvable targets: {unresolvable}")

        return db

    return await go(program)
Utils
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from datetime import datetime
import os
from .construct import construct, FromStr, read_from_file
def normal_relative(path: Path) -> Path:
    """Normalize `path` with `Path.resolve`.

    NOTE: despite the name, the result is absolute; making it relative to
    the working directory is currently disabled.
    """
    return path.resolve()


@dataclass
class FileStat:
    """A file path with its modification time; ordered by that time."""

    path: Path
    modified: datetime

    @staticmethod
    def from_path(path: Path):
        # Local import keeps this module's top-level imports unchanged.
        from datetime import timezone

        stat = os.stat(path)
        # An aware UTC datetime: naive local times repeat an hour when
        # clocks go back (DST), which would make a newer file compare as older.
        return FileStat(path, datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc))

    def __lt__(self, other: FileStat) -> bool:
        return self.modified < other.modified


def stat(path: Path) -> FileStat:
    """Return the `FileStat` of `path`, normalized by `normal_relative`."""
    path = normal_relative(path)
    return FileStat.from_path(path)
from dataclasses import dataclass
import time
from contextlib import asynccontextmanager
@dataclass
class Elapsed:
float | None = None
elapsed:
@asynccontextmanager
async def timer():
= Elapsed()
e = time.perf_counter()
t yield e
= time.perf_counter() - t e.elapsed
import typing
from typing import Any, Self, Union, TypeVar, TypeGuard, Type, Optional, cast
import types
import tomllib
import json
from dataclasses import is_dataclass
from enum import Enum
from pathlib import Path
from .errors import HelpfulUserError, InputError
= TypeVar("T")
T
def isgeneric(annot):
return typing.get_origin(annot) and hasattr(annot, "__args__")
class FromStr:
@classmethod
def from_str(cls, _: str) -> Self:
raise NotImplementedError()
def construct(annot: Any, json: Any) -> Any:
try:
return _construct(annot, json)
except (AssertionError, ValueError) as e:
raise InputError(annot, json) from e
def is_object_type(dtype: Type[Any]) -> TypeGuard[Type[dict[str, Any]]]:
return (
isgeneric(dtype)and typing.get_origin(dtype) is dict
and typing.get_args(dtype)[0] is str
)
def is_optional_type(dtype: Type[Any]) -> TypeGuard[Type[Optional[Any]]]:
return (
isgeneric(dtype)and typing.get_origin(dtype) is Union
and typing.get_args(dtype)[1] is types.NoneType
)
def _construct(annot: Type[T], json: Any) -> T:
"""Construct an object from a given type from a JSON stream.
The `annot` type should be one of: str, int, list[T], Optional[T],
or a dataclass, and the JSON data should match exactly the given
definitions in the dataclass hierarchy.
"""
if annot is bool:
assert isinstance(json, bool)
return cast(T, json)
if annot is str:
assert isinstance(json, str)
return cast(T, json)
if annot is int:
assert isinstance(json, int)
return cast(T, json)
if is_object_type(annot):
assert isinstance(json, dict)
return cast(
1], v) for k, v in json.items()}
T, {k: construct(typing.get_args(annot)[
)if annot is Any:
return cast(T, json)
# if annot is dict or isgeneric(annot) and typing.get_origin(annot) is dict:
# assert isinstance(json, dict)
# return json
if annot is Path and isinstance(json, str):
return cast(T, Path(json))
if isgeneric(annot) and typing.get_origin(annot) is list:
assert isinstance(json, list)
return cast(T, [construct(typing.get_args(annot)[0], item) for item in json])
if is_optional_type(annot):
if json is None:
return cast(T, None)
else:
return cast(T, construct(typing.get_args(annot)[0], json))
if isgeneric(annot) and typing.get_origin(annot) is types.UnionType:
for dtype in typing.get_args(annot):
try:
return cast(T, _construct(dtype, json))
except ValueError:
continue
except AssertionError:
continue
raise ValueError("None of the choices in type union match data.")
if type(annot) is type and issubclass(annot, FromStr) and isinstance(json, str):
return cast(T, annot.from_str(json))
if is_dataclass(annot):
assert isinstance(json, dict)
= typing.get_type_hints(annot)
arg_annot # assert all(k in json for k in arg_annot)
= {k: construct(arg_annot[k], json[k]) for k in json}
args return cast(T, annot(**args))
if isinstance(json, str) and isinstance(annot, type) and issubclass(annot, Enum):
= {opt.name.lower(): opt for opt in annot}
options assert json.lower() in options
return cast(T, options[json.lower()])
raise ValueError(f"Couldn't construct {annot} from {repr(json)}")
def read_from_file(data_type: Type[T], path: Path, section: Optional[str] = None) -> T:
"""Read a config from given `path` in given `section`. The path should refer to
a TOML or JSON file that should decode to a `Config` object. If `section` is given, only
that section is decoded to a `Config` object. The `section` string may contain
periods to indicate deeper nesting.
Example:
```python
read_from_file(Config, Path("./pyproject.toml"), "tool.loom")
```
"""
if not path.exists():
raise HelpfulUserError(f"File not found: {path}")
with open(path, "rb") as f:
if path.suffix == ".toml":
= tomllib.load(f)
data: Any elif path.suffix == ".json":
= json.load(f)
data else:
raise HelpfulUserError(f"Unrecognized file format: {path}")
try:
if section is not None:
for s in section.split("."):
= data[s]
data except KeyError as e:
raise HelpfulUserError(
f"Data file `{path}` should contain section `{section}`."
from e
)
return construct(data_type, data)