Coverage for nexusLIMS/builder/record_builder.py: 100%

250 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-03-24 05:23 +0000

1"""Build Nexus records from metadata and datasets. 

2 

3Builds NexusLIMS records. 

4 

5Attributes 

6---------- 

7XSD_PATH 

8 A string containing the path to the Nexus Experiment schema file, 

9 which is used to validate XML records built by this module 

10""" 

11 

12import argparse 

13import logging 

14import shutil 

15import sys 

16from dataclasses import dataclass, field 

17from datetime import datetime as dt 

18from datetime import timedelta as td 

19from importlib import import_module, util 

20from io import BytesIO 

21from pathlib import Path 

22from timeit import default_timer 

23from typing import List 

24from uuid import uuid4 

25 

26from lxml import etree 

27from sqlmodel import Session as DBSession 

28from sqlmodel import select 

29 

30from nexusLIMS import version 

31from nexusLIMS.builder.preflight import PreflightError, run_preflight_checks 

32from nexusLIMS.config import settings 

33from nexusLIMS.db.engine import get_engine 

34from nexusLIMS.db.enums import RecordStatus 

35from nexusLIMS.db.models import SessionLog 

36from nexusLIMS.db.session_handler import Session, get_sessions_to_build 

37from nexusLIMS.exporters import export_records, was_successfully_exported 

38from nexusLIMS.extractors import get_registry 

39from nexusLIMS.harvesters import nemo 

40from nexusLIMS.harvesters.nemo import utils as nemo_utils 

41from nexusLIMS.harvesters.reservation_event import ReservationEvent 

42from nexusLIMS.schemas import activity 

43from nexusLIMS.schemas.activity import AcquisitionActivity, cluster_filelist_mtimes 

44from nexusLIMS.utils.files import ( 

45 find_files_by_mtime, 

46 gnu_find_files_by_mtime, 

47) 

48from nexusLIMS.utils.paths import join_instrument_filestore_path 

49from nexusLIMS.utils.time import ( 

50 current_system_tz, 

51 has_delay_passed, 

52) 

53 

# Module-level logger used throughout the record-building pipeline
_logger = logging.getLogger(__name__)
# Path to the NexusLIMS Experiment XML schema file, which ships alongside
# the activity schema module; used by validate_record() below
XSD_PATH: Path = Path(activity.__file__).parent / "nexus-experiment.xsd"

56 

57 

@dataclass
class RecordBuildResult:
    """Container for the artifacts produced by a single record build.

    Parameters
    ----------
    xml_text
        Serialized XML text of the generated Experiment record
    activities
        AcquisitionActivity instances detected while assembling the record
    reservation_event
        ReservationEvent that supplied the record's header information
    """

    xml_text: str
    activities: List[AcquisitionActivity] = field(default_factory=list)
    reservation_event: ReservationEvent | None = None

75 

76 

def build_record(
    session: Session,
    sample_id: str | None = None,
    *,
    generate_previews: bool = True,
) -> RecordBuildResult:
    """
    Build a NexusLIMS XML record of an Experiment.

    Assemble an XML document conforming to the NexusLIMS schema from the
    reservation information and the data files collected during ``session``.
    The record header comes from the session's reservation event; the body
    is one element per detected acquisition activity.

    Parameters
    ----------
    session
        The :py:class:`~nexusLIMS.db.session_handler.Session` whose
        instrument, time span, and user determine the record contents
    sample_id
        A unique identifier pointing to a sample identifier for data
        collected in this record. If None, a UUIDv4 will be generated
    generate_previews
        Whether to create the preview thumbnail images

    Returns
    -------
    result : RecordBuildResult
        A :class:`RecordBuildResult` containing the XML string, activities,
        and reservation event
    """
    sample_id = str(uuid4()) if sample_id is None else sample_id

    # XML namespace setup for the Experiment root element
    nx_ns = "https://data.nist.gov/od/dm/nexus/experiment/v1.0"
    xsi_ns = "http://www.w3.org/2001/XMLSchema-instance"
    root = etree.Element(
        "Experiment",
        nsmap={None: nx_ns, "xsi": xsi_ns, "nx": nx_ns},
    )

    _logger.info(
        "Getting calendar events with instrument: %s, from %s to %s, "
        "user: %s; using harvester: %s",
        session.instrument.name,
        session.dt_from.isoformat(),
        session.dt_to.isoformat(),
        session.user,
        session.instrument.harvester,
    )
    # a nexusLIMS.harvesters.reservation_event.ReservationEvent for the header
    res_event = get_reservation_event(session)

    header_children = res_event.as_xml()
    # NOTE(review): if as_xml() returns a single lxml Element (rather than a
    # list of elements), re-parenting its children while iterating it can
    # skip nodes — confirm the return type of as_xml()
    for header_el in header_children:
        root.append(header_el)

    _logger.info(
        "Building acquisition activities for timespan from %s to %s",
        session.dt_from.isoformat(),
        session.dt_to.isoformat(),
    )
    activities = build_acq_activities(
        session.instrument,
        session.dt_from,
        session.dt_to,
        generate_previews,
    )
    for idx, act in enumerate(activities):
        root.append(act.as_xml(idx, sample_id))

    serialized = etree.tostring(
        root,
        xml_declaration=True,
        encoding="UTF-8",
        pretty_print=True,
    ).decode()
    return RecordBuildResult(
        xml_text=serialized,
        activities=activities,
        reservation_event=res_event,
    )

