Utilities#
requires and inherits#
b2luigi provides two functions to help you with task creation.
The first one is b2luigi.requires(), a decorator that lets you specify the dependencies of a task easily. It is similar to the requires decorator in luigi, but it additionally accepts keyword arguments that fix parameter values of the required task.
The second one is b2luigi.inherits(), a decorator that lets you inherit the parameters of a task easily. It is similar to the inherits decorator in luigi, but it allows you to exclude specific parameters from the inheritance.
- b2luigi.requires(*tasks_to_require, **kwargs)[source]#
This “hack” copies the luigi.requires functionality, except that we allow for additional kwarg arguments when called.
It can be used to require a certain task, but with some variables already set, e.g.
import b2luigi

class TaskA(b2luigi.Task):
    some_parameter = b2luigi.IntParameter()
    some_other_parameter = b2luigi.IntParameter()

    def output(self):
        yield self.add_to_output("test.txt")

@b2luigi.requires(TaskA, some_parameter=3)
class TaskB(b2luigi.Task):
    another_parameter = b2luigi.IntParameter()

    def output(self):
        yield self.add_to_output("out.dat")
TaskB will now require TaskA, where some_parameter is already set to 3. This also means that TaskB only has the parameters another_parameter and some_other_parameter (because some_parameter is already fixed).
It is also possible to require multiple tasks, e.g.
import b2luigi

class TaskA(b2luigi.Task):
    some_parameter = b2luigi.IntParameter()

    def output(self):
        yield self.add_to_output("test.txt")

class TaskB(b2luigi.Task):
    some_other_parameter = b2luigi.IntParameter()

    def output(self):
        yield self.add_to_output("test.txt")

@b2luigi.requires(TaskA, TaskB)
class TaskC(b2luigi.Task):
    another_parameter = b2luigi.IntParameter()

    def output(self):
        yield self.add_to_output("out.dat")
- b2luigi.inherits(*tasks_to_inherit, **kwargs)[source]#
This copies the luigi.inherits functionality but allows specifying parameters you don’t want to inherit.
It can e.g. be used in tasks that merge the output of the tasks they require. These merger tasks don’t need the parameter they resolve anymore but should keep the same order of parameters, therefore simplifying the directory structure created by b2luigi.Task.add_to_output().
Usage can be similar to this:
import b2luigi

class TaskA(b2luigi.Task):
    some_parameter = b2luigi.IntParameter()
    some_other_parameter = b2luigi.IntParameter()

    def output(self):
        yield self.add_to_output("test.txt")

@b2luigi.inherits(TaskA, without='some_other_parameter')
class TaskB(b2luigi.Task):
    another_parameter = b2luigi.IntParameter()

    def requires(self):
        # TaskB resolves some_other_parameter by requiring one TaskA per value
        for my_other_parameter in range(10):
            yield self.clone(TaskA, some_other_parameter=my_other_parameter)

    def run(self):
        # somehow merge the output of TaskA to create "out.dat"
        pass

    def output(self):
        yield self.add_to_output("out.dat")
- Parameters:
without – Either a string or a collection of strings naming the parameters that should not be inherited
See also:
b2luigi.requires, which extends luigi.requires.
Core Utilities#
- b2luigi.core.utils.product_dict(**kwargs: Any) → Iterator[Dict[str, Any]] [source]#
Cross-product the given parameters and yield the resulting dictionaries one by one.
Example
>>> list(product_dict(arg_1=[1, 2], arg_2=[3, 4]))
[{'arg_1': 1, 'arg_2': 3}, {'arg_1': 1, 'arg_2': 4}, {'arg_1': 2, 'arg_2': 3}, {'arg_1': 2, 'arg_2': 4}]
The resulting dictionaries can be used directly as inputs for required tasks:
def requires(self):
    for args in product_dict(arg_1=[1, 2], arg_2=[3, 4]):
        yield some_task(**args)
- Parameters:
kwargs (Any) – Each keyword argument should be an iterable
- Returns:
An iterator over kwargs dictionaries, one for each combination in the cross product of the input iterables.
- Return type:
Iterator[Dict[str, Any]]
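The cross product described above can be reproduced with the standard library. The following is a minimal sketch and not necessarily how b2luigi implements it; the name product_dict_sketch is hypothetical, and the sketch assumes every keyword argument is an iterable.

```python
import itertools
from typing import Any, Dict, Iterable, Iterator


def product_dict_sketch(**kwargs: Iterable[Any]) -> Iterator[Dict[str, Any]]:
    """Yield one dict per combination of the given iterables (hypothetical helper)."""
    keys = list(kwargs.keys())
    # itertools.product iterates the last iterable fastest, which gives
    # the same ordering as the documented example.
    for combination in itertools.product(*kwargs.values()):
        yield dict(zip(keys, combination))


combinations = list(product_dict_sketch(arg_1=[1, 2], arg_2=[3, 4]))
```

Because the sketch is a generator, combinations are only materialized when it is consumed, for example with list() or in a for loop inside requires().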
- b2luigi.core.utils.fill_kwargs_with_lists(**kwargs: Any) → Dict[str, List[Any]] [source]#
Return the kwargs with each value mapped to [value] if not a list already.
Example
>>> fill_kwargs_with_lists(arg_1=[1, 2], arg_2=3)
{'arg_1': [1, 2], 'arg_2': [3]}
- Parameters:
kwargs – The input keyword arguments
- Returns:
Same as kwargs, but each value mapped to a list if not a list already
- Return type:
Dict[str, List[Any]]
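A minimal sketch of this wrapping step, following the wording above literally (an isinstance check against list). The name fill_kwargs_with_lists_sketch is hypothetical, and the real function may treat other iterables such as tuples differently.

```python
from typing import Any, Dict, List


def fill_kwargs_with_lists_sketch(**kwargs: Any) -> Dict[str, List[Any]]:
    """Wrap every non-list value in a single-element list (hypothetical helper)."""
    return {
        key: value if isinstance(value, list) else [value]
        for key, value in kwargs.items()
    }


filled = fill_kwargs_with_lists_sketch(arg_1=[1, 2], arg_2=3)
```

Normalizing scalars this way means every value can afterwards be treated as an iterable, for instance when building a cross product of parameters.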
- b2luigi.core.utils.flatten_to_file_paths(inputs: Iterable[FileSystemTarget]) → Dict[str, List[str]] [source]#
Take in a dict of lists of luigi targets and replace each luigi target by its corresponding path. For dicts, it will replace the value as well as the key. The key will, however, only be the basename of the path.
- Parameters:
inputs – A dict of lists of luigi targets
- Returns:
A dict with the keys replaced by the basename of the targets and the values by the full path
- Return type:
Dict[str, List[str]]
- b2luigi.core.utils.flatten_to_dict(inputs: Iterable[Any]) → Dict[Any, Any] [source]#
Convert an arbitrary input structure into a dictionary. If it is a dict already, return it. If it is an iterable of dicts or dict-like objects, return the merged dictionary. All non-dict values are turned into a dictionary entry value -> {value: value}.
Example
>>> flatten_to_dict([{"a": 1, "b": 2}, {"c": 3}, "d"])
{'a': 1, 'b': 2, 'c': 3, 'd': 'd'}
- Parameters:
inputs (Iterable[Any]) – The input structure
- Returns:
A dict constructed as described above.
- Return type:
Dict[Any, Any]
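The merging rules above can be sketched as follows. This is an illustration under stated assumptions, not the library's source: flatten_to_dict_sketch is a hypothetical name, only one level of nesting is handled, and on key collisions the later dictionary wins.

```python
from typing import Any, Dict


def flatten_to_dict_sketch(inputs: Any) -> Dict[Any, Any]:
    """Merge an iterable of dicts and plain values into one dict (hypothetical helper)."""
    if isinstance(inputs, dict):
        return inputs
    merged: Dict[Any, Any] = {}
    for item in inputs:
        if isinstance(item, dict):
            merged.update(item)  # later dicts override earlier keys in this sketch
        else:
            merged[item] = item  # non-dict value becomes {value: value}
    return merged


flat = flatten_to_dict_sketch([{"a": 1, "b": 2}, {"c": 3}, "d"])
```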
- b2luigi.core.utils.flatten_to_dict_of_lists(inputs: Iterable[Any]) → Dict[Any, List] [source]#
Flattens a nested iterable structure into a dictionary of lists.
This function takes an iterable of potentially nested structures, flattens it, and converts it into a dictionary where each key maps to a list of values associated with that key.
- Parameters:
inputs (Iterable[Any]) – The input iterable containing nested structures.
- Returns:
A dictionary where keys are derived from the input and values are lists of corresponding items.
- Return type:
Dict[Any, List]
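To make the "dictionary of lists" result concrete, here is a small sketch. It assumes a single level of nesting and that plain values are keyed by themselves; the name flatten_to_dict_of_lists_sketch is hypothetical and the real function may flatten deeper structures.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List


def flatten_to_dict_of_lists_sketch(inputs: Iterable[Any]) -> Dict[Any, List[Any]]:
    """Collect all values that share a key into one list (hypothetical helper)."""
    grouped: Dict[Any, List[Any]] = defaultdict(list)
    for item in inputs:
        if isinstance(item, dict):
            for key, value in item.items():
                grouped[key].append(value)
        else:
            grouped[item].append(item)  # assumption: plain values map to themselves
    return dict(grouped)


grouped_outputs = flatten_to_dict_of_lists_sketch([{"a": 1}, {"a": 2, "b": 3}, "c"])
```

Unlike a plain merge, values with the same key are kept side by side instead of overwriting each other.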
- b2luigi.core.utils.task_iterator(task, only_non_complete=False)[source]#
Iterates through a task and its dependencies in a directed acyclic graph (DAG), ensuring that each task is yielded only once.
- Parameters:
task – The root task to start iterating from. This task should have methods complete() to check if the task is complete and deps() to retrieve its dependencies.
only_non_complete (bool, optional) – If True, only tasks that are not complete (as determined by the complete() method) will be yielded. Defaults to False.
- Yields:
task – Each unique task in the DAG, starting from the given root task and including its dependencies.
Notes
The function uses a cache (already_seen_tasks) to ensure that tasks are not yielded multiple times, even if they are dependencies of multiple parent tasks in the DAG.
The iteration is performed recursively using a nested helper function.
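The traversal described in the notes can be sketched without luigi. SketchTask and task_iterator_sketch are hypothetical names; the sketch also assumes that a complete task is skipped together with its dependencies when only_non_complete is set, which the text above does not state explicitly.

```python
from typing import Iterable, Iterator, List, Set


class SketchTask:
    """Hypothetical stand-in for a task exposing complete() and deps()."""

    def __init__(self, name: str, deps: Iterable["SketchTask"] = (), done: bool = False):
        self.name = name
        self._deps: List["SketchTask"] = list(deps)
        self._done = done

    def complete(self) -> bool:
        return self._done

    def deps(self) -> List["SketchTask"]:
        return self._deps


def task_iterator_sketch(task: SketchTask, only_non_complete: bool = False) -> Iterator[SketchTask]:
    already_seen_tasks: Set[SketchTask] = set()

    def _unique_tasks(current: SketchTask) -> Iterator[SketchTask]:
        if only_non_complete and current.complete():
            return
        yield current
        for dep in current.deps():
            if dep in already_seen_tasks:
                continue  # shared dependency was already visited
            already_seen_tasks.add(dep)
            yield from _unique_tasks(dep)

    yield from _unique_tasks(task)


# Diamond-shaped DAG: both "left" and "right" depend on "shared".
shared = SketchTask("shared", done=True)
left = SketchTask("left", [shared])
right = SketchTask("right", [shared])
root = SketchTask("root", [left, right])

all_names = [t.name for t in task_iterator_sketch(root)]
pending_names = [t.name for t in task_iterator_sketch(root, only_non_complete=True)]
```

In the diamond example the shared dependency appears once in all_names and is dropped from pending_names because it is already complete.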
- b2luigi.core.utils.find_dependents(task_iterator, target_task)[source]#
Identifies and returns a set of tasks that are dependents of a specified target task.
- Parameters:
task_iterator (iterable) – An iterable of luigi task instances to search through.
target_task (str) – The name of the target task class to find dependents for.
- Returns:
A set of tasks that are either instances of the target task class or depend on it.
- Return type:
set
Notes
A task is considered a dependent if it directly or indirectly requires the target task.
The requires() method of each task is used to determine dependencies.
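A minimal sketch of this dependency search, using hypothetical task classes whose requires() returns a plain list. Matching tasks by class name is an assumption derived from target_task being documented as a string; find_dependents_sketch is not the library function.

```python
from typing import Iterable, List, Set


class BaseSketchTask:
    """Hypothetical minimal task: requires() returns a list of task instances."""

    def requires(self) -> List["BaseSketchTask"]:
        return []


class Generate(BaseSketchTask):
    pass


class Process(BaseSketchTask):
    def requires(self) -> List[BaseSketchTask]:
        return [Generate()]


class Plot(BaseSketchTask):
    def requires(self) -> List[BaseSketchTask]:
        return [Process()]


class Unrelated(BaseSketchTask):
    pass


def find_dependents_sketch(tasks: Iterable[BaseSketchTask], target_task: str) -> Set[BaseSketchTask]:
    def depends_on(task: BaseSketchTask) -> bool:
        # True if the target class appears anywhere below this task.
        return any(
            type(dep).__name__ == target_task or depends_on(dep)
            for dep in task.requires()
        )

    return {t for t in tasks if type(t).__name__ == target_task or depends_on(t)}


tasks = [Generate(), Process(), Plot(), Unrelated()]
dependent_names = sorted(type(t).__name__ for t in find_dependents_sketch(tasks, "Generate"))
```

The result contains the target itself plus its direct (Process) and indirect (Plot) dependents, while Unrelated is left out.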
- b2luigi.core.utils.get_all_output_files_in_tree(root_module, key=None)[source]#
Recursively retrieves all output files from tasks in a given module tree.
This function iterates through all tasks in the specified root module, collects their output files, and organizes them into a dictionary. If a specific key is provided, it returns the output files corresponding to that key.
- Parameters:
root_module (module) – The root module containing tasks to iterate over.
key (str, optional) – A specific key to filter the output files. If provided, only the output files corresponding to this key will be returned.
- Returns:
A dictionary where keys are file identifiers and values are lists of dictionaries containing:
exists (bool): Whether the file exists.
parameters (dict): Serialized parameters of the task.
file_name (str): Absolute path to the file.
- Return type:
dict
- Raises:
KeyError – If the specified key is not found in the output files.
Notes
The function uses task_iterator to traverse tasks in the module tree.
Output files are flattened and converted to absolute file paths.
- b2luigi.core.utils.get_serialized_parameters(task)[source]#
Retrieve a string-typed ordered dictionary of significant parameters in the format key=value.
This function iterates over the parameters of a given task, filters out non-significant parameters, and serializes the significant ones. If a parameter has a custom serialization method (serialize_hashed), it will use that; otherwise, it defaults to the standard serialize method.
- Parameters:
task – An object representing a task, which must implement the get_params method to retrieve its parameters and their metadata.
- Returns:
An ordered dictionary where keys are parameter names and values are their serialized representations.
- Return type:
collections.OrderedDict
- b2luigi.core.utils.create_output_file_name(task, base_filename: str, result_dir: str | None = None) → str [source]#
Generates an output file path based on the task’s parameters, a base filename, and an optional result directory.
- Parameters:
task – The task object containing parameters to serialize and use in the output path.
base_filename (str) – The base name of the output file.
result_dir (Optional[str]) – The directory where the output file should be saved. If not provided, it defaults to the result_dir setting.
- Returns:
The full path to the output file.
- Return type:
str
- Raises:
ValueError – If any parameter value contains a path separator or cannot be interpreted as a valid directory name.
Notes
The function ensures that the result directory is evaluated relative to the current executed file.
The parameter separator and whether to include parameter names in the output path are configurable via settings.
If use_parameter_name_in_output is enabled, the output path includes parameter names and values; otherwise, only parameter values are used.
If parameter_separator is set to a non-empty string, it will be used to separate parameter names and values in the output path.
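The effect of the two settings on the path layout can be illustrated with a simplified sketch. It takes already serialized parameters and the settings as explicit arguments, places one directory level per parameter (an assumption), and does not resolve result_dir relative to the executed file; create_output_file_name_sketch is a hypothetical name.

```python
import os
from collections import OrderedDict
from typing import Mapping


def create_output_file_name_sketch(
    serialized_parameters: Mapping[str, str],
    base_filename: str,
    result_dir: str,
    use_parameter_name_in_output: bool,
    parameter_separator: str,
) -> str:
    """Build result_dir/<one directory per parameter>/base_filename (hypothetical helper)."""
    path_parts = []
    for name, value in serialized_parameters.items():
        if os.path.sep in value:
            raise ValueError(f"Parameter {name}={value!r} contains a path separator")
        if use_parameter_name_in_output:
            path_parts.append(f"{name}{parameter_separator}{value}")
        else:
            path_parts.append(value)
    return os.path.join(result_dir, *path_parts, base_filename)


parameters = OrderedDict([("some_parameter", "3"), ("another_parameter", "7")])
with_names = create_output_file_name_sketch(parameters, "out.dat", "results", True, "=")
values_only = create_output_file_name_sketch(parameters, "out.dat", "results", False, "=")
```

Keeping the parameter order stable matters here, because it determines the nesting of the resulting directories.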
- b2luigi.core.utils.get_log_file_dir(task)[source]#
Determines the directory where log files for a given task should be stored.
If the task has a custom method get_log_file_dir, it will use that method to retrieve the log file directory. Otherwise, it constructs the log file directory path based on the task’s settings and family.
- Parameters:
task – The task object for which the log file directory is being determined.
- Returns:
The path to the log file directory for the given task.
- Return type:
str
- b2luigi.core.utils.get_task_file_dir(task)[source]#
Determines the directory path for a given task’s output files.
If the task has a method get_task_file_dir, it will use that method to retrieve the directory path. Otherwise, it generates the directory path using the create_output_file_name function and the task’s family name.
- Parameters:
task – An object representing the task. It is expected to have a get_task_file_dir method or a get_task_family method.
- Returns:
The directory path for the task’s output files.
- Return type:
str
- b2luigi.core.utils.get_filename()[source]#
Retrieves the absolute path of the main script being executed.
- Returns:
The absolute path of the main script.
- Return type:
str
- b2luigi.core.utils.map_folder(input_folder)[source]#
Maps a relative folder path to an absolute path based on the location of the current script.
If the input folder path is already absolute, it is returned as-is. Otherwise, the function determines the directory of the current script and joins it with the relative input folder path to produce an absolute path.
- Parameters:
input_folder (str) – The folder path to map. Can be either absolute or relative.
- Returns:
The absolute path corresponding to the input folder.
- Return type:
str
- Raises:
AttributeError – If the current script location cannot be determined, typically when running in an interactive shell (e.g., Jupyter Notebook). In such cases, the user is advised to provide absolute paths in their settings.
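The mapping rule can be sketched with os.path. To stay self-contained, the sketch receives the script location as an explicit script_path argument instead of detecting it; both that argument and the name map_folder_sketch are hypothetical.

```python
import os


def map_folder_sketch(input_folder: str, script_path: str) -> str:
    """Resolve a relative folder against the directory of script_path (hypothetical helper)."""
    if os.path.isabs(input_folder):
        return input_folder  # absolute paths are returned as-is
    script_dir = os.path.dirname(os.path.abspath(script_path))
    return os.path.join(script_dir, input_folder)


script = os.path.join("project", "analysis.py")
mapped = map_folder_sketch("results", script)
already_absolute = map_folder_sketch(os.path.abspath("results"), script)
```

Resolving against the script directory rather than the current working directory keeps output locations stable regardless of where the script is launched from.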
- b2luigi.core.utils.on_failure(task, _)[source]#
Handles the failure of a task by generating an explanation message, printing it to stdout, and returning it to be sent back to the scheduler.
- Parameters:
task – The task instance that failed.
- Returns:
A detailed explanation of the failure, including the task ID, parameters, and the location of the log files.
- Return type:
str
- b2luigi.core.utils.add_on_failure_function(task)[source]#
Assigns a custom failure handler to the given task.
This function dynamically binds the on_failure method to the provided task object.
- Parameters:
task – The task object to which the on_failure method will be attached.
- b2luigi.core.utils.create_cmd_from_task(task)[source]#
Constructs a command-line argument list based on the provided task and its settings.
The executable string is made up of three key components:
<executable_prefix> <executable> <filename> --batch-runner --task-id ExampleTask_id_123 <task_cmd_additional_args>
- Parameters:
task – An object representing the task for which the command is being created.
- Returns:
A list of strings representing the command-line arguments.
- Return type:
list
- Raises:
ValueError – If any of the following conditions are met:
- The task_cmd_additional_args setting is not a list of strings.
- The executable_prefix setting is not a list of strings.
- The executable setting is not a list of strings.
Notes
The filename is included in the command if the add_filename_to_cmd setting is enabled. (Default: True)
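The command layout and the validation rules above can be sketched as follows. The settings are passed as plain arguments instead of being read from a task, and create_cmd_sketch, the example file name, and the --verbose flag are hypothetical.

```python
from typing import List


def _require_list_of_strings(name: str, value: object) -> None:
    if not isinstance(value, list) or not all(isinstance(item, str) for item in value):
        raise ValueError(f"{name} must be a list of strings")


def create_cmd_sketch(
    task_id: str,
    filename: str,
    executable: List[str],
    executable_prefix: List[str],
    task_cmd_additional_args: List[str],
    add_filename_to_cmd: bool = True,
) -> List[str]:
    """Assemble <prefix> <executable> <filename> --batch-runner --task-id <id> <extra args>."""
    _require_list_of_strings("executable", executable)
    _require_list_of_strings("executable_prefix", executable_prefix)
    _require_list_of_strings("task_cmd_additional_args", task_cmd_additional_args)

    cmd = list(executable_prefix) + list(executable)
    if add_filename_to_cmd:
        cmd.append(filename)
    cmd += ["--batch-runner", "--task-id", task_id]
    cmd += task_cmd_additional_args
    return cmd


cmd = create_cmd_sketch("ExampleTask_id_123", "analysis.py", ["python3"], [], ["--verbose"])
```

Returning a list rather than a single string avoids shell quoting issues when the command is later handed to a subprocess call.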
- b2luigi.core.utils.create_output_dirs(task)[source]#
Creates the necessary output directories for a given task.
This function takes a task object, retrieves its outputs, and ensures that the directories required for those outputs exist by calling the makedirs method on each target.
- Parameters:
task – The task object whose outputs need directories to be created.
- b2luigi.core.utils.get_filled_params(task)[source]#
Retrieve a dictionary of parameter names and their corresponding values from a given task.
- Parameters:
task – An object representing a task, which must have a get_params method that returns an iterable of parameter name and metadata pairs, and attributes corresponding to the parameter names.
- Returns:
A dictionary containing parameter names as keys and their respective values as values.
- Return type:
dict
- b2luigi.core.utils.is_subdir(path, parent_dir)[source]#
Determines if a given path is a subdirectory of a specified parent directory.
- Parameters:
path (str) – The path to check.
parent_dir (str) – The parent directory to compare against.
- Returns:
True if the path is a subdirectory of the parent directory, False otherwise.
- Return type:
bool
Note
Both path and parent_dir are converted to their absolute paths before comparison.
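One way to implement such a check is with os.path.commonpath, shown here as a sketch rather than as the library's code. The name is_subdir_sketch is hypothetical, and in this sketch a path that equals parent_dir also counts as a subdirectory.

```python
import os


def is_subdir_sketch(path: str, parent_dir: str) -> bool:
    """Return True if path lies inside parent_dir after making both absolute."""
    path = os.path.abspath(path)
    parent_dir = os.path.abspath(parent_dir)
    # commonpath compares whole path components, so "results_old" is not
    # mistaken for a child of "results" as a plain string prefix test would.
    return os.path.commonpath([path, parent_dir]) == parent_dir


inside = is_subdir_sketch(os.path.join("results", "logs"), "results")
sibling = is_subdir_sketch("results_old", "results")
```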
- b2luigi.core.utils.get_apptainer_or_singularity(task=None)[source]#
Determines the command to use for containerization, prioritizing Apptainer over Singularity. If neither is available, raises a ValueError.
The function first checks if a custom command is set via the apptainer_cmd setting. If not, it checks for the availability of the apptainer or singularity commands in the system’s PATH.
- Parameters:
task (optional) – An optional task object that may be used to retrieve task-specific settings.
- Returns:
The command to use for containerization, either “apptainer” or “singularity”.
- Return type:
str
- Raises:
ValueError – If neither Apptainer nor Singularity is available on the system and no custom command is set.
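The lookup order described above can be sketched with shutil.which. The custom command is passed in directly instead of being read from the apptainer_cmd setting, and the which argument exists only so the lookup can be replaced in tests; get_container_cmd_sketch is a hypothetical name.

```python
import shutil
from typing import Callable, Optional


def get_container_cmd_sketch(
    custom_cmd: Optional[str] = None,
    which: Callable[[str], Optional[str]] = shutil.which,
) -> str:
    """Prefer a custom command, then apptainer, then singularity (hypothetical helper)."""
    if custom_cmd:
        return custom_cmd
    for candidate in ("apptainer", "singularity"):
        if which(candidate):  # found on PATH
            return candidate
    raise ValueError("Neither apptainer nor singularity is available on this system.")


# Simulate a system where only singularity is installed.
only_singularity = get_container_cmd_sketch(
    which=lambda name: "/usr/bin/singularity" if name == "singularity" else None
)
```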
- b2luigi.core.utils.create_apptainer_command(command, task=None)[source]#
Constructs a command to execute within an Apptainer (or Singularity) container.
This function generates a command, returned as a list of arguments, that sets up the necessary environment and mounts for running a given command inside an Apptainer container. It ensures that required settings are provided and validates compatibility with the batch system.
- Parameters:
command (str) – The command to be executed inside the Apptainer container.
task (optional) – An optional task object or identifier used to retrieve task-specific settings.
- Returns:
A list of command-line arguments representing the full Apptainer execution command.
- Return type:
list
- Raises:
ValueError – If the env_script is not provided.
ValueError – If the batch system is gbasf2, as Apptainer is not supported for this batch system.
Notes
apptainer_image is retrieved from the task settings.
apptainer_additional_params is used to specify additional parameters for the Apptainer command. Expecting a string.
apptainer_mounts is used to specify additional mount points for the Apptainer command. Expecting a list of strings.
apptainer_mount_defaults determines whether to include default mount points (e.g., result directory and log file directory).
Executable#
- b2luigi.core.executable.create_executable_wrapper(task)[source]#
Creates a bash script wrapper to execute a task with the appropriate environment and settings. The wrapper script sets up the working directory, environment variables, and optionally uses an Apptainer image for execution.
- Parameters:
task – The task containing configuration and settings.
- Returns:
The file path to the generated executable wrapper script.
- Return type:
str
- The wrapper script performs the following steps:
Changes to the working_dir directory.
- Sets up the environment:
Sources env_script if provided.
Overrides environment variables based on task or settings from env.
Constructs the command to execute the task, see create_cmd_from_task.
- Executes the command:
If an Apptainer image is specified in apptainer_image, runs the command within the image.
Otherwise, executes the command directly with exec.
Writes the generated script to a file and makes it executable.
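The steps above can be sketched as a small script generator. This is a simplified illustration, not the actual wrapper: it omits the Apptainer branch, takes all settings as explicit arguments, and the names write_wrapper_sketch, wrapper.sh, and MY_SETTING are hypothetical.

```python
import os
import shlex
import stat
import tempfile
from typing import Dict, List, Optional


def write_wrapper_sketch(
    path: str,
    working_dir: str,
    cmd: List[str],
    env_script: Optional[str] = None,
    env: Optional[Dict[str, str]] = None,
) -> str:
    """Write a bash wrapper that changes directory, sets up the environment and execs cmd."""
    lines = ["#!/bin/bash", f"cd {shlex.quote(working_dir)}"]
    if env_script:
        lines.append(f"source {shlex.quote(env_script)}")
    for name, value in (env or {}).items():
        lines.append(f"export {name}={shlex.quote(value)}")
    lines.append("exec " + shlex.join(cmd))

    with open(path, "w") as script:
        script.write("\n".join(lines) + "\n")
    # Add the owner-executable bit so the script can be run directly.
    os.chmod(path, os.stat(path).st_mode | stat.S_IXUSR)
    return path


with tempfile.TemporaryDirectory() as tmp_dir:
    wrapper_path = write_wrapper_sketch(
        os.path.join(tmp_dir, "wrapper.sh"),
        working_dir=tmp_dir,
        cmd=["python3", "analysis.py", "--batch-runner", "--task-id", "ExampleTask_id_123"],
        env={"MY_SETTING": "1"},
    )
    with open(wrapper_path) as script:
        wrapper_text = script.read()
    wrapper_is_executable = os.access(wrapper_path, os.X_OK)
```

Ending the script with exec replaces the shell process with the task command, so the exit code of the task becomes the exit code of the wrapper.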
- b2luigi.core.executable.run_task_remote(task)[source]#
Executes a given task remotely by creating an executable script and running it via a subprocess call. The standard output and error streams are redirected to log files.
- Parameters:
task – The task to be executed
- Raises:
RuntimeError – If the subprocess call returns a non-zero exit code, indicating a failure during execution.
- Side Effects:
Creates a directory for log files if it does not already exist.
Writes the standard output and error of the subprocess execution to separate log files.