Belle II specific examples#

The following examples are of interest only to basf2 users, not to a general audience.

Running at the NAF#

The environment on the workers is different from the one on the scheduling machine, so we cannot just copy the environment variables as on KEKCC.

You can use a setup script (e.g. called setup_basf2.sh) with the following content:

source /cvmfs/belle.cern.ch/tools/b2setup release-XX-XX-XX

All you have to do is specify the setup script as the env_script setting and also set the executable explicitly.

from time import sleep
import os

import b2luigi


class MyTask(b2luigi.Task):
    """Example task: sleep for ``parameter`` seconds, then write ``test.txt``."""

    # Number of seconds handed to ``sleep`` before the output is written
    parameter = b2luigi.IntParameter()

    def output(self):
        """Declare ``test.txt`` as the single output of this task."""
        output_name = "test.txt"
        yield self.add_to_output(output_name)

    def run(self):
        """Sleep for ``parameter`` seconds and write "Test" to the output file."""
        sleep(self.parameter)

        target_path = self.get_output_file_name("test.txt")
        with open(target_path, "w") as out_file:
            out_file.write("Test")


class Wrapper(b2luigi.Task):
    """Example task that requires ten ``MyTask`` instances and writes ``test.txt``."""

    def requires(self):
        """Yield one ``MyTask`` for every ``parameter`` value from 0 to 9."""
        yield from (MyTask(parameter=seconds) for seconds in range(10))

    def output(self):
        """Declare ``test.txt`` as the single output of this task."""
        output_name = "test.txt"
        yield self.add_to_output(output_name)

    def run(self):
        """Write "Test" to the output file."""
        target_path = self.get_output_file_name("test.txt")
        with open(target_path, "w") as out_file:
            out_file.write("Test")


if __name__ == "__main__":
    # All settings of this example, applied below in exactly this order
    batch_settings = {
        # Choose htcondor as our batch system
        "batch_system": "htcondor",
        # Script that sets up the correct environment on the workers
        "env_script": "setup_basf2.sh",
        # The executable on the submission node is most likely not the same
        # as on the worker node, so it is specified explicitly
        "executable": ["python3"],
        # Where to store the results
        "result_dir": "results",
    }
    for setting_name, setting_value in batch_settings.items():
        b2luigi.set_setting(setting_name, setting_value)

    b2luigi.process(Wrapper(), batch=True, workers=100)

Of course it is also possible to set those settings in the settings.json or as task-specific parameters. Please check out b2luigi.get_setting() for more information.

Please note that the called script as well as the results folder need to be accessible from both the scheduler and the worker machines. If needed, you can also include more setup steps in the source script.

Running at KEKCC#

KEKCC uses LSF as the batch system. As this is the default for b2luigi, there is nothing you need to do.

nTuple Generation#

import b2luigi as luigi
from b2luigi.basf2_helper import Basf2PathTask, Basf2nTupleMergeTask

import basf2

import modularAnalysis


class AnalysisTask(Basf2PathTask):
    """Reconstruct D0 and B- candidates and write one nTuple file for each.

    The four parameters are only examples of how the input files of one
    task could be identified.
    """

    experiment_number = luigi.IntParameter()
    run_number = luigi.IntParameter()
    mode = luigi.Parameter()
    file_number = luigi.IntParameter()

    def output(self):
        """Declare the two nTuple files written by ``create_path``."""
        # Define the outputs here
        yield self.add_to_output("D_n_tuple.root")
        yield self.add_to_output("B_n_tuple.root")

    def create_path(self):
        """Build and return the basf2 path of the analysis."""
        # PLACEHOLDER: replace the ellipsis by code which somehow creates the
        # filenames from the parameters self.experiment_number,
        # self.run_number, self.mode and self.file_number
        # (parameters just examples)
        input_file_names = ...

        path = basf2.Path()
        modularAnalysis.inputMdstList('default', input_file_names, path=path)

        # Now fill your particle lists, just examples
        modularAnalysis.fillParticleLists(
            [('K+', 'kaonID > 0.1'), ('pi+', 'pionID > 0.1')],
            path=path,
        )
        modularAnalysis.reconstructDecay('D0 -> K- pi+', '1.7 < M < 1.9', path=path)
        modularAnalysis.fitVertex('D0', 0.1, path=path)
        modularAnalysis.matchMCTruth('D0', path=path)
        modularAnalysis.reconstructDecay('B- -> D0 pi-', '5.2 < Mbc < 5.3', path=path)
        modularAnalysis.fitVertex('B+', 0.1, path=path)
        modularAnalysis.matchMCTruth('B-', path=path)

        # When exporting, use the function get_output_file_name() so that the
        # files end up where output() declared them
        modularAnalysis.variablesToNtuple(
            'D0',
            ['M', 'p', 'E', 'useCMSFrame(p)', 'useCMSFrame(E)',
             'daughter(0, kaonID)', 'daughter(1, pionID)', 'isSignal', 'mcErrors'],
            filename=self.get_output_file_name("D_n_tuple.root"),
            path=path,
        )
        modularAnalysis.variablesToNtuple(
            'B-',
            ['Mbc', 'deltaE', 'isSignal', 'mcErrors', 'M'],
            filename=self.get_output_file_name("B_n_tuple.root"),
            path=path,
        )
        return path