163 

164 

def get_reservation_event(session: Session) -> ReservationEvent:
    """
    Get a ReservationEvent representation of a Session.

    Dispatch to the ``res_event_from_session`` implementation of whichever
    harvester module is configured for this session's instrument (currently
    just NEMO), giving callers one consistent entry point regardless of the
    underlying reservation system.

    Parameters
    ----------
    session
        The :py:class:`~nexusLIMS.db.session_handler.Session` for which to
        fetch a matching
        :py:class:`~nexusLIMS.harvesters.reservation_event.ReservationEvent`
        from the relevant harvester

    Returns
    -------
    res_event : ~nexusLIMS.harvesters.reservation_event.ReservationEvent
        A reservation matching the instrument and timespan specified in
        ``session``

    Raises
    ------
    NotImplementedError
        If the configured harvester module does not exist, or exists but
        does not provide a ``res_event_from_session`` function
    """
    module_name = f".{session.instrument.harvester}"
    package_name = "nexusLIMS.harvesters"

    # guard clause: bail out if the configured harvester cannot be located
    if util.find_spec(module_name, package_name) is None:
        msg = (
            f"Harvester {session.instrument.harvester} not found in "
            "nexusLIMS.harvesters"
        )
        raise NotImplementedError(msg)

    # dynamically import the harvester module configured for this instrument
    harvester = import_module(module_name, package_name)

    # the module must expose the common entry point we dispatch to
    if not hasattr(harvester, "res_event_from_session"):
        msg = (
            f"res_event_from_session has not been implemented for {harvester}, which "
            f"is required to use this method."
        )
        raise NotImplementedError(msg)

    return harvester.res_event_from_session(session)

216 

217 

