Super-hero Task Classes

If you want to use the default luigi.Task class or any derivative of it, you are totally fine. No need to change any of your scripts! But if you want to take advantage of some of the recipes we have developed to work with large luigi task sets, you can use the drop-in replacements from the b2luigi package. All task classes (except b2luigi.DispatchableTask, see below) are subclasses of the corresponding luigi class. As we import luigi into b2luigi, you just need to replace

import luigi

with

import b2luigi as luigi

and you will have all the functionality of luigi and b2luigi without the need to change anything else!

class b2luigi.Task(*args, **kwargs)

Bases: luigi.Task

Drop-in replacement for luigi.Task that is 100% API compatible. It just adds some useful methods for handling output file name generation using the parameters of the task. See Quick Start for information on how to use the methods.

Example

class MyAverageTask(b2luigi.Task):
    def requires(self):
        for i in range(100):
            yield self.clone(MyNumberTask, some_parameter=i)

    def output(self):
        yield self.add_to_output("average.txt")

    def run(self):
        # Build the mean
        summed_numbers = 0
        counter = 0
        for input_file in self.get_input_file_names("output_file.txt"):
            with open(input_file, "r") as f:
                summed_numbers += float(f.read())
                counter += 1

        average = summed_numbers / counter

        with self.get_output_file("average.txt").open("w") as f:
            f.write(f"{average}\n")

add_to_output(output_file_name: str, target_class: Type[FileSystemTarget] | None = None, result_dir: str | None = None, **kwargs) -> Dict[str, LocalTarget]

Call this in your output() function to add a target to the list of files this task will output. Always use it in combination with yield. This function automatically adds all current parameter values to the file name, in the form:

<result-path>/param1=value1/param2=value2/.../<output-file-name.ext>

This function will by default use a b2luigi.LocalTarget, but you can also pass a different target_class as an argument. If you do not want this, you can override the _get_output_file_target function.
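
The directory layout described above can be illustrated with a small standalone sketch. It does not call b2luigi: build_output_path and its arguments are hypothetical names chosen for illustration, and the actual ordering and formatting of parameter values is decided by b2luigi itself.

```python
import posixpath


def build_output_path(result_dir, parameters, output_file_name):
    """Hypothetical helper (not part of b2luigi) mimicking the layout above."""
    # One "name=value" directory per parameter, in the order given.
    param_dirs = [f"{name}={value}" for name, value in parameters.items()]
    return posixpath.join(result_dir, *param_dirs, output_file_name)


example_path = build_output_path(
    "results", {"some_parameter": 3, "other_parameter": "x"}, "average.txt"
)
print(example_path)  # results/some_parameter=3/other_parameter=x/average.txt
```

Because every parameter value ends up in the path, two tasks that differ in any parameter write to different files.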

Example

This adds two files called some_file.txt and some_other_file.txt to the output:

def output(self):
    yield self.add_to_output("some_file.txt")
    yield self.add_to_output("some_other_file.txt")

Parameters:
  • output_file_name (str) – the file name of the output file. Refer to this file name as a key when using get_input_file_names, get_output_file_names or get_output_file.

  • target_class – which class of FileSystemTarget to instantiate for this target. Defaults to b2luigi.LocalTarget.

  • result_dir (str, optional) – Optionally pass a result_dir to the create_output_file_name.

  • kwargs – keyword arguments passed to the __init__ of the target_class via the _get_output_file_target function.

Returns:

A dictionary with the output file name as key and the target as value.

static _transform_io(input_generator: Iterable[FileSystemTarget]) -> Dict[str, List[str]]

get_all_input_file_names() -> Iterator[str]

Return all file paths required by this task.

Example

class TheSuperFancyTask(b2luigi.Task):
    def dry_run(self):
        for name in self.get_all_input_file_names():
            print(f"              input: {name}")

Yields:

str – The file path of each input file.

get_input_file_names(key: str | None = None) -> Dict[str, List[str]] | List[str]

Get a dictionary of input file names of the tasks, which are defined in our requirements. Either use the key argument or dictionary indexing with the key given to add_to_output to get back a list (!) of file paths.

Parameters:

key (str, optional) – If given, only return a list of file paths with this given key.

Returns:

If key is None, returns a dictionary mapping each key to a list of file paths. Otherwise, returns only the list of file paths for the given key.
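
The two return shapes can be illustrated with a minimal standalone sketch. It assumes that each required task contributes one mapping of file name to path, as described for add_to_output. The helpers merge_outputs and lookup are hypothetical and are not b2luigi functions.

```python
from collections import defaultdict


def merge_outputs(outputs_per_task):
    """Merge per-task {file name: path} mappings into {file name: [paths]}."""
    merged = defaultdict(list)
    for output in outputs_per_task:
        for file_name, path in output.items():
            merged[file_name].append(path)
    return dict(merged)


def lookup(merged, key=None):
    """Return the whole dictionary, or only the list stored under key."""
    return merged if key is None else merged[key]


# Three upstream tasks, each writing a file with the same key.
outputs_per_task = [
    {"output_file.txt": f"results/some_parameter={i}/output_file.txt"}
    for i in range(3)
]
merged = merge_outputs(outputs_per_task)
all_inputs = lookup(merged)                     # dictionary of lists
only_paths = lookup(merged, "output_file.txt")  # plain list of paths
```

Even when only one required task produces a given key, the value is still a list with a single entry.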

get_input_file_names_from_dict(requirement_key: str, key: str | None = None) -> Dict[str, List[str]] | List[str]

Get a dictionary of input file names of the tasks, which are defined in our requirements.

The requires method should return a dict whose values are generator expressions (!) yielding required task objects.

Example

class TaskB(luigi.Task):

    def requires(self):
        return {
            "a": (TaskA(5.0, i) for i in range(100)),
            "b": (TaskA(1.0, i) for i in range(100)),
        }

    def run(self):
        result_a = do_something_with_a(
            self.get_input_file_names_from_dict("a")
        )
        result_b = do_something_with_b(
            self.get_input_file_names_from_dict("b")
        )

        combine_a_and_b(
            result_a,
            result_b,
            self.get_output_file_name("combined_results.txt")
        )

    def output(self):
        yield self.add_to_output("combined_results.txt")

Either use the key argument or dictionary indexing with the key given to add_to_output to get back a list (!) of file paths.

Parameters:
  • requirement_key (str) – The key in the dictionary returned by requires that selects which group of required tasks to use.

  • key (str, optional) – If given, only return a list of file paths with this given key.

Returns:

If key is None, returns a dictionary mapping each key to a list of file paths. Otherwise, returns only the list of file paths for the given key.

get_input_file_name(key: str | None = None)

Wraps get_input_file_names and asserts there is only one input file.

Parameters:

key (str, optional) – Return the file path with this given key.

Returns:

File path for the given key.

get_all_output_file_names() -> Iterator[str]

Return all file paths created by this task.

Example

class TheSuperFancyTask(b2luigi.Task):
    def dry_run(self):
        for name in self.get_all_output_file_names():
            print(f"              output: {name}")

Yields:

str – The file path of each output file.

get_output_file_name(key: str) -> str

Analogous to get_input_file_names, this function returns the name of an output file defined in our output function with the given key.

In contrast to get_input_file_names, only a single file name will be returned (as there can only be a single output file with a given name).

Parameters:

key (str) – Return the file path with this given key.

Returns:

Returns only the file path for this given key.

_get_input_targets(key: str) -> Target

Retrieve the input targets associated with a specific key.

This method acts as a shortcut to access the input targets for a given key from the task’s input.

Parameters:

key (str) – The key for which to retrieve the corresponding input targets.

Returns:

The luigi target(s) associated with the specified key.

Return type:

luigi.Target

Raises:

KeyError – If the specified key is not found in the input dictionary.

_get_output_target(key: str) -> FileSystemTarget

Retrieves the output target associated with a specific key.

This method acts as a shortcut to access a luigi target from the task’s output.

Parameters:

key (str) – The key for which the output target is to be retrieved.

Returns:

The luigi target associated with the specified key.

Return type:

luigi.Target

_get_output_file_target(base_filename: str, target_class: Type[FileSystemTarget] | None = None, result_dir: str | None = None, **kwargs: Any) -> FileSystemTarget

Generates a Luigi file system target for the output file.

This method constructs the output file name using the provided base filename and additional keyword arguments, and then returns a file system target instance of the specified target class.

Parameters:
  • base_filename (str) – The base name of the output file.

  • target_class (Type[FileSystemTarget], optional) – The class of the file system target to use. Defaults to b2luigi.LocalTarget.

  • result_dir (str, optional) – Optionally pass a result_dir to the create_output_file_name.

  • kwargs (Any) – Additional keyword arguments passed to the target_class’ __init__

Returns:

An instance of the specified file system target class pointing to the output file.

Return type:

LocalTarget

_resolve_output_file_target(filename: str, target_class: Type[FileSystemTarget] | None = None, **kwargs: Any) -> FileSystemTarget

Resolves and returns a file system target based on the provided filename and optional target class. This method determines the appropriate file system target class to use for the given filename. It prioritizes the provided target_class, falls back to a default target class specified in the settings, or defaults to using a b2luigi.LocalTarget if no other target class is specified.

Parameters:
  • filename (str) – The name of the file for which the target is being resolved.

  • target_class (Optional[Type[FileSystemTarget]]) – An optional file system target class to use. If provided, this class will be used to create the target.

  • kwargs (Any) – Additional keyword arguments to pass to the target class constructor.

Returns:

The resolved file system target.

Return type:

FileSystemTarget

Notes

  • The default_task_target_class and target_class_kwargs settings are retrieved using the b2luigi.get_setting() function.

  • If target_class is provided, it takes precedence over the default target class.

  • If neither target_class nor default_task_target_class is provided, a b2luigi.LocalTarget is used as the default.
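
The precedence described in these notes can be sketched without b2luigi. In this illustration a plain dictionary stands in for the b2luigi settings, and LocalTargetSketch, RemoteTargetSketch and resolve_target are hypothetical names. The target_class_kwargs setting is left out because the text above does not specify how it is combined with kwargs.

```python
class LocalTargetSketch:
    """Stand-in for a local file target (hypothetical, not b2luigi.LocalTarget)."""

    def __init__(self, filename, **kwargs):
        self.filename = filename
        self.kwargs = kwargs


class RemoteTargetSketch(LocalTargetSketch):
    """Stand-in for any other file system target class."""


def resolve_target(filename, target_class=None, settings=None, **kwargs):
    """Pick a target class by precedence and return an instance of it."""
    settings = settings or {}
    if target_class is None:
        # No explicit class: use the configured default, else the local target.
        target_class = settings.get("default_task_target_class") or LocalTargetSketch
    return target_class(filename, **kwargs)


explicit = resolve_target("a.txt", target_class=RemoteTargetSketch)
from_settings = resolve_target(
    "a.txt", settings={"default_task_target_class": RemoteTargetSketch}
)
fallback = resolve_target("a.txt")
```

An explicitly passed target_class wins even when a default is configured in the settings.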

_remove_output_file_target(base_filename: str) -> None

Removes the output file target associated with the given base filename.

This method retrieves the output file target using the provided base filename and attempts to remove it. If the target does not have a remove method, a NotImplementedError is raised.

Parameters:

base_filename (str) – The base filename used to identify the output file target.

Raises:

NotImplementedError – If the target does not have a remove method.
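
A minimal sketch of the behaviour described here, using stand-in classes instead of real luigi targets. RemovableTarget, ReadOnlyTarget and remove_target are hypothetical names, and the check via hasattr is an assumption about how a missing remove method could be detected.

```python
import os
import tempfile


class RemovableTarget:
    """Stand-in target that knows how to delete its file."""

    def __init__(self, path):
        self.path = path

    def remove(self):
        os.remove(self.path)


class ReadOnlyTarget:
    """Stand-in target without a remove method."""

    def __init__(self, path):
        self.path = path


def remove_target(target):
    """Remove the target, or fail loudly if it cannot be removed."""
    if not hasattr(target, "remove"):
        raise NotImplementedError(
            f"{type(target).__name__} does not provide a remove method"
        )
    target.remove()


# Create a throwaway file and remove it through the target.
file_descriptor, temporary_path = tempfile.mkstemp()
os.close(file_descriptor)
remove_target(RemovableTarget(temporary_path))
```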

_remove_output() -> None

Removes all output file targets associated with the task.

This method iterates through all output file names retrieved from get_all_output_file_names and removes each corresponding output file target by calling _remove_output_file_target.

Warning

Be very careful with this method! It will remove all output files of this task! This is not reversible.

Hint

If you are very sure of what you are doing, you can use this method to remove all output files of this task by calling it in the remove_output method of your task.

Example

class TheSuperFancyTask(b2luigi.Task):
    def remove_output(self):
        self._remove_output()

Returns:

None

class b2luigi.ExternalTask(*args, **kwargs)

Bases: b2luigi.Task, luigi.ExternalTask

Direct copy of luigi.ExternalTask, but with the capabilities of Task added.

class b2luigi.WrapperTask(*args, **kwargs)

Bases: b2luigi.Task, luigi.WrapperTask

Direct copy of luigi.WrapperTask, but with the capabilities of Task added.

b2luigi.dispatch(run_function)

If your run function calls external, possibly unstable functionality, wrap it with this function. It basically “emulates” a batch submission on your local computer (without any batch system) with the benefit of having a totally separate execution path. If your called task fails miserably (e.g. segfaults), it does not crash your main application.

Example

The run function can include any code you want. When the task runs, it is started in a subprocess and monitored by the parent process. When it dies unexpectedly (e.g. because of a segfault etc.) the task will be marked as failed. If not, it is successful. The log output will be written to two files in the log folder (marked with the parameters of the task), which you can check afterwards:

import b2luigi

class MyTask(b2luigi.Task):
    @b2luigi.dispatch
    def run(self):
        call_some_evil_function()

Note

We are reusing the batch system implementation here, with all its settings and knobs to set up the environment etc. If you want to control it in more detail, please check out Batch Processing.

Implementation note:

In the subprocess we are calling the current executable with the current input file as a parameter, but let it only run this specific task (by handing over the task id and the --batch-worker option). The run function notices this and actually runs the task instead of dispatching again.

Additionally, you can add a cmd_prefix parameter to your class. It needs to be a list of strings, which are prepended to the current command (e.g. if you want to add a profiler to all your tasks).
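
The general idea of running work in a monitored child process, with an optional command prefix, can be sketched using only the standard library. This is not how b2luigi implements dispatch (it reuses its batch system and passes the task id and --batch-worker, as noted above). The helpers build_command and run_isolated are hypothetical.

```python
import subprocess
import sys


def build_command(base_command, cmd_prefix=()):
    """Prepend the prefix (a list of strings) to the command to run."""
    return [*cmd_prefix, *base_command]


def run_isolated(base_command, cmd_prefix=()):
    """Run the command in a child process and report whether it succeeded."""
    completed = subprocess.run(build_command(base_command, cmd_prefix))
    # Any non-zero return code counts as failure. On POSIX, a child killed by
    # a signal (for example a segfault) has a negative return code.
    return completed.returncode == 0


# The parent survives whatever happens in the child.
succeeded = run_isolated([sys.executable, "-c", "print('task ran')"])
failed = run_isolated([sys.executable, "-c", "import sys; sys.exit(1)"])
```

The parent process only inspects the return code, so a crash in the child cannot take the parent down with it.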

class b2luigi.DispatchableTask(*args, **kwargs)

Bases: b2luigi.Task

Instead of using the dispatch function wrapper, you can also inherit from this class. Apart from that, it has exactly the same functionality as a normal Task.

Important

You need to override the process function instead of the run function in this case!

process()

Override this method with your normal run function. Do not touch the run function itself!
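
The split between run and process follows a template-method pattern, which the following standalone sketch illustrates. DispatchableSketch and MyTaskSketch are hypothetical classes. The sketch forwards run directly to process, whereas b2luigi presumably dispatches the call to a subprocess at that point.

```python
class DispatchableSketch:
    """Base class that owns run() and delegates the real work to process()."""

    def process(self):
        raise NotImplementedError("override process() in your task")

    def run(self):
        # Assumption: the real class would dispatch to a subprocess here.
        # The sketch simply calls process() in the same process.
        return self.process()


class MyTaskSketch(DispatchableSketch):
    def process(self):
        # The code you would normally put into run() goes here.
        return "did the work"


result = MyTaskSketch().run()
```

Keeping run in the base class lets the base class decide how process is executed, which is why subclasses should leave run alone.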