Coverage for nexusLIMS/utils/files.py: 100%

107 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-03-24 05:23 +0000

1"""File finding and manipulation utilities for NexusLIMS.""" 

2 

3import logging 

4import os 

5import subprocess 

6import warnings 

7from datetime import datetime, timedelta 

8from pathlib import Path 

9from shutil import copyfile 

10from typing import List 

11 

12from nexusLIMS.config import settings 

13 

# Module-level logger, named after this module
_logger = logging.getLogger(__name__)

# Hours added to naive (tzinfo is None) datetime arguments by the search
# functions in this module; timezone-aware datetimes are never shifted.
# Usually zero (hack for poole testing -- should be -2 if running tests from
# Mountain Time on files in Eastern Time)
_tz_offset = timedelta(hours=0)

19 

20 

def find_dirs_by_mtime(
    path: str,
    dt_from: datetime,
    dt_to: datetime,
    *,
    followlinks: bool = True,
) -> List[str]:
    """
    Return the directories under ``path`` modified inside a time window.

    The tree rooted at ``path`` is walked and every directory visited
    (the root included) is kept when its last-modification time is strictly
    later than ``dt_from`` and strictly earlier than ``dt_to``.

    .. deprecated:: 0.0.9
        `find_dirs_by_mtime` is not recommended for use to find files for
        record inclusion, because subsequent modifications to a directory
        (e.g. the user wrote a text file or did some analysis afterwards)
        means no files will be returned from that directory (because it is
        not searched)

    Parameters
    ----------
    path
        The root path from which to start the search
    dt_from
        The "starting" point of the search timeframe (exclusive)
    dt_to
        The "ending" point of the search timeframe (exclusive)
    followlinks
        Argument passed on to :py:func:`os.walk` to control whether
        symbolic links are followed

    Returns
    -------
    dirs : list
        The directory paths (as yielded by :py:func:`os.walk`) whose
        modification times fall within the time range provided
    """
    # naive datetimes get shifted by the module-level offset (usually zero);
    # timezone-aware ones are used untouched
    if dt_from.tzinfo is None:
        dt_from += _tz_offset  # pragma: no cover
    if dt_to.tzinfo is None:
        dt_to += _tz_offset  # pragma: no cover

    _logger.info(
        "Finding directories modified between %s and %s",
        dt_from.isoformat(),
        dt_to.isoformat(),
    )

    # only the directories themselves are stat-ed (far fewer comparisons
    # than inspecting every single file)
    return [
        visited
        for visited, _, _ in os.walk(path, followlinks=followlinks)
        if dt_from.timestamp() < Path(visited).stat().st_mtime < dt_to.timestamp()
    ]

79 

80 

def find_files_by_mtime(path: Path, dt_from, dt_to) -> List[Path]:  # pragma: no cover
    """
    Find files modified between two times.

    Given two timestamps, find files under a path that were
    last modified between the two (both bounds exclusive). Symbolic links
    to directories are followed while walking. Entries that cannot be
    stat-ed because they no longer exist (e.g. dangling symbolic links, or
    files removed while the walk is in progress) are skipped.

    .. deprecated:: 1.2.0
        Use :py:func:`gnu_find_files_by_mtime` instead.

    Parameters
    ----------
    path
        The root path from which to start the search
    dt_from : datetime.datetime
        The "starting" point of the search timeframe
    dt_to : datetime.datetime
        The "ending" point of the search timeframe

    Returns
    -------
    files : list
        A list of the files that have modification times within the
        time range provided (sorted by modification time)
    """
    warnings.warn(
        "find_files_by_mtime has been deprecated in v1.2.0 and is "
        "no longer tested or supported. Please use "
        "gnu_find_files_by_mtime() instead",
        DeprecationWarning,
        stacklevel=2,
    )

    # adjust the datetime objects with the tz_offset (usually should be 0) if
    # they are naive
    if dt_from.tzinfo is None:
        dt_from += _tz_offset
    if dt_to.tzinfo is None:
        dt_to += _tz_offset

    # the bounds do not change during the walk, so convert them only once
    lower, upper = dt_from.timestamp(), dt_to.timestamp()

    # Narrowing the walk to recently-modified directories does not work
    # reliably, so the entire tree under ``path`` is inspected.
    # Map each matching file to its mtime: the dict removes duplicates and
    # lets the final sort reuse the value instead of stat-ing again.
    mtimes = {}
    for dirpath, _, filenames in os.walk(path, followlinks=True):
        for name in filenames:
            candidate = Path(dirpath) / name
            try:
                mtime = candidate.stat().st_mtime
            except FileNotFoundError:
                # dangling symlink, or the file vanished since it was listed
                continue
            if lower < mtime < upper:
                mtimes[candidate] = mtime

    return sorted(mtimes, key=mtimes.__getitem__)

