brei
Welcome to Brei's API documentation. There are two ways to use Brei: from the
command-line (in which case we refer to the homepage for documentation), or
straight from Python. The easiest function to work with is brei()
, which
links to the command-line app one-to-one.
Program
If you want to read the Program
yourself, there are several ways to do so:
- Use
Program.read()
. You give it a Path
to a TOML or JSON file and a section, this last bit giving an object path into the data. For instance: Program.read(Path("pyproject.toml"), "tool.brei").
- Read your own data format into JSON compatible data, then
construct(Program, data)
. The construct
function uses the type annotations in dataclasses to validate the input data.
After reading the data, you'll want to resolve all tasks, i.e. perform includes and run any necessary task to resolve the targets of all other tasks.
program = Program.read(Path("brei.toml"))
db: TaskDB = await resolve_tasks(program)
await db.run(Phony("all"))
There are three kinds of targets: pathlib.Path
, Phony
and Variable
.
API
1# ~/~ begin <<docs/implementation.md#brei/__init__.py>>[init] 2""" 3Welcome to Brei's API documentation. There are two ways to use Brei: from the 4command-line (in which case we refer to the homepage for documentation), or 5straight from Python. The easiest function to work with is `brei()`, which 6links to the command-line app one-to-one. 7 8## Program 9If you want to read the `Program` yourself, there are several ways to do so: 10 111. Use `Program.read()`. You give it a `Path` to a TOML or JSON file and a 12section, this last bit giving a object path into the data. For instance: 13`Program.read(Path("pyproject.toml"), "tool.brei")`. 142. Read your own data format into JSON compatible data, then 15`construct(Program, data)`. The `construct` function uses the type annotations 16in dataclasses to validate the input data. 17 18After reading the data, you'll want to resolve all tasks, i.e. perform includes 19and run any necessary task to resolve the targets of all other tasks. 20 21 program = Program.read(Path("brei.toml")) 22 db: TaskDB = await resolve_tasks(program) 23 await db.run(Phony("all")) 24 25There are three kinds of targets: `pathlib.Path`, `Phony` and `Variable`. 26 27## API 28""" 29 30from .program import Program, resolve_tasks, TemplateCall 31from .construct import construct 32from .lazy import Lazy, LazyDB 33from .task import Task, TaskDB, Phony, Variable, TaskProxy, Template 34from .runner import Runner 35from .cli import brei 36 37__all__ = [ 38 "brei", 39 40 "Lazy", "LazyDB", "Phony", "Program", "Runner", "Task", "TaskDB", 41 "TaskProxy", "Template", "TemplateCall", "Variable", "construct", 42 "resolve_tasks", 43] 44# ~/~ end
@argh.arg("targets", nargs="*", help="names of tasks to run")
@argh.arg(
    "-i",
    "--input-file",
    help="Brei TOML or JSON file, use a `[...]` suffix to indicate a subsection.",
)
@argh.arg("-B", "--force-run", help="rebuild all dependencies")
@argh.arg("-j", "--jobs", help="limit number of concurrent jobs")
@argh.arg("-v", "--version", help="print version number and exit")
@argh.arg("--list-runners", help="show default configured runners")
@argh.arg("--debug", help="more verbose logging")
def brei(
    targets: list[str],
    *,
    input_file: Optional[str] = None,
    force_run: bool = False,
    jobs: Optional[int] = None,
    version: bool = False,
    list_runners: bool = False,
    debug: bool = False
):
    """Build one of the configured targets."""
    # NOTE: the docstring above is kept verbatim on purpose; it is the
    # user-visible description of the command.
    #
    # Program lookup order:
    #   1. the file given with `-i/--input-file` (optionally `file[section]`),
    #   2. `brei.toml` in the current directory,
    #   3. the `[tool.brei]` table of `pyproject.toml`.
    # A `HelpfulUserError` is raised when none of these yields a program.

    # Informational flags print and exit before any input file is read.
    if version:
        print(f"Brei {__version__}, Copyright (c) 2023 Netherlands eScience Center.")
        print("Licensed under the Apache License, Version 2.0.")
        sys.exit(0)

    if list_runners:
        t = Table(title="Default Runners", header_style="italic green", show_edge=False)
        t.add_column("runner", style="bold yellow")
        t.add_column("executable")
        t.add_column("arguments")
        for r, c in DEFAULT_RUNNERS.items():
            t.add_row(r, c.command, f"{c.args}")
        console = Console()
        console.print(t)
        sys.exit(0)

    if input_file is not None:
        # `path[section]` selects a sub-object of the file; a bare path reads
        # the whole file.
        if m := re.match(r"([^\[\]]+)\[([^\[\]\s]+)\]", input_file):
            input_path = Path(m.group(1))
            section = m.group(2)
        else:
            input_path = Path(input_file)
            section = None

        program = read_from_file(Program, input_path, section)

    elif Path("brei.toml").exists():
        program = read_from_file(Program, Path("brei.toml"))

    elif Path("pyproject.toml").exists():
        with open("pyproject.toml", "rb") as f_in:
            data = tomllib.load(f_in)
        try:
            # Walk down to the `[tool.brei]` table.
            for s in ["tool", "brei"]:
                data = data[s]
        except KeyError as e:
            # The option is `-i/--input-file`; the message used to refer to a
            # non-existent `-f` flag.
            raise HelpfulUserError(
                "Without the `-i` argument, Brei looks for `brei.toml` first, then for "
                "a `[tool.brei]` section in `pyproject.toml`. A `pyproject.toml` file was "
                "found, but contained no `[tool.brei]` section."
            ) from e

        program = construct(Program, data)
    else:
        # The file that is looked for above is `brei.toml`, so that is the
        # name the user needs to see here.
        raise HelpfulUserError(
            "No input file given, no `brei.toml` found and no `pyproject.toml` found."
        )

    # The command line may deliver `jobs` as a string; normalise it to int.
    jobs = int(jobs) if jobs else None
    configure_logger(debug)
    try:
        asyncio.run(main(program, targets, force_run, jobs))
    except UserError as e:
        log.error(f"Failed: {e}")
