# Milestone v0.6.0 — Processor Cloud Sync (Upload Worker)
Target: v0.6.0 — Single-target cloud sync as internal Processor worker: FLAC compression, Rclone-based upload, Janitor integration, Audit logging
Status: ✅ Done
References: ADR-0009, ADR-0011, ADR-0018, ADR-0019, ADR-0023, ADR-0024, Refactoring Plan, Processor Service, Testing Guidelines, AGENTS.md, VISION.md, Glossary
## Architecture Overview
The Cloud-Sync-Worker is an internal async worker within the Processor (Tier 1), following the same pattern as the Indexer and Janitor. It is NOT a standalone Tier 2 container — there is no Controller management, no multi-instance orchestration, and no separate health port.
This is a KISS simplification of the original multi-target Uploader architecture (archived, see ADR-0009 NOTE).
## Key Design Decisions
| Question | Decision | Rationale |
|---|---|---|
| Where does upload run? | Inside Processor as async worker | Same pattern as Indexer/Janitor; no Controller complexity |
| How many targets? | Single target configured via `system_config` key `"cloud_sync"` | KISS: multi-target adds orchestration complexity with no MVP value |
| Config source? | `CloudSyncSettings` in `system_config` JSONB | No `storage_remotes` table needed; same Pydantic schema pattern as other settings. Remote target via `remote_type` + `remote_config` dict |
| Upload tracking? | `recordings.uploaded` boolean | Simple Janitor signal; one target = one boolean |
| Audit log? | `uploads` table (immutable, no `remote_slug` FK) | Keeps an audit trail for debugging; simplified without multi-target references |
| FLAC encoding? | ffmpeg subprocess (ADR-0024) | Same tool as Recorder; lossless, ~50% size reduction |
| Upload backend? | rclone subprocess | Universal protocol support (WebDAV/Nextcloud, S3, SFTP) without Python library dependencies |
| Default state? | `enabled: false` | Prevents Janitor deadlock when no rclone target is configured |
| Credentials? | Fernet-encrypted in `system_config` JSONB | Sensitive values (user, pass) AES-encrypted via `SILVASONIC_ENCRYPTION_KEY` (`.env`); `enc:` prefix detection for graceful plaintext fallback |
## Remote Path Convention

Example:

- Local: `/workspace/recorder/ultramic-01/data/raw/raw_20260329_143000.wav`
- Remote: `silvasonic/silvasonic-dev/ultramic-01/2026-03-29/raw_20260329_143000.flac`

The `path_builder` module is the only component that knows about the remote directory convention.
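As a minimal sketch of this convention — the transliteration map and helper body are illustrative assumptions, not the actual implementation:

```python
import re
from datetime import datetime
from pathlib import PurePosixPath

# Illustrative transliteration map so e.g. "Müller" slugifies to "mueller".
_TRANSLIT = str.maketrans({"ä": "ae", "ö": "oe", "ü": "ue", "ß": "ss"})


def build_remote_path(station_name: str, sensor_id: str,
                      time: datetime, filename: str) -> str:
    """Map a local WAV filename to its remote FLAC path."""
    slug = re.sub(r"[^a-z0-9]+", "-",
                  station_name.lower().translate(_TRANSLIT)).strip("-")
    flac_name = PurePosixPath(filename).with_suffix(".flac").name
    return f"silvasonic/{slug}/{sensor_id}/{time:%Y-%m-%d}/{flac_name}"
```

With the example above, `build_remote_path("Silvasonic Dev", "ultramic-01", datetime(2026, 3, 29, 14, 30), "raw_20260329_143000.wav")` yields `silvasonic/silvasonic-dev/ultramic-01/2026-03-29/raw_20260329_143000.flac`.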
## Phase 1: Core Infrastructure
Goal: Build the foundation for credential encryption and cloud sync configuration. Add the crypto module to core, extend the DB schema, install rclone in the Processor container, and implement the CloudSyncSeeder in the Controller.
User Stories: US-U06 (Seamless tracking — audit log schema)
Dependency: Preparatory refactoring (v0.5.x) must be complete — `storage_remotes` table removed, `CloudSyncSettings.enabled` defaults to `false`, `remote_type` / `remote_config` fields present in the schema.
Cross-Cutting Dependency (core): Credential encryption uses `packages/core/src/silvasonic/core/crypto.py` — a thin Fernet wrapper (`encrypt_value`, `decrypt_value`) added to `silvasonic-core` as part of this milestone. The encryption key lives in `.env` as `SILVASONIC_ENCRYPTION_KEY`.
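A hedged sketch of what such a wrapper could look like — only the signatures and the `enc:` prefix convention come from this milestone; the bodies are illustrative:

```python
import os

from cryptography.fernet import Fernet

ENC_PREFIX = "enc:"


def encrypt_value(plaintext: str, key: bytes) -> str:
    """Fernet-encrypt and mark the result with the enc: prefix."""
    token = Fernet(key).encrypt(plaintext.encode("utf-8"))
    return ENC_PREFIX + token.decode("ascii")


def decrypt_value(value: str, key: bytes) -> str:
    """Decrypt enc:-prefixed values; return anything else unchanged (plaintext fallback)."""
    if not value.startswith(ENC_PREFIX):
        return value
    token = value[len(ENC_PREFIX):].encode("ascii")
    return Fernet(key).decrypt(token).decode("utf-8")


def load_encryption_key() -> bytes:
    """Read the key from the environment, failing loudly if it is missing."""
    key = os.environ.get("SILVASONIC_ENCRYPTION_KEY")
    if not key:
        raise RuntimeError("SILVASONIC_ENCRYPTION_KEY is not set")
    return key.encode("ascii")
```

The plaintext fallback keeps pre-encryption config values readable, so a rollout does not require re-seeding every existing `system_config` row at once.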
### Source References
Read these files before starting Phase 1:
| Purpose | File |
|---|---|
| Upload DB model | `packages/core/src/silvasonic/core/database/models/recordings.py` |
| CloudSyncSettings schema | `packages/core/src/silvasonic/core/schemas/system_config.py` |
| Database DDL | `services/database/init/01-init-schema.sql` |
| Existing Controller seeder pattern | `services/controller/src/silvasonic/controller/seeder.py` |
| Environment config | `.env` / `.env.example` |
### Tasks
- [x] Update Database Schema & Models:
  - Add `remote_path TEXT` column to `uploads` table in `01-init-schema.sql`
  - Add `CREATE INDEX ix_uploads_attempt_at ON uploads (attempt_at DESC)` in `01-init-schema.sql`
  - Add `remote_path: Mapped[str | None]` to `Upload` model in `packages/core/src/silvasonic/core/database/models/system.py`
- [x] Add `rclone` to Processor `Containerfile` system dependencies (alongside existing `ffmpeg`)
- [x] Implement `packages/core/src/silvasonic/core/crypto.py`:
  - `encrypt_value(plaintext: str, key: bytes) -> str` — Fernet encrypt, returns `enc:<token>`-prefixed string
  - `decrypt_value(value: str, key: bytes) -> str` — if `enc:` prefix → decrypt; otherwise return as-is (plaintext fallback)
  - `load_encryption_key() -> bytes` — read `SILVASONIC_ENCRYPTION_KEY` from env, raise clear error if missing
  - Key generation helper: `python -m silvasonic.core.crypto generate-key` (prints new Fernet key to stdout)
- [x] Add `cryptography>=43.0` to `packages/core/pyproject.toml` dependencies
- [x] Add `SILVASONIC_ENCRYPTION_KEY` to `.env` and `.env.example` with a generated Fernet key
- [x] Implement `CloudSyncSeeder` in `services/controller/src/silvasonic/controller/seeder.py`:
  - Reads 4 optional env vars: `SILVASONIC_CLOUD_REMOTE_TYPE`, `_URL`, `_USER`, `_PASS`
  - All-or-nothing: if any of the 4 is missing → log debug + skip (cloud sync stays as seeded by `defaults.yml`)
  - If all 4 present: build `remote_config` dict, encrypt `user` and `pass` via `crypto.encrypt_value()`
  - UPSERT (not `ON CONFLICT DO NOTHING`): `.env` is infrastructure, always overwrites DB values
  - Sets `cloud_sync.enabled = true`, `cloud_sync.remote_type`, `cloud_sync.remote_config` in `system_config`
  - For `remote_type=webdav`: auto-set `vendor=nextcloud` if URL contains `nextcloud` or ends with `/webdav/`
  - Requires `SILVASONIC_ENCRYPTION_KEY` — if missing while credentials are set → clear error + skip
  - Wired into `run_all_seeders()` after `ConfigSeeder` (so defaults exist first), before `AuthSeeder`
### Acceptance Criteria

Phase is complete when `just check` passes and all tests below are green.
#### Unit Tests (`packages/core/tests/unit/test_crypto.py`) — `@pytest.mark.unit`

| Test | Validates |
|---|---|
| `test_encrypt_decrypt_roundtrip` | `decrypt_value(encrypt_value(s, k), k) == s` |
| `test_plaintext_fallback` | No `enc:` prefix → returned as-is |
| `test_decrypt_wrong_key_fails` | Wrong key → `InvalidToken` raised |
| `test_load_encryption_key_missing` | Env var not set → clear error message |
| `test_generate_key_valid_fernet` | Generated key is a valid Fernet key |
#### Unit Tests (`services/controller/tests/unit/test_cloud_sync_seeder.py`) — `@pytest.mark.unit`

| Test | Validates |
|---|---|
| `test_all_env_vars_set_seeds_encrypted` | All 4 vars → `system_config` updated with `enc:` values, `enabled=true` |
| `test_partial_env_vars_skips` | Only 2 of 4 vars → no DB change, debug log |
| `test_no_env_vars_skips` | No vars → no DB change, no error |
| `test_upsert_overwrites_existing` | Existing `cloud_sync` in DB → overwritten with new `.env` values |
| `test_missing_encryption_key_errors` | Credentials set but no `SILVASONIC_ENCRYPTION_KEY` → clear error, skip |
| `test_webdav_auto_vendor_nextcloud` | `remote_type=webdav` + URL with `/webdav/` → `vendor=nextcloud` auto-set |
## Phase 2: Upload Worker Modules
Goal: Implement all Cloud-Sync-Worker modules within the Processor service. The worker polls for pending recordings, compresses WAV→FLAC, uploads via rclone, logs the attempt, and updates the database.
User Stories: US-U01 (Automatic cloud upload with FLAC), US-U06 (Seamless tracking)
Dependency: Phase 1 (Core Infrastructure) must be complete — crypto module available, DB schema updated, rclone installed in container.
### Modules
| Module | File | Responsibility |
|---|---|---|
| Upload Worker | `upload_worker.py` | Main async loop: poll → encode → upload → audit → stats |
| Work Poller | `work_poller.py` | Find recordings with `uploaded=false` and `local_deleted=false` |
| FLAC Encoder | `flac_encoder.py` | WAV→FLAC via ffmpeg subprocess |
| Path Builder | `path_builder.py` | Build remote path from DB fields |
| Rclone Client | `rclone_client.py` | Upload files via rclone subprocess |
| Audit Logger | `audit_logger.py` | Immutable `uploads` table + set `recordings.uploaded=true` |
| Upload Stats | `upload_stats.py` | Two-phase logging: startup detail → periodic summary |
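The FLAC Encoder's ffmpeg wrapper could look roughly like this. The error-class name matches the `test_flac_encoder.py` table; the `-y` flag and the exact error-message handling are assumptions:

```python
import subprocess
from pathlib import Path


class FlacEncodingError(RuntimeError):
    """Raised when ffmpeg fails or produces an empty output file."""


def encode_wav_to_flac(wav_path: Path, output_dir: Path) -> Path:
    """Losslessly compress a WAV segment to FLAC via an ffmpeg subprocess."""
    out = output_dir / wav_path.with_suffix(".flac").name
    cmd = [
        "ffmpeg", "-y", "-i", str(wav_path),
        "-c:a", "flac", "-compression_level", "5",
        str(out),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    # Validate the output: ffmpeg must succeed and write a non-empty file.
    if result.returncode != 0 or not out.is_file() or out.stat().st_size == 0:
        out.unlink(missing_ok=True)  # clean up a partial FLAC on failure
        raise FlacEncodingError(result.stderr.strip()[-500:] or "no output produced")
    return out
```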
### Source References
Read these files before starting Phase 2:
| Purpose | File |
|---|---|
| Processor service entry point | `services/processor/src/silvasonic/processor/__main__.py` |
| Indexer — internal worker pattern to copy | `services/processor/src/silvasonic/processor/indexer.py` |
| Janitor — internal worker pattern reference | `services/processor/src/silvasonic/processor/janitor.py` |
| RecordingStats — two-phase logging pattern | `services/recorder/src/silvasonic/recorder/recording_stats.py` |
| Recording DB model | `packages/core/src/silvasonic/core/database/models/recordings.py` |
| Crypto module (Phase 1) | `packages/core/src/silvasonic/core/crypto.py` |
| FFmpeg usage pattern (ADR-0024) | `docs/adr/0024-ffmpeg-audio-engine.md` |
| Worker Pull orchestration (ADR-0018) | `docs/adr/0018-worker-pull-orchestration.md` |
### Tasks
- [x] Implement `upload_worker.py`:
  - `UploadWorker(session_factory, settings, stats)` — main async loop
  - Early exit: if `CloudSyncSettings.enabled == false` → log and skip (global pause)
  - Early exit: if `CloudSyncSettings.remote_type is None` → log warning and skip (no target configured)
  - Poll loop: `find_pending()` → encode → upload → audit → repeat
  - Sequential per file (parallel uploads deferred to post-v1.0.0)
  - Inline schedule check (opt-in): `_is_within_window(start_hour, end_hour) -> bool` (KISS — 4-line function). Default: both `null` → 24/7 continuous upload
  - Break on connection error: abort remaining batch, wait for next cycle
  - Missing file handling: if WAV file not on disk (Janitor deleted it) → log, skip to next
  - Respect `CloudSyncSettings.poll_interval` between cycles
  - Health: `self.health.update_status("upload_worker", True/False, details)`
  - Heartbeat extra meta: `pending_count`, `uploaded_count`, `failed_count`, `last_upload_at`
- [x] Implement `work_poller.py`:
  - `find_pending_uploads(session, batch_size) -> list[PendingUpload]`
  - Single-target query: `WHERE uploaded=false AND local_deleted=false ORDER BY time ASC LIMIT batch`
  - Uses `ix_recordings_upload_pending` partial index on `recordings`
  - `PendingUpload` Pydantic model: `recording_id, file_raw, sensor_id, time`
- [x] Implement `flac_encoder.py`:
  - `encode_wav_to_flac(wav_path: Path, output_dir: Path) -> Path`
  - ffmpeg subprocess: `ffmpeg -i input.wav -c:a flac -compression_level 5 output.flac`
  - Validate output file (exists, non-zero size)
  - Clean up partial FLAC file on failure
  - No FLAC caching — re-encode on retry (encoding takes <1 s per segment on an RPi 5)
- [x] Implement `path_builder.py`:
  - `build_remote_path(station_name, sensor_id, time, filename) -> str`
  - Convention: `silvasonic/{station_slug}/{sensor_id}/{YYYY-MM-DD}/(unknown).flac`
  - Station name slugified via simple inline regex (no external dependency)
- [x] Implement `rclone_client.py`:
  - `RcloneClient(cloud_sync_settings: CloudSyncSettings, encryption_key: bytes)`:
    - `generate_rclone_conf() -> Path` — decrypt `enc:`-prefixed values in `remote_config` via `crypto.decrypt_value()`, then write a temp `rclone.conf`
    - Validate `remote_config` via `schemas.cloud_sync.validate_rclone_config(remote_type, remote_config)` on init (validates after decryption)
    - For WebDAV/Nextcloud: set `vendor = nextcloud`
    - `upload_file(local, remote_path, bandwidth_limit) -> RcloneResult`
    - `RcloneResult(success, bytes_transferred, error_message, duration_s, is_connection_error)`
    - Bandwidth limiting via `--bwlimit` from `CloudSyncSettings.bandwidth_limit`
    - Checksum verification via `--checksum` flag
- [x] Implement `audit_logger.py`:
  - `log_upload_attempt(session, recording_id, filename, remote_path, size, success, error_message)`:
    - INSERT into `uploads` table (immutable audit log)
    - Pass `duration_s` as JSON within `error_message` if needed for detailed metrics (KISS)
    - On success: `UPDATE recordings SET uploaded=true, uploaded_at=now()` — Janitor signal
    - On failure: `uploads` row with `success=false`, `recordings.uploaded` unchanged
- [x] Implement `upload_stats.py`:
  - `UploadStats` — two-phase logging (same pattern as `RecordingStats`):
    - Startup phase (default 5 min): log every upload individually
    - Steady state: periodic summary every 5 min
    - Shutdown: `emit_final_summary()` with lifetime totals
  - No `threading.Lock` needed — single-threaded worker
- [x] Wire `UploadWorker` into the Processor `__main__.py`:
  - Start alongside Indexer and Janitor as third internal worker
  - Shutdown: `emit_final_summary()` on SIGTERM
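The temp-config generation in `rclone_client.py` could be sketched along these lines. The section name and the `decrypt` callback are illustrative; note also that rclone itself expects the `pass` value inside `rclone.conf` in its obscured format (`rclone obscure`), a step a production implementation would add and this sketch omits:

```python
import configparser
import tempfile
from pathlib import Path
from typing import Callable


def generate_rclone_conf(remote_type: str, remote_config: dict[str, str],
                         decrypt: Callable[[str], str]) -> Path:
    """Write a temporary rclone.conf with enc:-prefixed values decrypted."""
    parser = configparser.ConfigParser()
    section: dict[str, str] = {"type": remote_type}
    for key, value in remote_config.items():
        section[key] = decrypt(value)  # plaintext values pass through unchanged
    if remote_type == "webdav":
        section.setdefault("vendor", "nextcloud")
    parser["silvasonic"] = section
    fd, name = tempfile.mkstemp(suffix=".conf")
    with open(fd, "w") as fh:
        parser.write(fh)
    return Path(name)
```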
### Acceptance Criteria

Phase is complete when `just check` passes and all tests below are green.
#### Unit Tests (`services/processor/tests/unit/test_upload_worker.py`) — `@pytest.mark.unit`

| Test | Validates |
|---|---|
| `test_upload_disabled_skips` | `enabled=false` → worker does not poll |
| `test_poll_loop_processes_pending` | Mocked pending list → encode → upload → audit called in sequence |
| `test_connection_error_aborts_batch` | `is_connection_error=True` → remaining batch skipped |
| `test_missing_file_skipped` | WAV not on disk → logged, next recording processed |
| `test_null_schedule_always_active` | Both hours `null` → 24/7 upload |
| `test_within_window` | `_is_within_window(22, 6)`: hour=23 → True |
| `test_outside_window` | `_is_within_window(22, 6)`: hour=12 → False |
#### Unit Tests (`services/processor/tests/unit/test_work_poller.py`) — `@pytest.mark.unit`

| Test | Validates |
|---|---|
| `test_finds_pending_recordings` | `uploaded=false`, `local_deleted=false` → returned |
| `test_excludes_already_uploaded` | `uploaded=true` → excluded |
| `test_excludes_locally_deleted` | `local_deleted=true` → excluded |
| `test_batch_size_respected` | `LIMIT` matches parameter |
| `test_empty_result` | No pending recordings → empty list |
#### Unit Tests (`services/processor/tests/unit/test_flac_encoder.py`) — `@pytest.mark.unit`

| Test | Validates |
|---|---|
| `test_encode_creates_flac_file` | Mock subprocess → FLAC path returned |
| `test_encode_fails_on_ffmpeg_error` | `returncode != 0` → raises `FlacEncodingError` |
| `test_encode_cleanup_on_failure` | Partial FLAC removed on error |
#### Unit Tests (`services/processor/tests/unit/test_path_builder.py`) — `@pytest.mark.unit`

| Test | Validates |
|---|---|
| `test_builds_correct_remote_path` | `silvasonic/{slug}/{sensor}/{date}/(unknown).flac` |
| `test_station_name_slugified` | `"Silvasonic Müller-Station"` → `"silvasonic-mueller-station"` |
| `test_filename_extension_replaced` | `.wav` input → `.flac` output |
#### Unit Tests (`services/processor/tests/unit/test_rclone_client.py`) — `@pytest.mark.unit`

| Test | Validates |
|---|---|
| `test_generate_rclone_conf_format` | Correct INI format written |
| `test_upload_success` | Mock subprocess → `RcloneResult(success=True)` |
| `test_upload_failure_connection` | Network error → `is_connection_error=True` |
| `test_bandwidth_limit_in_args` | `--bwlimit` present in CLI args |
| `test_encrypted_credentials_decrypted` | `enc:`-prefixed values in `remote_config` → plaintext in generated `rclone.conf` |
#### Unit Tests (`services/processor/tests/unit/test_audit_logger.py`) — `@pytest.mark.unit`

| Test | Validates |
|---|---|
| `test_success_marks_uploaded` | `success=true` → `uploads` INSERT + `recordings.uploaded=true` |
| `test_failure_no_flag_change` | `success=false` → `uploads` INSERT, `recordings.uploaded` unchanged |
| `test_audit_log_immutable` | INSERT only, no UPDATE on `uploads` table |
#### Unit Tests (`services/processor/tests/unit/test_upload_stats.py`) — `@pytest.mark.unit`

| Test | Validates |
|---|---|
| `test_startup_phase_logs_individually` | During startup: individual `upload.completed` log |
| `test_steady_state_accumulates` | After startup: no individual log |
| `test_summary_emitted_after_interval` | `maybe_emit_summary()` → `upload.summary` |
| `test_final_summary_on_shutdown` | `emit_final_summary()` → lifetime totals |
#### Integration Tests (`services/processor/tests/integration/test_upload_pipeline_e2e.py`) — `@pytest.mark.integration`

| Test | Validates |
|---|---|
| `test_full_pipeline_mock_rclone` | Seed recordings + workspace → poll → FLAC → mock rclone → audit → `uploaded=true` |
| `test_pipeline_skips_already_uploaded` | `uploaded=true` recording → not re-processed |
| `test_pipeline_handles_missing_file` | Recording in DB, file missing → error logged, pipeline continues |
#### Integration Tests (`services/processor/tests/integration/test_flac_encoder_e2e.py`) — `@pytest.mark.integration`

| Test | Validates |
|---|---|
| `test_real_wav_to_flac` | Synthetic WAV (numpy) → ffmpeg → valid FLAC file |
| `test_flac_smaller_than_wav` | FLAC output is smaller than the WAV input |
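The synthetic WAV fixture these tests rely on could be generated along these lines — the helper name and tone parameters are illustrative, not the actual fixture:

```python
import wave
from pathlib import Path

import numpy as np


def write_synthetic_wav(path: Path, seconds: float = 1.0, rate: int = 48_000,
                        freq: float = 440.0) -> Path:
    """Write a mono 16-bit sine-tone WAV suitable as ffmpeg input in tests."""
    t = np.arange(int(seconds * rate)) / rate
    samples = (0.5 * np.sin(2 * np.pi * freq * t) * 32767).astype(np.int16)
    with wave.open(str(path), "wb") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)   # 16-bit PCM
        wf.setframerate(rate)
        wf.writeframes(samples.tobytes())
    return path
```

A sine tone compresses well under FLAC, which also makes the `test_flac_smaller_than_wav` size comparison reliable.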
## Phase 3: System Tests
Goal: Verify upload worker runs correctly within the full Processor container. Verify Janitor integration with upload status.
User Stories: US-U02 (Continue recording indefinitely)
Dependency: Phase 2 (upload worker modules implemented and unit-tested).
### Tasks

- [x] Verify upload worker starts alongside Indexer and Janitor in the Processor container
- [x] Verify `CloudSyncSettings.enabled=false` → upload worker inactive (no rclone calls)
- [x] Verify Janitor respects the `uploaded` flag for retention decisions
- [x] Update Processor `README.md` with a Cloud-Sync-Worker section
### Acceptance Criteria

Phase is complete when `just ci` passes (including the Processor image build).
#### System Tests (`tests/system/test_processor_lifecycle.py`) — `@pytest.mark.system`

| Test | Validates |
|---|---|
| `test_upload_worker_starts_with_processor` | Processor container logs show upload worker initialization |
| `test_upload_disabled_no_rclone` | `enabled=false` → no rclone subprocess spawned |
#### System Tests (`tests/system/test_processor_resilience.py`) — `@pytest.mark.system`

| Test | Validates |
|---|---|
| `test_janitor_respects_uploaded_flag` | `uploaded=true` → Janitor may delete; `uploaded=false` → file is kept |
## Phase 4: Documentation & Release
Goal: Finalize documentation, bump version, run full pipeline, create release.
Dependency: All previous phases (1–3) complete.
### Tasks

- [x] Update `VISION.md`: Processor status annotation (Cloud-Sync-Worker → ✅ AS-IS)
- [x] Update `ROADMAP.md`: v0.6.0 → ✅ Done
- [x] Update User Stories: mark completed acceptance criteria (US-U01, US-U02, US-U06 backend)
- [x] Update Processor `README.md`: Implementation Status table
- [x] Update this file (`milestone_0_6_0.md`): check all items, Status → ✅ Done
- [x] Version bump `0.5.x` → `0.6.0` in 3 files (per `release_checklist.md` §1):
  - `packages/core/src/silvasonic/core/__init__.py`
  - `pyproject.toml` (root)
  - `README.md` line 5
- [x] `just ci` passes (full pipeline including Processor image rebuild with rclone)
- [x] `just test-hw` passes (recommended — no regressions)
- [x] Commit + annotated tag + push
## Out of Scope (Deferred)
| Item | Target Version |
|---|---|
| Web-UI for remote target configuration (US-U04) | v0.9.0 |
| Upload progress in Dashboard (US-U05) | v0.9.0 |
| Upload audit log in Web-UI (US-U06 UI portion) | v0.9.0 |
| Parallel uploads (multiple files concurrently) | post-v1.0.0 |
| Resume support (partial upload tracking) | post-v1.0.0 |
| Multi-target upload | post-v1.0.0 |
| Hardware-backed KMS / Secure Element integration | post-v1.0.0 |
Note: US-U04 (Settings via Web UI) and US-U05 (Dashboard Status) require the Web-Interface (v0.9.0). This milestone implements the backend support — the `CloudSyncSettings` schema and the heartbeat payload with upload metrics. Web-Mock routes (`/upload`) already exist with mock data.

Note: US-U06 (Seamless tracking) is split: the audit backend (immutable `uploads` table) is implemented here. The Web-Interface for browsing the audit log is deferred to v0.9.0.

Note: The upload worker only uploads raw recordings as FLAC (ADR-0011 §4). Processed (48 kHz) files can be regenerated from the lossless FLAC on the cloud side if needed.