Backend API Demo¶
This notebook walks through the core features of the StreamWeave API (admin and user) against the local dev stack:
- Prefect pipeline orchestration
- rclone data transfers from simulated CIFS instrument shares
- Pre- and post-transfer hook system (file filtering, metadata enrichment)
- Fine-grained file access control (users, groups, projects)
Prerequisites: the dev stack must be running at https://streamweave.local.
See Local Development for setup instructions.
This page is a static rendering of a Jupyter Notebook, which you can download to run locally.
Prerequisites¶
- Docker and Docker Compose installed
- uv installed for Python package management
- The repo cloned and the dev stack already running (see Local Development)
Initial setup¶
Before running the notebook, start Jupyter from the repo root:
cd backend
uv sync
uv run jupyter lab ../docs/backend-demo.ipynb
You will also need to bring up the development docker stack and run at least the first few steps of the local dev deployment setup:
First, redirect the streamweave.local DNS name to your local machine by adding the following line to /etc/hosts (macOS/Linux) or C:\Windows\System32\drivers\etc\hosts (Windows):
127.0.0.1 streamweave.local
Then, generate local certificates using the script at scripts/setup-dev-certs.sh
Then, from the repository root, run the following to bring up the development stack:
docker compose -f docker-compose.yml -f docker-compose.dev.yml up
The following cell contains helper commands that will be used throughout the notebook:
import httpx
import json
import os
import subprocess
import threading
import time
import warnings
from pathlib import Path
# Find the repo root regardless of where Jupyter was launched from
def _find_repo_root():
    p = Path.cwd()
    while p != p.parent:
        if (p / "docker-compose.yml").exists():
            return p
        p = p.parent
    raise RuntimeError("Could not find repo root (no docker-compose.yml found)")
def _mkcert_ca_cert() -> str:
    caroot = subprocess.run(
        ["mkcert", "-CAROOT"], capture_output=True, text=True, check=True
    ).stdout.strip()
    return str(Path(caroot) / "rootCA.pem")
REPO_ROOT = _find_repo_root()
DEV_COMPOSE = f"-f {REPO_ROOT}/docker-compose.yml -f {REPO_ROOT}/docker-compose.dev.yml"
# Use the CA cert copied into the repo if present; otherwise locate it via mkcert
CERT_PATH = REPO_ROOT / "caddy" / "certs" / "rootCA.pem"
MKCERT_CA_CERT = str(CERT_PATH) if CERT_PATH.exists() else _mkcert_ca_cert()
# Dev stack credentials (set in docker-compose.dev.yml)
ADMIN_EMAIL = "admin@example.com"
ADMIN_PASSWORD = "adminpassword"
BASE_URL = "https://streamweave.local"
PREFECT_API_URL = "https://streamweave.local/prefect/api"
_limits = httpx.Limits(max_connections=10, max_keepalive_connections=5, keepalive_expiry=300)
client = httpx.Client(base_url=BASE_URL, timeout=30, verify=MKCERT_CA_CERT, limits=_limits)
prefect = httpx.Client(base_url=PREFECT_API_URL, timeout=30, verify=MKCERT_CA_CERT, limits=_limits)
def pp(resp, n: int | None = None):
    """Pretty-print a JSON response. Prints first `n` items if given."""
    try:
        data = resp.json()
        if n is not None and isinstance(data, list) and len(data) > n:
            data = [*data[:n], "..."]
        print(json.dumps(data, indent=2))
    except Exception:
        print(f"HTTP {resp.status_code}: {resp.text}")
def pp_dict(data, n: int | None = None):
    """Pretty-print a dictionary or list. Prints first `n` items if given."""
    if n is not None and isinstance(data, list) and len(data) > n:
        data = [*data[:n], "..."]
    print(json.dumps(data, indent=2))
def run(cmd, **kwargs):
    """Run a shell command, streaming stdout normally and stderr in yellow."""
    YELLOW = "\033[33m"
    RESET = "\033[0m"
    with subprocess.Popen(
        cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, **kwargs
    ) as proc:
        def _stream_stderr():
            for line in proc.stderr:
                print(f"{YELLOW}{line}{RESET}", end="", flush=True)
        t = threading.Thread(target=_stream_stderr)
        t.start()
        for line in proc.stdout:
            print(line, end="", flush=True)
        t.join()
    if proc.returncode != 0:
        warnings.warn(f"Command exited with code {proc.returncode}: {cmd}")
    return proc
def wait_for_flow_run(flow_run_id: str, timeout: int = 120) -> str:
    """Poll a Prefect flow run until it completes. Returns the final state type."""
    terminal_states = ("COMPLETED", "FAILED", "CANCELLED", "CRASHED")
    print(f"Waiting for flow run {flow_run_id} to complete...")
    for attempt in range(timeout):
        flow_run = prefect.get(f"/flow_runs/{flow_run_id}").json()
        state = flow_run.get("state", {}).get("type", "UNKNOWN")
        if state in terminal_states:
            print(f"Flow run finished with state: {state}")
            return state
        print(f"  State: {state} (attempt {attempt + 1}/{timeout})")
        time.sleep(3)
    print(f"Warning: flow run did not reach a terminal state after {timeout} polling attempts")
    return "TIMEOUT"
For reference, the dev stack starts the following docker services:
| Service | URL | Description |
|---|---|---|
| postgres | — | Application database |
| redis | — | Prefect cache |
| prefect-postgres | — | Prefect's internal database |
| prefect-server | https://streamweave.local/prefect/ | Prefect UI + API (admin-only; log in through the main StreamWeave URL first) |
| api | https://streamweave.local/api/ | StreamWeave FastAPI backend |
| worker | — | Prefect worker with rclone |
| frontend | https://streamweave.local | StreamWeave Vite frontend dev server (hot reload) |
| caddy | https://streamweave.local | HTTPS reverse proxy |
| mailpit | https://streamweave.local/mail/ | SMTP catch-all for outgoing emails |
| s3-dev | https://streamweave.local/s3/ | S3-compatible dev storage |
| dev-seed | — | Seeds sample data on startup, then exits |
| instruments-init | — | One-shot container that copies sample_data/ into named volumes, then exits |
| samba-instruments | — | Single Samba server exposing all 4 instrument shares (nmr, hplc, ms, tem) on port 4461 |
Wait for services to be ready¶
The dev-seed container runs once on startup and populates the database with sample
instruments, storage locations, schedules, and hooks. Re-running is safe — existing
records are skipped.
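That idempotency follows a get-or-create pattern; a minimal standalone sketch (hypothetical, using a plain dict in place of the database the real seeder talks to):

```python
# Hypothetical sketch of the get-or-create pattern behind dev-seed's
# idempotency; the real seeder works against the API/database instead.
def ensure_record(store: dict, key: str, factory) -> dict:
    """Create the record only if it does not already exist."""
    if key not in store:
        store[key] = factory()
    return store[key]

db: dict = {}
first = ensure_record(db, "nmr", lambda: {"name": "Bruker AVANCE III 600 MHz NMR"})
second = ensure_record(db, "nmr", lambda: {"name": "would-be duplicate"})  # skipped

print(len(db), first is second)  # the second call returns the existing record
```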
# Check the API for health to make sure services are ready
for attempt in range(60):
    try:
        resp = client.get("/health")
        if resp.status_code == 200:
            print("API is ready.")
            break
    except httpx.RequestError:
        pass
    print(f"Waiting for API... (attempt {attempt + 1}/60)")
    time.sleep(2)
else:
    raise RuntimeError("API did not become available")
API is ready.
_ = run("docker compose ps")
NAME                             IMAGE                        COMMAND                  SERVICE             CREATED          STATUS                             PORTS
streamweave-api-1                streamweave-api              "sh -c 'alembic upgr…"   api                 33 seconds ago   Up 20 seconds (healthy)            0.0.0.0:8000->8000/tcp, [::]:8000->8000/tcp
streamweave-caddy-1              caddy:alpine                 "caddy run --config …"   caddy               33 seconds ago   Up 20 seconds                      0.0.0.0:80->80/tcp, [::]:80->80/tcp, 0.0.0.0:443->443/tcp, [::]:443->443/tcp
streamweave-frontend-1           streamweave-frontend-dev     "docker-entrypoint.s…"   frontend            33 seconds ago   Up 20 seconds
streamweave-mailpit-1            axllent/mailpit:latest       "/mailpit --webroot …"   mailpit             35 seconds ago   Up 32 seconds (health: starting)   0.0.0.0:1025->1025/tcp, [::]:1025->1025/tcp
streamweave-postgres-1           postgres:16-alpine           "docker-entrypoint.s…"   postgres            35 seconds ago   Up 32 seconds (healthy)            0.0.0.0:5432->5432/tcp, [::]:5432->5432/tcp
streamweave-prefect-postgres-1   postgres:16-alpine           "docker-entrypoint.s…"   prefect-postgres    35 seconds ago   Up 32 seconds (healthy)            5432/tcp
streamweave-prefect-server-1     prefecthq/prefect:3-latest   "/usr/bin/tini -g --…"   prefect-server      34 seconds ago   Up 26 seconds (healthy)
streamweave-redis-1              redis:7-alpine               "docker-entrypoint.s…"   redis               35 seconds ago   Up 32 seconds (healthy)            0.0.0.0:6379->6379/tcp, [::]:6379->6379/tcp
streamweave-s3-dev-1             rclone/rclone:latest         "rclone serve s3 /da…"   s3-dev              35 seconds ago   Up 32 seconds
streamweave-samba-archive-1      dperson/samba:latest         "/sbin/tini -- /usr/…"   samba-archive       35 seconds ago   Up 32 seconds (health: starting)   0.0.0.0:445->445/tcp, [::]:445->445/tcp
streamweave-samba-instruments-1  dperson/samba:latest         "/sbin/tini -- /usr/…"   samba-instruments   34 seconds ago   Up 31 seconds (health: starting)   0.0.0.0:4461->445/tcp, [::]:4461->445/tcp
streamweave-worker-1             streamweave-worker           "prefect worker star…"   worker              33 seconds ago   Up 20 seconds
Check API status¶
# Wait for the API to be ready (retries up to 30 seconds)
for attempt in range(30):
    try:
        resp = client.get("/health")
        pp(resp)
        break
    except httpx.RequestError:
        print(f"Waiting for API... (attempt {attempt + 1}/30)")
        time.sleep(1)
else:
    raise RuntimeError("API did not become available within 30 seconds")
{
"status": "ok"
}
Expected: {"status": "ok"}
The Prefect UI is accessible at https://streamweave.local/prefect/ (you must login at https://streamweave.local/ with an admin account first).
2. Get an Auth Token¶
The dev stack automatically creates an admin account on startup via ensure_admin.py.
The default credentials are admin@example.com / adminpassword and can be overridden
with the ADMIN_EMAIL and ADMIN_PASSWORD environment variables. This cell also attaches
the token to both the StreamWeave and Prefect API clients so that all later calls are authorized.
resp = client.post("/auth/jwt/login", data={"username": ADMIN_EMAIL, "password": ADMIN_PASSWORD})
TOKEN = resp.json()["access_token"]
AUTH = {"Authorization": f"Bearer {TOKEN}"}
client.headers["Authorization"] = f"Bearer {TOKEN}"
prefect.headers["Authorization"] = f"Bearer {TOKEN}"
print(f"Token acquired (first 20 chars): {TOKEN[:20]}...")
Token acquired (first 20 chars): eyJhbGciOiJIUzI1NiIs...
3. Verify Seeded Data¶
Counts of data¶
This cell verifies that all of the expected data was initialized by the dev-seed container. The later cells in this section show the API responses for each data type individually.
resources = [
("Service accounts", client.get("/api/service-accounts").json(), 3),
("Storage locations", client.get("/api/storage-locations").json(), 3),
("Instruments", client.get("/api/instruments").json(), 4),
("Schedules", client.get("/api/schedules").json(), 4),
("Hooks", client.get("/api/hooks").json(), 3),
("Users", client.get("/api/admin/users").json(), 5),
]
w = max(len(label) for label, *_ in resources)
print(f"{'Resource':<{w}} {'Count':>5} {'Expected':>8} {'OK':>4}")
print("-" * (w + 23))
for label, data, expected in resources:
    count = len(data)
    ok = "✓" if count == expected else "✗"
    print(f"{label:<{w}} {count:>5} {expected:>8} {ok:>4}")
    assert count == expected, f"{label}: expected {expected}, got {count}"
Resource          Count Expected   OK
----------------------------------------
Service accounts      3        3    ✓
Storage locations     3        3    ✓
Instruments           4        4    ✓
Schedules             4        4    ✓
Hooks                 3        3    ✓
Users                 5        5    ✓
3a. Instruments¶
Expected: 4 instruments — Bruker AVANCE III 600 MHz NMR, Waters Acquity UPLC-MS,
Thermo Orbitrap Exploris 480, and FEI Titan Themis 300 TEM (offline for maintenance, so enabled is false).
resp = client.get("/api/instruments")
pp(resp)
[
{
"id": "1b1ef370-7eaa-416f-852e-7724248cce5b",
"name": "Bruker AVANCE III 600 MHz NMR",
"description": "600 MHz solution NMR for small-molecule and protein characterization",
"location": "Chemistry Building, Room 102",
"pid": null,
"cifs_host": "samba-instruments",
"cifs_share": "nmr",
"cifs_base_path": "/",
"service_account_id": "93d7a72d-1294-4b0b-8028-a27d7a236c25",
"transfer_adapter": "rclone",
"transfer_config": null,
"enabled": true,
"created_at": "2026-03-01T20:58:26.275154Z",
"updated_at": "2026-03-01T20:58:26.275154Z",
"deleted_at": null
},
{
"id": "ebc4c653-b5fa-4c94-b56d-3fa63ea78452",
"name": "Waters Acquity UPLC-MS",
"description": "Ultra-performance liquid chromatography with mass spectrometry detection",
"location": "Analytical Core, Room 210",
"pid": null,
"cifs_host": "samba-instruments",
"cifs_share": "hplc",
"cifs_base_path": "/",
"service_account_id": "a8f1c2b4-c384-4e8b-bc28-56ca93e3fba9",
"transfer_adapter": "rclone",
"transfer_config": null,
"enabled": true,
"created_at": "2026-03-01T20:58:26.281312Z",
"updated_at": "2026-03-01T20:58:26.281312Z",
"deleted_at": null
},
{
"id": "e687ff98-1733-433f-9f64-9cbd0c8a29cc",
"name": "Thermo Orbitrap Exploris 480",
"description": "High-resolution Orbitrap mass spectrometer for proteomics",
"location": "Proteomics Core, Room 315",
"pid": null,
"cifs_host": "samba-instruments",
"cifs_share": "ms",
"cifs_base_path": "/",
"service_account_id": "41fe2e27-8080-4ba9-8b59-962b317fc373",
"transfer_adapter": "rclone",
"transfer_config": null,
"enabled": true,
"created_at": "2026-03-01T20:58:26.285609Z",
"updated_at": "2026-03-01T20:58:26.285609Z",
"deleted_at": null
},
{
"id": "508f8137-0649-489b-973a-0b837a868545",
"name": "FEI Titan Themis 300 TEM",
"description": "Aberration-corrected transmission electron microscope",
"location": "Electron Microscopy Facility, Basement",
"pid": null,
"cifs_host": "samba-instruments",
"cifs_share": "tem",
"cifs_base_path": "/",
"service_account_id": null,
"transfer_adapter": "rclone",
"transfer_config": null,
"enabled": false,
"created_at": "2026-03-01T20:58:26.289504Z",
"updated_at": "2026-03-01T20:58:26.289504Z",
"deleted_at": null
}
]
3b. Storage locations¶
Expected: 3 storage locations — Local POSIX archive (/storage/posix-archive),
S3 dev bucket (rclone → s3-dev:9000), and Samba archive share (CIFS).
resp = client.get("/api/storage-locations")
pp(resp)
[
{
"id": "920c9c12-97b7-4f37-a1c6-288efb8a270b",
"name": "Local POSIX archive",
"type": "posix",
"connection_config": null,
"base_path": "/storage/posix-archive",
"enabled": true,
"created_at": "2026-03-01T20:58:26.206225Z",
"updated_at": "2026-03-01T20:58:26.206225Z",
"deleted_at": null
},
{
"id": "d584d2a4-3eb0-4649-9395-b96b335ee906",
"name": "S3 dev bucket",
"type": "s3",
"connection_config": {
"bucket": "instruments",
"region": "us-east-1",
"endpoint_url": "http://s3-dev:9000",
"access_key_id": "devkey",
"secret_access_key": "****"
},
"base_path": "instruments",
"enabled": true,
"created_at": "2026-03-01T20:58:26.214598Z",
"updated_at": "2026-03-01T20:58:26.214598Z",
"deleted_at": null
},
{
"id": "8e6bf956-0d69-4fb0-89f8-a811420edd86",
"name": "Samba archive share",
"type": "cifs",
"connection_config": {
"host": "samba-archive",
"share": "archive",
"domain": null,
"username": "devuser",
"password": "****"
},
"base_path": "/archive",
"enabled": true,
"created_at": "2026-03-01T20:58:26.265442Z",
"updated_at": "2026-03-01T20:58:26.265442Z",
"deleted_at": null
}
]
3c. Schedules¶
Expected: 4 schedules with non-null prefect_deployment_id — the dev seed creates
schedules via the API, which triggers Prefect deployment creation automatically.
resp = client.get("/api/schedules")
schedules = resp.json()
pp(resp)
[
{
"id": "1e685983-e78d-470b-b30c-3e7a2bcb3435",
"instrument_id": "1b1ef370-7eaa-416f-852e-7724248cce5b",
"default_storage_location_id": "920c9c12-97b7-4f37-a1c6-288efb8a270b",
"cron_expression": "0 1 * * *",
"prefect_deployment_id": "7ad75b08-504f-4e8a-959c-e06de5aa102d",
"enabled": true,
"created_at": "2026-03-01T20:58:26.294566Z",
"updated_at": "2026-03-01T20:58:26.296650Z",
"deleted_at": null
},
{
"id": "68920c17-e254-4e51-9ced-9334942c9ac6",
"instrument_id": "1b1ef370-7eaa-416f-852e-7724248cce5b",
"default_storage_location_id": "d584d2a4-3eb0-4649-9395-b96b335ee906",
"cron_expression": "0 2 * * *",
"prefect_deployment_id": "7ad75b08-504f-4e8a-959c-e06de5aa102d",
"enabled": true,
"created_at": "2026-03-01T20:58:27.726552Z",
"updated_at": "2026-03-01T20:58:27.728021Z",
"deleted_at": null
},
{
"id": "aeadda83-aa42-4e76-9576-ad20fe765275",
"instrument_id": "ebc4c653-b5fa-4c94-b56d-3fa63ea78452",
"default_storage_location_id": "920c9c12-97b7-4f37-a1c6-288efb8a270b",
"cron_expression": "0 4 * * *",
"prefect_deployment_id": "13979110-2b8e-4930-86cc-eb20bf61e8c0",
"enabled": true,
"created_at": "2026-03-01T20:58:27.769522Z",
"updated_at": "2026-03-01T20:58:27.771368Z",
"deleted_at": null
},
{
"id": "1fea6ec5-2907-434d-abcc-bd45624995b8",
"instrument_id": "e687ff98-1733-433f-9f64-9cbd0c8a29cc",
"default_storage_location_id": "8e6bf956-0d69-4fb0-89f8-a811420edd86",
"cron_expression": "0 0 * * *",
"prefect_deployment_id": "12fd6b22-9843-4024-a906-defdd8ab3761",
"enabled": true,
"created_at": "2026-03-01T20:58:27.807300Z",
"updated_at": "2026-03-01T20:58:27.808785Z",
"deleted_at": null
}
]
3d. Hooks¶
Expected: 3 hooks:
- Auto-assign file access on transfer (post_transfer, access_assignment)
- NMR metadata enrichment (post_transfer, metadata_enrichment, scoped to the NMR instrument)
- File size filter to skip temp files (pre_transfer, file_filter, excludes *.tmp, *.lock, ~*)
resp = client.get("/api/hooks")
pp(resp)
[
{
"id": "6dc37a6a-7188-4d03-8068-87d2796f9396",
"name": "Auto-assign file access on transfer",
"description": "Grants the instrument owner read access to every transferred file",
"trigger": "post_transfer",
"implementation": "builtin",
"builtin_name": "access_assignment",
"script_path": null,
"webhook_url": null,
"config": null,
"instrument_id": null,
"priority": 0,
"enabled": true,
"deleted_at": null
},
{
"id": "e3387162-dad0-4452-9317-5e51c682d676",
"name": "NMR metadata enrichment",
"description": "Extracts pulse programme, solvent, and nucleus from Bruker acqus files",
"trigger": "post_transfer",
"implementation": "builtin",
"builtin_name": "metadata_enrichment",
"script_path": null,
"webhook_url": null,
"config": null,
"instrument_id": "1b1ef370-7eaa-416f-852e-7724248cce5b",
"priority": 10,
"enabled": true,
"deleted_at": null
},
{
"id": "75c2ad1b-4eaa-4594-8955-a8e261823a88",
"name": "Skip zero-byte and temp files",
"description": "Drops zero-byte and .tmp files before transfer",
"trigger": "pre_transfer",
"implementation": "builtin",
"builtin_name": "file_filter",
"script_path": null,
"webhook_url": null,
"config": {
"min_size_bytes": 1,
"exclude_patterns": [
"*.tmp",
"*.lock",
"~*"
]
},
"instrument_id": null,
"priority": 0,
"enabled": true,
"deleted_at": null
}
]
# Look up the NMR instrument and save its schedule ID for use in later steps
instruments = client.get("/api/instruments").json()
nmr = next(i for i in instruments if "NMR" in i["name"])
NMR_SCHEDULE = next(
s for s in schedules if s["instrument_id"] == nmr["id"]
)
SCHEDULE_ID = NMR_SCHEDULE["id"]
print(f"NMR instrument: {nmr['name']}")
print(f"Schedule ID: {SCHEDULE_ID}")
print(f"Prefect deployment ID: {NMR_SCHEDULE.get('prefect_deployment_id')}")
NMR instrument: Bruker AVANCE III 600 MHz NMR
Schedule ID: 1e685983-e78d-470b-b30c-3e7a2bcb3435
Prefect deployment ID: 7ad75b08-504f-4e8a-959c-e06de5aa102d
4. Test Prefect Integration¶
4a. Check Prefect UI¶
Open https://streamweave.local/ in a browser, log in as the default admin (this saves a cookie so you can access the Prefect interface), and then view the Prefect dashboard by clicking Admin -> Prefect Dashboard.
You should see:
- Deployments tab: 3 deployments named harvest-{instrument_name}
- Work Pools tab: a pool named streamweave-worker-pool with an active worker
4b. Trigger a manual harvest¶
resp = client.post(f"/api/schedules/{SCHEDULE_ID}/trigger")
pp(resp)
FLOW_RUN_ID = resp.json().get("flow_run_id")
print(f"\nMake sure to login as an admin at https://streamweave.local, then\nView the run at https://streamweave.local/prefect/runs/flow-run/{FLOW_RUN_ID}")
wait_for_flow_run(FLOW_RUN_ID)
{
"flow_run_id": "4b909956-75be-499f-8980-84c2aa339b12",
"schedule_id": "1e685983-e78d-470b-b30c-3e7a2bcb3435"
}
Make sure to login as an admin at https://streamweave.local, then
View the run at https://streamweave.local/prefect/runs/flow-run/4b909956-75be-499f-8980-84c2aa339b12
Waiting for flow run 4b909956-75be-499f-8980-84c2aa339b12 to complete...
State: PENDING (attempt 1/120)
State: PENDING (attempt 2/120)
State: PENDING (attempt 3/120)
State: PENDING (attempt 4/120)
Flow run finished with state: COMPLETED
'COMPLETED'
Expected response:
{
"flow_run_id": "<uuid>",
"schedule_id": "<uuid>"
}
4c. Monitor in Prefect UI¶
Go to https://streamweave.local/prefect/flow-runs (or the link above) and watch the triggered flow run. It will:
- Run discover_files_task — discovers files on the NMR's Samba share (10, if this is the first time it has run)
- Run transfer_single_file_task for each new file — transfers via rclone
The run details in the Prefect interface show the ten original files being found and transferred.
5. Verify Harvest Results¶
5a. File discovery¶
The harvest should have found the 10 example files from the NMR instrument:
resp = client.get("/api/files")
print(f"\nFound {len(resp.json())} files")
pp(resp, n=3)
Found 10 files
[
{
"id": "19d53970-a1d1-4369-a5e9-ce103f35cd29",
"persistent_id": "ark:/99999/fk4ikr7lgrwcrbaxfd7helhytlf7e",
"persistent_id_type": "ark",
"instrument_id": "1b1ef370-7eaa-416f-852e-7724248cce5b",
"source_path": "20260201_alanine_13C/pdata/1/1r",
"filename": "1r",
"size_bytes": 8192,
"source_mtime": "2026-03-01T20:58:09.151000Z",
"xxhash": "f90b1bb50d3a727b",
"sha256": null,
"first_discovered_at": "2026-03-01T20:58:56.733048Z",
"metadata_": {},
"owner_id": null
},
{
"id": "67cd7b0b-4d63-41fd-86fe-c85bfb7d6c24",
"persistent_id": "ark:/99999/fk4ztfyomvqevcftgkqqrglxj2jzm",
"persistent_id_type": "ark",
"instrument_id": "1b1ef370-7eaa-416f-852e-7724248cce5b",
"source_path": "20260201_alanine_13C/fid",
"filename": "fid",
"size_bytes": 16384,
"source_mtime": "2026-03-01T20:58:09.156000Z",
"xxhash": "b9ae9fcc0155a1c7",
"sha256": null,
"first_discovered_at": "2026-03-01T20:58:56.685848Z",
"metadata_": {},
"owner_id": null
},
{
"id": "6aa2e54c-115e-472a-8efe-119b50293f7a",
"persistent_id": "ark:/99999/fk45jv3h6dvyvgtdpfdt6eprxp6ia",
"persistent_id_type": "ark",
"instrument_id": "1b1ef370-7eaa-416f-852e-7724248cce5b",
"source_path": "20260201_alanine_13C/acqus",
"filename": "acqus",
"size_bytes": 470,
"source_mtime": "2026-03-01T20:58:09.159000Z",
"xxhash": "cea07227a98f977f",
"sha256": null,
"first_discovered_at": "2026-03-01T20:58:56.637116Z",
"metadata_": {},
"owner_id": null
},
"..."
]
For each file you should see:
- persistent_id starting with ark:/99999/fk4... (unique ARK identifier)
- persistent_id_type of ark
- instrument_id matching the harvested instrument
- source_path matching the file's path on the instrument
- filename — the file name
- xxhash — checksum computed after transfer
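These invariants can also be asserted in code. The sketch below inlines one sample record from the output above for illustration; check_file_record is a hypothetical helper, and in the notebook you would loop over resp.json() instead:

```python
# Sample harvested file record (values taken from the output above)
sample = {
    "persistent_id": "ark:/99999/fk4ikr7lgrwcrbaxfd7helhytlf7e",
    "persistent_id_type": "ark",
    "instrument_id": "1b1ef370-7eaa-416f-852e-7724248cce5b",
    "source_path": "20260201_alanine_13C/pdata/1/1r",
    "filename": "1r",
    "xxhash": "f90b1bb50d3a727b",
}

def check_file_record(rec: dict) -> bool:
    """Hypothetical check of the invariants listed above."""
    return (
        rec["persistent_id"].startswith("ark:/")          # ARK identifier assigned
        and rec["persistent_id_type"] == "ark"
        and rec["source_path"].endswith(rec["filename"])  # path/filename consistent
        and rec["xxhash"] is not None                     # checksum computed
    )

print(check_file_record(sample))  # True for the sample record
```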
5b. File transfers¶
Likewise, there should be one transfer record for each of the 10 files:
resp = client.get("/api/transfers")
print(f"Found {len(resp.json())} transfers")
pp(resp, n=5)
Found 10 transfers
[
{
"id": "26cb3c55-0d5c-4fcd-b4b8-a0c920f2c426",
"file_id": "c10e6c0a-fb26-4f34-b16a-8c968d9a4872",
"storage_location_id": "920c9c12-97b7-4f37-a1c6-288efb8a270b",
"destination_path": "/storage/posix-archive/Bruker AVANCE III 600 MHz NMR/20260210_ethanol_COSY/acqus",
"transfer_adapter": "rclone",
"status": "completed",
"bytes_transferred": 485,
"source_checksum": null,
"dest_checksum": "13a9a75aebeedaa8",
"checksum_verified": false,
"started_at": "2026-03-01T20:58:56.252504Z",
"completed_at": "2026-03-01T20:58:56.309077Z",
"error_message": null,
"prefect_flow_run_id": null
},
{
"id": "f393a5d2-10f9-43d2-aa8a-ca339d12cc92",
"file_id": "39722b71-0032-4d3e-8059-19fcb09c8cf2",
"storage_location_id": "920c9c12-97b7-4f37-a1c6-288efb8a270b",
"destination_path": "/storage/posix-archive/Bruker AVANCE III 600 MHz NMR/20260210_ethanol_COSY/fid",
"transfer_adapter": "rclone",
"status": "completed",
"bytes_transferred": 32768,
"source_checksum": null,
"dest_checksum": "5cf4a5b010130869",
"checksum_verified": false,
"started_at": "2026-03-01T20:58:56.318856Z",
"completed_at": "2026-03-01T20:58:56.363116Z",
"error_message": null,
"prefect_flow_run_id": null
},
{
"id": "cd136994-0373-4cad-8287-b5693c858994",
"file_id": "0db30674-42c2-4326-b27a-02cd0138fbe3",
"storage_location_id": "920c9c12-97b7-4f37-a1c6-288efb8a270b",
"destination_path": "/storage/posix-archive/Bruker AVANCE III 600 MHz NMR/20260210_ethanol_COSY/pdata/1/1r",
"transfer_adapter": "rclone",
"status": "completed",
"bytes_transferred": 8192,
"source_checksum": null,
"dest_checksum": "09bb4e0e5d27d746",
"checksum_verified": false,
"started_at": "2026-03-01T20:58:56.372613Z",
"completed_at": "2026-03-01T20:58:56.411892Z",
"error_message": null,
"prefect_flow_run_id": null
},
{
"id": "c445d05c-ca5a-459d-a74f-6c82d8ae02d9",
"file_id": "7a4bcae4-fe76-4cf6-bdd2-900e1f859f1e",
"storage_location_id": "920c9c12-97b7-4f37-a1c6-288efb8a270b",
"destination_path": "/storage/posix-archive/Bruker AVANCE III 600 MHz NMR/20260115_glucose_1H/acqus",
"transfer_adapter": "rclone",
"status": "completed",
"bytes_transferred": 998,
"source_checksum": null,
"dest_checksum": "93fa612f0dfeb7e8",
"checksum_verified": false,
"started_at": "2026-03-01T20:58:56.421415Z",
"completed_at": "2026-03-01T20:58:56.467660Z",
"error_message": null,
"prefect_flow_run_id": null
},
{
"id": "a0097b64-2b84-421a-9d6b-58a590ff43ac",
"file_id": "6d3f677c-8b98-4467-9922-a3d4a20972da",
"storage_location_id": "920c9c12-97b7-4f37-a1c6-288efb8a270b",
"destination_path": "/storage/posix-archive/Bruker AVANCE III 600 MHz NMR/20260115_glucose_1H/fid",
"transfer_adapter": "rclone",
"status": "completed",
"bytes_transferred": 8192,
"source_checksum": null,
"dest_checksum": "8852c19b353053af",
"checksum_verified": false,
"started_at": "2026-03-01T20:58:56.477706Z",
"completed_at": "2026-03-01T20:58:56.520744Z",
"error_message": null,
"prefect_flow_run_id": null
},
"..."
]
Each transfer should have:
- status: "completed" or "skipped"
- dest_checksum — xxhash of the transferred file
- destination_path — where the file was written under /storage/
- bytes_transferred — file size
- started_at and completed_at timestamps
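A compact way to check these fields across all transfers is to aggregate them. This sketch inlines three sample records (values from the output above); in the notebook you would feed it resp.json() from the /api/transfers call instead:

```python
from collections import Counter

# Sample transfer records (values from the output above)
transfers = [
    {"status": "completed", "bytes_transferred": 485, "dest_checksum": "13a9a75aebeedaa8"},
    {"status": "completed", "bytes_transferred": 32768, "dest_checksum": "5cf4a5b010130869"},
    {"status": "completed", "bytes_transferred": 8192, "dest_checksum": "09bb4e0e5d27d746"},
]

# Tally outcomes and total bytes moved
by_status = Counter(t["status"] for t in transfers)
total_bytes = sum(t["bytes_transferred"] for t in transfers)

# Every completed transfer must carry a destination checksum
assert all(t["dest_checksum"] for t in transfers if t["status"] == "completed")

print(dict(by_status), total_bytes)
```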
5c. Check files on disk (in the source directory)¶
_ = run(f"docker compose {DEV_COMPOSE} exec samba-instruments find /data/nmr -type f | sort")
/data/nmr/20260115_glucose_1H/acqus
/data/nmr/20260115_glucose_1H/fid
/data/nmr/20260115_glucose_1H/pdata/1/1r
/data/nmr/20260115_glucose_1H/pdata/1/procs
/data/nmr/20260201_alanine_13C/acqus
/data/nmr/20260201_alanine_13C/fid
/data/nmr/20260201_alanine_13C/pdata/1/1r
/data/nmr/20260210_ethanol_COSY/acqus
/data/nmr/20260210_ethanol_COSY/fid
/data/nmr/20260210_ethanol_COSY/pdata/1/1r
5d. Check files on disk (in the storage directory)¶
_ = run(f"docker compose {DEV_COMPOSE} exec api tree /storage/posix-archive/")
/storage/posix-archive/
└── Bruker AVANCE III 600 MHz NMR
├── 20260115_glucose_1H
│ ├── acqus
│ ├── fid
│ └── pdata
│ └── 1
│ ├── 1r
│ └── procs
├── 20260201_alanine_13C
│ ├── acqus
│ ├── fid
│ └── pdata
│ └── 1
│ └── 1r
└── 20260210_ethanol_COSY
├── acqus
├── fid
└── pdata
└── 1
└── 1r
11 directories, 10 files
6. Test Pre-Transfer Hook (File Filter)¶
The file size filter hook skips zero-byte files and files matching
*.tmp, *.lock, ~* patterns.
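The filtering rules themselves are simple; the following is a minimal standalone sketch of that logic (an illustration, not the hook's actual implementation), using the config values from the seeded hook:

```python
from fnmatch import fnmatch

# Config values from the seeded "Skip zero-byte and temp files" hook
CONFIG = {"min_size_bytes": 1, "exclude_patterns": ["*.tmp", "*.lock", "~*"]}

def passes_filter(filename: str, size_bytes: int, config: dict = CONFIG) -> bool:
    """Return True if the file should be transferred (sketch of file_filter)."""
    if size_bytes < config["min_size_bytes"]:
        return False  # drop zero-byte files
    return not any(fnmatch(filename, pat) for pat in config["exclude_patterns"])

candidates = [("fid", 16384), ("scratch.tmp", 10), ("empty.txt", 0), ("~lockfile", 64)]
kept = [name for name, size in candidates if passes_filter(name, size)]
print(kept)  # only 'fid' survives the filter
```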
6a. Add a temp and empty file to the NMR instrument share¶
# Write a .tmp file and empty .txt file into the samba-instruments volume
# via docker exec - this simulates two new files being created by the instrument
_ = run(f'docker compose {DEV_COMPOSE} exec samba-instruments sh -c '
'"echo temp data > /data/nmr/scratch.tmp && truncate -s 0 /data/nmr/empty.txt"')
print("Files in NMR share:\n-------------------")
_ = run(f'docker compose {DEV_COMPOSE} exec samba-instruments sh -c '
'"cd /data/nmr && find . -type f | sort"')
Files in NMR share:
-------------------
./20260115_glucose_1H/acqus
./20260115_glucose_1H/fid
./20260115_glucose_1H/pdata/1/1r
./20260115_glucose_1H/pdata/1/procs
./20260201_alanine_13C/acqus
./20260201_alanine_13C/fid
./20260201_alanine_13C/pdata/1/1r
./20260210_ethanol_COSY/acqus
./20260210_ethanol_COSY/fid
./20260210_ethanol_COSY/pdata/1/1r
./empty.txt
./scratch.tmp
6b. Trigger another harvest¶
resp = client.post(f"/api/schedules/{SCHEDULE_ID}/trigger")
IGNORE_FLOW_RUN_ID = resp.json().get("flow_run_id")
pp(resp)
print(f"\nView the run at https://streamweave.local/prefect/runs/flow-run/{IGNORE_FLOW_RUN_ID}")
wait_for_flow_run(IGNORE_FLOW_RUN_ID)
{
"flow_run_id": "aaaf170c-e6c8-4104-b1b3-0acbd45f0f51",
"schedule_id": "1e685983-e78d-470b-b30c-3e7a2bcb3435"
}
View the run at https://streamweave.local/prefect/runs/flow-run/aaaf170c-e6c8-4104-b1b3-0acbd45f0f51
Waiting for flow run aaaf170c-e6c8-4104-b1b3-0acbd45f0f51 to complete...
State: SCHEDULED (attempt 1/120)
State: SCHEDULED (attempt 2/120)
State: SCHEDULED (attempt 3/120)
State: PENDING (attempt 4/120)
State: PENDING (attempt 5/120)
State: PENDING (attempt 6/120)
State: PENDING (attempt 7/120)
Flow run finished with state: COMPLETED
'COMPLETED'
The run details in the Prefect interface show the two new files being found and skipped.
6c. Verify the .tmp file was skipped¶
StreamWeave includes a demonstration pre-transfer hook that ignores certain file patterns; these can be configured on a per-instrument basis. The following cells show that the scratch.tmp file is discovered by the file-discovery flow but is not transferred, because the pre-transfer hook blocks it.
print("Files:\n------")
resp = client.get("/api/files")
pp(resp, n=4)
# nmr/scratch.tmp and nmr/empty.txt will be in the file list printed out at this step,
# since they were discovered, but we will confirm they were not transferred in the next step
Files:
------
[
{
"id": "dcf087d0-efb0-4928-adff-4ebc90176f39",
"persistent_id": "ark:/99999/fk43m2vxat6ane5jnipjzh42dwaiy",
"persistent_id_type": "ark",
"instrument_id": "1b1ef370-7eaa-416f-852e-7724248cce5b",
"source_path": "scratch.tmp",
"filename": "scratch.tmp",
"size_bytes": 10,
"source_mtime": "2026-03-01T20:58:59.139000Z",
"xxhash": null,
"sha256": null,
"first_discovered_at": "2026-03-01T20:59:20.070220Z",
"metadata_": {},
"owner_id": null
},
{
"id": "9bbf0276-ff55-4710-b0d7-0895f060289a",
"persistent_id": "ark:/99999/fk4e65r5rr3bbe3rkejdhdr3hf6eq",
"persistent_id_type": "ark",
"instrument_id": "1b1ef370-7eaa-416f-852e-7724248cce5b",
"source_path": "empty.txt",
"filename": "empty.txt",
"size_bytes": 0,
"source_mtime": "2026-03-01T20:58:59.139000Z",
"xxhash": null,
"sha256": null,
"first_discovered_at": "2026-03-01T20:59:20.054933Z",
"metadata_": {},
"owner_id": null
},
{
"id": "19d53970-a1d1-4369-a5e9-ce103f35cd29",
"persistent_id": "ark:/99999/fk4ikr7lgrwcrbaxfd7helhytlf7e",
"persistent_id_type": "ark",
"instrument_id": "1b1ef370-7eaa-416f-852e-7724248cce5b",
"source_path": "20260201_alanine_13C/pdata/1/1r",
"filename": "1r",
"size_bytes": 8192,
"source_mtime": "2026-03-01T20:58:09.151000Z",
"xxhash": "f90b1bb50d3a727b",
"sha256": null,
"first_discovered_at": "2026-03-01T20:58:56.733048Z",
"metadata_": {},
"owner_id": null
},
{
"id": "67cd7b0b-4d63-41fd-86fe-c85bfb7d6c24",
"persistent_id": "ark:/99999/fk4ztfyomvqevcftgkqqrglxj2jzm",
"persistent_id_type": "ark",
"instrument_id": "1b1ef370-7eaa-416f-852e-7724248cce5b",
"source_path": "20260201_alanine_13C/fid",
"filename": "fid",
"size_bytes": 16384,
"source_mtime": "2026-03-01T20:58:09.156000Z",
"xxhash": "b9ae9fcc0155a1c7",
"sha256": null,
"first_discovered_at": "2026-03-01T20:58:56.685848Z",
"metadata_": {},
"owner_id": null
},
"..."
]
# Assert that the scratch.tmp file was discovered
files = resp.json()
scratch = next((f for f in files if f["filename"] == "scratch.tmp"), None)
assert scratch is not None, "FAIL: scratch.tmp file record not found"
# Assert that the scratch.tmp file was never actually transferred
transfers = client.get(f"/api/transfers?file_id={scratch['id']}").json()
assert all(t["status"] == "skipped" for t in transfers), "FAIL: scratch.tmp should only have skipped transfers"
print("PASS: scratch.tmp was correctly filtered by the pre-transfer hook (transfer skipped)")
PASS: scratch.tmp was correctly filtered by the pre-transfer hook (transfer skipped)
7. Test Post-Transfer Hook (Metadata Enrichment)¶
StreamWeave supports post-transfer hooks that extract scientific metadata from file contents or from file paths; encoding metadata in paths is a common pattern in laboratories.
This example configures a hook with regex rules that extract the date, compound, and nucleus
from Bruker NMR folder names like 20260115_glucose_1H/.
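The extraction itself boils down to matching named regex groups against the file's source path; a standalone sketch (extract_path_metadata is a hypothetical helper, not the hook's actual code) using the same pattern:

```python
import re

# Named groups in the pattern become metadata keys on the file record
PATTERN = r"^(?P<date>\d{8})_(?P<compound>[^_/]+)_(?P<nucleus>[^/]+)/"

def extract_path_metadata(source_path: str, pattern: str = PATTERN) -> dict:
    """Hypothetical helper: return named-group matches as a metadata dict."""
    m = re.match(pattern, source_path)
    return m.groupdict() if m else {}

print(extract_path_metadata("20260115_glucose_1H/fid"))
# {'date': '20260115', 'compound': 'glucose', 'nucleus': '1H'}
```

Paths that do not match the pattern simply yield an empty dict, so files outside the naming convention are left unenriched.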
7a. Update the hook with extraction rules¶
# Find the NMR metadata enrichment hook
hooks = client.get("/api/hooks").json()
nmr_hook = next(h for h in hooks if "NMR" in h["name"])
pp_dict(nmr_hook)
{
"id": "e3387162-dad0-4452-9317-5e51c682d676",
"name": "NMR metadata enrichment",
"description": "Extracts pulse programme, solvent, and nucleus from Bruker acqus files",
"trigger": "post_transfer",
"implementation": "builtin",
"builtin_name": "metadata_enrichment",
"script_path": null,
"webhook_url": null,
"config": null,
"instrument_id": "1b1ef370-7eaa-416f-852e-7724248cce5b",
"priority": 10,
"enabled": true,
"deleted_at": null
}
# Update it with regex rules to extract experiment metadata from the path
resp = client.patch(f"/api/hooks/{nmr_hook['id']}", json={
"config": {
"rules": [
{
"source": "path",
"pattern": r"^(?P<date>\d{8})_(?P<compound>[^_/]+)_(?P<nucleus>[^/]+)/",
}
]
}
})
print(f"Updated hook: {resp.status_code}")
pp(resp)
Updated hook: 200
{
"id": "e3387162-dad0-4452-9317-5e51c682d676",
"name": "NMR metadata enrichment",
"description": "Extracts pulse programme, solvent, and nucleus from Bruker acqus files",
"trigger": "post_transfer",
"implementation": "builtin",
"builtin_name": "metadata_enrichment",
"script_path": null,
"webhook_url": null,
"config": {
"rules": [
{
"source": "path",
"pattern": "^(?P<date>\\d{8})_(?P<compound>[^_/]+)_(?P<nucleus>[^/]+)/"
}
]
},
"instrument_id": "1b1ef370-7eaa-416f-852e-7724248cce5b",
"priority": 10,
"enabled": true,
"deleted_at": null
}
7b. Clear transferred files and re-harvest¶
Delete all file records and transferred files so the harvest runs fresh:
_ = run(f"docker compose {DEV_COMPOSE} exec worker rm -rf /storage/posix-archive/*")
_ = run(f'docker compose {DEV_COMPOSE} exec postgres psql -U streamweave -c "DELETE FROM file_transfers; DELETE FROM file_records;"')
resp = client.post(f"/api/schedules/{SCHEDULE_ID}/trigger")
METADATA_FLOW_RUN_ID = resp.json().get("flow_run_id")
print(f"\nView the run at https://streamweave.local/prefect/runs/flow-run/{METADATA_FLOW_RUN_ID}")
pp(resp)
wait_for_flow_run(METADATA_FLOW_RUN_ID)
DELETE 12
DELETE 12
View the run at https://streamweave.local/prefect/runs/flow-run/9a23c5ac-e582-47f2-871c-41e81c0ba16e
{
"flow_run_id": "9a23c5ac-e582-47f2-871c-41e81c0ba16e",
"schedule_id": "1e685983-e78d-470b-b30c-3e7a2bcb3435"
}
Waiting for flow run 9a23c5ac-e582-47f2-871c-41e81c0ba16e to complete...
State: SCHEDULED (attempt 1/120)
State: SCHEDULED (attempt 2/120)
State: PENDING (attempt 3/120)
State: PENDING (attempt 4/120)
State: PENDING (attempt 5/120)
State: PENDING (attempt 6/120)
Flow run finished with state: COMPLETED
'COMPLETED'
The run details in the Prefect interface show the metadata being extracted from the paths and added to the file records.
7c. Check enriched metadata¶
resp = client.get("/api/files")
for f in resp.json():
print(json.dumps({
"filename": f["filename"],
"source_path": f["source_path"],
"metadata_": f.get("metadata_"),
}, indent=2))
{
"filename": "1r",
"source_path": "20260201_alanine_13C/pdata/1/1r",
"metadata_": {
"date": "20260201",
"compound": "alanine",
"nucleus": "13C"
}
}
{
"filename": "fid",
"source_path": "20260201_alanine_13C/fid",
"metadata_": {
"date": "20260201",
"compound": "alanine",
"nucleus": "13C"
}
}
{
"filename": "acqus",
"source_path": "20260201_alanine_13C/acqus",
"metadata_": {
"date": "20260201",
"compound": "alanine",
"nucleus": "13C"
}
}
{
"filename": "procs",
"source_path": "20260115_glucose_1H/pdata/1/procs",
"metadata_": {
"date": "20260115",
"compound": "glucose",
"nucleus": "1H"
}
}
{
"filename": "1r",
"source_path": "20260115_glucose_1H/pdata/1/1r",
"metadata_": {
"date": "20260115",
"compound": "glucose",
"nucleus": "1H"
}
}
{
"filename": "fid",
"source_path": "20260115_glucose_1H/fid",
"metadata_": {
"date": "20260115",
"compound": "glucose",
"nucleus": "1H"
}
}
{
"filename": "acqus",
"source_path": "20260115_glucose_1H/acqus",
"metadata_": {
"date": "20260115",
"compound": "glucose",
"nucleus": "1H"
}
}
{
"filename": "1r",
"source_path": "20260210_ethanol_COSY/pdata/1/1r",
"metadata_": {
"date": "20260210",
"compound": "ethanol",
"nucleus": "COSY"
}
}
{
"filename": "fid",
"source_path": "20260210_ethanol_COSY/fid",
"metadata_": {
"date": "20260210",
"compound": "ethanol",
"nucleus": "COSY"
}
}
{
"filename": "acqus",
"source_path": "20260210_ethanol_COSY/acqus",
"metadata_": {
"date": "20260210",
"compound": "ethanol",
"nucleus": "COSY"
}
}
{
"filename": "scratch.tmp",
"source_path": "scratch.tmp",
"metadata_": {}
}
{
"filename": "empty.txt",
"source_path": "empty.txt",
"metadata_": {}
}
Files inside dated experiment folders should have extracted metadata:
{
"filename": "acqus",
"source_path": "20260115_glucose_1H/acqus",
"metadata_": {
"date": "20260115",
"compound": "glucose",
"nucleus": "1H"
}
}
8. User-Scoped Access Control Demo¶
Files are private by default. Access is granted explicitly to users, groups, or projects via the FileAccessGrant system.
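The resolution rules can be made concrete with a small in-memory model. This is a sketch of the assumed semantics (direct grant, grant via group membership, grant via project membership), not the backend's actual query logic; all names here are hypothetical.

```python
# Hypothetical in-memory model of FileAccessGrant resolution; the real
# backend resolves this in SQL. Grants are (grantee_type, grantee_id) pairs.
def can_see(user_id, grants, user_groups, project_users, project_groups):
    """user_groups: user_id -> set of group ids;
    project_users / project_groups: project_id -> set of member ids."""
    groups = user_groups.get(user_id, set())
    for gtype, gid in grants:
        if gtype == "user" and gid == user_id:
            return True
        if gtype == "group" and gid in groups:
            return True
        if gtype == "project" and (
            user_id in project_users.get(gid, set())
            or groups & project_groups.get(gid, set())
        ):
            return True
    return False

# A user in group "g1", where "g1" is a member of project "p1":
user_groups = {"u1": {"g1"}}
print(can_see("u1", [("project", "p1")], user_groups, {}, {"p1": {"g1"}}))  # True
print(can_see("u1", [], user_groups, {}, {}))  # False: no grants, private by default
```

The "no grants means no access" default is what sections 8b and 8e below demonstrate against the live API.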
8a. Get regular user token and user ID¶
resp = client.post("/auth/jwt/login", data={"username": "chemist@example.com", "password": "devpass123!"})
USER_TOKEN = resp.json()["access_token"]
USER_AUTH = {"Authorization": f"Bearer {USER_TOKEN}"}
resp = client.get("/api/me", headers=USER_AUTH)
USER_ID = resp.json()["id"]
print(f"User ID: {USER_ID}")
User ID: 17a12c78-0b7f-41fc-97bf-11d9bd5305d2
8b. Verify user sees no files (no access granted)¶
resp = client.get("/api/files", headers=USER_AUTH)
print("Files:", resp.json())
# Expected: []
resp = client.get("/api/transfers", headers=USER_AUTH)
print("Transfers:", resp.json())
# Expected: []
Files: [] Transfers: []
8c. Grant direct user access to a file¶
# Pick a file to grant access to
FILE_ID = client.get("/api/files").json()[0]["id"]
print(f"File ID: {FILE_ID}")
# Grant the user access (admin-only endpoint)
resp = client.post(f"/api/files/{FILE_ID}/access", json={
"grantee_type": "user",
"grantee_id": USER_ID,
})
pp(resp)
File ID: 79bda453-97bf-43cb-8937-7ae0f7859764
{
"id": "c2dbd57f-4bdf-44e0-8be2-b0bc77a5f0da",
"file_id": "79bda453-97bf-43cb-8937-7ae0f7859764",
"grantee_type": "user",
"grantee_id": "17a12c78-0b7f-41fc-97bf-11d9bd5305d2",
"granted_at": "2026-03-01T20:59:39.171948Z"
}
Expected response:
{
"id": "<grant-uuid>",
"file_id": "<file-uuid>",
"grantee_type": "user",
"grantee_id": "<user-uuid>",
"granted_at": "2026-02-23T..."
}
8d. Verify user now sees the granted file¶
resp = client.get("/api/files", headers=USER_AUTH)
print(f"Files visible to user: {len(resp.json())}")
# Expected: exactly 1 file
resp = client.get(f"/api/files/{FILE_ID}", headers=USER_AUTH)
pp(resp)
# Expected: 200 with full file details
Files visible to user: 1
{
"id": "79bda453-97bf-43cb-8937-7ae0f7859764",
"persistent_id": "ark:/99999/fk4drwlgp6rhjd65mu7onw3pdb33a",
"persistent_id_type": "ark",
"instrument_id": "1b1ef370-7eaa-416f-852e-7724248cce5b",
"source_path": "20260201_alanine_13C/pdata/1/1r",
"filename": "1r",
"size_bytes": 8192,
"source_mtime": "2026-03-01T20:58:09.151000Z",
"xxhash": "f90b1bb50d3a727b",
"sha256": null,
"first_discovered_at": "2026-03-01T20:59:38.756414Z",
"metadata_": {
"date": "20260201",
"compound": "alanine",
"nucleus": "13C"
},
"owner_id": null
}
8e. Verify 404 for files without access¶
OTHER_FILE = client.get("/api/files").json()[1]["id"]
resp = client.get(f"/api/files/{OTHER_FILE}", headers=USER_AUTH)
pp(resp)
# Expected: {"detail": "File not found"} (404, not 403 — avoids leaking existence)
{
"detail": "File not found"
}
8f. List and revoke a grant¶
# List grants for the file (admin only)
resp = client.get(f"/api/files/{FILE_ID}/access")
pp(resp)
# Revoke the grant
GRANT_ID = resp.json()[0]["id"]
resp = client.delete(f"/api/files/{FILE_ID}/access/{GRANT_ID}")
print(f"Delete status: {resp.status_code}")
# Expected: 204
# Verify user can no longer see the file
resp = client.get(f"/api/files/{FILE_ID}", headers=USER_AUTH)
pp(resp)
# Expected: {"detail": "File not found"}
[
{
"id": "c2dbd57f-4bdf-44e0-8be2-b0bc77a5f0da",
"file_id": "79bda453-97bf-43cb-8937-7ae0f7859764",
"grantee_type": "user",
"grantee_id": "17a12c78-0b7f-41fc-97bf-11d9bd5305d2",
"granted_at": "2026-03-01T20:59:39.171948Z"
}
]
Delete status: 204
{
"detail": "File not found"
}
9. Group-Based Access Demo¶
File access can also be granted to groups, which are named collections of users; every member of a granted group can see the file.
9a. Get groups for the example chemistry user and save the first one¶
# Get the groups that chemist@example.com belongs to
resp = client.get(f"/api/admin/users/{USER_ID}/groups")
GROUP_ID = resp.json()[0]["id"]
pp(resp)
[
{
"id": "1503ae32-e59b-4269-a395-c1799bfa60be",
"name": "Chemistry & Chemical Biology",
"description": "Organic and inorganic chemistry researchers using NMR and HPLC",
"created_at": "2026-03-01T20:58:27.993017Z",
"updated_at": "2026-03-01T20:58:27.993017Z"
},
{
"id": "4c4b51be-2d73-4928-a946-f7cad3bb6569",
"name": "Analytical Core",
"description": "Cross-departmental analytical instrumentation users",
"created_at": "2026-03-01T20:58:28.020883Z",
"updated_at": "2026-03-01T20:58:28.020883Z"
}
]
# Remove the user from the group (just to demo the capability)
resp = client.delete(f"/api/groups/{GROUP_ID}/members/{USER_ID}")
pp(resp)
# Add the user back to the group
resp = client.post(f"/api/groups/{GROUP_ID}/members", json={"user_id": USER_ID})
pp(resp)
HTTP 204:
{
"group_id": "1503ae32-e59b-4269-a395-c1799bfa60be",
"user_id": "17a12c78-0b7f-41fc-97bf-11d9bd5305d2",
"email": "chemist@example.com"
}
9b. Grant the group access to a file¶
resp = client.post(f"/api/files/{FILE_ID}/access", json={
"grantee_type": "group",
"grantee_id": GROUP_ID,
})
pp(resp)
{
"id": "7416d939-7e3a-48f6-b26b-1dfe354d1349",
"file_id": "79bda453-97bf-43cb-8937-7ae0f7859764",
"grantee_type": "group",
"grantee_id": "1503ae32-e59b-4269-a395-c1799bfa60be",
"granted_at": "2026-03-01T20:59:39.254344Z"
}
9c. Verify user sees the file via group membership¶
resp = client.get(f"/api/files/{FILE_ID}", headers=USER_AUTH)
pp(resp)
# Expected: 200 — user can see the file because they're in the granted group
{
"id": "79bda453-97bf-43cb-8937-7ae0f7859764",
"persistent_id": "ark:/99999/fk4drwlgp6rhjd65mu7onw3pdb33a",
"persistent_id_type": "ark",
"instrument_id": "1b1ef370-7eaa-416f-852e-7724248cce5b",
"source_path": "20260201_alanine_13C/pdata/1/1r",
"filename": "1r",
"size_bytes": 8192,
"source_mtime": "2026-03-01T20:58:09.151000Z",
"xxhash": "f90b1bb50d3a727b",
"sha256": null,
"first_discovered_at": "2026-03-01T20:59:38.756414Z",
"metadata_": {
"date": "20260201",
"compound": "alanine",
"nucleus": "13C"
},
"owner_id": null
}
9d. Groups CRUD (Create, Read, Update, Delete)¶
# List groups
print("=== All groups ===")
pp(client.get("/api/groups"))
# Get group details
print("\n=== Group details ===")
pp(client.get(f"/api/groups/{GROUP_ID}"))
# List group members
print("\n=== Group members ===")
pp(client.get(f"/api/groups/{GROUP_ID}/members"))
# Update group
print("\n=== Update group ===")
pp(client.patch(f"/api/groups/{GROUP_ID}", json={"description": "Updated description"}))
=== All groups ===
[
{
"id": "1503ae32-e59b-4269-a395-c1799bfa60be",
"name": "Chemistry & Chemical Biology",
"description": "Organic and inorganic chemistry researchers using NMR and HPLC",
"created_at": "2026-03-01T20:58:27.993017Z",
"updated_at": "2026-03-01T20:58:27.993017Z"
},
{
"id": "52e870a7-71c6-4e0c-80e8-559745336945",
"name": "Proteomics Core",
"description": "Mass spectrometry and proteomics platform users",
"created_at": "2026-03-01T20:58:28.005970Z",
"updated_at": "2026-03-01T20:58:28.005970Z"
},
{
"id": "cb6b30dc-cf49-4b5d-85ed-ba6baebbe3e3",
"name": "EM Facility",
"description": "Electron microscopy facility operators and approved users",
"created_at": "2026-03-01T20:58:28.015033Z",
"updated_at": "2026-03-01T20:58:28.015033Z"
},
{
"id": "4c4b51be-2d73-4928-a946-f7cad3bb6569",
"name": "Analytical Core",
"description": "Cross-departmental analytical instrumentation users",
"created_at": "2026-03-01T20:58:28.020883Z",
"updated_at": "2026-03-01T20:58:28.020883Z"
}
]
=== Group details ===
{
"id": "1503ae32-e59b-4269-a395-c1799bfa60be",
"name": "Chemistry & Chemical Biology",
"description": "Organic and inorganic chemistry researchers using NMR and HPLC",
"created_at": "2026-03-01T20:58:27.993017Z",
"updated_at": "2026-03-01T20:58:27.993017Z"
}
=== Group members ===
[
{
"group_id": "1503ae32-e59b-4269-a395-c1799bfa60be",
"user_id": "b702fcc7-4e8a-4783-9c50-366b2b83305d",
"email": "bioinformatics@example.com"
},
{
"group_id": "1503ae32-e59b-4269-a395-c1799bfa60be",
"user_id": "17a12c78-0b7f-41fc-97bf-11d9bd5305d2",
"email": "chemist@example.com"
}
]
=== Update group ===
{
"id": "1503ae32-e59b-4269-a395-c1799bfa60be",
"name": "Chemistry & Chemical Biology",
"description": "Updated description",
"created_at": "2026-03-01T20:58:27.993017Z",
"updated_at": "2026-03-01T20:59:39.283178Z"
}
# Remove member
resp = client.delete(f"/api/groups/{GROUP_ID}/members/{USER_ID}")
print(f"Remove member status: {resp.status_code}")
# Expected: 204
# Verify user lost access (group membership removed)
resp = client.get(f"/api/files/{FILE_ID}", headers=USER_AUTH)
pp(resp)
# Expected: {"detail": "File not found"}
Remove member status: 204
{
"detail": "File not found"
}
10. Project-Based File Access Demo¶
Projects can contain both individual users and entire groups. When a file is granted to a project, all members (direct users + users in member groups) can see it.
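The membership expansion described above can be sketched as a small function. The field names mirror the `/api/projects/{id}/members` responses shown in this section, but the function itself is illustrative, not the backend implementation.

```python
# Expand a project's member list into the set of user IDs that gain access:
# direct user members plus every user in each member group.
def effective_members(project_members, group_members):
    """group_members: group_id -> set of user ids in that group."""
    users = set()
    for m in project_members:
        if m["member_type"] == "user":
            users.add(m["member_id"])
        elif m["member_type"] == "group":
            users |= group_members.get(m["member_id"], set())
    return users

members = [
    {"member_type": "group", "member_id": "chem"},
    {"member_type": "user", "member_id": "postdoc"},
]
print(sorted(effective_members(members, {"chem": {"chemist", "bioinformatician"}})))
# ['bioinformatician', 'chemist', 'postdoc']
```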
10a. Create a project with user and group members¶
# Re-add user to the group (removed in previous step)
client.post(f"/api/groups/{GROUP_ID}/members", json={"user_id": USER_ID})
# Create a new project
resp = client.post("/api/projects", json={
"name": "Microscopy Study 2026",
"description": "Main research project",
})
PROJECT_ID = resp.json()["id"]
print(f"Project ID: {PROJECT_ID}")
# Add the group as a project member
resp = client.post(f"/api/projects/{PROJECT_ID}/members", json={
"member_type": "group",
"member_id": GROUP_ID,
})
pp(resp)
Project ID: 10d1309b-b113-45e2-b09e-c5fec402ed3c
{
"id": "ed9688b1-b250-4c5b-a6c5-6bb77a78ad6a",
"project_id": "10d1309b-b113-45e2-b09e-c5fec402ed3c",
"member_type": "group",
"member_id": "1503ae32-e59b-4269-a395-c1799bfa60be",
"email": null
}
10b. Grant the project access to a file¶
# Clean up previous grants on the file
grants = client.get(f"/api/files/{FILE_ID}/access").json()
for g in grants:
client.delete(f"/api/files/{FILE_ID}/access/{g['id']}")
print(f"Cleaned up {len(grants)} existing grants")
# Grant project access
resp = client.post(f"/api/files/{FILE_ID}/access", json={
"grantee_type": "project",
"grantee_id": PROJECT_ID,
})
pp(resp)
Cleaned up 1 existing grants
{
"id": "d8f12f47-1d77-4af6-8886-745ba9125f40",
"file_id": "79bda453-97bf-43cb-8937-7ae0f7859764",
"grantee_type": "project",
"grantee_id": "10d1309b-b113-45e2-b09e-c5fec402ed3c",
"granted_at": "2026-03-01T20:59:39.337510Z"
}
10c. Verify user sees the file via project → group → user chain¶
resp = client.get(f"/api/files/{FILE_ID}", headers=USER_AUTH)
pp(resp)
# Expected: 200 — user can see the file because:
# user ∈ group → group ∈ project → project has file grant
{
"id": "79bda453-97bf-43cb-8937-7ae0f7859764",
"persistent_id": "ark:/99999/fk4drwlgp6rhjd65mu7onw3pdb33a",
"persistent_id_type": "ark",
"instrument_id": "1b1ef370-7eaa-416f-852e-7724248cce5b",
"source_path": "20260201_alanine_13C/pdata/1/1r",
"filename": "1r",
"size_bytes": 8192,
"source_mtime": "2026-03-01T20:58:09.151000Z",
"xxhash": "f90b1bb50d3a727b",
"sha256": null,
"first_discovered_at": "2026-03-01T20:59:38.756414Z",
"metadata_": {
"date": "20260201",
"compound": "alanine",
"nucleus": "13C"
},
"owner_id": null
}
10d. Test direct user membership in projects¶
# Create a second user
resp = client.post("/auth/register", json={"email": "postdoc@test.org", "password": "testpassword123"})
pp(resp)
resp = client.post("/auth/jwt/login", data={"username": "postdoc@test.org", "password": "testpassword123"})
POSTDOC_TOKEN = resp.json()["access_token"]
POSTDOC_AUTH = {"Authorization": f"Bearer {POSTDOC_TOKEN}"}
POSTDOC_ID = client.get("/api/me", headers=POSTDOC_AUTH).json()["id"]
print(f"Postdoc ID: {POSTDOC_ID}")
# Add postdoc directly to the project (not via group)
resp = client.post(f"/api/projects/{PROJECT_ID}/members", json={
"member_type": "user",
"member_id": POSTDOC_ID,
})
pp(resp)
# Postdoc can also see the file
resp = client.get(f"/api/files/{FILE_ID}", headers=POSTDOC_AUTH)
print(f"\nPostdoc file access status: {resp.status_code}")
# Expected: 200
{
"id": "bafd0f70-6aca-49e8-92dc-60f2c4e21b1e",
"email": "postdoc@test.org",
"is_active": true,
"is_superuser": false,
"is_verified": false,
"role": "user",
"deleted_at": null
}
Postdoc ID: bafd0f70-6aca-49e8-92dc-60f2c4e21b1e
{
"id": "d08ebeb9-35fa-4c8b-adc8-737474587249",
"project_id": "10d1309b-b113-45e2-b09e-c5fec402ed3c",
"member_type": "user",
"member_id": "bafd0f70-6aca-49e8-92dc-60f2c4e21b1e",
"email": "postdoc@test.org"
}
Postdoc file access status: 200
10e. Projects CRUD¶
# List projects
print("=== All projects ===")
pp(client.get("/api/projects"))
# List project members
print("\n=== Project members ===")
pp(client.get(f"/api/projects/{PROJECT_ID}/members"))
# Expected: 2 members (1 group + 1 direct user)
=== All projects ===
[
{
"id": "4eb42018-e177-4d8d-b429-becd741898d0",
"name": "Kinase Inhibitor Fragment Screen",
"description": "High-throughput NMR fragment screening of kinase inhibitor candidates",
"created_at": "2026-03-01T20:58:28.035514Z",
"updated_at": "2026-03-01T20:58:28.035514Z"
},
{
"id": "68f2ca8c-a406-44ed-833e-2e34876dc2f0",
"name": "HER2 Phosphoproteome Profiling",
"description": "Quantitative phosphoproteomics of HER2-positive breast cancer cell lines",
"created_at": "2026-03-01T20:58:28.051420Z",
"updated_at": "2026-03-01T20:58:28.051420Z"
},
{
"id": "05c54233-81ca-4676-9980-f7c5590f4a68",
"name": "Gold Nanoparticle Structure Determination",
"description": "Atomic-resolution TEM imaging and structural analysis of AuNP catalysts",
"created_at": "2026-03-01T20:58:28.060068Z",
"updated_at": "2026-03-01T20:58:28.060068Z"
},
{
"id": "02e157e1-95d8-4baa-a233-b355344f6c55",
"name": "Multi-omics Cancer Biomarker Discovery",
"description": "Integrated NMR metabolomics and proteomics for cancer biomarker identification",
"created_at": "2026-03-01T20:58:28.069467Z",
"updated_at": "2026-03-01T20:58:28.069467Z"
},
{
"id": "10d1309b-b113-45e2-b09e-c5fec402ed3c",
"name": "Microscopy Study 2026",
"description": "Main research project",
"created_at": "2026-03-01T20:59:39.310115Z",
"updated_at": "2026-03-01T20:59:39.310115Z"
}
]
=== Project members ===
[
{
"id": "ed9688b1-b250-4c5b-a6c5-6bb77a78ad6a",
"project_id": "10d1309b-b113-45e2-b09e-c5fec402ed3c",
"member_type": "group",
"member_id": "1503ae32-e59b-4269-a395-c1799bfa60be",
"email": null
},
{
"id": "d08ebeb9-35fa-4c8b-adc8-737474587249",
"project_id": "10d1309b-b113-45e2-b09e-c5fec402ed3c",
"member_type": "user",
"member_id": "bafd0f70-6aca-49e8-92dc-60f2c4e21b1e",
"email": "postdoc@test.org"
}
]
# Remove postdoc from project
resp = client.delete(f"/api/projects/{PROJECT_ID}/members/{POSTDOC_ID}")
print(f"Remove member status: {resp.status_code}")
# Expected: 204
# Postdoc loses access
resp = client.get(f"/api/files/{FILE_ID}", headers=POSTDOC_AUTH)
pp(resp)
# Expected: {"detail": "File not found"}
Remove member status: 204
{
"detail": "File not found"
}
10f. Non-admin users cannot manage groups/projects/grants¶
# All of these should return 403
for endpoint in ["/api/groups", "/api/projects", f"/api/files/{FILE_ID}/access"]:
resp = client.get(endpoint, headers=USER_AUTH)
print(f"GET {endpoint}: {resp.status_code} — {resp.json()}")
# Expected: {"detail": "Admin access required"}
GET /api/groups: 403 — {'detail': 'Admin access required'}
GET /api/projects: 403 — {'detail': 'Admin access required'}
GET /api/files/79bda453-97bf-43cb-8937-7ae0f7859764/access: 403 — {'detail': 'Admin access required'}
11. File & Transfer API Filtering Demo¶
11a. Filter files by instrument¶
# Get the ID of the NMR (first) instrument
INSTRUMENT_ID = instruments[0]["id"]
resp = client.get(f"/api/files?instrument_id={INSTRUMENT_ID}")
print(f"Files for instrument {INSTRUMENT_ID}: {len(resp.json())}")
Files for instrument 1b1ef370-7eaa-416f-852e-7724248cce5b: 12
11b. Filter transfers by file¶
FILE_ID = client.get("/api/files").json()[0]["id"]
resp = client.get(f"/api/transfers?file_id={FILE_ID}")
pp(resp)
[
{
"id": "684c5051-af2f-4fa7-bfdb-91a5204db382",
"file_id": "79bda453-97bf-43cb-8937-7ae0f7859764",
"storage_location_id": "920c9c12-97b7-4f37-a1c6-288efb8a270b",
"destination_path": "/storage/posix-archive/Bruker AVANCE III 600 MHz NMR/20260201_alanine_13C/pdata/1/1r",
"transfer_adapter": "rclone",
"status": "completed",
"bytes_transferred": 8192,
"source_checksum": null,
"dest_checksum": "f90b1bb50d3a727b",
"checksum_verified": false,
"started_at": "2026-03-01T20:59:38.757156Z",
"completed_at": "2026-03-01T20:59:38.792220Z",
"error_message": null,
"prefect_flow_run_id": null
}
]
11c. Get single file by ID¶
resp = client.get(f"/api/files/{FILE_ID}")
pp(resp)
{
"id": "79bda453-97bf-43cb-8937-7ae0f7859764",
"persistent_id": "ark:/99999/fk4drwlgp6rhjd65mu7onw3pdb33a",
"persistent_id_type": "ark",
"instrument_id": "1b1ef370-7eaa-416f-852e-7724248cce5b",
"source_path": "20260201_alanine_13C/pdata/1/1r",
"filename": "1r",
"size_bytes": 8192,
"source_mtime": "2026-03-01T20:58:09.151000Z",
"xxhash": "f90b1bb50d3a727b",
"sha256": null,
"first_discovered_at": "2026-03-01T20:59:38.756414Z",
"metadata_": {
"date": "20260201",
"compound": "alanine",
"nucleus": "13C"
},
"owner_id": null
}
Verify all fields are present: persistent_id, persistent_id_type, source_path, filename, xxhash, first_discovered_at, metadata_.
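The manual check above can be made executable with a small helper. The required-field list comes straight from the sentence above; the helper name and sample record are illustrative.

```python
# Fields the demo expects on every file record (from the checklist above).
REQUIRED_FIELDS = {
    "persistent_id", "persistent_id_type", "source_path",
    "filename", "xxhash", "first_discovered_at", "metadata_",
}

def missing_fields(record: dict) -> set:
    """Return the required fields absent from a file record."""
    return REQUIRED_FIELDS - record.keys()

sample = {"persistent_id": "ark:/99999/...", "filename": "1r", "metadata_": {}}
print(missing_fields(sample))
```

Against the live response this becomes a one-liner: `assert not missing_fields(resp.json())`.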
11d. Get single transfer by ID¶
TRANSFER_ID = client.get("/api/transfers").json()[0]["id"]
resp = client.get(f"/api/transfers/{TRANSFER_ID}")
pp(resp)
{
"id": "83e14f04-9a0b-4523-a724-3c86b99221f0",
"file_id": "29645b19-d789-487e-b56c-1e085d7b03c0",
"storage_location_id": "920c9c12-97b7-4f37-a1c6-288efb8a270b",
"destination_path": "/storage/posix-archive/Bruker AVANCE III 600 MHz NMR/20260201_alanine_13C/acqus",
"transfer_adapter": "rclone",
"status": "completed",
"bytes_transferred": 470,
"source_checksum": null,
"dest_checksum": "cea07227a98f977f",
"checksum_verified": false,
"started_at": "2026-03-01T20:59:38.674256Z",
"completed_at": "2026-03-01T20:59:38.706721Z",
"error_message": null,
"prefect_flow_run_id": null
}
12. Schedule CRUD¶
12a. Create a schedule¶
POSTing to the /api/schedules endpoint creates a new schedule in the StreamWeave database and also registers a new deployment in Prefect:
# Get a storage location ID to associate with this schedule first
STORAGE_ID = client.get("/api/storage-locations").json()[0]["id"]
resp = client.post("/api/schedules", json={
"instrument_id": INSTRUMENT_ID,
"default_storage_location_id": STORAGE_ID,
"cron_expression": "0 */6 * * *",
"enabled": True,
})
pp(resp)
{
"id": "f7f4e001-73fb-418c-a85b-90082c75ed24",
"instrument_id": "1b1ef370-7eaa-416f-852e-7724248cce5b",
"default_storage_location_id": "920c9c12-97b7-4f37-a1c6-288efb8a270b",
"cron_expression": "0 */6 * * *",
"prefect_deployment_id": "7ad75b08-504f-4e8a-959c-e06de5aa102d",
"enabled": true,
"created_at": "2026-03-01T20:59:39.529752Z",
"updated_at": "2026-03-01T20:59:39.531958Z",
"deleted_at": null
}
Check that prefect_deployment_id is populated (Prefect deployment was created).
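Cron expressions like `0 */6 * * *` can be decoded with a tiny matcher. The sketch below handles only the syntaxes used in this notebook (`*`, `*/N`, and plain numbers); it is a teaching aid, not a full cron parser and not what Prefect uses internally.

```python
def _match(field: str, value: int) -> bool:
    # Supports only "*", "*/N", and a literal number.
    if field == "*":
        return True
    if field.startswith("*/"):
        return value % int(field[2:]) == 0
    return int(field) == value

def fires_at(cron: str, minute: int, hour: int) -> bool:
    """Check the minute and hour fields of a five-field cron expression."""
    m, h, *_ = cron.split()
    return _match(m, minute) and _match(h, hour)

# "0 */6 * * *" fires at minute 0 of every sixth hour:
print([h for h in range(24) if fires_at("0 */6 * * *", 0, h)])  # [0, 6, 12, 18]
```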
12b. Update the schedule¶
First, view all the schedules in the application:
pp(client.get("/api/schedules"))
[
{
"id": "1e685983-e78d-470b-b30c-3e7a2bcb3435",
"instrument_id": "1b1ef370-7eaa-416f-852e-7724248cce5b",
"default_storage_location_id": "920c9c12-97b7-4f37-a1c6-288efb8a270b",
"cron_expression": "0 1 * * *",
"prefect_deployment_id": "7ad75b08-504f-4e8a-959c-e06de5aa102d",
"enabled": true,
"created_at": "2026-03-01T20:58:26.294566Z",
"updated_at": "2026-03-01T20:58:26.296650Z",
"deleted_at": null
},
{
"id": "68920c17-e254-4e51-9ced-9334942c9ac6",
"instrument_id": "1b1ef370-7eaa-416f-852e-7724248cce5b",
"default_storage_location_id": "d584d2a4-3eb0-4649-9395-b96b335ee906",
"cron_expression": "0 2 * * *",
"prefect_deployment_id": "7ad75b08-504f-4e8a-959c-e06de5aa102d",
"enabled": true,
"created_at": "2026-03-01T20:58:27.726552Z",
"updated_at": "2026-03-01T20:58:27.728021Z",
"deleted_at": null
},
{
"id": "aeadda83-aa42-4e76-9576-ad20fe765275",
"instrument_id": "ebc4c653-b5fa-4c94-b56d-3fa63ea78452",
"default_storage_location_id": "920c9c12-97b7-4f37-a1c6-288efb8a270b",
"cron_expression": "0 4 * * *",
"prefect_deployment_id": "13979110-2b8e-4930-86cc-eb20bf61e8c0",
"enabled": true,
"created_at": "2026-03-01T20:58:27.769522Z",
"updated_at": "2026-03-01T20:58:27.771368Z",
"deleted_at": null
},
{
"id": "1fea6ec5-2907-434d-abcc-bd45624995b8",
"instrument_id": "e687ff98-1733-433f-9f64-9cbd0c8a29cc",
"default_storage_location_id": "8e6bf956-0d69-4fb0-89f8-a811420edd86",
"cron_expression": "0 0 * * *",
"prefect_deployment_id": "12fd6b22-9843-4024-a906-defdd8ab3761",
"enabled": true,
"created_at": "2026-03-01T20:58:27.807300Z",
"updated_at": "2026-03-01T20:58:27.808785Z",
"deleted_at": null
},
{
"id": "f7f4e001-73fb-418c-a85b-90082c75ed24",
"instrument_id": "1b1ef370-7eaa-416f-852e-7724248cce5b",
"default_storage_location_id": "920c9c12-97b7-4f37-a1c6-288efb8a270b",
"cron_expression": "0 */6 * * *",
"prefect_deployment_id": "7ad75b08-504f-4e8a-959c-e06de5aa102d",
"enabled": true,
"created_at": "2026-03-01T20:59:39.529752Z",
"updated_at": "2026-03-01T20:59:39.531958Z",
"deleted_at": null
}
]
NEW_SCHEDULE_ID = client.get("/api/schedules").json()[-1]["id"]
resp = client.patch(f"/api/schedules/{NEW_SCHEDULE_ID}", json={
"cron_expression": "0 */12 * * *",
})
pp(resp)
{
"id": "f7f4e001-73fb-418c-a85b-90082c75ed24",
"instrument_id": "1b1ef370-7eaa-416f-852e-7724248cce5b",
"default_storage_location_id": "920c9c12-97b7-4f37-a1c6-288efb8a270b",
"cron_expression": "0 */12 * * *",
"prefect_deployment_id": "7ad75b08-504f-4e8a-959c-e06de5aa102d",
"enabled": true,
"created_at": "2026-03-01T20:59:39.529752Z",
"updated_at": "2026-03-01T20:59:39.609753Z",
"deleted_at": null
}
Verify in Prefect UI that the deployment schedule updated.
13. Test Idempotent Discovery¶
Trigger the same harvest twice — the second run should find zero new files.
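The idempotency can be modelled as an upsert keyed on the file's identity. Keying on (instrument_id, source_path) is an assumption for illustration; the backend's actual uniqueness constraint may differ (e.g. it may also compare checksums).

```python
# Model discovery as an upsert: re-discovering a known file is a no-op.
# The (instrument_id, source_path) key is an assumed identity, not the real schema.
def discover(records: dict, found: list) -> int:
    """records: {(instrument_id, source_path): record}. Returns count of new files."""
    new = 0
    for f in found:
        key = (f["instrument_id"], f["source_path"])
        if key not in records:
            records[key] = f
            new += 1
    return new

records = {}
scan = [{"instrument_id": "nmr", "source_path": "20260201_alanine_13C/fid"}]
print(discover(records, scan))  # 1: first harvest creates the record
print(discover(records, scan))  # 0: second harvest finds nothing new
```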
SCHEDULE_ID
'1e685983-e78d-470b-b30c-3e7a2bcb3435'
# First trigger
resp = client.post(f"/api/schedules/{SCHEDULE_ID}/trigger")
FLOW_RUN_ID = resp.json().get("flow_run_id")
print("Trigger 1:", resp.json())
wait_for_flow_run(FLOW_RUN_ID)
before = len(client.get("/api/files").json())
print(f"Files before: {before}\n")
# Second trigger
resp = client.post(f"/api/schedules/{SCHEDULE_ID}/trigger")
print("Trigger 2:", resp.json())
FLOW_RUN_ID = resp.json().get("flow_run_id")
wait_for_flow_run(FLOW_RUN_ID)
after = len(client.get("/api/files").json())
print(f"Files after: {after}\n")
if before == after:
print("PASS: No duplicate files")
else:
print("FAIL: Duplicate files created")
Trigger 1: {'flow_run_id': '96b1e875-e3e6-43f3-961b-0ff6d7e0250f', 'schedule_id': '1e685983-e78d-470b-b30c-3e7a2bcb3435'}
Waiting for flow run 96b1e875-e3e6-43f3-961b-0ff6d7e0250f to complete...
State: SCHEDULED (attempt 1/120)
State: SCHEDULED (attempt 2/120)
State: PENDING (attempt 3/120)
State: PENDING (attempt 4/120)
State: PENDING (attempt 5/120)
State: PENDING (attempt 6/120)
Flow run finished with state: COMPLETED
Files before: 12
Trigger 2: {'flow_run_id': '8ba8fb2f-0909-44bc-a36b-172ce63f435c', 'schedule_id': '1e685983-e78d-470b-b30c-3e7a2bcb3435'}
Waiting for flow run 8ba8fb2f-0909-44bc-a36b-172ce63f435c to complete...
State: SCHEDULED (attempt 1/120)
State: SCHEDULED (attempt 2/120)
State: PENDING (attempt 3/120)
State: PENDING (attempt 4/120)
State: PENDING (attempt 5/120)
State: PENDING (attempt 6/120)
Flow run finished with state: COMPLETED
Files after: 12
PASS: No duplicate files
14. User API Endpoints¶
StreamWeave exposes two kinds of user-related API endpoints: /api/me, which returns the authenticated user's profile including group and project memberships, and admin-only endpoints for listing and managing users.
14a. Current user profile — GET /api/me¶
The /api/me endpoint returns the authenticated user's profile enriched with their group and project memberships.
# Fetch the current user's full profile (as the regular chemist user)
resp = client.get("/api/me", headers=USER_AUTH)
pp(resp)
{
"id": "17a12c78-0b7f-41fc-97bf-11d9bd5305d2",
"email": "chemist@example.com",
"is_active": true,
"is_superuser": false,
"is_verified": false,
"role": "user",
"deleted_at": null,
"groups": [
{
"id": "4c4b51be-2d73-4928-a946-f7cad3bb6569",
"name": "Analytical Core",
"description": "Cross-departmental analytical instrumentation users",
"created_at": "2026-03-01T20:58:28.020883Z",
"updated_at": "2026-03-01T20:58:28.020883Z"
},
{
"id": "1503ae32-e59b-4269-a395-c1799bfa60be",
"name": "Chemistry & Chemical Biology",
"description": "Updated description",
"created_at": "2026-03-01T20:58:27.993017Z",
"updated_at": "2026-03-01T20:59:39.283178Z"
}
],
"projects": [
{
"id": "4eb42018-e177-4d8d-b429-becd741898d0",
"name": "Kinase Inhibitor Fragment Screen",
"description": "High-throughput NMR fragment screening of kinase inhibitor candidates",
"created_at": "2026-03-01T20:58:28.035514Z",
"updated_at": "2026-03-01T20:58:28.035514Z"
},
{
"id": "10d1309b-b113-45e2-b09e-c5fec402ed3c",
"name": "Microscopy Study 2026",
"description": "Main research project",
"created_at": "2026-03-01T20:59:39.310115Z",
"updated_at": "2026-03-01T20:59:39.310115Z"
},
{
"id": "02e157e1-95d8-4baa-a233-b355344f6c55",
"name": "Multi-omics Cancer Biomarker Discovery",
"description": "Integrated NMR metabolomics and proteomics for cancer biomarker identification",
"created_at": "2026-03-01T20:58:28.069467Z",
"updated_at": "2026-03-01T20:58:28.069467Z"
}
]
}
Expected response includes id, email, is_active, is_superuser, is_verified, role, groups (list), and projects (list).
14b. Admin: list all users — GET /api/admin/users¶
Admins can list all registered users. By default soft-deleted users are excluded; pass ?include_deleted=true to include them.
# List all active users (admin only)
resp = client.get("/api/admin/users")
users = resp.json()
print(f"Total active users: {len(users)}")
pp(resp)
Total active users: 6
[
{
"id": "a82372c3-0133-464f-bde1-b6c7922850e0",
"email": "admin@example.com",
"is_active": true,
"is_superuser": true,
"is_verified": false,
"role": "admin",
"deleted_at": null
},
{
"id": "17a12c78-0b7f-41fc-97bf-11d9bd5305d2",
"email": "chemist@example.com",
"is_active": true,
"is_superuser": false,
"is_verified": false,
"role": "user",
"deleted_at": null
},
{
"id": "ce7748d9-cc00-40f8-86ea-9af74bd78f4e",
"email": "proteomics@example.com",
"is_active": true,
"is_superuser": false,
"is_verified": false,
"role": "user",
"deleted_at": null
},
{
"id": "85de084c-ac0f-4471-83e1-e254eb80f574",
"email": "em-operator@example.com",
"is_active": true,
"is_superuser": false,
"is_verified": false,
"role": "user",
"deleted_at": null
},
{
"id": "b702fcc7-4e8a-4783-9c50-366b2b83305d",
"email": "bioinformatics@example.com",
"is_active": true,
"is_superuser": false,
"is_verified": false,
"role": "user",
"deleted_at": null
},
{
"id": "bafd0f70-6aca-49e8-92dc-60f2c4e21b1e",
"email": "postdoc@test.org",
"is_active": true,
"is_superuser": false,
"is_verified": false,
"role": "user",
"deleted_at": null
}
]
14d. Admin: soft-delete and restore a user — DELETE / POST /restore¶
Admins can soft-delete users (sets deleted_at). Deleted users are excluded from normal listings but can be restored.
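The lifecycle can be sketched with plain dicts. The deleted_at field matches the API responses in this section, while the helper names are hypothetical; the real backend applies the same filter in its database queries.

```python
from datetime import datetime, timezone

# Soft delete marks the row rather than removing it, so a restore is possible.
def soft_delete(user: dict) -> None:
    user["deleted_at"] = datetime.now(timezone.utc).isoformat()

def restore(user: dict) -> None:
    user["deleted_at"] = None

def list_users(users: list, include_deleted: bool = False) -> list:
    return [u for u in users if include_deleted or u["deleted_at"] is None]

user = {"email": "temp-demo@example.com", "deleted_at": None}
soft_delete(user)
print(len(list_users([user])))                        # 0: hidden by default
print(len(list_users([user], include_deleted=True)))  # 1: visible when asked
restore(user)
print(user["deleted_at"])                             # None
```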
# Create a temporary user to demonstrate delete/restore
resp = client.post("/auth/register", json={
"email": "temp-demo@example.com",
"password": "devpass123!",
})
TEMP_USER_ID = resp.json()["id"]
print("Created temp user:", TEMP_USER_ID)
Created temp user: 2717330d-da7a-44c8-9a1d-508ca25f2bec
# Soft-delete the user
resp = client.delete(f"/api/admin/users/{TEMP_USER_ID}")
print("Delete status:", resp.status_code) # Expected: 204
Delete status: 204
# Confirm deleted user is excluded from normal list
active = client.get("/api/admin/users").json()
print("Temp user in active list:", any(u["id"] == TEMP_USER_ID for u in active))
Temp user in active list: False
# Confirm deleted user appears with include_deleted=true
all_users = client.get("/api/admin/users?include_deleted=true").json()
deleted = next(u for u in all_users if u["id"] == TEMP_USER_ID)
print("deleted_at:", deleted["deleted_at"])
deleted_at: 2026-03-01T21:00:16.258006Z
# Restore the user
resp = client.post(f"/api/admin/users/{TEMP_USER_ID}/restore")
print("Restored user email:", resp.json()["email"])
print("deleted_at after restore:", resp.json()["deleted_at"]) # Expected: None
Restored user email: temp-demo@example.com
deleted_at after restore: None
14e. Admin: user group and project membership — GET /api/admin/users/{id}/groups and /projects¶
Admins can look up which groups and projects any user belongs to.
# Look up the chemist user's groups and projects (admin view)
print("=== Groups ===")
pp(client.get(f"/api/admin/users/{USER_ID}/groups"))
print("\n=== Projects ===")
pp(client.get(f"/api/admin/users/{USER_ID}/projects"))
=== Groups ===
[
{
"id": "4c4b51be-2d73-4928-a946-f7cad3bb6569",
"name": "Analytical Core",
"description": "Cross-departmental analytical instrumentation users",
"created_at": "2026-03-01T20:58:28.020883Z",
"updated_at": "2026-03-01T20:58:28.020883Z"
},
{
"id": "1503ae32-e59b-4269-a395-c1799bfa60be",
"name": "Chemistry & Chemical Biology",
"description": "Updated description",
"created_at": "2026-03-01T20:58:27.993017Z",
"updated_at": "2026-03-01T20:59:39.283178Z"
}
]
=== Projects ===
[
{
"id": "4eb42018-e177-4d8d-b429-becd741898d0",
"name": "Kinase Inhibitor Fragment Screen",
"description": "High-throughput NMR fragment screening of kinase inhibitor candidates",
"created_at": "2026-03-01T20:58:28.035514Z",
"updated_at": "2026-03-01T20:58:28.035514Z"
}
]
15. Cleanup¶
Run the cell below, or from a terminal:
docker compose -f docker-compose.yml -f docker-compose.dev.yml down -v
# Tearing down the dev stack will clear all created files and shut down the app
_ = run(f"docker compose {DEV_COMPOSE} down -v")
client.close()
prefect.close()
Container streamweave-worker-1 Stopping
Container streamweave-samba-instruments-1 Stopping
Container streamweave-samba-archive-1 Stopping
Container streamweave-caddy-1 Stopping
Container streamweave-dev-seed-1 Stopping
Container streamweave-dev-seed-1 Stopped
Container streamweave-dev-seed-1 Removing
Container streamweave-dev-seed-1 Removed
Container streamweave-s3-dev-1 Stopping
Container streamweave-samba-instruments-1 Stopped
Container streamweave-samba-instruments-1 Removing
Container streamweave-samba-archive-1 Stopped
Container streamweave-samba-archive-1 Removing
Container streamweave-caddy-1 Stopped
Container streamweave-caddy-1 Removing
Container streamweave-samba-instruments-1 Removed
Container streamweave-instruments-init-1 Stopping
Container streamweave-instruments-init-1 Stopped
Container streamweave-instruments-init-1 Removing
Container streamweave-s3-dev-1 Stopped
Container streamweave-s3-dev-1 Removing
Container streamweave-samba-archive-1 Removed
Container streamweave-caddy-1 Removed
Container streamweave-frontend-1 Stopping
Container streamweave-instruments-init-1 Removed
Container streamweave-s3-dev-1 Removed
Container streamweave-worker-1 Stopped
Container streamweave-worker-1 Removing
Container streamweave-frontend-1 Stopped
Container streamweave-frontend-1 Removing
Container streamweave-worker-1 Removed
Container streamweave-frontend-1 Removed
Container streamweave-api-1 Stopping
Container streamweave-api-1 Stopped
Container streamweave-api-1 Removing
Container streamweave-api-1 Removed
Container streamweave-mailpit-1 Stopping
Container streamweave-postgres-1 Stopping
Container streamweave-prefect-server-1 Stopping
Container streamweave-postgres-1 Stopped
Container streamweave-postgres-1 Removing
Container streamweave-prefect-server-1 Stopped
Container streamweave-prefect-server-1 Removing
Container streamweave-postgres-1 Removed
Container streamweave-prefect-server-1 Removed
Container streamweave-redis-1 Stopping
Container streamweave-prefect-postgres-1 Stopping
Container streamweave-mailpit-1 Stopped
Container streamweave-mailpit-1 Removing
Container streamweave-mailpit-1 Removed
Container streamweave-redis-1 Stopped
Container streamweave-redis-1 Removing
Container streamweave-redis-1 Removed
Container streamweave-prefect-postgres-1 Stopped
Container streamweave-prefect-postgres-1 Removing
Container streamweave-prefect-postgres-1 Removed
Volume streamweave_ms_live_data Removing
Volume streamweave_pgdata Removing
Volume streamweave_s3_data Removing
Volume streamweave_samba_data Removing
Volume streamweave_nmr_live_data Removing
Volume streamweave_tem_live_data Removing
Volume streamweave_harvest_data Removing
Network streamweave_default Removing
Volume streamweave_prefect_pgdata Removing
Volume streamweave_hplc_live_data Removing
Volume streamweave_caddy_config Removing
Volume streamweave_caddy_data Removing
Volume streamweave_samba_data Removed
Volume streamweave_ms_live_data Removed
Volume streamweave_s3_data Removed
Volume streamweave_pgdata Removed
Volume streamweave_nmr_live_data Removed
Volume streamweave_caddy_data Removed
Volume streamweave_harvest_data Removed
Volume streamweave_tem_live_data Removed
Volume streamweave_caddy_config Removed
Volume streamweave_hplc_live_data Removed
Volume streamweave_prefect_pgdata Removed
Network streamweave_default Removed
Confirm no containers are still running:
_ = run(f"docker compose {DEV_COMPOSE} ps")
NAME IMAGE COMMAND SERVICE CREATED STATUS PORTS
16. Conclusion¶
This guide demonstrated the core capabilities of the StreamWeave backend, a research data management platform designed to automate the discovery, transfer, and governance of instrument-generated data.
Features Covered¶
- Automated File Discovery & Transfer — Schedules that automatically harvest files from instrument sources, with checksum verification and idempotent processing
- Persistent Identifiers (ARK) — Every discovered file receives a unique, standards-compliant ARK identifier for long-term reference
- Workflow Orchestration — Prefect-powered flow execution with real-time monitoring, manual triggers, and scheduled runs
- Extensible Hooks System — Pre-transfer hooks for filtering files and post-transfer hooks for metadata enrichment
- Fine-Grained Access Control — User, group, and project-based permissions with hierarchical inheritance
- Full API Coverage — RESTful endpoints for instruments, storage locations, schedules, files, transfers, and access management
Use Cases¶
StreamWeave is ideal for:
- Research core facilities managing data from multiple scientific instruments
- Laboratories requiring automated data archival with provenance tracking
- Organizations needing compliant data governance with audit trails
Interested in deploying StreamWeave for your organization?¶
For deployment assistance, custom integrations, or enterprise support, contact us at: