"""File finding and manipulation utilities for NexusLIMS."""
import logging
import os
import subprocess
import warnings
from datetime import datetime, timedelta
from pathlib import Path
from shutil import copyfile
from typing import List
from nexusLIMS.config import settings
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
# Offset that the search functions in this module add to *naive* datetime
# arguments before comparing them with file modification times. It is
# normally zero; it exists as a testing hack (originally for tests on
# "poole") and should be -2 hours if running tests from Mountain Time on
# files in Eastern Time.
_tz_offset = timedelta(hours=0)
[docs]
def find_dirs_by_mtime(
    path: str,
    dt_from: datetime,
    dt_to: datetime,
    *,
    followlinks: bool = True,
) -> List[str]:
    """
    Find directories modified between two times.

    Given two timestamps, find the directories under a path that were
    last modified strictly between the two.

    .. deprecated:: 0.0.9
          `find_dirs_by_mtime` is not recommended for use to find files for
          record inclusion, because subsequent modifications to a directory
          (e.g. the user wrote a text file or did some analysis afterwards)
          means no files will be returned from that directory (because it is
          not searched)

    Parameters
    ----------
    path
        The root path from which to start the search
    dt_from
        The "starting" point of the search timeframe (exclusive)
    dt_to
        The "ending" point of the search timeframe (exclusive)
    followlinks
        Argument passed on to :py:func:`os.walk` to control whether
        symbolic links are followed

    Returns
    -------
    dirs : list
        A list of the directories (in the order :py:func:`os.walk` visits
        them) that have modification times within the time range provided
    """
    # adjust the datetime objects with the tz_offset (usually should be 0) if
    # they are naive
    if dt_from.tzinfo is None:
        dt_from += _tz_offset  # pragma: no cover
    if dt_to.tzinfo is None:
        dt_to += _tz_offset  # pragma: no cover

    _logger.info(
        "Finding directories modified between %s and %s",
        dt_from.isoformat(),
        dt_to.isoformat(),
    )

    # the bounds do not change while walking, so convert them only once
    earliest, latest = dt_from.timestamp(), dt_to.timestamp()

    # use os.walk and only inspect the directories for mtime (much fewer
    # comparisons than looking at every file):
    return [
        dirpath
        for dirpath, _, _ in os.walk(path, followlinks=followlinks)
        if earliest < Path(dirpath).stat().st_mtime < latest
    ]
[docs]
def find_files_by_mtime(path: Path, dt_from, dt_to) -> List[Path]:  # pragma: no cover
    """
    Find files modified between two times.

    Given two timestamps, find files under a path that were
    last modified strictly between the two. Symbolic links to directories
    are followed while walking the tree. Files that cannot be inspected
    (for example dangling symbolic links, or files removed while the search
    is running) are skipped rather than aborting the search.

    .. deprecated:: 1.2.0
          Use :py:func:`gnu_find_files_by_mtime` instead.

    Parameters
    ----------
    path
        The root path from which to start the search
    dt_from : datetime.datetime
        The "starting" point of the search timeframe (exclusive)
    dt_to : datetime.datetime
        The "ending" point of the search timeframe (exclusive)

    Returns
    -------
    files : list
        A list of the files that have modification times within the
        time range provided (sorted by modification time)
    """
    warnings.warn(
        "find_files_by_mtime has been deprecated in v1.2.0 and is "
        "no longer tested or supported. Please use "
        "gnu_find_files_by_mtime() instead",
        DeprecationWarning,
        stacklevel=2,
    )

    # adjust the datetime objects with the tz_offset (usually should be 0) if
    # they are naive
    if dt_from.tzinfo is None:
        dt_from += _tz_offset
    if dt_to.tzinfo is None:
        dt_to += _tz_offset
    earliest, latest = dt_from.timestamp(), dt_to.timestamp()

    # Pre-filtering by directory mtime does not work reliably, so the entire
    # tree under ``path`` is inspected. A dict keyed by path avoids duplicates
    # and remembers each mtime so no file has to be stat'ed again for sorting.
    mtimes = {}
    for dirpath, _, filenames in os.walk(path, followlinks=True):
        for name in filenames:
            candidate = Path(dirpath) / name
            try:
                mtime = candidate.stat().st_mtime
            except OSError:
                # os.walk reports dangling symlinks as files; they (and files
                # deleted mid-walk) have no mtime to compare, so skip them
                continue
            if earliest < mtime < latest:
                mtimes[candidate] = mtime

    return sorted(mtimes, key=mtimes.get)
