# Source code for nexusLIMS.cli.process_records

# ruff: noqa: FBT001
"""
CLI command to process new NexusLIMS records.

This module provides a command-line interface for running the NexusLIMS record
builder with file locking, timestamped logging, and email notifications.
It replaces the functionality previously provided by process_new_records.sh.

Usage
-----

.. code-block:: bash

    nexuslims build-records [OPTIONS]

Options
-------

.. code-block:: bash

    -n, --dry-run   : Dry run mode (find files without building records)
    -v, --verbose   : Increase verbosity (-v for INFO, -vv for DEBUG)
    --from <date>   : Start date for filtering (ISO format). Defaults to 1 week ago.
                      Use "none" to disable lower bound.
    --to <date>     : End date for filtering (ISO format). Omit to disable upper bound.
    --version       : Show version and exit
    --help          : Show help message and exit
"""

import json
import logging
import re
import smtplib
import sys
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from pathlib import Path

import click
from filelock import FileLock, Timeout
from rich.console import Console
from rich.logging import RichHandler

from nexusLIMS.builder.preflight import PreflightError
from nexusLIMS.cli import _format_version

# Heavy NexusLIMS imports are lazy-loaded inside functions to speed up --help/--version
# See: setup_file_logging(), send_error_notification(), and main()

# Module-level logger for this CLI command (file/console handlers are attached at runtime)
logger = logging.getLogger(__name__)
# Shared rich console for user-facing terminal output (distinct from log records)
console = Console()


# Case-insensitive, whole-word keywords whose presence marks a log as containing
# errors (pattern text must keep the \b markers: check_log_for_errors strips them
# back off to recover the bare keyword name)
ERROR_PATTERNS = [
    re.compile(rf"\b{keyword}\b", re.IGNORECASE)
    for keyword in ("critical", "error", "fatal")
]

# Substrings that identify known non-critical failures; finding any of these
# suppresses error notification for the whole log
EXCLUDE_PATTERNS = [
    "Temporary failure in name resolution",
    "NoDataConsentError",
    "NoMatchingReservationError",
]


def setup_file_logging(dry_run: bool = False) -> tuple[Path, logging.FileHandler]:  # noqa: FBT002
    """
    Set up file logging with a timestamped log file.

    Creates a log directory structure based on the current date and adds a
    FileHandler to the root logger. Log files are named with timestamps in
    the format YYYYMMDD-HHMMSS.log (or YYYYMMDD-HHMMSS_dryrun.log for dry
    runs); seconds are included so runs in the same minute don't collide.

    Note: This function removes any existing FileHandlers from the root logger
    before adding the new handler to prevent handler accumulation across
    multiple invocations (important for testing scenarios where the same
    process runs multiple CLI commands).

    Parameters
    ----------
    dry_run : bool
        If True, append '_dryrun' to the log filename

    Returns
    -------
    tuple[pathlib.Path, logging.FileHandler]
        A tuple containing:
        - Path to the created log file
        - The FileHandler instance that was added to the root logger

    Raises
    ------
    OSError
        If log directory creation fails
    """
    from nexusLIMS.config import settings  # noqa: PLC0415

    # Remove any existing FileHandlers from root logger to prevent accumulation
    # This is critical when the function is called multiple times (e.g., in tests)
    # to ensure log messages go only to the current log file
    for handler in logging.root.handlers[:]:  # slice copy: don't mutate while iterating
        if isinstance(handler, logging.FileHandler):
            logging.root.removeHandler(handler)
            handler.close()

    now = datetime.now().astimezone()
    year = now.strftime("%Y")
    month = now.strftime("%m")
    day = now.strftime("%d")
    # Include seconds in timestamp to prevent collisions when multiple runs
    # happen in same minute
    timestamp = now.strftime("%Y%m%d-%H%M%S")

    # Create log directory structure: logs/YYYY/MM/DD/
    log_dir = settings.log_dir_path / year / month / day
    log_dir.mkdir(parents=True, exist_ok=True)

    # Create log filename
    suffix = "_dryrun" if dry_run else ""
    log_file = log_dir / f"{timestamp}{suffix}.log"

    # Add file handler to root logger
    file_handler = logging.FileHandler(log_file)
    file_handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s %(levelname)s: %(message)s")
    )
    logging.root.addHandler(file_handler)

    logger.info("Logging to file: %s", log_file)

    return log_file, file_handler
