brei

Welcome to Brei's API documentation. There are two ways to use Brei: from the command-line (in which case we refer to the homepage for documentation), or straight from Python. The easiest function to work with is brei(), which links to the command-line app one-to-one.

Program

If you want to read the Program yourself, there are several ways to do so:

  1. Use Program.read(). You give it a Path to a TOML or JSON file and a section, this last bit giving an object path into the data. For instance: Program.read(Path("pyproject.toml"), "tool.brei").
  2. Read your own data format into JSON compatible data, then construct(Program, data). The construct function uses the type annotations in dataclasses to validate the input data.

After reading the data, you'll want to resolve all tasks, i.e. perform includes and run any necessary task to resolve the targets of all other tasks.

program = Program.read(Path("brei.toml"))
db: TaskDB = await resolve_tasks(program)
await db.run(Phony("all"))

There are three kinds of targets: pathlib.Path, Phony and Variable.

API

 1# ~/~ begin <<docs/implementation.md#brei/__init__.py>>[init]
 2"""
 3Welcome to Brei's API documentation. There are two ways to use Brei: from the
 4command-line (in which case we refer to the homepage for documentation), or
 5straight from Python. The easiest function to work with is `brei()`, which
 6links to the command-line app one-to-one.
 7
 8## Program
 9If you want to read the `Program` yourself, there are several ways to do so:
10
111. Use `Program.read()`. You give it a `Path` to a TOML or JSON file and a
12section, this last bit giving an object path into the data. For instance:
13`Program.read(Path("pyproject.toml"), "tool.brei")`.
142. Read your own data format into JSON compatible data, then
15`construct(Program, data)`. The `construct` function uses the type annotations
16in dataclasses to validate the input data.
17
18After reading the data, you'll want to resolve all tasks, i.e. perform includes
19and run any necessary task to resolve the targets of all other tasks.
20
21    program = Program.read(Path("brei.toml"))
22    db: TaskDB = await resolve_tasks(program)
23    await db.run(Phony("all"))
24
25There are three kinds of targets: `pathlib.Path`, `Phony` and `Variable`.
26
27## API
28"""
29
30from .program import Program, resolve_tasks, TemplateCall
31from .construct import construct
32from .lazy import Lazy, LazyDB
33from .task import Task, TaskDB, Phony, Variable, TaskProxy, Template
34from .runner import Runner
35from .cli import brei
36
37__all__ = [
38    "brei",
39
40    "Lazy", "LazyDB", "Phony", "Program", "Runner", "Task", "TaskDB",
41    "TaskProxy", "Template", "TemplateCall", "Variable", "construct",
42    "resolve_tasks",
43]
44# ~/~ end
@argh.arg('targets', nargs='*', help='names of tasks to run')
@argh.arg('-i', '--input-file', help='Brei TOML or JSON file, use a `[...]` suffix to indicate a subsection.')
@argh.arg('-B', '--force-run', help='rebuild all dependencies')
@argh.arg('-j', '--jobs', help='limit number of concurrent jobs')
@argh.arg('-v', '--version', help='print version number and exit')
@argh.arg('--list-runners', help='show default configured runners')
@argh.arg('--debug', help='more verbose logging')
def brei( targets: list[str], *, input_file: Optional[str] = None, force_run: bool = False, jobs: Optional[int] = None, version: bool = False, list_runners: bool = False, debug: bool = False):
 48@argh.arg("targets", nargs="*", help="names of tasks to run")
 49@argh.arg(
 50    "-i",
 51    "--input-file",
 52    help="Brei TOML or JSON file, use a `[...]` suffix to indicate a subsection.",
 53)
 54@argh.arg("-B", "--force-run", help="rebuild all dependencies")
 55@argh.arg("-j", "--jobs", help="limit number of concurrent jobs")
 56@argh.arg("-v", "--version", help="print version number and exit")
 57@argh.arg("--list-runners", help="show default configured runners")
 58@argh.arg("--debug", help="more verbose logging")
 59def brei(
 60    targets: list[str],
 61    *,
 62    input_file: Optional[str] = None,
 63    force_run: bool = False,
 64    jobs: Optional[int] = None,
 65    version: bool = False,
 66    list_runners: bool = False,
 67    debug: bool = False
 68):
 69    """Build one of the configured targets."""
 70    if version:
 71        print(f"Brei {__version__}, Copyright (c) 2023 Netherlands eScience Center.")
 72        print("Licensed under the Apache License, Version 2.0.")
 73        sys.exit(0)
 74
 75    if list_runners:
 76        t = Table(title="Default Runners", header_style="italic green", show_edge=False)
 77        t.add_column("runner", style="bold yellow")
 78        t.add_column("executable")
 79        t.add_column("arguments")
 80        for r, c in DEFAULT_RUNNERS.items():
 81            t.add_row(r, c.command, f"{c.args}")
 82        console = Console()
 83        console.print(t)
 84        sys.exit(0)
 85
 86    if input_file is not None:
 87        if m := re.match(r"([^\[\]]+)\[([^\[\]\s]+)\]", input_file):
 88            input_path = Path(m.group(1))
 89            section = m.group(2)
 90        else:
 91            input_path = Path(input_file)
 92            section = None
 93
 94        program = read_from_file(Program, input_path, section)
 95
 96    elif Path("brei.toml").exists():
 97        program = read_from_file(Program, Path("brei.toml"))
 98
 99    elif Path("pyproject.toml").exists():
