# Source code for b2luigi.core.parameter
import hashlib
from inspect import signature
import luigi
from luigi.parameter import _no_value
from typing import Callable, Optional
# [docs]
def wrap_parameter() -> None:
    """
    Monkey patch the parameter base class (and with it all other parameters)
    of luigi to include five additional parameters in its constructor:
    ``hashed``, ``hash_function``, ``hidden``, ``grouping`` and ``grouping_function``.

    Enabling the ``hashed`` parameter will use a hashed version of the
    parameter value when creating file paths out of the parameters of a task
    instead of the value itself. By default an md5 hash is used. A custom
    hash function can be provided via the ``hash_function`` parameter. This
    function should take one input, the value of the parameter. It is up
    to the user to ensure a unique string is created from the input.

    This is especially useful when you have list, string or dict parameters,
    where the resulting file path may include "/" or "{}".

    With the ``hidden`` parameter, you can control whether the parameter
    should be hidden in the task's output directory structure when using
    :meth:`add_to_output <b2luigi.Task.add_to_output>`. If ``hidden`` is not
    given, it defaults to ``not significant``. Passing ``hidden=False`` for a
    parameter with ``significant=False`` raises a ``ValueError``.

    With the ``grouping`` parameter, you can control whether the parameter
    should be treated as a grouping parameter. If no ``grouping_function`` is provided,
    the default function will be to return a list of the input value. You still
    treat the parameter as a normal parameter when defining the task, but during
    execution, the task will be executed once for each value in the group. If
    you provide a custom ``grouping_function``, it should follow the format:
    ``function(iterable[x])->x`` where ``x`` is the parameter you want to group over.
    To enable grouping, you also need to set the task property ``max_grouping_size``
    to a value greater than 1. For more information on parameter grouping, see :ref:`parameter-grouping-label`.

    Enabling ``grouping`` also enables ``hashed``. If no ``hash_function`` was
    given, the sorted values joined by ``"_"`` are used (falling back to
    ``str(value)`` when the value cannot be sorted or iterated).

    .. caution::
        Setting ``hidden=True`` will remove the parameter from the unique
        output of the task, so be sure to add it back, e.g. into the output
        file name:

        .. code-block:: python

            class MyTask(b2luigi.Task):
                hiddened_parameter = b2luigi.Parameter(hidden=True)

                def output(self):
                    yield self.add_to_output(f"test_{self.hiddened_parameter}.txt")
    """
    # Imported lazily inside the function, as in the original code.
    import b2luigi
    from b2luigi.core.utils import get_luigi_logger

    parameter_class = b2luigi.Parameter

    def serialize_hashed(self, x):
        """Return the path-safe string for ``x``: custom hash if set, else md5."""
        if self.hash_function is None:
            return "hashed_" + hashlib.md5(str(x).encode()).hexdigest()
        return self.hash_function(x)

    def _default_grouping_hash(x) -> str:
        """Default hash for grouped values: sorted members joined by ``_``."""
        try:
            return "_".join(str(i) for i in sorted(x))
        except TypeError:
            # Not iterable or members not orderable: use the plain string form.
            return str(x)

    # Keep a handle on the unpatched constructor so the wrapper can delegate.
    old_init = parameter_class.__init__

    def __init__(
        self,
        hashed: bool = False,
        hash_function: Optional[Callable] = None,
        hidden: Optional[bool] = None,
        grouping: bool = False,
        grouping_function: Optional[Callable] = None,
        *args,
        **kwargs,
    ):
        """Run the original constructor, then attach the b2luigi extras."""
        old_init(self, *args, **kwargs)

        if hash_function is not None:
            n_params = len(signature(hash_function).parameters)
            # Raised explicitly (not via ``assert``) so the check also runs
            # under ``python -O``; the exception type is unchanged.
            if n_params != 1:
                raise AssertionError(f"Custom hash function can have only 1 argument, found {n_params}")
        self.hash_function = hash_function

        # ``hidden`` follows ``significant`` unless it is set explicitly.
        self.hidden = hidden if hidden is not None else not self.significant
        if not self.significant and not self.hidden:
            raise ValueError("Parameter cannot be both hidden=False and significant=False.")

        # The grouping branch below overwrites ``self._batch_method``, so that
        # is the attribute to inspect for a user-supplied batch method.  The
        # public name is still checked first for backward compatibility.
        # NOTE(review): assumes luigi stores the ``batch_method`` constructor
        # argument as ``_batch_method`` - confirm against the luigi version in use.
        user_batch_method = getattr(self, "batch_method", None)
        if user_batch_method is None:
            user_batch_method = getattr(self, "_batch_method", None)
        if user_batch_method is not None:
            logger = get_luigi_logger()
            logger.warning(
                f"Warning: Parameter {self} has a batch_method given.\n"
                "Internally, we use this for parameter grouping. "
                "If you intended to use the parameter grouping feature, "
                "please set the grouping parameter to True and provide a grouping_function if you want.\n"
                "We overwrite the batch_method internally when grouping is enabled, so the old batch_method will be lost."
            )

        self.grouping = grouping
        if self.grouping:
            # Default grouping simply materialises the incoming values as a list.
            self._batch_method = list if grouping_function is None else grouping_function
            if self.hash_function is None:
                self.hash_function = _default_grouping_hash
            # Grouped parameters are always serialised through the hash function.
            hashed = True

        if hashed:
            # Bound per instance; only hashed parameters get this attribute.
            self.serialize_hashed = lambda x: serialize_hashed(self, x)

    parameter_class.__init__ = __init__
class BoolParameter(luigi.BoolParameter):
    """Variant of ``luigi.BoolParameter`` that does not supply a default value.

    The constructor accepts keyword arguments only and initialises the
    instance through ``luigi.Parameter.__init__`` directly, with ``default``
    set to luigi's ``_no_value`` sentinel unless the caller passes one.
    Grouping-related options are rejected.
    """

    def __init__(self, **kwargs):
        """Validate ``kwargs`` and delegate to ``luigi.Parameter.__init__``.

        Raises:
            ValueError: if ``grouping``, ``grouping_function`` or
                ``batch_method`` is among the keyword arguments.
        """
        unsupported_options = ("grouping", "grouping_function", "batch_method")
        for option in unsupported_options:
            if option in kwargs:
                raise ValueError("BoolParameter does not support grouping parameters.")

        # Only fill in the sentinel when the caller gave no default of their own.
        if "default" not in kwargs:
            kwargs["default"] = _no_value

        # Deliberately calls the base ``Parameter`` constructor rather than
        # ``luigi.BoolParameter``'s own.
        luigi.Parameter.__init__(self, **kwargs)
class BatchIntParameter(luigi.IntParameter):
    """Integer parameter that reports no successor for any value."""

    def next_in_enumeration(self, value) -> None:
        """Return ``None`` for every ``value``.

        NOTE(review): presumably this overrides the enumeration behaviour of
        ``luigi.IntParameter`` so that consecutive values are not treated as
        a range - confirm against the luigi documentation.
        """
        return