Worker Scheduler Factory#
A key principle of b2luigi is the steering of tasks on different batch systems. For this, b2luigi defines a custom so-called “Worker Scheduler Factory”. The factory creates a custom worker which determines the appropriate batch system for each task and creates a batch task process accordingly.
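For orientation, the following is a minimal, illustrative sketch of how this machinery is usually triggered from user code; the task MyTask, the output file name and the chosen setting value are only examples, not part of the factory API:

```python
import b2luigi


class MyTask(b2luigi.Task):
    def output(self):
        yield self.add_to_output("result.txt")

    def run(self):
        with open(self.get_output_file_name("result.txt"), "w") as f:
            f.write("done")


if __name__ == "__main__":
    # Pick a batch system explicitly; with "auto" the SendJobWorker detects one
    # from the submission commands available on the machine.
    b2luigi.set_setting("batch_system", "htcondor")
    # batch=True makes b2luigi use the SendJobWorkerSchedulerFactory, so each
    # task is driven through a BatchProcess instead of a local process.
    b2luigi.process(MyTask(), batch=True)
```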
- class b2luigi.batch.workers.BatchSystems(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#
Bases: Enum
An enumeration representing different batch systems.
- lsf = 'lsf'#
- htcondor = 'htcondor'#
- slurm = 'slurm'#
- gbasf2 = 'gbasf2'#
- local = 'local'#
- test = 'test'#
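The members behave like any other Python Enum; their values are the strings used in the batch_system setting:

```python
from b2luigi.batch.workers import BatchSystems

# Look up a member from its string value and read the value back.
assert BatchSystems("slurm") is BatchSystems.slurm
print(BatchSystems.htcondor.value)  # prints "htcondor"
```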
- class b2luigi.batch.workers.SendJobWorker(scheduler=None, worker_id=None, worker_processes=1, assistant=False, **kwargs)[source]#
Bases: Worker
A custom luigi worker that determines the appropriate batch system for a task and creates a task process accordingly.
- detect_batch_system(task)[source]#
Detects the batch system to be used for task execution.
This method determines the batch system from the setting provided for the task, or automatically detects the available batch system on the machine if the setting is auto. The detection checks for the presence of specific commands associated with known batch systems (e.g., bsub for LSF, condor_submit for HTCondor, sbatch for SLURM). If no known batch system is detected, it defaults to local.
- Parameters:
task – The task for which the batch system is being determined.
- Returns:
An instance of the BatchSystems enumeration representing the detected or configured batch system.
- Return type:
BatchSystems
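To make the detection described above concrete, here is a rough, simplified sketch of such an auto-detection; it is not the actual implementation, and the commands checked or their order may differ:

```python
import shutil

from b2luigi.batch.workers import BatchSystems


def detect_batch_system_sketch():
    """Illustrative stand-in for the automatic detection described above."""
    # Submission commands that identify the known batch systems.
    candidates = {
        "bsub": BatchSystems.lsf,
        "condor_submit": BatchSystems.htcondor,
        "sbatch": BatchSystems.slurm,
    }
    for command, batch_system in candidates.items():
        if shutil.which(command) is not None:
            return batch_system
    # No known submission command found: fall back to running locally.
    return BatchSystems.local
```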
- class b2luigi.batch.workers.SendJobWorkerSchedulerFactory[source]#
Bases: _WorkerSchedulerFactory
A factory class for creating instances of SendJobWorker. This class extends luigi.interface._WorkerSchedulerFactory and overrides the create_worker method to return a SendJobWorker instance.
- Parameters:
scheduler – The scheduler instance to be used by the worker.
worker_processes (int) – The number of worker processes to be used.
assistant (bool, optional) – Indicates whether the worker is in assistant mode. Defaults to False.
- create_worker(scheduler, worker_processes, assistant=False)[source]#
Creates and returns an instance of SendJobWorker.
- Parameters:
scheduler – The scheduler instance to be used by the worker.
worker_processes (int) – The number of worker processes to be used.
assistant (bool, optional) – Indicates whether the worker should act as an assistant. Defaults to False.
- Returns:
An instance of the SendJobWorker class configured with the provided parameters.
- Return type:
SendJobWorker
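The override itself is small. The following sketch mirrors what the factory does; the subclass name is hypothetical and the real class to use is SendJobWorkerSchedulerFactory:

```python
import luigi.interface

from b2luigi.batch.workers import SendJobWorker


class MyWorkerSchedulerFactory(luigi.interface._WorkerSchedulerFactory):
    """Sketch of a factory that returns the batch-aware worker."""

    def create_worker(self, scheduler, worker_processes, assistant=False):
        # Hand back a SendJobWorker instead of the plain luigi worker.
        return SendJobWorker(
            scheduler=scheduler,
            worker_processes=worker_processes,
            assistant=assistant,
        )
```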
Batch Process Base Class#
- class b2luigi.batch.processes.BatchProcess(task, scheduler, result_queue, worker_timeout)[source]#
Bases: object
This is the base class for all batch algorithms that allow luigi to run on a specific batch system. It is an abstract base class and inheriting classes need to supply functionalities for
- starting a job using the commands in self.task_cmd,
- getting the job status of a running, finished or failed job,
- and terminating a job.
All those commands are called from the main process, which is not running on the batch system. Every batch system that is capable of these functions can in principle work together with b2luigi.
- Implementation note:
In principle, using the batch system is transparent to the user. In case of problems, it may however be useful to understand how it works.
When you start your luigi dependency tree with process(..., batch=True), the normal luigi process is started, looking for unfinished tasks, running them etc. Normally, luigi creates a process for each running task and runs them either directly or on a different core (if you have enabled more than one worker). In the batch case, this process is not a normal python multiprocessing process, but this BatchProcess, which has the same interface (one can check the status of the process, start or terminate it). The process does not need to wait for the batch job to finish but is asked repeatedly for the job status. By this, most of the core functionality of luigi is kept and reused. This also means that every batch job only includes a single task and is finished whenever this task is done, decreasing the batch runtime. You will need exactly as many batch jobs as you have tasks, and no batch job will idle waiting for input data, as all are scheduled only when the task they should run is actually runnable (the input files are there).
What is the batch command now? In each job, we call a specific executable bash script created only for this task. It contains the setup of the environment (if given by the user via the settings), the change of the working directory (the directory of the python script or a directory specified by the user) and a call of this script with the current python interpreter (the one you used to call this main file or the one given by the setting executable). However, we give this call an additional parameter, which tells it to only run one single task. Tasks can be identified by their task id. A typical task command may look like:
/<path-to-your-exec>/python /your-project/some-file.py --batch-runner --task-id MyTask_38dsf879w3
if the batch job should run the MyTask task. The implementation of the abstract functions is responsible for creating and running the executable file and writing the log of the job into appropriate locations. You can use the functions create_executable_wrapper and get_log_file_dir to get the needed information.
Check out the implementation of the lsf task for an implementation example.
- property exitcode#
Retrieves the exit code for the process.
This property always returns 0, which indicates successful execution. By consistently setting the exit code to 0, the result queue can be reliably used for delivering the result of the process.
- Returns:
The exit code, always 0.
- Return type:
int
- get_job_status()[source]#
Implement this function to return the current job status. How you identify exactly your job is dependent on the implementation and needs to be handled by your own child class.
Must return one item of the JobStatus enumeration: running, aborted, successful or idle. It will only be called after the job is started but may also be called when the job is already finished. If the task status is unknown, return aborted. If the task has not started yet but is scheduled, return running nevertheless (for b2luigi it makes no difference). No matter whether the job was aborted via a call to terminate_job, by the batch system or by an exception in the job itself, you should return aborted if the job did not finish successfully (maybe you need to check the exit code of your job).
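As an illustration, a child class might translate the states reported by its batch system into JobStatus values as in the sketch below. The scheduler state names and the _query_scheduler_for_state helper are made up for this example, and JobStatus is assumed to be importable from b2luigi.batch.processes:

```python
from b2luigi.batch.processes import BatchProcess, JobStatus


class MyBatchProcess(BatchProcess):
    # Hypothetical mapping from scheduler state strings to JobStatus members.
    _status_map = {
        "PENDING": JobStatus.running,    # scheduled but not started: report running
        "RUNNING": JobStatus.running,
        "COMPLETED": JobStatus.successful,
        "FAILED": JobStatus.aborted,
        "CANCELLED": JobStatus.aborted,
    }

    def get_job_status(self):
        # Ask the batch system for the current state of the stored job id.
        state = self._query_scheduler_for_state()  # hypothetical helper
        # Anything unrecognised is treated as aborted, as recommended above.
        return self._status_map.get(state, JobStatus.aborted)
```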
- start_job()[source]#
Override this function in your child class to start a job on the batch system. It is called exactly once. You need to store any information identifying your batch job on your own.
You can use the b2luigi.core.utils.get_log_file_dir and the b2luigi.core.executable.create_executable_wrapper functions to get the log base name and to create the executable script which you should call in your batch job.
After the start_job function is called by the framework (and no exception is thrown), it is assumed that a batch job is started or scheduled.
After the job is finished (no matter if aborted or successful), we assume the stdout and stderr are written into the two files given by b2luigi.core.utils.get_log_file_dir(self.task).
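A possible start_job implementation could look like the following sketch. The submission command and the way the job id is parsed are placeholders for whatever your batch system expects, and create_executable_wrapper is assumed to return the path of the generated wrapper script:

```python
import subprocess

from b2luigi.batch.processes import BatchProcess
from b2luigi.core.executable import create_executable_wrapper
from b2luigi.core.utils import get_log_file_dir


class MyBatchProcess(BatchProcess):
    def start_job(self):
        # Directory where stdout/stderr of the job are expected afterwards.
        log_file_dir = get_log_file_dir(self.task)
        # Wrapper script that sets up the environment and calls
        # "python <steering file> --batch-runner --task-id <task id>".
        executable_file = create_executable_wrapper(self.task)

        # Placeholder submission command; a real implementation would call
        # bsub/condor_submit/sbatch here and parse the returned job id.
        output = subprocess.check_output(
            ["my_submit_command", "--log-dir", log_file_dir, executable_file]
        )
        self._job_id = output.decode().strip()
```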
- terminate_job()[source]#
This command is used to abort a job started by the start_job function. It is only called once to abort a job, so make sure to either block until the job is really gone or be sure that it will go down soon. In particular, do not wait until the job is finished. It is called, for example, when the user presses Ctrl-C.
In some rare corner cases it may happen that this function is called even before the job is started (i.e. before the start_job function is called). In this case, you do not need to do anything (but also do not raise an exception).
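A matching terminate_job sketch, using a placeholder cancel command and the job id stored by the hypothetical start_job above:

```python
import subprocess

from b2luigi.batch.processes import BatchProcess


class MyBatchProcess(BatchProcess):
    def terminate_job(self):
        # Nothing to do if the job was never submitted (the corner case above).
        if getattr(self, "_job_id", None) is None:
            return
        # Placeholder cancel command; a real implementation would call
        # bkill/condor_rm/scancel with the stored job id and should not
        # wait for the job to finish, only make sure it goes down.
        subprocess.run(["my_cancel_command", self._job_id], check=False)
```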
- run()[source]#
Executes the batch process.
This method logs the start of the batch process, including the class name and associated task, and then initiates the job by calling start_job.
- terminate()[source]#
Terminates the current process by invoking the terminate_job method.
This method is responsible for ensuring that the associated job or process is properly terminated. It acts as a wrapper around the terminate_job method, which contains the specific logic for termination.
- is_alive()[source]#
Check if the job is still alive based on its current status.
- Returns:
True if the job is running, False if it has terminated (either successfully or due to failure).
- Return type:
bool
- Raises:
ValueError – If the job status returned by get_job_status is not recognized.
- Behavior:
If the job has terminated successfully, it updates the result queue with a “DONE” status and marks the job as terminated.
If the job has been aborted, it calls the on_failure handler, updates the result queue with a “FAILED” status, and marks the job as terminated.
If the job is still running, it returns True.