100        with open("pyproject.toml", "rb") as f_in:
101            data = tomllib.load(f_in)
102        try:
103            for s in ["tool", "brei"]:
104                data = data[s]
105        except KeyError as e:
106            raise HelpfulUserError(
107                f"With out the `-f` argument, Brei looks for `brei.toml` first, then for "
108                f"a `[tool.brei]` section in `pyproject.toml`. A `pyproject.toml` file was "
109                f"found, but contained no `[tool.brei]` section."
110            ) from e
111
112        program = construct(Program, data)
113    else:
114        raise HelpfulUserError(
115            "No input file given, no `loom.toml` found and no `pyproject.toml` found."
116        )
117
118    jobs = int(jobs) if jobs else None
119    configure_logger(debug)
120    try:
121        asyncio.run(main(program, targets, force_run, jobs))
122    except UserError as e:
123        log.error(f"Failed: {e}")

Build one of the configured targets.

@dataclass
class Lazy(typing.Generic[~T, ~R]):
 37@dataclass
 38class Lazy(Generic[T, R]):
 39    """Base class for tasks that are tagged with type `T` (usually `str` or
 40    `Path`) and representing values of type `R`.
 41
 42    To implement a specific task, you need to implement the asynchronous
 43    `run` method, which should return a value of `R` or throw `TaskFailure`.
 44
 45    Attributes:
 46        targets: list of target identifiers, for instance paths that are
 47            generated by running a particular task.
 48        dependencies: list of dependency identifiers. All of these need to
 49            be realized before the task can run.
 50        result (property): value of the result, once the task was run. This
 51            throws an exception if accessed before the task is complete.
 52    """
 53
 54    creates: list[T]
 55    requires: list[T]
 56
 57    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False)
 58    _result: Optional[Result[R]] = field(default=None, init=False)
 59
 60    @property
 61    def real_requirements(self) -> list[T]:
 62        return [d for d in self.requires if not isinstance(d, Phony)]
 63
 64    def __bool__(self):
 65        return self._result is not None and bool(self._result)
 66
 67    @property
 68    def result(self) -> R:
 69        if self._result is None:
 70            raise ValueError("Task has not run yet.")
 71        if not self._result:
 72            raise ValueError("Task has failed.")
 73        assert isinstance(self._result, Ok)
 74        if isinstance(self._result.value, Lazy):
 75            return self._result.value.result
 76        return self._result.value
 77
 78    async def run(self, *, db) -> R:
 79        raise NotImplementedError()
 80
 81    async def run_after_deps(self, recurse, visited: dict[T, None], **kwargs) -> Result[R]:
 82        dep_res = await asyncio.gather(
 83            *(recurse(dep, copy(visited), **kwargs) for dep in self.requires)
 84        )
 85        if not all(dep_res):
 86            return DependencyFailure(
 87                {k: v for (k, v) in zip(self.requires, dep_res) if not v}
 88            )
 89        try:
 90            return Ok(await self.run(**kwargs))
 91        except TaskFailure as f:
 92            return f
 93
 94    async def run_cached(self, recurse, visited: dict[T, None], **kwargs) -> Result[R]:
 95        async with self._lock:
 96            if self._result is not None:
 97                return self._result
 98            self._result = await self.run_after_deps(recurse, visited, **kwargs)
 99            return self._result
100
101    def reset(self):
102        self._result = None
103
104    def fields(self):
105        return {f.name: getattr(self, f.name) for f in fields(self) if f.name[0] != "_"}

Base class for tasks that are tagged with type T (usually str or Path) and representing values of type R.

To implement a specific task, you need to implement the asynchronous run method, which should return a value of R or throw TaskFailure.

Attributes: creates: list of target identifiers, for instance paths that are generated by running a particular task. requires: list of dependency identifiers. All of these need to be realized before the task can run. result (property): value of the result, once the task was run. This throws an exception if accessed before the task is complete.

