API Documentation#
b2luigi
summarizes different topics to help you in your everyday task
creation and processing.
Most important is the b2luigi.process()
function, which lets you run
arbitrary task graphs on the batch.
It is very similar to luigi.build
, but lets you hand in additional parameters
for steering the batch execution.
If you are not yet familiar with luigi
itself, we recommend reading its current documentation: Luigi Documentation.
Top-Level Function#
- b2luigi.process(task_like_elements, show_output=False, dry_run=False, test=False, batch=False, ignore_additional_command_line_args=False, **kwargs)#
Call this function in your main method to tell b2luigi where the entry point of your task graph is. It is very similar to luigi.build, with some additional configuration options.
Example
This example defines a simple task and tells b2luigi to execute it 100 times with different parameters:
    import b2luigi
    import random


    class MyNumberTask(b2luigi.Task):
        some_parameter = b2luigi.Parameter()

        def output(self):
            return b2luigi.LocalTarget(f"results/output_file_{self.some_parameter}.txt")

        def run(self):
            random_number = random.random()
            with self.output().open("w") as f:
                f.write(f"{random_number}\n")


    if __name__ == "__main__":
        b2luigi.process([MyNumberTask(some_parameter=i) for i in range(100)])
All flag arguments can also be given as command line arguments. This means the call with:
b2luigi.process(tasks, batch=True)
is equivalent to calling the script with:
python script.py --batch
- Parameters
task_like_elements (Task or list) – Task(s) to execute with luigi. Can either be a list of tasks or a task instance.
show_output (bool, optional) – Instead of running the task(s), write out all output files that will be generated, marked in color if they are already present. Good for testing whether your tasks will do what you think they should.
dry_run (bool, optional) – Instead of running the task(s), write out which tasks will be executed. This is a simplified form of dependency resolution, so this information may be wrong in some corner cases. Also good for testing.
test (bool, optional) – Run the tasks directly on the local machine for debugging, without the batch system, multiprocessing, or dispatching (see DispatchableTask). All logs are written to the console.
batch (bool, optional) – Execute the tasks on the selected batch system. Refer to Quick Start for more information. The default batch system is LSF, but this can be changed with the batch_system setting. See get_setting on how to define settings.
ignore_additional_command_line_args (bool, optional, default False) – Ignore additional command line arguments. This is useful if you want to use this function in a file that also does some command line parsing.
**kwargs – Additional keyword arguments passed to luigi.build.
Warning
You should always have just a single call to process in your script. If you need to have multiple calls, either use a b2luigi.WrapperTask or two scripts.
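The equivalence between keyword flags and command line flags described above can be pictured with a small standalone sketch. It does not use b2luigi and is not its implementation: the helper name resolve_flags is hypothetical, and only the --batch spelling is confirmed by the text above (the other option names are assumed for illustration). It also shows how unknown options can be tolerated, which is the situation ignore_additional_command_line_args is meant for.

```python
import argparse


def resolve_flags(argv, batch=False, dry_run=False, test=False, show_output=False):
    """Combine keyword flags with command line flags (illustrative only)."""
    parser = argparse.ArgumentParser()
    # Only --batch is confirmed by the documentation; the other spellings are assumed.
    parser.add_argument("--batch", action="store_true")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--test", action="store_true")
    parser.add_argument("--show-output", action="store_true")
    # parse_known_args tolerates options that belong to the script's own parser.
    known, _unknown = parser.parse_known_args(argv)
    return {
        "batch": batch or known.batch,
        "dry_run": dry_run or known.dry_run,
        "test": test or known.test,
        "show_output": show_output or known.show_output,
    }


# A flag given on the command line has the same effect as the keyword argument.
flags = resolve_flags(["--batch", "--my-own-option", "5"])
print(flags["batch"])
```

In this sketch a flag is active if it is set either way, which mirrors the statement that process(tasks, batch=True) and python script.py --batch are equivalent.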
Super-hero Task Classes#
If you want to use the default luigi.Task
class or any derivative of it,
you are totally fine.
No need to change any of your scripts!
But if you want to take advantage of some of the recipes we have developed
to work with large luigi task sets, you can use the drop-in replacements
from the b2luigi package.
All task classes (except the b2luigi.DispatchableTask, see below) are subclasses of
a luigi class.
As we import luigi
into b2luigi
, you just need to replace
import luigi
with
import b2luigi as luigi
and you will have all the functionality of luigi
and b2luigi
without the need to change anything!
- class b2luigi.Task(*args, **kwargs)#
Bases:
luigi.task.Task
Drop-in replacement for luigi.Task which is 100% API compatible. It just adds some useful methods for handling output file name generation using the parameters of the task. See Quick Start for information on how to use the methods.
Example
    class MyAverageTask(b2luigi.Task):
        def requires(self):
            for i in range(100):
                yield self.clone(MyNumberTask, some_parameter=i)

        def output(self):
            yield self.add_to_output("average.txt")

        def run(self):
            # Build the mean
            summed_numbers = 0
            counter = 0
            for input_file in self.get_input_file_names("output_file.txt"):
                with open(input_file, "r") as f:
                    summed_numbers += float(f.read())
                    counter += 1

            average = summed_numbers / counter

            with self.get_output_file("average.txt").open("w") as f:
                f.write(f"{average}\n")
- add_to_output(output_file_name: str) → Dict[str, luigi.local_target.LocalTarget] #
Call this in your output() function to add a target to the list of files this task will output. Always use it in combination with yield. This function will automatically add all current parameter values to the file name, in the form:
    <result-path>/param1=value1/param2=value2/.../<output-file-name.ext>
This function will automatically use a LocalTarget. If you do not want this, you can override the _get_output_file_target function.
Example
This adds two files called some_file.txt and some_other_file.txt to the output:
    def output(self):
        yield self.add_to_output("some_file.txt")
        yield self.add_to_output("some_other_file.txt")
- Parameters
output_file_name (str) – The file name of the output file. Refer to this file name as a key when using get_input_file_names, get_output_file_names or get_output_file.
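To make the path layout used by add_to_output concrete, here is a minimal standalone sketch that builds a path of the form <result-path>/param1=value1/.../<output-file-name.ext>. The helper build_output_path is hypothetical and not part of b2luigi, and the ordering of the parameters (dictionary insertion order here) is an assumption.

```python
from pathlib import PurePosixPath


def build_output_path(result_path, parameters, output_file_name):
    """Join result path, one "name=value" directory per parameter, and the file name."""
    parts = [f"{name}={value}" for name, value in parameters.items()]
    return str(PurePosixPath(result_path, *parts, output_file_name))


example_path = build_output_path(
    "results", {"some_parameter": 3, "mode": "fast"}, "average.txt"
)
print(example_path)  # results/some_parameter=3/mode=fast/average.txt
```

Because every parameter value becomes part of the directory structure, two task instances with different parameters never write to the same file.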
- get_all_input_file_names() → collections.abc.Iterator[str] #
Return all file paths required by this task.
Example
    class TheSuperFancyTask(b2luigi.Task):
        def dry_run(self):
            for name in self.get_all_input_file_names():
                print(f" input: {name}")
- get_all_output_file_names() → collections.abc.Iterator[str] #
Return all file paths created by this task.
Example
    class TheSuperFancyTask(b2luigi.Task):
        def dry_run(self):
            for name in self.get_all_output_file_names():
                print(f" output: {name}")
- get_input_file_names(key: Optional[str] = None) → Union[Dict[str, List[str]], List[str]] #
Get a dictionary of input file names of the tasks which are defined in our requirements. Either use the key argument or dictionary indexing with the key given to add_to_output to get back a list (!) of file paths.
- Parameters
key (str, optional) – If given, only return a list of file paths with this given key.
- Returns
If key is None, returns a dictionary mapping keys to lists of file paths. Otherwise, returns only the list of file paths for the given key.
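The reason a list (and not a single path) is returned for each key can be seen in a small standalone sketch: several required tasks may each produce a file with the same name, so all of their paths end up under one key. The helper group_input_files and the example paths are hypothetical and only illustrate the shape of the result, not how b2luigi collects the files.

```python
from collections import defaultdict
from pathlib import PurePosixPath


def group_input_files(file_paths):
    """Group full file paths by their file name, i.e. the key given to add_to_output."""
    grouped = defaultdict(list)
    for path in file_paths:
        grouped[PurePosixPath(path).name].append(path)
    return dict(grouped)


# Three required tasks, each writing a file with the same name into its own directory.
paths = [f"results/some_parameter={i}/output_file.txt" for i in range(3)]
inputs = group_input_files(paths)
print(inputs["output_file.txt"])
```

Indexing the dictionary with the key corresponds to passing the key argument directly: both give the list of paths for that file name.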
- get_input_file_names_from_dict(requirement_key: str, key: Optional[str] = None) → Union[Dict[str, List[str]], List[str]] #
Get a dictionary of input file names of the tasks which are defined in our requirements.
The requires method should return a dict whose values are generator expressions (!) yielding required task objects.
Example
    class TaskB(luigi.Task):
        def requires(self):
            return {
                "a": (TaskA(5.0, i) for i in range(100)),
                "b": (TaskA(1.0, i) for i in range(100)),
            }

        def run(self):
            result_a = do_something_with_a(
                self.get_input_file_names_from_dict("a")
            )
            result_b = do_something_with_b(
                self.get_input_file_names_from_dict("b")
            )

            combine_a_and_b(
                result_a,
                result_b,
                self.get_output_file_name("combined_results.txt")
            )

        def output(self):
            yield self.add_to_output("combined_results.txt")
Either use the key argument or dictionary indexing with the key given to add_to_output to get back a list (!) of file paths.
- Parameters
requirement_key (str) – Specifies the required task expression.
key (str, optional) – If given, only return a list of file paths with this given key.
- Returns
If key is None, returns a dictionary mapping keys to lists of file paths. Otherwise, returns only the list of file paths for the given key.
- get_output_file_name(key: str) → str #
Analogous to get_input_file_names, this function returns an output file defined in our output function with the given key. In contrast to get_input_file_names, only a single file name will be returned (as there can only be a single output file with a given name).
- Parameters
key (str) – Return the file path with this given key.
- Returns
Returns only the file path for this given key.
- class b2luigi.ExternalTask(*args, **kwargs)#
Bases:
b2luigi.core.task.Task
,luigi.task.ExternalTask
Direct copy of luigi.ExternalTask, but with the capabilities of Task added.
- class b2luigi.WrapperTask(*args, **kwargs)#
Bases:
b2luigi.core.task.Task
,luigi.task.WrapperTask
Direct copy of luigi.WrapperTask, but with the capabilities of Task added.
- b2luigi.dispatch(run_function)#
If you have a run function calling external, potentially unsafe functionality, use this function wrapper around your run function. It basically emulates a batch submission on your local computer (without any batch system) with the benefit of having a totally separate execution path. If your called task fails miserably (e.g. segfaults), it does not crash your main application.
Example
The run function can include any code you want. When the task runs, it is started in a subprocess and monitored by the parent process. When it dies unexpectedly (e.g. because of a segfault) the task will be marked as failed. If not, it is successful. The log output will be written to two files in the log folder (marked with the parameters of the task), which you can check afterwards:
    import b2luigi


    class MyTask(b2luigi.Task):
        @b2luigi.dispatch
        def run(self):
            call_some_evil_function()
Note
We are reusing the batch system implementation here, with all its settings and knobs to set up the environment etc. If you want to control it in more detail, please check out Batch Processing.
- Implementation note:
In the subprocess we are calling the current executable (which should be python) with the current input file as a parameter, but let it only run this specific task (by handing over the task id and the --batch-worker option). The run function notices this and actually runs the task instead of dispatching again.
Additionally, you can add a cmd_prefix parameter to your class, which needs to be a list of strings that are prepended to the current command (e.g. if you want to add a profiler to all your tasks).
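The idea behind dispatching, namely running the work in a child process so that a crash only fails that one task, can be sketched without b2luigi. The functions build_command and run_isolated are hypothetical and much simpler than the real mechanism (no task id, no --batch-worker option, no log files); the cmd_prefix argument only mimics how a list of strings would be put in front of the command.

```python
import subprocess
import sys


def build_command(code, cmd_prefix=()):
    """Prepend the optional prefix to a command that runs `code` in a new interpreter."""
    return list(cmd_prefix) + [sys.executable, "-c", code]


def run_isolated(code, cmd_prefix=()):
    """Run `code` in a child process and report success via its exit status."""
    completed = subprocess.run(build_command(code, cmd_prefix))
    # A crash (non-zero exit code, or a negative one for a signal on POSIX)
    # only marks this unit of work as failed; the parent keeps running.
    return completed.returncode == 0


print(run_isolated("print('all good')"))        # child exits normally
print(run_isolated("import sys; sys.exit(1)"))  # child fails, parent survives
```

The parent process only inspects the exit status, so even a segfault in the child cannot take the main application down with it.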
- class b2luigi.DispatchableTask(*args, **kwargs)#
Bases:
b2luigi.core.task.Task
Instead of using the dispatch function wrapper, you can also inherit from this class. Apart from that, it has exactly the same functionality as a normal Task.
Important
You need to override the process function instead of the run function in this case!
- process()#
Override this method with your normal run function. Do not touch the run function itself!
Parameters#
As b2luigi automatically also imports luigi, you can use all the parameters from luigi
you know and love.
We have just added a single new flag called hashed to the parameter constructor.
Setting it to True (it is turned off by default) will make b2luigi use a hashed version
of the parameter's value when constructing output or log file paths.
This is especially useful if you have parameters which may include “dangerous” characters, like “/” or “{” (e.g.
when using list or dictionary parameters).
If you want to exclude certain parameters from the creation of the directory structure, you can use the significant
flag and set it to False.
See also the corresponding entry in our FAQ.
For more information on luigi.Parameters and what types there are, see the Luigi Documentation.
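Why hashing helps can be shown with a short standalone sketch. The helper path_component is hypothetical, and the digest (SHA-256) and the "hashed_" prefix are assumptions chosen for illustration; the hashing scheme b2luigi actually uses may differ.

```python
import hashlib


def path_component(name, value, hashed=False):
    """Return a "name=value" directory name, hashing the value if requested."""
    text = str(value)
    if hashed:
        # Assumption: any stable digest serves the illustration.
        text = "hashed_" + hashlib.sha256(text.encode("utf-8")).hexdigest()
    return f"{name}={text}"


cuts = ["pt > 1/2", "{x}"]
plain = path_component("cuts", cuts)
safe = path_component("cuts", cuts, hashed=True)
print("/" in plain)  # the raw value would introduce an extra directory level
print("/" in safe)   # the hashed value is a single, safe path component
```

The hashed name is stable for a given value, so the same parameter always maps to the same directory.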
Other important features from luigi#
luigi
itself already boasts a number of nice features. Check out some highlights:
- The central scheduler
- The notification system
- communication with tasks
Settings#
- b2luigi.get_setting(key, default=None, task=None, deprecated_keys=None)#
b2luigi adds settings management to luigi and also uses it in various places. Many batch systems, the output and log paths, the environment etc. are controlled via these settings.
Settings can be defined in the following ways. They are used in the following order (an earlier setting overrides a later one):
- If the currently processed (or scheduled) task has a property of the given name, it is used. Please note that you can either set the property directly, e.g.
    class MyTask(b2luigi.Task):
        batch_system = "htcondor"
  or by using a function (which might even depend on the parameters)
    class MyTask(b2luigi.Task):
        @property
        def batch_system(self):
            return "htcondor"
  The latter is especially useful for batch system specific settings such as requested wall time etc.
- Settings set directly by the user in your script with a call to b2luigi.set_setting().
- Settings specified in the settings.json in the folder of your script or any folder above that. This makes it possible to have general project settings (e.g. the output path or the batch system) and a specific settings.json for your sub-project.
With this function, you can get the current value of a specific setting with the given key. If there is no setting defined with this name, either the default is returned or, if you did not supply any default, a ValueError is raised.
Settings can be of any type, but are mostly strings.
- Parameters
key (str) – The name of the parameter to query.
task (b2luigi.Task) – If given, check if the task has a parameter with this name.
default (optional) – If there is no setting with the given name, either return this default or, if it is not set, raise a ValueError.
deprecated_keys (List) – Former names of this setting. A warning is issued if they are still used.
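The lookup order described for get_setting (task property first, then settings set in the script, then settings.json) can be sketched as a plain function. This is not b2luigi's implementation: lookup_setting, the two dictionaries standing in for the user and JSON settings, and the result_path key are hypothetical, and a sentinel object is used here to detect a missing default.

```python
_MISSING = object()


def lookup_setting(key, task=None, user_settings=None, json_settings=None, default=_MISSING):
    """Return the first value found: task attribute, user setting, then JSON setting."""
    if task is not None and hasattr(task, key):
        return getattr(task, key)
    for source in (user_settings or {}, json_settings or {}):
        if key in source:
            return source[key]
    if default is _MISSING:
        raise ValueError(f"No setting found for key {key!r}")
    return default


class MyTask:
    # Stand-in for a task class that defines the setting as a property.
    batch_system = "htcondor"


json_settings = {"batch_system": "lsf", "result_path": "results"}
print(lookup_setting("batch_system", task=MyTask(), json_settings=json_settings))  # htcondor
print(lookup_setting("result_path", task=MyTask(), json_settings=json_settings))   # results
```

The task-level value wins over the project-wide one, which is what makes per-task overrides such as a different batch system possible.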
- b2luigi.set_setting(key, value)#
Set the setting with the specified name, overriding any value from settings.json. If you want to have task-specific settings, create a parameter with the given name on your task.
- b2luigi.clear_setting(key)#
Clear the setting with the given key
Other functions#
- b2luigi.on_temporary_files(run_function)#
Wrapper for decorating a task’s run function to use temporary files as outputs.
A common problem when using long-running tasks in luigi is the so-called thanksgiving bug (see https://www.arashrouhani.com/luigi-budapest-bi-oct-2015/#/21). It occurs when you define an output of a task and, in its run function, create this output before filling it with content (maybe even only after a long-lasting calculation). It may happen that, between the creation of the output and the end of the calculation, some other task checks if the output is already there, finds it, and assumes that the task is already finished (although there is probably only nonsense in the file so far).
A solution is already given by luigi itself, when using the temporary_path() function of the file system targets, which is really nice! Unfortunately, this means you have to open all your output files with a context manager, and this is very hard to do if you also have external tasks (because they will probably use the output file directly instead of the temporary file version of it).
This wrapper simplifies the usage of the temporary files:
    import b2luigi


    class MyTask(b2luigi.Task):
        def output(self):
            yield self.add_to_output("test.txt")

        @b2luigi.on_temporary_files
        def run(self):
            with open(self.get_output_file_name("test.txt"), "w") as f:
                # Simulate a failure before any content is written
                raise ValueError()
                f.write("Test")
Instead of creating the file “test.txt” at the beginning and filling it with content later (which will never happen because of the exception thrown, which makes the file exist although the task is actually not finished), the file will be written to a temporary file first and copied to its final location at the end of the run function (but only if there was no error).
Warning
The decorator only edits the function b2luigi.Task.get_output_file_name(). If you are using the output directly, you have to take care of using the temporary path correctly by yourself!
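The underlying pattern (write to a temporary location, publish the file only on success) can be sketched independently of b2luigi. The context manager temporary_output is hypothetical and is not how the decorator is implemented; in particular it moves the file instead of copying it.

```python
import contextlib
import os
import shutil
import tempfile


@contextlib.contextmanager
def temporary_output(final_path):
    """Yield a temporary path; move it to final_path only if the body succeeds."""
    tmp_dir = tempfile.mkdtemp()
    tmp_path = os.path.join(tmp_dir, os.path.basename(final_path))
    try:
        yield tmp_path
        # Reached only when the with-body raised no exception.
        shutil.move(tmp_path, final_path)
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)


work_dir = tempfile.mkdtemp()
final_path = os.path.join(work_dir, "test.txt")

try:
    with temporary_output(final_path) as path:
        with open(path, "w") as f:
            raise ValueError("failure before any content was written")
except ValueError:
    pass
exists_after_failure = os.path.exists(final_path)
print(exists_after_failure)  # the half-finished file never appears at its final location

with temporary_output(final_path) as path:
    with open(path, "w") as f:
        f.write("Test")
exists_after_success = os.path.exists(final_path)
print(exists_after_success)
```

Other tasks that check for the final path therefore only ever see a complete file, which is exactly what avoids the thanksgiving bug.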
- b2luigi.core.utils.product_dict(**kwargs: Any) → collections.abc.Iterator[Dict[str, Any]] #
Cross-product the given parameters and return an iterator over dictionaries.
Example
    >>> list(product_dict(arg_1=[1, 2], arg_2=[3, 4]))
    [{'arg_1': 1, 'arg_2': 3}, {'arg_1': 1, 'arg_2': 4}, {'arg_1': 2, 'arg_2': 3}, {'arg_1': 2, 'arg_2': 4}]
The dictionaries produced this way can directly be used as inputs for required tasks:
    def requires(self):
        for args in product_dict(arg_1=[1, 2], arg_2=[3, 4]):
            yield some_task(**args)
- Parameters
kwargs – Each keyword argument should be an iterable.
- Returns
An iterator over dictionaries of keyword arguments, one for each combination of the input values.
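For readers who want to see how such a cross product can be built, here is a minimal re-implementation sketch based on itertools.product. The name product_dict_sketch is hypothetical and the code is not taken from b2luigi; it only reproduces the behaviour shown in the example above.

```python
import itertools


def product_dict_sketch(**kwargs):
    """Yield one dictionary per combination of the given iterables."""
    keys = list(kwargs)
    for values in itertools.product(*kwargs.values()):
        yield dict(zip(keys, values))


combinations = list(product_dict_sketch(arg_1=[1, 2], arg_2=[3, 4]))
print(combinations)
```

Since the function is a generator, the combinations are produced lazily, which matters when the parameter grid is large.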