# Source code for b2luigi

"""b2luigi - bringing batch 2 luigi"""

# Star-import luigi's public names into this package so they are reachable as
# attributes of ``b2luigi`` as well; ``copies`` from luigi.util is re-exported too.
from luigi import *
from luigi.util import copies

# version must be defined after importing the luigi namespace,
# otherwise the b2luigi.__version__ gets overwritten by the one from luigi
__version__ = "1.2.2"

from b2luigi.core.parameter import wrap_parameter, BoolParameter
# NOTE(review): Optional, Union and Collection are not used in the visible part of
# this module; they may be used further down or re-exported on purpose — confirm
# before removing.
from typing import Optional, Union, Collection

# wrap_parameter() is deliberately called here, i.e. before b2luigi.core.task is
# imported below. Presumably it patches luigi's parameter handling and therefore
# has to run before the task classes are imported — TODO confirm this ordering
# requirement before moving any of these lines.
wrap_parameter()

# Make the core b2luigi classes and helpers available directly as b2luigi.<name>.
from b2luigi.core.task import Task, ExternalTask, WrapperTask
from b2luigi.core.target import LocalTarget
from b2luigi.core.temporary_wrapper import on_temporary_files
from b2luigi.core.dispatchable_task import DispatchableTask, dispatch
from b2luigi.core.settings import get_setting, set_setting, clear_setting, _setting_file_iterator
from b2luigi.core.xrootd_targets import XRootDSystem, XRootDTarget
from b2luigi.cli.process import process


class requires(object):
    """
    This "hack" copies the ``luigi.util.requires`` functionality, except that we
    allow for additional kwarg arguments when called.

    It can be used to require a certain task, but with some variables already set,
    e.g.

    .. code-block:: python

        class TaskA(b2luigi.Task):
            some_parameter = b2luigi.IntParameter()
            some_other_parameter = b2luigi.IntParameter()

            def output(self):
                yield self.add_to_output("test.txt")

        @b2luigi.requires(TaskA, some_parameter=3)
        class TaskB(b2luigi.Task):
            another_parameter = b2luigi.IntParameter()

            def output(self):
                yield self.add_to_output("out.dat")

    TaskB will now require TaskA, where ``some_parameter`` is already set to ``3``.
    This also means that TaskB only has the parameters ``another_parameter`` and
    ``some_other_parameter`` (because ``some_parameter`` is already fixed).

    It is also possible to require multiple tasks, e.g.

    .. code-block:: python

        class TaskA(b2luigi.Task):
            some_parameter = b2luigi.IntParameter()

            def output(self):
                yield self.add_to_output("test.txt")

        class TaskB(b2luigi.Task):
            some_other_parameter = b2luigi.IntParameter()

            def output(self):
                yield self.add_to_output("test.txt")

        @b2luigi.requires(TaskA, TaskB)
        class TaskC(b2luigi.Task):
            another_parameter = b2luigi.IntParameter()

            def output(self):
                yield self.add_to_output("out.dat")

    Details:
        * Parameters of the required task(s) are copied onto the decorated class
          unless the class already has an attribute of that name or the parameter
          is fixed via a keyword argument of the decorator.
        * The decorated class is modified in place and returned. Its new
          ``requires()`` first yields everything the class's previous
          ``requires()`` returned (a single task, an iterable or a dict of
          tasks; ``None`` yields nothing). It then yields the clone of the
          required task if exactly one was given, otherwise a list of clones.
          All clones get the decorator's keyword arguments.
    """

    def __init__(self, *tasks_to_require, **kwargs):
        super().__init__()
        self.tasks_to_require = tasks_to_require
        # Parameter values that are fixed for every required task.
        self.kwargs = kwargs

    @staticmethod
    def _iter_requirements(requirements):
        """Return an iterator over the entries of a ``requires()`` return value.

        luigi allows ``requires()`` to return a single task, an iterable or a
        dict of tasks. Dicts contribute their values (the tasks), ``None``
        contributes nothing, and any non-iterable object is treated as one entry.
        """
        if requirements is None:
            return iter(())
        if isinstance(requirements, dict):
            return iter(requirements.values())
        try:
            return iter(requirements)
        except TypeError:
            # A single (non-iterable) task was returned.
            return iter((requirements,))

    def __call__(self, task_that_requires):
        # Copy all parameter objects of the required tasks onto the decorated
        # class, except those it already defines or that are fixed via kwargs.
        for task_to_require in self.tasks_to_require:
            for param_name, param_obj in task_to_require.get_params():
                if not hasattr(task_that_requires, param_name) and param_name not in self.kwargs:
                    setattr(task_that_requires, param_name, param_obj)

        # Captured now, so the new requires() can chain to the previous one.
        old_requires = task_that_requires.requires

        def requires(_self):
            yield from self._iter_requirements(old_requires(_self))
            # A single required task is yielded as-is; several are yielded as a list.
            if len(self.tasks_to_require) == 1:
                yield _self.clone(cls=self.tasks_to_require[0], **self.kwargs)
            else:
                yield [_self.clone(cls=task_to_require, **self.kwargs) for task_to_require in self.tasks_to_require]

        task_that_requires.requires = requires

        return task_that_requires
