Source code for b2luigi.core.dispatchable_task
import functools
from b2luigi.core.settings import get_setting
from b2luigi.core.task import Task
from b2luigi.core.utils import create_output_dirs
from b2luigi.core.executable import run_task_remote
[docs]def dispatch(run_function):
"""
In cases you have a run function calling external, probably insecure functionalities,
use this function wrapper around your run function.
It basically "emulates" a batch submission on your local computer (without any
batch system) with the benefit of having a totally separate execution path.
If your called task fails miserably (e.g. segfaults), it does not crash your main application.
Example:
The run function can include any code you want. When the task runs,
it is started in a subprocess and monitored by the parent process.
When it dies unexpectedly (e.g. because of a segfault etc.)
the task will be marked as failed. If not, it is successful.
The log output will be written to two files in the log folder (marked with
the parameters of the task), which you can check afterwards:
.. code-block:: python
import b2luigi
class MyTask(b2luigi.Task):
@b2luigi.dispatch
def run(self):
call_some_evil_function()
Note:
We are reusing the batch system implementation here, with all its settings
and nobs to setup the environment etc.
If you want to control it in more detail, please check out :ref:`batch-label`.
Implementation note:
In the ``subprocess`` we are calling the current executable
with the current input file as a parameter, but let it only run this
specific task (by handing over the task id and the ``--batch-worker`` option).
The run function notices this and actually runs the task instead of dispatching again.
Additionally, you can add a ``cmd_prefix`` parameter to your class, which also
needs to be a list of strings, which are prefixed to the current command (e.g.
if you want to add a profiler to all your tasks).
"""
@functools.wraps(run_function)
def wrapped_run_function(task):
if get_setting("_dispatch_local_execution", default=False, deprecated_keys=["local_execution"]):
create_output_dirs(task)
run_function(task)
else:
run_task_remote(task)
return wrapped_run_function
[docs]class DispatchableTask(Task):
"""
Instead of using the :obj:`dispatch` function wrapper,
you can also inherit from this class.
Except that, it has exactly the same functionality
as a normal :obj:`Task`.
Important:
You need to overload the process function
instead of the run function in this case!
"""
[docs] def process(self):
"""
Override this method with your normal run function.
Do not touch the run function itself!
"""
raise NotImplementedError
def run(self):
dispatch(self.__class__.process)(self)