138 

139 

def _get_find_command():
    """
    Get the appropriate GNU find command for the system.

    ``find`` is looked up on the executable search path; if it does not
    identify itself as GNU findutils, macOS systems fall back to ``gfind``
    while other systems keep ``find`` and log a warning.

    Returns
    -------
    str
        The find command to use ('find' or 'gfind')

    Raises
    ------
    RuntimeError
        If find command is not available or GNU find is required but not found
    """
    # function-scope imports: neither name is imported at module level
    import platform  # noqa: PLC0415
    from shutil import which  # noqa: PLC0415

    def _is_gnu_find(find_cmd):
        """Check if the find command is GNU find (supports -xtype)."""
        try:
            result = subprocess.run(
                [find_cmd, "--version"],
                check=False,
                capture_output=True,
                text=True,
                timeout=2,
            )
        except (subprocess.SubprocessError, OSError):
            # a timeout, or any failure to launch the executable, is treated
            # as "not GNU find"
            return False
        return "GNU findutils" in result.stdout

    find_command = "find"
    # shutil.which copes with an unset PATH, unlike indexing os.environ
    if which(find_command) is None:
        msg = "find command was not found on the system PATH"
        raise RuntimeError(msg)

    if _is_gnu_find(find_command):
        return find_command

    if platform.system() == "Darwin":  # pragma: no cover
        # macOS ships BSD find; GNU find is expected under the name gfind
        if which("gfind") is None:
            msg = (
                "BSD find detected on macOS, but GNU find is required.\n"
                "The 'find' command on macOS does not support the '-xtype' option "
                "needed for NexusLIMS.\n\n"
                "Please install GNU find via Homebrew:\n"
                " brew install findutils\n\n"
                "This will install GNU find as 'gfind', which NexusLIMS will use "
                "automatically."
            )
            raise RuntimeError(msg)
        _logger.info("BSD find detected, using gfind (GNU find) instead")
        return "gfind"

    _logger.warning(
        "Non-GNU find detected. If you encounter errors, "
        "please install GNU findutils.",
    )
    return find_command

211 

212 

def _find_symlink_dirs(find_command, path):
    """
    Find symbolic links pointing to directories.

    Runs ``find_command`` beneath ``settings.NX_INSTRUMENT_DATA_PATH / path``
    looking for entries that are symbolic links (``-type l``) whose target
    is a directory (``-xtype d``).

    Parameters
    ----------
    find_command : str
        The find command to use
    path : Path
        The root path to search, relative to ``settings.NX_INSTRUMENT_DATA_PATH``

    Returns
    -------
    list
        List of symbolic link paths (as ``str``), or a one-element list
        holding the full search root (as ``Path``) if none were found

    Raises
    ------
    subprocess.CalledProcessError
        If the find command exits with a non-zero status
    """
    find_path = Path(str(settings.NX_INSTRUMENT_DATA_PATH)) / path
    cmd = [find_command, str(find_path), "-type", "l", "-xtype", "d", "-print0"]
    _logger.info('Running followlinks find via subprocess.run: "%s"', cmd)
    out = subprocess.run(cmd, capture_output=True, check=True)

    # -print0 output is NUL-separated; os.fsdecode is used rather than
    # bytes.decode so that file names which are not valid UTF-8 do not raise
    paths = [os.fsdecode(raw) for raw in out.stdout.split(b"\x00") if raw]
    _logger.info('Found the following symlinks: "%s"', paths)

    if paths:
        # NOTE(review): when any symlink is found, only the symlinks are
        # returned and the search root itself is not -- confirm that regular
        # files sitting next to the symlinks are meant to be left out
        _logger.info("find_path is: '%s'", paths)
        return paths
    return [find_path]

