Implemented Batch Processes#

LSF#

class b2luigi.batch.processes.lsf.LSFJobStatusCache[source]#

Bases: BatchJobStatusCache

_ask_for_job_status(job_id=None)[source]#

Queries the job status from the LSF batch system and updates the internal job status mapping.

Parameters:

job_id (str, optional) – The ID of the job to query. If not provided, the status of all jobs will be queried.

Notes

  • This method uses the bjobs command-line tool to fetch job statuses in JSON format.

  • The output is expected to contain a “RECORDS” key with a list of job records.

  • Each job record should have “JOBID” and “STAT” keys, which are used to update the internal mapping.
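
A minimal sketch of how such bjobs output might be parsed into a {job ID: status} mapping (the -json/-o flags and the helper name are assumptions for illustration; the RECORDS/JOBID/STAT keys follow the notes above):

import json
import subprocess


def ask_for_lsf_job_status(job_id=None):
    """Sketch: query bjobs in JSON mode and build a {job_id: status} mapping.

    The flags below (-json, -o "jobid stat") are an assumption about how the JSON
    output is requested; the RECORDS/JOBID/STAT keys follow the notes above.
    """
    cmd = ["bjobs", "-json", "-o", "jobid stat"]
    if job_id is not None:
        cmd.append(str(job_id))

    output = subprocess.check_output(cmd)
    records = json.loads(output).get("RECORDS", [])

    return {record["JOBID"]: record["STAT"] for record in records}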

class b2luigi.batch.processes.lsf.LSFProcess(*args, **kwargs)[source]#

Bases: BatchProcess

Reference implementation of the batch process for a LSF batch system. Heavily inspired by this post.

In addition to the basic batch setup (see Batch Processing), there are LSF-specific settings. These are:

  • the LSF queue: queue.

  • the number of slots for the job. On KEKCC this increases the memory available to the job: job_slots.

  • the LSF job name: job_name.

For example:

class MyLongTask(b2luigi.Task):
    queue = "l"
    job_name = "my_long_task"

The default queue is the short queue "s". If no job_name is set, the task will appear as

<result_dir>/parameter1=value/.../executable_wrapper.sh

when running bjobs.

By default, the environment variables from the scheduler are copied to the workers. This also implies we start in the same working directory, can reuse the same executable, etc. Normally, you do not need to supply env_script or alike.

get_job_status()[source]#

Retrieves the current status of the batch job associated with this instance.

Returns:

The status of the job, which can be one of the following:
  • JobStatus.successful: If the job has completed successfully (“DONE”).

  • JobStatus.aborted: If the job has been aborted or is not found in the cache (“EXIT” or missing ID).

  • JobStatus.running: If the job is still in progress.

Return type:

JobStatus

start_job()[source]#

Submits a batch job to the LSF system.

This method constructs a command to submit a job using the bsub command-line tool. It dynamically configures the job submission parameters based on task-specific settings and creates necessary log files for capturing standard output and error.

Raises:

RuntimeError – If the batch submission fails or the job ID cannot be extracted from the bsub command output.

Steps:
  1. Retrieve optional settings for queue (-q), job_slots (-n), and job_name (-J).

  2. The stdout and stderr log files are created in the task’s log directory. See get_log_file_dir.

  3. The executable is created with create_executable_wrapper.
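
A hedged sketch of how such a bsub command could be assembled from these settings (the function and its arguments are hypothetical stand-ins for the b2luigi settings and helpers named in the steps above):

import os
import subprocess


def submit_lsf_job(executable_wrapper, log_file_dir, queue=None, job_slots=None, job_name=None):
    """Sketch: assemble the bsub command from the optional settings and submit it.

    All arguments are stand-ins for values obtained from b2luigi settings and helpers;
    the -q/-n/-J/-o/-e flags mirror the steps described above.
    """
    cmd = ["bsub"]
    if queue is not None:
        cmd += ["-q", queue]
    if job_slots is not None:
        cmd += ["-n", str(job_slots)]
    if job_name is not None:
        cmd += ["-J", job_name]

    # stdout/stderr log files live in the task's log directory
    cmd += ["-o", os.path.join(log_file_dir, "stdout"), "-e", os.path.join(log_file_dir, "stderr")]

    # the executable wrapper script is what the batch job runs
    cmd.append(executable_wrapper)

    output = subprocess.check_output(cmd).decode()
    return output  # the job ID would be parsed from this output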

terminate_job()[source]#

This method checks if a batch job ID is set. If it exists, it attempts to terminate the job using the bkill command. The command’s output is suppressed, and errors during execution are not raised.

HTCondor#

class b2luigi.batch.processes.htcondor.HTCondorJobStatusCache[source]#

Bases: BatchJobStatusCache

_ask_for_job_status(job_id: int = None)[source]#

With HTCondor, you can check the progress of your jobs using the condor_q command. If no JobId is given as argument, this command shows you the status of all queued jobs (usually only your own by default).

Normally the HTCondor JobID is stated as ClusterId.ProcId. Since only one job is queued per cluster, we can identify jobs by their ClusterId (the ProcId will be 0 for all submitted jobs). With the -json option, the condor_q output is returned in the JSON format. By specifying some attributes, not the entire job ClassAd is returned, but only the necessary information to match a job to its JobStatus. The output is given as a string and cannot be directly parsed into a json dictionary. It has the following form:

[
    {...}
    ,
    {...}
    ,
    {...}
]

The {...} are the different dictionaries including the specified attributes. Sometimes it might happen that a job is completed in between the status checks. Then its final status can be found in the condor_history file (works mostly in the same way as condor_q). Both commands are used in order to find out the JobStatus.
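
A minimal sketch of how this query and parsing could look (the attribute list and flag usage are assumptions for illustration; the fallback to condor_history mirrors the description above):

import json
import subprocess


def ask_for_condor_job_status(job_id=None):
    """Sketch: query condor_q (and condor_history as a fallback) for job statuses.

    The -json and -attributes flags follow the description above; the exact
    attribute list is an assumption for illustration.
    """
    cmd = ["condor_q", "-json", "-attributes", "ClusterId,JobStatus"]
    if job_id is not None:
        cmd.append(str(job_id))

    output = subprocess.check_output(cmd).decode()

    # The output is a JSON list of ClassAd dictionaries, not a single dict
    records = json.loads(output) if output.strip() else []
    status_by_cluster = {record["ClusterId"]: record["JobStatus"] for record in records}

    # A job that finished between status checks may only show up in condor_history
    if job_id is not None and job_id not in status_by_cluster:
        history = subprocess.check_output(
            ["condor_history", "-json", "-attributes", "ClusterId,JobStatus", str(job_id)]
        ).decode()
        for record in json.loads(history) if history.strip() else []:
            status_by_cluster[record["ClusterId"]] = record["JobStatus"]

    return status_by_cluster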

_fill_from_output(output)[source]#

Processes the output from an HTCondor job query and updates the job statuses.

Parameters:

output (bytes) – The raw output from the HTCondor job query, encoded as bytes.

Returns:

A set of ClusterId values representing the jobs seen in the output.

Return type:

set

class b2luigi.batch.processes.htcondor.HTCondorJobStatus(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: IntEnum

See https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html

class b2luigi.batch.processes.htcondor.HTCondorProcess(*args, **kwargs)[source]#

Bases: BatchProcess

Reference implementation of the batch process for a HTCondor batch system.

In addition to the basic batch setup (see Batch Processing), the HTCondor-specific settings are:

  • Please note that most of the HTCondor batch farms do not have the same environment setup on submission and worker machines, so you probably want to give an env_script, an env setting and/or a different executable.

  • HTCondor supports copying files from submission to workers. This means that if the folder of your script(s)/python project/etc. is not accessible on the worker, you can copy it from the submission machine by adding it to the setting transfer_files. This list can contain both folders and files. Please note that due to HTCondor's file transfer mechanism, all specified folders and files will be copied into the worker node flattened, so if you specify a/b/c.txt you will end up with a file c.txt. If you use the transfer_files mechanism, you need to set the working_dir setting to "." as the files will end up in the current worker scratch folder. All specified files/folders should be absolute paths.

    Hint

    Please do not specify any parts or the full results folder. This will lead to unexpected behavior. We are working on a solution to also copy results, but until then the results folder is still expected to be shared.

    If you copy your python project using this setting to the worker machine, do not forget to actually set it up in your setup script. Additionally, you might want to copy your settings.json as well.

  • Via the htcondor_settings setting you can provide a dict for additional options, such as requested memory etc. Its value has to be a dictionary containing HTCondor settings as key/value pairs. These options will be written into the job submission file. For an overview of possible settings refer to the HTCondor documentation.

  • Same as for the LSF, the job_name setting allows giving a meaningful name to a group of jobs. If you want to be htcondor-specific, you can provide the JobBatchName as an entry in the htcondor_settings dict, which will override the global job_name setting. This is useful for manually checking the status of specific jobs with

    condor_q -batch <job name>
    

Example

import b2luigi
import random


class MyNumberTask(b2luigi.Task):
    some_parameter = b2luigi.IntParameter()

    htcondor_settings = {"request_cpus": 1, "request_memory": "100 MB"}

    def output(self):
        yield self.add_to_output("output_file.txt")

    def run(self):
        print("I am now starting a task")
        random_number = random.random()

        if self.some_parameter == 3:
            raise ValueError

        with open(self.get_output_file_name("output_file.txt"), "w") as f:
            f.write(f"{random_number}\n")


class MyAverageTask(b2luigi.Task):
    htcondor_settings = {"request_cpus": 1, "request_memory": "200 MB"}

    def requires(self):
        for i in range(10):
            yield self.clone(MyNumberTask, some_parameter=i)

    def output(self):
        yield self.add_to_output("average.txt")

    def run(self):
        print("I am now starting the average task")

        # Build the mean
        summed_numbers = 0
        counter = 0
        for input_file in self.get_input_file_names("output_file.txt"):
            with open(input_file, "r") as f:
                summed_numbers += float(f.read())
                counter += 1

        average = summed_numbers / counter

        with open(self.get_output_file_name("average.txt"), "w") as f:
            f.write(f"{average}\n")


if __name__ == "__main__":
    b2luigi.process(MyAverageTask(), workers=200, batch=True)

get_job_status()[source]#

Determines the status of a batch job based on its HTCondor job status.

Returns:

The status of the job, which can be one of the following:
  • JobStatus.successful: If the HTCondor job status is ‘completed’.

  • JobStatus.running: If the HTCondor job status is one of ‘idle’, ‘running’, ‘transferring_output’, or ‘suspended’.

  • JobStatus.aborted: If the HTCondor job status is ‘removed’, ‘held’, ‘failed’, or if the job ID is not found in the cache.

Return type:

JobStatus

Raises:

ValueError – If the HTCondor job status is unknown.
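
A sketch of the mapping described above, using a simplified stand-in for the HTCondorJobStatus enum (the numeric codes follow the HTCondor ClassAd documentation; the 'failed' state mentioned above is omitted for brevity, and this is illustrative rather than the exact implementation):

from enum import IntEnum


class HTCondorJobStatus(IntEnum):
    # Numeric codes as defined in the HTCondor job ClassAd attributes documentation
    idle = 1
    running = 2
    removed = 3
    completed = 4
    held = 5
    transferring_output = 6
    suspended = 7


class JobStatus:
    successful = "successful"
    running = "running"
    aborted = "aborted"


def map_htcondor_status(htcondor_status):
    """Sketch: translate an HTCondor job status into a b2luigi-style JobStatus."""
    if htcondor_status is None:  # job ID not found in the cache
        return JobStatus.aborted
    if htcondor_status == HTCondorJobStatus.completed:
        return JobStatus.successful
    if htcondor_status in (
        HTCondorJobStatus.idle,
        HTCondorJobStatus.running,
        HTCondorJobStatus.transferring_output,
        HTCondorJobStatus.suspended,
    ):
        return JobStatus.running
    if htcondor_status in (HTCondorJobStatus.removed, HTCondorJobStatus.held):
        return JobStatus.aborted
    raise ValueError(f"Unknown HTCondor job status: {htcondor_status}")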

start_job()[source]#

Starts a job by creating and submitting an HTCondor submit file.

This method generates an HTCondor submit file using the _create_htcondor_submit_file method, then submits the job using the condor_submit command.

Raises:

RuntimeError – If the batch submission fails or the job ID cannot be extracted from the condor_submit output.

terminate_job()[source]#

Terminates a batch job managed by HTCondor.

This method checks if a batch job ID is available. If a valid job ID exists, it executes the condor_rm command to remove the job from the HTCondor queue.

_create_htcondor_submit_file()[source]#

Creates an HTCondor submit file for the current task.

This method generates the content of an HTCondor submit file based on the task’s configuration and writes it to a file.

Returns:

The path to the generated HTCondor submit file.

Return type:

str

Raises:

ValueError – If transfer_files contains non-absolute file paths or if working_dir is not explicitly set to ‘.’ when using transfer_files.

Note

  • The stdout and stderr log files are created in the task’s log directory. See get_log_file_dir.

  • The HTCondor settings are specified in the htcondor_settings setting, which is a dictionary of key-value pairs.

  • The executable is created with create_executable_wrapper.

  • The transfer_files setting can be used to specify files or directories to be transferred to the worker node.

  • The job_name setting can be used to specify a meaningful name for the job.

  • The submit file is named job.submit and is created in the task’s output directory (get_task_file_dir).
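
A hedged sketch of how such a job.submit file could be assembled from these settings (all argument names are hypothetical stand-ins for the helpers listed in the note; the submit-file keys are standard HTCondor ones):

import os


def create_htcondor_submit_file(task_file_dir, log_file_dir, executable_wrapper,
                                htcondor_settings=None, transfer_files=None, job_name=None):
    """Sketch: write a minimal HTCondor submit file from task settings.

    All helper arguments are stand-ins for the b2luigi helpers mentioned above.
    """
    entries = {
        "executable": executable_wrapper,
        "output": os.path.join(log_file_dir, "stdout"),
        "error": os.path.join(log_file_dir, "stderr"),
        "log": os.path.join(log_file_dir, "job.log"),
    }

    if job_name is not None:
        entries["JobBatchName"] = job_name

    if transfer_files:
        # transfer_files must contain absolute paths; working_dir has to be set to "."
        entries["transfer_input_files"] = ",".join(transfer_files)
        entries["should_transfer_files"] = "YES"

    # user-provided key/value pairs from htcondor_settings are written verbatim
    entries.update(htcondor_settings or {})

    submit_file_path = os.path.join(task_file_dir, "job.submit")
    with open(submit_file_path, "w") as f:
        for key, value in entries.items():
            f.write(f"{key} = {value}\n")
        f.write("queue 1\n")

    return submit_file_path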

Slurm#

class b2luigi.batch.processes.slurm.SlurmJobStatusCache[source]#

Bases: BatchJobStatusCache

_ask_for_job_status(job_id: int = None)[source]#

With Slurm, you can check the progress of your jobs using the squeue command. If no jobID is given as argument, this command shows you the status of all queued jobs.

Sometimes it might happen that a job is completed in between the status checks. Then its final status can be found using sacct (works mostly in the same way as squeue).

In the unlikely case that the server has Slurm accounting disabled, the scontrol command is used as a last resort to access the job's history. This is the fail-safe command, as scontrol by design only holds onto a job's information for a short period of time after completion. The time between status checks is sufficiently short, however, so the scontrol command should still have the job's information on hand.

All three commands are used in order to find out the SlurmJobStatus.
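
A minimal sketch of this fallback chain (the command-line flags are assumptions chosen to produce the '<job id> <state>' lines expected by _fill_from_output below, not the exact b2luigi invocation):

import subprocess


def ask_for_slurm_job_status(job_id, sacct_disabled=False):
    """Sketch: try squeue first, then sacct, then scontrol as a last resort.

    The flags are chosen so that squeue/sacct print '<job id> <state>' lines;
    they are assumptions for illustration.
    """
    # 1. squeue knows about queued and running jobs
    output = subprocess.run(
        ["squeue", "-h", "-o", "%i %T", "-j", str(job_id)],
        capture_output=True, text=True,
    ).stdout
    if output.strip():
        return output

    # 2. sacct also knows about jobs that finished between status checks
    if not sacct_disabled:
        return subprocess.run(
            ["sacct", "-n", "-X", "-o", "JobID,State", "-j", str(job_id)],
            capture_output=True, text=True,
        ).stdout

    # 3. scontrol only keeps a job's information for a short time after completion,
    #    so it is the fail-safe when accounting is disabled
    return subprocess.run(
        ["scontrol", "show", "job", str(job_id)],
        capture_output=True, text=True,
    ).stdout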

_fill_from_output(output: str) → set[source]#

Parses the output of a Slurm command to extract job IDs and their states, updating the internal job status mapping and returning a set of seen job IDs.

Parameters:

output (str) – The output string from a Slurm command, expected to be formatted as ‘<job id> <state>’ per line.

Returns:

A set of job IDs that were parsed from the output.

Return type:

set

Raises:

AssertionError – If a line in the output does not contain exactly two entries (job ID and state).

Notes

  • If the output is empty, an empty set is returned.

  • Lines in the output that are empty or contain unexpected formatting are skipped.

  • Job states with a ‘+’ suffix (e.g., ‘CANCELLED+’) are normalized by stripping the ‘+’ character.
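
A minimal sketch of this parsing logic, assuming a plain dict as the internal job status mapping:

def fill_from_output(output, job_status_by_id):
    """Sketch: parse '<job id> <state>' lines and update the status mapping."""
    seen_ids = set()
    if not output:
        return seen_ids

    for line in output.splitlines():
        if not line.strip():
            continue  # skip empty lines
        entries = line.split()
        assert len(entries) == 2, f"Unexpected status line: {line!r}"
        job_id, state = entries
        # normalize states such as 'CANCELLED+' by stripping the '+' suffix
        job_status_by_id[job_id] = state.rstrip("+")
        seen_ids.add(job_id)
    return seen_ids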

_get_SlurmJobStatus_from_string(state_string: str) → str[source]#

Converts a state string into a SlurmJobStatus enumeration value.

Parameters:

state_string (str) – The state string to be converted.

Returns:

The corresponding SlurmJobStatus value.

Return type:

str

Raises:

KeyError – If the provided state string does not match any valid SlurmJobStatus.

_check_if_sacct_is_disabled_on_server() → bool[source]#

Checks if the Slurm accounting command sacct is disabled on the system.

This method determines whether the sacct command is unavailable or disabled by attempting to execute it and analyzing the output. The result is cached in the self.sacct_disabled attribute to avoid repeated checks.

Returns:

True if sacct is disabled on the system, False otherwise.

Return type:

bool

class b2luigi.batch.processes.slurm.SlurmJobStatus(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: Enum

See https://slurm.schedmd.com/job_state_codes.html

TODO: make this a StrEnum with python>=3.11

completed#

The job has completed successfully.

Type:

str

pending#

The job is waiting to be scheduled.

Type:

str

running#

The job is currently running.

Type:

str

suspended#

The job has been suspended.

Type:

str

preempted#

The job has been preempted by another job.

Type:

str

completing#

The job is in the process of completing.

Type:

str

boot_fail#

The job failed during the boot process.

Type:

str

cancelled#

The job was cancelled by the user or system.

Type:

str

deadline#

The job missed its deadline.

Type:

str

node_fail#

The job failed due to a node failure.

Type:

str

out_of_memory#

The job ran out of memory.

Type:

str

failed#

The job failed for an unspecified reason.

Type:

str

timeout#

The job exceeded its time limit.

Type:

str

class b2luigi.batch.processes.slurm.SlurmProcess(*args, **kwargs)[source]#

Bases: BatchProcess

Reference implementation of the batch process for a Slurm batch system.

In addition to the basic batch setup (see Batch Processing), the Slurm-specific settings are:

  • Please note that most of the Slurm batch farms by default copy the user environment from the submission node to the worker machine. As this can lead to different results when running the same tasks depending on your active environment, you probably want to pass the argument export=NONE. This ensures that a reproducible environment is used. You can provide an env_script, an env setting, and/or a different executable to create the environment necessary for your task.

  • Via the slurm_settings setting you can provide a dict for additional options, such as requested memory etc. Its value has to be a dictionary containing Slurm settings as key/value pairs. These options will be written into the job submission file. For an overview of possible settings refer to the Slurm documentation (https://slurm.schedmd.com/sbatch.html) and the documentation of the cluster you are using.

  • Same as for the LSF and HTCondor, the job_name setting allows giving a meaningful name to a group of jobs. If you want to be task-instance-specific, you can provide the job-name as an entry in the slurm_settings dict, which will override the global job_name setting. This is useful for manually checking the status of specific jobs with

    squeue --name <job name>
    

Example

import b2luigi
import random


class MyNumberTask(b2luigi.Task):
    some_parameter = b2luigi.IntParameter()
    batch_system = "slurm"
    slurm_settings = {"export": "NONE", "ntasks": 1, "mem": "100MB"}

    def output(self):
        yield self.add_to_output("output_file.txt")

    def run(self):
        print("I am now starting a task")
        random_number = random.random()

        with open(self.get_output_file_name("output_file.txt"), "w") as f:
            f.write(f"{random_number}\n")


class MyAverageTask(b2luigi.Task):
    batch_system = "slurm"
    slurm_settings = {"export": "NONE", "ntasks": 1, "mem": "100MB"}

    def requires(self):
        for i in range(10):
            yield self.clone(MyNumberTask, some_parameter=i)

    def output(self):
        yield self.add_to_output("average.txt")

    def run(self):
        print("I am now starting the average task")

        # Build the mean
        summed_numbers = 0
        counter = 0
        for input_file in self.get_input_file_names("output_file.txt"):
            with open(input_file, "r") as f:
                summed_numbers += float(f.read())
                counter += 1

        average = summed_numbers / counter

        with open(self.get_output_file_name("average.txt"), "w") as f:
            f.write(f"{average}\n")


if __name__ == "__main__":
    b2luigi.process(MyAverageTask(), workers=200, batch=True)

get_job_status()[source]#

Determine the status of a batch job based on its Slurm job status.

Returns:

The status of the job, which can be one of the following:
  • JobStatus.successful: If the job has completed successfully.

  • JobStatus.running: If the job is currently running, pending, suspended, preempted, or completing.

  • JobStatus.aborted: If the job has failed, been cancelled, exceeded its deadline, encountered a node failure, ran out of memory, timed out, or if the job ID is not found.

Return type:

JobStatus

Raises:

ValueError – If the Slurm job status is unknown or not handled.

start_job()[source]#

Starts a job by submitting the Slurm submission script.

This method creates a Slurm submit file and submits it using the sbatch command. After submission, it parses the output to extract the batch job ID.

Raises:

RuntimeError – If the batch submission fails or the job ID cannot be extracted.

self._batch_job_id#

The ID of the submitted Slurm batch job.

Type:

int

terminate_job()[source]#

Terminates a batch job if a job ID is available.

This method checks if a batch job ID is set. If it is, it executes the scancel command to terminate the job associated with the given batch job ID. The command’s output is suppressed.

_create_slurm_submit_file()[source]#

Creates a Slurm submit file for the current task.

This method generates a Slurm batch script that specifies the necessary configurations for submitting a job to a Slurm workload manager.

Returns:

The path to the generated Slurm submit file.

Return type:

pathlib.Path

Note

  • The stdout and stderr log files are created in the task’s log directory. See get_log_file_dir.

  • The Slurm settings are specified in the slurm_settings setting, which is a dictionary of key-value pairs.

  • The job_name setting can be used to specify a meaningful name for the job.

  • The executable is created with create_executable_wrapper.

  • The submit file is named slurm_parameters.sh and is created in the task’s output directory (get_task_file_dir).
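
A hedged sketch of how such a submit script could be generated from the settings in this note (the argument names are hypothetical stand-ins for the b2luigi helpers; "#SBATCH --<key>=<value>" is the standard way to embed sbatch options in the script):

import os


def create_slurm_submit_file(task_file_dir, log_file_dir, executable_wrapper,
                             slurm_settings=None, job_name=None):
    """Sketch: write a minimal sbatch script from the task settings.

    The helper arguments stand in for the b2luigi helpers mentioned above.
    """
    submit_file_path = os.path.join(task_file_dir, "slurm_parameters.sh")
    with open(submit_file_path, "w") as f:
        f.write("#!/bin/bash\n")
        f.write(f"#SBATCH --output={os.path.join(log_file_dir, 'stdout')}\n")
        f.write(f"#SBATCH --error={os.path.join(log_file_dir, 'stderr')}\n")
        if job_name is not None:
            f.write(f"#SBATCH --job-name={job_name}\n")
        # user-provided key/value pairs from slurm_settings become #SBATCH options
        for key, value in (slurm_settings or {}).items():
            f.write(f"#SBATCH --{key}={value}\n")
        # the executable wrapper is what the batch job actually runs
        f.write(f"exec {executable_wrapper}\n")
    return submit_file_path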

gbasf2#

class b2luigi.batch.processes.gbasf2.Gbasf2Process(*args, **kwargs)[source]#

Bases: BatchProcess

Batch process for working with gbasf2 projects on the LHC Computing Grid (LCG).

Features
  • gbasf2 project submission

    The gbasf2 batch process takes the basf2 path returned by the create_path method of the task, saves it into a pickle file on disk and creates a wrapper steering file that executes the saved path. Any basf2 variable aliases added in the Path() or create_path() method are also stored in the pickle file. It then sends both the pickle file and the steering file wrapper to the grid via the Belle II-specific DIRAC-wrapper gbasf2. However, b2luigi supports the submission of custom steering files with the setting gbasf2_custom_steering_file. This preserves the convention that the basf2 path is still contained in the create_path() method. In this instance, b2luigi checks automatically if the corresponding file exists and copies it into the active directory. The gbasf2 task is then set to submit the unpickled file to the grid job, which allows the utilization of python-based basf2 modules.

  • Project status monitoring

    After the project submission, the gbasf2 batch process regularly checks the status of all the jobs belonging to a gbasf2 project and returns a success if all jobs were successful, while a single failed job results in a failed project. You can close a running b2luigi process and then start your script again; if a task with the same project name is running, this b2luigi gbasf2 wrapper will recognize that and, instead of resubmitting a new project, continue monitoring the running project.

    Hint

    The outputs of gbasf2 tasks can be a bit overwhelming, so I recommend using the central scheduler, which provides a nice overview of all tasks in the browser, including a status/progress indicator of how many jobs in a gbasf2 project are already done.

  • Automatic download of datasets and logs

    If all jobs were successful, it automatically downloads the output dataset and the log files from the job sandboxes and checks if the download was successful before moving the data to the final location. On failure, it only downloads the logs. The dataset download can be optionally disabled.

  • Automatic rescheduling of failed jobs

    Whenever a job fails, gbasf2 reschedules it as long as the number of retries is below the value of the setting gbasf2_max_retries. It keeps track of the number of retries in a local file in the log_file_dir, so that it does not change if you close b2luigi and start it again. Of course it does not persist if you remove that file or move to a different machine.

Note

Despite all the automation that this gbasf2 wrapper provides, the user is expected to have a basic understanding of how the grid works and know the basics of working with gbasf2 commands manually.

Caveats
  • The gbasf2 batch process for luigi can only be used for tasks inheriting from Basf2PathTask or other tasks with a create_path() method that returns a basf2 path.

  • It can be used only for picklable basf2 paths, with only some limited global basf2 state saved (currently aliases and global tags). The batch process stores the path created by create_path in a python pickle file and runs that on the grid. Therefore, python basf2 modules are not yet supported. To see if the path produced by a steering file is picklable, you can try to dump it with basf2 --dump-path and execute it again with basf2 --execute-path. In case the steering file contains content (e.g. modules) that cannot be pickled, the setting gbasf2_custom_steering_file can be utilized; it has to be set to the path of the steering file the user wishes to be used. This submits the custom steering file to the grid job. The specific use case for this is the usage of and interaction with python-based basf2 modules that are not picklable.

  • Output format: Changing the batch to gbasf2 means you also have to adapt how you handle the output of your gbasf2 task in tasks depending on it, because the output will not be a single root file anymore (e.g. B_ntuple.root), but a collection of root files, one for each file in the input data set, in a directory with the base name of the root files, e.g.:

    <task output directory>
                ├── B_ntuple.root
                │   └── B_ntuple_0.root
                │   └── B_ntuple_1.root
                │   └── ...
                ├── D_ntuple.root
                │   └── D_ntuple_0.root
                │   └── ...
    
Settings for gbasf2 tasks:

To submit a task with the gbasf2 wrapper, you first have to add the property batch_system = "gbasf2", which sets the batch_system setting. It is not recommended to set that setting globally, as not all tasks can be submitted to the grid, but only tasks with a create_path method.

For gbasf2 tasks it is further required to set the settings

  • gbasf2_input_dataset: String with the logical path of a dataset on the grid to use as an input to the task. You can provide multiple inputs by having multiple paths contained in this string, separated by commas without spaces. An alternative is to just instantiate multiple tasks with different input datasets, if you want to know in retrospect which input dataset had been used for the production of a specific output.

  • gbasf2_input_dslist: Alternatively to gbasf2_input_dataset, you can use this setting to provide a text file containing the logical grid path names, one per line.

  • gbasf2_project_name_prefix: A string with which your gbasf2 project names will start. To ensure the project associated with each unique task (i.e. with each set of luigi parameters) is unique, the task.task_id is hashed and appended to the prefix to create the actual gbasf2 project name. The prefix should be below 22 characters so that the project name with the hash can remain under 32 characters.

The following example shows a minimal class with all required options to run on the gbasf2/grid batch:

class MyTask(Basf2PathTask):
    batch_system = "gbasf2"
    gbasf2_project_name_prefix = b2luigi.Parameter(significant=False)
    gbasf2_input_dataset = b2luigi.Parameter(hashed=True)

Other not required, but noteworthy settings are:

  • gbasf2_setup_path: Path to the gbasf2 environment setup script that needs to be sourced to run gbasf2 commands. Defaults to /cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc.

  • gbasf2_release: Defaults to the release of your currently set up basf2 release. Set this if you want the jobs to use another release on the grid.

  • gbasf2_proxy_lifetime: Defaults to 24. When initializing a proxy, set the lifetime to this number of hours.

  • gbasf2_min_proxy_lifetime: Defaults to 0. During processing, prompt user to reinitialize proxy if remaining proxy lifetime drops below this number of hours.

  • gbasf2_print_status_updates: Defaults to True. By setting it to False you can turn off the printing of the job summaries, that is, the number of jobs in different states in a gbasf2 project.

  • gbasf2_max_retries: Defaults to 0. Maximum number of times that each job in the project can be automatically rescheduled until the project is declared as failed.

  • gbasf2_proxy_group: Defaults to "belle". If provided, the gbasf2 wrapper will work with the custom gbasf2 group specified in this parameter. There is no need to specify this parameter for a usual physics analysis at Belle II. If specified, one also has to provide the gbasf2_project_lpn_path parameter.

  • gbasf2_project_lpn_path: Path to the LPN folder for a specified gbasf2 group. The parameter has no effect unless gbasf2_proxy_group is used with a non-default value.

  • gbasf2_jinja_template_path: This parameter sets a custom basf2 steering template where the user can adapt the default template (e.g. for altering the pdg database, …). Note that this is an expert option that should be treated with care.

  • gbasf2_additional_download_params: Defaults to "--new". This parameter sets additional parameters that are given to gb2_ds_get. Note that in case you override the parameter, the --new parameter is not automatically set, so you might have to manually add --new if you want this parameter to be used.

  • gbasf2_download_dataset: Defaults to True. Disable this setting if you don’t want to download the output dataset from the grid on job success. As you can’t use the downloaded dataset as an output target for luigi, you should then use the provided Gbasf2GridProjectTarget, as shown in the following example:

    from b2luigi.batch.processes.gbasf2 import get_unique_project_name, Gbasf2GridProjectTarget
    
    class MyTask(Basf2PathTask):
        # [...]
        def output(self):
            project_name = get_unique_project_name(self)
            return Gbasf2GridProjectTarget(project_name)
    

    This is useful when chaining gbasf2 tasks together, as they don’t need the output locally but take the grid datasets as input. Also useful when you just want to produce data on the grid for other people to use.

    Caution

    For Gbasf2GridProjectTargets, the global and the task gbasf2_proxy_group cannot differ.

    Tip

    Set only one global gbasf2_proxy_group setting.

  • gbasf2_download_logs: Whether to automatically download the log output of gbasf2 projects when the task succeeds or fails. Having the logs is important for reproducibility.

  • gbasf2_custom_steering_file: Optional path to submit a custom steering file to gbasf2. This does not pickle the basf2.Path instance and allows the utilization of python-based basf2 modules. Named modules have to be contained either in the steering file itself or by additional files via the input sandbox.

The following optional settings correspond to the equally named gbasf2 command line options (without the gbasf2_ prefix) that you can set to customize your gbasf2 project:

gbasf2_noscout, gbasf2_additional_files, gbasf2_input_datafiles, gbasf2_n_repition_job, gbasf2_force_submission, gbasf2_cputime, gbasf2_evtpersec, gbasf2_priority, gbasf2_jobtype, gbasf2_basf2opt, gbasf2_lfn_sandboxfiles, gbasf2_input_grouping

It is further possible to append arbitrary command line arguments to the gbasf2 submission command with the gbasf2_additional_params setting. If you want to blacklist a grid site, you can e.g. add

b2luigi.set_setting("gbasf2_additional_params",  "--banned_site LCG.KEK.jp")

Example

Here is an example file to submit an analysis path created by the script in examples/gbasf2/example_mdst_analysis to grid via gbasf2:

File: examples/gbasf2/gbasf2_example.py#
import b2luigi
from b2luigi.basf2_helper.tasks import Basf2PathTask

import example_mdst_analysis


class AnalysisTask(Basf2PathTask):
    # set the batch_system property to use the gbasf2 wrapper batch process for this task
    batch_system = "gbasf2"
    # Must define a prefix for the gbasf2 project name to submit to the grid.
    # b2luigi will then add a hash derived from the luigi parameters to create a unique project name.
    gbasf2_project_name_prefix = b2luigi.Parameter()
    gbasf2_input_dataset = b2luigi.Parameter(hashed=True)
    # Example luigi cut parameter to facilitate starting multiple projects for different cut values
    mbc_lower_cut = b2luigi.FloatParameter()

    def create_path(self):
        mbc_range = (self.mbc_lower_cut, 5.3)
        return example_mdst_analysis.create_analysis_path(
            d_ntuple_filename="D_ntuple.root", b_ntuple_filename="B_ntuple.root", mbc_range=mbc_range
        )

    def output(self):
        yield self.add_to_output("D_ntuple.root")
        yield self.add_to_output("B_ntuple.root")


class AnalysisWrapperTask(b2luigi.WrapperTask):
    """
    We use the AnalysisWrapperTask to be able to require multiple analyse tasks with
    different input datasets and cut values. For each parameter combination, a
    different gbasf2 project will be submitted.
    """

    def requires(self):
        input_dataset = (
            "/belle/MC/release-04-01-04/DB00000774/SkimM13ax1/prod00011778/e1003/4S/r00000/mixed/11180100/udst/sub00/"
            "udst_000006_prod00011778_task10020000006.root"
        )
        # if you want to iterate over different cuts, just add more values to this list
        mbc_lower_cuts = [5.15, 5.2]
        for mbc_lower_cut in mbc_lower_cuts:
            yield AnalysisTask(
                mbc_lower_cut=mbc_lower_cut,
                gbasf2_project_name_prefix="luigiExample",
                gbasf2_input_dataset=input_dataset,
                max_event=100,
            )


if __name__ == "__main__":
    main_task_instance = AnalysisWrapperTask()
    n_gbasf2_tasks = len(list(main_task_instance.requires()))
    b2luigi.process(main_task_instance, workers=n_gbasf2_tasks)

Handling failed jobs

The gbasf2 input wrapper considers the gbasf2 project as failed if any of the jobs in the project failed and reached the maximum number of retries. It then automatically downloads the logs, so please look into them to see what the reason was. For example, it can be that only certain grid sites were affected, so you might want to exclude them by adding "--banned_site ..." to gbasf2_additional_params.

You can also always reschedule jobs manually with the gb2_job_reschedule command or delete them with gb2_job_delete so that the gbasf2 batch process doesn't know they ever existed. Then just run your luigi task/script again and it will start monitoring the running project again.

pickle_file_path#

file name in which the pickled basf2 path from self.task.create_path() will be stored

wrapper_file_path#

file name for the steering file that executes the pickled path, which will be sent to the grid

max_retries#

Maximum number of times that each job in the project can be rescheduled until the project is declared as failed.

gbasf2_custom_steering_file#

Setting to incorporate custom steering files into b2luigi.

n_retries_by_job#

Stores the number of times each job has been rescheduled

get_job_status()[source]#

Get the overall status of the gbasf2 project.

This method determines the current status of all jobs in a gbasf2 project (see get_gbasf2_project_job_status_dict) and returns an overall project status. It uses cached job status data if available and updates it if necessary. The method also handles job rescheduling in case of failures and updates task progress and status messages for the central scheduler.

Returns:

The overall status of the gbasf2 project. Possible values are:
  • JobStatus.running: If the project is still in progress.

  • JobStatus.successful: If all jobs in the project are completed successfully.

  • JobStatus.aborted: If any job in the project fails and cannot be rescheduled.

Return type:

JobStatus

Raises:

RuntimeError – If the job status cannot be determined.

Notes

  • The job status is cached for 5 minutes to optimize performance.

  • Failed jobs are rescheduled up to a maximum number of retries before the project is marked as aborted.

  • The project is considered successful only if all jobs are completed successfully.

  • Progress and status updates are logged if gbasf2_print_status_updates is enabled.

_on_first_success_action()[source]#

Executes actions to be performed after all jobs in the project have successfully completed.

This method checks specific settings to determine whether to download logs and datasets associated with the project. If the respective settings are enabled, it triggers the corresponding download operations.

Note

  • Downloads logs if the “gbasf2_download_logs” setting is enabled.

  • Downloads the dataset if the “gbasf2_download_dataset” setting is enabled.

_on_failure_action()[source]#

Handles actions to be performed after a project failure.

This method retrieves the job status dictionary for the specified gbasf2 project and identifies failed jobs. A job is considered failed if its status is “Failed” or if its status is “Done” but its application status is not “Done”. The number of failed jobs and their details are printed. If the setting gbasf2_download_logs is enabled, it triggers the download of logs for further investigation.

_reschedule_failed_jobs()[source]#

Attempts to reschedule failed jobs in the project if the maximum number of retries (gbasf2_max_retries) has not been reached.

This method evaluates the status of jobs in the project and determines whether they should be rescheduled or if they have hit the maximum retry limit. Jobs that are eligible for rescheduling are added to a rescheduling queue, while jobs that have reached the retry limit are logged with a warning.

Returns:

  • True if all failed jobs were successfully rescheduled or no jobs needed rescheduling.

  • False if any jobs reached the maximum retry limit.

Return type:

bool

Raises:

RuntimeWarning – If any jobs have reached the maximum retry limit.

Notes

  • A job is considered failed if its status is “Failed” or if its status is “Done” but its application status is not “Done”.

  • The method relies on get_gbasf2_project_job_status_dict to retrieve the current status of jobs and _reschedule_jobs to perform the rescheduling.

_reschedule_jobs(job_ids)[source]#

Reschedule a list of jobs by their IDs.

This method takes a list of job IDs, logs the rescheduling process, and executes a command to reschedule the specified jobs using the gb2_job_reschedule utility. It also includes the number of retries for each job in the log output.

Parameters:

job_ids (list of str) – A list of job IDs to be rescheduled.

Raises:

KeyError – If a job ID in the list is not found in self.n_retries_by_job.

Notes

  • The gb2_job_reschedule command is executed with the --force flag.

  • The run_with_gbasf2 function is used to execute the rescheduling command.

start_job()[source]#

Submit a new gbasf2 project to the grid.

This method checks if a project with the specified name already exists on the grid. If it does, the submission is skipped, and a message is printed. Otherwise, it prepares the necessary steering files and submits the project.

Steps:
  1. Check if the project already exists on the grid. (see check_project_exists)

  2. Prepare the steering file (see _write_path_to_file, _create_wrapper_steering_file and _copy_custom_steering_script).
  3. Build the gbasf2 submission command. (see _build_gbasf2_submit_command)

  4. Create a symlink for the pickle file to ensure it is included in the grid input sandbox.

  5. Submit the project using the gbasf2 command. (see run_with_gbasf2)

  6. Clean up the symlink after submission.

Raises:

OSError – If there is an issue creating or removing the symlink.

Note

If the project already exists, the user is advised to change the project name to submit a new project if needed.

terminate_job()[source]#

Terminate a gbasf2 project if it exists.

This method checks if the specified gbasf2 project exists. If it does, it terminates the project using the gb2_job_kill command with the --force option. Terminated jobs are not removed from the job database and can be restarted if needed.

Note

The gb2_job_delete command differs from gb2_job_kill in that deleted jobs are permanently removed from the job database, while terminated jobs remain in the database.

_build_gbasf2_submit_command()[source]#

Constructs the gbasf2 submit command string based on task options and attributes.

This method generates a command string to submit a gbasf2 job, incorporating various settings and parameters. It validates inputs, handles optional parameters, and ensures the command is properly formatted for execution.

Returns:

A list of command-line arguments for the gbasf2 submission command.

Return type:

list

Raises:
  • ValueError – If gbasf2_additional_files is not an iterable or is a string.

  • RuntimeError – If both gbasf2_input_dataset and gbasf2_input_dslist are set, or if neither is set.

  • FileNotFoundError – If the file specified in gbasf2_input_dslist does not exist.

  • ValueError – If priority is not an integer between 0 and 10.

  • ValueError – If gbasf2_proxy_group is non-default and gbasf2_project_lpn_path is not provided.

Notes

  • The method uses various task settings to construct the command, such as: gbasf2_release, gbasf2_additional_files, gbasf2_input_dataset, gbasf2_input_dslist, gbasf2_n_repition_job, gbasf2_input_datafiles, gbasf2_force_submission, gbasf2_cputime, gbasf2_evtpersec, gbasf2_priority, gbasf2_jobtype, gbasf2_basf2opt, and gbasf2_additional_params.

  • If the proxy group is not "belle", an output dataset path must be specified.

_write_path_to_file()[source]#

Serializes and saves a basf2.Path object and variable aliases to a pickle file.

This method attempts to create a basf2.Path object using the b2luigi.basf2_helper.Basf2PathTask.create_path() method of the associated task. The resulting path, along with any variable aliases from the current variable manager instance, is serialized and written to a specified pickle file.

Raises:

Exception – If the associated task does not have a create_path() method, indicating it is not an instance of Basf2PathTask or a compatible class.

Dependencies:
  • write_path_and_state_to_file: Used to perform the serialization and file writing.

_create_wrapper_steering_file()[source]#

Generates a steering file for grid execution by processing a Jinja2 template.

This method creates a steering file that executes the pickled basf2 path generated by b2luigi.basf2_helper.Basf2PathTask.create_path(). It reads a Jinja2 template, replaces placeholders with appropriate values, and writes the processed template to a new file for grid submission.

The following steps are performed:
  1. Reads the Jinja2 template file specified by the gbasf2_jinja_template_path setting.

  2. Replaces template variables such as the pickle file path and maximum event count.

  3. Writes the processed template to self.wrapper_file_path.
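
A minimal sketch of this rendering step (the template variable names pickle_file_path and max_event are assumptions for illustration; the real template is the one selected via gbasf2_jinja_template_path):

from jinja2 import Template


def create_wrapper_steering_file(template_path, wrapper_file_path, pickle_file_path, max_event=0):
    """Sketch: render the Jinja2 steering-file template for grid submission.

    The template variable names used here are assumptions for illustration.
    """
    with open(template_path) as f:
        template = Template(f.read())

    rendered = template.render(
        pickle_file_path=pickle_file_path,
        max_event=max_event,
    )

    with open(wrapper_file_path, "w") as f:
        f.write(rendered)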

_copy_custom_steering_script()[source]#

Copies a custom gbasf2 steering script to the specified wrapper file path.

This method checks if the custom gbasf2 steering file exists at the specified location. If it exists, the file is copied to the wrapper file path. If the file does not exist, a ValueError is raised.

Raises:

ValueError – If the custom gbasf2 steering file does not exist at the specified path.

_get_gbasf2_dataset_query(output_file_name: str) → str[source]#

Constructs a query string to retrieve a subset of files from a grid project associated with the current task, based on the provided output file name.

This method generates a wildcard pattern for use with grid commands such as gb2_ds_list or gb2_ds_get to locate files matching the specified output file name within the project’s directory structure.

Parameters:

output_file_name (str) – The name of the output file, which must be a basename (not a path) and should end with the “.root” extension.

Returns:

A query string containing the wildcard pattern to locate the desired files in the grid project.

Return type:

str

Raises:

ValueError – If the provided output_file_name is not a basename, does not end with “.root”, or if required settings for non-default proxy groups are missing.

static _get_project_download_path(base_output_dir: str)[source]#

Get the location of the .root files of the downloaded project.

Parameters:

base_output_dir (str) – base directory for the gbasf2 download

Returns:

str

_local_gb2_dataset_is_complete(output_file_name: str, check_temp_dir: bool = False) → bool[source]#

Helper method that returns True if the download of the gbasf2 dataset for the output output_file_name is complete.

Parameters:
  • output_file_name – Output file name, must be a root file, e.g. ntuple.root. Usually defined by the user via b2luigi.Task.add_to_output() in the b2luigi.Task.output() method.

  • check_temp_dir – Instead of checking the final output path, check whether the download into the temporary (“partial”) directory is complete. This function is usually called with this argument set to True, to check whether the dataset can be moved to its final output path.

_download_dataset()[source]#

Downloads the task outputs from the gbasf2 project dataset.

This method ensures that all files matching the naming pattern name_*.root from the grid dataset corresponding to the project name are downloaded into a specified directory. The download process is handled in a way that prevents marking the task as complete if the download fails.

Steps:
  1. Checks if the dataset exists on the grid for the specified project name.

  2. For each task output, constructs a query string to locate the dataset files.

  3. Skips downloading if the dataset already exists locally and is complete.

  4. Downloads the dataset into a temporary directory to ensure atomicity.

  5. Handles failed downloads by retrying only the missing files.

  6. Verifies the completeness of the downloaded dataset.

  7. Moves the successfully downloaded dataset to the final output directory.

Raises:
  • RuntimeError – If the dataset does not exist on the grid.

  • RuntimeError – If no output data is found for the specified project.

  • RuntimeError – If the downloaded dataset is incomplete.

Notes

  • Temporary directories are used to avoid marking the task as complete prematurely.

  • Failed downloads are tracked and retried using a monitoring file.

  • Additional parameters for the gb2_ds_get command can be configured via task settings.

_download_logs()[source]#

Download sandbox files from grid with logs for each job in the gbasf2 project.

It wraps gb2_job_output, which downloads the job sandbox, which has the following structure:

log
└── <project name>
    ├── <first job id>
    │   ├── job.info
    │   ├── Script1_basf2helper.py.log # basf2 outputs
    │   └── std.out
    ├── <second job id>
    │   ├── ...
    ...

These are stored in the task log dir.

class b2luigi.batch.processes.gbasf2.Gbasf2GridProjectTarget(project_name, dirac_user=None, gbasf2_setup_path='/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc')[source]#

Bases: Target

Gbasf2GridProjectTarget is a custom Luigi Target that checks the existence and status of a dataset produced by a gbasf2 grid project. It ensures that the dataset exists on the grid and that all jobs associated with the project have completed successfully.

project_name#

Name of the gbasf2 grid project that produced the dataset and under which the dataset is stored.

Type:

str

dirac_user#

Dirac user who produced the output dataset. If None, the current user is used.

Type:

str, optional

gbasf2_setup_path#

Path to the gbasf2 setup file. Defaults to “/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc”.

Type:

str

exists()[source]#

Checks the existence and status of a dataset or project on the grid.

This method performs the following checks:

  1. Verifies if a dataset associated with the given project name exists on the grid. (see check_dataset_exists_on_grid)

  2. If a dataset exists, checks whether a project with the same name exists on the grid. (see check_project_exists)

  3. Ensures that no jobs are actively writing to the project by verifying that all jobs associated with the project have a status of “Done” and an application status of “Done”.

Returns:

True if the dataset exists, the project exists, and no jobs are actively writing to it. False otherwise.

Return type:

bool

b2luigi.batch.processes.gbasf2.check_dataset_exists_on_grid(gbasf2_project_name, dirac_user=None, gbasf2_setup_path='/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc', task=None)[source]#

Check if an output dataset exists for the specified gbasf2 project on the grid.

Parameters:
  • gbasf2_project_name (str) – The name of the gbasf2 project to check.

  • dirac_user (str, optional) – The DIRAC user to use for querying. Defaults to None.

  • gbasf2_setup_path (str, optional) – The path to the gbasf2 setup script. Defaults to “/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc”.

  • task (object, optional) – An optional task object for retrieving settings. Defaults to None.

Returns:

True if the dataset exists on the grid, False otherwise.

Return type:

bool

Raises:

ValueError – If gbasf2_proxy_group is set to a non-default value (“belle”) and gbasf2_project_lpn_path is not provided in the settings.

b2luigi.batch.processes.gbasf2.get_gbasf2_project_job_status_dict(gbasf2_project_name, dirac_user=None, gbasf2_setup_path='/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc', task=None)[source]#

Returns a dictionary for all jobs in the project with a structure like the following, which I have taken and adapted from an example output:

{
    "<JobID>": {
        "SubmissionTime": "2020-03-27 13:08:49",
        "Owner": "<dirac username>",
        "JobGroup": "<ProjectName>",
        "ApplicationStatus": "Done",
        "HeartBeatTime": "2020-03-27 16:01:39",
        "Site": "LCG.KEK.jp",
        "MinorStatus": "Execution Complete",
        "LastUpdateTime": "2020-03-27 16:01:40",
        "Status": "<Job Status>"
    }
...
}

For that purpose, the script in gbasf2_job_status.py is called. That script directly interfaces with Dirac via its API, but it only works with the gbasf2 environment and python2, which is why it is called as a subprocess. The job status dictionary is passed to this function via json.

b2luigi.batch.processes.gbasf2.check_project_exists(gbasf2_project_name, dirac_user=None, gbasf2_setup_path='/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc', task=None)[source]#

Check if a gbasf2 project exists on the grid.

This function verifies the existence of a gbasf2 project by attempting to retrieve its job status using the get_gbasf2_project_job_status_dict function. If the retrieval fails due to a RuntimeError, the function returns False.

Parameters:
  • gbasf2_project_name (str) – The name of the gbasf2 project to check.

  • dirac_user (str, optional) – The DIRAC user associated with the project. Defaults to None.

  • gbasf2_setup_path (str, optional) – The path to the gbasf2 setup script. Defaults to “/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc”.

  • task (optional) – Additional task information to pass to the status retrieval function. Defaults to None.

Returns:

True if the project exists on the grid, False otherwise.

Return type:

bool

b2luigi.batch.processes.gbasf2.run_with_gbasf2(cmd, *args, ensure_proxy_initialized=True, check=True, encoding='utf-8', capture_output=False, gbasf2_setup_path='/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc', task=None, **kwargs)[source]#

Call a command in a subprocess with the gbasf2 environment.

This function wraps the execution of a command in a subprocess, ensuring that the gbasf2 environment is properly set up and optionally verifying that the DIRAC proxy is initialized. It provides additional features such as capturing output and specifying encoding.

Parameters:
  • cmd (str or list) – The command to execute. Can be a string or a list of arguments.

  • *args – Additional positional arguments to pass to subprocess.run.

  • ensure_proxy_initialized (bool, optional) – If True, ensures that the DIRAC proxy is initialized and alive. Defaults to True.

  • check (bool, optional) – If True, raises a subprocess.CalledProcessError if the command exits with a non-zero status. Defaults to True.

  • encoding (str, optional) – The encoding to use for interpreting the command output. Defaults to “utf-8”.

  • capture_output (bool, optional) – If True, captures the stdout and stderr of the command. Equivalent to setting them to subprocess.PIPE. Defaults to False.

  • gbasf2_setup_path (str, optional) – Path to the gbasf2 setup file. Defaults to “/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc”.

  • task (optional) – Task-specific information to pass to the environment setup. Defaults to None.

  • **kwargs – Additional keyword arguments to pass to subprocess.run.

Returns:

An instance representing the completed process.

Return type:

subprocess.CompletedProcess

Raises:

ValueError – If stdout or stderr are specified in kwargs while capture_output is True.

Notes

  • If capture_output is True, the stdout and stderr of the command are captured and returned as strings (decoded using the specified encoding).

  • The gbasf2 environment is set up using the get_gbasf2_env function.

  • If ensure_proxy_initialized is True, the DIRAC proxy is initialized using the setup_dirac_proxy function.

b2luigi.batch.processes.gbasf2.get_gbasf2_env(gbasf2_setup_path, task=None)[source]#

Retrieve the gbasf2 environment as a dictionary, which can be used to run gbasf2 commands.

This function sets up the gbasf2 environment by sourcing the specified setup file in a fresh shell and capturing the resulting environment variables. It ensures that the environment is isolated, except for the HOME variable, which is required for the setup process.

Parameters:
  • gbasf2_setup_path (str) – Path to the gbasf2 setup file.

  • task (optional) – Task parameter used to retrieve the gbasf2_proxy_group setting. Defaults to "belle" if not provided.

Returns:

A dictionary containing the environment variables set up by the gbasf2 setup file.

Return type:

dict

Raises:

FileNotFoundError – If the specified gbasf2 setup file does not exist.
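
A minimal sketch of the "source the setup file in a fresh shell and capture the environment" pattern described above (the exact shell invocation is an assumption for illustration):

import os
import subprocess


def get_gbasf2_env(gbasf2_setup_path):
    """Sketch: source the gbasf2 setup file in an isolated shell and capture its environment."""
    if not os.path.isfile(gbasf2_setup_path):
        raise FileNotFoundError(f"gbasf2 setup file not found: {gbasf2_setup_path}")

    # start from an empty environment, keeping only HOME, which the setup script needs
    command = f"env -i HOME='{os.environ['HOME']}' bash -c 'source {gbasf2_setup_path} > /dev/null && env'"
    output = subprocess.check_output(command, shell=True, text=True)

    # each line of `env` output is KEY=VALUE; build a dictionary from it
    return dict(line.split("=", 1) for line in output.splitlines() if "=" in line)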

b2luigi.batch.processes.gbasf2.get_proxy_info(gbasf2_setup_path='/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc', task=None)[source]#

Retrieve a dictionary containing the proxy status by running the gbasf2_proxy_info.py script.

Parameters:
  • gbasf2_setup_path (str, optional) – The path to the gbasf2 setup script. Defaults to “/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc”.

  • task (optional) – An optional task parameter to pass to the subprocess.

Returns:

A dictionary containing the proxy status information.

Return type:

dict

Notes

  • The function uses the run_with_gbasf2 utility to execute the gbasf2_proxy_info.py script.

b2luigi.batch.processes.gbasf2.get_dirac_user(gbasf2_setup_path='/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc', task=None)[source]#

Retrieve the DIRAC username associated with the initialized proxy.

Parameters:
  • gbasf2_setup_path (str) – The file path to the gbasf2 setup script. Defaults to “/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc”.

  • task (optional) – An optional task object that may be used during proxy setup.

Returns:

The DIRAC username extracted from the proxy information.

Return type:

str

Raises:

RuntimeError – If the username cannot be obtained from the proxy information.

b2luigi.batch.processes.gbasf2.setup_dirac_proxy(gbasf2_setup_path='/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc', task=None)[source]#

Ensures that a valid DIRAC proxy is initialized for the Belle II grid system. If a proxy is already active and has sufficient remaining lifetime, no action is taken. Otherwise, the function initializes a new proxy using the gb2_proxy_init command.

Parameters:
  • gbasf2_setup_path (str) – Path to the gbasf2 setup script. Defaults to “/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc”.

  • task (optional) – Task-specific context or configuration, if applicable.

Behavior:
  • Checks if an active proxy exists and has sufficient lifetime remaining.

  • If no valid proxy exists, initializes a new proxy using the gb2_proxy_init command.

  • Prompts the user for the certificate password during proxy initialization.

  • Validates the proxy initialization process and handles errors such as incorrect passwords or other initialization issues.

Raises:

RuntimeWarning – If the gbasf2_proxy_lifetime setting is not a positive integer.

Notes

  • The proxy lifetime and group name can be configured using the gbasf2_proxy_lifetime and gbasf2_proxy_group settings, respectively.

  • The function ensures sensitive information like passwords is securely handled and deleted after use.

b2luigi.batch.processes.gbasf2.query_lpns(ds_query: str, dirac_user: str | None = None, gbasf2_setup_path: str = '/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc', task=None) → List[str][source]#

Query DIRAC for Logical Physical Names (LPNs) matching the given dataset query.

This function interacts with the DIRAC system to retrieve a list of LPNs based on the provided dataset query string. It uses the gbasf2_ds_list.py script to perform the query and parses the output to return the results as a list of strings.

Parameters:
  • ds_query (str) – The dataset query string to search for matching LPNs.

  • dirac_user (Optional[str]) – The DIRAC user to perform the query as. If not provided, it will be determined using the get_dirac_user function.

  • gbasf2_setup_path (str) – The path to the gbasf2 setup script. Defaults to “/cvmfs/belle.kek.jp/grid/gbasf2/pro/bashrc”.

  • task – An optional task to associate with the query.

Returns:

A list of LPNs matching the dataset query.

Return type:

List[str]

Raises:

TypeError – If the output of the query is not a list.

b2luigi.batch.processes.gbasf2.get_unique_project_name(task)[source]#

Combine the gbasf2_project_name_prefix setting and the task_id hash to a unique project name.

This is done to make sure that different instances of a task with different luigi parameters result in different gbasf2 project names. When trying to redo a task on the grid with identical parameters, rename the project name prefix to ensure that you get a new project.

Parameters:

task – A task. The task must have a gbasf2_project_name_prefix setting, parameter, or attribute.

Returns:

A unique project name combining the prefix and a hashed task ID.

Return type:

str

Raises:
  • Exception – If the task does not have a gbasf2_project_name_prefix setting, parameter, or attribute.

  • ValueError – If the generated project name exceeds the maximum allowed length (32 characters) or contains invalid characters (non-alphanumeric, excluding _ and -).

Notes

  • The task ID hash is generated using hashlib.md5 for a consistent and compact representation.

  • The maximum length of the project name is 32 characters.

  • Only alphanumeric characters, underscores (_), and hyphens (-) are allowed in the project name.
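
A sketch of the prefix-plus-hash construction following the notes above (the hash truncation length is an assumption for illustration):

import hashlib
import re


def unique_project_name(project_name_prefix, task_id, max_length=32):
    """Sketch: append an md5-based hash of the task ID to the project name prefix."""
    # compact, consistent representation of the task ID; the truncation length is an assumption
    task_id_hash = hashlib.md5(task_id.encode()).hexdigest()[:10]
    project_name = project_name_prefix + task_id_hash

    if len(project_name) > max_length:
        raise ValueError(f"Project name {project_name!r} is longer than {max_length} characters.")
    if not re.fullmatch(r"[A-Za-z0-9_-]+", project_name):
        raise ValueError(f"Project name {project_name!r} contains invalid characters.")
    return project_name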

b2luigi.batch.processes.gbasf2.lfn_follows_gb2v5_convention(lfn: str) → bool[source]#

Check if the LFN follows the naming convention of gbasf2 release 5, i.e.

<name>_<gbasf2param>_<jobID>_<rescheduleNum>.root

Parameters:

lfn (str) – Logical file name, typically a file path on the grid.

Returns:

True if the LFN adheres to the gbasf2 release 5 naming convention, False otherwise.

Return type:

bool

b2luigi.batch.processes.gbasf2._get_lfn_upto_reschedule_number(lfn: str) → str[source]#

Get a string of the gbasf2 v5 LFN up to the reschedule number.

E.g. if the LFN is <name>_<gbasf2param>_<jobID>_<rescheduleNum>.root, return <name>_<gbasf2param>_<jobID>.

b2luigi.batch.processes.gbasf2.get_unique_lfns(lfns: Iterable[str]) → Set[str][source]#

From a list of gbasf2 Logical File Names (LFNs) that may include duplicates due to rescheduled jobs, return a filtered set containing only the LFNs corresponding to the jobs with the highest reschedule number.

Gbasf2 v5 outputs have the format: <name>_<gbasf2param>_<jobID>_<rescheduleNum>.root. When using gb2_ds_list, duplicates may appear where all parts are identical except for the rescheduleNum. This function ensures that only the LFNs with the highest rescheduleNum are retained.

If the dataset does not follow the gbasf2 v5 naming convention, it is assumed to be produced with an older release and does not contain duplicates.

Parameters:

lfns (Iterable[str]) – A collection of LFNs to process.

Returns:

A set of LFNs with duplicates removed, retaining only the ones with the highest reschedule number for each job.

Return type:

Set[str]
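
A hedged sketch of this de-duplication, splitting on underscores as suggested by the <name>_<gbasf2param>_<jobID>_<rescheduleNum>.root pattern (illustrative only, not the exact implementation):

from typing import Iterable, Set


def lfn_follows_gb2v5_convention(lfn: str) -> bool:
    """Sketch: the v5 convention ends in ..._<jobID>_<rescheduleNum>.root (both numeric)."""
    parts = lfn.rsplit(".", 1)[0].split("_")
    return len(parts) >= 4 and parts[-1].isdigit() and parts[-2].isdigit()


def unique_lfns(lfns: Iterable[str]) -> Set[str]:
    """Sketch: keep only the LFN with the highest reschedule number per job."""
    lfns = set(lfns)

    # datasets from older gbasf2 releases don't follow the v5 convention and contain no duplicates
    if not all(lfn_follows_gb2v5_convention(lfn) for lfn in lfns):
        return lfns

    best_lfn_by_job = {}
    for lfn in lfns:
        stem = lfn.rsplit(".", 1)[0]                  # drop the ".root" extension
        prefix, reschedule_num = stem.rsplit("_", 1)  # "<name>_<gbasf2param>_<jobID>", "<rescheduleNum>"
        current = best_lfn_by_job.get(prefix)
        if current is None or int(reschedule_num) > current[0]:
            best_lfn_by_job[prefix] = (int(reschedule_num), lfn)

    return {lfn for _, lfn in best_lfn_by_job.values()}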

b2luigi.batch.processes.gbasf2._move_downloaded_dataset_to_output_dir(project_download_path: str, output_path: str) → None[source]#

Move files downloaded from the grid to their final output location.

In the Gbasf2Process, the outputs are usually downloaded with gb2_ds_get to a temporary directory (project_download_path) with a structure like

<result_dir>/B.root.partial/<project_name>
├── sub00/job_name*B.root
├── sub01/job_name*B.root
├── …

This function moves those files to their final output_path directory which has the same name as the original root file (e.g. B.root) to fulfill the luigi output definition. This output directory has the structure

<result_dir>/B.root/job_name*B.root

Parameters:
  • project_download_path – Directory into which gb2_ds_get downloaded the grid dataset. The contents should be sub<xy> data blocks containing root files.

  • output_path – Final output directory into which the ROOT files should be copied.

b2luigi.batch.processes.gbasf2._split_all_extensions(path: str) → Tuple[str, str][source]#

Split all extensions from a string pathname.

Similar to os.path.splitext, but with the difference that the extensions-part is considered everything from the first non-leading dot to the end. Leading dots at the beginning of path are considered part of the stem and not considered an extensions separator.

Returns (stem, extensions).

extensions may be empty if path does not contain non-leading dots.
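
A minimal sketch of this splitting rule (illustrative; whether the directory part is kept in the stem is an assumption):

import os
from typing import Tuple


def split_all_extensions(path: str) -> Tuple[str, str]:
    """Sketch: split everything from the first non-leading dot to the end as the extensions."""
    dirname, basename = os.path.split(path)

    # leading dots (e.g. hidden files such as ".hidden.tar.gz") belong to the stem
    n_leading_dots = len(basename) - len(basename.lstrip("."))
    dot_index = basename.find(".", n_leading_dots)

    if dot_index == -1:
        stem, extensions = basename, ""
    else:
        stem, extensions = basename[:dot_index], basename[dot_index:]

    return os.path.join(dirname, stem), extensions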