import tempfile
from luigi.target import FileSystem
import os
from contextlib import contextmanager
import logging
from typing import Any, Optional, Tuple, Dict, Generator
from b2luigi.core.target import FileSystemTarget
from b2luigi.core.settings import get_setting
from b2luigi.core.task import Task
[docs]class XRootDSystem(FileSystem):
"""
XRootD file system for ``b2luigi`` Targets. Inspiration taken from `RHofsaess <https://github.com/RHofsaess/xrd-interactive/blob/main/XRootD_utils.py>`_.
It implements some standard file system operations, which can be used by the :obj:`XRootDTarget`.
The error handling is done by assertions, since XRootD does not raise exceptions.
"""
def __init__(self, server_path: str) -> None:
"""
Args:
server_path: Path to the server, e.g. root://eosuser.cern.ch/
"""
try:
from XRootD import client
from XRootD.client.flags import DirListFlags, OpenFlags, MkDirFlags
except ModuleNotFoundError as err:
logging.error("The XRootD python package is not imported.")
raise err
self.dir_list_flags = DirListFlags
self.open_flags = OpenFlags
self.mk_dir_flags = MkDirFlags
self.server_path = server_path
self.client = client.FileSystem(self.server_path)
[docs] def exists(self, path: str) -> bool:
"""
Implementation of the exists function for the XRootDSystem.
Will return True if the file or directory exists and False if it can not be found. This might also include cases, where the server is not reachable.
Args:
path (str): The path to check for existence.
Returns:
bool: True if the path exists, False otherwise..
"""
status, _ = self.client.stat(path, self.dir_list_flags.STAT)
if not status.ok:
return False
else:
return True
[docs] def copy_file_to_remote(self, local_path: str, remote_path: str, force: bool = False) -> None:
"""
Function to copy a file from the local file system to the remote file system.
In case the copy fails, a warning will be printed and a assertion will fail.
Args:
local_path (str): The path to the file on the local file system.
remote_path (str): The destination path for the file on the remote file system.
force (bool, optional): If True, overwrites the file on the remote system if it
already exists. Defaults to False.
Raises:
AssertionError: If the file copy operation fails.
"""
status, _ = self.client.copy("file://" + local_path, self.server_path + remote_path, force=force)
if not status.ok:
logging.warning(status.message)
assert status.ok
[docs] def copy_file_from_remote(self, remote_path: str, local_path: str, force: bool = False) -> None:
"""
Function to copy a file from the remote file system to the local file system.
In case the copy fails, a warning will be printed and a assertion will fail.
Args:
remote_path: Path to the file on the remote file system.
local_path: Path to the file on the local file system.
force: If True, the file will be overwritten if it already exists. Default is False.
This method uses the client to perform the file transfer. If the copy operation
fails, a warning message is logged, and an assertion error is raised.
Args:
remote_path (str): The path to the file on the remote file system.
local_path (str): The destination path for the file on the local file system.
force (bool, optional): If True, overwrites the file on the local system if it
already exists. Defaults to False.
Raises:
AssertionError: If the file copy operation fails.
"""
status, _ = self.client.copy(f"{self.server_path}/{remote_path}", local_path, force=force)
if not status.ok:
logging.warning(f"Failed to copy file from {remote_path} to {local_path}: {status.message}")
if "file exists" in status.message:
logging.info(f"File already exists: {local_path}")
status.ok = True
assert status.ok, f"File copy operation failed: {status.message}"
[docs] def copy_dir_from_remote(self, remote_path: str, local_path: str, force: bool = False) -> None:
"""
Copies a directory and its files from the remote file system to the local file system.
This method does not support nested directories. It iterates through the contents
of the remote directory and copies each file to the specified local directory.
Args:
remote_path (str): Path to the directory on the remote file system.
local_path (str): Path to the directory on the local file system.
force (bool, optional): If True, overwrites files on the local system if they
already exist. Defaults to False.
"""
_, directory_listing = self.listdir(remote_path)
for entry in directory_listing:
if entry.statinfo.size != 512: # Check if the entry is not a directory
logging.info(f"Copying file: {entry.name}")
self.copy_file_from_remote(
os.path.join(remote_path, entry.name), os.path.join(local_path, entry.name), force=force
)
[docs] def move(self, source_path: str, dest_path: str) -> None:
"""
A function to move a file from one location to another on the XRootD server.
In case the move fails, a warning will be printed and a assertion will fail.
Args:
source_path (str): The current path of the file on the remote file system.
dest_path (str): The target path for the file on the remote file system.
Raises:
AssertionError: If the move operation fails.
"""
status, _ = self.client.mv(source_path, dest_path)
if not status.ok:
logging.warning(f"Failed to move file from {source_path} to {dest_path}: {status.message}")
assert status.ok, f"Move operation failed: {status.message}"
[docs] def mkdir(self, path: str) -> None:
"""
A function to create a directory on the remote file system.
In case the creation fails, a warning will be printed and a assertion will fail.
Args:
path (str): The path of the directory to create.
Raises:
AssertionError: If the directory creation operation fails.
"""
dir_path, _ = os.path.split(path)
if self.exists(dir_path):
logging.warning(f"Directory already exists: {dir_path}")
return
status, _ = self.client.mkdir(dir_path, self.mk_dir_flags.MAKEPATH)
if not status.ok:
logging.warning(f"Failed to create directory {path}: {status.message}")
if "File exists" in status.message:
status.ok = True
assert status.ok, f"Directory creation failed: {status.message}"
[docs] def locate(self, path: str) -> bool:
"""
Checks the location of a file on the remote file system.
This method queries the remote file system to locate the specified file.
If the operation fails, a warning is logged, and an assertion error is raised.
Args:
path (str): The path to the file on the remote file system.
Returns:
bool: True if the file location is successfully determined.
Raises:
AssertionError: If the locate operation fails.
"""
status, locations = self.client.locate(path, self.open_flags.REFRESH)
if not status.ok:
logging.warning(f"Failed to locate file {path}: {status.message}")
assert status.ok, f"Locate operation failed: {status.message}"
return True
[docs] def remove(self, path: str) -> None:
"""
A function to remove a file from the remote file system.
This function can not remove directories. Use ``remove_dir`` for that.
In case the removal fails, a warning will be printed and a assertion will fail.
Args:
path: Path to the file on the remote file system.
This method deletes a single file and does not support directory removal.
Args:
path (str): The path to the file on the remote file system.
Raises:
AssertionError: If the file removal operation fails.
"""
status, _ = self.client.rm(path)
if not status.ok:
logging.warning(f"Failed to remove file {path}: {status.message}")
assert status.ok, f"File removal failed: {status.message}"
[docs] def listdir(self, path: str, print_entries: bool = False) -> Tuple[Dict[str, int], Any]:
"""
Lists the contents of a directory on the remote file system.
This method retrieves the directory listing and categorizes entries as files or directories.
If the operation fails, a warning is logged, and an assertion error is raised.
Args:
path (str): Path to the directory on the remote file system.
Returns:
Tuple[Dict[str, int], Any]: A dictionary mapping paths to their type (1 for directories, 0 for files),
and the raw listing object.
Raises:
AssertionError: If the directory listing operation fails.
"""
dir_dict = {}
status, listing = self.client.dirlist(path, self.dir_list_flags.STAT)
if not status.ok:
logging.warning(f"Failed to list directory {path}: {status.message}")
assert status.ok, f"Directory listing failed: {status.message}"
for entry in listing:
if entry.statinfo.flags in {51, 19}: # Directory flags
assert entry.statinfo.size == 512, "Unexpected size for a directory entry"
dir_dict[f"{listing.parent}{entry.name}/"] = 1
elif entry.statinfo.flags in {48, 16}: # File flags
dir_dict[f"{listing.parent}{entry.name}"] = 0
else:
logging.warning(f"[get_directory_listing] Info: {entry}")
exit("Unknown flags. RO files, strange permissions?")
if print_entries:
print(entry.name, f"{entry.statinfo.size/1024/1024} MB")
return dir_dict, listing
[docs] def remove_dir(self, path: str) -> None:
"""
A function to iteratively remove a directory and all its content from the remote file system.
In case the removal fails, a warning will be printed and a assertion will fail.
Args:
path (str): Path to the directory on the remote file system.
Raises:
AssertionError: If the directory removal operation fails.
"""
status, listing = self.client.dirlist(path, self.dir_list_flags.STAT)
if not status.ok:
logging.warning(f"Failed to list directory {path}: {status.message}")
assert status.ok, f"Directory listing failed: {status.message}"
for entry in listing:
entry_path = os.path.join(listing.parent, entry.name)
if entry.statinfo.size == 512: # Check if the entry is a directory
logging.info(f"Recursively removing directory: {entry_path}")
assert entry.statinfo.flags in {51, 19}, "Unexpected flags for a directory entry"
self.remove_dir(entry_path)
else:
logging.info(f"Removing file: {entry_path}")
self.remove(entry_path)
status, _ = self.client.rmdir(path) # Remove the now-empty directory
logging.info(f"Status: {status.message}")
if not status.ok:
logging.warning(f"Failed to remove directory {path}: {status.message}")
assert status.ok, f"Directory removal failed: {status.message}"
[docs] def rename_dont_move(self, path: str, dest: str) -> None:
"""
Overwriting a the luigi function used to handle the atomic write problem.
(See https://github.com/spotify/luigi/blob/master/luigi/target.py#L303)
In this case it is just an alias for ``copy_file_to_remote`` with ``force=True``.
Args:
local_path: Path to the file on the local file system.
remote_path: Path to the file on the remote file system.
"""
self.copy_file_to_remote(path, dest, force=True)
[docs]class XRootDTarget(FileSystemTarget):
"""
Luigi target implementation for the XRootD file system.
"""
def __init__(self, path: str, file_system: XRootDSystem):
"""
Initialize the XRootDTarget.
Args:
path (str): Path to the file on the remote file system.
file_system (XRootDSystem): Instance of the XRootDSystem.
"""
self._file_system = file_system
super().__init__(path)
@property
def base_name(self) -> str:
"""Get the base name of the target path."""
return os.path.basename(self.path)
@property
def fs(self) -> XRootDSystem:
"""Get the associated file system."""
return self._file_system
[docs] def makedirs(self) -> None:
"""Create the target's directory on the remote file system."""
self.fs.mkdir(self.path)
[docs] def move(self) -> None:
"""Move the target to a new location."""
self.fs.move(self.path)
[docs] def get(self, path: str = "~") -> str:
"""
A function to copy the file from the remote file system to the local file system.
Args:
path: Path to copy the file to.
Returns:
Path to the copied file.
"""
self.fs.copy_file_from_remote(self.path, f"{path}/{self.base_name}")
return f"{path}/{self.base_name}"
[docs] def open(self, mode: str) -> None:
"""Raise NotImplementedError as open is not supported."""
raise NotImplementedError("XRootDTarget does not support open yet.")