Milestone v0.5.0 — Analysis & Backend Orchestration¶
Target: v0.5.0 — Processor Service (Indexer + Janitor), `recordings` Schema & Indices, Processor Config Seeding, Controller Integration
Status: ✅ Done
References: ADR-0009, ADR-0011, ADR-0018, ADR-0019, ADR-0023, ADR-0025, Processor Service Spec, Service Blueprint
Phase 1: Processor Service Skeleton & Compose Integration¶
Goal: Create the Processor service following the Service Blueprint, integrate into Compose as a Tier 1 service, and verify basic lifecycle (health, heartbeat, graceful shutdown).
User Stories: —
Tasks¶
- [x] Scaffold `services/processor/` following the Service Blueprint (§1): `Containerfile`, `pyproject.toml`, `README.md`, `src/silvasonic/processor/__init__.py`, `__main__.py`, `py.typed`, `tests/unit/`, `tests/integration/`
- [x] Implement `ProcessorService(SilvaService)` in `__main__.py`:
  - `service_name = "processor"`, `service_port = 9200`
  - Override `run()` with placeholder logic (Phases 3+4 fill this in)
  - Read `ProcessorSettings` from the `system_config` table on startup (single DB read, Immutable Container pattern, ADR-0019)
- [x] Register in workspace:
  - Root `pyproject.toml`: add `silvasonic-processor` to `[project] dependencies` and `[tool.uv.sources]`
- [x] Add `Containerfile` following the Service Blueprint (§5):
  - Base: `python:3.13-slim-bookworm`
  - System deps: `curl`, `libsndfile1` (WAV metadata via `soundfile`)
  - Port: `9200`
- [x] Add Processor to `compose.yml` as a Tier 1 service:
  - `container_name: silvasonic-processor`
  - Depends on `database` (healthy)
  - Mount Recorder workspace read-write (`${SILVASONIC_WORKSPACE_PATH}/recorder:/data/recorder:z`) — the Processor is the only non-Recorder service with `:rw` access (Janitor delete authority, ADR-0009)
  - Mount Processor workspace read-write (`${SILVASONIC_WORKSPACE_PATH}/processor:/data/processor:z`)
  - Redis URL, DB env vars
  - Healthcheck on `:9200/healthy`
- [x] Add `compose.override.yml` dev mounts for hot reload
- [x] Add `SILVASONIC_PROCESSOR_PORT=9200` to `.env.example`
- [x] `just check` passes
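A minimal sketch of the Phase 1 skeleton. The `SilvaService` stub here is a hypothetical stand-in: the real base class from the Service Blueprint also provides the health endpoint, heartbeat, and graceful-shutdown plumbing; only the attributes named in this milestone are reproduced.

```python
import asyncio


class SilvaService:
    """Hypothetical stand-in for the shared base class (Service Blueprint).

    The real SilvaService wires up health, heartbeat, and shutdown handling;
    this stub only carries the attributes used below.
    """

    service_name: str = ""
    service_port: int = 0

    async def run(self) -> None:
        raise NotImplementedError


class ProcessorService(SilvaService):
    service_name = "processor"
    service_port = 9200

    async def run(self) -> None:
        # Phase 1 placeholder: Phases 3 and 4 replace this with the
        # Indexer and Janitor task loops.
        await asyncio.sleep(0)
```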
Tests¶
Unit (services/processor/tests/unit/) — @pytest.mark.unit¶
- [x] `test_processor.py` — `TestProcessorService`
  - `test_package_import` — `import silvasonic.processor` succeeds
  - `test_service_name_and_port` — `service_name == "processor"`, `service_port == 9200`
  - `test_lifecycle_start_shutdown` — `ProcessorService` starts, shutdown signal triggers clean exit (mocked DB/Redis)
  - `test_settings_loaded_from_db` — Pydantic `ProcessorSettings` correctly deserialized from a mock `system_config` row
  - `test_settings_defaults` — `ProcessorSettings()` defaults match `config/defaults.yml` values
Integration (services/processor/tests/integration/) — @pytest.mark.integration¶
- [x] `test_processor_lifecycle.py` — `TestProcessorLifecycle`
  - `test_processor_starts_with_db` — Processor starts in a Testcontainer with a real DB, `/healthy` returns 200 with `{"status": "ok"}`
  - `test_processor_heartbeat_published` — Processor publishes a heartbeat to Redis within 15 s
Smoke (tests/smoke/) — @pytest.mark.smoke¶
- [x] `test_health.py` — `TestServiceHealth::test_processor_healthy` + `TestServiceHeartbeats::test_processor_heartbeat_in_redis`
Phase 2: Database Schema Enhancements¶
Goal: Finalize the recordings table schema and add indices for Worker Pull (ADR-0018) and Upload polling (v0.6.0 prep) — before the Indexer writes to this table.
User Stories: —
Decision (ADR-0025): `recordings` remains a standard PostgreSQL table (no Hypertable). FK constraints from `detections` and `uploads` are preserved. The data volume (~2M rows/year) does not require partitioning. See ADR-0025.
Tasks¶
- [x] Add partial index for the Worker Pull pattern (ADR-0018)
- [x] Add partial index for Upload polling (v0.6.0 preparation)
- [x] Verify that the `analysis_state` JSONB column supports the Worker Pull `SELECT ... FOR UPDATE SKIP LOCKED` pattern efficiently
- [x] Update `services/database/init/01-init-schema.sql` with the new indices
- [x] Update SQLAlchemy ORM models in `silvasonic.core.database.models` if schema changes apply
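A sketch of the two pieces, assuming the column and index names used elsewhere in this milestone. The authoritative definitions live in `services/database/init/01-init-schema.sql`; in particular, the index predicates here are illustrative guesses, not the shipped DDL.

```python
# Illustrative only: predicate details may differ from 01-init-schema.sql.
ANALYSIS_PENDING_INDEX = """
CREATE INDEX IF NOT EXISTS ix_recordings_analysis_pending
    ON recordings (time)
    WHERE local_deleted = false;
"""

# Worker Pull (ADR-0018): a worker claims one pending row; SKIP LOCKED makes
# concurrent sessions pass over rows another worker has already locked
# instead of blocking on them.
WORKER_PULL_QUERY = """
SELECT time, sensor_id FROM recordings
WHERE local_deleted = false
ORDER BY time
LIMIT 1
FOR UPDATE SKIP LOCKED;
"""
```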
Tests¶
Unit (packages/core/tests/unit/) — @pytest.mark.unit¶
- [x] `test_recording_model.py` — `TestRecordingModel`
  - `test_analysis_state_default_empty_jsonb` — a new `Recording()` has `analysis_state == {}`
  - `test_local_deleted_default_false` — a new `Recording()` has `local_deleted == False`
  - `test_uploaded_default_false` — a new `Recording()` has `uploaded == False`
Integration (tests/integration/) — @pytest.mark.integration¶
- [x] `test_worker_pull_query.py` — `TestWorkerPullQuery`
  - `test_for_update_skip_locked` — two concurrent sessions: the first locks a row, the second gets a different row via `SKIP LOCKED`
  - `test_partial_index_used` — `EXPLAIN` confirms the `ix_recordings_analysis_pending` index is used
  - `test_upload_pending_index` — `EXPLAIN` confirms the `ix_recordings_upload_pending` index is used
Phase 3: Indexer — Filesystem Polling & Recording Registration¶
Goal: The Indexer scans the Recorder workspace for promoted WAV files, extracts metadata, and registers them in the recordings table. Recordings appear in the database within seconds of being written to disk.
User Stories: US-P01 (recordings appear automatically)
Tasks¶
- [x] Implement `silvasonic/processor/indexer.py` — `Indexer` class:
  - Periodic filesystem polling of all `recorder/*/data/processed/*.wav` (interval configurable via `ProcessorSettings.indexer_poll_interval`, default: `2.0` seconds)
  - Extract WAV metadata via `soundfile`: duration, sample rate, channels, file size
  - Determine `sensor_id` from the directory structure (`recorder/{device_name}/data/...`)
  - Locate the corresponding raw file: `recorder/{device_name}/data/raw/{same_filename}.wav`
  - Calculate `filesize_raw` from the raw file
- [x] Register each new WAV in the `recordings` table:
  - `time`: parsed from the segment filename (ISO timestamp)
  - `sensor_id`: device name from the directory path
  - `file_raw`, `file_processed`: relative paths to both streams
  - `duration`, `sample_rate`, `filesize_raw`, `filesize_processed`
  - `uploaded = false`, `local_deleted = false`
  - `analysis_state = '{}'::jsonb` (empty — no workers have processed it yet)
  - Idempotent: check for an existing entry by `file_processed` before insert (avoid duplicates)
- [x] Only index files from `data/` directories — never from `.buffer/` (only complete, promoted segments)
- [x] Integrate the Indexer as a periodic async task in `ProcessorService.run()`
- [x] Report indexing metrics via `get_extra_meta()` for the heartbeat: `last_indexed_at`, `pending_count` (files on disk not yet in the DB), `total_indexed`
- [x] Update health component: `self.health.update_status("indexer", True/False, details)`
- [x] Implement the Filesystem Reconciliation Audit (Split-Brain Healing):
  - Runs once on Processor startup, before the Indexer polling loop begins
  - Queries all `recordings` rows where `local_deleted = false`
  - Verifies that `file_processed` exists on the filesystem
  - If the file is missing: sets `local_deleted = true` and logs at `WARNING` level with reason `"reconciliation"`
  - Reports `reconciled_count` in the startup log and heartbeat metrics
  - Rationale: heals the Split-Brain state caused by Panic-Mode blind deletion (Phase 4) during DB outages
Tests¶
Unit (services/processor/tests/unit/) — @pytest.mark.unit¶
- [x] `test_indexer.py` — `TestIndexer`
  - `test_wav_metadata_extraction` — `soundfile.info()` returns correct duration, sample rate, and channels for a synthetic WAV
  - `test_sensor_id_from_path` — path `recorder/ultramic-01/data/processed/seg.wav` extracts `sensor_id == "ultramic-01"`
  - `test_timestamp_from_filename` — ISO-timestamp filename parsed correctly
  - `test_raw_file_path_resolution` — given a processed path, resolves the corresponding raw path
  - `test_idempotent_skip_existing` — a file already in the DB (mocked) is not re-inserted
  - `test_buffer_dir_excluded` — files in `.buffer/` are never indexed
  - `test_only_data_dir_scanned` — only `data/processed/` is scanned, not parent or sibling dirs
- [x] `test_reconciliation.py` — `TestReconciliationAudit`
  - `test_missing_file_marked_deleted` — DB row with `local_deleted=false`, file absent → sets `local_deleted=true`
  - `test_existing_file_unchanged` — DB row with `local_deleted=false`, file present → no change
  - `test_already_deleted_row_skipped` — DB row with `local_deleted=true` is not re-checked
  - `test_reconciled_count_reported` — returns the correct count of reconciled rows
Integration (services/processor/tests/integration/) — @pytest.mark.integration¶
- [x] `test_indexer_e2e.py` — `TestIndexerIntegration`
  - `test_new_wav_indexed` — place WAV files in a mock workspace → Indexer picks them up → verify `recordings` rows in the DB (Testcontainer PostgreSQL)
  - `test_idempotent_reindex` — run the Indexer twice on the same files → no duplicate `recordings` rows
  - `test_multiple_sensors_indexed` — files from two sensor directories → correct `sensor_id` per row
- [x] `test_reconciliation_e2e.py` — `TestReconciliationIntegration`
  - `test_orphaned_rows_healed` — seed the DB with `local_deleted=false` rows, remove the files from disk → Reconciliation Audit marks them `local_deleted=true`
  - `test_valid_rows_preserved` — seed the DB with `local_deleted=false` rows, files exist → no changes
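The reconciliation audit from the Phase 3 tasks reduces to a small loop. In this sketch, in-memory dicts stand in for the `local_deleted = false` result set; the real implementation issues UPDATEs against PostgreSQL instead of mutating dicts.

```python
import logging
from pathlib import Path

logger = logging.getLogger("processor.reconciliation")


def reconcile(rows: list[dict], workspace: Path) -> int:
    """Mark rows whose processed file vanished from disk as locally deleted."""
    reconciled = 0
    for row in rows:
        if row["local_deleted"]:
            continue  # already soft-deleted rows are not re-checked
        if not (workspace / row["file_processed"]).exists():
            row["local_deleted"] = True
            logger.warning(
                "reconciliation: %s missing on disk", row["file_processed"]
            )
            reconciled += 1
    return reconciled
```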
Phase 4: Janitor — Data Retention & Storage Management¶
Goal: The Janitor monitors NVMe disk utilization and enforces the escalating retention policy (ADR-0011 §6) to prevent storage exhaustion. The Recorder never stops due to a full disk.
User Stories: US-P02 (endless recording without storage worries)
Tasks¶
- [x] Implement `silvasonic/processor/janitor.py` — `Janitor` class:
  - Periodic disk usage check (interval configurable via `ProcessorSettings.janitor_interval_seconds`)
  - Monitor the mount point of `/data/recorder` via `shutil.disk_usage()`
  - Implement three escalating retention modes:

    | Mode | Threshold | Criteria | Log Level |
    |---|---|---|---|
    | Housekeeping | > 70% | `uploaded=true` AND `analysis_state` complete for all workers | INFO |
    | Defensive | > 80% | `uploaded=true` (regardless of analysis) | WARNING |
    | Panic | > 90% | Oldest files regardless of status | CRITICAL |

- [x] Implement the Soft Delete pattern:
  - Physically delete both `raw` and `processed` WAV files from disk
  - Update the DB row: `local_deleted = TRUE` (preserve historical inventory)
  - Log each deletion: filename, reason, mode
- [x] Implement the Panic Mode fallback:
  - If the DB is unreachable during Panic Mode, fall back to filesystem `mtime` for blind cleanup (oldest files first)
- [x] Implement the Uploader fallback: when no Uploader is configured (no active `storage_remotes` rows), skip the `uploaded` condition in Housekeeping/Defensive mode. Logged at WARNING with `janitor.uploader_fallback_active`
- [x] Implement a batch size limit: `janitor_batch_size` (default 50) per cleanup cycle. Add it to `ProcessorSettings`
- [x] Exclusive delete authority: only the Processor deletes Recorder files — enforced by the RW mount on the Recorder workspace (all other services mount `:ro`, ADR-0009)
- [x] Integrate the Janitor as a periodic async task in `ProcessorService.run()`
- [x] Report retention metrics via `get_extra_meta()`: `disk_usage_percent`, `current_mode` (idle/housekeeping/defensive/panic), `files_deleted_total`
- [x] Update health component: `self.health.update_status("janitor", ...)`
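The mode evaluation can be sketched as a threshold ladder, assuming simple strict-greater-than checks against the ADR-0011 §6 thresholds (the real `Janitor` may encode these differently):

```python
def evaluate_mode(disk_usage_percent: float) -> str:
    """Map disk utilization to a retention mode (ADR-0011 §6 thresholds)."""
    if disk_usage_percent > 90:
        return "panic"         # oldest files go, regardless of status
    if disk_usage_percent > 80:
        return "defensive"     # uploaded files go, regardless of analysis
    if disk_usage_percent > 70:
        return "housekeeping"  # only uploaded AND fully analyzed files go
    return "idle"
```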
Tests¶
Unit (services/processor/tests/unit/) — @pytest.mark.unit¶
- [x] `test_janitor.py` — `TestEvaluateMode`, `TestFindDeletable`, `TestDeleteFiles`, `TestSoftDelete`, `TestPanicFilesystemFallback`, `TestRunCleanup`
  - `test_idle_below_all_thresholds` — 60% usage → mode `idle`, no deletions
  - `test_housekeeping_mode_triggers` — 75% usage → mode `housekeeping`
  - `test_defensive_mode_triggers` — 85% usage → mode `defensive`
  - `test_panic_mode_triggers` — 95% usage → mode `panic`
  - `test_housekeeping_criteria_with_uploader` — only deletes `uploaded=true` AND `analysis_state` complete
  - `test_housekeeping_no_uploader_fallback` — no `storage_remotes` → skips the `uploaded` check
  - `test_defensive_criteria_with_uploader` — deletes `uploaded=true` regardless of analysis
  - `test_defensive_no_uploader_fallback` — no `storage_remotes` → deletes all non-deleted
  - `test_panic_criteria` — deletes the oldest files regardless of status
  - `test_soft_delete_updates_db` — physical delete + DB row `local_deleted=true` (mocked fs/DB)
  - `test_panic_fallback_no_db` — DB unreachable → falls back to `mtime`-based filesystem cleanup
  - `test_db_offline_housekeeping_skips` — DB offline during Housekeeping → skips (no data loss)
  - `test_metrics_reported` — `disk_usage_percent`, `current_mode`, `files_deleted_total` in heartbeat extras
  - `test_batch_size_respected` — max N files per `LIMIT` parameter
Integration (services/processor/tests/integration/) — @pytest.mark.integration¶
- [x] `test_janitor_e2e.py` — `TestJanitorIntegration`
  - `test_housekeeping_deletes_correct_files` — seed DB + filesystem, mock `shutil.disk_usage` at 75% → only uploaded+analyzed files deleted, DB rows updated
  - `test_defensive_deletes_uploaded_only` — mock 85% → uploaded files deleted regardless of analysis state
  - `test_panic_deletes_oldest` — mock 95% → oldest files deleted regardless of status
  - `test_panic_filesystem_fallback` — mock 95% + DB offline → files deleted by `mtime`, DB untouched
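The Panic-Mode filesystem fallback amounts to an mtime sort over the workspace layout used above; a sketch with a hypothetical function name (`batch_size` mirrors the `janitor_batch_size` default of 50):

```python
from pathlib import Path


def panic_candidates(recorder_root: Path, batch_size: int = 50) -> list[Path]:
    """Select the oldest promoted WAVs by mtime for blind deletion.

    Used only when the DB is unreachable in Panic Mode. Because the DB is
    never consulted, this is the deletion path that creates the Split-Brain
    state the startup Reconciliation Audit later heals.
    """
    wavs = recorder_root.glob("*/data/processed/*.wav")
    return sorted(wavs, key=lambda p: p.stat().st_mtime)[:batch_size]
```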
Phase 5: Configuration Seeding & Controller Integration¶
Goal: The Controller seeds `ProcessorSettings` defaults into `system_config` on startup, and the Processor reads them. Configuration changes via the Web-Interface trigger a Processor restart via State Reconciliation.
User Stories: US-P03 (adjust storage rules)
Tasks¶
- [x] Uncomment the `processor` settings block in `config/defaults.yml`
- [x] Verify that the Controller's `ConfigSeeder` correctly seeds the `processor` key into the `system_config` table on startup (`INSERT ... ON CONFLICT DO NOTHING`)
- [x] Verify that the `ProcessorSettings` Pydantic schema defaults match the YAML seed values (CI test from ADR-0023)
- ~~Add a `processor` entry to the `system_services` table seed~~ → Deferred to v0.9.0 (no consumer until the Web-Interface)
- [x] Add the Processor workspace directory (`processor/`) to `scripts/init.py` initialization
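The `ON CONFLICT DO NOTHING` seeding pattern can be demonstrated with stdlib SQLite, which shares the upsert syntax; the real `ConfigSeeder` targets the PostgreSQL `system_config` table, and the settings values below are the defaults named in this milestone:

```python
import json
import sqlite3

# SQLite stands in for PostgreSQL purely to show the idempotency property.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE system_config (key TEXT PRIMARY KEY, value TEXT)")


def seed(defaults: dict[str, dict]) -> None:
    """Insert defaults only where the key does not exist yet (idempotent)."""
    for key, settings in defaults.items():
        conn.execute(
            "INSERT INTO system_config (key, value) VALUES (?, ?) "
            "ON CONFLICT (key) DO NOTHING",
            (key, json.dumps(settings)),
        )


seed({"processor": {"indexer_poll_interval": 2.0, "janitor_batch_size": 50}})
# Re-seeding must not overwrite values already present:
seed({"processor": {"indexer_poll_interval": 99.0}})
```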
Tests¶
Unit (services/controller/tests/unit/) — @pytest.mark.unit¶
- [x] `test_seeder.py` (extend existing) — `TestConfigSeeder`
  - `test_seed_inserts_defaults` — `ConfigSeeder` inserts all 4 schema_map keys (`system`, `processor`, `uploader`, `birdnet`) with correct defaults
  - `test_seed_skips_existing_values` — seeding twice does not overwrite existing values
- [x] `test_seeder.py` — `TestDefaultsYamlParity` (drift guard)
  - `test_yaml_keys_covered_by_schema_map` — every YAML config key has a schema_map entry
  - `test_schema_map_keys_present_in_yaml` — every schema_map key exists in `defaults.yml`
  - `test_all_yaml_values_pass_pydantic_validation` — every YAML section validates against its Pydantic schema
Unit (services/processor/tests/unit/) — @pytest.mark.unit¶
- [x] `test_processor.py` — `TestProcessorSettings`
  - `test_settings_defaults` — `ProcessorSettings()` defaults match `config/defaults.yml` values
  - `test_settings_round_trip` — serialize to JSON → deserialize → identical Pydantic model
Integration (services/controller/tests/integration/) — @pytest.mark.integration¶
- [x] `test_seeder.py` (existing) — `TestConfigSeederIntegration`
  - `test_seed_inserts_system_config` — fresh DB → Controller seeds → `system_config` contains the keys with correct JSONB
  - `test_seed_is_idempotent` — seeding twice does not overwrite
- [x] `test_processor_lifecycle.py` — `TestProcessorLifecycle::test_processor_starts_with_db` — Processor reads the seeded settings from the DB
- ~~`test_processor_system_service_registered`~~ → Deferred to v0.9.0
Phase 6: Robustness & End-to-End Verification¶
Goal: Verify end-to-end pipeline from Recorder output to database registration, and ensure the Processor survives infrastructure failures.
User Stories: US-P01, US-P02, US-P04 (pipeline status)
Tasks¶
- [x] Verify heartbeat payload contains Processor-specific metrics (indexer + janitor status)
- [x] Update the Processor `README.md` with implemented features and status
- [x] Update `ROADMAP.md`: mark v0.5.0 as 🔨 In Progress
- [x] `just check-all` passes (full CI pipeline)
Tests¶
System (tests/system/) — @pytest.mark.system¶
- [x] `test_processor_lifecycle.py` — `TestProcessorLifecycle`
  - `test_recorder_to_processor_pipeline` ✅
  - `test_processor_restart_idempotent` ✅
  - `test_concurrent_recorders_indexed` ✅
  - `test_heartbeat_has_processor_metrics` ✅
- [x] `test_processor_resilience.py` — `TestProcessorResilience`
  - `test_redis_outage_indexing_continues` ✅
  - `test_redis_outage_janitor_continues` ✅
  - `test_db_outage_housekeeping_skips` ✅
  - `test_db_outage_panic_filesystem_fallback` ✅
  - `test_split_brain_healing` ✅
Smoke (tests/smoke/) — @pytest.mark.smoke¶
- [x] `test_health.py` — `TestServiceHealth`
  - `test_processor_healthy` ✅
- [x] `test_health.py` — `TestServiceHeartbeats`
  - `test_processor_heartbeat_in_redis` ✅
System HW (tests/system/) — @pytest.mark.system_hw¶
Note: Only if v0.5.0 changes affect device detection or Recorder interaction. Recommended but not mandatory.
- [x] `test_hw_recording.py` (extend existing) — `TestFullPipelineE2E`
  - `test_hw_recorder_to_processor_pipeline` — real USB mic → Recorder → WAV → Processor Indexer → DB `recordings` ✅ (implemented, requires `just test-hw`)
Out of Scope (Deferred)¶
| Item | Target Version |
|---|---|
| BirdNET analysis worker | v0.8.0 |
| BatDetect analysis worker | v1.3.0 |
| Uploader service (FLAC compression, remote sync) | v0.6.0 |
| Web-Interface (Dashboard, settings UI) | v0.9.0 |
| Redis PUBLISH optimization for instant worker wake | post-v1.0.0 |
| TimescaleDB continuous aggregates | post-v1.0.0 |
| Archive-before-delete safety | post-v1.0.0 |
| Live Opus stream (Recorder → Icecast) | v1.1.0 |
Note: The ROADMAP.md scope for v0.5.0 mentions "Local Inference (BirdNET & BatDetect models)" — BirdNET is now scheduled for v0.8.0 (pre-MVP) and BatDetect for v1.3.0 (post-MVP). This milestone focuses on the critical Processor infrastructure that all workers depend on. The Processor's Indexer and Janitor are prerequisites for any analysis worker (ADR-0018).
Note: US-P03 (settings via Web-Interface) requires both the Processor (this milestone) and the Web-Interface (v0.9.0). This milestone implements the backend support (config seeding, read-on-startup). The UI will be added in v0.9.0.
Note: US-P04 (pipeline status in dashboard) requires the Web-Interface (v0.9.0). This milestone ships the heartbeat payload with the required metrics. The dashboard visualization is deferred.