Temporary File Context Manager

b2luigi.on_temporary_files(run_function)

Wrapper for decorating a task’s run function to use temporary files as outputs.

A common problem with long-running tasks in luigi is the so-called thanksgiving bug (see https://www.arashrouhani.com/luigi-budapest-bi-oct-2015/#/21). It occurs when a task's run function creates an output file before filling it with content, possibly only after a long calculation. Between the creation of the output and the end of the calculation, another task may check whether the output exists, find it, and assume that the task has already finished, although the file so far contains only incomplete or meaningless data.

luigi itself already offers a solution: the temporary_path() function of the file system targets. Unfortunately, this means you have to open all your output files with a context manager, which is very hard to do if you also have external tasks (because they will probably use the output file directly instead of its temporary version).
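The idea behind writing to a temporary path can be illustrated with the standard library alone. The following is a minimal sketch, not luigi's implementation: the helper temporary_path_sketch is a hypothetical name, and it assumes a local file system where os.replace can move the file in one step.

```python
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def temporary_path_sketch(final_path):
    """Yield a temporary path and move it to final_path only on success.

    Hypothetical helper for illustration; not part of luigi or b2luigi.
    """
    directory = os.path.dirname(os.path.abspath(final_path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    os.close(fd)
    try:
        yield tmp_path
    except BaseException:
        # The body failed: discard the partial file, leave final_path untouched.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    else:
        # The body succeeded: publish the complete file in one step.
        os.replace(tmp_path, final_path)


workdir = tempfile.mkdtemp()
final_path = os.path.join(workdir, "test.txt")

# A failing writer never makes the final file appear.
try:
    with temporary_path_sketch(final_path) as tmp:
        with open(tmp, "w") as f:
            raise ValueError("calculation failed")
except ValueError:
    pass
exists_after_failure = os.path.exists(final_path)

# A successful writer publishes the complete file.
with temporary_path_sketch(final_path) as tmp:
    with open(tmp, "w") as f:
        f.write("Test")
with open(final_path) as f:
    content_after_success = f.read()
```

Because the final path only appears after the body has finished without an error, a task that checks for the output never sees a half-written file.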

This wrapper simplifies the usage of the temporary files:

import b2luigi

class MyTask(b2luigi.Task):
    def output(self):
        yield self.add_to_output("test.txt")

    @b2luigi.on_temporary_files
    def run(self):
        with open(self.get_output_file_name("test.txt"), "w") as f:
            # Simulate a failure before any content is written.
            raise ValueError()
            f.write("Test")  # never reached

Without the decorator, the file “test.txt” would be created at the beginning and filled with content later. Because of the exception, that never happens, so the file would exist although the task never finished. With the decorator, the content is written to a temporary file first and copied to its final location at the end of the run function, but only if no error occurred.

Warning

The decorator only modifies the function b2luigi.Task.get_output_file_name(). If you use the output directly, you have to take care of using the temporary path correctly yourself!

class b2luigi.core.temporary_wrapper.TemporaryFileContextManager(task: Task)

Bases: ExitStack

A context manager for managing temporary file paths for a given task. This class overrides the task’s methods for retrieving input and output file names, ensuring that temporary file paths are used during the context’s lifetime. Upon exiting the context, the original methods are restored.

Usage:

This class is not meant to be used directly. Instead, it is used within the on_temporary_files decorator.
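How an ExitStack subclass can override a task method for the lifetime of a context and restore it afterwards can be sketched as follows. This is an illustration under stated assumptions, not the b2luigi source: TemporaryPathsSketch, on_temporary_files_sketch, DemoTask and the ".partial" suffix are hypothetical names chosen for the example.

```python
import os
import tempfile
from contextlib import ExitStack, contextmanager
from functools import wraps


@contextmanager
def _temporary_path(final_path):
    # Hypothetical stand-in for a target's temporary path handling.
    tmp_path = final_path + ".partial"
    try:
        yield tmp_path
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    # Only reached when the body raised no exception.
    os.replace(tmp_path, final_path)


class TemporaryPathsSketch(ExitStack):
    def __init__(self, task):
        super().__init__()
        self._task = task
        self._open_files = {}

    def __enter__(self):
        original = self._task.get_output_file_name

        def get_output_file_name(key):
            # Enter one temporary-path context per key, on first use.
            if key not in self._open_files:
                manager = _temporary_path(original(key))
                self._open_files[key] = self.enter_context(manager)
            return self._open_files[key]

        # Shadow the bound method with an instance attribute ...
        self._task.get_output_file_name = get_output_file_name
        # ... and remove that attribute again when the stack unwinds.
        self.callback(delattr, self._task, "get_output_file_name")
        return super().__enter__()


def on_temporary_files_sketch(run_function):
    @wraps(run_function)
    def run(self):
        with TemporaryPathsSketch(self):
            run_function(self)
    return run


class DemoTask:
    def __init__(self, directory):
        self.directory = directory

    def get_output_file_name(self, key):
        return os.path.join(self.directory, key)

    @on_temporary_files_sketch
    def run(self):
        with open(self.get_output_file_name("test.txt"), "w") as f:
            f.write("Test")


task = DemoTask(tempfile.mkdtemp())
task.run()
final_path = task.get_output_file_name("test.txt")
```

ExitStack unwinds in reverse order of registration, so the temporary files are finalized first and the original method is restored last, whether or not the run function raised.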

__enter__()

This method is called when entering the context manager. It overrides the task’s methods for retrieving input and output file names, replacing them with custom implementations that handle temporary file paths.

Redefinitions:

  • get_output_file_name

    Retrieves or creates a temporary output file for the given key.

    If the file corresponding to the specified key is not already open, this method generates a temporary file path using the task’s b2luigi.Task._get_output_target() and b2luigi.Target.temporary_path() methods, and opens the file within the context of the task.

    Args:

    key (str): The unique identifier for the output file.

    tmp_file_kwargs: Additional keyword arguments to be passed to the b2luigi.Target.temporary_path() method.

    Returns:

    The opened temporary file associated with the given key.

  • get_input_file_names

    Retrieves the input file names associated with a given key, ensuring that the files are temporarily available for processing. Behaves the same as b2luigi.Task.get_input_file_names().

    Args:

    key (str): The identifier for the input files to retrieve.

    tmp_file_kwargs: Additional keyword arguments to pass to the b2luigi.Target.get_temporary_input() method of the target.

    Returns:

    list: A list of opened temporary input files corresponding to the given key.

    Notes:
    • If the n_download_threads setting is specified, the input files are fetched concurrently using a thread pool.

    • If n_download_threads is not specified, the input files are fetched sequentially.

  • get_input_file_names_from_dict

    Retrieves input file names from a dictionary structure, handling temporary file paths. Behaves the same as b2luigi.Task.get_input_file_names_from_dict().

    Args:

    requirement_key (str): The key used to identify the required input in the task’s input dictionary.

    key (Optional[str], optional): A specific key to extract targets from the target dictionary. If None, all targets are retrieved. Defaults to None.

    tmp_file_kwargs: Additional keyword arguments passed to the b2luigi.Target.get_temporary_input() method for generating temporary file paths.

    Returns:

    list: A list of temporary file paths corresponding to the input files.

    Raises:

    KeyError: If the specified key is not found in the target dictionary.
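The notes on n_download_threads under get_input_file_names describe two fetching modes. The difference can be sketched with the standard library. Here fetch_all and fake_fetch are hypothetical names, and the sketch assumes that each fetch is independent of the others; how b2luigi reads the setting and calls the target is not shown.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_all(targets, fetch, n_download_threads=None):
    """Apply fetch to every target, concurrently if a thread count is given."""
    if n_download_threads is not None:
        with ThreadPoolExecutor(max_workers=n_download_threads) as pool:
            # Executor.map returns results in the order of the inputs.
            return list(pool.map(fetch, targets))
    # No thread count given: fetch one target after the other.
    return [fetch(target) for target in targets]


def fake_fetch(name):
    # Stand-in for a download that returns the local temporary path.
    return f"/tmp/local/{name}"


names = ["a.root", "b.root", "c.root"]
sequential = fetch_all(names, fake_fetch)
threaded = fetch_all(names, fake_fetch, n_download_threads=2)
```

Both modes return the paths in the same order, so code that consumes the list does not need to know which mode was used.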