def build_acq_activities(instrument, dt_from, dt_to, generate_previews):
    """
    Build an XML string representation of each AcquisitionActivity for a session.

    This includes setup parameters and metadata
    associated with each dataset obtained during a microscopy session. Unique
    AcquisitionActivities are delimited via clustering of file collection
    time to detect "long" breaks during a session.

    Parameters
    ----------
    instrument : :py:class:`~nexusLIMS.db.models.Instrument`
        One of the NexusLIMS instruments contained in the
        :py:attr:`~nexusLIMS.instruments.instrument_db` database.
        Controls what instrument calendar is used to get events.
    dt_from : datetime.datetime
        The starting timestamp that will be used to determine which files go
        in this record
    dt_to : datetime.datetime
        The ending timestamp used to determine the last point in time for
        which files should be associated with this record
    generate_previews : bool
        Whether or not to create the preview thumbnail images

    Returns
    -------
    activities : :obj:`list` of
        :obj:`~nexusLIMS.schemas.activity.AcquisitionActivity`:
        The list of :py:class:`~nexusLIMS.schemas.activity.AcquisitionActivity`
        objects generated for the record

    Raises
    ------
    FileNotFoundError
        If no files were found in the given time range
    """
    # silence noisy output from HyperSpy's DigitalMicrograph reader while
    # file metadata is being extracted
    logging.getLogger("hyperspy.io_plugins.digital_micrograph").setLevel(
        logging.WARNING,
    )

    start_timer = default_timer()
    path = join_instrument_filestore_path(instrument.filestore_path)
    # find the files to be included (list of Paths)
    files = get_files(path, dt_from, dt_to)

    _logger.info(
        "Found %i files in %.2f seconds",
        len(files),
        default_timer() - start_timer,
    )

    # raise error if no file found were found
    if len(files) == 0:
        msg = "No files found in this time range"
        raise FileNotFoundError(msg)

    # get the timestamp boundaries of acquisition activities
    aa_bounds = cluster_filelist_mtimes(files)

    # add the last file's modification time to the boundaries list to make
    # the loop below easier to process (every file then falls at or before
    # some boundary)
    aa_bounds.append(files[-1].stat().st_mtime)

    # pre-size the activity list with one (lazily created) slot per
    # boundary; unused slots remain None and are filtered out afterwards
    activities: List[AcquisitionActivity | None] = [None] * len(aa_bounds)

    i = 0       # index into `files`
    aa_idx = 0  # index into `aa_bounds` / `activities`
    while i < len(files):
        f = files[i]
        mtime = f.stat().st_mtime

        # check this file's mtime, if it is less than this iteration's value
        # in the AA bounds, then it belongs to this iteration's AA
        # if not, then we should move to the next activity
        if mtime <= aa_bounds[aa_idx]:
            # if current activity index is None, we need to start a new AA:
            if activities[aa_idx] is None:
                activities[aa_idx] = AcquisitionActivity(
                    start=dt.fromtimestamp(mtime, tz=instrument.timezone),
                )

            # add this file to the AA
            _logger.info(
                "Adding file %i/%i %s to activity %i",
                i,
                len(files),
                str(f).replace(str(settings.NX_INSTRUMENT_DATA_PATH), "").strip("/"),
                aa_idx,
            )
            activities[aa_idx].add_file(fname=f, generate_preview=generate_previews)
            # assume this file is the last one in the activity (this will be
            # true on the last iteration where mtime is <= to the
            # aa_bounds value)
            activities[aa_idx].end = dt.fromtimestamp(mtime, tz=instrument.timezone)
            i += 1
        else:
            # this file's mtime is after the boundary and is thus part of the
            # next activity, so increment AA counter and reprocess file (do
            # not increment i)
            aa_idx += 1

    # Remove any "None" activities from list
    activities: List[AcquisitionActivity] = [a for a in activities if a is not None]

    _logger.info("Finished detecting activities")
    for i, this_activity in enumerate(activities):
        _logger.info("Activity %i: storing setup parameters", i)
        this_activity.store_setup_params()
        _logger.info("Activity %i: storing unique metadata values", i)
        this_activity.store_unique_metadata()

    return activities

325 

326 

def get_files(
    path: Path,
    dt_from: dt,
    dt_to: dt,
) -> List[Path]:
    """
    Get files under a path that were last modified between the two given timestamps.

    Parameters
    ----------
    path
        The directory tree in which to search for files
    dt_from : datetime.datetime
        The starting timestamp that will be used to determine which files go
        in this record
    dt_to : datetime.datetime
        The ending timestamp used to determine the last point in time for
        which files should be associated with this record

    Returns
    -------
    files : List[pathlib.Path]
        The files whose modification times fall within the given time range,
        sorted by modification time
    """
    _logger.info("Starting new file-finding in %s", path)

    # normalize the configured strategy, falling back to "exclusive" on any
    # unexpected value
    strategy = settings.NX_FILE_STRATEGY.lower()
    if strategy not in ("inclusive", "exclusive"):
        _logger.warning(
            'File finding strategy (setting "NX_FILE_STRATEGY") had '
            'an unexpected value: "%s". Setting value to "exclusive".',
            strategy,
        )
        strategy = "exclusive"

    # "exclusive" limits the search to extensions backed by a specialized
    # extractor (extensions served only by the fallback
    # basic_file_info_extractor are excluded); "inclusive" imposes no
    # extension restriction at all
    if strategy == "inclusive":
        extension_arg = None
    else:
        extension_arg = get_registry().get_supported_extensions(
            exclude_fallback=True,
        )

    try:
        return gnu_find_files_by_mtime(
            path,
            dt_from,
            dt_to,
            extensions=extension_arg,
        )
    # exclude following from coverage because find_files_by_mtime is deprecated as of
    # 1.2.0 and does not support extensions at all (like the above method)
    except (NotImplementedError, RuntimeError) as exception:  # pragma: no cover
        _logger.warning(
            "GNU find returned error: %s\nFalling back to pure Python implementation",
            exception,
        )
        return find_files_by_mtime(path, dt_from, dt_to)

