Architecture¶
System Overview¶
StreamWeave is a data harvesting pipeline for scientific instruments. It connects to instruments over CIFS/SMB, discovers new files, transfers them to managed storage, and maintains a full audit trail.
```mermaid
graph TD
    Users["<b>👤 Users</b>"] --> Browser["<b>🌐 Browser</b><br/>(React SPA)"]
    Users --> Scripts["<b>📜 Scripts / Jupyter</b>"]
    Browser --> API
    Scripts --> API
    subgraph Backend["Backend (FastAPI)"]
        API["<b>🔀 API Routes</b>"]
        Auth["<b>🔐 Auth</b><br/>(fastapi-users + JWT)"]
        API --> Auth
    end
    API -- "deploy & trigger" --> Prefect["<b>⚙️ Prefect 3</b><br/>Orchestration"]
    Prefect -- "runs" --> Worker["<b>⬇️ Harvest Worker</b><br/>(rclone)"]
    API --> DB[("<b>🗄️ PostgreSQL</b><br/>─────────────<br/>Instruments · Schedules<br/>Files · Transfers<br/>Users · Access Control<br/>Hooks · Audit Log")]
    Worker --> DB
    subgraph FileStorage["File Storage Locations"]
        Instruments["<b>🔬 Scientific Instruments</b><br/>(Samba)"]
        DestStorage["<b>💾 Destination Storage</b><br/>(POSIX / S3)"]
    end
    DB -- "encrypted credentials" --> ServiceAccounts["<b>🔑 Service Accounts</b><br/>(per-instrument credentials)"]
    ServiceAccounts -- "authenticate" --> Worker
    Instruments -- "CIFS/SMB" --> Worker
    Worker -- "file copy" --> DestStorage
    style FileStorage fill:#fef9c3,stroke:#ca8a04
```
Data Flow¶
Harvest Pipeline¶
- Schedule triggers → Prefect cron schedule fires (or a manual trigger via the API)
- Discovery → Worker connects to the instrument via rclone and runs `rclone lsjson --recursive` to list all files
- Deduplication → Discovered files are compared against existing `FileRecord` entries by `source_path`
- Pre-transfer hooks → For each new file, pre-transfer hooks run (e.g., a file filter to skip `*.tmp`)
- Transfer → rclone copies the file from the instrument to the destination storage
- Checksum verification → An xxhash checksum is computed on the transferred file and stored
- Post-transfer hooks → Metadata enrichment and other post-processing
- Record creation → `FileRecord` and `FileTransfer` entries are written to the database
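Stripped to its essentials, the per-file portion of the pipeline above can be sketched as follows. Every name here is hypothetical, and stdlib `hashlib` stands in for xxhash so the example is self-contained:

```python
import hashlib

# Illustrative walk-through of the per-file steps; not StreamWeave's code.
def process_file(path, data, known_paths, records):
    if path in known_paths:                    # deduplication by source_path
        return "duplicate"
    # pre-transfer hooks would run here (e.g. skipping *.tmp files)
    # transfer: rclone would copy the file; `data` simulates the copied bytes
    digest = hashlib.sha256(data).hexdigest()  # checksum verification step
    # post-transfer hooks (metadata enrichment) would run here
    records[path] = digest                     # FileRecord / FileTransfer rows
    known_paths.add(path)
    return "transferred"

records, known = {}, set()
print(process_file("run1/a.dat", b"payload", known, records))  # transferred
print(process_file("run1/a.dat", b"payload", known, records))  # duplicate
```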
Identifier Minting¶
Every file receives a persistent ARK identifier (`ark:/99999/fk4...`) at discovery time, before transfer. This ensures files remain trackable even if transfers fail or are retried.
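A minimal sketch of minting such an identifier under the `99999`/`fk4` test namespace. The suffix alphabet (NOID's "betanumeric" set) and length are assumptions, not StreamWeave's actual minter:

```python
import secrets

# NOID-style "betanumeric" alphabet: digits plus consonants, no vowels.
ALPHABET = "0123456789bcdfghjkmnpqrstvwxz"

def mint_ark(naan="99999", shoulder="fk4", length=8):
    # naan 99999 is the ARK test namespace; fk4 is a test shoulder.
    suffix = "".join(secrets.choice(ALPHABET) for _ in range(length))
    return f"ark:/{naan}/{shoulder}{suffix}"

print(mint_ark())  # e.g. ark:/99999/fk4s7m2kqx0
```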
Key Components¶
Transfer Adapters¶
The transfer layer uses a pluggable adapter pattern:
- `TransferAdapter` → Abstract base class defining `discover()`, `transfer_file()`, and `checksum()` methods
- `RcloneAdapter` → Default implementation using the rclone CLI for CIFS/SMB transfers
- `factory.py` → Creates the right adapter based on instrument configuration
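The interface might look roughly like this; the method signatures are inferred from the names above and are not StreamWeave's exact definitions:

```python
from abc import ABC, abstractmethod

# Sketch of the pluggable adapter base class.
class TransferAdapter(ABC):
    @abstractmethod
    async def discover(self, remote: str) -> list[str]:
        """List files currently visible on the instrument share."""

    @abstractmethod
    async def transfer_file(self, source_path: str, dest_path: str) -> None:
        """Copy a single file from the instrument to destination storage."""

    @abstractmethod
    async def checksum(self, path: str) -> str:
        """Compute the checksum of a transferred file."""
```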
rclone is invoked via `asyncio.create_subprocess_exec`, with connection details passed as CLI flags (no config files), allowing concurrent harvests from different instruments.
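A hedged sketch of that invocation, using rclone's SMB backend flags (`--smb-host` and friends) and an on-the-fly `:smb:` remote; the exact flags StreamWeave passes may differ:

```python
import asyncio
import json

def rclone_discover_cmd(host, user, obscured_pass, share):
    # Connection details go in as CLI flags, never an rclone config file,
    # so concurrent harvests against different instruments cannot collide.
    return [
        "rclone", "lsjson", "--recursive",
        "--smb-host", host,
        "--smb-user", user,
        "--smb-pass", obscured_pass,   # rclone expects an obscured password
        f":smb:{share}",               # on-the-fly remote, no named config
    ]

async def discover(host, user, obscured_pass, share):
    proc = await asyncio.create_subprocess_exec(
        *rclone_discover_cmd(host, user, obscured_pass, share),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out, err = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(err.decode())
    return json.loads(out)  # list of {"Path": ..., "Size": ...} entries
```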
Hook System¶
Hooks are configurable actions that run during the harvest pipeline:
- Pre-transfer hooks → Run before each file transfer. Can `skip` (exclude the file), `redirect` (change the destination), or `proceed`.
- Post-transfer hooks → Run after a successful transfer. Can add metadata to the file record.
Built-in hooks:
| Hook | Trigger | Description |
|---|---|---|
| `file_filter` | `pre_transfer` | Skip files matching glob patterns (`*.tmp`, `*.lock`) |
| `metadata_enrichment` | `post_transfer` | Extract metadata from file paths via regex named groups |
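The regex-named-groups idea behind `metadata_enrichment` works like this; the pattern below is an example, not a shipped default:

```python
import re

# Named groups in the configured pattern become metadata keys.
PATTERN = re.compile(r"(?P<project>[^/]+)/(?P<sample>[^/]+)/[^/]+$")

def enrich(source_path: str) -> dict:
    m = PATTERN.search(source_path)
    return m.groupdict() if m else {}

print(enrich("proteomics/sample42/run001.raw"))
# {'project': 'proteomics', 'sample': 'sample42'}
```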
Hooks are stored as `HookConfig` database records and can be assigned per-instrument with priority ordering.
Prefect Integration¶
StreamWeave uses Prefect 3.x for workflow orchestration:
- Flow: `harvest_instrument_flow` → Top-level orchestration for a single instrument
- Tasks: `discover_files_task`, `transfer_single_file_task` → Individual units of work with retry support
- Deployments: Created via the API when a harvest schedule is configured
- Work pool: `streamweave-worker-pool` → Process-based worker pool
The `PrefectClientService` wraps the Prefect Python client for deployment CRUD and manual trigger operations.
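A sketch of what such a wrapper could look like, shown with a stand-in client so it runs without a Prefect server. The real service's surface may differ; `create_flow_run_from_deployment` is the Prefect client call typically used for manual triggers:

```python
import asyncio

# Hypothetical shape of the wrapper, not StreamWeave's actual class.
class PrefectClientService:
    def __init__(self, client):
        self._client = client  # a Prefect API client in production

    async def trigger(self, deployment_id: str):
        # Manual trigger: create a flow run for an existing deployment.
        return await self._client.create_flow_run_from_deployment(deployment_id)

# Stand-in client so the sketch is runnable offline.
class FakeClient:
    async def create_flow_run_from_deployment(self, deployment_id):
        return {"deployment_id": deployment_id, "state": "SCHEDULED"}

run = asyncio.run(PrefectClientService(FakeClient()).trigger("abc-123"))
print(run["state"])  # SCHEDULED
```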
Access Control¶
- Admin users → Full access to all instruments, files, and management operations
- Regular users → Can only view their own files and transfers. Instrument-level access (via `UserInstrumentAccess`) determines which instruments a user can harvest from, but does not grant visibility into other users' data on the same instrument.
- Non-accessible resources return 404 (not 403) to avoid leaking existence information.
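The 404-over-403 behavior can be illustrated with a dependency-free sketch; `HTTPException` here stands in for `fastapi.HTTPException`, and the lookup logic is illustrative:

```python
# Minimal stand-in for fastapi.HTTPException.
class HTTPException(Exception):
    def __init__(self, status_code, detail=""):
        self.status_code, self.detail = status_code, detail

def get_instrument(instrument_id, accessible_ids, db):
    instrument = db.get(instrument_id)
    # Missing and forbidden look identical to the caller: both return 404,
    # so probing IDs cannot reveal which instruments exist.
    if instrument is None or instrument_id not in accessible_ids:
        raise HTTPException(404, "Not found")
    return instrument

db = {1: "NMR-01"}
for iid, access in [(1, set()), (2, {2})]:  # exists-but-forbidden, missing
    try:
        get_instrument(iid, access, db)
    except HTTPException as e:
        print(e.status_code)  # 404 both times
```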
Database Models¶
| Model | Description |
|---|---|
| `Instrument` | Scientific instrument with CIFS connection details |
| `ServiceAccount` | Encrypted credentials for instrument access |
| `StorageLocation` | Destination for transferred files (POSIX, S3, etc.) |
| `HarvestSchedule` | Cron schedule linking an instrument to a storage location |
| `HookConfig` | Configurable hook with trigger, implementation, and priority |
| `FileRecord` | Discovered file with persistent ARK identifier and metadata |
| `FileTransfer` | Audit record for each file transfer attempt |
| `User` | Application user (via fastapi-users) |
| `UserInstrumentAccess` | Junction table for user-instrument permissions |