Lazy(creates: list[~T], requires: list[~T])
creates: list[~T]
requires: list[~T]
real_requirements: list[~T]
60    @property
61    def real_requirements(self) -> list[T]:
62        return [d for d in self.requires if not isinstance(d, Phony)]
result: ~R
67    @property
68    def result(self) -> R:
69        if self._result is None:
70            raise ValueError("Task has not run yet.")
71        if not self._result:
72            raise ValueError("Task has failed.")
73        assert isinstance(self._result, Ok)
74        if isinstance(self._result.value, Lazy):
75            return self._result.value.result
76        return self._result.value
async def run(self, *, db) -> ~R:
78    async def run(self, *, db) -> R:
79        raise NotImplementedError()
async def run_after_deps( self, recurse, visited: dict[~T, None], **kwargs) -> Union[brei.result.Failure, brei.result.Ok[~R]]:
81    async def run_after_deps(self, recurse, visited: dict[T, None], **kwargs) -> Result[R]:
82        dep_res = await asyncio.gather(
83            *(recurse(dep, copy(visited), **kwargs) for dep in self.requires)
84        )
85        if not all(dep_res):
86            return DependencyFailure(
87                {k: v for (k, v) in zip(self.requires, dep_res) if not v}
88            )
89        try:
90            return Ok(await self.run(**kwargs))
91        except TaskFailure as f:
92            return f
async def run_cached( self, recurse, visited: dict[~T, None], **kwargs) -> Union[brei.result.Failure, brei.result.Ok[~R]]:
94    async def run_cached(self, recurse, visited: dict[T, None], **kwargs) -> Result[R]:
95        async with self._lock:
96            if self._result is not None:
97                return self._result
98            self._result = await self.run_after_deps(recurse, visited, **kwargs)
99            return self._result
def reset(self):
101    def reset(self):
102        self._result = None
def fields(self):
104    def fields(self):
105        return {f.name: getattr(self, f.name) for f in fields(self) if f.name[0] != "_"}
@dataclass
class LazyDB(typing.Generic[~T, ~TaskT]):
114@dataclass
115class LazyDB(Generic[T, TaskT]):
116    """Collect tasks and coordinate running a task from a task identifier."""
117
118    tasks: list[TaskT] = field(default_factory=list)
119    index: dict[T, TaskT] = field(default_factory=dict)
120
121    async def run(self, t: T, visited: dict[T, None] | None = None, **kwargs) -> Result[R]:
122        visited = visited or dict()
123        if t in visited:
124            raise CyclicWorkflowError(list(visited.keys()))
125        visited[t] = None
126
127        if t not in self.index:
128            try:
129                task = self.on_missing(t)
130            except MissingDependency:
131                return MissingFailure(t)
132        else:
133            task = self.index[t]
134
135        while True:
136            match (result := await task.run_cached(self.run, visited, **kwargs)):
137                case Ok(x) if isinstance(x, Lazy):
138                    task = cast(TaskT, x)
139                case _:
140                    return result
141
142    def on_missing(self, _: T) -> TaskT:
143        raise MissingDependency()
144
145    def add(self, task: TaskT):
146        """Add a task to the DB."""
147        log.debug(f"adding task ===\n{task}")
148        self.tasks.append(task)
149        for target in task.creates:
150            self.index[target] = task
151
152    def clean(self):
153        self.tasks = []
154        self.index = {}
155
156    def reset(self):
157        for t in self.tasks:
158            t.reset()

Collect tasks and coordinate running a task from a task identifier.

LazyDB(tasks: list[~TaskT] = <factory>, index: dict[~T, ~TaskT] = <factory>)
tasks: list[~TaskT]
index: dict[~T, ~TaskT]
async def run( self, t: ~T, visited: dict[~T, None] | None = None, **kwargs) -> Union[brei.result.Failure, brei.result.Ok[~R]]:
121    async def run(self, t: T, visited: dict[T, None] | None = None, **kwargs) -> Result[R]:
122        visited = visited or dict()
123        if t in visited:
124            raise CyclicWorkflowError(list(visited.keys()))
125        visited[t] = None
126
127        if t not in self.index:
128            try:
129                task = self.on_missing(t)
130            except MissingDependency:
131                return MissingFailure(t)
132        else:
133            task = self.index[t]
134
135        while True:
136            match (result := await task.run_cached(self.run, visited, **kwargs)):
137                case Ok(x) if isinstance(x, Lazy):
138                    task = cast(TaskT, x)
139                case _:
140                    return result
def on_missing(self, _: ~T) -> ~TaskT:
142    def on_missing(self, _: T) -> TaskT:
143        raise MissingDependency()
def add(self, task: ~TaskT):
145    def add(self, task: TaskT):
146        """Add a task to the DB."""
147        log.debug(f"adding task ===\n{task}")
148        self.tasks.append(task)
149        for target in task.creates:
150            self.index[target] = task

Add a task to the DB.

def clean(self):
152    def clean(self):
153        self.tasks = []
154        self.index = {}
def reset(self):
156    def reset(self):
157        for t in self.tasks:
158            t.reset()
@dataclass
class Phony(brei.construct.FromStr):
@dataclass
class Phony(FromStr):
    """Target identified by a name only; its string form is `#name`."""

    name: str

    @classmethod
    def from_str(cls, s: str) -> Phony:
        """Parse a `#name` string into a `Phony`.

        Raises:
            ValueError: if `s` does not start with `#` (including when `s`
                is empty).
        """
        # `startswith` keeps the empty string on the ValueError path; indexing
        # `s[0]` would raise IndexError instead.
        if s.startswith("#"):
            return Phony(s[1:])
        raise ValueError("A phony target should start with a `#` character.")

    def __str__(self):
        return f"#{self.name}"

    def __hash__(self):
        # Hash on the string form so instances can be used as dictionary keys.
        return hash(str(self))
Phony(name: str)
name: str
@classmethod
def from_str(cls, s: str) -> Phony:
24    @classmethod
25    def from_str(cls, s: str) -> Phony:
26        if s[0] == "#":
27            return Phony(s[1:])
28        raise ValueError("A phony target should start with a `#` character.")
@dataclass
class Program:
 93@dataclass
 94class Program:
 95    """A Brei program.
 96
 97    Members:
 98
 99      - task: list of tasks.
100      - environment: variables.
101      - template: set of templates.
102      - call: list of calls to templates.
103      - include: list of includes.
104      - runner: extra configured task runners.
105    """
106    task: list[TaskProxy] = field(default_factory=list)
107    environment: dict[str, str] = field(default_factory=dict)
108    template: dict[str, Template] = field(default_factory=dict)
109    call: list[TemplateCall] = field(default_factory=list)
110    include: list[str] = field(default_factory=list)
111    runner: dict[str, Runner] = field(default_factory=dict)
112
113    @staticmethod
114    def read(path: Path, section: str | None = None) -> Program:
115        return read_from_file(Program, path, section)