Build one of the configured targets.
@dataclass
class Lazy(Generic[T, R]):
    """Base class for tasks that are tagged with type `T` (usually `str` or
    `Path`) and representing values of type `R`.

    To implement a specific task, you need to implement the asynchronous
    `run` method, which should return a value of `R` or throw `TaskFailure`.

    Attributes:
        creates: list of target identifiers, for instance paths that are
            generated by running a particular task.
        requires: list of dependency identifiers. All of these are awaited
            before the task itself runs.
        result (property): value of the result, once the task was run. This
            throws an exception if accessed before the task is complete.
    """

    creates: list[T]
    requires: list[T]

    # `_lock` serialises `run_cached`; `_result` stores the outcome of the
    # first completed run (None until then).
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False)
    _result: Optional[Result[R]] = field(default=None, init=False)

    @property
    def real_requirements(self) -> list[T]:
        """Requirements that are not `Phony` targets."""
        return list(filter(lambda req: not isinstance(req, Phony), self.requires))

    def __bool__(self):
        """True only when the task has run and its stored result is truthy."""
        if self._result is None:
            return False
        return bool(self._result)

    @property
    def result(self) -> R:
        """Value produced by the task.

        Raises `ValueError` when the task has not run yet or has failed. A
        value that is itself a `Lazy` is unwrapped through its own `result`.
        """
        outcome = self._result
        if outcome is None:
            raise ValueError("Task has not run yet.")
        if not outcome:
            raise ValueError("Task has failed.")
        assert isinstance(outcome, Ok)
        value = outcome.value
        return value.result if isinstance(value, Lazy) else value

    async def run(self, *, db) -> R:
        """Perform the task; to be provided by subclasses."""
        raise NotImplementedError()

    async def run_after_deps(self, recurse, visited: dict[T, None], **kwargs) -> Result[R]:
        """Await all requirements through `recurse`, then run the task.

        Each requirement receives its own copy of `visited`. Returns a
        `DependencyFailure` mapping every failed requirement to its result,
        the `TaskFailure` raised by `run`, or `Ok` wrapping the value.
        """
        pending = [recurse(dep, copy(visited), **kwargs) for dep in self.requires]
        outcomes = await asyncio.gather(*pending)
        failed = {dep: res for dep, res in zip(self.requires, outcomes) if not res}
        if failed:
            return DependencyFailure(failed)

        try:
            value = await self.run(**kwargs)
        except TaskFailure as failure:
            return failure
        return Ok(value)

    async def run_cached(self, recurse, visited: dict[T, None], **kwargs) -> Result[R]:
        """Run the task at most once; later calls return the stored result."""
        async with self._lock:
            if self._result is None:
                self._result = await self.run_after_deps(recurse, visited, **kwargs)
            return self._result

    def reset(self):
        """Forget the stored result."""
        self._result = None

    def fields(self):
        """Map every dataclass field whose name has no leading underscore to its value."""
        public = {}
        for spec in fields(self):
            if spec.name[0] != "_":
                public[spec.name] = getattr(self, spec.name)
        return public