def check_log_for_errors(log_path: Path) -> tuple[bool, list[str]]:
    """
    Check log file for error patterns.

    Reads the log file and searches for error patterns (critical, error,
    fatal) while excluding known non-critical error patterns.

    Parameters
    ----------
    log_path : pathlib.Path
        Path to the log file to check

    Returns
    -------
    tuple[bool, list[str]]
        A tuple containing:
        - bool: True if errors were found, False otherwise
        - list[str]: List of error pattern names that were found

    Notes
    -----
    This function never raises: a missing log file or a read failure is
    logged and reported as ``(False, [])``. The presence of *any* excluded
    (known non-critical) pattern anywhere in the log also yields
    ``(False, [])``, suppressing notification for the entire run.
    """
    if not log_path.exists():
        logger.error("Log file not found: %s", log_path)
        return False, []

    try:
        log_contents = log_path.read_text()
    except OSError:
        logger.exception("Failed to read log file: %s", log_path)
        return False, []

    # Check if any exclude patterns are present; a single benign match means
    # no notification email should be sent for this log at all
    for exclude_pattern in EXCLUDE_PATTERNS:
        if exclude_pattern in log_contents:
            logger.debug("Found excluded pattern: %s", exclude_pattern)
            # If we find an excluded pattern, don't send email
            return False, []

    # Check for error patterns; strip("\\b") removes the regex word-boundary
    # markers from the pattern text, leaving the bare keyword (e.g. "error")
    found_patterns = []
    for pattern in ERROR_PATTERNS:
        if pattern.search(log_contents):
            pattern_name = pattern.pattern.strip("\\b").lower()
            found_patterns.append(pattern_name)
            logger.debug("Found error pattern: %s", pattern_name)

    has_errors = len(found_patterns) > 0
    return has_errors, found_patterns
def send_error_notification(log_path: Path, found_patterns: list[str]) -> None:
    """
    Send error notification email.

    Sends an email notification with the log file contents when errors are
    detected. Email sending is skipped if email configuration is not
    available.

    Parameters
    ----------
    log_path : pathlib.Path
        Path to the log file to include in the email
    found_patterns : list[str]
        List of error pattern names that were found in the log

    Returns
    -------
    None
        This function doesn't return anything. Errors are logged but not
        raised.

    Notes
    -----
    - Email sending is gracefully skipped if configuration is missing
    - Any email sending errors are logged but don't cause the function to fail
    - Uses SMTP with TLS encryption if configured
    """
    from nexusLIMS.config import settings  # noqa: PLC0415

    # Check if email is configured
    email_config = settings.email_config()
    if email_config is None:
        logger.info("Email not configured, skipping notification")
        return

    logger.info("Sending error notification email")

    try:
        # Read log file contents
        log_contents = log_path.read_text()

        # Build email message
        subject = "ERROR in NexusLIMS record builder"
        body = f"""There was an error (or unusual output) in the record builder.
Here is the output of {log_path}.

To help you debug, the following "bad" strings were found in the output:
{", ".join(found_patterns)}

{log_contents}"""

        msg = MIMEText(body)
        msg["Subject"] = subject
        msg["From"] = email_config.sender
        msg["To"] = ", ".join(email_config.recipients)

        # Send email via SMTP
        # NOTE(review): this alias looks redundant — presumably a seam so tests
        # can patch the SMTP class; confirm before simplifying
        smtp_class = smtplib.SMTP
        with smtp_class(
            email_config.smtp_host, email_config.smtp_port, timeout=30
        ) as server:
            if email_config.use_tls:
                server.starttls()

            # Authenticate if credentials provided
            if email_config.smtp_username and email_config.smtp_password:
                server.login(email_config.smtp_username, email_config.smtp_password)

            # Send message
            server.send_message(msg)

        logger.info("Error notification email sent successfully")

    except smtplib.SMTPException:
        logger.exception("Failed to send error notification email")
    except OSError:
        logger.exception("Failed to read log file for email: %s", log_path)
    except Exception:
        # Last-resort boundary catch: notification failure must never abort the run
        logger.exception("Unexpected error while sending email")
def _get_log_level(verbose: int) -> int:
    """
    Convert verbose count to logging level.

    Parameters
    ----------
    verbose : int
        Verbosity level (0 = WARNING, 1 = INFO, 2+ = DEBUG)

    Returns
    -------
    int
        Logging level constant from the logging module
    """
    if verbose == 0:
        return logging.WARNING
    if verbose == 1:
        return logging.INFO
    return logging.DEBUG