class inherits(object):
    """
    This copies the ``luigi.util.inherits`` functionality but allows specifying
    parameters you don't want to inherit.

    It can e.g. be used in tasks that merge the output of the tasks they require.
    These merger tasks don't need the parameter they resolve anymore but should
    keep the same order of parameters, therefore simplifying the directory
    structure created by :meth:`b2luigi.Task.add_to_output`.

    Usage can be similar to this:

    .. code-block:: python

        class TaskA(b2luigi.Task):
            some_parameter = b2luigi.IntParameter()
            some_other_parameter = b2luigi.IntParameter()

            def output(self):
                yield self.add_to_output("test.txt")

        @b2luigi.inherits(TaskA, without='some_other_parameter')
        class TaskB(b2luigi.Task):
            another_parameter = b2luigi.IntParameter()

            def requires(self):
                for my_other_parameter in range(10):
                    yield self.clone(TaskA, some_other_parameter=my_other_parameter)

            def run(self):
                # somehow merge the output of TaskA to create "out.dat"
                pass

            def output(self):
                yield self.add_to_output("out.dat")

    The decorated class is modified in place and returned. Beyond the inherited
    parameters, it gains two methods:

    * ``clone_parent(**kwargs)``: a clone of the first inherited task.
    * ``clone_parents(**kwargs)``: a list with one clone per inherited task.

    Parameters:
        without: Either a string or a collection of strings naming parameters
            that are not inherited from *any* of the given tasks. Other keyword
            arguments are ignored.

    See also:
        :obj:`b2luigi.requires` which extends ``luigi.util.requires``.
    """

    def __init__(self, *tasks_to_inherit, **kwargs):
        super().__init__()
        self.tasks_to_inherit = tasks_to_inherit

        without = kwargs.pop("without", None)
        # Normalise to a list of names; a bare string is one name and must not be
        # split into characters by list().
        if isinstance(without, str):
            self.without = [without]
        elif without is None:
            self.without = []
        else:
            self.without = list(without)

    def __call__(self, task_that_inherits):
        # Copy the parameter objects of every inherited task onto the decorated
        # class, skipping excluded names and attributes the class already has.
        # ``self.without`` is only read, never modified. The exclusion must hold
        # for every inherited task and for every class this instance decorates.
        for task_to_inherit in self.tasks_to_inherit:
            for param_name, param_obj in task_to_inherit.get_params():
                if param_name in self.without or hasattr(task_that_inherits, param_name):
                    continue
                setattr(task_that_inherits, param_name, param_obj)

        def clone_parent(_self, **kwargs):
            # Clone of the first inherited task, with parameters overridden by kwargs.
            return _self.clone(cls=self.tasks_to_inherit[0], **kwargs)

        def clone_parents(_self, **kwargs):
            # One clone per inherited task, in decorator argument order.
            return [_self.clone(cls=task_to_inherit, **kwargs) for task_to_inherit in self.tasks_to_inherit]

        task_that_inherits.clone_parent = clone_parent
        task_that_inherits.clone_parents = clone_parents

        return task_that_inherits