383 

384 

def dump_record(
    session: Session,
    filename: Path | None = None,
    *,
    generate_previews: bool = True,
) -> Path:
    """
    Dump a record to an XML file.

    Writes an XML record for a :py:class:`~nexusLIMS.db.session_handler.Session`
    composed of information pulled from the appropriate reservation system
    as well as metadata extracted from the microscope data (e.g. dm3 or
    other files).

    Parameters
    ----------
    session : nexusLIMS.db.session_handler.Session
        A :py:class:`~nexusLIMS.db.session_handler.Session` object
        representing a unit of time on one of the instruments known to NexusLIMS
    filename : typing.Optional[pathlib.Path]
        The filename of the dumped xml file to write. If None, a default name
        will be generated from the other parameters
    generate_previews : bool
        Whether or not to create the preview thumbnail images

    Returns
    -------
    filename : pathlib.Path
        The name of the created record that was returned
    """
    if filename is None:
        # default name encodes instrument, start date, and user when known
        filename = Path(
            "compiled_record"
            + (f"_{session.instrument.name}" if session.instrument else "")
            + session.dt_from.strftime("_%Y-%m-%d")
            + (f"_{session.user}" if session.user else "")
            + ".xml",
        )
    # Build the record *before* touching the output file, so a failure during
    # record construction does not leave behind an empty/truncated file
    # (previously the file was opened for writing first)
    result = build_record(session=session, generate_previews=generate_previews)
    filename.parent.mkdir(parents=True, exist_ok=True)
    filename.write_text(result.xml_text, encoding="utf-8")
    return filename

428 

429 

def validate_record(xml_filename):
    """
    Validate an .xml record against the Nexus schema.

    Parameters
    ----------
    xml_filename : str or io.StringIO or io.BytesIO
        The path to the xml file to be validated (can also be a file-like
        object like StringIO or BytesIO)

    Returns
    -------
    validates : bool
        Whether the record validates against the Nexus schema
    """
    # XSD_PATH is a pathlib.Path; pass it as str for compatibility with lxml
    # versions whose etree.parse() does not accept os.PathLike arguments
    xsd_doc = etree.parse(str(XSD_PATH))
    xml_schema = etree.XMLSchema(xsd_doc)
    xml_doc = etree.parse(xml_filename)

    return xml_schema.validate(xml_doc)

450 

451 