A Brei program.

Members:

  • task: list of tasks.
  • environment: variables.
  • template: set of templates.
  • call: list of calls to templates.
  • include: list of includes.
  • runner: extra configured task runners.
Program( task: list[TaskProxy] = <factory>, environment: dict[str, str] = <factory>, template: dict[str, Template] = <factory>, call: list[TemplateCall] = <factory>, include: list[str] = <factory>, runner: dict[str, Runner] = <factory>)
task: list[TaskProxy]
environment: dict[str, str]
template: dict[str, Template]
call: list[TemplateCall]
include: list[str]
runner: dict[str, Runner]
@staticmethod
def read(path: pathlib.Path, section: str | None = None) -> Program:
113    @staticmethod
114    def read(path: Path, section: str | None = None) -> Program:
115        return read_from_file(Program, path, section)
@dataclass
class Runner:
@dataclass
class Runner:
    """Description of an external command that executes a task.

    Attributes:
        command: the executable to start.
        args: the arguments passed to the executable.
    """

    command: str
    args: list[str]
Runner(command: str, args: list[str])
command: str
args: list[str]
 55@dataclass
 56class Task(Lazy[Path | Phony | Variable, str | None]):
 57    name: Optional[str] = None
 58    runner: Optional[str] = None
 59    path: Optional[Path] = None
 60    script: Optional[str] = None
 61    stdin: Optional[Path | Variable] = None
 62    stdout: Optional[Path | Variable] = None
 63    description: Optional[str] = None
 64    force: bool = False
 65
 66    @property
 67    def target_paths(self):
 68        return (p for p in self.creates if isinstance(p, Path))
 69
 70    @property
 71    def dependency_paths(self):
 72        return (p for p in self.requires if isinstance(p, Path))
 73
 74    @property
 75    def digest(self) -> str | None:
 76        if self.script is None:
 77            return None
 78        return hashlib.md5(self.script.encode()).hexdigest()
 79
 80    def __str__(self):
 81        tgts = ", ".join(str(t) for t in self.creates)
 82        deps = ", ".join(str(t) for t in self.requires)
 83        if self.script is not None:
 84            src = indent(self.script, prefix=" â–Ž ", predicate=lambda _: True)
 85        elif self.path is not None:
 86            src = str(self.path)
 87        else:
 88            src = " - "
 89        name = f"{self.name}: " if self.name else ""
 90        return name + f"[{tgts}] <- [{deps}]\n" + src
 91
 92    def __post_init__(self):
 93        if self.name and Phony(self.name) not in self.creates:
 94            self.creates.append(Phony(self.name))
 95        if self.stdin and self.stdin not in self.requires:
 96            self.requires.append(self.stdin)
 97        if self.path and self.path not in self.requires:
 98            self.requires.append(self.path)
 99        if self.stdout and self.stdout not in self.creates:
100            self.creates.append(self.stdout)
101
102    def always_run(self) -> bool:
103        return self.force or len(list(self.target_paths)) == 0
104
105    def needs_run(self, db: TaskDB) -> bool:
106        if any(not p.exists() for p in self.target_paths):
107            return True
108        target_stats = [stat(p) for p in self.target_paths]
109        dep_stats = [stat(p) for p in self.dependency_paths]
110        if any(t < d for t in target_stats for d in dep_stats):
111            return True
112        if any(self.digest != db.history.get(p, None) for p in self.target_paths):
113            return True
114        return False
115
116    @contextmanager
117    def get_script_path(self):
118        if self.path is not None:
119            tmpfile = None
120            path = self.path
121        elif self.script is not None:
122            tmpfile = NamedTemporaryFile("w")
123            tmpfile.write(self.script)
124            tmpfile.flush()
125            path = Path(tmpfile.name)
126        else:
127            raise ValueError("A `Rule` can have either `path` or `script` defined.")
128
129        yield path
130
131        if tmpfile is not None:
132            tmpfile.close()
133
134    @contextmanager
135    def get_stdout(self):
136        match self.stdout:
137            case Variable(x):
138                yield asyncio.subprocess.PIPE
139            case x if isinstance(x, Path):
140                stdout = open(x, "w")
141                yield stdout
142                stdout.close()
143            case _:
144                yield None
145
146    async def run(self, *, db: TaskDB):
147        if not self.always_run() and not self.needs_run(db) and not db.force_run:
148            tgts = " ".join(f"`{t}`" for t in self.target_paths)
149            log.info(f"Targets {tgts} already up-to-date.")
150            return
151
152        log.debug(f"{self}")
153        if (self.path is None and self.script is None):
154            return
155
156        targets = " ".join(f"`{t}`" for t in self.creates)
157        short_note = self.description or (f"#{self.name}" if self.name else None) \
158            or f"creating {targets}"
159        log.info(f"[green]{short_note}[/]", extra={'markup': True})
160
161        stdin: TextIO | int | None = None
162        match self.stdin:
163            case Variable(x):
164                stdin = asyncio.subprocess.PIPE
165                input_data = db.environment[x].encode()
166            case x if isinstance(x, Path):
167                stdin = open(x, "r")
168                input_data = None
169            case _:
170                stdin = None
171                input_data = None
172
173        if self.runner is None and self.script is not None:
174            if not is_oneliner(self.script):
175                assert self.stdin is None
176
177            
178            with self.get_stdout() as stdout:
179                stdout_data = b""
180                for line in self.script.splitlines():
181                    async with db.throttle or nullcontext():
182                        proc = await create_subprocess_exec(
183                            *shlex.split(line),
184                            stdin=stdin,
185                            stdout=stdout,
186                            stderr=asyncio.subprocess.PIPE,
187                        )
188                        stdout_data_part, stderr_data = await proc.communicate(input_data)
189                        log.debug(f"return-code {proc.returncode}")
190                    if stdout_data_part:
191                        stdout_data += stdout_data_part
192                    if stderr_data:
193                        log.info(f"[gold1]{short_note}[/] %s", stderr_data.decode().rstrip(), extra={"markup": True})
194
195        elif self.runner is not None:
196            with self.get_script_path() as path, self.get_stdout() as stdout:
197                runner = db.runners[self.runner]
198                args = [string.Template(arg).substitute(script=path) for arg in runner.args]
199                async with db.throttle or nullcontext():
200                    proc = await create_subprocess_exec(
201                        runner.command,
202                        *args,
203                        stdin=stdin,
204                        stdout=stdout,
205                        stderr=asyncio.subprocess.PIPE,
206                    )
207                    stdout_data, stderr_data = await proc.communicate(input_data)
208                    log.debug(f"return-code {proc.returncode}")
209
210            if stderr_data:
211                log.info(f"[gold1]{short_note}[/] %s", stderr_data.decode().rstrip(), extra={"markup": True})
212
213        else:
214            return
215
216        for p in self.target_paths:
217            db.history[p] = self.digest
218
219        if self.needs_run(db):
220            raise TaskFailure("Task didn't achieve goals.")
221
222        return stdout_data.decode().strip() if stdout_data else None
Task( creates: 'list[T]', requires: 'list[T]', name: Optional[str] = None, runner: Optional[str] = None, path: Optional[pathlib.Path] = None, script: Optional[str] = None, stdin: Union[pathlib.Path, Variable, NoneType] = None, stdout: Union[pathlib.Path, Variable, NoneType] = None, description: Optional[str] = None, force: bool = False)
name: Optional[str] = None
runner: Optional[str] = None
path: Optional[pathlib.Path] = None
script: Optional[str] = None
stdin: Union[pathlib.Path, Variable, NoneType] = None
stdout: Union[pathlib.Path, Variable, NoneType] = None
description: Optional[str] = None
force: bool = False
target_paths
66    @property
67    def target_paths(self):
68        return (p for p in self.creates if isinstance(p, Path))
dependency_paths
70    @property
71    def dependency_paths(self):
72        return (p for p in self.requires if isinstance(p, Path))
digest: str | None
74    @property
75    def digest(self) -> str | None:
76        if self.script is None:
77            return None
78        return hashlib.md5(self.script.encode()).hexdigest()
def always_run(self) -> bool:
102    def always_run(self) -> bool:
103        return self.force or len(list(self.target_paths)) == 0
def needs_run(self, db: TaskDB) -> bool:
105    def needs_run(self, db: TaskDB) -> bool:
106        if any(not p.exists() for p in self.target_paths):
107            return True
108        target_stats = [stat(p) for p in self.target_paths]
109        dep_stats = [stat(p) for p in self.dependency_paths]
110        if any(t < d for t in target_stats for d in dep_stats):
111            return True
112        if any(self.digest != db.history.get(p, None) for p in self.target_paths):
113            return True
114        return False
@contextmanager
def get_script_path(self):
116    @contextmanager
117    def get_script_path(self):
118        if self.path is not None:
119            tmpfile = None
120            path = self.path
121        elif self.script is not None:
122            tmpfile = NamedTemporaryFile("w")
123            tmpfile.write(self.script)
124            tmpfile.flush()
125            path = Path(tmpfile.name)
126        else:
127            raise ValueError("A `Rule` can have either `path` or `script` defined.")
128
129        yield path
130
131        if tmpfile is not None:
132            tmpfile.close()
@contextmanager
def get_stdout(self):
134    @contextmanager
135    def get_stdout(self):
136        match self.stdout:
137            case Variable(x):
138                yield asyncio.subprocess.PIPE
139            case x if isinstance(x, Path):
140                stdout = open(x, "w")
141                yield stdout
142                stdout.close()
143            case _:
144                yield None
async def run(self, *, db: TaskDB):
146    async def run(self, *, db: TaskDB):
147        if not self.always_run() and not self.needs_run(db) and not db.force_run:
148            tgts = " ".join(f"`{t}`" for t in self.target_paths)
149            log.info(f"Targets {tgts} already up-to-date.")
150            return
151
152        log.debug(f"{self}")
153        if (self.path is None and self.script is None):
154            return
155
156        targets = " ".join(f"`{t}`" for t in self.creates)
157        short_note = self.description or (f"#{self.name}" if self.name else None) \
158            or f"creating {targets}"
159        log.info(f"[green]{short_note}[/]", extra={'markup': True})
160
161        stdin: TextIO | int | None = None
162        match self.stdin:
163            case Variable(x):
164                stdin = asyncio.subprocess.PIPE
165                input_data = db.environment[x].encode()
166            case x if isinstance(x, Path):
167                stdin = open(x, "r")
168                input_data = None
169            case _:
170                stdin = None
171                input_data = None
172
173        if self.runner is None and self.script is not None:
174            if not is_oneliner(self.script):
175                assert self.stdin is None
176
177            
178            with self.get_stdout() as stdout:
179                stdout_data = b""
180                for line in self.script.splitlines():
181                    async with db.throttle or nullcontext():
182                        proc = await create_subprocess_exec(
183                            *shlex.split(line),
184                            stdin=stdin,
185                            stdout=stdout,
186                            stderr=asyncio.subprocess.PIPE,
187                        )
188                        stdout_data_part, stderr_data = await proc.communicate(input_data)
189                        log.debug(f"return-code {proc.returncode}")
190                    if stdout_data_part:
191                        stdout_data += stdout_data_part
192                    if stderr_data:
193                        log.info(f"[gold1]{short_note}[/] %s", stderr_data.decode().rstrip(), extra={"markup": True})
194
195        elif self.runner is not None:
196            with self.get_script_path() as path, self.get_stdout() as stdout:
197                runner = db.runners[self.runner]
198                args = [string.Template(arg).substitute(script=path) for arg in runner.args]
199                async with db.throttle or nullcontext():
200                    proc = await create_subprocess_exec(
201                        runner.command,
202                        *args,
203                        stdin=stdin,
204                        stdout=stdout,
205                        stderr=asyncio.subprocess.PIPE,
206                    )
207                    stdout_data, stderr_data = await proc.communicate(input_data)
208                    log.debug(f"return-code {proc.returncode}")
209
210            if stderr_data:
211                log.info(f"[gold1]{short_note}[/] %s", stderr_data.decode().rstrip(), extra={"markup": True})
212
213        else:
214            return
215
216        for p in self.target_paths:
217            db.history[p] = self.digest
218
219        if self.needs_run(db):
220            raise TaskFailure("Task didn't achieve goals.")
221
222        return stdout_data.decode().strip() if stdout_data else None
@dataclass
class TaskDB(LazyDB[Path | Variable | Phony, Task | TemplateTask | TemplateVariable]):
    """Task database carrying runners, a job throttle and the run history.

    Attributes:
        runners: available runners by name; starts as a copy of
            `DEFAULT_RUNNERS`.
        throttle: optional semaphore used as an async context manager around
            each subprocess.
        force_run: when true, tasks run even if their targets are up to date.
        history_path: JSON file used by `persistent_history`, if any.
        history: maps a target path to the digest recorded for it.
    """

    runners: dict[str, Runner] = field(default_factory=lambda: copy(DEFAULT_RUNNERS))
    throttle: Optional[asyncio.Semaphore] = None
    force_run: bool = False
    history_path: Path | None = None
    history: dict[Path, str | None] = field(default_factory=dict)

    @contextmanager
    def persistent_history(self):
        """Load the history on entry and save it on normal exit.

        Does nothing besides yielding when `history_path` is not set.
        """
        persist = bool(self.history_path)
        if persist:
            self.read_history(self.history_path)
        yield
        if persist:
            self.write_history(self.history_path)

    def read_history(self, history_path: Path):
        """Merge the JSON object stored at `history_path` into `history`.

        A non-existent file is silently ignored.
        """
        if history_path.exists():
            with open(history_path, "r") as f_in:
                stored = json.load(f_in)
                self.history.update({Path(key): value for key, value in stored.items()})

    def write_history(self, history_path: Path):
        """Write `history` to `history_path` as JSON, keyed by path string."""
        with open(history_path, "w") as f_out:
            serialisable = {str(key): value for key, value in self.history.items()}
            json.dump(serialisable, f_out, indent=2)

    def on_missing(self, t: Path | Phony | Variable):
        """Supply a task for an unregistered identifier.

        An existing path becomes a task that creates it and requires
        nothing; anything else raises `MissingDependency`.
        """
        if not (isinstance(t, Path) and t.exists()):
            raise MissingDependency()
        return Task([t], [])

    def is_resolvable(self, s: Any) -> bool:
        """True when every variable gathered from `s` is in the index."""
        return all(Variable(name) in self.index for name in gather_args(s))

    async def resolve_object(self, s: Any) -> Any:
        """Run the variables `s` refers to, then substitute them into `s`."""
        names = gather_args(s)
        await asyncio.gather(*(self.run(Variable(name), db=self) for name in names))
        resolved = substitute(s, self.environment)
        log.debug(f"substituting {s} => {resolved}")
        return resolved

    @property
    def environment(self):
        """A fresh `Environment` wrapping this database."""
        return Environment(self)
