Source code for b2luigi.core.target
from contextlib import contextmanager
import os
import random
from typing import Generator, Optional
import luigi
import tempfile
from b2luigi.core.settings import get_setting
[docs]
class FileSystemTarget(luigi.target.FileSystemTarget):
"""A local file system target that extends :class:`luigi.LocalTarget` with temporary file handling.
This target provides additional functionality for handling temporary files during task execution,
including support for temporary input and output paths with proper cleanup and atomic file operations.
Attributes:
scratch_dir (str): Directory path where temporary files will be created
Args:
_scratch_dir (str): Directory path where temporary files will be created
*args: Additional positional arguments passed to :obj:`LocalTarget`
**kwargs: Additional keyword arguments passed to :obj:`LocalTarget`
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@property
def tmp_name(self) -> str:
"""Generate a temporary filename based on the target's path.
Creates a temporary filename by adding a random suffix to the original filename
while preserving the file extension.
Returns:
str: A temporary filename in the format::
"{original_name}-luigi-tmp-{random_number}.{extension}"
Warning:
This method assumes the file has an extension. Files without extensions will raise an :obj:``IndexError``.
This limitation should be addressed in future versions.
Example:
If ``self.path`` is `"data.txt"`, might return `"data-luigi-tmp-1234567890.txt"`
"""
num = random.randrange(0, 10_000_000_000)
filename_parts = os.path.basename(self.path).split(".")
prefix = filename_parts[0]
extension = "".join(filename_parts[1:]) if len(filename_parts) > 1 else ""
_temp_path = f"{prefix}-luigi-tmp-{num:010}.{extension}{self._trailing_slash()}"
return _temp_path
[docs]
@contextmanager
def temporary_path(self, task: Optional[luigi.Task] = None, **tmp_file_kwargs) -> Generator[str, None, None]:
"""Create a temporary file for output that will be moved to the final location.
Implements atomic write operations by first writing to a temporary location
and then moving the file to its final destination only after successful completion.
Args:
**tmp_file_kwargs: Keyword arguments passed to ``tempfile.TemporaryDirectory()``
Yields:
str: Absolute path to the temporary file for writing
Note:
- The temporary directory and its contents are automatically cleaned up
after the file is copied to its final location
- The final copy operation is atomic, preventing partial writes from
corrupting the output file
- Parent directories of the target path will be created if they don't exist
Example:
.. code-block:: python
with target.temporary_path() as tmp_output:
with open(tmp_output, 'w') as f:
f.write('result data')
# File is automatically moved to target location after context exit
"""
# Use a temporary directory
with tempfile.TemporaryDirectory(
dir=get_setting("scratch_dir", task=task, default="/tmp"), **tmp_file_kwargs
) as tmp_dir:
tmp_path = os.path.join(tmp_dir, self.tmp_name)
os.makedirs(os.path.dirname(tmp_path), exist_ok=True)
yield tmp_path
self.fs.rename_dont_move(tmp_path, self.path)
[docs]
class LocalTarget(FileSystemTarget, luigi.LocalTarget):
"""A local file system target that extends :class:`luigi.LocalTarget` with temporary file handling.
This target provides additional functionality for handling temporary files during task execution,
including support for temporary input and output paths with proper cleanup and atomic file operations.
Attributes:
scratch_dir (str): Directory path where temporary files will be created
Args:
_scratch_dir (str): Directory path where temporary files will be created
*args: Additional positional arguments passed to :class:`luigi.LocalTarget`
**kwargs: Additional keyword arguments passed to :class:`luigi.LocalTarget`
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)