def build_new_session_records(
    generate_previews: bool = True,  # noqa: FBT002, FBT001
) -> tuple[
    List[Path],
    List[Session],
    List[List[AcquisitionActivity]],
    List[ReservationEvent | None],
]:
    """
    Build records for new sessions from the database.

    Uses :py:func:`~nexusLIMS.db.session_handler.get_sessions_to_build` and builds
    those records using :py:func:`build_record` (saving to the NexusLIMS folder), and
    returns a list of resulting .xml files to be uploaded to CDCS.

    Parameters
    ----------
    generate_previews
        Whether to create the preview thumbnail images for each dataset

    Returns
    -------
    xml_files : typing.List[pathlib.Path]
        A list of record files that were successfully built and saved to
        centralized storage
    sessions_built : typing.List[Session]
        Corresponding Session objects for each built XML file (same length and order)
    activities_built : typing.List[typing.List[AcquisitionActivity]]
        Corresponding AcquisitionActivity lists for each built session
    res_events_built : typing.List[ReservationEvent | None]
        Corresponding ReservationEvent for each built session
    """
    # get the list of sessions with 'TO_BE_BUILT' status; does not fetch new
    # usage events from any NEMO instances;
    # nexusLIMS.harvesters.nemo.add_all_usage_events_to_db() must be used
    # first to do so
    sessions = get_sessions_to_build()
    if not sessions:
        sys.exit("No 'TO_BE_BUILT' sessions were found. Exiting.")
    xml_files = []
    sessions_built = []
    activities_built = []
    res_events_built = []
    # loop through the sessions
    for s in sessions:
        try:
            # insert a RECORD_GENERATION log row *before* building, so the
            # attempt is traceable; db_row is needed later if we must undo it
            db_row = s.insert_record_generation_event()
            result = build_record(session=s, generate_previews=generate_previews)
            record_text = result.xml_text
        except (  # pylint: disable=broad-exception-caught
            FileNotFoundError,
            Exception,
        ) as exception:
            # classify the failure and update the session status accordingly
            if isinstance(exception, FileNotFoundError):
                # if no files were found for this session log, mark it as so in
                # the database
                path = join_instrument_filestore_path(s.instrument.filestore_path)
                _logger.warning(
                    "No files found in %s between %s and %s",
                    path,
                    s.dt_from.isoformat(),
                    s.dt_to.isoformat(),
                )

                if has_delay_passed(s.dt_to):
                    _logger.warning(
                        'Marking %s as "NO_FILES_FOUND"',
                        s.session_identifier,
                    )
                    s.update_session_status(RecordStatus.NO_FILES_FOUND)
                else:
                    # if the delay hasn't passed, log and delete the record
                    # generation event we inserted previously
                    _logger.warning(
                        "Configured record building delay has not passed; "
                        "Removing previously inserted RECORD_GENERATION row for %s",
                        s.session_identifier,
                    )
                    # Delete the RECORD_GENERATION log using SQLModel
                    with DBSession(get_engine()) as db_session:
                        statement = select(SessionLog).where(
                            SessionLog.id_session_log == db_row["id_session_log"]
                        )
                        log = db_session.exec(statement).first()
                        if log:
                            db_session.delete(log)
                            db_session.commit()
            elif isinstance(exception, nemo.exceptions.NoDataConsentError):
                _logger.warning(
                    "User requested this session not be harvested, "
                    "so no record was built. %s",
                    exception,
                )
                _logger.info('Marking %s as "NO_CONSENT"', s.session_identifier)
                s.update_session_status(RecordStatus.NO_CONSENT)
            elif isinstance(exception, nemo.exceptions.NoMatchingReservationError):
                _logger.warning(
                    "No matching reservation found for this session, "
                    "so assuming no consent was given. %s",
                    exception,
                )
                _logger.info('Marking %s as "NO_RESERVATION"', s.session_identifier)
                s.update_session_status(RecordStatus.NO_RESERVATION)
            else:
                # any other failure is unexpected; mark the session as ERROR
                _logger.exception("Could not generate record text")
                _logger.exception('Marking %s as "ERROR"', s.session_identifier)
                s.update_session_status(RecordStatus.ERROR)
        else:
            # build succeeded: validate the record and (if valid) write it to
            # disk, extending the accumulator lists in the process
            xml_files, sessions_built, activities_built, res_events_built = (
                _record_validation_flow(
                    record_text,
                    s,
                    xml_files,
                    sessions_built,
                    result.activities,
                    result.reservation_event,
                    activities_built,
                    res_events_built,
                )
            )

    return xml_files, sessions_built, activities_built, res_events_built

569 

570 

def _record_validation_flow(  # noqa: PLR0913
    record_text,
    s,
    xml_files,
    sessions_built,
    result_activities,
    result_res_event,
    activities_built,
    res_events_built,
) -> tuple[
    List[Path],
    List[Session],
    List[List[AcquisitionActivity]],
    List[ReservationEvent | None],
]:
    """
    Validate a freshly built record and, if valid, write it to disk.

    On successful schema validation, the record is saved under the configured
    records directory and the four accumulator lists are appended to (in
    place). On validation failure, the session is marked ``ERROR`` and
    nothing is written.

    Parameters
    ----------
    record_text : str
        The serialized XML record to validate and save
    s : nexusLIMS.db.session_handler.Session
        The session for which the record was built
    xml_files : typing.List[pathlib.Path]
        Accumulator of record files written so far
    sessions_built : typing.List[Session]
        Accumulator of sessions built so far
    result_activities : typing.List[AcquisitionActivity]
        The activities generated for this record
    result_res_event : ReservationEvent or None
        The reservation event used for this record's header
    activities_built : typing.List[typing.List[AcquisitionActivity]]
        Accumulator of activity lists for built sessions
    res_events_built : typing.List[ReservationEvent | None]
        Accumulator of reservation events for built sessions

    Returns
    -------
    tuple
        The (possibly extended) accumulator lists, in the same order in
        which they were supplied
    """
    if validate_record(BytesIO(bytes(record_text, "UTF-8"))):
        _logger.info("Validated newly generated record")
        # generate filename for saved record and make sure path exists
        if s.instrument.harvester == "nemo":
            # for NEMO session_identifier is a URL of usage_event
            unique_suffix = f"{nemo_utils.id_from_url(s.session_identifier)}"
        else:  # pragma: no cover
            # assume session_identifier is a UUID
            unique_suffix = f"{s.session_identifier.split('-')[0]}"
        basename = (
            f"{s.dt_from.strftime('%Y-%m-%d')}_{s.instrument.name}_{unique_suffix}.xml"
        )
        filename = settings.records_dir_path / basename
        filename.parent.mkdir(parents=True, exist_ok=True)
        # write the record to disk and append to list of files generated
        with filename.open(mode="w", encoding="utf-8") as f:
            f.write(record_text)
        _logger.info("Wrote record to %s", filename)
        xml_files.append(Path(filename))
        sessions_built.append(s)
        activities_built.append(result_activities)
        res_events_built.append(result_res_event)
        # Note: Session status will be updated after export attempt
        _logger.info(
            "Built record for %s, will export to destinations", s.session_identifier
        )
    else:
        _logger.error('Marking %s as "ERROR"', s.session_identifier)
        _logger.error("Could not validate record, did not write to disk")
        s.update_session_status(RecordStatus.ERROR)

    return xml_files, sessions_built, activities_built, res_events_built