Base class for tasks that are tagged with type T
(usually str
or
Path
) and representing values of type R
.
To implement a specific task, you need to implement the asynchronous
run
method, which should return a value of R
or throw TaskFailure
.
Attributes: creates: list of target identifiers, for instance paths that are generated by running a particular task. requires: list of dependency identifiers. All of these need to be realized before the task can run. result (property): value of the result, once the task was run. This throws an exception if accessed before the task is complete.
@property
def result(self) -> R:
    """Value produced by the task.

    Raises `ValueError` when the task has not run yet or has failed. A value
    that is itself a `Lazy` is unwrapped through its own `result`.
    """
    outcome = self._result
    if outcome is None:
        raise ValueError("Task has not run yet.")
    if not outcome:
        raise ValueError("Task has failed.")
    assert isinstance(outcome, Ok)
    value = outcome.value
    return value.result if isinstance(value, Lazy) else value
async def run_after_deps(self, recurse, visited: dict[T, None], **kwargs) -> Result[R]:
    """Await all requirements through `recurse`, then run the task.

    Each requirement receives its own copy of `visited`. Returns a
    `DependencyFailure` mapping every failed requirement to its result, the
    `TaskFailure` raised by `run`, or `Ok` wrapping the value.
    """
    pending = [recurse(dep, copy(visited), **kwargs) for dep in self.requires]
    outcomes = await asyncio.gather(*pending)
    failed = {dep: res for dep, res in zip(self.requires, outcomes) if not res}
    if failed:
        return DependencyFailure(failed)

    try:
        value = await self.run(**kwargs)
    except TaskFailure as failure:
        return failure
    return Ok(value)
114@dataclass 115class LazyDB(Generic[T, TaskT]): 116 """Collect tasks and coordinate running a task from a task identifier.""" 117 118 tasks: list[TaskT] = field(default_factory=list) 119 index: dict[T, TaskT] = field(default_factory=dict) 120 121 async def run(self, t: T, visited: dict[T, None] | None = None, **kwargs) -> Result[R]: 122 visited = visited or dict() 123 if t in visited: 124 raise CyclicWorkflowError(list(visited.keys())) 125 visited[t] = None 126 127 if t not in self.index: 128 try: 129 task = self.on_missing(t) 130 except MissingDependency: 131 return MissingFailure(t) 132 else: 133 task = self.index[t] 134 135 while True: 136 match (result := await task.run_cached(self.run, visited, **kwargs)): 137 case Ok(x) if isinstance(x, Lazy): 138 task = cast(TaskT, x) 139 case _: 140 return result 141 142 def on_missing(self, _: T) -> TaskT: 143 raise MissingDependency() 144 145 def add(self, task: TaskT): 146 """Add a task to the DB.""" 147 log.debug(f"adding task ===\n{task}") 148 self.tasks.append(task) 149 for target in task.creates: 150 self.index[target] = task 151 152 def clean(self): 153 self.tasks = [] 154 self.index = {} 155 156 def reset(self): 157 for t in self.tasks: 158 t.reset()
Collect tasks and coordinate running a task from a task identifier.
121 async def run(self, t: T, visited: dict[T, None] | None = None, **kwargs) -> Result[R]: 122 visited = visited or dict() 123 if t in visited: 124 raise CyclicWorkflowError(list(visited.keys())) 125 visited[t] = None 126 127 if t not in self.index: 128 try: 129 task = self.on_missing(t) 130 except MissingDependency: 131 return MissingFailure(t) 132 else: 133 task = self.index[t] 134 135 while True: 136 match (result := await task.run_cached(self.run, visited, **kwargs)): 137 case Ok(x) if isinstance(x, Lazy): 138 task = cast(TaskT, x) 139 case _: 140 return result
@dataclass
class Phony(FromStr):
    """A named target, written as `#name` in string form."""

    name: str

    @classmethod
    def from_str(cls, s: str) -> Phony:
        """Parse `#name` into `Phony("name")`.

        Raises:
            ValueError: if `s` does not start with `#`. This includes the
                empty string, which used to escape as an `IndexError`.
        """
        # `startswith` is safe on the empty string, unlike `s[0]`.
        if s.startswith("#"):
            return Phony(s[1:])
        raise ValueError("A phony target should start with a `#` character.")

    def __str__(self):
        return f"#{self.name}"

    def __hash__(self):
        # Hash on the string form so instances can be used as dict keys.
        return hash(str(self))