TaskDB( tasks: 'list[TaskT]' = <factory>, index: 'dict[T, TaskT]' = <factory>, runners: dict[str, Runner] = <factory>, throttle: Optional[asyncio.locks.Semaphore] = None, force_run: bool = False, history_path: pathlib.Path | None = None, history: dict[pathlib.Path, str | None] = <factory>)
runners: dict[str, Runner]
throttle: Optional[asyncio.locks.Semaphore] = None
force_run: bool = False
history_path: pathlib.Path | None = None
history: dict[pathlib.Path, str | None]
@contextmanager
def persistent_history(self):
311    @contextmanager
312    def persistent_history(self):
313        if not self.history_path:
314            yield
315        else:
316            self.read_history(self.history_path)
317            yield
318            self.write_history(self.history_path)
def read_history(self, history_path: pathlib.Path):
320    def read_history(self, history_path: Path):
321        if not history_path.exists():
322            return
323
324        with open(history_path, "r") as f_in:
325            data = json.load(f_in)
326            for k, v in data.items():
327                self.history[Path(k)] = v
def write_history(self, history_path: pathlib.Path):
329    def write_history(self, history_path: Path):
330        with open(history_path, "w") as f_out:
331            json.dump({str(k): v for k, v in self.history.items()}, f_out, indent=2)
def on_missing(self, t: pathlib.Path | Phony | Variable):
333    def on_missing(self, t: Path | Phony | Variable):
334        if isinstance(t, Path) and t.exists():
335            return Task([t], [])
336        raise MissingDependency()
def is_resolvable(self, s: Any) -> bool:
338    def is_resolvable(self, s: Any) -> bool:
339        return all(v in self.index for v in map(Variable, gather_args(s)))
async def resolve_object(self, s: Any) -> Any:
341    async def resolve_object(self, s: Any) -> Any:
342        vars = gather_args(s)
343        await asyncio.gather(*(self.run(Variable(v), db=self) for v in vars))
344        result = substitute(s, self.environment)
345        log.debug(f"substituting {s} => {result}")
346        return result
environment
348    @property
349    def environment(self):
350        return Environment(self)
@dataclass
class TaskProxy:
@dataclass
class TaskProxy:
    """Input to create an actual Task.

    An actual `Task` has no template variables remaining, and untyped strings
    are replaced with `Path`, `Variable` or `Phony` objects.
    """

    creates: list[str] = field(default_factory=list)
    requires: list[str] = field(default_factory=list)
    name: Optional[str] = None
    runner: Optional[str] = None
    path: Optional[str] = None
    script: Optional[str] = None
    stdin: Optional[str] = None
    stdout: Optional[str] = None
    description: Optional[str] = None
    force: bool = False

    @property
    def all_targets(self):
        """A new list: `creates`, then `stdout` and `#<name>` when those are set."""
        targets = list(self.creates)
        if self.stdout:
            targets.append(self.stdout)
        if self.name:
            targets.append(f"#{self.name}")
        return targets

    @property
    def all_dependencies(self):
        """A new list: `requires`, then `stdin` and `path` when those are set."""
        dependencies = list(self.requires)
        if self.stdin:
            dependencies.append(self.stdin)
        if self.path:
            dependencies.append(self.path)
        return dependencies