618 

619 

def process_new_records(  # noqa: PLR0912, PLR0915
    *,
    dry_run: bool = False,
    dt_from: dt | None = None,
    dt_to: dt | None = None,
):
    """
    Process new records (this is the main entrypoint to the record builder).

    Using :py:meth:`build_new_session_records()`, process new records,
    save them to disk, and upload them to the NexusLIMS CDCS instance.

    Parameters
    ----------
    dry_run
        Controls whether or not records will actually be built. If ``True``,
        session harvesting and file finding will be performed, but no preview
        images or records will be built. Can be used to see what _would_ happen
        if ``dry_run`` is set to ``False``.
    dt_from
        The point in time after which sessions will be fetched. If ``None``,
        no date filtering will be performed. This parameter currently only
        has an effect for the NEMO harvester.
    dt_to
        The point in time before which sessions will be fetched. If ``None``,
        no date filtering will be performed. This parameter currently only
        has an effect for the NEMO harvester.

    Raises
    ------
    PreflightError
        If any preflight check of "error" severity fails
    """
    # run environment/configuration preflight checks before doing any work
    results = run_preflight_checks(dry_run=dry_run)
    for r in results:
        if r.passed:
            level = logging.DEBUG
        else:
            level = logging.ERROR if r.severity == "error" else logging.WARNING
        status = "PASS" if r.passed else "FAIL"
        _logger.log(level, "[preflight] %s: %s — %s", r.name, status, r.message)

    # any failed "error"-severity check aborts processing entirely
    failed_errors = [r for r in results if not r.passed and r.severity == "error"]
    if failed_errors:
        raise PreflightError(failed_errors)

    if dry_run:
        _logger.info("!!DRY RUN!! Only finding files, not building records")
        # get 'TO_BE_BUILT' sessions from the database
        sessions = get_sessions_to_build()
        # get Session objects for NEMO usage events without adding to DB
        # DONE: NEMO usage events fetched should take a time range;
        sessions += nemo_utils.get_usage_events_as_sessions(
            dt_from=dt_from,
            dt_to=dt_to,
        )
        if not sessions:
            _logger.warning("No 'TO_BE_BUILT' sessions were found. Exiting.")
            return
        for s in sessions:
            # at this point, sessions can be from any type of harvester
            _logger.info("")
            _logger.info("")
            try:
                get_reservation_event(s)
            except nemo.exceptions.NoDataConsentError as e:
                _logger.warning(
                    "User requested this session not be harvested, "
                    "skipping dry run for this session. %s",
                    e,
                )
                continue
            except nemo.exceptions.NoMatchingReservationError as e:
                _logger.warning(
                    "No matching reservation found for this session, "
                    "skipping dry run for this session. %s",
                    e,
                )
                continue
            # report the files that would be included for this session
            dry_run_file_find(s)
    else:
        # real run: harvest new NEMO usage events into the DB, then build
        # all 'TO_BE_BUILT' sessions
        nemo_utils.add_all_usage_events_to_db(dt_from=dt_from, dt_to=dt_to)
        xml_files, sessions_built, activities_built, res_events_built = (
            build_new_session_records()
        )
        if len(xml_files) == 0:
            _logger.warning("No XML files built, so no files exported")
        else:
            # Export records to all configured destinations
            export_results = export_records(
                xml_files, sessions_built, activities_built, res_events_built
            )

            # Update session status based on export results
            sessions_by_file = dict(zip(xml_files, sessions_built, strict=True))
            for xml_file, session in sessions_by_file.items():
                if was_successfully_exported(xml_file, export_results):
                    session.update_session_status(RecordStatus.COMPLETED)
                    _logger.info(
                        'Marking %s as "COMPLETED"', session.session_identifier
                    )
                else:
                    session.update_session_status(RecordStatus.BUILT_NOT_EXPORTED)
                    _logger.error(
                        'All exports failed for %s, marking as "BUILT_NOT_EXPORTED"',
                        session.session_identifier,
                    )

            # Move successfully exported files to uploaded directory
            files_exported = [
                f for f in xml_files if was_successfully_exported(f, export_results)
            ]
            for f in files_exported:
                uploaded_dir = settings.records_dir_path / "uploaded"
                Path(uploaded_dir).mkdir(parents=True, exist_ok=True)

                shutil.copy2(f, uploaded_dir)
                Path(f).unlink()

            files_not_exported = [f for f in xml_files if f not in files_exported]
            if len(files_not_exported) > 0:
                _logger.error(
                    "Some record files were not exported: %s",
                    files_not_exported,
                )
    return