@dataclass
class Program:
    """A Brei program.

    Every member defaults to an empty container, so an empty input yields an
    empty program.

    Members:

      - task: list of tasks.
      - environment: variables.
      - template: set of templates.
      - call: list of calls to templates.
      - include: list of includes.
      - runner: extra configured task runners.
    """
    task: list[TaskProxy] = field(default_factory=list)
    environment: dict[str, str] = field(default_factory=dict)
    template: dict[str, Template] = field(default_factory=dict)
    call: list[TemplateCall] = field(default_factory=list)
    include: list[str] = field(default_factory=list)
    runner: dict[str, Runner] = field(default_factory=dict)

    @staticmethod
    def read(path: Path, section: str | None = None) -> Program:
        """Read a `Program` from `path`, optionally from the given `section`.

        Thin wrapper that delegates to `read_from_file(Program, path, section)`.
        """
        return read_from_file(Program, path, section)
A Brei program.
Members:
- task: list of tasks.
- environment: variables.
- template: set of templates.
- call: list of calls to templates.
- include: list of includes.
- runner: extra configured task runners.
55@dataclass 56class Task(Lazy[Path | Phony | Variable, str | None]): 57 name: Optional[str] = None 58 runner: Optional[str] = None 59 path: Optional[Path] = None 60 script: Optional[str] = None 61 stdin: Optional[Path | Variable] = None 62 stdout: Optional[Path | Variable] = None 63 description: Optional[str] = None 64 force: bool = False 65 66 @property 67 def target_paths(self): 68 return (p for p in self.creates if isinstance(p, Path)) 69 70 @property 71 def dependency_paths(self): 72 return (p for p in self.requires if isinstance(p, Path)) 73 74 @property 75 def digest(self) -> str | None: 76 if self.script is None: 77 return None 78 return hashlib.md5(self.script.encode()).hexdigest() 79 80 def __str__(self): 81 tgts = ", ".join(str(t) for t in self.creates) 82 deps = ", ".join(str(t) for t in self.requires) 83 if self.script is not None: 84 src = indent(self.script, prefix=" â–Ž ", predicate=lambda _: True) 85 elif self.path is not None: 86 src = str(self.path) 87 else: 88 src = " - " 89 name = f"{self.name}: " if self.name else "" 90 return name + f"[{tgts}] <- [{deps}]\n" + src 91 92 def __post_init__(self): 93 if self.name and Phony(self.name) not in self.creates: 94 self.creates.append(Phony(self.name)) 95 if self.stdin and self.stdin not in self.requires: 96 self.requires.append(self.stdin) 97 if self.path and self.path not in self.requires: 98 self.requires.append(self.path) 99 if self.stdout and self.stdout not in self.creates: 100 self.creates.append(self.stdout) 101 102 def always_run(self) -> bool: 103 return self.force or len(list(self.target_paths)) == 0 104 105 def needs_run(self, db: TaskDB) -> bool: 106 if any(not p.exists() for p in self.target_paths): 107 return True 108 target_stats = [stat(p) for p in self.target_paths] 109 dep_stats = [stat(p) for p in self.dependency_paths] 110 if any(t < d for t in target_stats for d in dep_stats): 111 return True 112 if any(self.digest != db.history.get(p, None) for p in self.target_paths): 113 return 
True 114 return False 115 116 @contextmanager 117 def get_script_path(self): 118 if self.path is not None: 119 tmpfile = None 120 path = self.path 121 elif self.script is not None: 122 tmpfile = NamedTemporaryFile("w") 123 tmpfile.write(self.script) 124 tmpfile.flush() 125 path = Path(tmpfile.name) 126 else: 127 raise ValueError("A `Rule` can have either `path` or `script` defined.") 128 129 yield path 130 131 if tmpfile is not None: 132 tmpfile.close() 133 134 @contextmanager 135 def get_stdout(self): 136 match self.stdout: 137 case Variable(x): 138 yield asyncio.subprocess.PIPE 139 case x if isinstance(x, Path): 140 stdout = open(x, "w") 141 yield stdout 142 stdout.close() 143 case _: 144 yield None 145 146 async def run(self, *, db: TaskDB): 147 if not self.always_run() and not self.needs_run(db) and not db.force_run: 148 tgts = " ".join(f"`{t}`" for t in self.target_paths) 149 log.info(f"Targets {tgts} already up-to-date.") 150 return 151 152 log.debug(f"{self}") 153 if (self.path is None and self.script is None): 154 return 155 156 targets = " ".join(f"`{t}`" for t in self.creates) 157 short_note = self.description or (f"#{self.name}" if self.name else None) \ 158 or f"creating {targets}" 159 log.info(f"[green]{short_note}[/]", extra={'markup': True}) 160 161 stdin: TextIO | int | None = None 162 match self.stdin: 163 case Variable(x): 164 stdin = asyncio.subprocess.PIPE 165 input_data = db.environment[x].encode() 166 case x if isinstance(x, Path): 167 stdin = open(x, "r") 168 input_data = None 169 case _: 170 stdin = None 171 input_data = None 172 173 if self.runner is None and self.script is not None: 174 if not is_oneliner(self.script): 175 assert self.stdin is None 176 177 178 with self.get_stdout() as stdout: 179 stdout_data = b"" 180 for line in self.script.splitlines(): 181 async with db.throttle or nullcontext(): 182 proc = await create_subprocess_exec( 183 *shlex.split(line), 184 stdin=stdin, 185 stdout=stdout, 186 stderr=asyncio.subprocess.PIPE, 187 
) 188 stdout_data_part, stderr_data = await proc.communicate(input_data) 189 log.debug(f"return-code {proc.returncode}") 190 if stdout_data_part: 191 stdout_data += stdout_data_part 192 if stderr_data: 193 log.info(f"[gold1]{short_note}[/] %s", stderr_data.decode().rstrip(), extra={"markup": True}) 194 195 elif self.runner is not None: 196 with self.get_script_path() as path, self.get_stdout() as stdout: 197 runner = db.runners[self.runner] 198 args = [string.Template(arg).substitute(script=path) for arg in runner.args] 199 async with db.throttle or nullcontext(): 200 proc = await create_subprocess_exec( 201 runner.command, 202 *args, 203 stdin=stdin, 204 stdout=stdout, 205 stderr=asyncio.subprocess.PIPE, 206 ) 207 stdout_data, stderr_data = await proc.communicate(input_data) 208 log.debug(f"return-code {proc.returncode}") 209 210 if stderr_data: 211 log.info(f"[gold1]{short_note}[/] %s", stderr_data.decode().rstrip(), extra={"markup": True}) 212 213 else: 214 return 215 216 for p in self.target_paths: 217 db.history[p] = self.digest 218 219 if self.needs_run(db): 220 raise TaskFailure("Task didn't achieve goals.") 221 222 return stdout_data.decode().strip() if stdout_data else None
def needs_run(self, db: TaskDB) -> bool:
    """Decide whether the `Path` targets are missing or out of date.

    True when a target does not exist, when `stat` of a target compares less
    than `stat` of a dependency, or when the script digest differs from the
    one recorded in `db.history` for a target.
    """
    if not all(target.exists() for target in self.target_paths):
        return True
    produced = [stat(target) for target in self.target_paths]
    consumed = [stat(dep) for dep in self.dependency_paths]
    for made in produced:
        for used in consumed:
            if made < used:
                return True
    return any(
        self.digest != db.history.get(target) for target in self.target_paths
    )