def _setup_logging(log_level: int, dry_run: bool) -> tuple[Path, logging.FileHandler]:
    """
    Configure console and file logging.

    Parameters
    ----------
    log_level : int
        Logging level constant from the logging module
    dry_run : bool
        If True, append '_dryrun' to the log filename

    Returns
    -------
    tuple[Path, logging.FileHandler]
        Tuple of (log_file_path, file_handler)

    Raises
    ------
    SystemExit
        If file logging setup fails (exits with code 1; the underlying
        ``OSError`` is caught and logged rather than propagated)
    """
    from nexusLIMS.utils.logging import setup_loggers  # noqa: PLC0415

    # Setup console logging with rich
    # NOTE(review): basicConfig is a no-op when the root logger already has
    # handlers — presumably fine for a single-shot CLI process; confirm for
    # repeated in-process invocations (e.g. tests)
    logging.basicConfig(
        level=log_level,
        format="%(message)s",
        handlers=[RichHandler(console=console, rich_tracebacks=True)],
    )

    # Setup all nexusLIMS loggers
    setup_loggers(log_level)

    # Setup file logging
    try:
        return setup_file_logging(dry_run)
    except OSError:
        logger.exception("Failed to setup file logging")
        console.print("[bold red]Failed to setup file logging[/bold red]")
        sys.exit(1)


def _run_with_lock(
    dry_run: bool, dt_from: datetime | None, dt_to: datetime | None
) -> None:
    """
    Run the record builder with file locking.

    Parameters
    ----------
    dry_run : bool
        If True, run in dry-run mode (find files without building records)
    dt_from : datetime | None
        The point in time after which sessions will be fetched
    dt_to : datetime | None
        The point in time before which sessions will be fetched

    Returns
    -------
    None

    Raises
    ------
    SystemExit
        If lock cannot be acquired (another instance is running); note this
        exits with code 0 — concurrent invocation is treated as benign
    """
    from nexusLIMS.builder import record_builder  # noqa: PLC0415
    from nexusLIMS.config import settings  # noqa: PLC0415

    lock_file = settings.lock_file_path
    # timeout=0 makes acquisition non-blocking: raise Timeout immediately if
    # another process already holds the lock
    lock = FileLock(str(lock_file), timeout=0)

    try:
        logger.info("Attempting to acquire lock at %s", lock_file)
        with lock:
            logger.info("Lock acquired successfully")
            try:
                record_builder.process_new_records(
                    dry_run=dry_run, dt_from=dt_from, dt_to=dt_to
                )
                logger.info("Record processing completed")
            except PreflightError as e:
                logger.error(  # noqa: TRY400
                    "Preflight checks failed — record builder aborted"
                )
                for check in e.failed_checks:
                    logger.error(" [%s] %s", check.name, check.message)  # noqa: TRY400
            except Exception:
                # Boundary catch: errors are logged to the file handler so the
                # post-run log scan can detect them and trigger notification
                logger.exception("Error during record processing")
    except Timeout:
        logger.warning(
            "Lock file already exists at %s - another instance is running",
            lock_file,
        )
        console.print(f"[yellow]Lock file already exists at {lock_file}[/yellow]")
        console.print("[yellow]Another instance is already running. Exiting.[/yellow]")
        # Exit 0: overlapping scheduled runs are expected, not a failure
        sys.exit(0)


def _parse_date_argument(
    date_str: str | None, *, inclusive_end: bool = False
) -> datetime | None:
    """
    Parse a date string argument into a datetime object.

    Parameters
    ----------
    date_str : str | None
        Date string in ISO format (YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS), or
        special values "none"/"all" to disable filtering, or None to return
        None
    inclusive_end : bool
        If True and date_str has no time component, set time to 23:59:59 to
        include the entire day. Used for --to parameter to make date ranges
        inclusive. Default is False (use midnight).

    Returns
    -------
    datetime | None
        Parsed datetime with system timezone, or None if date_str is None or
        a special value

    Raises
    ------
    click.BadParameter
        If date string cannot be parsed
    """
    if date_str is None:
        return None

    # Check for special values that disable filtering
    if date_str.lower() in ("none", "all"):
        return None

    # Parse ISO format date string
    try:
        # Try parsing with time component first
        if "T" in date_str:
            dt_obj = datetime.fromisoformat(date_str)
        # Parse date-only string
        elif inclusive_end:
            # For inclusive end dates, set time to end of day
            dt_obj = datetime.fromisoformat(date_str + "T23:59:59")
        else:
            # For start dates, set time to midnight
            dt_obj = datetime.fromisoformat(date_str + "T00:00:00")

        # Ensure timezone-aware datetime using system timezone
        if dt_obj.tzinfo is None:
            from nexusLIMS.utils.time import current_system_tz  # noqa: PLC0415

            dt_obj = dt_obj.replace(tzinfo=current_system_tz())
    except ValueError as e:
        msg = (
            f"Invalid date format: {date_str}. "
            f"Use ISO format (YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS) "
            f'or special value "none" to disable filtering.'
        )
        raise click.BadParameter(msg) from e
    else:
        return dt_obj


