Coverage for nexusLIMS/utils/files.py: 100%
107 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2026-03-24 05:23 +0000
1"""File finding and manipulation utilities for NexusLIMS."""
3import logging
4import os
5import subprocess
6import warnings
7from datetime import datetime, timedelta
8from pathlib import Path
9from shutil import copyfile
10from typing import List
12from nexusLIMS.config import settings
_logger = logging.getLogger(__name__)

# Offset added to *naive* (tzinfo-less) datetime bounds by the find functions
# in this module before those bounds are used; timezone-aware values are left
# untouched. Normally zero hours. It is a hack for poole testing -- it should
# be -2 if running tests from Mountain Time on files in Eastern Time.
_tz_offset = timedelta(hours=0)
def find_dirs_by_mtime(
    path: str,
    dt_from: datetime,
    dt_to: datetime,
    *,
    followlinks: bool = True,
) -> List[str]:
    """
    Find directories modified between two times.

    Given two timestamps, find the directories under a path (the root path
    itself included) that were last modified strictly between the two.

    .. deprecated:: 0.0.9
          `find_dirs_by_mtime` is not recommended for use to find files for
          record inclusion, because subsequent modifications to a directory
          (e.g. the user wrote a text file or did some analysis afterwards)
          means no files will be returned from that directory (because it is
          not searched)

    Parameters
    ----------
    path
        The root path from which to start the search
    dt_from
        The "starting" point of the search timeframe (exclusive)
    dt_to
        The "ending" point of the search timeframe (exclusive)
    followlinks
        Argument passed on to :py:func:`os.walk` to control whether
        symbolic links are followed

    Returns
    -------
    dirs : list
        A list of the directories that have modification times within the
        time range provided, in the order :py:func:`os.walk` visits them
    """
    # adjust the datetime objects with the tz_offset (usually should be 0) if
    # they are naive
    if dt_from.tzinfo is None:
        dt_from += _tz_offset  # pragma: no cover
    if dt_to.tzinfo is None:
        dt_to += _tz_offset  # pragma: no cover

    _logger.info(
        "Finding directories modified between %s and %s",
        dt_from.isoformat(),
        dt_to.isoformat(),
    )

    # The bounds do not change during the walk, so convert them to POSIX
    # timestamps once instead of once per directory.
    ts_from = dt_from.timestamp()
    ts_to = dt_to.timestamp()

    # use os.walk and only inspect the directories for mtime (much fewer
    # comparisons than looking at every file):
    return [
        dirpath
        for dirpath, _, _ in os.walk(path, followlinks=followlinks)
        if ts_from < Path(dirpath).stat().st_mtime < ts_to
    ]
def find_files_by_mtime(path: Path, dt_from, dt_to) -> List[Path]:  # pragma: no cover
    """
    Find files modified between two times.

    Given two timestamps, find files under a path that were
    last modified strictly between the two. Symbolic links to directories
    are followed during the walk.

    .. deprecated:: 1.2.0
          No longer tested or supported; use :py:func:`gnu_find_files_by_mtime`
          instead. Calling this function emits a ``DeprecationWarning``.

    Parameters
    ----------
    path
        The root path from which to start the search
    dt_from : datetime.datetime
        The "starting" point of the search timeframe (exclusive)
    dt_to : datetime.datetime
        The "ending" point of the search timeframe (exclusive)

    Returns
    -------
    files : list
        A list of the files that have modification times within the
        time range provided (sorted by modification time)
    """
    warnings.warn(
        "find_files_by_mtime has been deprecated in v1.2.0 and is "
        "no longer tested or supported. Please use "
        "gnu_find_files_by_mtime() instead",
        DeprecationWarning,
        stacklevel=2,
    )

    # adjust the datetime objects with the tz_offset (usually should be 0) if
    # they are naive
    if dt_from.tzinfo is None:
        dt_from += _tz_offset
    if dt_to.tzinfo is None:
        dt_to += _tz_offset

    # The bounds are loop-invariant; convert them to POSIX timestamps once.
    ts_from = dt_from.timestamp()
    ts_to = dt_to.timestamp()

    # Pre-filtering by directory mtime (find_dirs_by_mtime) does not work
    # reliably, so the entire tree below ``path`` is inspected file by file.
    # Each file is stat'ed exactly once and its mtime remembered, so the value
    # used for filtering is the same one used for sorting below.
    mtimes = {}
    for dirpath, _, filenames in os.walk(path, followlinks=True):
        for name in filenames:
            fname = Path(dirpath) / name
            mtime = fname.stat().st_mtime
            if ts_from < mtime < ts_to:
                mtimes[fname] = mtime

    # dict keys are unique paths; order them by the recorded mtime
    return sorted(mtimes, key=mtimes.get)