@contextmanager
def get_script_path(self):
    """Yield a path to the script to execute.

    Yields `self.path` when set; otherwise writes `self.script` to a
    temporary file that is closed when the context exits, also when the body
    raises.

    Raises:
        ValueError: when neither `path` nor `script` is set.
    """
    if self.path is not None:
        yield self.path
    elif self.script is not None:
        # `with` guarantees the temporary file is closed on every exit path;
        # the explicit close after `yield` was skipped when the body raised.
        with NamedTemporaryFile("w") as tmpfile:
            tmpfile.write(self.script)
            tmpfile.flush()
            yield Path(tmpfile.name)
    else:
        raise ValueError("A `Rule` can have either `path` or `script` defined.")
146 async def run(self, *, db: TaskDB): 147 if not self.always_run() and not self.needs_run(db) and not db.force_run: 148 tgts = " ".join(f"`{t}`" for t in self.target_paths) 149 log.info(f"Targets {tgts} already up-to-date.") 150 return 151 152 log.debug(f"{self}") 153 if (self.path is None and self.script is None): 154 return 155 156 targets = " ".join(f"`{t}`" for t in self.creates) 157 short_note = self.description or (f"#{self.name}" if self.name else None) \ 158 or f"creating {targets}" 159 log.info(f"[green]{short_note}[/]", extra={'markup': True}) 160 161 stdin: TextIO | int | None = None 162 match self.stdin: 163 case Variable(x): 164 stdin = asyncio.subprocess.PIPE 165 input_data = db.environment[x].encode() 166 case x if isinstance(x, Path): 167 stdin = open(x, "r") 168 input_data = None 169 case _: 170 stdin = None 171 input_data = None 172 173 if self.runner is None and self.script is not None: 174 if not is_oneliner(self.script): 175 assert self.stdin is None 176 177 178 with self.get_stdout() as stdout: 179 stdout_data = b"" 180 for line in self.script.splitlines(): 181 async with db.throttle or nullcontext(): 182 proc = await create_subprocess_exec( 183 *shlex.split(line), 184 stdin=stdin, 185 stdout=stdout, 186 stderr=asyncio.subprocess.PIPE, 187 ) 188 stdout_data_part, stderr_data = await proc.communicate(input_data) 189 log.debug(f"return-code {proc.returncode}") 190 if stdout_data_part: 191 stdout_data += stdout_data_part 192 if stderr_data: 193 log.info(f"[gold1]{short_note}[/] %s", stderr_data.decode().rstrip(), extra={"markup": True}) 194 195 elif self.runner is not None: 196 with self.get_script_path() as path, self.get_stdout() as stdout: 197 runner = db.runners[self.runner] 198 args = [string.Template(arg).substitute(script=path) for arg in runner.args] 199 async with db.throttle or nullcontext(): 200 proc = await create_subprocess_exec( 201 runner.command, 202 *args, 203 stdin=stdin, 204 stdout=stdout, 205 stderr=asyncio.subprocess.PIPE, 
206 ) 207 stdout_data, stderr_data = await proc.communicate(input_data) 208 log.debug(f"return-code {proc.returncode}") 209 210 if stderr_data: 211 log.info(f"[gold1]{short_note}[/] %s", stderr_data.decode().rstrip(), extra={"markup": True}) 212 213 else: 214 return 215 216 for p in self.target_paths: 217 db.history[p] = self.digest 218 219 if self.needs_run(db): 220 raise TaskFailure("Task didn't achieve goals.") 221 222 return stdout_data.decode().strip() if stdout_data else None
@dataclass
class TaskDB(LazyDB[Path | Variable | Phony, Task | TemplateTask | TemplateVariable]):
    """Task database with runners, an optional throttle and a run history.

    `history` maps target paths to a digest (or None); `Task.needs_run` and
    `Task.run` read and write it.
    """
    runners: dict[str, Runner] = field(default_factory=lambda: copy(DEFAULT_RUNNERS))
    throttle: Optional[asyncio.Semaphore] = None
    force_run: bool = False
    history_path: Path | None = None
    history: dict[Path, str | None] = field(default_factory=dict)

    @contextmanager
    def persistent_history(self):
        """Load the history on entry and save it on normal exit.

        Does nothing but yield when `history_path` is not set.
        """
        if self.history_path:
            self.read_history(self.history_path)
            yield
            self.write_history(self.history_path)
        else:
            yield

    def read_history(self, history_path: Path):
        """Merge the JSON object stored at `history_path` into `history`.

        A file that does not exist is silently skipped.
        """
        if history_path.exists():
            with open(history_path, "r") as f_in:
                stored = json.load(f_in)
            for key, digest in stored.items():
                self.history[Path(key)] = digest

    def write_history(self, history_path: Path):
        """Write `history` to `history_path` as JSON with string keys."""
        with open(history_path, "w") as f_out:
            serialisable = {str(path): digest for path, digest in self.history.items()}
            json.dump(serialisable, f_out, indent=2)

    def on_missing(self, t: Path | Phony | Variable):
        """Create a task without dependencies for an existing path.

        Raises `MissingDependency` for anything else.
        """
        if not (isinstance(t, Path) and t.exists()):
            raise MissingDependency()
        return Task([t], [])

    def is_resolvable(self, s: Any) -> bool:
        """True when every variable gathered from `s` is present in the index."""
        return all(Variable(name) in self.index for name in gather_args(s))

    async def resolve_object(self, s: Any) -> Any:
        """Run the tasks for all variables in `s`, then substitute them."""
        names = gather_args(s)
        await asyncio.gather(*(self.run(Variable(name), db=self) for name in names))
        resolved = substitute(s, self.environment)
        log.debug(f"substituting {s} => {resolved}")
        return resolved

    @property
    def environment(self):
        """An `Environment` built on this database."""
        return Environment(self)