def _get_find_command():
    """
    Get the appropriate GNU find command for the system.

    ``find`` must be locatable on the executable search path. If it does not
    identify itself as GNU findutils, then on macOS ``gfind`` is used when
    available (an error is raised otherwise), and on any other platform a
    warning is logged and ``find`` is used anyway.

    Returns
    -------
    str
        The find command to use ('find' or 'gfind')

    Raises
    ------
    RuntimeError
        If find command is not available or GNU find is required but not found
    """
    # shutil.which falls back to the platform default search path when PATH
    # is unset (as subprocess does), instead of failing with a KeyError
    from shutil import which  # noqa: PLC0415

    def _is_gnu_find(find_cmd):
        """Check if the find command is GNU find (supports -xtype)."""
        try:
            result = subprocess.run(
                [find_cmd, "--version"],
                check=False,
                capture_output=True,
                text=True,
                timeout=2,
            )
        except (subprocess.SubprocessError, OSError):
            # a probe that times out or cannot be executed at all (missing,
            # not permitted, ...) cannot be confirmed as GNU find
            return False
        return "GNU findutils" in result.stdout

    find_command = "find"
    if which(find_command) is None:
        msg = "find command was not found on the system PATH"
        raise RuntimeError(msg)

    if not _is_gnu_find(find_command):
        import platform  # noqa: PLC0415

        if platform.system() == "Darwin":  # pragma: no cover
            # macOS ships BSD find; Homebrew's findutils installs GNU find
            # under the name "gfind"
            if which("gfind") is not None:
                find_command = "gfind"
                _logger.info("BSD find detected, using gfind (GNU find) instead")
            else:
                msg = (
                    "BSD find detected on macOS, but GNU find is required.\n"
                    "The 'find' command on macOS does not support the '-xtype' option "
                    "needed for NexusLIMS.\n\n"
                    "Please install GNU find via Homebrew:\n"
                    "  brew install findutils\n\n"
                    "This will install GNU find as 'gfind', which NexusLIMS will use "
                    "automatically."
                )
                raise RuntimeError(msg)
        else:
            _logger.warning(
                "Non-GNU find detected. If you encounter errors, "
                "please install GNU findutils.",
            )

    return find_command
def _find_symlink_dirs(find_command, path):
    """
    Find symbolic links pointing to directories.

    Runs ``find`` with ``-type l -xtype d`` beneath ``path`` (resolved
    relative to ``settings.NX_INSTRUMENT_DATA_PATH``).

    Parameters
    ----------
    find_command : str
        The find command to use
    path : Path
        The root path to search, relative to ``NX_INSTRUMENT_DATA_PATH``

    Returns
    -------
    list
        The symbolic links found (as ``str``), or a one-element list holding
        the resolved search root (as a ``Path``) if none were found

    Raises
    ------
    subprocess.CalledProcessError
        If the ``find`` process exits with a non-zero status
    """
    find_path = Path(str(settings.NX_INSTRUMENT_DATA_PATH)) / path
    cmd = [find_command, str(find_path), "-type", "l", "-xtype", "d", "-print0"]
    _logger.info('Running followlinks find via subprocess.run: "%s"', cmd)
    out = subprocess.run(cmd, capture_output=True, check=True)

    # File names are arbitrary bytes: os.fsdecode uses the filesystem encoding
    # with surrogateescape, so names that are not valid UTF-8 survive the round
    # trip back into a later command line instead of raising
    # UnicodeDecodeError.
    paths = [os.fsdecode(raw) for raw in out.stdout.split(b"\x00") if raw]
    _logger.info('Found the following symlinks: "%s"', paths)

    if paths:
        _logger.info("find_path is: '%s'", paths)
        return paths
    return [find_path]