class AnalysisWrapperTask(luigi.WrapperTask):
    """Wrapper task that requires the ``AnalysisTask`` instances to process."""

    def requires(self):
        """Yield the required ``AnalysisTask`` instances."""
        # somehow loop over the runs, experiment etc.
        yield self.clone(AnalysisTask, experiment_number=...)


if __name__ == "__main__":
    # Hand the wrapper task to b2luigi, requesting 500 workers
    wrapper_task = AnalysisWrapperTask()
    luigi.process(wrapper_task, workers=500)

Ensuring the basf2 version#

One can also use the parameters and the automatically generated directory structure to ensure that the same basf2 version is used across Tasks. For that, you can simply add this parameter to your Task:

import b2luigi
from b2luigi.basf2_helper import Basf2PathTask
from b2luigi.basf2_helper.utils import get_basf2_git_hash


class MyBasf2PathTask(Basf2PathTask):
    """Task carrying the basf2 git hash as an additional parameter."""

    # get_basf2_git_hash() is called once, when this class body is executed,
    # and its return value becomes the default of the parameter
    git_hash = b2luigi.Parameter(default=get_basf2_git_hash())

The get_basf2_git_hash function will automatically detect your basf2 version.

Standard Simulation, Reconstruction and some nTuple Generation#

import b2luigi as luigi
from b2luigi.basf2_helper import Basf2PathTask, Basf2nTupleMergeTask

from enum import Enum

import basf2

import modularAnalysis
import simulation
import vertex
import generators
import reconstruction
import mdst


class SimulationType(Enum):
    """Event types between which ``SimulationTask.create_path`` distinguishes."""

    # Selects the B2A101-Y4SEventGeneration.dec decay file
    y4s = "Y(4S)"
    # Selects the B2A102-ccbarEventGeneration.dec decay file
    continuum = "Continuum"


class SimulationTask(Basf2PathTask):
    """Generate and simulate ``n_events`` events of type ``event_type``."""

    n_events = luigi.IntParameter()
    event_type = luigi.EnumParameter(enum=SimulationType)

    def output(self):
        """Declare the ROOT file written by the ``RootOutput`` module."""
        yield self.add_to_output("simulation_full_output.root")

    def _find_decay_file(self):
        """Return the decay file belonging to ``self.event_type``.

        Raises:
            ValueError: if ``self.event_type`` is neither ``SimulationType.y4s``
                nor ``SimulationType.continuum``.
        """
        if self.event_type == SimulationType.y4s:
            # In current main branch and release 5 the Y(4S) decay file is
            # moved, so try old and new locations.
            #
            # With ``silent=True``, ``find_file`` returns an empty string if
            # nothing is found, which makes ``or`` fall through to the second
            # location. With ``silent=False``, a ``FileNotFoundError``
            # exception is raised.
            return (
                basf2.find_file("analysis/examples/tutorials/B2A101-Y4SEventGeneration.dec", silent=True)
                or basf2.find_file("analysis/examples/simulations/B2A101-Y4SEventGeneration.dec")
            )

        if self.event_type == SimulationType.continuum:
            return basf2.find_file("analysis/examples/simulations/B2A102-ccbarEventGeneration.dec")

        raise ValueError(f"Event type {self.event_type} is not valid. It should be either 'Y(4S)' or 'Continuum'!")

    def create_path(self):
        """Build and return the basf2 path for generation and simulation."""
        simulation_path = basf2.Path()
        modularAnalysis.setupEventInfo(self.n_events, simulation_path)

        generators.add_evtgen_generator(simulation_path, "signal", self._find_decay_file())
        simulation.add_simulation(simulation_path)

        output_file = self.get_output_file_name("simulation_full_output.root")
        simulation_path.add_module("RootOutput", outputFileName=output_file)

        return simulation_path