140def _get_find_command():
141 """
142 Get the appropriate GNU find command for the system.
144 Returns
145 -------
146 str
147 The find command to use ('find' or 'gfind')
149 Raises
150 ------
151 RuntimeError
152 If find command is not available or GNU find is required but not found
153 """
155 def _which(fname):
156 def _is_exec(f):
157 return Path(f).is_file() and os.access(f, os.X_OK)
159 for exe in os.environ["PATH"].split(os.pathsep):
160 exe_file = str(Path(exe) / fname)
161 if _is_exec(exe_file):
162 return exe_file
163 return False
165 def _is_gnu_find(find_cmd):
166 """Check if the find command is GNU find (supports -xtype)."""
167 try:
168 result = subprocess.run(
169 [find_cmd, "--version"],
170 check=False,
171 capture_output=True,
172 text=True,
173 timeout=2,
174 )
175 except (subprocess.SubprocessError, FileNotFoundError):
176 return False
177 else:
178 return "GNU findutils" in result.stdout
180 find_command = "find"
181 if not _which(find_command):
182 msg = "find command was not found on the system PATH"
183 raise RuntimeError(msg)
185 if not _is_gnu_find(find_command):
186 import platform # noqa: PLC0415
188 if platform.system() == "Darwin": # pragma: no cover
189 # macOS
190 if _which("gfind"):
191 find_command = "gfind"
192 _logger.info("BSD find detected, using gfind (GNU find) instead")
193 else:
194 msg = (
195 "BSD find detected on macOS, but GNU find is required.\n"
196 "The 'find' command on macOS does not support the '-xtype' option "
197 "needed for NexusLIMS.\n\n"
198 "Please install GNU find via Homebrew:\n"
199 " brew install findutils\n\n"
200 "This will install GNU find as 'gfind', which NexusLIMS will use "
201 "automatically."
202 )
203 raise RuntimeError(msg)
204 else:
205 _logger.warning(
206 "Non-GNU find detected. If you encounter errors, "
207 "please install GNU findutils.",
208 )
210 return find_command
def _find_symlink_dirs(find_command, path):
    """
    Find symbolic links pointing to directories.

    Runs ``find <root> -type l -xtype d -print0`` where ``<root>`` is ``path``
    joined onto ``settings.NX_INSTRUMENT_DATA_PATH``.

    Parameters
    ----------
    find_command : str
        The find command to use
    path : Path
        The root path to search, relative to ``settings.NX_INSTRUMENT_DATA_PATH``

    Returns
    -------
    list
        The symbolic link paths found (as ``str``), or a one-element list
        holding the full search root (as a ``Path``) if none were found

    Raises
    ------
    subprocess.CalledProcessError
        If the find command exits with a non-zero status
    """
    find_path = Path(str(settings.NX_INSTRUMENT_DATA_PATH)) / path
    cmd = [find_command, str(find_path), "-type", "l", "-xtype", "d", "-print0"]
    _logger.info('Running followlinks find via subprocess.run: "%s"', cmd)
    out = subprocess.run(cmd, capture_output=True, check=True)

    # Output is NUL-separated raw file-name bytes. os.fsdecode uses the
    # filesystem encoding with the "surrogateescape" handler, so a name that
    # is not valid UTF-8 no longer raises UnicodeDecodeError and still
    # round-trips when handed back to a subprocess.
    paths = [os.fsdecode(raw) for raw in out.stdout.split(b"\x00") if raw]
    _logger.info('Found the following symlinks: "%s"', paths)

    if paths:
        _logger.info("find_path is: '%s'", paths)
        return paths
    return [find_path]
def _build_find_command(  # noqa: PLR0913
    find_command,
    find_paths,
    dt_from,
    dt_to,
    extensions,
    followlinks,
):
    """
    Build the find command with all arguments.

    The resulting expression selects regular files newer than ``dt_from`` and
    not newer than ``dt_to``, optionally restricted to the given extensions
    (case-insensitive), and excluding every pattern listed in
    ``settings.NX_IGNORE_PATTERNS``.

    Parameters
    ----------
    find_command : str
        The find command to use
    find_paths : list
        Paths to search
    dt_from : datetime
        Start time
    dt_to : datetime
        End time
    extensions : list or None
        File extensions to search for (without the leading dot). ``None``
        applies no extension filter. An empty collection matches no files
        (emitted as GNU find's ``-false``) instead of producing an
        unbalanced ``)`` in the expression.
    followlinks : bool
        Whether to follow symlinks (adds the ``-H`` flag)

    Returns
    -------
    list
        Complete find command as list of arguments
    """
    cmd = [find_command]
    if followlinks:
        cmd.append("-H")
    cmd.extend(str(p) for p in find_paths)
    cmd += [
        "-type",
        "f",
        "-newermt",
        dt_from.isoformat(),
        "-not",
        "-newermt",
        dt_to.isoformat(),
    ]

    # Add extension patterns
    if extensions is not None:
        # Materialize first so that emptiness is detected for any iterable.
        name_patterns = [f"*.{ext}" for ext in extensions]
        if name_patterns:
            cmd.append("(")
            for pattern in name_patterns:
                cmd += ["-iname", pattern, "-o"]
            cmd.pop()  # drop the trailing "-o"
            cmd.append(")")
        else:
            # "any of zero extensions" can never be true
            cmd.append("-false")

    # Add ignore patterns (settings already provides a list)
    ignore_patterns = settings.NX_IGNORE_PATTERNS
    if ignore_patterns:
        cmd += ["-and", "("]
        for pattern in ignore_patterns:
            cmd += ["-not", "-iname", pattern, "-and"]
        cmd.pop()  # drop the trailing "-and"
        cmd.append(")")

    cmd.append("-print0")
    return cmd
