Batch Processing#
As shown in Quick Start, using a batch system instead of local processing is really just a matter of adding --batch on the command line or calling process with batch=True.
However, there is more to discover!
Choosing the batch system#
Using b2luigi's settings mechanism (described in b2luigi.get_setting()) you can choose which batch system should be used. Currently, htcondor, lsf and slurm are supported, with lsf being the default setting. There is also a wrapper for gbasf2, the Belle II submission tool for the LHC Worldwide Computing Grid, which works for Basf2PathTask tasks.
In addition, it is possible to set the batch_system setting to auto, which tries to detect which batch system is available on your system. The automated discovery checks for the submission tools present on the system and sets the BatchProcess() accordingly. This functionality works for lsf and htcondor systems. gbasf2 will not be detected by auto but needs to be set explicitly.
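For example, the batch system can be chosen globally via the settings mechanism or per task as a class attribute (a minimal sketch; the task name and body are illustrative):

```python
import b2luigi

# choose the batch system for all tasks ...
b2luigi.set_setting("batch_system", "htcondor")


class MyGridTask(b2luigi.Task):
    # ... or override it for a single task
    batch_system = "lsf"
```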
Choosing the Environment#
If you are doing a local calculation, all calculated tasks will use the same environment (e.g. $PATH setting, libraries etc.) as you have currently set up when calling your script(s). This makes it predictable and simple.
Things get a bit more complicated when using a batch farm: the workers might not have the same environment set up, the batch submission might not copy the environment (or the local site administrators have forbidden that), or the system on the workers is so different that copying the environment from the scheduling machine does not make sense.
Therefore b2luigi provides you with three mechanisms to set the environment for each task:
- You can give a bash script in the env_script setting (via set_setting(), settings.json or for each task as usual, see b2luigi.get_setting()), which will be called even before anything else on the worker. Use it to set up things like path variables or libraries (e.g. when you are using a virtual environment and your batch system does not support copying the environment from the scheduler to the workers). For example, a useful script might look like this:

```bash
# Source my virtual environment
source venv/bin/activate

# Set some specific settings
export MY_IMPORTANT_SETTING=10
```
- You can set the env setting to a dictionary, which contains additional variables to be set up before your job runs. Using the mechanism described in b2luigi.get_setting() it is possible to make this task- or even parameter-dependent.
- By default, b2luigi re-uses the same python executable on the workers as you used to schedule the tasks (by calling your script). In some cases, this specific python executable is not present on the worker or is not usable (e.g. because of different operating systems or architectures). You can choose a new executable with the executable setting (it is also possible to just use python3 as the executable, assuming it is in the path). The executable needs to be callable after your env_script or your specific env settings are applied. Please note that the executable setting is a list, so you need to pass your python executable with possible arguments like this:

```python
b2luigi.set_setting("executable", ["python3"])
```
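As an illustration, the first two mechanisms could be configured globally like this (the script path and variable name are placeholders):

```python
import b2luigi

# bash script that is sourced on the worker before anything else (e.g. to activate a venv)
b2luigi.set_setting("env_script", "setup_env.sh")

# additional environment variables set before the task runs
b2luigi.set_setting("env", {"MY_IMPORTANT_SETTING": "10"})
```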
File System#
Depending on your batch system, the filesystem on the worker processing the task and on the scheduler machine can be different or even unrelated. Different batch systems and batch system implementations treat this fact differently. In the following, the basic procedure and assumptions are explained. Any deviation from this is described in the next section.
By default, b2luigi needs at least two folders to be accessible from the scheduling as well as the worker machine: the result folder and the folder of your script(s). If possible, use absolute paths for the result and log directory to prevent any problems.
Some batch systems (e.g. htcondor) support file copy mechanisms from the scheduler to the worker systems. Please check out the specifics below.
Hint
All relative paths given to e.g. the result_dir or the log_dir are always evaluated relative to the folder where your script lives. To prevent any ambiguities, try to use absolute paths whenever possible.
Some batch systems start the job in an arbitrary folder on the workers instead of the current folder on the scheduler. That is why b2luigi will change the directory into the path of your called script before starting the job.
In case your script is accessible from a different location on the worker than on the scheduling machine, you can give the setting working_dir to specify where the job should run. Your script needs to be in this folder and every relative path (e.g. for results or log files) will be evaluated from there.
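For instance, absolute result and log directories, plus an explicit working directory on the worker, might be configured like this (the paths are placeholders):

```python
import b2luigi

# absolute paths are evaluated the same way on the scheduler and on the workers
b2luigi.set_setting("result_dir", "/shared/analysis/results")
b2luigi.set_setting("log_dir", "/shared/analysis/logs")

# only needed if your script lives in a different location on the worker
b2luigi.set_setting("working_dir", "/shared/analysis")
```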
Drawbacks of the batch mode#
Although the batch mode has many benefits, it would be unfair to not mention its downsides:
- You have to choose the queue/batch settings/etc. depending on your requirements (e.g. wall clock time) by yourself. So you need to make sure that the tasks will actually finish before the batch system terminates them because of a timeout. There is just no way for b2luigi to know this beforehand.
- There is currently no resubmission implemented. This means jobs dying because of batch system failures are just dead. But because of the dependency checking mechanism of luigi it is simple to just redo the calculation and re-calculate what is missing.
- The luigi feature to request new dependencies while a task is running (via yield) is not implemented for the batch mode so far.
Batch System Specific Settings#
Every batch system has special settings. You can look them up here:
LSF#
- class b2luigi.batch.processes.lsf.LSFProcess(*args, **kwargs)[source]
Bases: BatchProcess
Reference implementation of the batch process for an LSF batch system. Heavily inspired by this post.
In addition to the basic batch setup (see Batch Processing), there are LSF-specific settings. These are:
- the LSF queue: queue.
- the number of slots for the job (on KEKCC this increases the memory available to the job): job_slots.
- the LSF job name: job_name.
For example:
```python
class MyLongTask(b2luigi.Task):
    queue = "l"
    job_name = "my_long_task"
```
The default queue is the short queue "s". If no job_name is set, the task will appear as <result_dir>/parameter1=value/.../executable_wrapper.sh when running bjobs.
By default, the environment variables from the scheduler are copied to the workers. This also implies we start in the same working directory, can reuse the same executable, etc. Normally, you do not need to supply env_script or the like.
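As a sketch of the job_slots setting listed above, a task that needs more memory on KEKCC might look like this (the concrete values are illustrative, not a recommendation):

```python
class MyMemoryHungryTask(b2luigi.Task):
    queue = "l"
    # on KEKCC, requesting more slots also increases the memory available to the job
    job_slots = "4"
    job_name = "my_memory_hungry_task"
```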
HTCondor#
- class b2luigi.batch.processes.htcondor.HTCondorProcess(*args, **kwargs)[source]
Bases: BatchProcess
Reference implementation of the batch process for an HTCondor batch system.
In addition to the basic batch setup (see Batch Processing), there are HTCondor-specific things to consider:
- Please note that most of the HTCondor batch farms do not have the same environment setup on submission and worker machines, so you probably want to give an env_script, an env setting and/or a different executable.
- HTCondor supports copying files from submission to workers. This means if the folder of your script(s)/python project/etc. is not accessible on the worker, you can copy it from the submission machine by adding it to the setting transfer_files (a short sketch follows after this list). This list can host both folders and files. Please note that due to HTCondor's file transfer mechanism, all specified folders and files will be copied into the worker node flattened, so if you specify a/b/c.txt you will end up with a file c.txt. If you use the transfer_files mechanism, you need to set the working_dir setting to "." as the files will end up in the current worker scratch folder. All specified files/folders should be absolute paths.
  Hint
  Please do not specify any parts of the results folder or the full results folder. This will lead to unexpected behavior. We are working on a solution to also copy results, but until then the results folder is still expected to be shared.
  If you copy your python project using this setting to the worker machine, do not forget to actually set it up in your setup script. Additionally, you might want to copy your settings.json as well.
- Via the htcondor_settings setting you can provide a dict for additional options, such as requested memory etc. Its value has to be a dictionary containing HTCondor settings as key/value pairs. These options will be written into the job submission file. For an overview of possible settings refer to the HTCondor documentation.
- Same as for LSF, the job_name setting allows giving a meaningful name to a group of jobs. If you want to be HTCondor-specific, you can provide the JobBatchName as an entry in the htcondor_settings dict, which will override the global job_name setting. This is useful for manually checking the status of specific jobs with condor_q -batch <job name>.
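The sketch referenced in the transfer_files item above could look like this (the paths are placeholders):

```python
import b2luigi

# absolute paths of folders/files that HTCondor should copy to the worker
b2luigi.set_setting("transfer_files", ["/home/me/my_analysis", "/home/me/settings.json"])

# the transferred files land (flattened) in the worker's scratch folder, so run the job there
b2luigi.set_setting("working_dir", ".")
```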
Example
```python
import b2luigi
import random


class MyNumberTask(b2luigi.Task):
    some_parameter = b2luigi.IntParameter()

    htcondor_settings = {"request_cpus": 1, "request_memory": "100 MB"}

    def output(self):
        yield self.add_to_output("output_file.txt")

    def run(self):
        print("I am now starting a task")
        random_number = random.random()

        if self.some_parameter == 3:
            raise ValueError

        with open(self.get_output_file_name("output_file.txt"), "w") as f:
            f.write(f"{random_number}\n")


class MyAverageTask(b2luigi.Task):
    htcondor_settings = {"request_cpus": 1, "request_memory": "200 MB"}

    def requires(self):
        for i in range(10):
            yield self.clone(MyNumberTask, some_parameter=i)

    def output(self):
        yield self.add_to_output("average.txt")

    def run(self):
        print("I am now starting the average task")

        # Build the mean
        summed_numbers = 0
        counter = 0
        for input_file in self.get_input_file_names("output_file.txt"):
            with open(input_file, "r") as f:
                summed_numbers += float(f.read())
                counter += 1

        average = summed_numbers / counter

        with open(self.get_output_file_name("average.txt"), "w") as f:
            f.write(f"{average}\n")


if __name__ == "__main__":
    b2luigi.process(MyAverageTask(), workers=200, batch=True)
```
Slurm#
- class b2luigi.batch.processes.slurm.SlurmProcess(*args, **kwargs)[source]
Bases: BatchProcess
Reference implementation of the batch process for a Slurm batch system.
In addition to the basic batch setup (see Batch Processing), there are Slurm-specific things to consider:
- Please note that most of the Slurm batch farms by default copy the user environment from the submission node to the worker machine. As this can lead to different results when running the same tasks depending on your active environment, you probably want to pass the argument export=NONE. This ensures that a reproducible environment is used. You can provide an env_script, an env setting, and/or a different executable to create the environment necessary for your task.
- Via the slurm_settings setting you can provide a dict for additional options, such as requested memory etc. Its value has to be a dictionary containing Slurm settings as key/value pairs. These options will be written into the job submission file. For an overview of possible settings refer to the Slurm documentation (https://slurm.schedmd.com/sbatch.html) and the documentation of the cluster you are using.
- Same as for LSF and HTCondor, the job_name setting allows giving a meaningful name to a group of jobs. If you want to be task-instance-specific, you can provide the job-name as an entry in the slurm_settings dict, which will override the global job_name setting (see the sketch below). This is useful for manually checking the status of specific jobs with squeue --name <job name>.
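The sketch referenced above, with purely illustrative values:

```python
class MySlurmTask(b2luigi.Task):
    batch_system = "slurm"
    # "job-name" inside slurm_settings overrides the global job_name setting
    slurm_settings = {"export": "NONE", "job-name": "my_slurm_task"}
```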
Example
```python
import b2luigi
import random


class MyNumberTask(b2luigi.Task):
    some_parameter = b2luigi.IntParameter()
    batch_system = "slurm"
    slurm_settings = {"export": "NONE", "ntasks": 1, "mem": "100MB"}

    def output(self):
        yield self.add_to_output("output_file.txt")

    def run(self):
        print("I am now starting a task")
        random_number = random.random()

        with open(self.get_output_file_name("output_file.txt"), "w") as f:
            f.write(f"{random_number}\n")


class MyAverageTask(b2luigi.Task):
    batch_system = "slurm"
    slurm_settings = {"export": "NONE", "ntasks": 1, "mem": "100MB"}

    def requires(self):
        for i in range(10):
            yield self.clone(MyNumberTask, some_parameter=i)

    def output(self):
        yield self.add_to_output("average.txt")

    def run(self):
        print("I am now starting the average task")

        # Build the mean
        summed_numbers = 0
        counter = 0
        for input_file in self.get_input_file_names("output_file.txt"):
            with open(input_file, "r") as f:
                summed_numbers += float(f.read())
                counter += 1

        average = summed_numbers / counter

        with open(self.get_output_file_name("average.txt"), "w") as f:
            f.write(f"{average}\n")


if __name__ == "__main__":
    b2luigi.process(MyAverageTask(), workers=200, batch=True)
```
GBasf2 Wrapper for LCG#
- class b2luigi.batch.processes.gbasf2.Gbasf2Process(*args, **kwargs)[source]
Bases: BatchProcess
Batch process for working with gbasf2 projects on the LHC Computing Grid (LCG).
- Features
gbasf2 project submission
The gbasf2 batch process takes the basf2 path returned by the create_path method of the task, saves it into a pickle file on disk and creates a wrapper steering file that executes the saved path. Any basf2 variable aliases added in the Path() or create_path() method are also stored in the pickle file. It then sends both the pickle file and the steering file wrapper to the grid via the Belle II-specific DIRAC-wrapper gbasf2. However, b2luigi also supports the submission of custom steering files with the setting gbasf2_custom_steering_file. This conserves the way that the basf2 path is still contained in the create_path() method. In this instance, b2luigi checks automatically if the corresponding file exists and copies it into the active directory. The gbasf2 task is then set to submit the unpickled file to the grid job, which allows the utilization of python-based basf2 modules.
Project status monitoring
After the project submission, the gbasf2 batch process regularly checks the status of all the jobs belonging to a gbasf2 project and returns a success if all jobs have been successful, while a single failed job results in a failed project. You can close a running b2luigi process and then start your script again; if a task with the same project name is running, this b2luigi gbasf2 wrapper will recognize that and, instead of resubmitting a new project, continue monitoring the running project.
Hint
The outputs of gbasf2 tasks can be a bit overwhelming, so I recommend using the central scheduler, which provides a nice overview of all tasks in the browser, including a status/progress indicator of how many jobs in a gbasf2 project are already done.
Automatic download of datasets and logs
If all jobs have been successful, it automatically downloads the output dataset and the log files from the job sandboxes and automatically checks if the download was successful before moving the data to the final location. On failure, it only downloads the logs. The dataset download can be optionally disabled.
Automatic rescheduling of failed jobs
Whenever a job fails, gbasf2 reschedules it as long as the number of retries is below the value of the setting gbasf2_max_retries. It keeps track of the number of retries in a local file in the log_file_dir, so that it does not change if you close b2luigi and start it again. Of course it does not persist if you remove that file or move to a different machine.
Note
Despite all the automation that this gbasf2 wrapper provides, the user is expected to have a basic understanding of how the grid works and know the basics of working with gbasf2 commands manually.
- Caveats
- The gbasf2 batch process for luigi can only be used for tasks inheriting from Basf2PathTask or other tasks with a create_path() method that returns a basf2 path.
- It can be used only for picklable basf2 paths, with only some limited global basf2 state saved (currently aliases and global tags). The batch process stores the path created by create_path in a python pickle file and runs that on the grid. Therefore, python basf2 modules are not yet supported. To see if the path produced by a steering file is picklable, you can try to dump it with basf2 --dump-path and execute it again with basf2 --execute-path. In case the steering file contains content (e.g. modules) that cannot be pickled, the setting gbasf2_custom_steering_file can be used, which has to be set to the path of the steering file the user wishes to be used. This submits the custom steering file to the grid job. The specific use case for this is the usage of and interaction with python-based basf2 modules that are not picklable.
- Output format: Changing the batch to gbasf2 means you also have to adapt how you handle the output of your gbasf2 task in tasks depending on it, because the output will not be a single root file anymore (e.g. B_ntuple.root), but a collection of root files, one for each file in the input data set, in a directory with the base name of the root files, e.g.:

```
<task output directory>
├── B_ntuple.root
│   └── B_ntuple_0.root
│   └── B_ntuple_1.root
│   └── ...
├── D_ntuple.root
│   └── D_ntuple_0.root
│   └── ...
```
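A downstream task therefore has to treat such an output as a directory of root files rather than a single file. The following rough sketch illustrates one possible way to do that; it assumes (not guaranteed by this documentation) that get_input_file_names() then yields that directory path:

```python
import glob
import os

import b2luigi


class MyNtupleAnalysis(b2luigi.Task):
    # requires() should yield the gbasf2 task that produces "B_ntuple.root" (omitted here)

    def run(self):
        for output_directory in self.get_input_file_names("B_ntuple.root"):
            # assumption: the returned path is the directory holding B_ntuple_0.root, B_ntuple_1.root, ...
            root_files = sorted(glob.glob(os.path.join(output_directory, "*.root")))
            print(root_files)
```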
- Settings for gbasf2 tasks:
To submit a task with the gbasf2 wrapper, you first have to add the property batch_system = "gbasf2", which sets the batch_system setting. It is not recommended to set that setting globally, as not all tasks can be submitted to the grid, but only tasks with a create_path method.
For gbasf2 tasks it is further required to set the following settings:
- gbasf2_input_dataset: String with the logical path of a dataset on the grid to use as an input to the task. You can provide multiple inputs by having multiple paths contained in this string, separated by commas without spaces. An alternative is to just instantiate multiple tasks with different input datasets, if you want to know in retrospect which input dataset had been used for the production of a specific output.
- gbasf2_input_dslist: Alternatively to gbasf2_input_dataset, you can use this setting to provide a text file containing the logical grid path names, one per line.
- gbasf2_project_name_prefix: A string with which your gbasf2 project names will start. To ensure that the project associated with each unique task (i.e. with each combination of luigi parameters) is unique, the unique task.task_id is hashed and appended to the prefix to create the actual gbasf2 project name. It should be below 22 characters so that the project name with the hash can remain under 32 characters.
The following example shows a minimal class with all required options to run on the gbasf2/grid batch:

```python
class MyTask(Basf2PathTask):
    batch_system = "gbasf2"
    gbasf2_project_name_prefix = b2luigi.Parameter(significant=False)
    gbasf2_input_dataset = b2luigi.Parameter(hashed=True)
```
Other noteworthy, but not required settings are:
- gbasf2_setup_path: Path to the gbasf2 environment setup script that needs to be sourced to run gbasf2 commands. Defaults to /cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc.
- gbasf2_release: Defaults to the release of your currently set up basf2 release. Set this if you want the jobs to use another release on the grid.
- gbasf2_proxy_lifetime: Defaults to 24. When initializing a proxy, set the lifetime to this number of hours.
- gbasf2_min_proxy_lifetime: Defaults to 0. During processing, prompt the user to reinitialize the proxy if the remaining proxy lifetime drops below this number of hours.
- gbasf2_print_status_updates: Defaults to True. By setting it to False you can turn off the printing of the job summaries, that is the number of jobs in different states in a gbasf2 project.
- gbasf2_max_retries: Defaults to 0. Maximum number of times that each job in the project can be automatically rescheduled until the project is declared as failed.
- gbasf2_proxy_group: Defaults to "belle". If provided, the gbasf2 wrapper will work with the custom gbasf2 group specified in this parameter. No need to specify this parameter in case of usual physics analysis at Belle II. If specified, one has to provide the gbasf2_project_lpn_path parameter.
- gbasf2_project_lpn_path: Path to the LPN folder for a specified gbasf2 group. The parameter has no effect unless gbasf2_proxy_group is used with a non-default value.
- gbasf2_jinja_template_path: This parameter sets a custom basf2 steering template where the user can adapt the default template (e.g. for altering the pdg database, …). Note that this is an expert option that should be treated with care.
- gbasf2_additional_download_params: Defaults to "--new". This parameter sets additional parameters that are given to gb2_ds_get. Note that in case you override the parameter, the --new parameter is not automatically set, so you might have to manually add --new if you want this parameter to be used.
- gbasf2_download_dataset: Defaults to True. Disable this setting if you don't want to download the output dataset from the grid on job success. As you can't use the downloaded dataset as an output target for luigi, you should then use the provided Gbasf2GridProjectTarget, as shown in the following example:

```python
from b2luigi.batch.processes.gbasf2 import get_unique_project_name, Gbasf2GridProjectTarget


class MyTask(Basf2PathTask):
    # [...]
    def output(self):
        project_name = get_unique_project_name(self)
        return Gbasf2GridProjectTarget(project_name)
```
This is useful when chaining gbasf2 tasks together, as they don't need the output locally but take the grid datasets as input. It is also useful when you just want to produce data on the grid for other people to use.
Caution
For Gbasf2GridProjectTargets the global and the task gbasf2_proxy_group cannot differ.
Tip
Set only one global gbasf2_proxy_group setting.
- gbasf2_download_logs: Whether to automatically download the log output of gbasf2 projects when the task succeeds or fails. Having the logs is important for reproducibility.
- gbasf2_custom_steering_file: Optional path to submit a custom steering file to gbasf2. This does not pickle the basf2.Path instance and allows the utilization of python-based basf2 modules. Named modules have to be contained either in the steering file itself or be included as additional files via the input sandbox.
The following optional settings correspond to the equally named gbasf2 command line options (without the gbasf2_ prefix) that you can set to customize your gbasf2 project: gbasf2_noscout, gbasf2_additional_files, gbasf2_input_datafiles, gbasf2_n_repition_job, gbasf2_force_submission, gbasf2_cputime, gbasf2_evtpersec, gbasf2_priority, gbasf2_jobtype, gbasf2_basf2opt, gbasf2_lfn_sandboxfiles, gbasf2_input_grouping.
It is further possible to append arbitrary command line arguments to the gbasf2 submission command with the gbasf2_additional_params setting. If you want to blacklist a grid site, you can e.g. add:

```python
b2luigi.set_setting("gbasf2_additional_params", "--banned_site LCG.KEK.jp")
```
- Example
Here is an example file to submit an analysis path created by the script in examples/gbasf2/example_mdst_analysis to the grid via gbasf2:
File: examples/gbasf2/gbasf2_example.py
```python
import b2luigi
from b2luigi.basf2_helper.tasks import Basf2PathTask

import example_mdst_analysis


class AnalysisTask(Basf2PathTask):
    # set the batch_system property to use the gbasf2 wrapper batch process for this task
    batch_system = "gbasf2"
    # Must define a prefix for the gbasf2 project name to submit to the grid.
    # b2luigi will then add a hash derived from the luigi parameters to create a unique project name.
    gbasf2_project_name_prefix = b2luigi.Parameter()
    gbasf2_input_dataset = b2luigi.Parameter(hashed=True)
    # Example luigi cut parameter to facilitate starting multiple projects for different cut values
    mbc_lower_cut = b2luigi.IntParameter()

    def create_path(self):
        mbc_range = (self.mbc_lower_cut, 5.3)
        return example_mdst_analysis.create_analysis_path(
            d_ntuple_filename="D_ntuple.root", b_ntuple_filename="B_ntuple.root", mbc_range=mbc_range
        )

    def output(self):
        yield self.add_to_output("D_ntuple.root")
        yield self.add_to_output("B_ntuple.root")


class AnalysisWrapperTask(b2luigi.WrapperTask):
    """
    We use the AnalysisWrapperTask to be able to require multiple analyse tasks with
    different input datasets and cut values. For each parameter combination, a
    different gbasf2 project will be submitted.
    """

    def requires(self):
        input_dataset = (
            "/belle/MC/release-04-01-04/DB00000774/SkimM13ax1/prod00011778/e1003/4S/r00000/mixed/11180100/udst/sub00/"
            "udst_000006_prod00011778_task10020000006.root"
        )
        # if you want to iterate over different cuts, just add more values to this list
        mbc_lower_cuts = [5.15, 5.2]
        for mbc_lower_cut in mbc_lower_cuts:
            yield AnalysisTask(
                mbc_lower_cut=mbc_lower_cut,
                gbasf2_project_name_prefix="luigiExample",
                gbasf2_input_dataset=input_dataset,
                max_event=100,
            )


if __name__ == "__main__":
    main_task_instance = AnalysisWrapperTask()
    n_gbasf2_tasks = len(list(main_task_instance.requires()))
    b2luigi.process(main_task_instance, workers=n_gbasf2_tasks)
```
- Handling failed jobs
The gbasf2 batch wrapper considers the gbasf2 project as failed if any of the jobs in the project failed and reached the maximum number of retries. It then automatically downloads the logs, so please look into them to see what the reason was. For example, it can be that only certain grid sites were affected, so you might want to exclude them by adding the --banned_site ... option to gbasf2_additional_params.
You can also always reschedule jobs manually with the gb2_job_reschedule command or delete them with gb2_job_delete so that the gbasf2 batch process doesn't know they ever existed. Then just run your luigi task/script again and it will start monitoring the running project again.
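In practice this often amounts to raising the retry limit and/or banning the affected site, e.g. (illustrative values):

```python
import b2luigi

# allow each job to be rescheduled up to 3 times before the project counts as failed
b2luigi.set_setting("gbasf2_max_retries", 3)

# exclude a problematic grid site from the submission
b2luigi.set_setting("gbasf2_additional_params", "--banned_site LCG.KEK.jp")
```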
Apptainer#
- class b2luigi.batch.processes.apptainer.ApptainerProcess(*args, **kwargs)[source]
Bases: BatchProcess
Simple implementation of a batch process for running jobs in an Apptainer container. Strictly speaking, this is not a batch process, but it is a simple way to run jobs in a container environment.
This process inherits the basic properties from b2luigi.batch.processes.BatchProcess but does not need to be executed in the batch context. However, running in batch mode is possible for the lsf and the htcondor batch systems, although for the latter it is not recommended to use Apptainer images since HTCondor is already running in a container environment.
The core principle of this process is to run the task in an Apptainer container. To achieve the execution of tasks, an apptainer exec command is built within this class and executed in a subprocess. To steer the execution, one can use the following settings:
- apptainer_image: The image to use for the Apptainer container.
  This parameter is mandatory and needs to be set if the task should be executed in an Apptainer container. The image needs to be accessible from the machine where the task is executed. There are no further checks if the image is available or valid. When using custom images, it may be helpful to first check the image with apptainer inspect. For people with access to the Belle II /cvmfs directory, images are provided in the /cvmfs/belle.cern.ch/images directory. The description of the images (the repository contains the docker images which are transformed to Apptainer images) and instructions on how to create them can be found at https://gitlab.desy.de/belle2/software/docker-images.
- apptainer_mounts: A list of directories to mount into the Apptainer container.
  This parameter is optional and can be used to mount directories into the Apptainer container. The directories need to be accessible from the machine where the task is executed. The directories are mounted under the exact same path as they are provided on the host machine. For most use cases, mounts need to be provided to access software or data locations. For example, for people using basf2 software in the Apptainer container, the /cvmfs directory needs to be mounted. Caution is required when system-specific directories are mounted.
- apptainer_mount_defaults: Boolean parameter to mount log_dir and result_dir by default.
  The default value is True, meaning the result_dir and log_dir are automatically created and mounted if they are not accessible from the execution location. When using custom targets with non-local output directories, this parameter should be set to False to avoid mounting non-existing directories.
- apptainer_additional_params: Additional parameters to pass to the apptainer exec command.
  This parameter should be a string and will be directly appended to the apptainer exec command. It can be used to pass additional parameters to the apptainer exec command as they would be added on the CLI. A very useful parameter is --cleanenv, which will clean the environment before executing the task in the Apptainer container. This can be useful to avoid conflicts with the environment in the container. A prominent use case is the usage of software which depends on the operating system.
A simple example of how an Apptainer based task can be defined is shown below:
```python
class MyApptainerTask(luigi.Task):
    apptainer_image = "/cvmfs/belle.cern.ch/images/belle2-base-el9"
    apptainer_mounts = ["/cvmfs"]
    apptainer_mount_defaults = True
    apptainer_additional_params = "--cleanenv"

    # <rest of the task definition>
```
Add your own batch system#
If you want to add a new batch system, all you need to do is to implement the abstract functions of BatchProcess for your system:
- class b2luigi.batch.processes.BatchProcess(task, scheduler, result_queue, worker_timeout)[source]
This is the base class for all batch algorithms that allow luigi to run on a specific batch system. This is an abstract base class and inheriting classes need to supply functionalities for
- starting a job using the commands in self.task_cmd,
- getting the job status of a running, finished or failed job,
- and terminating a job.
All those commands are called from the main process, which is not running on the batch system. Every batch system that is capable of these functions can in principle work together with b2luigi.
- Implementation note:
In principle, using the batch system is transparent to the user. In case of problems, it may however be useful to understand how it is working.
When you start your luigi dependency tree with process(..., batch=True), the normal luigi process is started, looking for unfinished tasks and running them etc. Normally, luigi creates a process for each running task and runs them either directly or on a different core (if you have enabled more than one worker). In the batch case, this process is not a normal python multiprocessing process, but this BatchProcess, which has the same interface (one can check the status of the process, start or terminate it). The process does not need to wait for the batch job to finish but is asked repeatedly for the job status. By this, most of the core functionality of luigi is kept and reused. This also means that every batch job only includes a single task and is finished whenever this task is done, decreasing the batch runtime. You will need exactly as many batch jobs as you have tasks and no batch job will idle waiting for input data, as all are scheduled only when the task they should run is actually runnable (the input files are there).
What is the batch command now? In each job, we call a specific executable bash script created only for this task. It contains the setup of the environment (if given by the user via the settings), the change of the working directory (the directory of the python script or a directory specified by the user) and a call of this script with the current python interpreter (the one you used to call this main file or the one given by the executable setting). However, we give this call an additional parameter, which tells it to only run one single task. Tasks can be identified by their task id. A typical task command may look like this:

```bash
/<path-to-your-exec>/python /your-project/some-file.py --batch-runner --task-id MyTask_38dsf879w3
```

if the batch job should run the MyTask. The implementation of the abstract functions is responsible for creating and running the executable file and writing the log of the job into appropriate locations. You can use the functions create_executable_wrapper and get_log_file_dir to get the needed information.
Check out the implementation of the lsf task for some implementation example.
- get_job_status()[source]
Implement this function to return the current job status. How exactly you identify your job depends on the implementation and needs to be handled by your own child class.
Must return one item of the JobStatus enumeration: running, aborted, successful or idle. Will only be called after the job is started but may also be called when the job is finished already. If the task status is unknown, return aborted. If the task has not started already but is scheduled, return running nevertheless (for b2luigi it makes no difference). No matter if aborted via a call to terminate_job, by the batch system or by an exception in the job itself, you should return aborted if the job is not finished successfully (maybe you need to check the exit code of your job).
- start_job()[source]
Override this function in your child class to start a job on the batch system. It is called exactly once. You need to store any information identifying your batch job on your own.
You can use the b2luigi.core.utils.get_log_file_dir and the b2luigi.core.executable.create_executable_wrapper functions to get the log base name and to create the executable script which you should call in your batch job.
After the start_job function is called by the framework (and no exception is thrown), it is assumed that a batch job is started or scheduled.
After the job is finished (no matter if aborted or successful) we assume the stdout and stderr are written into the two files given by b2luigi.core.utils.get_log_file_dir(self.task).
- terminate_job()[source]
This command is used to abort a job started by the start_job function. It is only called once to abort a job, so make sure to either block until the job is really gone or be sure that it will go down soon. Especially, do not wait until the job is finished. It is called for example when the user presses Ctrl-C.
In some strange corner cases it may happen that this function is called even before the job is started (before the start_job function is called). In this case, you do not need to do anything (but also not raise an exception).
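To make this concrete, here is a minimal sketch of such a child class. The my_submit/my_status/my_cancel commands and the job-id bookkeeping are purely illustrative placeholders; the helper imports follow the module paths mentioned above, while the exact signature of create_executable_wrapper and the location of the JobStatus enumeration are assumptions you should check against your b2luigi version:

```python
import subprocess

from b2luigi.batch.processes import BatchProcess, JobStatus
from b2luigi.core.executable import create_executable_wrapper
from b2luigi.core.utils import get_log_file_dir


class MyBatchProcess(BatchProcess):
    """Illustrative skeleton for a custom batch system (the my_* commands are made up)."""

    def start_job(self):
        # create_executable_wrapper builds the bash script that sets up the environment
        # and calls your steering file with --batch-runner --task-id <task id>
        executable_file = create_executable_wrapper(self.task)
        log_file_dir = get_log_file_dir(self.task)

        # "my_submit" stands in for your batch system's submission command
        output = subprocess.check_output(["my_submit", "--log-dir", log_file_dir, executable_file])

        # store whatever your batch system needs to identify the job later
        self._job_id = output.decode().strip()

    def get_job_status(self):
        # "my_status" stands in for your batch system's status query command;
        # map its answer onto the JobStatus enumeration
        status = subprocess.check_output(["my_status", self._job_id]).decode().strip()
        if status == "DONE":
            return JobStatus.successful
        if status in ("PENDING", "RUNNING"):
            return JobStatus.running
        return JobStatus.aborted

    def terminate_job(self):
        # called once, e.g. when the user presses Ctrl-C
        subprocess.run(["my_cancel", self._job_id], check=False)
```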