@luigi.requires(SimulationTask)
class ReconstructionTask(Basf2PathTask):
    """Run the reconstruction on the output of ``SimulationTask``."""

    def output(self):
        """Declare the file written by ``mdst.add_mdst_output``."""
        yield self.add_to_output("reconstructed_output.root")

    def create_path(self):
        """Build and return the basf2 path of the reconstruction."""
        reco_path = basf2.create_path()

        # Read what the required SimulationTask has written
        simulated_files = self.get_input_file_names("simulation_full_output.root")
        reco_path.add_module("RootInput", inputFileNames=simulated_files)

        for module_name in ("Gearbox", "Geometry"):
            reco_path.add_module(module_name)
        reconstruction.add_reconstruction(reco_path)

        mdst_file = self.get_output_file_name("reconstructed_output.root")
        mdst.add_mdst_output(path=reco_path, filename=mdst_file)

        return reco_path


@luigi.requires(ReconstructionTask)
class AnalysisTask(Basf2PathTask):
    """Reconstruct D0 and B- candidates from the output of ``ReconstructionTask``
    and write one nTuple file for each.
    """

    def create_path(self):
        """Build and return the basf2 path of the analysis."""
        path = basf2.Path()
        modularAnalysis.inputMdstList("default", self.get_input_file_names("reconstructed_output.root"), path=path)
        modularAnalysis.fillParticleLists([("K+", "kaonID > 0.1"), ("pi+", "pionID > 0.1")], path=path)
        modularAnalysis.reconstructDecay("D0 -> K- pi+", "1.7 < M < 1.9", path=path)
        modularAnalysis.matchMCTruth("D0", path=path)
        modularAnalysis.reconstructDecay("B- -> D0 pi-", "5.2 < Mbc < 5.3", path=path)

        # Only the attribute lookup is guarded: an AttributeError raised
        # *inside* the fit function must not silently trigger the fallback.
        try:  # treeFit is the new function name in light releases after release 4 (e.g. light-2002-janus)
            fit_tree = vertex.treeFit
        except AttributeError:  # vertexTree is the function name in release 4
            fit_tree = vertex.vertexTree
        fit_tree("B+", 0.1, update_all_daughters=True, path=path)

        modularAnalysis.matchMCTruth("B-", path=path)
        modularAnalysis.variablesToNtuple(
            "D0",
            [
                "M",
                "p",
                "E",
                "useCMSFrame(p)",
                "useCMSFrame(E)",
                "daughter(0, kaonID)",
                "daughter(1, pionID)",
                "isSignal",
                "mcErrors",
            ],
            filename=self.get_output_file_name("D_n_tuple.root"),
            path=path,
        )
        modularAnalysis.variablesToNtuple(
            "B-",
            ["Mbc", "deltaE", "isSignal", "mcErrors", "M"],
            filename=self.get_output_file_name("B_n_tuple.root"),
            path=path,
        )
        return path

    def output(self):
        """Declare the two nTuple files written by ``create_path``."""
        yield self.add_to_output("D_n_tuple.root")
        yield self.add_to_output("B_n_tuple.root")


class AggregatorTask(Basf2nTupleMergeTask):
    """Require one ``AnalysisTask`` for every member of ``SimulationType``."""

    n_events = luigi.IntParameter()

    def requires(self):
        """Yield a clone of ``AnalysisTask`` for each event type."""
        yield from (
            self.clone(AnalysisTask, event_type=simulation_type)
            for simulation_type in SimulationType
        )


if __name__ == "__main__":
    # Hand the aggregator task to b2luigi, requesting 4 workers
    aggregator_task = AggregatorTask(n_events=1)
    luigi.process(aggregator_task, workers=4)