@dataclass
class TaskProxy:
    """Input to create an actual Task.

    An actual `Task` has no template variables remaining and untyped strings
    are replaced with `Path`, `Variable` or `Phony` objects.
    """
    creates: list[str] = field(default_factory=list)
    requires: list[str] = field(default_factory=list)
    name: Optional[str] = None
    runner: Optional[str] = None
    path: Optional[str] = None
    script: Optional[str] = None
    stdin: Optional[str] = None
    stdout: Optional[str] = None
    description: Optional[str] = None
    force: bool = False

    @property
    def all_targets(self):
        """New list: `creates`, then `stdout` and `#name` when those are set."""
        collected = list(self.creates)
        if self.stdout:
            collected.append(self.stdout)
        if self.name:
            collected.append(f"#{self.name}")
        return collected

    @property
    def all_dependencies(self):
        """New list: `requires`, then `stdin` and `path` when those are set."""
        collected = list(self.requires)
        if self.stdin:
            collected.append(self.stdin)
        if self.path:
            collected.append(self.path)
        return collected
Input to create an actual Task.
An actual Task
has no template variables remaining and untyped strings
are replaced with Path
, Variable
or Phony
objects.
class Template(TaskProxy):
    """A `Template` can receive the same arguments as a TaskProxy. The difference is
    that any template variables in the Template can be substituted with a `TemplateCall`."""
    def call(self, args: dict[str, Any]) -> TaskProxy:
        """Return the result of substituting `args` into this template.

        Delegates to `substitute(self, args)`; the template itself is the
        object passed in.
        """
        return substitute(self, args)