741 

742 

def dry_run_file_find(s: Session) -> List[Path]:
    """
    Get the files that *would* be included for a record built for the supplied session.

    Parameters
    ----------
    s : nexusLIMS.db.session_handler.Session
        A session read from the database

    Returns
    -------
    files : typing.List[pathlib.Path]
        A list of Paths containing the files that would be included for the
        record of this session (if it were not a dry run)
    """
    search_root = join_instrument_filestore_path(s.instrument.filestore_path)
    _logger.info(
        "Searching for files for %s in %s between %s and %s",
        s.instrument.name,
        search_root,
        s.dt_from.isoformat(),
        s.dt_to.isoformat(),
    )
    found = get_files(search_root, s.dt_from, s.dt_to)

    _logger.info("Results for %s on %s:", s.session_identifier, s.instrument)
    if not found:
        _logger.warning("No files found for this session")
        return found

    _logger.info("Found %i files for this session", len(found))
    # log each file with its (timezone-aware) modification time
    for file_path in found:
        modified = dt.fromtimestamp(
            file_path.stat().st_mtime,
            tz=s.instrument.timezone,
        ).isoformat()
        _logger.info("*mtime* %s - %s", modified, file_path)
    return found

780 

781 

if __name__ == "__main__":  # pragma: no cover
    # If running as a module, process new records (with some control flags)
    from nexusLIMS.utils import setup_loggers

    parser = argparse.ArgumentParser()

    # Optional argument flag which defaults to False
    parser.add_argument(
        "-n",
        "--dry-run",
        action="store_true",
        dest="dry_run",
        default=False,
    )

    # Optional verbosity counter (eg. -v, -vv, -vvv, etc.)
    parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        default=0,
        help="Verbosity (-v, -vv); corresponds to python logging level. "
        "0 is WARN, 1 (-v) is INFO, 2 (-vv) is DEBUG. ERROR and "
        "CRITICAL are always shown.",
    )

    # Specify output of "--version"
    parser.add_argument(
        "--version",
        action="version",
        version=f"%(prog)s (version {version})",
    )

    args = parser.parse_args()

    # set up logging
    logging_levels = {0: logging.WARNING, 1: logging.INFO, 2: logging.DEBUG}

    if args.dry_run and args.verbose <= 0:
        _logger.warning('Increasing verbosity so output of "dry-run" will be shown')
        args.verbose = 1

    # clamp the counted verbosity to the highest defined level so that
    # e.g. "-vvv" maps to DEBUG instead of raising a KeyError
    args.verbose = min(args.verbose, max(logging_levels))

    setup_loggers(logging_levels[args.verbose])
    # when running as script, __name__ is "__main__", so we need to set level
    # explicitly since the setup_loggers function won't find it
    _logger.setLevel(logging_levels[args.verbose])

    # by default only fetch the last week's worth of data from the NEMO
    # harvesters to speed things up
    process_new_records(
        dry_run=args.dry_run,
        dt_from=dt.now(tz=current_system_tz()) - td(weeks=1),
    )