def _handle_error_notification(
    log_file: Path, file_handler: logging.FileHandler
) -> None:
    """
    Check log for errors and send notification if needed.

    Parameters
    ----------
    log_file : Path
        Path to the log file to check
    file_handler : logging.FileHandler
        File handler to flush before checking log

    Returns
    -------
    None
        This function doesn't return anything. All errors are caught and
        logged.
    """
    # Ensure file handler is flushed before checking log
    # This is important for error detection to work correctly
    file_handler.flush()
    logger.info("NexusLIMS record processor finished")

    try:
        has_errors, found_patterns = check_log_for_errors(log_file)
        if has_errors:
            logger.info("Errors detected in log, sending notification")
            send_error_notification(log_file, found_patterns)
        else:
            logger.info("No errors detected in log")
    except Exception:
        # Notification is best-effort: never let it change the exit status
        logger.exception("Error while checking log or sending notification")
    finally:
        # Clean up file handler after all logging is complete
        logging.root.removeHandler(file_handler)
        file_handler.close()


@click.command(
    epilog="""
Examples:

\b
    # Normal run (process records from last week)
    $ nexuslims build-records

\b
    # Process all sessions (no date filtering)
    $ nexuslims build-records --from=none

\b
    # Process sessions since a specific date
    $ nexuslims build-records --from=2025-01-01

\b
    # Process a specific date range
    $ nexuslims build-records --from=2025-01-01 --to=2025-01-31

\b
    # Dry run (find files only)
    $ nexuslims build-records -n

\b
    # Verbose output
    $ nexuslims build-records -vv
"""
)
@click.option(
    "-n",
    "--dry-run",
    is_flag=True,
    help="Dry run: find files without building records",
)
@click.option(
    "-v",
    "--verbose",
    count=True,
    help="Increase verbosity (-v for INFO, -vv for DEBUG)",
)
@click.option(
    "--from",
    "from_arg",
    type=str,
    default=None,
    help="Start date for session filtering (ISO format: YYYY-MM-DD). "
    'Defaults to 1 week ago. Use "none" to disable lower bound.',
)
@click.option(
    "--to",
    "to_arg",
    type=str,
    default=None,
    help="End date for session filtering (ISO format: YYYY-MM-DD). "
    "Omit to disable upper bound.",
)
@click.version_option(version=None, message=_format_version("nexuslims build-records"))
def main(
    *, dry_run: bool, verbose: int, from_arg: str | None, to_arg: str | None
) -> None:
    """
    Process new NexusLIMS records with logging and email notifications.

    This command runs the NexusLIMS record builder to process new experimental
    sessions and generate XML records. It provides file locking to prevent
    concurrent runs, timestamped logging, and email notifications on errors.

    By default, only sessions from the last week are processed. Use --from=none
    to process all sessions, or specify custom date ranges with --from and --to.
    """
    from nexusLIMS.cli import handle_config_error  # noqa: PLC0415

    with handle_config_error():
        # Setup logging (accesses settings for log directory path)
        log_level = _get_log_level(verbose)
        log_file, file_handler = _setup_logging(log_level, dry_run)

        # Parse date arguments from raw string parameters
        dt_from = _parse_date_argument(from_arg)
        dt_to = _parse_date_argument(to_arg, inclusive_end=True)

        # Apply default: fetch last week if no --from was provided
        # (Don't apply if user explicitly passed --from=none)
        if from_arg is None:
            from nexusLIMS.utils.time import current_system_tz  # noqa: PLC0415

            dt_from = datetime.now(tz=current_system_tz()) - timedelta(weeks=1)

        # Log startup information
        logger.info("Starting NexusLIMS record processor")
        logger.info("Dry run: %s", dry_run)
        if dt_from is not None:
            logger.info("Fetching sessions from: %s", dt_from.isoformat())
        else:
            logger.info("Fetching sessions from: (no lower bound)")
        if dt_to is not None:
            logger.info("Fetching sessions to: %s", dt_to.isoformat())
        else:
            logger.info("Fetching sessions to: (no upper bound)")

        # Dump sanitized effective configuration when verbose
        if verbose >= 1:
            from nexusLIMS.cli.config import (  # noqa: PLC0415
                _build_config_dict,
                _sanitize_config,
            )
            from nexusLIMS.config import settings  # noqa: PLC0415

            logger.info(
                "Effective configuration:\n%s",
                json.dumps(
                    _sanitize_config(_build_config_dict(settings)),
                    indent=2,
                    default=str,
                ),
            )

        # Run record builder with file locking
        _run_with_lock(dry_run, dt_from, dt_to)

        # Handle error notifications and cleanup
        _handle_error_notification(log_file, file_handler)


if __name__ == "__main__":  # pragma: no cover
    main()