240 

241 

def _build_find_command(  # noqa: PLR0913
    find_command,
    find_paths,
    dt_from,
    dt_to,
    extensions,
    followlinks,
):
    """
    Build the find command with all arguments.

    The resulting command selects regular files newer than ``dt_from`` and
    not newer than ``dt_to``, optionally restricted to the given extensions
    (matched case-insensitively) and excluding every pattern listed in
    ``settings.NX_IGNORE_PATTERNS``. Results are NUL-separated (``-print0``).

    Parameters
    ----------
    find_command : str
        The find command to use
    find_paths : list
        Paths to search
    dt_from : datetime
        Start time
    dt_to : datetime
        End time
    extensions : list or None
        File extensions to search for (without the leading dot). ``None``
        or an empty list applies no extension filter.
    followlinks : bool
        Whether to follow symlinks (adds the ``-H`` flag)

    Returns
    -------
    list
        Complete find command as list of arguments
    """
    cmd = [find_command, "-H"] if followlinks else [find_command]
    cmd.extend(str(p) for p in find_paths)
    cmd += [
        "-type",
        "f",
        "-newermt",
        dt_from.isoformat(),
        "-not",
        "-newermt",
        dt_to.isoformat(),
    ]

    # Materialise the extensions once so emptiness can be tested: an empty
    # group would otherwise leave an unbalanced ")" in the expression
    wanted = [] if extensions is None else list(extensions)
    if wanted:
        cmd.append("(")
        for ext in wanted:
            cmd += ["-iname", f"*.{ext}", "-o"]
        cmd.pop()  # drop the trailing "-o"
        cmd.append(")")

    # Add ignore patterns (settings already provides a list)
    ignore_patterns = settings.NX_IGNORE_PATTERNS
    if ignore_patterns:
        cmd += ["-and", "("]
        for pattern in ignore_patterns:
            cmd += ["-not", "-iname", pattern, "-and"]
        cmd.pop()  # drop the trailing "-and"
        cmd.append(")")

    cmd.append("-print0")
    return cmd

304 

305 

