# Source code for b2luigi.cli.process
from b2luigi.cli.arguments import get_cli_arguments
from b2luigi.cli import runner
# Module-level guard flag: the first call to ``process`` sets it to True, and
# any later call in the same interpreter raises a RuntimeError.
__has_run_already = False
# [docs]  (link marker left over from the rendered documentation page)
def process(
    task_like_elements,
    show_output=False,
    dry_run=False,
    test=False,
    batch=False,
    remove=None,
    remove_only=None,
    auto_confirm=False,
    ignore_additional_command_line_args=False,
    **kwargs,
):
    """
    Call this function in your main method to tell ``b2luigi`` where your entry
    point of the task graph is.
    It is very similar to ``luigi.build`` with some additional configuration options.

    Example:
        This example defines a simple task and tells ``b2luigi`` to execute it 100 times
        with different parameters:

        .. code-block:: python

            import b2luigi
            import random


            class MyNumberTask(b2luigi.Task):
                some_parameter = b2luigi.Parameter()

                def output(self):
                    return b2luigi.LocalTarget(f"results/output_file_{self.some_parameter}.txt")

                def run(self):
                    random_number = random.random()

                    with self.output().open("w") as f:
                        f.write(f"{random_number}\\n")


            if __name__ == "__main__":
                b2luigi.process([MyNumberTask(some_parameter=i) for i in range(100)])

    All flag arguments can also be given as command line arguments.
    This means the call with::

        b2luigi.process(tasks, batch=True)

    is equivalent to calling the script with::

        python script.py --batch

    Exactly one mode is executed per call. A mode is selected when either its
    keyword argument or the matching command line argument is set, and the
    modes are checked in this order (first match wins): ``show_output``,
    ``dry_run``, ``test``, the command-line-only ``batch_runner`` mode,
    ``remove``, ``remove_only``, ``batch``. If none is selected, the tasks are
    run locally.

    Args:
        task_like_elements (:obj:`Task` or list): Task(s) to execute with luigi.
            Can either be a list of tasks or a task instance. Anything that is
            not a ``list`` is treated as a single task.
        show_output (bool, optional): Instead of running the task(s), write out all output files
            which will be generated marked in color, if they are present already.
            Good for testing if your tasks will do what you think they should.
        dry_run (bool, optional): Instead of running the task(s), write out which tasks will
            be executed. This is a simplified form of dependency resolution, so this
            information may be wrong in some corner cases. Also good for testing.
        test (bool, optional): Does neither run on the batch system, with multiprocessing
            or dispatched (see :obj:`DispatchableTask`) but directly on the machine for
            debugging reasons. Does output all logs to the console.
        batch (bool, optional): Execute the tasks on the selected batch system.
            Refer to :ref:`quick-start-label` for more information.
            The default batch system is LSF, but this can be changed with the `batch_system`
            settings. See :obj:`get_setting` on how to define settings.
        remove (list, optional): If a single task is given, remove the output of this task.
            If a list of tasks is given, remove the output of all tasks in the list.
            Passed to ``runner.remove_outputs`` as ``target_tasks``; the value from
            the command line takes precedence when both are given.
            ``None`` (the default) and an empty list both mean "do not remove".
        remove_only (list, optional): Same selection as ``remove``, but
            ``runner.remove_outputs`` is additionally called with ``only=True``.
            Ignored if ``remove`` is also set, because ``remove`` is checked first.
            ``None`` (the default) and an empty list both mean "do not remove".
        auto_confirm (bool, optional): Forwarded as ``auto_confirm`` to
            ``runner.remove_outputs`` in the ``remove``/``remove_only`` modes.
            It is also enabled when the ``yes`` command line argument is set.
        ignore_additional_command_line_args (bool, optional, default False): Ignore additional
            command line arguments. This is useful if you want to use this function in a file
            that also does some command line parsing.
        **kwargs: Additional keyword arguments passed to ``luigi.build``.
            They are handed as a dictionary to the test, batch-worker, batched and
            local runners; the other modes do not use them.

    Raises:
        RuntimeError: If ``process`` is called more than once in the same
            interpreter session.

    Warning:
        You should always have just a single call to ``process`` in your script.
        If you need to have multiple calls, either use a :class:`b2luigi.WrapperTask`
        or two scripts.
    """
    # Assert, that process is only run once
    global __has_run_already
    if __has_run_already:
        raise RuntimeError("You are not allowed to call process twice in your code!")
    __has_run_already = True

    # Create Task List: a single task is wrapped so the runners always get a list
    if not isinstance(task_like_elements, list):
        task_list = [task_like_elements]
    else:
        task_list = task_like_elements

    # Check the CLI arguments and run as requested.
    # ``remove``/``remove_only`` default to None instead of a shared mutable
    # list; they are only forwarded when truthy, so None behaves exactly like
    # the former empty-list default.
    cli_args = get_cli_arguments(ignore_additional_command_line_args=ignore_additional_command_line_args)

    if cli_args.show_output or show_output:
        runner.show_all_outputs(task_list)
    elif cli_args.dry_run or dry_run:
        runner.dry_run(task_list)
    elif cli_args.test or test:
        runner.run_test_mode(task_list, cli_args, kwargs)
    elif cli_args.batch_runner:
        # Only reachable through the command line; there is no keyword for it.
        runner.run_as_batch_worker(task_list, cli_args, kwargs)
    elif cli_args.remove or remove:
        runner.remove_outputs(
            task_list,
            target_tasks=cli_args.remove or remove,
            auto_confirm=auto_confirm or cli_args.yes,
        )
    elif cli_args.remove_only or remove_only:
        runner.remove_outputs(
            task_list,
            target_tasks=cli_args.remove_only or remove_only,
            only=True,
            auto_confirm=auto_confirm or cli_args.yes,
        )
    elif cli_args.batch or batch:
        runner.run_batched(task_list, cli_args, kwargs)
    else:
        runner.run_local(task_list, cli_args, kwargs)