Utilities#

requires and inherits#

b2luigi provides two decorators that help with task creation. The first, b2luigi.requires(), declares the dependencies of a task. It works like luigi.requires, but additionally lets you fix parameter values of the required task. The second, b2luigi.inherits(), copies the parameters of another task. It works like luigi.inherits, but lets you exclude selected parameters from inheritance.

b2luigi.requires(*tasks_to_require, **kwargs)[source]#

This “hack” copies the luigi.requires functionality, except that it accepts additional keyword arguments when called.

It can be used to require a certain task, but with some variables already set, e.g.

class TaskA(b2luigi.Task):
    some_parameter = b2luigi.IntParameter()
    some_other_parameter = b2luigi.IntParameter()

    def output(self):
        yield self.add_to_output("test.txt")

@b2luigi.requires(TaskA, some_parameter=3)
class TaskB(b2luigi.Task):
    another_parameter = b2luigi.IntParameter()

    def output(self):
        yield self.add_to_output("out.dat")

TaskB will now require TaskA with some_parameter already set to 3. This also means that TaskB only has the parameters another_parameter and some_other_parameter (because some_parameter is already fixed).

It is also possible to require multiple tasks, e.g.

class TaskA(b2luigi.Task):
    some_parameter = b2luigi.IntParameter()

    def output(self):
        yield self.add_to_output("test.txt")

class TaskB(b2luigi.Task):
    some_other_parameter = b2luigi.IntParameter()

    def output(self):
        yield self.add_to_output("test.txt")

@b2luigi.requires(TaskA, TaskB)
class TaskC(b2luigi.Task):
    another_parameter = b2luigi.IntParameter()

    def output(self):
        yield self.add_to_output("out.dat")

b2luigi.inherits(*tasks_to_inherit, **kwargs)[source]#

This copies the luigi.inherits functionality but allows specifying parameters you don’t want to inherit.

It can e.g. be used in tasks that merge the output of the tasks they require. These merger tasks don’t need the parameter they resolve anymore but should keep the same order of parameters, therefore simplifying the directory structure created by b2luigi.Task.add_to_output().

Usage can be similar to this:

class TaskA(b2luigi.Task):
    some_parameter = b2luigi.IntParameter()
    some_other_parameter = b2luigi.IntParameter()

    def output(self):
        yield self.add_to_output("test.txt")

@b2luigi.inherits(TaskA, without='some_other_parameter')
class TaskB(b2luigi.Task):
    another_parameter = b2luigi.IntParameter()

    def requires(self):
        for my_other_parameter in range(10):
            yield self.clone(TaskA, some_other_parameter=my_other_parameter)

    def run(self):
        # somehow merge the output of TaskA to create "out.dat"
        pass

    def output(self):
        yield self.add_to_output("out.dat")

Parameters:

without – Either a string or a collection of strings

See also: b2luigi.requires which extends luigi.requires.

Core Utilities#

b2luigi.core.utils.product_dict(**kwargs: Any) -> Iterator[Dict[str, Any]][source]#

Cross-product the given parameters and yield one dictionary per combination.

Example

>>> list(product_dict(arg_1=[1, 2], arg_2=[3, 4]))
[{'arg_1': 1, 'arg_2': 3}, {'arg_1': 1, 'arg_2': 4}, {'arg_1': 2, 'arg_2': 3}, {'arg_1': 2, 'arg_2': 4}]

The resulting dictionaries can be used directly as inputs for required tasks:

def requires(self):
    for args in product_dict(arg_1=[1, 2], arg_2=[3, 4]):
        yield some_task(**args)

Parameters:

kwargs (Any) – Each keyword argument should be an iterable

Returns:

An iterator over keyword-argument dictionaries, one for each combination in which every input iterable is cross-multiplied with every other.

Return type:

Iterator[Dict[str, Any]]
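
The cross-product behaviour described above can be illustrated with a minimal sketch built on itertools.product. The name product_dict_sketch is hypothetical, and this is not necessarily how b2luigi implements product_dict; it only reproduces the documented input and output.

```python
import itertools
from typing import Any, Dict, Iterator


def product_dict_sketch(**kwargs: Any) -> Iterator[Dict[str, Any]]:
    """Yield one dict per combination of the given iterables (illustrative only)."""
    keys = list(kwargs)
    # itertools.product varies the last iterable fastest, like nested for-loops.
    for values in itertools.product(*kwargs.values()):
        yield dict(zip(keys, values))


combinations = list(product_dict_sketch(arg_1=[1, 2], arg_2=[3, 4]))
```

Because the function is a generator, wrapping the call in list(...) is only needed when all combinations are required at once.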

b2luigi.core.utils.fill_kwargs_with_lists(**kwargs: Any) -> Dict[str, List[Any]][source]#

Return the kwargs with each value mapped to [value] if not a list already.

Example

>>> fill_kwargs_with_lists(arg_1=[1, 2], arg_2=3)
{'arg_1': [1, 2], 'arg_2': [3]}

Parameters:

kwargs – The input keyword arguments

Returns:

Same as kwargs, but each value mapped to a list if not a list already

Return type:

Dict[str, List[Any]]
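
A minimal sketch of the documented rule ("wrap the value unless it is already a list") could look as follows. The function name is hypothetical and the real implementation may differ in detail; the sketch checks for list instances only, so strings and tuples are wrapped as well.

```python
from typing import Any, Dict, List


def fill_kwargs_with_lists_sketch(**kwargs: Any) -> Dict[str, List[Any]]:
    """Wrap every value that is not already a list into a one-element list."""
    return {
        key: value if isinstance(value, list) else [value]
        for key, value in kwargs.items()
    }


filled = fill_kwargs_with_lists_sketch(arg_1=[1, 2], arg_2=3, arg_3="abc")
```

Normalizing the values this way makes it possible to treat scalar and list-valued arguments uniformly, for example before passing them to a function that expects one iterable per keyword.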

b2luigi.core.utils.flatten_to_file_paths(inputs: Iterable[FileSystemTarget]) -> Dict[str, List[str]][source]#

Take a dict of lists of luigi targets and replace each luigi target with its corresponding path. Both the values and the keys are replaced: each value becomes the full path, while each key becomes only the basename of the path.

Parameters:

inputs – A dict of lists of luigi targets

Returns:

A dict with the keys replaced by the basename of the targets and the values by the full path

Return type:

Dict[str, List[str]]

b2luigi.core.utils.flatten_to_dict(inputs: Iterable[Any]) -> Dict[Any, Any][source]#

Convert an arbitrary input structure into a dictionary. If the input is already a dict, return it. If it is an iterable of dicts or dict-like objects, return the merged dictionary. Every non-dict value is turned into a dictionary entry that maps the value to itself: value -> {value: value}

Example

>>> flatten_to_dict([{"a": 1, "b": 2}, {"c": 3}, "d"])
{'a': 1, 'b': 2, 'c': 3, 'd': 'd'}

Parameters:

inputs (Iterable[Any]) – The input structure

Returns:

A dict constructed as described above.

Return type:

Dict[Any, Any]
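
The merging rules can be sketched in a few lines. This is an illustrative reimplementation under a hypothetical name; it handles only one level of nesting, whereas the library function may flatten deeper structures.

```python
from typing import Any, Dict, Iterable


def flatten_to_dict_sketch(inputs: Iterable[Any]) -> Dict[Any, Any]:
    """Merge dicts and map every non-dict value to itself (one level only)."""
    if isinstance(inputs, dict):
        return dict(inputs)
    merged: Dict[Any, Any] = {}
    for item in inputs:
        if isinstance(item, dict):
            merged.update(item)  # later entries override earlier ones
        else:
            merged[item] = item
    return merged


flattened = flatten_to_dict_sketch([{"a": 1, "b": 2}, {"c": 3}, "d"])
```

Note that in this sketch a key appearing in several dicts keeps only the last value.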

b2luigi.core.utils.flatten_to_dict_of_lists(inputs: Iterable[Any]) -> Dict[Any, List][source]#

Flattens a nested iterable structure into a dictionary of lists.

This function takes an iterable of potentially nested structures, flattens it, and converts it into a dictionary where each key maps to a list of values associated with that key.

Parameters:

inputs (Iterable[Any]) – The input iterable containing nested structures.

Returns:

A dictionary where keys are derived from the input and values are lists of corresponding items.

Return type:

Dict[Any, List]
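
To make the "dictionary of lists" shape concrete, here is a minimal sketch under a hypothetical name. It assumes that non-dict values are keyed by themselves and handles only one level of nesting; the library function may treat deeper structures differently.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List


def flatten_to_dict_of_lists_sketch(inputs: Iterable[Any]) -> Dict[Any, List]:
    """Collect all values that share a key into one list per key."""
    collected: Dict[Any, List] = defaultdict(list)
    for item in inputs:
        # Assumption: a non-dict value acts as both key and value.
        pairs = item.items() if isinstance(item, dict) else [(item, item)]
        for key, value in pairs:
            collected[key].append(value)
    return dict(collected)


grouped = flatten_to_dict_of_lists_sketch([{"a": 1}, {"a": 2, "b": 3}, "c"])
```

Unlike a plain merge, values that share a key are accumulated rather than overwritten.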

b2luigi.core.utils.task_iterator(task, only_non_complete=False)[source]#

Iterates through a task and its dependencies in a directed acyclic graph (DAG), ensuring that each task is yielded only once.

Parameters:
  • task – The root task to start iterating from. This task should have methods complete() to check if the task is complete and deps() to retrieve its dependencies.

  • only_non_complete (bool, optional) – If True, only tasks that are not complete (as determined by the complete() method) will be yielded. Defaults to False.

Yields:

task – Each unique task in the DAG, starting from the given root task and including its dependencies.

Notes

  • The function uses a cache (already_seen_tasks) to ensure that tasks are not yielded multiple times, even if they are dependencies of multiple parent tasks in the DAG.

  • The iteration is performed recursively using a nested helper function.
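
The traversal described in the notes can be sketched with stand-in objects. FakeTask and task_iterator_sketch are hypothetical names; only complete() and deps() are modelled, and the sketch assumes that the dependencies of a complete task are not visited when only_non_complete is set.

```python
class FakeTask:
    """Hypothetical stand-in for a luigi task."""

    def __init__(self, name, deps=(), done=False):
        self.name = name
        self._deps = list(deps)
        self._done = done

    def deps(self):
        return self._deps

    def complete(self):
        return self._done


def task_iterator_sketch(task, only_non_complete=False):
    already_seen = set()  # names of dependencies that were already visited

    def _walk(current):
        if only_non_complete and current.complete():
            return
        yield current
        for dep in current.deps():
            if dep.name in already_seen:
                continue
            already_seen.add(dep.name)
            yield from _walk(dep)

    yield from _walk(task)


# Diamond-shaped DAG: D requires B and C, which both require A.
a = FakeTask("A", done=True)
b = FakeTask("B", [a])
c = FakeTask("C", [a])
d = FakeTask("D", [b, c])

all_names = [t.name for t in task_iterator_sketch(d)]
pending_names = [t.name for t in task_iterator_sketch(d, only_non_complete=True)]
```

Task A is reachable through both B and C, but the cache ensures it appears only once in all_names.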

b2luigi.core.utils.find_dependents(task_iterator, target_task)[source]#

Identifies and returns a set of tasks that are dependents of a specified target task.

Parameters:
  • task_iterator (iterable) – An iterable of luigi task instances to search through.

  • target_task (str) – The name of the target task class to find dependents for.

Returns:

A set of tasks that are either instances of the target task class or depend on it.

Return type:

set

Notes

  • A task is considered a dependent if it directly or indirectly requires the target task.

  • The requires() method of each task is used to determine dependencies.
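
A minimal sketch of the dependent search, using hypothetical stand-in classes that only provide requires(). It assumes that tasks are matched by class name, as the target_task parameter suggests, and is not necessarily the library's implementation.

```python
class FakeTask:
    """Hypothetical stand-in for a luigi task; only requires() is modelled."""

    def __init__(self, *required):
        self._required = list(required)

    def requires(self):
        return self._required


class Raw(FakeTask):
    pass


class Processed(FakeTask):
    pass


class Plot(FakeTask):
    pass


class Unrelated(FakeTask):
    pass


def depends_on(task, target_name):
    """Return True if task requires the target class directly or indirectly."""
    for dep in task.requires():
        if type(dep).__name__ == target_name or depends_on(dep, target_name):
            return True
    return False


def find_dependents_sketch(tasks, target_name):
    return {
        task for task in tasks
        if type(task).__name__ == target_name or depends_on(task, target_name)
    }


raw = Raw()
processed = Processed(raw)
plot = Plot(processed)
unrelated = Unrelated()
dependents = find_dependents_sketch([raw, processed, plot, unrelated], "Raw")
```

Here plot is included because it depends on Raw indirectly through processed.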

b2luigi.core.utils.get_all_output_files_in_tree(root_module, key=None)[source]#

Recursively retrieves all output files from tasks in a given module tree.

This function iterates through all tasks in the specified root module, collects their output files, and organizes them into a dictionary. If a specific key is provided, it returns the output files corresponding to that key.

Parameters:
  • root_module (module) – The root module containing tasks to iterate over.

  • key (str, optional) – A specific key to filter the output files. If provided, only the output files corresponding to this key will be returned.

Returns:

A dictionary where keys are file identifiers and values are lists of dictionaries containing:

  • exists (bool): Whether the file exists.

  • parameters (dict): Serialized parameters of the task.

  • file_name (str): Absolute path to the file.

Return type:

dict

Raises:

KeyError – If the specified key is not found in the output files.

Notes

  • The function uses task_iterator to traverse tasks in the module tree.

  • Output files are flattened and converted to absolute file paths.

b2luigi.core.utils.get_serialized_parameters(task)[source]#

Retrieve a string-typed ordered dictionary of significant parameters in the format key=value.

This function iterates over the parameters of a given task, filters out non-significant parameters, and serializes the significant ones. If a parameter has a custom serialization method (serialize_hashed), it will use that; otherwise, it defaults to the standard serialize method.

Parameters:

task – An object representing a task, which must implement the get_params method to retrieve its parameters and their metadata.

Returns:

An ordered dictionary where keys are parameter names and values are their serialized representations.

Return type:

collections.OrderedDict

b2luigi.core.utils.create_output_file_name(task, base_filename: str, result_dir: str | None = None) -> str[source]#

Generates an output file path based on the task’s parameters, a base filename, and an optional result directory.

Parameters:
  • task – The task object containing parameters to serialize and use in the output path.

  • base_filename (str) – The base name of the output file.

  • result_dir (Optional[str]) – The directory where the output file should be saved. If not provided, it defaults to the result_dir setting.

Returns:

The full path to the output file.

Return type:

str

Raises:

ValueError – If any parameter value contains a path separator or cannot be interpreted as a valid directory name.

Notes

  • The function ensures that the result directory is evaluated relative to the current executed file.

  • The parameter separator and whether to include parameter names in the output path are configurable via settings.

  • If use_parameter_name_in_output is enabled, the output path includes parameter names and values; otherwise, only parameter values are used.

  • If parameter_separator is set to a non-empty string, it will be used to separate parameter names and values in the output path.
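
The effect of the two settings on the resulting path can be sketched as follows. build_output_path is a hypothetical helper that takes the settings as plain arguments; the "=" default for the separator is an assumption based on the key=value format mentioned for the serialized parameters.

```python
import os
from collections import OrderedDict


def build_output_path(result_dir, serialized_params, base_filename,
                      use_parameter_name=True, parameter_separator="="):
    """Join result_dir, one directory per parameter, and the base filename."""
    parts = []
    for name, value in serialized_params.items():
        if os.path.sep in value:
            raise ValueError(f"Parameter {name}={value!r} contains a path separator")
        if use_parameter_name:
            parts.append(f"{name}{parameter_separator}{value}")
        else:
            parts.append(value)
    return os.path.join(result_dir, *parts, base_filename)


params = OrderedDict([("some_parameter", "3"), ("another_parameter", "7")])
with_names = build_output_path("results", params, "out.dat")
values_only = build_output_path("results", params, "out.dat", use_parameter_name=False)
```

On a POSIX system, with_names is results/some_parameter=3/another_parameter=7/out.dat and values_only is results/3/7/out.dat.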

b2luigi.core.utils.get_log_file_dir(task)[source]#

Determines the directory where log files for a given task should be stored.

If the task has a custom method get_log_file_dir, it will use that method to retrieve the log file directory. Otherwise, it constructs the log file directory path based on the task’s settings and family.

Parameters:

task – The task object for which the log file directory is being determined.

Returns:

The path to the log file directory for the given task.

Return type:

str

b2luigi.core.utils.get_task_file_dir(task)[source]#

Determines the directory path for a given task’s output files.

If the task has a method get_task_file_dir, it will use that method to retrieve the directory path. Otherwise, it generates the directory path using the create_output_file_name function and the task’s family name.

Parameters:

task – An object representing the task. It is expected to have a get_task_file_dir method or a get_task_family method.

Returns:

The directory path for the task’s output files.

Return type:

str

b2luigi.core.utils.get_filename()[source]#

Retrieves the absolute path of the main script being executed.

Returns:

The absolute path of the main script.

Return type:

str

b2luigi.core.utils.map_folder(input_folder)[source]#

Maps a relative folder path to an absolute path based on the location of the current script.

If the input folder path is already absolute, it is returned as-is. Otherwise, the function determines the directory of the current script and joins it with the relative input folder path to produce an absolute path.

Parameters:

input_folder (str) – The folder path to map. Can be either absolute or relative.

Returns:

The absolute path corresponding to the input folder.

Return type:

str

Raises:

AttributeError – If the current script location cannot be determined, typically when running in an interactive shell (e.g., Jupyter Notebook). In such cases, the user is advised to provide absolute paths in their settings.
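
The mapping rule can be sketched as below. To keep the example independent of how the current script is detected, the hypothetical map_folder_sketch receives the script path as an explicit argument, which the library function does not do.

```python
import os


def map_folder_sketch(input_folder, script_path):
    """Resolve input_folder relative to the directory that contains script_path."""
    if os.path.isabs(input_folder):
        return input_folder  # absolute paths are returned unchanged
    script_dir = os.path.dirname(os.path.abspath(script_path))
    return os.path.join(script_dir, input_folder)


script = os.path.join(os.path.abspath("project"), "steering.py")
relative_result = map_folder_sketch("results", script)

absolute_input = os.path.abspath("elsewhere")
absolute_result = map_folder_sketch(absolute_input, script)
```

The relative folder ends up next to the script rather than in the current working directory, which is why results stay in one place regardless of where the script is launched from.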

b2luigi.core.utils.on_failure(task, _)[source]#

Handles the failure of a task by generating an explanation message, printing it to stdout, and returning it to be sent back to the scheduler.

Parameters:

task – The task instance that failed.

Returns:

A detailed explanation of the failure, including the task ID, parameters, and the location of the log files.

Return type:

str

b2luigi.core.utils.add_on_failure_function(task)[source]#

Assigns a custom failure handler to the given task.

This function dynamically binds the on_failure method to the provided task object.

Parameters:

task – The task object to which the on_failure method will be attached.

b2luigi.core.utils.create_cmd_from_task(task)[source]#

Constructs a command-line argument list based on the provided task and its settings.

The command is assembled from the following components, in this order:

<executable_prefix> <executable> <filename> --batch-runner --task-id ExampleTask_id_123 <task_cmd_additional_args>

Parameters:

task – An object representing the task for which the command is being created.

Returns:

A list of strings representing the command-line arguments.

Return type:

list

Raises:

ValueError – If any of the following conditions are met:
  • The task_cmd_additional_args setting is not a list of strings.

  • The executable_prefix setting is not a list of strings.

  • The executable setting is not a list of strings.

Notes

  • The filename is included in the command if the add_filename_to_cmd setting is enabled. (Default: True)
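
The layout and the validation rules can be sketched with a hypothetical helper that takes the settings as plain arguments instead of reading them from a task. The concrete values in the usage lines are examples only.

```python
def build_cmd_sketch(executable_prefix, executable, filename, task_id,
                     additional_args, add_filename_to_cmd=True):
    """Assemble the argument list in the documented order."""
    for name, value in (("executable_prefix", executable_prefix),
                        ("executable", executable),
                        ("task_cmd_additional_args", additional_args)):
        if not isinstance(value, list) or not all(isinstance(v, str) for v in value):
            raise ValueError(f"{name} must be a list of strings")

    cmd = executable_prefix + executable
    if add_filename_to_cmd:
        cmd = cmd + [filename]
    return cmd + ["--batch-runner", "--task-id", task_id] + additional_args


cmd = build_cmd_sketch([], ["python3"], "steering.py",
                       "ExampleTask_id_123", ["--verbose"])
```

Passing a bare string such as "python3" instead of ["python3"] is rejected, which mirrors the ValueError conditions listed above.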

b2luigi.core.utils.create_output_dirs(task)[source]#

Creates the necessary output directories for a given task.

This function takes a task object, retrieves its outputs, and ensures that the directories required for those outputs exist by calling the makedirs method on each target.

Parameters:

task – The task object whose outputs need directories to be created.

b2luigi.core.utils.get_filled_params(task)[source]#

Retrieve a dictionary of parameter names and their corresponding values from a given task.

Parameters:

task – An object representing a task, which must have a get_params method that returns an iterable of parameter name and metadata pairs, and attributes corresponding to the parameter names.

Returns:

A dictionary containing parameter names as keys and their respective values as values.

Return type:

dict

b2luigi.core.utils.is_subdir(path, parent_dir)[source]#

Determines if a given path is a subdirectory of a specified parent directory.

Parameters:
  • path (str) – The path to check.

  • parent_dir (str) – The parent directory to compare against.

Returns:

True if the path is a subdirectory of the parent directory, False otherwise.

Return type:

bool

Note

Both path and parent_dir are converted to their absolute paths before comparison.
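
One way to implement such a check is with os.path.commonpath, as in the sketch below. The function name is hypothetical and the library may use a different comparison; in this sketch a path counts as a subdirectory of itself.

```python
import os


def is_subdir_sketch(path, parent_dir):
    """Return True if path lies inside parent_dir (after making both absolute)."""
    path = os.path.abspath(path)
    parent_dir = os.path.abspath(parent_dir)
    # commonpath compares whole path components, not string prefixes.
    return os.path.commonpath([path, parent_dir]) == parent_dir


inside = is_subdir_sketch("results/task/run_1", "results")
sibling = is_subdir_sketch("results_old/run_1", "results")
```

Comparing path components avoids the false positive that a plain string-prefix test would give for the sibling directory results_old.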

b2luigi.core.utils.get_apptainer_or_singularity(task=None)[source]#

Determines the command to use for containerization, prioritizing Apptainer over Singularity. If neither is available, raises a ValueError.

The function first checks if a custom command is set via the apptainer_cmd setting. If not, it checks for the availability of the apptainer or singularity commands in the system’s PATH.

Parameters:

task (optional) – An optional task object that may be used to retrieve task-specific settings.

Returns:

The command to use for containerization, either “apptainer” or “singularity”.

Return type:

str

Raises:

ValueError – If neither Apptainer nor Singularity is available on the system and no custom command is set.
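
The lookup order can be sketched with shutil.which. pick_container_cmd is a hypothetical helper: the custom command is passed as an argument instead of being read from the apptainer_cmd setting, and the which parameter exists only so the example can be run without either tool installed.

```python
import shutil


def pick_container_cmd(custom_cmd=None, which=shutil.which):
    """Prefer a custom command, then apptainer, then singularity."""
    if custom_cmd:
        return custom_cmd
    for candidate in ("apptainer", "singularity"):
        if which(candidate):
            return candidate
    raise ValueError("Neither apptainer nor singularity was found on PATH")


# Simulated environments (the lambdas stand in for shutil.which).
both_installed = pick_container_cmd(which=lambda name: "/usr/bin/" + name)
only_singularity = pick_container_cmd(
    which=lambda name: "/usr/bin/singularity" if name == "singularity" else None
)
custom = pick_container_cmd(custom_cmd="/opt/apptainer/bin/apptainer")
```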

b2luigi.core.utils.create_apptainer_command(command, task=None)[source]#

Constructs a command to execute within an Apptainer (or Singularity) container.

This function generates a command string that sets up the necessary environment and mounts for running a given command inside an Apptainer container. It ensures that required settings are provided and validates compatibility with the batch system.

Parameters:
  • command (str) – The command to be executed inside the Apptainer container.

  • task (optional) – An optional task object or identifier used to retrieve task-specific settings.

Returns:

A list of command-line arguments representing the full Apptainer execution command.

Return type:

list

Raises:
  • ValueError – If the env_script is not provided.

  • ValueError – If the batch system is gbasf2, as Apptainer is not supported for this batch system.

Notes

  • apptainer_image is retrieved from the task settings.

  • apptainer_additional_params specifies additional parameters for the Apptainer command. It expects a string.

  • apptainer_mounts specifies additional mount points for the Apptainer command. It expects a list of strings.

  • apptainer_mount_defaults determines whether to include default mount points (e.g., result directory and log file directory).

b2luigi.core.utils.get_luigi_logger()[source]#

Retrieve the logger instance used by luigi.

Returns:

The logger instance for “luigi-interface”.

Return type:

logging.Logger

Executable#

b2luigi.core.executable.create_executable_wrapper(task)[source]#

Creates a bash script wrapper to execute a task with the appropriate environment and settings. The wrapper script sets up the working directory, environment variables, and optionally uses an Apptainer image for execution.

Parameters:

task – The task containing configuration and settings.

Returns:

The file path to the generated executable wrapper script.

Return type:

str

The wrapper script performs the following steps:
  1. Changes to the working_dir directory.

  2. Sets up the environment:
    • Sources env_script if provided.

    • Overrides environment variables based on task or settings from env.

  3. Constructs the command to execute the task, see create_cmd_from_task.

  4. Executes the command:
    • If an Apptainer image is specified in apptainer_image, runs the command within the image.

    • Otherwise, executes the command directly with exec.

  5. Writes the generated script to a file and makes it executable.
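
The steps above can be sketched as a small script generator. This is a simplified illustration under a hypothetical name: the Apptainer branch is omitted, all values are passed as arguments rather than read from settings, and the exact script contents produced by b2luigi may differ.

```python
import os
import shlex
import stat
import tempfile


def write_wrapper_sketch(path, working_dir, cmd, env_script=None, env=None):
    """Write a bash wrapper that sets up the environment and execs cmd."""
    lines = ["#!/bin/bash", f"cd {shlex.quote(working_dir)}"]
    if env_script:
        lines.append(f"source {shlex.quote(env_script)}")
    for key, value in (env or {}).items():
        lines.append(f"export {key}={shlex.quote(value)}")
    lines.append("exec " + " ".join(shlex.quote(part) for part in cmd))

    with open(path, "w") as script:
        script.write("\n".join(lines) + "\n")
    # Add the executable bit for the owner.
    os.chmod(path, os.stat(path).st_mode | stat.S_IXUSR)
    return path


wrapper_path = os.path.join(tempfile.mkdtemp(), "executable_wrapper.sh")
write_wrapper_sketch(wrapper_path, "/tmp",
                     ["python3", "steering.py", "--batch-runner"],
                     env={"MY_VAR": "1"})
with open(wrapper_path) as script:
    wrapper_text = script.read()
```

Using exec for the final command replaces the shell process with the task process, so the exit code of the task becomes the exit code of the wrapper.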

b2luigi.core.executable.run_task_remote(task)[source]#

Executes a given task remotely by creating an executable script and running it via a subprocess call. The standard output and error streams are redirected to log files.

Parameters:

task – The task to be executed

Raises:

RuntimeError – If the subprocess call returns a non-zero exit code, indicating a failure during execution.

Side Effects:
  • Creates a directory for log files if it does not already exist.

  • Writes the standard output and error of the subprocess execution to separate log files.
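
The logging and error handling can be sketched with subprocess.call. run_and_log_sketch is a hypothetical helper that takes the command and log directory directly; the log file names stdout and stderr are assumptions made for this example.

```python
import os
import subprocess
import sys
import tempfile


def run_and_log_sketch(cmd, log_dir):
    """Run cmd, redirect its output to log files, and raise on failure."""
    os.makedirs(log_dir, exist_ok=True)
    stdout_path = os.path.join(log_dir, "stdout")
    stderr_path = os.path.join(log_dir, "stderr")
    with open(stdout_path, "w") as out, open(stderr_path, "w") as err:
        return_code = subprocess.call(cmd, stdout=out, stderr=err)
    if return_code != 0:
        raise RuntimeError(f"Execution failed with return code {return_code}")
    return stdout_path, stderr_path


log_dir = os.path.join(tempfile.mkdtemp(), "logs")
stdout_path, _ = run_and_log_sketch([sys.executable, "-c", "print('done')"], log_dir)
with open(stdout_path) as log:
    logged_output = log.read()
```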