def gnu_find_files_by_mtime(
    path: Path,
    dt_from: datetime,
    dt_to: datetime,
    extensions: List[str] | None = None,
    *,
    followlinks: bool = True,
) -> List[Path]:
    """
    Find files modified between two times.

    Given two timestamps, find files under a path that were
    last modified between the two. Uses the system-provided GNU ``find``
    command. In basic testing, this method was found to be approximately 3 times
    faster than using :py:meth:`find_files_by_mtime` (which is implemented in
    pure Python).

    Parameters
    ----------
    path
        The root path from which to start the search, relative to
        the :ref:`NX_INSTRUMENT_DATA_PATH <config-instrument-data-path>`
        environment setting.
    dt_from
        The "starting" point of the search timeframe
    dt_to
        The "ending" point of the search timeframe
    extensions
        A list of strings representing the extensions to find. If None,
        all files modified between the two times are found.
    followlinks
        Whether to follow symlinks using the ``find`` command via
        the ``-H`` command line flag. This is useful when the
        :ref:`NX_INSTRUMENT_DATA_PATH <config-instrument-data-path>` is actually a
        directory of symlinks. If this is the case and ``followlinks`` is
        ``False``, no files will ever be found because the ``find``
        command will not "dereference" the symbolic links it finds.
        When ``True``, a first ``find`` pass locates the symbolic links to
        directories under ``path`` and those links become the search roots.

    Returns
    -------
    List[Path]
        A list of the files that have modification times within the
        time range provided (sorted by modification time)

    Raises
    ------
    RuntimeError
        If a usable find command cannot be found
    subprocess.CalledProcessError
        If the find command exits with a non-zero status
    """
    _logger.info("Using GNU `find` to search for files")

    # Get appropriate find command
    find_command = _get_find_command()

    # Naive datetimes are shifted by the module-level offset (usually zero);
    # timezone-aware ones are left untouched
    if dt_from.tzinfo is None:
        dt_from += _tz_offset
    if dt_to.tzinfo is None:
        dt_to += _tz_offset

    # Find symlink directories if following links
    if followlinks:
        find_paths = _find_symlink_dirs(find_command, path)
    else:
        find_paths = [Path(str(settings.NX_INSTRUMENT_DATA_PATH)) / path]

    # Build and execute find command
    cmd = _build_find_command(
        find_command,
        find_paths,
        dt_from,
        dt_to,
        extensions,
        followlinks,
    )
    _logger.info('Running via subprocess.run: "%s"', cmd)
    _logger.info('Running via subprocess.run (as string): "%s"', " ".join(cmd))
    out = subprocess.run(cmd, capture_output=True, check=True)

    # -print0 output is NUL-separated. os.fsdecode is used rather than
    # bytes.decode so that file names which are not valid UTF-8 do not raise;
    # the set removes paths reported more than once.
    unique = {Path(os.fsdecode(raw)) for raw in out.stdout.split(b"\x00") if raw}
    files = sorted(unique, key=lambda f: f.stat().st_mtime)
    _logger.info("Found %i files", len(files))

    return files

395 

396 

def _zero_bytes(fname: Path, bytes_from, bytes_to) -> Path:
    """
    Set certain byte locations within a file to zero.

    This method helps creating highly-compressible test files. The input
    file is left untouched; a copy named ``<stem>_dataZeroed<ext>`` is
    written next to it and modified instead. For ``.ser`` files the trailing
    ``_<index>`` of the stem is kept at the end of the name, i.e.
    ``<base>_dataZeroed_<index>.ser``.

    Parameters
    ----------
    fname
        The file to copy and zero
    bytes_from : int or :obj:`list` of int
        The position of the file (in decimal) at which to start zeroing
    bytes_to : int or :obj:`list` of int
        The position of the file (in decimal) at which to stop zeroing. If
        list, must be the same length as list given in ``bytes_from``

    Returns
    -------
    new_fname
        The modified file that has its bytes zeroed

    Raises
    ------
    ValueError
        If ``bytes_from`` and ``bytes_to`` are lists of different lengths,
        or if a ``.ser`` file name does not end in ``_<integer>``
    """
    fname = Path(fname)

    # Normalise and validate the ranges before touching the filesystem, so
    # bad arguments never leave a half-processed copy behind
    if isinstance(bytes_from, int):
        starts, stops = [bytes_from], [bytes_to]
    else:
        starts, stops = list(bytes_from), list(bytes_to)
    if len(starts) != len(stops):
        msg = (
            "bytes_from and bytes_to must have the same length "
            f"(got {len(starts)} and {len(stops)})"
        )
        raise ValueError(msg)

    filename, ext = fname.stem, fname.suffix
    if ext == ".ser":
        # keep the numeric index as the last component of the new name
        basename, _, index_text = filename.rpartition("_")
        index = int(index_text)
        new_fname = fname.parent / f"{basename}_dataZeroed_{index}{ext}"
    else:
        new_fname = fname.parent / f"{filename}_dataZeroed{ext}"
    copyfile(fname, new_fname)

    with new_fname.open(mode="r+b") as f:
        for from_byte, to_byte in zip(starts, stops):
            f.seek(from_byte)
            f.write(b"\0" * (to_byte - from_byte))

    return new_fname