Input to create an actual Task.

An actual Task has no template variables remaining, and untyped strings are replaced with Path, Variable or Phony objects.

TaskProxy( creates: list[str] = <factory>, requires: list[str] = <factory>, name: Optional[str] = None, runner: Optional[str] = None, path: Optional[str] = None, script: Optional[str] = None, stdin: Optional[str] = None, stdout: Optional[str] = None, description: Optional[str] = None, force: bool = False)
creates: list[str]
requires: list[str]
name: Optional[str] = None
runner: Optional[str] = None
path: Optional[str] = None
script: Optional[str] = None
stdin: Optional[str] = None
stdout: Optional[str] = None
description: Optional[str] = None
force: bool = False
all_targets
243    @property
244    def all_targets(self):
245        return (
246            self.creates
247            + ([self.stdout] if self.stdout else [])
248            + ([f"#{self.name}"] if self.name else [])
249        )
all_dependencies
251    @property
252    def all_dependencies(self):
253        return (
254            self.requires
255            + ([self.stdin] if self.stdin else [])
256            + ([self.path] if self.path else [])
257        )
class Template(brei.TaskProxy):
class Template(TaskProxy):
    """A `TaskProxy` that is meant to be filled in by a `TemplateCall`.

    A `Template` can receive the same arguments as a `TaskProxy`. The
    difference is that any template variables in the `Template` can be
    substituted with a `TemplateCall`.
    """

    def call(self, args: dict[str, Any]) -> TaskProxy:
        """Return the result of substituting `args` into this template."""
        filled = substitute(self, args)
        return filled

A Template can receive the same arguments as a TaskProxy. The difference is that any template variables in the Template can be substituted with a TemplateCall.

def call(self, args: dict[str, typing.Any]) -> TaskProxy:
370    def call(self, args: dict[str, Any]) -> TaskProxy:
371        return substitute(self, args)
@dataclass
class TemplateCall:
@dataclass
class TemplateCall:
    """Calls a template with a set of arguments.

    Members:

      - template: name of the template.
      - args: arguments to the call.
      - collect: name of the phony target by which to collect all generated targets.
      - join: `inner` or `outer` join.
    """

    template: str
    args: dict[str, str | list[str]]
    collect: str | None = None
    join: Join = Join.INNER

    @property
    def all_args(self):
        """Generate one argument dictionary per individual template call.

        If every argument is a plain string, `args` itself is the only item.
        Otherwise list arguments are combined: an inner join walks the lists
        in lockstep (stopping at the shortest) with each string repeated,
        while any other join takes the cartesian product, treating a string
        as a list of one element.
        """
        values = list(self.args.values())
        if all(isinstance(v, str) for v in values):
            yield self.args
            return

        if self.join == Join.INNER:
            columns = [repeat(v) if isinstance(v, str) else v for v in values]
            rows = zip(*columns)
        else:  # cartesian product
            columns = [[v] if isinstance(v, str) else v for v in values]
            rows = product(*columns)

        for row in rows:
            yield dict(zip(self.args.keys(), row))