def _build_find_command(  # noqa: PLR0913
    find_command,
    find_paths,
    dt_from,
    dt_to,
    extensions,
    followlinks,
):
    """
    Build the find command with all arguments.

    The resulting expression selects regular files newer than ``dt_from``
    and not newer than ``dt_to``, optionally restricted to a set of
    extensions (case-insensitive) and excluding any name matching one of
    ``settings.NX_IGNORE_PATTERNS``. Results are NUL-separated (``-print0``).

    Parameters
    ----------
    find_command : str
        The find command to use
    find_paths : list
        Paths to search
    dt_from : datetime
        Start time
    dt_to : datetime
        End time
    extensions : list or None
        File extensions to search for (without the leading dot). ``None`` or
        an empty collection means no extension filter is added.
    followlinks : bool
        Whether to follow symlinks given as starting points (``-H``)

    Returns
    -------
    list
        Complete find command as list of arguments
    """
    cmd = [find_command]
    if followlinks:
        cmd.append("-H")
    cmd.extend(str(p) for p in find_paths)
    cmd += [
        "-type",
        "f",
        "-newermt",
        dt_from.isoformat(),
        "-not",
        "-newermt",
        dt_to.isoformat(),
    ]

    # Add extension patterns as one parenthesised "-o" group. An empty
    # collection must not emit anything: a group with no tests would leave an
    # unbalanced ")" in the expression, which find rejects.
    wanted = [] if extensions is None else list(extensions)
    if wanted:
        cmd.append("(")
        for ext in wanted:
            cmd += ["-iname", f"*.{ext}", "-o"]
        cmd[-1] = ")"  # the trailing "-o" has no operand; close the group

    # Add ignore patterns (settings already provides a list)
    ignore_patterns = settings.NX_IGNORE_PATTERNS
    if ignore_patterns:
        cmd += ["-and", "("]
        for pattern in ignore_patterns:
            cmd += ["-not", "-iname", pattern, "-and"]
        cmd[-1] = ")"  # likewise replace the dangling "-and"

    cmd.append("-print0")
    return cmd