def gnu_find_files_by_mtime(
    path: Path,
    dt_from: datetime,
    dt_to: datetime,
    extensions: List[str] | None = None,
    *,
    followlinks: bool = True,
) -> List[Path]:
    """
    Find files modified between two times.

    Given two timestamps, find files under a path that were
    last modified between the two. Uses the system-provided GNU ``find``
    command. In basic testing, this method was found to be approximately 3 times
    faster than using :py:func:`find_files_by_mtime` (which is implemented in
    pure Python).

    Parameters
    ----------
    path
        The root path from which to start the search, relative to
        the :ref:`NX_INSTRUMENT_DATA_PATH <config-instrument-data-path>`
        environment setting.
    dt_from
        The "starting" point of the search timeframe
    dt_to
        The "ending" point of the search timeframe
    extensions
        A list of strings representing the extensions to find. If None,
        all files between the two times are found.
    followlinks
        Whether to follow symlinks using the ``find`` command via
        the ``-H`` command line flag. This is useful when the
        :ref:`NX_INSTRUMENT_DATA_PATH <config-instrument-data-path>` is actually a
        directory of symlinks. If this is the case and ``followlinks`` is
        ``False``, no files will ever be found because the ``find``
        command will not "dereference" the symbolic links it finds.
        When ``True``, a first ``find`` pass collects the symbolic links that
        point to directories and the main search starts from those links
        (or from the search root itself when there are none).

    Returns
    -------
    List[Path]
        A list of the files that have modification times within the
        time range provided (sorted by modification time)

    Raises
    ------
    RuntimeError
        If a usable find command cannot be found
    subprocess.CalledProcessError
        If the find command exits with a non-zero status
    """
    _logger.info("Using GNU `find` to search for files")

    # Get appropriate find command
    find_command = _get_find_command()

    # Adjust datetime objects with tz_offset if naive
    dt_from += _tz_offset if dt_from.tzinfo is None else timedelta(0)
    dt_to += _tz_offset if dt_to.tzinfo is None else timedelta(0)

    # Find symlink directories if following links
    if followlinks:
        find_paths = _find_symlink_dirs(find_command, path)
    else:
        find_paths = [Path(str(settings.NX_INSTRUMENT_DATA_PATH)) / path]

    # Build and execute find command
    cmd = _build_find_command(
        find_command,
        find_paths,
        dt_from,
        dt_to,
        extensions,
        followlinks,
    )
    _logger.info('Running via subprocess.run: "%s"', cmd)
    _logger.info('Running via subprocess.run (as string): "%s"', " ".join(cmd))
    out = subprocess.run(cmd, capture_output=True, check=True)

    # Process results: stdout holds NUL-separated raw file-name bytes.
    # os.fsdecode (filesystem encoding + surrogateescape) keeps names that are
    # not valid UTF-8 from raising UnicodeDecodeError. The set removes
    # duplicates reported via more than one search path.
    unique_files = {Path(os.fsdecode(raw)) for raw in out.stdout.split(b"\x00") if raw}
    files = sorted(unique_files, key=lambda f: f.stat().st_mtime)
    _logger.info("Found %i files", len(files))

    return files
397def _zero_bytes(fname: Path, bytes_from, bytes_to) -> Path:
398 """
399 Set certain byte locations within a file to zero.
401 This method helps creating highly-compressible test files.
403 Parameters
404 ----------
405 fname
406 bytes_from : int or :obj:`list` of str
407 The position of the file (in decimal) at which to start zeroing
408 bytes_to : int or :obj:`list` of str
409 The position of the file (in decimal) at which to stop zeroing. If
410 list, must be the same length as list given in ``bytes_from``
412 Returns
413 -------
414 new_fname
415 The modified file that has it's bytes zeroed
416 """
417 filename, ext = fname.stem, fname.suffix
418 if ext == ".ser":
419 index = int(filename.split("_")[-1])
420 basename = "_".join(filename.split("_")[:-1])
421 new_fname = fname.parent / f"{basename}_dataZeroed_{index}{ext}"
422 else:
423 new_fname = fname.parent / f"{filename}_dataZeroed{ext}"
424 copyfile(fname, new_fname)
426 if isinstance(bytes_from, int):
427 bytes_from = [bytes_from]
428 bytes_to = [bytes_to]
430 with Path(new_fname).open(mode="r+b") as f:
431 for from_byte, to_byte in zip(bytes_from, bytes_to):
432 f.seek(from_byte)
433 f.write(b"\0" * (to_byte - from_byte))
435 return new_fname