A Template
can receive the same arguments as a TaskProxy. The difference is
that any template variables in the Template can be substituted with a TemplateCall
.
@dataclass
class TemplateCall:
    """Calls a template with a set of arguments.

    Members:

      - template: name of the template.
      - args: arguments to the call.
      - collect: name of the phony target by which to collect all generated targets.
      - join: `inner` or `outer` join.
    """
    template: str
    args: dict[str, str | list[str]]
    collect: str | None = None
    join: Join = Join.INNER

    @property
    def all_args(self):
        """Generate one argument dict per template instantiation.

        When every argument is a string, `args` itself is the only item.
        Otherwise list arguments are combined: with an inner join they are
        zipped together (strings are repeated for every item), in any other
        case the cartesian product is taken.
        """
        values = self.args.values()
        if all(isinstance(value, str) for value in values):
            yield self.args
            return

        if self.join == Join.INNER:
            # Strings act as constants alongside the zipped lists.
            columns = [repeat(v) if isinstance(v, str) else v for v in values]
            combinations = zip(*columns)
        else:  # cartesian product
            columns = [[v] if isinstance(v, str) else v for v in values]
            combinations = product(*columns)

        for combination in combinations:
            yield dict(zip(self.args.keys(), combination))
Calls a template with a set of arguments.
Members:
- template: name of the template.
- args: arguments to the call.
- collect: name of the phony target by which to collect all generated targets.
- join:
inner
or outer
join.
@property
def all_args(self):
    """Generate one argument dict per template instantiation.

    When every argument is a string, `args` itself is the only item.
    Otherwise list arguments are combined: with an inner join they are zipped
    together (strings are repeated for every item), in any other case the
    cartesian product is taken.
    """
    values = self.args.values()
    if all(isinstance(value, str) for value in values):
        yield self.args
        return

    if self.join == Join.INNER:
        # Strings act as constants alongside the zipped lists.
        columns = [repeat(v) if isinstance(v, str) else v for v in values]
        combinations = zip(*columns)
    else:  # cartesian product
        columns = [[v] if isinstance(v, str) else v for v in values]
        combinations = product(*columns)

    for combination in combinations:
        yield dict(zip(self.args.keys(), combination))
