Usage Statistics - Technical Specification¶
Version: 1.0 (Draft)
Last Updated: 2025-11-20
Status: Draft for Implementation
System Architecture¶
Component Overview¶
┌─────────────────────────────────────────────────────────────────┐
│ Printernizer Application │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ src/services/usage_statistics_service.py │ │
│ │ │ │
│ │ - record_event(event_type, metadata) │ │
│ │ - aggregate_stats() → dict │ │
│ │ - submit_stats() → bool │ │
│ │ - get_local_stats() → dict │ │
│ │ - delete_all_stats() → bool │ │
│ │ - export_stats() → JSON │ │
│ └────────────────┬─────────────────────────────────────────┘ │
│ │ │
│ ┌────────────────▼─────────────────────────────────────────┐ │
│ │ src/database/repositories/usage_statistics_repository.py│ │
│ │ │ │
│ │ - insert_event(event) │ │
│ │ - get_events(filters) → List[Event] │ │
│ │ - get_setting(key) → str │ │
│ │ - set_setting(key, value) │ │
│ │ - delete_all_events() │ │
│ └────────────────┬─────────────────────────────────────────┘ │
│ │ │
│ ┌────────────────▼─────────────────────────────────────────┐ │
│ │ SQLite Database (printernizer.db) │ │
│ │ │ │
│ │ Tables: │ │
│ │ - usage_events │ │
│ │ - usage_settings │ │
│ │ - usage_aggregates (optional cache) │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ src/api/routers/usage_statistics.py │ │
│ │ │ │
│ │ GET /api/v1/usage-stats/local │ │
│ │ GET /api/v1/usage-stats/export │ │
│ │ POST /api/v1/usage-stats/opt-in │ │
│ │ POST /api/v1/usage-stats/opt-out │ │
│ │ DELETE /api/v1/usage-stats/delete-all │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Frontend: Settings UI │ │
│ │ - Privacy settings page │ │
│ │ - Local statistics viewer │ │
│ │ - Export/delete controls │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└───────────────────────────┬───────────────────────────────────────┘
│ HTTPS POST (weekly, if opted in)
│
┌─────────────▼─────────────────────────────────┐
│ Aggregation Service (Phase 2) │
│ https://stats.printernizer.com │
│ │
│ POST /submit │
│ - Validate payload │
│ - Rate limiting │
│ - Store in SQL Server │
└───────────────────────────────────────────────┘
Database Schema (Phase 1: Local Storage)¶
Table: usage_events¶
Stores individual usage events for local analysis and aggregation.
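CREATE TABLE IF NOT EXISTS usage_events (
    id TEXT PRIMARY KEY,                 -- UUID v4
    event_type TEXT NOT NULL,            -- Event category (see Event Types below)
    timestamp DATETIME NOT NULL,         -- ISO 8601 format
    metadata TEXT,                       -- JSON blob with event-specific data
    submitted BOOLEAN DEFAULT 0,         -- Flag: has this been submitted?
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
-- Indexes for performance. SQLite does not accept inline INDEX clauses
-- inside CREATE TABLE, so each index is created with its own statement.
CREATE INDEX IF NOT EXISTS idx_usage_events_type ON usage_events(event_type);
CREATE INDEX IF NOT EXISTS idx_usage_events_timestamp ON usage_events(timestamp);
CREATE INDEX IF NOT EXISTS idx_usage_events_submitted ON usage_events(submitted);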
Event Types:
- app_start - Application started
- app_shutdown - Application stopped gracefully
- job_created - Print job created
- job_completed - Print job completed successfully
- job_failed - Print job failed
- file_downloaded - File downloaded from printer
- file_uploaded - File uploaded to library
- printer_connected - Printer connected successfully
- printer_disconnected - Printer disconnected
- error_occurred - Error encountered (with sanitized details)
- feature_enabled - Feature toggled on
- feature_disabled - Feature toggled off
Metadata Examples:
// app_start event
{
"app_version": "2.7.0",
"python_version": "3.11.0",
"platform": "linux",
"deployment_mode": "homeassistant"
}
// job_completed event
{
"duration_seconds": 3600,
"printer_type": "bambu_lab"
}
// error_occurred event
{
"error_type": "connection_timeout",
"component": "printer_service",
"printer_type": "prusa"
}
Table: usage_settings¶
Stores configuration for usage statistics feature.
CREATE TABLE IF NOT EXISTS usage_settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
Keys:
- opt_in_status - "enabled" or "disabled"
- installation_id - Random UUID (generated on first run)
- first_run_date - ISO 8601 timestamp
- last_submission_date - ISO 8601 timestamp
- submission_count - Number of times stats submitted
- privacy_policy_version - Version user agreed to
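The key/value layout above maps naturally onto an upsert. The following is a minimal sketch of get_setting and set_setting, assuming the synchronous standard-library sqlite3 module and SQLite 3.24 or newer (for ON CONFLICT ... DO UPDATE). The real repository is async, so the function shapes here are illustrative only.

```python
import sqlite3
from typing import Optional

# Same table definition as in the schema above.
SETTINGS_SCHEMA = """
CREATE TABLE IF NOT EXISTS usage_settings (
    key TEXT PRIMARY KEY,
    value TEXT NOT NULL,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
"""


def set_setting(conn: sqlite3.Connection, key: str, value: str) -> None:
    """Insert the key, or overwrite its value and refresh updated_at."""
    conn.execute(
        """
        INSERT INTO usage_settings (key, value) VALUES (?, ?)
        ON CONFLICT(key) DO UPDATE SET
            value = excluded.value,
            updated_at = CURRENT_TIMESTAMP
        """,
        (key, value),
    )
    conn.commit()


def get_setting(conn: sqlite3.Connection, key: str) -> Optional[str]:
    """Return the stored value, or None if the key has never been set."""
    row = conn.execute(
        "SELECT value FROM usage_settings WHERE key = ?", (key,)
    ).fetchone()
    return row[0] if row else None
```

Returning None for a missing key matches how the service treats an unset installation_id or last_submission_date.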
Table: usage_aggregates (Optional Cache)¶
Pre-computed aggregates for faster local statistics viewing.
CREATE TABLE IF NOT EXISTS usage_aggregates (
period_start DATE PRIMARY KEY, -- Week/day start
period_end DATE NOT NULL, -- Week/day end
aggregate_data TEXT NOT NULL, -- JSON blob
computed_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
Data Models (Pydantic)¶
Event Model¶
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Optional, Dict, Any
import uuid

class UsageEvent(BaseModel):
    """Individual usage event."""
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    event_type: str = Field(..., min_length=1, max_length=50)
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    metadata: Optional[Dict[str, Any]] = None
    submitted: bool = False

    class Config:
        json_schema_extra = {
            "example": {
                "id": "550e8400-e29b-41d4-a716-446655440000",
                "event_type": "job_completed",
                "timestamp": "2024-11-20T12:00:00Z",
                "metadata": {"duration_seconds": 3600, "printer_type": "bambu_lab"},
                "submitted": False
            }
        }
Aggregated Stats Model¶
# The nested models are defined first: AggregatedStats references them in its
# annotations, which Python evaluates when the class body runs.
class InstallationInfo(BaseModel):
    """Anonymous installation information."""
    installation_id: str = Field(..., min_length=36, max_length=36)
    first_seen: datetime
    app_version: str
    python_version: str
    platform: str  # "linux", "windows", "darwin"
    deployment_mode: str  # "homeassistant", "docker", "standalone", "pi"
    country_code: str = Field(..., min_length=2, max_length=2)

class TimePeriod(BaseModel):
    """Time period for aggregated stats."""
    start: datetime
    end: datetime
    duration_days: int

class PrinterFleetStats(BaseModel):
    """Printer fleet composition (anonymous)."""
    printer_count: int = Field(..., ge=0)
    printer_types: list[str]
    printer_type_counts: Dict[str, int]

class UsageStats(BaseModel):
    """Usage activity statistics."""
    job_count: int = Field(..., ge=0)
    file_count: int = Field(..., ge=0)
    upload_count: int = Field(..., ge=0)
    uptime_hours: int = Field(..., ge=0)
    feature_usage: Dict[str, bool]

class AggregatedStats(BaseModel):
    """Aggregated statistics for a time period."""
    schema_version: str = "1.0"
    submission_timestamp: datetime = Field(default_factory=datetime.utcnow)
    installation: InstallationInfo
    period: TimePeriod
    printer_fleet: PrinterFleetStats
    usage_stats: UsageStats
    error_summary: Dict[str, int]

    class Config:
        json_schema_extra = {
            "example": {
                "schema_version": "1.0",
                "submission_timestamp": "2024-11-20T12:00:00Z",
                "installation": {...},
                "period": {...},
                "printer_fleet": {...},
                "usage_stats": {...},
                "error_summary": {"connection_timeout": 2}
            }
        }
Service Layer Implementation¶
UsageStatisticsService¶
# src/services/usage_statistics_service.py
import json  # used by export_stats()
import uuid  # used by opt_in() to generate the installation ID
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
import structlog
import aiohttp
from src.database.repositories.usage_statistics_repository import UsageStatisticsRepository
from src.models.usage_statistics import UsageEvent, AggregatedStats
from src.utils.config import get_settings

logger = structlog.get_logger()

class UsageStatisticsService:
    """
    Privacy-first usage statistics service.

    Responsibilities:
    - Record usage events locally
    - Aggregate statistics for submission
    - Submit to aggregation service (if opted in)
    - Provide local statistics viewer data
    - Handle opt-in/opt-out
    """

    def __init__(self, repository: UsageStatisticsRepository):
        self.repository = repository
        self.settings = get_settings()
        self.aggregation_endpoint = "https://stats.printernizer.com/submit"
    async def is_opted_in(self) -> bool:
        """Check if user has opted in to usage statistics."""
        opt_in_status = await self.repository.get_setting("opt_in_status")
        return opt_in_status == "enabled"

    async def opt_in(self) -> bool:
        """Enable usage statistics collection and submission."""
        logger.info("User opted in to usage statistics")
        await self.repository.set_setting("opt_in_status", "enabled")
        # Generate installation ID if not exists
        installation_id = await self.repository.get_setting("installation_id")
        if not installation_id:
            installation_id = str(uuid.uuid4())
            await self.repository.set_setting("installation_id", installation_id)
            await self.repository.set_setting("first_run_date", datetime.utcnow().isoformat())
        return True

    async def opt_out(self) -> bool:
        """Disable usage statistics collection and submission."""
        logger.info("User opted out of usage statistics")
        await self.repository.set_setting("opt_in_status", "disabled")
        return True
    async def record_event(
        self,
        event_type: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Optional[UsageEvent]:
        """
        Record a usage event.

        Events are stored locally regardless of opt-in status
        (allows user to review before opting in).
        Submission only happens if opted in.
        """
        try:
            event = UsageEvent(
                event_type=event_type,
                metadata=metadata or {}
            )
            await self.repository.insert_event(event)
            logger.debug("Usage event recorded", event_type=event_type)
            return event
        except Exception as e:
            # Never let statistics break the app
            logger.error("Failed to record usage event", error=str(e), event_type=event_type)
            return None
    async def aggregate_stats(
        self,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None
    ) -> AggregatedStats:
        """
        Aggregate usage statistics for a time period.

        Default: last 7 days if no dates provided.
        """
        if not end_date:
            end_date = datetime.utcnow()
        if not start_date:
            start_date = end_date - timedelta(days=7)
        # Get events for period
        events = await self.repository.get_events(
            start_date=start_date,
            end_date=end_date
        )
        # TODO: Implement aggregation logic
        # - Count events by type
        # - Get printer fleet composition (from PrinterService)
        # - Calculate uptime
        # - Summarize errors
        # Until the steps above exist, fail explicitly rather than return an
        # undefined name. submit_stats() catches this and reports failure.
        raise NotImplementedError("aggregate_stats() is not implemented yet")
    async def submit_stats(self) -> bool:
        """
        Submit aggregated statistics to remote endpoint.

        Only submits if opted in.
        Returns True if submission successful or not needed.
        """
        if not await self.is_opted_in():
            logger.debug("Skipping stats submission - user opted out")
            return True
        try:
            # Get aggregated stats
            stats = await self.aggregate_stats()
            # Submit via HTTPS
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    self.aggregation_endpoint,
                    # mode="json" converts datetime fields to ISO 8601 strings;
                    # a plain model_dump() keeps datetime objects, which the
                    # JSON encoder cannot serialize.
                    json=stats.model_dump(mode="json"),
                    headers={"Content-Type": "application/json"},
                    timeout=aiohttp.ClientTimeout(total=10)
                ) as response:
                    if response.status == 200:
                        logger.info("Usage statistics submitted successfully")
                        # Mark events as submitted
                        await self.repository.mark_events_submitted(
                            start_date=stats.period.start,
                            end_date=stats.period.end
                        )
                        # Update last submission date
                        await self.repository.set_setting(
                            "last_submission_date",
                            datetime.utcnow().isoformat()
                        )
                        return True
                    else:
                        logger.warning(
                            "Failed to submit usage statistics",
                            status_code=response.status
                        )
                        return False
        except Exception as e:
            # Never let statistics break the app
            logger.error("Error submitting usage statistics", error=str(e))
            return False
    async def get_local_stats(self) -> Dict[str, Any]:
        """
        Get local statistics for user viewing.

        Returns human-readable summary of collected data.
        """
        # TODO: Implement local stats viewer data
        raise NotImplementedError("get_local_stats() is not implemented yet")

    async def export_stats(self) -> str:
        """Export all local statistics as JSON."""
        events = await self.repository.get_all_events()
        settings = await self.repository.get_all_settings()
        return json.dumps({
            # mode="json" renders datetime fields as strings so json.dumps succeeds
            "events": [e.model_dump(mode="json") for e in events],
            "settings": settings,
            "exported_at": datetime.utcnow().isoformat()
        }, indent=2)

    async def delete_all_stats(self) -> bool:
        """Delete all local statistics."""
        logger.info("Deleting all local usage statistics")
        await self.repository.delete_all_events()
        return True
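The aggregation step in aggregate_stats() is still marked TODO. As a rough illustration of the "count events by type" and "summarize errors" items, the sketch below works on plain dictionaries shaped like the metadata examples earlier. The function name summarize_events and the mapping of event types to job_count, file_count and upload_count are assumptions, not part of the specification.

```python
from collections import Counter
from typing import Any, Dict, Iterable, Mapping


def summarize_events(events: Iterable[Mapping[str, Any]]) -> Dict[str, Any]:
    """Count events by type and tally error types (hypothetical helper)."""
    type_counts: Counter = Counter()
    error_summary: Counter = Counter()
    for event in events:
        event_type = event["event_type"]
        type_counts[event_type] += 1
        if event_type == "error_occurred":
            metadata = event.get("metadata") or {}
            error_summary[metadata.get("error_type", "unknown")] += 1
    return {
        # Assumed mapping from event types to the UsageStats counters.
        "job_count": type_counts["job_created"],
        "file_count": type_counts["file_downloaded"],
        "upload_count": type_counts["file_uploaded"],
        "error_summary": dict(error_summary),
    }
```

Printer fleet composition and uptime are left out because they depend on PrinterService and on pairing app_start with app_shutdown events, neither of which is specified here.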
API Endpoints (Phase 1)¶
GET /api/v1/usage-stats/local¶
Get local statistics summary for user viewing.
Response:
{
"installation_id": "550e8400...",
"first_seen": "2024-11-01T00:00:00Z",
"opt_in_status": "disabled",
"total_events": 1234,
"this_week": {
"job_count": 23,
"file_count": 18,
"error_count": 2
},
"last_submission": null
}
POST /api/v1/usage-stats/opt-in¶
Enable usage statistics collection and submission.
Response:
{
"success": true,
"installation_id": "550e8400...",
"message": "Usage statistics enabled. Thank you for helping improve Printernizer!"
}
POST /api/v1/usage-stats/opt-out¶
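Disable usage statistics submission. Events continue to be recorded locally (see record_event), but nothing is sent to the aggregation service.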
Response:
GET /api/v1/usage-stats/export¶
Export all local statistics as JSON.
Response: (JSON file download)
DELETE /api/v1/usage-stats/delete-all¶
Delete all local usage statistics.
Response:
Background Tasks¶
Submission Scheduler¶
# src/tasks/usage_statistics_submitter.py
import asyncio
from datetime import datetime, timedelta
import structlog
from src.services.usage_statistics_service import UsageStatisticsService

logger = structlog.get_logger()

async def periodic_stats_submission(service: UsageStatisticsService):
    """
    Background task to submit statistics weekly.

    Runs every 24 hours, submits if 7 days since last submission.
    submit_stats() itself is a no-op when the user has not opted in.
    """
    while True:
        try:
            last_submission = await service.repository.get_setting("last_submission_date")
            if last_submission:
                last_date = datetime.fromisoformat(last_submission)
                if datetime.utcnow() - last_date >= timedelta(days=7):
                    await service.submit_stats()
            else:
                # First submission after opt-in
                await service.submit_stats()
        except Exception as e:
            logger.error("Error in periodic stats submission", error=str(e))
        # Check every 24 hours
        await asyncio.sleep(86400)
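The "is a submission due?" decision in the loop above can be pulled into a pure function, which makes it easy to unit-test without sleeping or touching the database. This is a sketch only: the name is_submission_due is hypothetical, and it assumes both timestamps are naive UTC values, as produced by datetime.utcnow().isoformat() in the service.

```python
from datetime import datetime, timedelta
from typing import Optional

SUBMISSION_INTERVAL = timedelta(days=7)


def is_submission_due(last_submission: Optional[str], now: datetime) -> bool:
    """Return True if no submission is recorded or the last one is 7+ days old."""
    if not last_submission:
        # Never submitted: first submission after opt-in.
        return True
    try:
        last_date = datetime.fromisoformat(last_submission)
    except ValueError:
        # An unparsable stored value is treated as "never submitted".
        return True
    return now - last_date >= SUBMISSION_INTERVAL
```

Passing now as an argument rather than reading the clock inside the function is what keeps the check deterministic in tests.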
Integration Points¶
Application Startup¶
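# src/main.py
import asyncio
import platform
from src.services.usage_statistics_service import UsageStatisticsService
from src.tasks.usage_statistics_submitter import periodic_stats_submission

async def on_startup():
    # ... existing startup code ...
    # stats_repo, APP_VERSION and get_deployment_mode() are expected to come
    # from the existing application code.
    # Initialize usage statistics
    stats_service = UsageStatisticsService(repository=stats_repo)
    await stats_service.record_event("app_start", metadata={
        "app_version": APP_VERSION,
        "python_version": platform.python_version(),
        "platform": platform.system().lower(),
        "deployment_mode": get_deployment_mode()
    })
    # Start background submission task. Keep a reference to the task:
    # asyncio holds only weak references, so an unreferenced task can be
    # garbage-collected before it finishes.
    stats_service.submission_task = asyncio.create_task(
        periodic_stats_submission(stats_service)
    )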
Job Service Integration¶
# src/services/job_service.py
async def create_job(self, ...):
    # ... existing job creation logic ...
    # Record usage event
    if self.stats_service:
        await self.stats_service.record_event("job_created", metadata={
            "printer_type": printer.type
        })
Performance Considerations¶
Non-Blocking Design¶
- All statistics operations are async
- Never block main application flow
- Statistics failures are logged and swallowed; they never propagate to the caller
Database Optimization¶
- Indexes on frequently queried columns
- Batch inserts for multiple events
- Periodic cleanup of old events (optional)
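A minimal sketch of the batch-insert idea, assuming the synchronous sqlite3 module and the usage_events columns defined in the schema. The helper name insert_events_batch and its (event_type, metadata) tuple input are hypothetical.

```python
import json
import sqlite3
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple


def insert_events_batch(
    conn: sqlite3.Connection,
    events: List[Tuple[str, Optional[Dict[str, Any]]]],
) -> int:
    """Insert several (event_type, metadata) pairs in a single transaction."""
    now = datetime.now(timezone.utc).isoformat()
    rows = [
        (str(uuid.uuid4()), event_type, now, json.dumps(metadata or {}))
        for event_type, metadata in events
    ]
    # "with conn" commits on success and rolls back if any insert fails,
    # so the batch is written atomically.
    with conn:
        conn.executemany(
            "INSERT INTO usage_events (id, event_type, timestamp, metadata) "
            "VALUES (?, ?, ?, ?)",
            rows,
        )
    return len(rows)
```

One transaction per batch avoids a commit per event, which is typically the dominant cost of small SQLite writes.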
Memory Usage¶
- Events are not kept in memory
- Direct database writes
- Aggregation done on-demand
Network Impact¶
- Submissions are async and background
- Max 1 submission per week
- Payload size: ~1KB (minimal bandwidth)
Testing Strategy¶
Unit Tests¶
- test_usage_statistics_service.py
- test_usage_statistics_repository.py
- Mock external HTTP calls
Integration Tests¶
- Test full event flow
- Test aggregation accuracy
- Test opt-in/opt-out behavior
Privacy Tests¶
- Verify no PII in payloads
- Test data sanitization
- Verify opt-out stops submission
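One way to approach the "no PII in payloads" check is to walk the submission payload and flag any key from a deny-list. The sketch below is illustrative: the function name find_pii_keys and the contents of FORBIDDEN_KEYS are assumptions, and a key-name scan does not replace reviewing the values themselves.

```python
from typing import Any, List

# Hypothetical deny-list; the specification does not enumerate forbidden keys.
FORBIDDEN_KEYS = {
    "ip_address", "hostname", "username", "email", "file_path", "serial_number",
}


def find_pii_keys(payload: Any, path: str = "") -> List[str]:
    """Return the dotted paths of all forbidden keys found in a nested payload."""
    hits: List[str] = []
    if isinstance(payload, dict):
        for key, value in payload.items():
            child = f"{path}.{key}" if path else str(key)
            if str(key).lower() in FORBIDDEN_KEYS:
                hits.append(child)
            hits.extend(find_pii_keys(value, child))
    elif isinstance(payload, list):
        for index, item in enumerate(payload):
            hits.extend(find_pii_keys(item, f"{path}[{index}]"))
    return hits
```

A privacy test could then assert that find_pii_keys(payload) returns an empty list for every payload built by the aggregation step.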
Migration¶
Database Migration¶
# src/database/migrations/00XX_add_usage_statistics.py
async def upgrade(connection):
    """Add usage statistics tables."""
    await connection.execute("""
        CREATE TABLE IF NOT EXISTS usage_events (
            id TEXT PRIMARY KEY,
            event_type TEXT NOT NULL,
            timestamp DATETIME NOT NULL,
            metadata TEXT,
            submitted BOOLEAN DEFAULT 0,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    """)
    # IF NOT EXISTS keeps the migration safe to re-run, matching the tables.
    await connection.execute("""
        CREATE INDEX IF NOT EXISTS idx_usage_events_type ON usage_events(event_type)
    """)
    await connection.execute("""
        CREATE INDEX IF NOT EXISTS idx_usage_events_timestamp ON usage_events(timestamp)
    """)
    # The schema section also indexes the submitted flag.
    await connection.execute("""
        CREATE INDEX IF NOT EXISTS idx_usage_events_submitted ON usage_events(submitted)
    """)
    await connection.execute("""
        CREATE TABLE IF NOT EXISTS usage_settings (
            key TEXT PRIMARY KEY,
            value TEXT NOT NULL,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    """)

async def downgrade(connection):
    """Remove usage statistics tables (their indexes are dropped with them)."""
    await connection.execute("DROP TABLE IF EXISTS usage_events")
    await connection.execute("DROP TABLE IF EXISTS usage_settings")
Security Considerations¶
Input Validation¶
- Sanitize all event metadata
- Validate event types against whitelist
- Limit metadata size (max 1KB per event)
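The whitelist and size checks above could be sketched as a single validation step that runs before an event is stored. The allowed set is taken from the Event Types list in this document. The function name validate_event, the choice to raise ValueError, and reading "1KB" as 1024 bytes of compact UTF-8 JSON are assumptions.

```python
import json
from typing import Any, Dict, Optional

ALLOWED_EVENT_TYPES = frozenset({
    "app_start", "app_shutdown",
    "job_created", "job_completed", "job_failed",
    "file_downloaded", "file_uploaded",
    "printer_connected", "printer_disconnected",
    "error_occurred", "feature_enabled", "feature_disabled",
})
MAX_METADATA_BYTES = 1024  # assumed interpretation of "max 1KB per event"


def validate_event(
    event_type: str, metadata: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
    """Reject unknown event types and oversized metadata; return the metadata."""
    if event_type not in ALLOWED_EVENT_TYPES:
        raise ValueError(f"Unknown event type: {event_type!r}")
    metadata = metadata or {}
    encoded = json.dumps(metadata, separators=(",", ":")).encode("utf-8")
    if len(encoded) > MAX_METADATA_BYTES:
        raise ValueError(
            f"Metadata is {len(encoded)} bytes, limit is {MAX_METADATA_BYTES}"
        )
    return metadata
```

Because record_event already catches every exception, a ValueError raised here would be logged and the event dropped without affecting the caller.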
Rate Limiting¶
- Server side: accept at most 1 submission per hour per installation (clients themselves submit at most once per week)
- Prevent DoS of aggregation service
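As a sketch of the per-installation limit, the class below keeps the last accepted submission time in memory. The class name SubmissionRateLimiter is hypothetical, and a real aggregation service would more likely keep this state in its database or a shared cache so that it survives restarts and works across multiple workers.

```python
from datetime import datetime, timedelta
from typing import Dict


class SubmissionRateLimiter:
    """Allow at most one accepted submission per installation per interval."""

    def __init__(self, min_interval: timedelta = timedelta(hours=1)) -> None:
        self.min_interval = min_interval
        self._last_accepted: Dict[str, datetime] = {}

    def allow(self, installation_id: str, now: datetime) -> bool:
        """Return True and record the time if the submission is accepted."""
        last = self._last_accepted.get(installation_id)
        if last is not None and now - last < self.min_interval:
            return False
        self._last_accepted[installation_id] = now
        return True
```

Rejected attempts do not reset the window, so a client that retries aggressively is still accepted once the original hour has passed.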
Data Sanitization¶
- Remove file paths from error messages
- Remove network information
- Remove user input
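The path and network rules above might be approximated with a few regular expressions applied before an error message is stored. This is a rough sketch: the function name sanitize_error_message and the placeholder tokens are assumptions, and the patterns cover only common POSIX paths, Windows drive paths and IPv4 addresses (not IPv6, hostnames or URLs).

```python
import re

# Windows drive paths such as C:\Users\name\file.gcode
_WINDOWS_PATH = re.compile(r"[A-Za-z]:\\[^\s'\"]+")
# IPv4 addresses with an optional port, e.g. 192.168.1.50:8883
_IPV4 = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}(?::\d+)?\b")
# POSIX paths; the lookbehind avoids matching text such as "and/or"
_POSIX_PATH = re.compile(r"(?<![\w.])/(?:[^\s'\"/]+/)*[^\s'\"/]+")


def sanitize_error_message(message: str) -> str:
    """Replace file paths and IPv4 addresses with neutral placeholders."""
    message = _WINDOWS_PATH.sub("<path>", message)
    message = _IPV4.sub("<ip>", message)
    message = _POSIX_PATH.sub("<path>", message)
    return message
```

Storing a short error_type code, as in the error_occurred metadata example, sidesteps most of this: free-text messages only need sanitizing if they are kept at all.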
Next Steps:
1. Review and approve technical design
2. Implement Phase 1 (local collection)
3. Write comprehensive tests
4. Deploy and monitor