Calls a template with a set of arguments.

Members:

  • template: name of the template.
  • args: arguments to the call.
  • collect: name of the phony target by which to collect all generated targets.
  • join: inner or outer join.
TemplateCall( template: str, args: dict[str, str | list[str]], collect: str | None = None, join: brei.program.Join = <Join.INNER: 1>)
template: str
args: dict[str, str | list[str]]
collect: str | None = None
join: brei.program.Join = <Join.INNER: 1>
all_args
71    @property
72    def all_args(self):
73        if all(isinstance(v, str) for v in self.args.values()):
74            yield self.args
75            return
76
77        if self.join == Join.INNER:
78            for v in zip(
79                *map(
80                    lambda x: repeat(x) if isinstance(x, str) else x,
81                    self.args.values(),
82                )
83            ):
84                yield dict(zip(self.args.keys(), v))
85
86        else:  # cartesian product
87            for v in product(
88                *map(lambda x: [x] if isinstance(x, str) else x, self.args.values())
89            ):
90                yield dict(zip(self.args.keys(), v))
@dataclass
class Variable:
@dataclass
class Variable:
    """A target that is identified by the name of a variable."""

    name: str

    def __str__(self):
        return f"var({self.name})"

    def __hash__(self):
        # Hash the printed form, i.e. the string "var(<name>)".
        return hash(str(self))
Variable(name: str)
name: str
def construct(annot: Any, json: Any) -> Any:
def construct(annot: Any, json: Any) -> Any:
    """Build the value described by `annot` from `json` using `_construct`.

    Raises:
        InputError: when `_construct` fails with an `AssertionError` or a
            `ValueError`; the original exception is chained as the cause.
    """
    try:
        value = _construct(annot, json)
    except (AssertionError, ValueError) as e:
        raise InputError(annot, json) from e
    return value
async def resolve_tasks( program: Program, history_path: pathlib.Path | None = None) -> TaskDB:
async def resolve_tasks(program: Program, history_path: Path | None = None) -> TaskDB:
    """Resolve a program. A resolved program has all of its includes and
    template calls done, so that only tasks remain. In order to resolve
    a program, some tasks may need to be run. Variables that appear in
    the targets of a task (see `TaskProxy.all_targets`) will be resolved
    eagerly.

    Args:
        program: the program to resolve.
        history_path: passed on to the `TaskDB`; resolution runs inside
            `TaskDB.persistent_history()`.

    Returns: TaskDB instance.

    Raises:
        MissingInclude: an include does not exist after resolving it.
        MissingTemplate: a call names a template that is still unknown
            after all includes were processed.
        UserError: a task is left with targets that cannot be resolved.
    """
    db = TaskDB(history_path=history_path)
    # Shared between the top-level program and every include.
    template_index = {}

    async def go(program: Program) -> None:
        for var, template in program.environment.items():
            db.add(TemplateVariable([Variable(var)], [], template))

        task_templates = copy(program.task)
        template_index.update(program.template)
        delayed_calls: list[TemplateCall] = []
        delayed_templates: list[TaskProxy] = []

        db.runners.update(program.runner)

        for c in program.call:
            if c.template not in template_index:
                log.debug(
                    "template `%s` not available, waiting for includes to resolve",
                    c.template,
                )
                delayed_calls.append(c)
                continue

            task_templates.extend(tasks_from_call(template_index[c.template], c))

        for tt in task_templates:
            # we could check for resolvability here, but I don't like the
            # idea that order then matters. this way the rule is:
            # > if a task has a templated target, those variables should be
            # > resolvable after all other tasks were added, seeing that the
            # > task to resolve these variables can't have templated targets
            # > themselves.
            if gather_args(tt.all_targets):
                delayed_templates.append(tt)
            else:
                db.add(TemplateTask([], [], tt))

        delayed_templates = await resolve_delayed(db, delayed_templates)

        for inc in program.include:
            incp = Path(await db.resolve_object(inc))
            # An include may itself be the target of a task: run that first.
            if incp in db.index:
                await db.run(incp, db=db)
            if not incp.exists():
                raise MissingInclude(incp)

            # Includes are handled recursively with the same db and index.
            prg = Program.read(incp)
            await go(prg)

        for c in delayed_calls:
            if c.template not in template_index:
                log.debug(
                    "template `%s` still not available, now this is an error", c.template
                )
                raise MissingTemplate(c.template)

            for tt in tasks_from_call(template_index[c.template], c):
                # Same test as for the directly available tasks above: every
                # target counts (`stdout` and `name` too), not only `creates`.
                if gather_args(tt.all_targets):
                    delayed_templates.append(tt)
                else:
                    db.add(TemplateTask([], [], tt))

        delayed_templates = await resolve_delayed(db, delayed_templates)
        if delayed_templates:
            unresolvable = [
                p
                for t in delayed_templates
                for p in t.all_targets
                if not db.is_resolvable(p)
            ]
            raise UserError(f"Task has unresolvable targets: {unresolvable}")

    with db.persistent_history():
        await go(program)

    return db

Resolve a program. A resolved program has all of its includes and template calls done, so that only tasks remain. In order to resolve a program, some tasks may need to be run. Variables that appear in the creates field of a task (aka targets) will be resolved eagerly.

Returns: TaskDB instance.