[docs]
def gnu_find_files_by_mtime(
    path: Path,
    dt_from: datetime,
    dt_to: datetime,
    extensions: List[str] | None = None,
    *,
    followlinks: bool = True,
) -> List[Path]:
    """
    Find files modified between two times.

    Given two timestamps, find files under a path that were
    last modified between the two. Uses the system-provided GNU ``find``
    command. In basic testing, this method was found to be approximately 3 times
    faster than using :py:meth:`find_files_by_mtime` (which is implemented in
    pure Python).

    Parameters
    ----------
    path
        The root path from which to start the search, relative to
        the :ref:`NX_INSTRUMENT_DATA_PATH <config-instrument-data-path>`
        environment setting.
    dt_from
        The "starting" point of the search timeframe
    dt_to
        The "ending" point of the search timeframe
    extensions
        A list of strings representing the extensions to find. If None,
        all files between the two times are found.
    followlinks
        Whether to follow symlinks using the ``find`` command via
        the ``-H`` command line flag. This is useful when the
        :ref:`NX_INSTRUMENT_DATA_PATH <config-instrument-data-path>` is
        actually a directory of symlinks. If this is the case and
        ``followlinks`` is ``False``, no files will ever be found because the
        ``find`` command will not "dereference" the symbolic links it finds.
        When enabled, a preliminary ``find`` run collects the symbolic links
        to directories beneath the search root; if any exist they are used as
        the starting points of the search, otherwise the root itself is used.

    Returns
    -------
    List[Path]
        A list of the files that have modification times within the
        time range provided (sorted by modification time)

    Raises
    ------
    RuntimeError
        If a usable ``find`` command cannot be found
    subprocess.CalledProcessError
        If a ``find`` process exits with a non-zero status
    """
    _logger.info("Using GNU `find` to search for files")

    # Get appropriate find command
    find_command = _get_find_command()

    # Adjust datetime objects with tz_offset if naive
    dt_from += _tz_offset if dt_from.tzinfo is None else timedelta(0)
    dt_to += _tz_offset if dt_to.tzinfo is None else timedelta(0)

    # Find symlink directories if following links
    if followlinks:
        find_paths = _find_symlink_dirs(find_command, path)
    else:
        find_paths = [Path(str(settings.NX_INSTRUMENT_DATA_PATH)) / path]

    # Build and execute find command
    cmd = _build_find_command(
        find_command,
        find_paths,
        dt_from,
        dt_to,
        extensions,
        followlinks,
    )
    _logger.info('Running via subprocess.run: "%s"', cmd)
    _logger.info('Running via subprocess.run (as string): "%s"', " ".join(cmd))
    out = subprocess.run(cmd, capture_output=True, check=True)

    # Process results: entries are NUL-separated raw file names. os.fsdecode
    # (filesystem encoding + surrogateescape) keeps names that are not valid
    # UTF-8 usable for the stat() call below instead of raising
    # UnicodeDecodeError. A set removes duplicates reported via several
    # starting points.
    unique = {Path(os.fsdecode(raw)) for raw in out.stdout.split(b"\x00") if raw}
    files = sorted(unique, key=lambda f: f.stat().st_mtime)
    _logger.info("Found %i files", len(files))
    return files
def _zero_bytes(fname: Path, bytes_from, bytes_to) -> Path:
    """
    Set certain byte locations within a file to zero.

    This method helps creating highly-compressible test files. The input
    file is left untouched; a copy is written next to it and modified.

    Parameters
    ----------
    fname
        The file to copy. The copy is named ``<stem>_dataZeroed<ext>``; for
        ``.ser`` files whose stem ends in ``_<number>`` the marker is placed
        before that number (``<base>_dataZeroed_<number>.ser``).
    bytes_from : int or :obj:`list` of int
        The position of the file (in decimal) at which to start zeroing
    bytes_to : int or :obj:`list` of int
        The position of the file (in decimal) at which to stop zeroing
        (exclusive). If list, must be the same length as list given in
        ``bytes_from``

    Returns
    -------
    new_fname
        The modified file that has its bytes zeroed

    Raises
    ------
    ValueError
        If ``bytes_from`` and ``bytes_to`` are lists of different lengths
    """
    if isinstance(bytes_from, int):
        starts, stops = [bytes_from], [bytes_to]
    else:
        starts, stops = list(bytes_from), list(bytes_to)
    # zip() would silently drop the unmatched ranges, so check before any
    # file is created
    if len(starts) != len(stops):
        msg = (
            "bytes_from and bytes_to must have the same length "
            f"(got {len(starts)} and {len(stops)})"
        )
        raise ValueError(msg)

    stem, ext = fname.stem, fname.suffix
    new_fname = fname.parent / f"{stem}_dataZeroed{ext}"
    if ext == ".ser":
        # keep a trailing "_<number>" at the very end of the new name; a stem
        # without such a number falls back to the generic name above
        basename, _, suffix = stem.rpartition("_")
        try:
            index = int(suffix)
        except ValueError:
            pass
        else:
            new_fname = fname.parent / f"{basename}_dataZeroed_{index}{ext}"

    copyfile(fname, new_fname)
    with new_fname.open(mode="r+b") as f:
        for from_byte, to_byte in zip(starts, stops):
            f.seek(from_byte)
            f.write(b"\0" * (to_byte - from_byte))
    return new_fname