async def resolve_tasks(program: Program, history_path: Path | None = None) -> TaskDB:
    """Resolve a program. A resolved program has all of its includes and
    template calls done, so that only tasks remain. In order to resolve
    a program, some tasks may need to be run. Variables that appear in
    the `creates` field of a task (aka targets), will be resolved eagerly.

    Raises `MissingInclude` when an include file does not exist,
    `MissingTemplate` when a called template is still unknown after all
    includes were read, and `UserError` when tasks with unresolvable
    targets remain.

    Returns: TaskDB instance.
    """
    db = TaskDB(history_path = history_path)
    # Shared by all recursive `go` calls, so templates defined in an include
    # are visible to calls in the including program.
    template_index = dict()

    async def go(program: Program):
        # Every environment entry becomes a task that creates a `Variable`.
        for var, template in program.environment.items():
            db.add(TemplateVariable([Variable(var)], [], template))

        task_templates = copy(program.task)
        template_index.update(program.template)
        delayed_calls: list[TemplateCall] = []
        delayed_templates: list[TaskProxy] = []

        db.runners.update(program.runner)

        for c in program.call:
            if c.template not in template_index:
                log.debug(
                    "template `%s` not available, waiting for includes to resolve",
                    c.template,
                )
                delayed_calls.append(c)
                continue

            task_templates.extend(tasks_from_call(template_index[c.template], c))

        for tt in task_templates:
            # we could check for resolvability here, but I don't like the
            # idea that order then matters. this way the rule is:
            # > if a task has a templated target, those variables should be
            # > resolvable after all other tasks were added, seeing that the
            # > task to resolve these variables can't have templated targets
            # > themselves.
            if gather_args(tt.all_targets):
                delayed_templates.append(tt)
            else:
                db.add(TemplateTask([], [], tt))

        delayed_templates = await resolve_delayed(db, delayed_templates)

        for inc in program.include:
            incp = Path(await db.resolve_object(inc))
            # An include may itself be the target of a task; run it first.
            if incp in db.index:
                await db.run(incp, db=db)
            if not incp.exists():
                raise MissingInclude(incp)

            prg = Program.read(incp)
            await go(prg)

        for c in delayed_calls:
            if c.template not in template_index:
                log.debug(
                    "template `%s` still not available, now this is an error", c.template
                )
                raise MissingTemplate(c.template)

            for tt in tasks_from_call(template_index[c.template], c):
                # NOTE(review): this checks `tt.creates`, whereas the loop over
                # `task_templates` above checks `tt.all_targets` (which also
                # covers `stdout` and the name) -- confirm this is intended.
                if gather_args(tt.creates):
                    delayed_templates.append(tt)
                else:
                    db.add(TemplateTask([], [], tt))

        delayed_templates = await resolve_delayed(db, delayed_templates)
        if delayed_templates:
            unresolvable = [p for t in delayed_templates for p in t.creates if not db.is_resolvable(p)]
            raise UserError(f"Task has unresolvable targets: {unresolvable}")

        # The return value is not used by any caller visible here.
        return db

    # History is read before and written after resolving, when a history
    # path is configured.
    with db.persistent_history():
        await go(program)

    return db
Resolve a program. A resolved program has all of its includes and
template calls done, so that only tasks remain. In order to resolve
a program, some tasks may need to be run. Variables that appear in
the creates
field of a task (aka targets), will be resolved eagerly.
Returns: TaskDB instance.