import os
import shutil
import b2luigi
from b2luigi.basf2_helper.targets import ROOTLocalTarget
import subprocess
from b2luigi.basf2_helper.utils import get_basf2_git_hash
from b2luigi.core.utils import create_output_dirs, get_serialized_parameters
[docs]class Basf2Task(b2luigi.DispatchableTask):
[docs] class Basf2Task:
"""
A custom task class for handling ``basf2``-related tasks in the ``b2luigi`` framework. This class extends
:class:`b2luigi.DispatchableTask` and provides additional functionality for managing output file targets
and serialized parameters.
Attributes:
git_hash (b2luigi.Parameter): A parameter representing the ``basf2`` git hash. It is set to the
current ``basf2`` git hash (see: :meth:`get_basf2_git_hash <b2luigi.basf2_helper.utils.get_basf2_git_hash>`) by default and marked as non-significant to avoid affecting the
task's unique ID.
"""
git_hash = b2luigi.Parameter(default=get_basf2_git_hash(), significant=False)
[docs] def get_output_file_target(self, *args, **kwargs):
"""
Determines the appropriate output file target based on the file extension.
If the output file has a ".root" extension, it returns a :class:`b2luigi.basf2_helper.ROOTLocalTarget`
for the specified file. Otherwise, it delegates to the superclass
implementation of :meth:`b2luigi.Task.get_output_file_target`.
Args:
*args: Positional arguments passed to ``get_output_file_name`` and the
superclass method.
**kwargs: Keyword arguments passed to ``get_output_file_name`` and the
superclass method.
Returns:
Target: A :class:`b2luigi.basf2_helper.ROOTLocalTarget` if the file extension is ".root", otherwise
the result of the superclass's ``get_output_file_target`` method.
"""
file_name = self.get_output_file_name(*args, **kwargs)
if os.path.splitext(file_name)[-1] == ".root":
return ROOTLocalTarget(file_name)
return super().get_output_file_target(*args, **kwargs)
[docs] def get_serialized_parameters(self):
"""
Retrieve the serialized parameters of the current task. (see :meth:`get_serialized_parameters <b2luigi.core.utils.get_serialized_parameters>`)
Returns:
dict: A dictionary containing the serialized parameters of the task.
"""
return get_serialized_parameters(self)
[docs]class Basf2PathTask(Basf2Task):
"""
A task for running a ``basf2`` processing path within the ``b2luigi`` framework.
In contrast to the normal ``(b2)luigi`` tasks, the execution logic of a
:class:`Basf2PathTask <b2luigi.basf2_helper.tasks.Basf2PathTask>` is not defined in
a ``run`` method but in
:meth:`create_path <b2luigi.basf2_helper.tasks.Basf2PathTask.create_path>`.
The :meth:`create_path <b2luigi.basf2_helper.tasks.Basf2PathTask.create_path>` method needs
to return the ``basf2`` path that is created in the steering file.
Furthermore, the ``Progress`` module is automatically added and ``print(b2.statistics)``
is called after the path is processed.
.. warning::
Due to technical reasons, the path needs to be created within the :meth:`create_path <b2luigi.basf2_helper.tasks.Basf2PathTask.create_path>`
method. The path can be used in further objects, however, it is not possible
for it to originate from an outer scope.
"""
num_processes = b2luigi.IntParameter(significant=False, default=0)
max_event = b2luigi.IntParameter(significant=False, default=0)
[docs] def create_path(self):
raise NotImplementedError()
[docs] @b2luigi.on_temporary_files
def process(self):
"""
Executes the processing task using the ``basf2`` framework.
It sets the number of processes for basf2 if
``self.num_processes`` is specified. Then, it creates a processing path,
adds the ``basf2.Progress`` module to the path, and prints the path configuration.
Finally, it processes the path with the specified maximum number of events
(``self.max_event``). If ``self.max_event`` is not set, it defaults to ``0``
(process all events). After processing, it prints the ``basf2`` statistics.
Raises:
ImportError: If the basf2 module cannot be found.
"""
try:
import basf2
except ImportError:
raise ImportError("Can not find basf2. Can not use the basf2 task.")
if self.num_processes:
basf2.set_nprocesses(self.num_processes)
path = self.create_path()
path.add_module("Progress")
basf2.print_path(path)
max_event = self.max_event if self.max_event else 0
basf2.process(path=path, max_event=max_event)
print(basf2.statistics)
[docs]class SimplifiedOutputBasf2Task(Basf2PathTask):
"""
A task that simplifies the handling of output files in a ``basf2`` processing path.
This class is intended to be subclassed (see :obj:`Basf2PathTask`), and the ``create_path`` method must be
implemented to define the ``basf2`` processing path. The ``output`` method identifies
and collects output file targets from the modules in the path.
"""
[docs] def create_path(self):
raise NotImplementedError()
[docs] def output(self):
"""
Generates the output targets for the task by inspecting the modules in the created path.
Returns:
list: A list of ``ROOTLocalTarget`` objects corresponding to the output file names
specified in the "RootOutput" modules of the path.
"""
path = self.create_path()
outputs = []
for module in path.modules():
if module.type() == "RootOutput":
for param in module.available_params():
if param.name == "outputFileName":
outputs.append(ROOTLocalTarget(param.values))
return outputs
[docs]class MergerTask(Basf2Task):
"""
A task class for merging input files using a specified command.
"""
cmd = []
[docs] def output(self):
"""
Generates the output for the task by iterating over input file names and
applying filters based on the ``keys`` attribute.
Yields:
The result of :meth:`b2luigi.Task.add_to_output` for each key that passes the filtering
conditions.
Filtering Conditions:
- If the task has a ``keys`` attribute, only keys present in ``self.keys``
are processed.
"""
for key, _ in self.get_input_file_names().items():
if hasattr(self, "keys") and key not in self.keys:
continue
yield self.add_to_output(key)
[docs] @b2luigi.on_temporary_files
def process(self):
"""
Processes input files and generates output files by executing a command.
1. Creates necessary output directories using :meth:`create_output_dirs <b2luigi.core.utils.create_output_dirs>`.
2. Iterates over the input file names grouped by keys.
3. Skips processing for keys not specified in ``self.keys`` (if ``self.keys`` exists).
4. Constructs a command by appending the output file name and input file list to ``self.cmd``.
5. Executes the constructed command using ``subprocess.check_call``.
"""
create_output_dirs(self)
for key, file_list in self.get_input_file_names().items():
if hasattr(self, "keys") and key not in self.keys:
continue
args = self.cmd + [self.get_output_file_name(key)] + file_list
subprocess.check_call(args)
[docs]class HaddTask(MergerTask):
"""
:obj:`HaddTask` is a subclass of :obj:`MergerTask` that represents a task for merging ROOT files
using the ``hadd`` command-line tool.
"""
cmd = ["hadd", "-f"]
[docs]class Basf2FileMergeTask(MergerTask):
"""
:obj:`Basf2FileMergeTask` is a subclass of :obj:`MergerTask` that represents a task for merging ``basf2`` ROOT files
using the ``b2file-merge`` command-line tool.
"""
cmd = ["b2file-merge", "-f"]
[docs]class Basf2nTupleMergeTask(MergerTask):
@property
def cmd(self):
"Command to use to merge basf2 tuple files."
# ``fei_merge_files`` has been renamed to ``analysis-fei-mergefiles``, use
# the newer command if it exists in the release.
new_cmd_name = "analysis-fei-mergefiles"
old_cmd_name = "fei_merge_files"
if shutil.which(new_cmd_name):
return [new_cmd_name]
return [old_cmd_name]