Implemented Batch Processes#
LSF#
- class b2luigi.batch.processes.lsf.LSFJobStatusCache[source]#
Bases:
BatchJobStatusCache
- _ask_for_job_status(job_id=None)[source]#
Queries the job status from the LSF batch system and updates the internal job status mapping.
- Parameters:
job_id (str, optional) – The ID of the job to query. If not provided, the status of all jobs will be queried.
Notes
This method uses the bjobs command-line tool to fetch job statuses in JSON format. The output is expected to contain a "RECORDS" key with a list of job records.
Each job record should have "JOBID" and "STAT" keys, which are used to update the internal mapping.
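A minimal sketch of the kind of parsing this implies is shown below; the function name and the exact bjobs invocation are illustrative and not taken from the b2luigi source.
import json

def parse_bjobs_records(json_output: str) -> dict:
    """Map job IDs to their status strings from bjobs JSON output."""
    records = json.loads(json_output).get("RECORDS", [])
    return {record["JOBID"]: record["STAT"] for record in records}

# hypothetical bjobs -json output with the "RECORDS" structure described above
example = '{"RECORDS": [{"JOBID": "12345", "STAT": "DONE"}, {"JOBID": "12346", "STAT": "RUN"}]}'
print(parse_bjobs_records(example))  # {'12345': 'DONE', '12346': 'RUN'}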
- class b2luigi.batch.processes.lsf.LSFProcess(*args, **kwargs)[source]#
Bases:
BatchProcess
Reference implementation of the batch process for an LSF batch system. Heavily inspired by this post.
Additional to the basic batch setup (see Batch Processing), there are LSF-specific settings. These are:
- the LSF queue: queue
- the number of slots for the job. On KEKCC this increases the memory available to the job: job_slots
- the LSF job name: job_name
For example:
class MyLongTask(b2luigi.Task):
    queue = "l"
    job_name = "my_long_task"
The default queue is the short queue "s". If no job_name is set, the task will appear as <result_dir>/parameter1=value/.../executable_wrapper.sh when running bjobs.
By default, the environment variables from the scheduler are copied to the workers. This also implies we start in the same working directory, can reuse the same executable, etc. Normally, you do not need to supply env_script or the like.
- get_job_status()[source]#
Retrieves the current status of the batch job associated with this instance.
- Returns:
- The status of the job, which can be one of the following:
JobStatus.successful: If the job has completed successfully ("DONE").
JobStatus.aborted: If the job has been aborted or is not found in the cache ("EXIT" or missing ID).
JobStatus.running: If the job is still in progress.
- Return type:
JobStatus
- start_job()[source]#
Submits a batch job to the LSF system.
This method constructs a command to submit a job using the bsub command-line tool. It dynamically configures the job submission parameters based on task-specific settings and creates necessary log files for capturing standard output and error.
- Raises:
RuntimeError – If the batch submission fails or the job ID cannot be extracted from the bsub command output.
- Steps:
Retrieve optional settings for queue (-q), job_slots (-n), and job_name (-J).
The stdout and stderr log files are created in the task's log directory. See get_log_file_dir.
The executable is created with create_executable_wrapper.
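An illustrative sketch of how such a bsub command could be assembled from these settings follows; the helper name and example values are hypothetical, not the b2luigi implementation.
def build_bsub_command(executable, stdout_log, stderr_log, queue=None, job_slots=None, job_name=None):
    """Assemble a bsub command list from the optional settings described above."""
    command = ["bsub", "-o", stdout_log, "-e", stderr_log]
    if queue is not None:
        command += ["-q", queue]           # LSF queue
    if job_slots is not None:
        command += ["-n", str(job_slots)]  # number of slots
    if job_name is not None:
        command += ["-J", job_name]        # LSF job name
    return command + [executable]

print(build_bsub_command("executable_wrapper.sh", "stdout.log", "stderr.log", queue="s", job_name="my_task"))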
HTCondor#
- class b2luigi.batch.processes.htcondor.HTCondorJobStatusCache[source]#
Bases:
BatchJobStatusCache
- _ask_for_job_status(job_id: int = None)[source]#
With HTCondor, you can check the progress of your jobs using the condor_q command. If no JobId is given as argument, this command shows you the status of all queued jobs (usually only your own by default).
Normally, the HTCondor JobID is stated as ClusterId.ProcId. Since only one job is queued per cluster, we can identify jobs by their ClusterId (the ProcId will be 0 for all submitted jobs). With the -json option, the condor_q output is returned in JSON format. By specifying some attributes, not the entire job ClassAd is returned, but only the information necessary to match a job to its JobStatus. The output is given as a string and cannot be directly parsed into a JSON dictionary. It has the following form:
[ {...} , {...} , {...} ]
The {...} are the different dictionaries containing the specified attributes. Sometimes a job may complete in between the status checks. Its final status can then be found with condor_history (which works mostly in the same way as condor_q). Both commands are used in order to find out the JobStatus.
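The following sketch illustrates the two-step query described above; it is not the b2luigi implementation, and the exact set of requested attributes is an assumption.
import json
import subprocess

def ask_for_job_status(job_id=None):
    """Collect ClusterId -> JobStatus pairs from condor_q, falling back to condor_history."""
    statuses = {}
    for command in ("condor_q", "condor_history"):
        cmd = [command, "-json", "-attributes", "ClusterId,JobStatus"]
        if job_id is not None:
            cmd.append(str(job_id))
        output = subprocess.check_output(cmd, encoding="utf-8").strip()
        if not output:
            continue  # an empty queue produces no JSON output at all
        for job_ad in json.loads(output):  # a list of dicts, one per job
            statuses[job_ad["ClusterId"]] = job_ad["JobStatus"]
        if job_id is not None and job_id in statuses:
            break  # no need to consult condor_history
    return statuses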
- class b2luigi.batch.processes.htcondor.HTCondorJobStatus(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#
Bases:
IntEnum
See https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html
- class b2luigi.batch.processes.htcondor.HTCondorProcess(*args, **kwargs)[source]#
Bases:
BatchProcess
Reference implementation of the batch process for a HTCondor batch system.
In addition to the basic batch setup (see Batch Processing), the HTCondor-specific things are:
Please note that most of the HTCondor batch farms do not have the same environment setup on submission and worker machines, so you probably want to give an env_script, an env setting, and/or a different executable.
HTCondor supports copying files from submission to workers. This means that if the folder of your script(s)/python project/etc. is not accessible on the worker, you can copy it from the submission machine by adding it to the setting transfer_files. This list can host both folders and files. Please note that due to HTCondor's file transfer mechanism, all specified folders and files will be copied into the worker node flattened, so if you specify a/b/c.txt you will end up with a file c.txt. If you use the transfer_files mechanism, you need to set the working_dir setting to "." as the files will end up in the current worker scratch folder. All specified files/folders should be absolute paths.
Hint
Please do not specify any parts of, or the full, results folder. This will lead to unexpected behavior. We are working on a solution to also copy results, but until then the results folder is still expected to be shared.
If you copy your python project to the worker machine using this setting, do not forget to actually set it up in your setup script. Additionally, you might want to copy your settings.json as well.
Via the htcondor_settings setting you can provide a dict for additional options, such as requested memory etc. Its value has to be a dictionary containing HTCondor settings as key/value pairs. These options will be written into the job submission file. For an overview of possible settings refer to the HTCondor documentation.
Same as for LSF, the job_name setting allows giving a meaningful name to a group of jobs. If you want to be HTCondor-specific, you can provide the JobBatchName as an entry in the htcondor_settings dict, which will override the global job_name setting. This is useful for manually checking the status of specific jobs with condor_q -batch <job name>.
Example
import b2luigi
import random


class MyNumberTask(b2luigi.Task):
    some_parameter = b2luigi.IntParameter()

    htcondor_settings = {"request_cpus": 1, "request_memory": "100 MB"}

    def output(self):
        yield self.add_to_output("output_file.txt")

    def run(self):
        print("I am now starting a task")
        random_number = random.random()

        if self.some_parameter == 3:
            raise ValueError

        with open(self.get_output_file_name("output_file.txt"), "w") as f:
            f.write(f"{random_number}\n")


class MyAverageTask(b2luigi.Task):
    htcondor_settings = {"request_cpus": 1, "request_memory": "200 MB"}

    def requires(self):
        for i in range(10):
            yield self.clone(MyNumberTask, some_parameter=i)

    def output(self):
        yield self.add_to_output("average.txt")

    def run(self):
        print("I am now starting the average task")

        # Build the mean
        summed_numbers = 0
        counter = 0
        for input_file in self.get_input_file_names("output_file.txt"):
            with open(input_file, "r") as f:
                summed_numbers += float(f.read())
            counter += 1

        average = summed_numbers / counter

        with open(self.get_output_file_name("average.txt"), "w") as f:
            f.write(f"{average}\n")


if __name__ == "__main__":
    b2luigi.process(MyAverageTask(), workers=200, batch=True)
- get_job_status()[source]#
Determines the status of a batch job based on its HTCondor job status.
- Returns:
- The status of the job, which can be one of the following:
JobStatus.successful: If the HTCondor job status is 'completed'.
JobStatus.running: If the HTCondor job status is one of 'idle', 'running', 'transferring_output', or 'suspended'.
JobStatus.aborted: If the HTCondor job status is 'removed', 'held', 'failed', or if the job ID is not found in the cache.
- Return type:
JobStatus
- Raises:
ValueError – If the HTCondor job status is unknown.
- start_job()[source]#
Starts a job by creating and submitting an HTCondor submit file.
This method generates an HTCondor submit file using the _create_htcondor_submit_file method, then submits the job using the condor_submit command.
- Raises:
RuntimeError – If the batch submission fails or the job ID cannot be extracted from the condor_submit output.
- terminate_job()[source]#
Terminates a batch job managed by HTCondor.
This method checks if a batch job ID is available. If a valid job ID exists, it executes the condor_rm command to remove the job from the HTCondor queue.
- _create_htcondor_submit_file()[source]#
Creates an HTCondor submit file for the current task.
This method generates the content of an HTCondor submit file based on the task’s configuration and writes it to a file.
- Returns:
The path to the generated HTCondor submit file.
- Return type:
str
- Raises:
ValueError – If transfer_files contains non-absolute file paths or if working_dir is not explicitly set to '.' when using transfer_files.
Note
The stdout and stderr log files are created in the task's log directory. See get_log_file_dir.
The HTCondor settings are specified in the htcondor_settings setting, which is a dictionary of key-value pairs.
The executable is created with create_executable_wrapper.
The transfer_files setting can be used to specify files or directories to be transferred to the worker node.
The job_name setting can be used to specify a meaningful name for the job.
The submit file is named job.submit and is created in the task's output directory (get_task_file_dir).
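As an illustration of the pieces listed in this note, a simplified generator for such a submit file could look like the following; the function name and layout are assumptions, not the actual _create_htcondor_submit_file code.
from pathlib import Path

def create_submit_file(executable, log_dir, htcondor_settings, job_name=None, transfer_files=None, out_dir="."):
    """Write a minimal job.submit file from the settings described above."""
    lines = [
        f"executable = {executable}",
        f"log = {log_dir}/job.log",
        f"output = {log_dir}/stdout",
        f"error = {log_dir}/stderr",
    ]
    lines += [f"{key} = {value}" for key, value in htcondor_settings.items()]  # e.g. request_memory
    if job_name is not None:
        lines.append(f"JobBatchName = {job_name}")
    if transfer_files:
        lines.append("should_transfer_files = YES")
        lines.append("transfer_input_files = " + ", ".join(transfer_files))
    lines.append("queue 1")

    submit_file = Path(out_dir) / "job.submit"
    submit_file.write_text("\n".join(lines) + "\n")
    return submit_file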
Slurm#
- class b2luigi.batch.processes.slurm.SlurmJobStatusCache[source]#
Bases:
BatchJobStatusCache
- _ask_for_job_status(job_id: int = None)[source]#
With Slurm, you can check the progress of your jobs using the squeue command. If no jobID is given as argument, this command shows you the status of all queued jobs.
Sometimes a job may complete in between the status checks. Its final status can then be found using sacct (which works mostly in the same way as squeue).
In the unlikely case that the server has Slurm accounting disabled, the scontrol command is used as a last resort to access the job's history. This is the fail-safe command, as scontrol by design only holds onto a job's information for a short period of time after completion. However, the time between status checks is sufficiently short that scontrol should still have the job's information on hand.
All three commands are used in order to find out the SlurmJobStatus.
- _fill_from_output(output: str) set [source]#
Parses the output of a Slurm command to extract job IDs and their states, updating the internal job status mapping and returning a set of seen job IDs.
- Parameters:
output (str) – The output string from a Slurm command, expected to be formatted as ‘<job id> <state>’ per line.
- Returns:
A set of job IDs that were parsed from the output.
- Return type:
set
- Raises:
AssertionError – If a line in the output does not contain exactly two entries (job ID and state).
Notes
If the output is empty, an empty set is returned.
Lines in the output that are empty or contain unexpected formatting are skipped.
Job states with a ‘+’ suffix (e.g., ‘CANCELLED+’) are normalized by stripping the ‘+’ character.
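A minimal sketch of this parsing behavior (illustrative only, not the b2luigi source):
def fill_from_output(output: str) -> set:
    """Parse '<job id> <state>' lines and return the set of seen job IDs."""
    job_states = {}
    seen_job_ids = set()
    for line in output.splitlines():
        entries = line.split()
        if not entries:
            continue  # skip empty lines
        assert len(entries) == 2, f"Unexpected line format: {line!r}"
        job_id, state = entries
        job_states[int(job_id)] = state.rstrip("+")  # normalize e.g. 'CANCELLED+'
        seen_job_ids.add(int(job_id))
    return seen_job_ids

print(fill_from_output("123 RUNNING\n124 CANCELLED+\n"))  # {123, 124}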
- _get_SlurmJobStatus_from_string(state_string: str) str [source]#
Converts a state string into a SlurmJobStatus enumeration value.
- Parameters:
state_string (str) – The state string to be converted.
- Returns:
The corresponding SlurmJobStatus value.
- Return type:
str
- Raises:
KeyError – If the provided state string does not match any valid SlurmJobStatus.
- _check_if_sacct_is_disabled_on_server() bool [source]#
Checks if the Slurm accounting command sacct is disabled on the system.
This method determines whether the sacct command is unavailable or disabled by attempting to execute it and analyzing the output. The result is cached in the self.sacct_disabled attribute to avoid repeated checks.
- Returns:
True if sacct is disabled on the system, False otherwise.
- Return type:
bool
- class b2luigi.batch.processes.slurm.SlurmJobStatus(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#
Bases:
Enum
See https://slurm.schedmd.com/job_state_codes.html TODO: make this a StrEnum with python>=3.11
- completed#
The job has completed successfully.
- Type:
str
- pending#
The job is waiting to be scheduled.
- Type:
str
- running#
The job is currently running.
- Type:
str
- suspended#
The job has been suspended.
- Type:
str
- preempted#
The job has been preempted by another job.
- Type:
str
- completing#
The job is in the process of completing.
- Type:
str
- boot_fail#
The job failed during the boot process.
- Type:
str
- cancelled#
The job was cancelled by the user or system.
- Type:
str
- deadline#
The job missed its deadline.
- Type:
str
- node_fail#
The job failed due to a node failure.
- Type:
str
- out_of_memory#
The job ran out of memory.
- Type:
str
- failed#
The job failed for an unspecified reason.
- Type:
str
- timeout#
The job exceeded its time limit.
- Type:
str
- class b2luigi.batch.processes.slurm.SlurmProcess(*args, **kwargs)[source]#
Bases:
BatchProcess
Reference implementation of the batch process for a Slurm batch system.
In addition to the basic batch setup (see Batch Processing), the Slurm-specific things are:
Please note that most Slurm batch farms by default copy the user environment from the submission node to the worker machine. As this can lead to different results when running the same tasks depending on your active environment, you probably want to pass the argument export=NONE. This ensures that a reproducible environment is used. You can provide an env_script, an env setting, and/or a different executable to create the environment necessary for your task.
Via the slurm_settings setting you can provide a dict for additional options, such as requested memory etc. Its value has to be a dictionary containing Slurm settings as key/value pairs. These options will be written into the job submission file. For an overview of possible settings refer to the Slurm documentation (https://slurm.schedmd.com/sbatch.html) and the documentation of the cluster you are using.
Same as for LSF and HTCondor, the job_name setting allows giving a meaningful name to a group of jobs. If you want to be task-instance-specific, you can provide the job-name as an entry in the slurm_settings dict, which will override the global job_name setting. This is useful for manually checking the status of specific jobs with squeue --name <job name>.
Example
import b2luigi
import random


class MyNumberTask(b2luigi.Task):
    some_parameter = b2luigi.IntParameter()
    batch_system = "slurm"
    slurm_settings = {"export": "NONE", "ntasks": 1, "mem": "100MB"}

    def output(self):
        yield self.add_to_output("output_file.txt")

    def run(self):
        print("I am now starting a task")
        random_number = random.random()

        with open(self.get_output_file_name("output_file.txt"), "w") as f:
            f.write(f"{random_number}\n")


class MyAverageTask(b2luigi.Task):
    batch_system = "slurm"
    slurm_settings = {"export": "NONE", "ntasks": 1, "mem": "100MB"}

    def requires(self):
        for i in range(10):
            yield self.clone(MyNumberTask, some_parameter=i)

    def output(self):
        yield self.add_to_output("average.txt")

    def run(self):
        print("I am now starting the average task")

        # Build the mean
        summed_numbers = 0
        counter = 0
        for input_file in self.get_input_file_names("output_file.txt"):
            with open(input_file, "r") as f:
                summed_numbers += float(f.read())
            counter += 1

        average = summed_numbers / counter

        with open(self.get_output_file_name("average.txt"), "w") as f:
            f.write(f"{average}\n")


if __name__ == "__main__":
    b2luigi.process(MyAverageTask(), workers=200, batch=True)
- get_job_status()[source]#
Determine the status of a batch job based on its Slurm job status.
- Returns:
- The status of the job, which can be one of the following:
JobStatus.successful: If the job has completed successfully.
JobStatus.running: If the job is currently running, pending, suspended, preempted, or completing.
JobStatus.aborted: If the job has failed, been cancelled, exceeded its deadline, encountered a node failure, ran out of memory, timed out, or if the job ID is not found.
- Return type:
JobStatus
- Raises:
ValueError – If the Slurm job status is unknown or not handled.
- start_job()[source]#
Starts a job by submitting the Slurm submission script.
This method creates a Slurm submit file and submits it using the sbatch command. After submission, it parses the output to extract the batch job ID.
- Raises:
RuntimeError – If the batch submission fails or the job ID cannot be extracted.
- self._batch_job_id#
The ID of the submitted Slurm batch job.
- Type:
int
- terminate_job()[source]#
Terminates a batch job if a job ID is available.
This method checks if a batch job ID is set. If it is, it executes the scancel command to terminate the job associated with the given batch job ID. The command's output is suppressed.
- _create_slurm_submit_file()[source]#
Creates a Slurm submit file for the current task.
This method generates a Slurm batch script that specifies the necessary configurations for submitting a job to a Slurm workload manager.
- Returns:
The path to the generated Slurm submit file.
- Return type:
pathlib.Path
Note
The stdout and stderr log files are created in the task's log directory. See get_log_file_dir.
The Slurm settings are specified in the slurm_settings setting, which is a dictionary of key-value pairs.
The job_name setting can be used to specify a meaningful name for the job.
The executable is created with create_executable_wrapper.
The submit file is named slurm_parameters.sh and is created in the task's output directory (get_task_file_dir).
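For illustration, a submit script of the kind described in this note could be generated roughly as follows; names and layout are assumptions rather than the actual _create_slurm_submit_file output.
def create_slurm_submit_file(executable, log_dir, slurm_settings, job_name=None):
    """Build the text of a minimal Slurm submit script from the pieces described above."""
    lines = ["#!/bin/bash"]
    if job_name is not None:
        lines.append(f"#SBATCH --job-name={job_name}")
    lines.append(f"#SBATCH --output={log_dir}/stdout")
    lines.append(f"#SBATCH --error={log_dir}/stderr")
    lines += [f"#SBATCH --{key}={value}" for key, value in slurm_settings.items()]  # e.g. export=NONE
    lines.append(f"exec {executable}")
    return "\n".join(lines) + "\n"

print(create_slurm_submit_file("executable_wrapper.sh", "logs", {"export": "NONE", "ntasks": 1, "mem": "100MB"}))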
gbasf2#
- class b2luigi.batch.processes.gbasf2.Gbasf2Process(*args, **kwargs)[source]#
Bases:
BatchProcess
Batch process for working with gbasf2 projects on the LHC Computing Grid (LCG).
- Features
gbasf2 project submission
The gbasf2 batch process takes the basf2 path returned by the create_path method of the task, saves it into a pickle file on disk, and creates a wrapper steering file that executes the saved path. Any basf2 variable aliases added in the Path() or create_path() method are also stored in the pickle file. It then sends both the pickle file and the steering file wrapper to the grid via the Belle II-specific DIRAC wrapper gbasf2. However, b2luigi supports the submission of custom steering files with the setting gbasf2_custom_steering_file. This preserves the way that the basf2 path is still contained in the create_path() method. In this instance, b2luigi automatically checks if the corresponding file exists and copies it into the active directory. The gbasf2 task is then set to submit the unpickled file to the grid job, which allows the utilization of python-based basf2 modules.
Project status monitoring
After the project submission, the gbasf2 batch process regularly checks the status of all the jobs belonging to a gbasf2 project. It returns a success if all jobs have been successful, while a single failed job results in a failed project. You can close a running b2luigi process and then start your script again; if a task with the same project name is running, this b2luigi gbasf2 wrapper will recognize that and, instead of resubmitting a new project, continue monitoring the running project.
Hint
The outputs of gbasf2 tasks can be a bit overwhelming, so I recommend using the central scheduler, which provides a nice overview of all tasks in the browser, including a status/progress indicator of how many jobs in a gbasf2 project are already done.
Automatic download of datasets and logs
If all jobs have been successful, it automatically downloads the output dataset and the log files from the job sandboxes and automatically checks if the download was successful before moving the data to the final location. On failure, it only downloads the logs. The dataset download can be optionally disabled.
Automatic rescheduling of failed jobs
Whenever a job fails, gbasf2 reschedules it as long as the number of retries is below the value of the setting gbasf2_max_retries. It keeps track of the number of retries in a local file in the log_file_dir, so that it does not change if you close b2luigi and start it again. Of course, it does not persist if you remove that file or move to a different machine.
Note
Despite all the automation that this gbasf2 wrapper provides, the user is expected to have a basic understanding of how the grid works and know the basics of working with gbasf2 commands manually.
- Caveats
The gbasf2 batch process for luigi can only be used for tasks inheriting from Basf2PathTask or other tasks with a create_path() method that returns a basf2 path.
It can be used only for picklable basf2 paths, with only some limited global basf2 state saved (currently aliases and global tags). The batch process stores the path created by create_path in a python pickle file and runs that on the grid. Therefore, python basf2 modules are not yet supported. To see if the path produced by a steering file is picklable, you can try to dump it with basf2 --dump-path and execute it again with basf2 --execute-path. In case the steering file contains content (e.g. modules) that cannot be pickled, the setting gbasf2_custom_steering_file can be utilized, which has to be set to the path of the steering file the user wishes to be used. This submits the custom steering file to the grid job. The specific use case for this is the usage of, and interaction with, python-based basf2 modules that are not picklable.
Output format: Changing the batch to gbasf2 means you also have to adapt how you handle the output of your gbasf2 task in tasks depending on it, because the output will not be a single root file anymore (e.g. B_ntuple.root), but a collection of root files, one for each file in the input data set, in a directory with the base name of the root files, e.g.:
<task output directory>
├── B_ntuple.root
│   └── B_ntuple_0.root
│   └── B_ntuple_1.root
│   └── ...
├── D_ntuple.root
│   └── D_ntuple_0.root
│   └── ...
- Settings for gbasf2 tasks:
To submit a task with the gbasf2 wrapper, you first have to add the property batch_system = "gbasf2", which sets the batch_system setting. It is not recommended to set that setting globally, as not all tasks can be submitted to the grid, but only tasks with a create_path method.
For gbasf2 tasks it is further required to set the settings
- gbasf2_input_dataset: String with the logical path of a dataset on the grid to use as an input to the task. You can provide multiple inputs by having multiple paths contained in this string, separated by commas without spaces. An alternative is to just instantiate multiple tasks with different input datasets, if you want to know in retrospect which input dataset had been used for the production of a specific output.
- gbasf2_input_dslist: Alternatively to gbasf2_input_dataset, you can use this setting to provide a text file containing the logical grid path names, one per line.
- gbasf2_project_name_prefix: A string with which your gbasf2 project names will start. To ensure that the project associated with each unique task (i.e. with each set of luigi parameters) is unique, the unique task.task_id is hashed and appended to the prefix to create the actual gbasf2 project name. Should be below 22 characters so that the project name with the hash can remain under 32 characters.
The following example shows a minimal class with all required options to run on the gbasf2/grid batch:
class MyTask(Basf2PathTask):
    batch_system = "gbasf2"
    gbasf2_project_name_prefix = b2luigi.Parameter(significant=False)
    gbasf2_input_dataset = b2luigi.Parameter(hashed=True)
Other not required, but noteworthy settings are:
- gbasf2_setup_path: Path to the gbasf2 environment setup script that needs to be sourced to run gbasf2 commands. Defaults to /cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc.
- gbasf2_release: Defaults to the release of your currently set up basf2 release. Set this if you want the jobs to use another release on the grid.
- gbasf2_proxy_lifetime: Defaults to 24. When initializing a proxy, set the lifetime to this number of hours.
- gbasf2_min_proxy_lifetime: Defaults to 0. During processing, prompt the user to reinitialize the proxy if the remaining proxy lifetime drops below this number of hours.
- gbasf2_print_status_updates: Defaults to True. By setting it to False you can turn off the printing of the job summaries, that is, the number of jobs in different states in a gbasf2 project.
- gbasf2_max_retries: Defaults to 0. Maximum number of times that each job in the project can be automatically rescheduled until the project is declared as failed.
- gbasf2_proxy_group: Defaults to "belle". If provided, the gbasf2 wrapper will work with the custom gbasf2 group specified in this parameter. No need to specify this parameter in case of usual physics analysis at Belle II. If specified, one has to provide the gbasf2_project_lpn_path parameter.
- gbasf2_project_lpn_path: Path to the LPN folder for a specified gbasf2 group. The parameter has no effect unless gbasf2_proxy_group is used with a non-default value.
- gbasf2_jinja_template_path: This parameter sets a custom basf2 steering template where the user can adapt the default template (e.g. for altering the pdg database, …). Note that this is an expert option that should be treated with care.
- gbasf2_additional_download_params: Defaults to "--new". This parameter sets additional parameters that are given to gb2_ds_get. Note that in case you override the parameter, the --new parameter is not automatically set, so you might have to manually add --new if you want this parameter to be used.
- gbasf2_download_dataset: Defaults to True. Disable this setting if you don't want to download the output dataset from the grid on job success. As you can't use the downloaded dataset as an output target for luigi, you should then use the provided Gbasf2GridProjectTarget, as shown in the following example:
from b2luigi.batch.processes.gbasf2 import get_unique_project_name, Gbasf2GridProjectTarget

class MyTask(Basf2PathTask):
    # [...]
    def output(self):
        project_name = get_unique_project_name(self)
        return Gbasf2GridProjectTarget(project_name)
This is useful when chaining gbasf2 tasks together, as they don't need the output locally but take the grid datasets as input. It is also useful when you just want to produce data on the grid for other people to use.
Caution
For Gbasf2GridProjectTargets the global and the task gbasf2_proxy_group cannot differ.
Tip
Set only one global gbasf2_proxy_group setting.
- gbasf2_download_logs: Whether to automatically download the log output of gbasf2 projects when the task succeeds or fails. Having the logs is important for reproducibility.
- gbasf2_custom_steering_file: Optional path to submit a custom steering file to gbasf2. This does not pickle the basf2.Path instance and allows the utilization of python-based basf2 modules. Named modules have to be contained either in the steering file itself or in additional files via the input sandbox.
The following optional settings correspond to the equally named gbasf2 command line options (without the gbasf2_ prefix) that you can set to customize your gbasf2 project:
gbasf2_noscout, gbasf2_additional_files, gbasf2_input_datafiles, gbasf2_n_repition_job, gbasf2_force_submission, gbasf2_cputime, gbasf2_evtpersec, gbasf2_priority, gbasf2_jobtype, gbasf2_basf2opt, gbasf2_lfn_sandboxfiles, gbasf2_input_grouping
It is further possible to append arbitrary command line arguments to the gbasf2 submission command with the gbasf2_additional_params setting. If you want to blacklist a grid site, you can e.g. add b2luigi.set_setting("gbasf2_additional_params", "--banned_site LCG.KEK.jp").
- Example
Here is an example file to submit an analysis path created by the script in examples/gbasf2/example_mdst_analysis to the grid via gbasf2:
File: examples/gbasf2/gbasf2_example.py
import b2luigi
from b2luigi.basf2_helper.tasks import Basf2PathTask

import example_mdst_analysis


class AnalysisTask(Basf2PathTask):
    # set the batch_system property to use the gbasf2 wrapper batch process for this task
    batch_system = "gbasf2"
    # Must define a prefix for the gbasf2 project name to submit to the grid.
    # b2luigi will then add a hash derived from the luigi parameters to create a unique project name.
    gbasf2_project_name_prefix = b2luigi.Parameter()
    gbasf2_input_dataset = b2luigi.Parameter(hashed=True)
    # Example luigi cut parameter to facilitate starting multiple projects for different cut values
    mbc_lower_cut = b2luigi.IntParameter()

    def create_path(self):
        mbc_range = (self.mbc_lower_cut, 5.3)
        return example_mdst_analysis.create_analysis_path(
            d_ntuple_filename="D_ntuple.root", b_ntuple_filename="B_ntuple.root", mbc_range=mbc_range
        )

    def output(self):
        yield self.add_to_output("D_ntuple.root")
        yield self.add_to_output("B_ntuple.root")


class AnalysisWrapperTask(b2luigi.WrapperTask):
    """
    We use the AnalysisWrapperTask to be able to require multiple analyse tasks with
    different input datasets and cut values. For each parameter combination, a
    different gbasf2 project will be submitted.
    """

    def requires(self):
        input_dataset = (
            "/belle/MC/release-04-01-04/DB00000774/SkimM13ax1/prod00011778/e1003/4S/r00000/mixed/11180100/udst/sub00/"
            "udst_000006_prod00011778_task10020000006.root"
        )
        # if you want to iterate over different cuts, just add more values to this list
        mbc_lower_cuts = [5.15, 5.2]
        for mbc_lower_cut in mbc_lower_cuts:
            yield AnalysisTask(
                mbc_lower_cut=mbc_lower_cut,
                gbasf2_project_name_prefix="luigiExample",
                gbasf2_input_dataset=input_dataset,
                max_event=100,
            )


if __name__ == "__main__":
    main_task_instance = AnalysisWrapperTask()
    n_gbasf2_tasks = len(list(main_task_instance.requires()))
    b2luigi.process(main_task_instance, workers=n_gbasf2_tasks)
- Handling failed jobs
The gbasf2 input wrapper considers the gbasf2 project as failed if any of the jobs in the project failed and reached the maximum number of retries. It then automatically downloads the logs, so please look into them to see what the reason was. For example, it can be that only certain grid sites were affected, so you might want to exclude them by adding "--banned_site ..." to gbasf2_additional_params.
You can also always reschedule jobs manually with the gb2_job_reschedule command or delete them with gb2_job_delete so that the gbasf2 batch process doesn't know they ever existed. Then just run your luigi task/script again and it will start monitoring the running project again.
- pickle_file_path#
File name in which the pickled basf2 path from self.task.create_path() will be stored.
- wrapper_file_path#
File name of the steering file that executes the pickled path, which will be sent to the grid.
- max_retries#
Maximum number of times that each job in the project can be rescheduled until the project is declared as failed.
- gbasf2_custom_steering_file#
Setting to incorporate custom steering files into b2luigi.
- n_retries_by_job#
Stores the number of times each job has been rescheduled.
- get_job_status()[source]#
Get the overall status of the gbasf2 project.
This method determines the current status of all jobs in a gbasf2 project (see get_gbasf2_project_job_status_dict
). and returns an overall project status. It uses cached job status data if available and updates it if necessary. The method also handles job rescheduling in case of failures and updates task progress and status messages for the central scheduler.- Returns:
- The overall status of the
gbasf2
project. Possible values are: JobStatus.running
: If the project is still in progress.JobStatus.successful
: If all jobs in the project are completed successfully.JobStatus.aborted
: If any job in the project fails and cannot be rescheduled.
- The overall status of the
- Return type:
JobStatus
- Raises:
RuntimeError – If the job status cannot be determined.
Notes
The job status is cached for 5 minutes to optimize performance.
Failed jobs are rescheduled up to a maximum number of retries before the project is marked as aborted.
The project is considered successful only if all jobs are completed successfully.
Progress and status updates are logged if
gbasf2_print_status_updates
is enabled.
- _on_first_success_action()[source]#
Executes actions to be performed after all jobs in the project have successfully completed.
This method checks specific settings to determine whether to download logs and datasets associated with the project. If the respective settings are enabled, it triggers the corresponding download operations.
Note
Downloads logs if the “gbasf2_download_logs” setting is enabled.
Downloads the dataset if the “gbasf2_download_dataset” setting is enabled.
- _on_failure_action()[source]#
Handles actions to be performed after a project failure.
This method retrieves the job status dictionary for the specified gbasf2 project and identifies failed jobs. A job is considered failed if its status is "Failed" or if its status is "Done" but its application status is not "Done". The number of failed jobs and their details are printed. If the setting gbasf2_download_logs is enabled, it triggers the download of logs for further investigation.
- _reschedule_failed_jobs()[source]#
Attempts to reschedule failed jobs in the project if the maximum number of retries (gbasf2_max_retries) has not been reached.
This method evaluates the status of jobs in the project and determines whether they should be rescheduled or if they have hit the maximum retry limit. Jobs that are eligible for rescheduling are added to a rescheduling queue, while jobs that have reached the retry limit are logged with a warning.
- Returns:
True if all failed jobs were successfully rescheduled or no jobs needed rescheduling. False if any jobs reached the maximum retry limit.
- Return type:
bool
- Raises:
RuntimeWarning – If any jobs have reached the maximum retry limit.
Notes
A job is considered failed if its status is “Failed” or if its status is “Done” but its application status is not “Done”.
The method relies on get_gbasf2_project_job_status_dict to retrieve the current status of jobs and on _reschedule_jobs to perform the rescheduling.
- _reschedule_jobs(job_ids)[source]#
Reschedule a list of jobs by their IDs.
This method takes a list of job IDs, logs the rescheduling process, and executes a command to reschedule the specified jobs using the gb2_job_reschedule utility. It also includes the number of retries for each job in the log output.
- Parameters:
job_ids (list of str) – A list of job IDs to be rescheduled.
- Raises:
KeyError – If a job ID in the list is not found in self.n_retries_by_job.
Notes
The gb2_job_reschedule command is executed with the --force flag.
The run_with_gbasf2 function is used to execute the rescheduling command.
- start_job()[source]#
Submit a new gbasf2 project to the grid.
This method checks if a project with the specified name already exists on the grid. If it does, the submission is skipped, and a message is printed. Otherwise, it prepares the necessary steering files and submits the project.
- Steps:
Check if the project already exists on the grid (see check_project_exists).
Prepare the steering file:
If a custom steering file is provided, copy it (see _copy_custom_steering_script).
Otherwise, create a wrapper steering file (see _create_wrapper_steering_file).
Build the gbasf2 submission command (see _build_gbasf2_submit_command).
Create a symlink for the pickle file to ensure it is included in the grid input sandbox.
Submit the project using the gbasf2 command (see run_with_gbasf2).
Clean up the symlink after submission.
- Raises:
OSError – If there is an issue creating or removing the symlink.
Note
If the project already exists, the user is advised to change the project name to submit a new project if needed.
- terminate_job()[source]#
Terminate a gbasf2 project if it exists.
This method checks if the specified gbasf2 project exists. If it does, it terminates the project using the gb2_job_kill command with the --force option. Terminated jobs are not removed from the job database and can be restarted if needed.
Note
The gb2_job_delete command differs from gb2_job_kill in that deleted jobs are permanently removed from the job database, while terminated jobs remain in the database.
- _build_gbasf2_submit_command()[source]#
Constructs the gbasf2 submit command string based on task options and attributes.
This method generates a command string to submit a gbasf2 job, incorporating various settings and parameters. It validates inputs, handles optional parameters, and ensures the command is properly formatted for execution.
- Returns:
A list of command-line arguments for the gbasf2 submission command.
- Return type:
list
- Raises:
ValueError – If gbasf2_additional_files is not an iterable or is a string.
RuntimeError – If both gbasf2_input_dataset and gbasf2_input_dslist are set, or if neither is set.
FileNotFoundError – If the file specified in gbasf2_input_dslist does not exist.
ValueError – If priority is not an integer between 0 and 10.
ValueError – If gbasf2_proxy_group is non-default and gbasf2_project_lpn_path is not provided.
Notes
The method uses various task settings to construct the command, such as:
gbasf2_release, gbasf2_additional_files, gbasf2_input_dataset, gbasf2_input_dslist, gbasf2_n_repition_job, gbasf2_input_datafiles, gbasf2_force_submission, gbasf2_cputime, gbasf2_evtpersec, gbasf2_priority, gbasf2_jobtype, gbasf2_basf2opt, and gbasf2_additional_params.
If the proxy group is not "belle", an output dataset path must be specified.
- _write_path_to_file()[source]#
Serializes and saves a basf2.Path object and variable aliases to a pickle file.
This method attempts to create a basf2.Path object using the b2luigi.basf2_helper.Basf2PathTask.create_path() method of the associated task. The resulting path, along with any variable aliases from the current variable manager instance, is serialized and written to a specified pickle file.
- Raises:
Exception – If the associated task does not have a create_path() method, indicating it is not an instance of Basf2PathTask or a compatible class.
- Dependencies:
write_path_and_state_to_file: Used to perform the serialization and file writing.
- _create_wrapper_steering_file()[source]#
Generates a steering file for grid execution by processing a Jinja2 template.
This method creates a steering file that executes the pickled basf2 path generated by b2luigi.basf2_helper.Basf2PathTask.create_path(). It reads a Jinja2 template, replaces placeholders with appropriate values, and writes the processed template to a new file for grid submission.
The following steps are performed:
1. Reads the Jinja2 template file specified by the gbasf2_jinja_template_path setting.
2. Replaces template variables such as the pickle file path and maximum event count.
3. Writes the processed template to self.wrapper_file_path.
- _copy_custom_steering_script()[source]#
Copies a custom gbasf2 steering script to the specified wrapper file path.
This method checks if the custom gbasf2 steering file exists at the specified location. If it exists, the file is copied to the wrapper file path. If the file does not exist, a ValueError is raised.
- Raises:
ValueError – If the custom gbasf2 steering file does not exist at the specified path.
- _get_gbasf2_dataset_query(output_file_name: str) str [source]#
Constructs a query string to retrieve a subset of files from a grid project associated with the current task, based on the provided output file name.
This method generates a wildcard pattern for use with grid commands such as gb2_ds_list or gb2_ds_get to locate files matching the specified output file name within the project's directory structure.
- Parameters:
output_file_name (str) – The name of the output file, which must be a basename (not a path) and should end with the “.root” extension.
- Returns:
A query string containing the wildcard pattern to locate the desired files in the grid project.
- Return type:
str
- Raises:
ValueError – If the provided output_file_name is not a basename, does not end with ".root", or if required settings for non-default proxy groups are missing.
- static _get_project_download_path(base_output_dir: str)[source]#
Get the location of the .root files of the downloaded project.
- Parameters:
base_output_dir – Base directory for the gbasf2 download.
- Returns:
str
- _local_gb2_dataset_is_complete(output_file_name: str, check_temp_dir: bool = False) bool [source]#
Helper method that returns True if the download of the gbasf2 dataset for the output output_file_name is complete.
- Parameters:
output_file_name – Output file name; must be a root file, e.g. ntuple.root. Usually defined by the user via b2luigi.Task.add_to_output() in the b2luigi.Task.output() method.
check_temp_dir – Instead of checking the final output path, check whether the download into the temporary ("partial") directory is complete. This function is usually called with this argument set to True, to check whether the dataset can be moved to its final output path.
- _download_dataset()[source]#
Downloads the task outputs from the gbasf2 project dataset.
This method ensures that all files matching the naming pattern name_*.root from the grid dataset corresponding to the project name are downloaded into a specified directory. The download process is handled in a way that prevents marking the task as complete if the download fails.
Steps:
1. Checks if the dataset exists on the grid for the specified project name.
2. For each task output, constructs a query string to locate the dataset files.
3. Skips downloading if the dataset already exists locally and is complete.
4. Downloads the dataset into a temporary directory to ensure atomicity.
5. Handles failed downloads by retrying only the missing files.
6. Verifies the completeness of the downloaded dataset.
7. Moves the successfully downloaded dataset to the final output directory.
- Raises:
RuntimeError – If the dataset does not exist on the grid.
RuntimeError – If no output data is found for the specified project.
RuntimeError – If the downloaded dataset is incomplete.
Notes
Temporary directories are used to avoid marking the task as complete prematurely.
Failed downloads are tracked and retried using a monitoring file.
Additional parameters for the gb2_ds_get command can be configured via task settings.
- _download_logs()[source]#
Download sandbox files from grid with logs for each job in the gbasf2 project.
It wraps gb2_job_output, which downloads the job sandbox, which has the following structure:
log
└── <project name>
    ├── <first job id>
    │   ├── job.info
    │   ├── Script1_basf2helper.py.log  # basf2 outputs
    │   └── std.out
    ├── <second job id>
    │   ├── ...
    ...
These are stored in the task log dir.
- class b2luigi.batch.processes.gbasf2.Gbasf2GridProjectTarget(project_name, dirac_user=None, gbasf2_setup_path='/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc')[source]#
Bases:
Target
Gbasf2GridProjectTarget is a custom Luigi Target that checks the existence and status of a dataset produced by a gbasf2 grid project. It ensures that the dataset exists on the grid and that all jobs associated with the project have completed successfully.
- project_name#
Name of the gbasf2 grid project that produced the dataset and under which the dataset is stored.
- Type:
str
- dirac_user#
Dirac user who produced the output dataset. If None, the current user is used.
- Type:
str, optional
- gbasf2_setup_path#
Path to the gbasf2 setup file. Defaults to "/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc".
- Type:
str
- exists()[source]#
Checks the existence and status of a dataset or project on the grid.
This method performs the following checks:
Verifies that a dataset associated with the given project name exists on the grid (see check_dataset_exists_on_grid).
If a dataset exists, checks whether a project with the same name exists on the grid (see check_project_exists).
Ensures that no jobs are actively writing to the project by verifying that all jobs associated with the project have a status of "Done" and an application status of "Done".
- Returns:
True if the dataset exists, the project exists, and no jobs are actively writing to it. False otherwise.
- Return type:
bool
- b2luigi.batch.processes.gbasf2.check_dataset_exists_on_grid(gbasf2_project_name, dirac_user=None, gbasf2_setup_path='/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc', task=None)[source]#
Check if an output dataset exists for the specified gbasf2 project on the grid.
- Parameters:
gbasf2_project_name (str) – The name of the gbasf2 project to check.
dirac_user (str, optional) – The DIRAC user to use for querying. Defaults to None.
gbasf2_setup_path (str, optional) – The path to the gbasf2 setup script. Defaults to "/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc".
task (object, optional) – An optional task object for retrieving settings. Defaults to None.
- Returns:
True if the dataset exists on the grid, False otherwise.
- Return type:
bool
- Raises:
ValueError – If gbasf2_proxy_group is set to a non-default value (the default being "belle") and gbasf2_project_lpn_path is not provided in the settings.
- b2luigi.batch.processes.gbasf2.get_gbasf2_project_job_status_dict(gbasf2_project_name, dirac_user=None, gbasf2_setup_path='/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc', task=None)[source]#
Returns a dictionary for all jobs in the project with a structure like the following, which I have taken and adapted from an example output:
{ "<JobID>": { "SubmissionTime": "2020-03-27 13:08:49", "Owner": "<dirac username>", "JobGroup": "<ProjectName>", "ApplicationStatus": "Done", "HeartBeatTime": "2020-03-27 16:01:39", "Site": "LCG.KEK.jp", "MinorStatus": "Execution Complete", "LastUpdateTime": "2020-03-27 16:01:40", "Status": "<Job Status>" } ... }
For that purpose, the script in gbasf2_job_status.py is called. That script directly interfaces with DIRAC via its API, but it only works with the gbasf2 environment and python2, which is why it is called as a subprocess. The job status dictionary is passed to this function via JSON.
- b2luigi.batch.processes.gbasf2.check_project_exists(gbasf2_project_name, dirac_user=None, gbasf2_setup_path='/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc', task=None)[source]#
Check if a gbasf2 project exists on the grid.
This function verifies the existence of a gbasf2 project by attempting to retrieve its job status using the get_gbasf2_project_job_status_dict function. If the retrieval fails due to a RuntimeError, the function returns False.
- Parameters:
gbasf2_project_name (str) – The name of the gbasf2 project to check.
dirac_user (str, optional) – The DIRAC user associated with the project. Defaults to None.
gbasf2_setup_path (str, optional) – The path to the gbasf2 setup script. Defaults to "/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc".
task (optional) – Additional task information to pass to the status retrieval function. Defaults to None.
- Returns:
True if the project exists on the grid, False otherwise.
- Return type:
bool
- b2luigi.batch.processes.gbasf2.run_with_gbasf2(cmd, *args, ensure_proxy_initialized=True, check=True, encoding='utf-8', capture_output=False, gbasf2_setup_path='/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc', task=None, **kwargs)[source]#
Call a command in a subprocess with the gbasf2 environment.
This function wraps the execution of a command in a subprocess, ensuring that the gbasf2 environment is properly set up and optionally verifying that the DIRAC proxy is initialized. It provides additional features such as capturing output and specifying encoding.
- Parameters:
cmd (str or list) – The command to execute. Can be a string or a list of arguments.
*args – Additional positional arguments to pass to subprocess.run.
ensure_proxy_initialized (bool, optional) – If True, ensures that the DIRAC proxy is initialized and alive. Defaults to True.
check (bool, optional) – If True, raises a subprocess.CalledProcessError if the command exits with a non-zero status. Defaults to True.
encoding (str, optional) – The encoding to use for interpreting the command output. Defaults to "utf-8".
capture_output (bool, optional) – If True, captures the stdout and stderr of the command. Equivalent to setting them to subprocess.PIPE. Defaults to False.
gbasf2_setup_path (str, optional) – Path to the gbasf2 setup file. Defaults to "/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc".
task (optional) – Task-specific information to pass to the environment setup. Defaults to None.
**kwargs – Additional keyword arguments to pass to subprocess.run.
- Returns:
An instance representing the completed process.
- Return type:
subprocess.CompletedProcess
- Raises:
ValueError – If stdout or stderr are specified in kwargs while capture_output is True.
Notes
If capture_output is True, the stdout and stderr of the command are captured and returned as strings (decoded using the specified encoding).
The gbasf2 environment is set up using the get_gbasf2_env function.
If ensure_proxy_initialized is True, the DIRAC proxy is initialized using the setup_dirac_proxy function.
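A short usage example follows; the dataset path is a placeholder, not a real grid location.
from b2luigi.batch.processes.gbasf2 import run_with_gbasf2

# list the contents of a grid dataset with the gbasf2 environment set up;
# capture_output=True makes stdout/stderr available on the returned object
result = run_with_gbasf2(["gb2_ds_list", "/belle/user/<username>/<project name>"], capture_output=True)
print(result.stdout)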
- b2luigi.batch.processes.gbasf2.get_gbasf2_env(gbasf2_setup_path, task=None)[source]#
Retrieve the gbasf2 environment as a dictionary, which can be used to run gbasf2 commands.
This function sets up the gbasf2 environment by sourcing the specified setup file in a fresh shell and capturing the resulting environment variables. It ensures that the environment is isolated, except for the HOME variable, which is required for the setup process.
- Parameters:
gbasf2_setup_path (str) – Path to the gbasf2 setup file.
task (optional) – Task parameter used to retrieve the gbasf2_proxy_group setting, which defaults to "belle" if not provided.
- Returns:
A dictionary containing the environment variables set up by the gbasf2 setup file.
- Return type:
dict
- Raises:
FileNotFoundError – If the specified gbasf2 setup file does not exist.
- b2luigi.batch.processes.gbasf2.get_proxy_info(gbasf2_setup_path='/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc', task=None)[source]#
Retrieve a dictionary containing the proxy status by running the gbasf2_proxy_info.py script.
- Parameters:
gbasf2_setup_path (str, optional) – The path to the gbasf2 setup script. Defaults to “/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc”.
task (optional) – An optional task parameter to pass to the subprocess.
- Returns:
A dictionary containing the proxy status information.
- Return type:
dict
Notes
The function uses the run_with_gbasf2 utility to execute the gbasf2_proxy_info.py script.
- b2luigi.batch.processes.gbasf2.get_dirac_user(gbasf2_setup_path='/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc', task=None)[source]#
Retrieve the DIRAC username associated with the initialized proxy.
- Parameters:
gbasf2_setup_path (str) – The file path to the gbasf2 setup script. Defaults to "/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc".
task (optional) – An optional task object that may be used during proxy setup.
- Returns:
The DIRAC username extracted from the proxy information.
- Return type:
str
- Raises:
RuntimeError – If the username cannot be obtained from the proxy information.
- b2luigi.batch.processes.gbasf2.setup_dirac_proxy(gbasf2_setup_path='/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc', task=None)[source]#
Ensures that a valid DIRAC proxy is initialized for the Belle II grid system. If a proxy is already active and has sufficient remaining lifetime, no action is taken. Otherwise, the function initializes a new proxy using the gb2_proxy_init command.
- Parameters:
gbasf2_setup_path (str) – Path to the gbasf2 setup script. Defaults to “/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc”.
task (optional) – Task-specific context or configuration, if applicable.
- Behavior:
Checks if an active proxy exists and has sufficient lifetime remaining.
If no valid proxy exists, initializes a new proxy using the gb2_proxy_init command.
Prompts the user for the certificate password during proxy initialization.
Validates the proxy initialization process and handles errors such as incorrect passwords or other initialization issues.
- Raises:
RuntimeWarning – If the gbasf2_proxy_lifetime setting is not a positive integer.
Notes
The proxy lifetime and group name can be configured using the gbasf2_proxy_lifetime and gbasf2_proxy_group settings, respectively.
The function ensures sensitive information like passwords is securely handled and deleted after use.
- b2luigi.batch.processes.gbasf2.query_lpns(ds_query: str, dirac_user: str | None = None, gbasf2_setup_path: str = '/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc', task=None) List[str] [source]#
Query DIRAC for logical path names (LPNs) matching the given dataset query.
This function interacts with the DIRAC system to retrieve a list of LPNs based on the provided dataset query string. It uses the gbasf2_ds_list.py script to perform the query and parses the output to return the results as a list of strings.
- Parameters:
ds_query (str) – The dataset query string to search for matching LPNs.
dirac_user (Optional[str]) – The DIRAC user to perform the query as. If not provided, it will be determined using the get_dirac_user function.
gbasf2_setup_path (str) – The path to the gbasf2 setup script. Defaults to "/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc".
task – An optional task to associate with the query.
- Returns:
A list of LPNs matching the dataset query.
- Return type:
List[str]
- Raises:
TypeError – If the output of the query is not a list.
- b2luigi.batch.processes.gbasf2.get_unique_project_name(task)[source]#
Combine the gbasf2_project_name_prefix setting and the task_id
hash to a unique project name.This is done to make sure that different instances of a task with different luigi parameters result in different gbasf2 project names. When trying to redoing a task on the grid with identical parameters, rename the project name prefix, to ensure that you get a new project.
- Parameters:
task – A task. The task must have a
gbasf2_project_name_prefix
setting, parameter, or attribute.- Returns:
A unique project name combining the prefix and a hashed task ID.
- Return type:
str
- Raises:
Exception – If the task does not have a
gbasf2_project_name_prefix
setting, parameter, or attribute.ValueError – If the generated project name exceeds the maximum allowed length (32 characters) or contains invalid characters (non-alphanumeric, excluding _ and -).
Notes
The task ID hash is generated using
hashlib.md5
for a consistent and compact representation.The maximum length of the project name is 32 characters.
Only alphanumeric characters, underscores (_), and hyphens (-) are allowed in the project name.
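A minimal sketch of this construction, assuming an md5 hash of the task_id is appended to the prefix (the truncation length and the helper name are illustrative, not the b2luigi implementation):
import hashlib
import re

def unique_project_name(prefix: str, task_id: str, max_length: int = 32) -> str:
    """Append a hash of the task_id to the prefix and validate the result."""
    task_hash = hashlib.md5(task_id.encode()).hexdigest()[:10]  # compact, deterministic
    project_name = prefix + task_hash
    if len(project_name) > max_length:
        raise ValueError(f"Project name {project_name!r} is longer than {max_length} characters.")
    if not re.fullmatch(r"[A-Za-z0-9_-]+", project_name):
        raise ValueError(f"Project name {project_name!r} contains invalid characters.")
    return project_name

print(unique_project_name("luigiExample", "AnalysisTask_mbc_lower_cut_5.15_..."))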
- b2luigi.batch.processes.gbasf2.lfn_follows_gb2v5_convention(lfn: str) bool [source]#
- Check if the LFN follows the convention of gbasf2 release 5. The expected naming convention is: <name>_<gbasf2param>_<jobID>_<rescheduleNum>.root
- Parameters:
lfn (str) – Logical file name, typically a file path on the grid.
- Returns:
True if the LFN adheres to the gbasf2 release 5 naming convention, False otherwise.
- Return type:
bool
- b2luigi.batch.processes.gbasf2._get_lfn_upto_reschedule_number(lfn: str) str [source]#
Get a string of the gbasf2 v5 LFN up to the reschedule number.
E.g. if the LFN is <name>_<gbasf2param>_<jobID>_<rescheduleNum>.root, return <name>_<gbasf2param>_<jobID>.
- b2luigi.batch.processes.gbasf2.get_unique_lfns(lfns: Iterable[str]) Set[str] [source]#
From a list of gbasf2 Logical File Names (LFNs) that may include duplicates due to rescheduled jobs, return a filtered set containing only the LFNs corresponding to the jobs with the highest reschedule number.
Gbasf2 v5 outputs have the format <name>_<gbasf2param>_<jobID>_<rescheduleNum>.root. When using gb2_ds_list, duplicates may appear where all parts are identical except for the rescheduleNum. This function ensures that only the LFNs with the highest rescheduleNum are retained.
If the dataset does not follow the gbasf2 v5 naming convention, it is assumed to be produced with an older release and does not contain duplicates.
- Parameters:
lfns (Iterable[str]) – A collection of LFNs to process.
- Returns:
A set of LFNs with duplicates removed, retaining only the ones with the highest reschedule number for each job.
- Return type:
Set[str]
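An illustrative sketch of this deduplication, assuming every LFN follows the v5 convention (the grouping logic is a simplification, not the exact b2luigi code):
from collections import defaultdict
from typing import Iterable, Set

def unique_lfns(lfns: Iterable[str]) -> Set[str]:
    """Keep only the LFN with the highest reschedule number for each job."""
    groups = defaultdict(list)
    for lfn in lfns:
        stem = lfn.rsplit(".root", 1)[0]
        base, _, reschedule_num = stem.rpartition("_")  # split off <rescheduleNum>
        groups[base].append((int(reschedule_num), lfn))
    return {max(entries)[1] for entries in groups.values()}

print(unique_lfns([
    "ntuple_param_00042_00.root",
    "ntuple_param_00042_01.root",  # rescheduled job: only this one is kept
    "ntuple_param_00043_00.root",
]))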
- b2luigi.batch.processes.gbasf2._move_downloaded_dataset_to_output_dir(project_download_path: str, output_path: str) None [source]#
Move files downloaded from the grid to their final output location.
In the Gbasf2Process, the outputs are usually downloaded with gb2_ds_get to a temporary directory (project_download_path) with a structure like
<result_dir>/B.root.partial/<project_name>
├── sub00/job_name*B.root
├── sub01/job_name*B.root
├── …
This function moves those files to their final output_path directory, which has the same name as the original root file (e.g. B.root) to fulfill the luigi output definition. This output directory has the structure
<result_dir>/B.root/job_name*B.root
- Parameters:
project_download_path – Directory into which gb2_ds_get downloaded the grid dataset. The contents should be sub<xy> data blocks containing root files.
output_path – Final output directory into which the ROOT files should be copied.
- b2luigi.batch.processes.gbasf2._split_all_extensions(path: str) Tuple[str, str] [source]#
Split all extensions from a string pathname.
Similar to os.path.splitext, but with the difference that the extensions part is considered to be everything from the first non-leading dot to the end. Leading dots at the beginning of path are considered part of the stem and not treated as an extensions separator.
Returns (stem, extensions). extensions may be empty if path does not contain non-leading dots.
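A minimal sketch of this splitting rule (illustrative, not the actual implementation):
import os

def split_all_extensions(path: str):
    """Split everything from the first non-leading dot onwards into the extensions part."""
    dirname, basename = os.path.split(path)
    stem = basename.lstrip(".")
    leading_dots = basename[: len(basename) - len(stem)]
    first_dot = stem.find(".")
    if first_dot < 0:
        return path, ""  # no non-leading dot, so no extensions
    return os.path.join(dirname, leading_dots + stem[:first_dot]), stem[first_dot:]

print(split_all_extensions("results/B.root.partial"))  # ('results/B', '.root.partial')
print(split_all_extensions(".bashrc"))                 # ('.bashrc', '')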