Skip to content

Printernizer Integration Patterns

Overview

This document defines the integration patterns for communicating with different printer types in the Printernizer system. Each printer manufacturer uses different protocols and communication patterns, requiring specialized integration approaches while maintaining a unified internal interface.

Integration Architecture Principles

1. Protocol Abstraction

  • Unified interface across all printer types
  • Protocol-specific implementations behind common abstractions
  • Consistent error handling and recovery patterns

2. Event-Driven Communication

  • Real-time status updates via event publishing
  • Asynchronous processing for non-blocking operations
  • Reliable message delivery with retry mechanisms

3. Connection Management

  • Automatic connection recovery and health monitoring
  • Configurable timeouts and retry policies
  • Circuit breaker patterns for failing connections

4. Data Normalization

  • Convert manufacturer-specific data formats to internal models
  • Standardized status codes and progress reporting
  • Unified file listing and metadata extraction

Bambu Lab Integration Pattern (MQTT)

Protocol Characteristics

  • Communication: MQTT over TCP/IP
  • Library: bambulabs-api Python library
  • Authentication: IP Address + Access Code + Serial Number
  • Real-time Updates: Event-driven via MQTT callbacks
  • Connection: Persistent MQTT connection with automatic reconnection

Implementation Pattern

import asyncio
import json
from datetime import datetime
from typing import Dict, Optional, Callable, Any
from bambulabs_api import BambuClient
from bambulabs_api.types import PrinterStatus as BambuStatus

class BambuLabIntegration(BasePrinterIntegration):
    """
    Bambu Lab printer integration using MQTT protocol
    """

    def __init__(self, config: BambuLabConfig, event_publisher: EventPublisher):
        super().__init__(config, event_publisher)
        self.client: Optional[BambuClient] = None
        self.connection_state = ConnectionState.DISCONNECTED
        self.last_status: Optional[BambuStatus] = None
        self.callback_handlers: Dict[str, Callable] = {}
        self._setup_callbacks()

    async def connect(self) -> ConnectionResult:
        """
        Establish MQTT connection to Bambu Lab printer
        """
        try:
            # Initialize Bambu client with credentials
            self.client = BambuClient(
                host=self.config.ip_address,
                access_code=self.config.access_code,
                serial=self.config.serial_number
            )

            # Set up event callbacks
            self.client.on_printer_status = self._on_printer_status
            self.client.on_print_progress = self._on_print_progress
            self.client.on_temperature_update = self._on_temperature_update
            self.client.on_ams_status = self._on_ams_status
            self.client.on_connection_lost = self._on_connection_lost
            self.client.on_error = self._on_error

            # Connect to printer
            await self.client.connect()
            self.connection_state = ConnectionState.CONNECTED

            # Subscribe to relevant MQTT topics
            await self._subscribe_to_topics()

            # Request initial status
            await self.client.request_status()

            self._publish_event("printer_connected", {
                "printer_id": self.config.id,
                "connection_type": "mqtt",
                "firmware_version": await self._get_firmware_version()
            })

            return ConnectionResult.success("MQTT connection established")

        except Exception as e:
            self.connection_state = ConnectionState.ERROR
            self._publish_event("printer_connection_failed", {
                "printer_id": self.config.id,
                "error": str(e)
            })
            return ConnectionResult.error(f"MQTT connection failed: {str(e)}")

    async def disconnect(self) -> bool:
        """
        Disconnect from Bambu Lab printer
        """
        try:
            if self.client:
                await self.client.disconnect()
            self.connection_state = ConnectionState.DISCONNECTED
            self._publish_event("printer_disconnected", {
                "printer_id": self.config.id
            })
            return True
        except Exception as e:
            self._logger.error(f"Disconnect error: {e}")
            return False

    async def get_status(self) -> PrinterStatus:
        """
        Get current printer status (uses cached data from MQTT updates)
        """
        if not self.last_status:
            # If no cached status, request fresh data
            if self.client and self.connection_state == ConnectionState.CONNECTED:
                await self.client.request_status()
                # Wait briefly for response
                await asyncio.sleep(0.5)

        return self._convert_bambu_status(self.last_status) if self.last_status else None

    async def get_current_job(self) -> Optional[JobStatus]:
        """
        Get current print job information
        """
        if not self.last_status:
            return None

        # Extract job info from Bambu status
        return JobStatus(
            id=self.last_status.job_id or 0,
            name=self.last_status.gcode_file or "Unknown",
            status=self._convert_job_status(self.last_status.gcode_state),
            progress=float(self.last_status.print_percent or 0),
            layer_current=self.last_status.layer_num or 0,
            layer_total=self.last_status.total_layer_num,
            estimated_remaining=self._calculate_remaining_time()
        )

    async def list_files(self) -> List[RemoteFile]:
        """
        List files available on printer storage
        """
        try:
            if not self.client or self.connection_state != ConnectionState.CONNECTED:
                raise ConnectionError("Printer not connected")

            # Request file list via MQTT
            file_data = await self.client.request_file_list()

            files = []
            for file_info in file_data.get("files", []):
                files.append(RemoteFile(
                    name=file_info["name"],
                    path=file_info["path"],
                    size=file_info["size"],
                    modified=datetime.fromisoformat(file_info["time"]),
                    type=self._get_file_type(file_info["name"])
                ))

            return files

        except Exception as e:
            self._logger.error(f"Failed to list files: {e}")
            return []

    async def download_file(self, file_path: str) -> bytes:
        """
        Download file from printer storage
        Note: Bambu Lab may not support direct file downloads via MQTT
        This would require HTTP fallback or alternative method
        """
        try:
            # Bambu Lab doesn't directly support file downloads via MQTT
            # This would need to be implemented via HTTP API or alternative method
            raise NotImplementedError(
                "File downloads from Bambu Lab printers require HTTP API access"
            )

        except Exception as e:
            self._logger.error(f"File download failed: {e}")
            raise DownloadError(f"Failed to download {file_path}: {str(e)}")

    async def cancel_job(self) -> CancelResult:
        """
        Cancel current print job
        """
        try:
            if not self.client or self.connection_state != ConnectionState.CONNECTED:
                return CancelResult.error("Printer not connected")

            await self.client.stop_print()

            self._publish_event("job_cancel_requested", {
                "printer_id": self.config.id
            })

            return CancelResult.success("Cancel command sent")

        except Exception as e:
            return CancelResult.error(f"Cancel failed: {str(e)}")

    # MQTT Event Handlers

    async def _on_printer_status(self, status: BambuStatus):
        """
        Handle printer status updates from MQTT
        """
        self.last_status = status
        self.last_communication = datetime.now()

        # Convert and publish internal status update
        internal_status = self._convert_bambu_status(status)
        self._publish_event("printer_status_updated", {
            "printer_id": self.config.id,
            "status": internal_status.dict()
        })

    async def _on_print_progress(self, progress_data: Dict[str, Any]):
        """
        Handle print progress updates
        """
        job_update = {
            "printer_id": self.config.id,
            "progress": float(progress_data.get("print_percent", 0)),
            "layer_current": progress_data.get("layer_num", 0),
            "estimated_remaining": self._calculate_remaining_time()
        }

        self._publish_event("job_progress_updated", job_update)

    async def _on_temperature_update(self, temp_data: Dict[str, Any]):
        """
        Handle temperature updates
        """
        temps = {
            "printer_id": self.config.id,
            "nozzle_temp": temp_data.get("nozzle_temper"),
            "nozzle_target": temp_data.get("nozzle_target_temper"),
            "bed_temp": temp_data.get("bed_temper"),
            "bed_target": temp_data.get("bed_target_temper"),
            "chamber_temp": temp_data.get("chamber_temper")
        }

        self._publish_event("temperature_updated", temps)

    async def _on_ams_status(self, ams_data: Dict[str, Any]):
        """
        Handle AMS (Automatic Material System) status updates
        """
        self._publish_event("ams_status_updated", {
            "printer_id": self.config.id,
            "ams_data": ams_data
        })

    async def _on_connection_lost(self):
        """
        Handle MQTT connection loss
        """
        self.connection_state = ConnectionState.ERROR
        self._publish_event("printer_connection_lost", {
            "printer_id": self.config.id,
            "timestamp": datetime.now()
        })

        # Attempt reconnection
        await self._attempt_reconnection()

    async def _on_error(self, error: Exception):
        """
        Handle MQTT communication errors
        """
        self._logger.error(f"MQTT error for printer {self.config.id}: {error}")
        self._publish_event("printer_error", {
            "printer_id": self.config.id,
            "error": str(error),
            "error_type": type(error).__name__
        })

    # Helper Methods

    async def _subscribe_to_topics(self):
        """
        Subscribe to relevant MQTT topics for the printer
        """
        topics = [
            f"device/{self.config.serial_number}/report",  # Status reports
            f"device/{self.config.serial_number}/print",   # Print progress
            f"device/{self.config.serial_number}/camera",  # Camera updates
            f"device/{self.config.serial_number}/ams"      # AMS status
        ]

        for topic in topics:
            await self.client.subscribe(topic)

    def _convert_bambu_status(self, status: BambuStatus) -> PrinterStatus:
        """
        Convert Bambu Lab status to internal format
        """
        # Map Bambu states to internal states
        state_mapping = {
            "IDLE": InternalStatus.ONLINE,
            "PREPARE": InternalStatus.BUSY,
            "RUNNING": InternalStatus.BUSY,
            "PAUSE": InternalStatus.BUSY,
            "COMPLETE": InternalStatus.ONLINE,
            "FAILED": InternalStatus.ERROR,
            "OFFLINE": InternalStatus.OFFLINE
        }

        internal_status = state_mapping.get(status.gcode_state, InternalStatus.UNKNOWN)

        return PrinterStatus(
            printer_id=self.config.id,
            status=internal_status,
            temperatures=PrinterTemperatures(
                nozzle=Temperature(
                    current=status.nozzle_temper or 0,
                    target=status.nozzle_target_temper or 0
                ) if status.nozzle_temper is not None else None,
                bed=Temperature(
                    current=status.bed_temper or 0,
                    target=status.bed_target_temper or 0
                ) if status.bed_temper is not None else None,
                chamber=Temperature(
                    current=status.chamber_temper or 0,
                    target=0  # Bambu doesn't usually have chamber heating
                ) if status.chamber_temper is not None else None
            ),
            current_job_progress=float(status.print_percent or 0),
            firmware_version=status.firmware_version,
            last_updated=datetime.now()
        )

    def _convert_job_status(self, gcode_state: str) -> JobStatusEnum:
        """
        Convert Bambu job state to internal job status
        """
        mapping = {
            "IDLE": JobStatusEnum.QUEUED,
            "PREPARE": JobStatusEnum.PREPARING,
            "RUNNING": JobStatusEnum.PRINTING,
            "PAUSE": JobStatusEnum.PAUSED,
            "COMPLETE": JobStatusEnum.COMPLETED,
            "FAILED": JobStatusEnum.FAILED,
            "STOPPED": JobStatusEnum.CANCELLED
        }
        return mapping.get(gcode_state, JobStatusEnum.QUEUED)

    async def _attempt_reconnection(self):
        """
        Attempt to reconnect to printer with exponential backoff
        """
        max_retries = 5
        base_delay = 5  # seconds

        for attempt in range(max_retries):
            try:
                await asyncio.sleep(base_delay * (2 ** attempt))  # Exponential backoff
                result = await self.connect()

                if result.success:
                    self._logger.info(f"Reconnection successful for {self.config.id}")
                    return

            except Exception as e:
                self._logger.error(f"Reconnection attempt {attempt + 1} failed: {e}")

        self._logger.error(f"All reconnection attempts failed for {self.config.id}")

Prusa Integration Pattern (HTTP)

Protocol Characteristics

  • Communication: HTTP REST API (PrusaLink)
  • Authentication: API Key header
  • Updates: Polling-based (30-second intervals)
  • Connection: Stateless HTTP requests with connection pooling

Implementation Pattern

import aiohttp
import asyncio
from datetime import datetime, timedelta
from typing import Dict, Optional, List, Any

class PrusaIntegration(BasePrinterIntegration):
    """
    Prusa printer integration using PrusaLink HTTP API
    """

    def __init__(self, config: PrusaConfig, event_publisher: EventPublisher):
        super().__init__(config, event_publisher)
        self.session: Optional[aiohttp.ClientSession] = None
        self.base_url = f"http://{config.ip_address}:{config.port or 80}/api"
        self.headers = {"X-Api-Key": config.api_key}
        self.polling_task: Optional[asyncio.Task] = None
        self.poll_interval = 30  # seconds
        self.last_status: Optional[Dict] = None
        self.connection_state = ConnectionState.DISCONNECTED

    async def connect(self) -> ConnectionResult:
        """
        Establish HTTP connection and start polling
        """
        try:
            # Create HTTP session with connection pooling
            connector = aiohttp.TCPConnector(
                limit=10,  # Connection pool limit
                ttl_dns_cache=300,  # DNS cache TTL
                timeout=aiohttp.ClientTimeout(total=30)
            )

            self.session = aiohttp.ClientSession(
                connector=connector,
                headers=self.headers
            )

            # Test connection with version endpoint
            version_info = await self._make_request("GET", "/version")

            if not version_info:
                raise ConnectionError("Failed to get printer version")

            self.connection_state = ConnectionState.CONNECTED

            # Start polling task
            self.polling_task = asyncio.create_task(self._polling_loop())

            self._publish_event("printer_connected", {
                "printer_id": self.config.id,
                "connection_type": "http",
                "firmware_version": version_info.get("server"),
                "api_version": version_info.get("api")
            })

            return ConnectionResult.success("HTTP connection established")

        except Exception as e:
            self.connection_state = ConnectionState.ERROR
            await self._cleanup_session()

            self._publish_event("printer_connection_failed", {
                "printer_id": self.config.id,
                "error": str(e)
            })

            return ConnectionResult.error(f"HTTP connection failed: {str(e)}")

    async def disconnect(self) -> bool:
        """
        Stop polling and close HTTP session
        """
        try:
            # Cancel polling task
            if self.polling_task and not self.polling_task.done():
                self.polling_task.cancel()
                try:
                    await self.polling_task
                except asyncio.CancelledError:
                    pass

            # Close HTTP session
            await self._cleanup_session()

            self.connection_state = ConnectionState.DISCONNECTED

            self._publish_event("printer_disconnected", {
                "printer_id": self.config.id
            })

            return True

        except Exception as e:
            self._logger.error(f"Disconnect error: {e}")
            return False

    async def get_status(self) -> Optional[PrinterStatus]:
        """
        Get current printer status via HTTP API
        """
        try:
            status_data = await self._make_request("GET", "/printer")

            if not status_data:
                return None

            return self._convert_prusa_status(status_data)

        except Exception as e:
            self._logger.error(f"Failed to get status: {e}")
            return None

    async def get_current_job(self) -> Optional[JobStatus]:
        """
        Get current print job information
        """
        try:
            job_data = await self._make_request("GET", "/job")

            if not job_data or not job_data.get("job"):
                return None

            job_info = job_data["job"]
            progress = job_data.get("progress", {})

            return JobStatus(
                id=0,  # Prusa doesn't provide job IDs
                name=job_info.get("file", {}).get("name", "Unknown"),
                status=self._convert_job_status(job_data.get("state", "Offline")),
                progress=float(progress.get("completion", 0) or 0),
                estimated_remaining=progress.get("printTimeLeft"),
                file_size=job_info.get("file", {}).get("size")
            )

        except Exception as e:
            self._logger.error(f"Failed to get job info: {e}")
            return None

    async def list_files(self) -> List[RemoteFile]:
        """
        List files available in printer storage
        """
        try:
            files_data = await self._make_request("GET", "/files")

            if not files_data:
                return []

            files = []
            self._process_file_tree(files_data.get("files", {}), files)

            return files

        except Exception as e:
            self._logger.error(f"Failed to list files: {e}")
            return []

    async def download_file(self, file_path: str) -> bytes:
        """
        Download file from printer storage
        """
        try:
            # Construct download URL
            download_url = f"/files/local{file_path}"

            async with self.session.get(
                f"{self.base_url}{download_url}",
                headers=self.headers
            ) as response:

                if response.status != 200:
                    raise DownloadError(f"HTTP {response.status}: {response.reason}")

                return await response.read()

        except Exception as e:
            self._logger.error(f"File download failed: {e}")
            raise DownloadError(f"Failed to download {file_path}: {str(e)}")

    async def cancel_job(self) -> CancelResult:
        """
        Cancel current print job
        """
        try:
            result = await self._make_request("POST", "/job", json={"command": "cancel"})

            if result is None:  # Successful POST typically returns nothing
                self._publish_event("job_cancel_requested", {
                    "printer_id": self.config.id
                })
                return CancelResult.success("Cancel command sent")
            else:
                return CancelResult.error("Unexpected response from printer")

        except Exception as e:
            return CancelResult.error(f"Cancel failed: {str(e)}")

    # HTTP Communication Methods

    async def _make_request(self, method: str, endpoint: str, **kwargs) -> Optional[Dict]:
        """
        Make HTTP request to printer API
        """
        if not self.session:
            raise ConnectionError("No active session")

        url = f"{self.base_url}{endpoint}"

        try:
            async with self.session.request(method, url, **kwargs) as response:

                # Handle authentication errors
                if response.status == 401:
                    self._publish_event("printer_auth_error", {
                        "printer_id": self.config.id,
                        "error": "Invalid API key"
                    })
                    raise AuthenticationError("Invalid API key")

                # Handle other HTTP errors
                if response.status >= 400:
                    error_text = await response.text()
                    raise HTTPError(f"HTTP {response.status}: {error_text}")

                # Parse JSON response
                if response.content_type == "application/json":
                    return await response.json()
                else:
                    # Some endpoints return no content on success
                    return None

        except aiohttp.ClientError as e:
            self._logger.error(f"HTTP request failed: {e}")
            raise ConnectionError(f"Request failed: {str(e)}")

    async def _polling_loop(self):
        """
        Main polling loop for status updates
        """
        consecutive_failures = 0
        max_failures = 5

        while True:
            try:
                # Get current status
                status_data = await self._make_request("GET", "/printer")

                if status_data:
                    await self._process_status_update(status_data)
                    consecutive_failures = 0
                else:
                    consecutive_failures += 1

                # Check for too many failures
                if consecutive_failures >= max_failures:
                    self._logger.error(f"Too many polling failures for {self.config.id}")
                    self.connection_state = ConnectionState.ERROR
                    self._publish_event("printer_connection_lost", {
                        "printer_id": self.config.id,
                        "reason": "Polling failures"
                    })
                    break

            except asyncio.CancelledError:
                # Polling task was cancelled
                break

            except Exception as e:
                consecutive_failures += 1
                self._logger.error(f"Polling error: {e}")

                if consecutive_failures >= max_failures:
                    self.connection_state = ConnectionState.ERROR
                    break

            # Wait for next poll
            try:
                await asyncio.sleep(self.poll_interval)
            except asyncio.CancelledError:
                break

    async def _process_status_update(self, status_data: Dict):
        """
        Process status update from polling
        """
        current_status = self._convert_prusa_status(status_data)

        # Check if status changed significantly
        if self._status_changed(current_status):
            self.last_status = status_data
            self.last_communication = datetime.now()

            self._publish_event("printer_status_updated", {
                "printer_id": self.config.id,
                "status": current_status.dict() if current_status else None
            })

            # Also check for job updates
            await self._check_job_updates()

    async def _check_job_updates(self):
        """
        Check for job progress updates
        """
        try:
            job_data = await self._make_request("GET", "/job")

            if job_data and job_data.get("progress"):
                progress = job_data["progress"]

                job_update = {
                    "printer_id": self.config.id,
                    "progress": float(progress.get("completion", 0) or 0),
                    "estimated_remaining": progress.get("printTimeLeft"),
                    "elapsed_time": progress.get("printTime")
                }

                self._publish_event("job_progress_updated", job_update)

        except Exception as e:
            self._logger.error(f"Failed to check job updates: {e}")

    def _convert_prusa_status(self, status_data: Dict) -> Optional[PrinterStatus]:
        """
        Convert Prusa API status to internal format
        """
        printer_info = status_data.get("state", {})
        temp_info = status_data.get("temperature", {})

        # Map Prusa states to internal states
        state_text = printer_info.get("text", "").lower()
        if "operational" in state_text:
            internal_status = InternalStatus.ONLINE
        elif any(word in state_text for word in ["printing", "paused"]):
            internal_status = InternalStatus.BUSY
        elif "error" in state_text:
            internal_status = InternalStatus.ERROR
        elif "offline" in state_text:
            internal_status = InternalStatus.OFFLINE
        else:
            internal_status = InternalStatus.UNKNOWN

        # Extract temperatures
        temperatures = None
        if temp_info:
            temperatures = PrinterTemperatures()

            if "tool0" in temp_info:
                tool_temp = temp_info["tool0"]
                temperatures.nozzle = Temperature(
                    current=tool_temp.get("actual", 0),
                    target=tool_temp.get("target", 0)
                )

            if "bed" in temp_info:
                bed_temp = temp_info["bed"]
                temperatures.bed = Temperature(
                    current=bed_temp.get("actual", 0),
                    target=bed_temp.get("target", 0)
                )

        return PrinterStatus(
            printer_id=self.config.id,
            status=internal_status,
            temperatures=temperatures,
            last_updated=datetime.now()
        )

    def _process_file_tree(self, tree: Dict, files: List[RemoteFile], path_prefix: str = ""):
        """
        Recursively process Prusa file tree structure
        """
        for name, info in tree.items():
            current_path = f"{path_prefix}/{name}" if path_prefix else f"/{name}"

            if info.get("type") == "folder":
                # Recurse into subdirectory
                children = info.get("children", {})
                self._process_file_tree(children, files, current_path)
            else:
                # It's a file
                files.append(RemoteFile(
                    name=name,
                    path=current_path,
                    size=info.get("size", 0),
                    modified=datetime.fromtimestamp(info.get("date", 0)),
                    type=self._get_file_type(name)
                ))

    def _convert_job_status(self, state: str) -> JobStatusEnum:
        """
        Convert Prusa job state to internal job status
        """
        state_lower = state.lower()

        if "printing" in state_lower:
            return JobStatusEnum.PRINTING
        elif "paused" in state_lower:
            return JobStatusEnum.PAUSED
        elif "finished" in state_lower or "complete" in state_lower:
            return JobStatusEnum.COMPLETED
        elif "cancelled" in state_lower or "stopped" in state_lower:
            return JobStatusEnum.CANCELLED
        elif "error" in state_lower or "failed" in state_lower:
            return JobStatusEnum.FAILED
        else:
            return JobStatusEnum.QUEUED

    def _status_changed(self, new_status: Optional[PrinterStatus]) -> bool:
        """
        Check if status has changed significantly since last update
        """
        if not self.last_status or not new_status:
            return True

        # Compare key status indicators
        old_converted = self._convert_prusa_status(self.last_status)

        if not old_converted:
            return True

        return (
            old_converted.status != new_status.status or
            abs((old_converted.last_updated - new_status.last_updated).total_seconds()) > 30
        )

    async def _cleanup_session(self):
        """
        Clean up HTTP session
        """
        if self.session and not self.session.closed:
            await self.session.close()
        self.session = None

Common Integration Patterns

Base Integration Interface

from abc import ABC, abstractmethod
from enum import Enum
from dataclasses import dataclass
from typing import List, Optional, Dict, Any
from datetime import datetime

class ConnectionState(Enum):
    DISCONNECTED = "disconnected"
    CONNECTING = "connecting"
    CONNECTED = "connected"
    ERROR = "error"

@dataclass
class ConnectionResult:
    success: bool
    message: str
    error_code: Optional[str] = None

    @classmethod
    def success(cls, message: str) -> "ConnectionResult":
        return cls(success=True, message=message)

    @classmethod
    def error(cls, message: str, error_code: str = None) -> "ConnectionResult":
        return cls(success=False, message=message, error_code=error_code)

@dataclass
class CancelResult:
    success: bool
    message: str

    @classmethod
    def success(cls, message: str) -> "CancelResult":
        return cls(success=True, message=message)

    @classmethod
    def error(cls, message: str) -> "CancelResult":
        return cls(success=False, message=message)

class BasePrinterIntegration(ABC):
    """
    Abstract base class for all printer integrations
    """

    def __init__(self, config: PrinterConfig, event_publisher: EventPublisher):
        self.config = config
        self.event_publisher = event_publisher
        self._logger = get_logger(f"printer.{config.id}")
        self.last_communication: Optional[datetime] = None

    @abstractmethod
    async def connect(self) -> ConnectionResult:
        """Establish connection to printer"""
        pass

    @abstractmethod
    async def disconnect(self) -> bool:
        """Disconnect from printer"""
        pass

    @abstractmethod
    async def get_status(self) -> Optional[PrinterStatus]:
        """Get current printer status"""
        pass

    @abstractmethod
    async def get_current_job(self) -> Optional[JobStatus]:
        """Get current print job information"""
        pass

    @abstractmethod
    async def list_files(self) -> List[RemoteFile]:
        """List files available on printer"""
        pass

    @abstractmethod
    async def download_file(self, file_path: str) -> bytes:
        """Download file from printer"""
        pass

    @abstractmethod
    async def cancel_job(self) -> CancelResult:
        """Cancel current print job"""
        pass

    # Common utility methods

    def is_connected(self) -> bool:
        """Check if printer is currently connected"""
        return (
            self.last_communication and
            (datetime.now() - self.last_communication).total_seconds() < 120  # 2 minutes
        )

    def _publish_event(self, event_type: str, data: Dict[str, Any]):
        """Publish event to the event system"""
        try:
            self.event_publisher.publish(SystemEvent(
                type=event_type,
                data=data,
                timestamp=datetime.now(),
                source=f"printer.{self.config.id}"
            ))
        except Exception as e:
            self._logger.error(f"Failed to publish event: {e}")

    def _get_file_type(self, filename: str) -> str:
        """Extract file type from filename"""
        return Path(filename).suffix.lower()

Error Handling Patterns

class PrinterIntegrationError(Exception):
    """Base exception for printer integration errors"""
    pass

class ConnectionError(PrinterIntegrationError):
    """Connection-related errors"""
    pass

class AuthenticationError(PrinterIntegrationError):
    """Authentication failures"""
    pass

class DownloadError(PrinterIntegrationError):
    """File download failures"""
    pass

class HTTPError(PrinterIntegrationError):
    """HTTP-specific errors"""
    pass

class MQTTError(PrinterIntegrationError):
    """MQTT-specific errors"""
    pass

# Circuit breaker pattern for failing connections
class CircuitBreaker:
    """
    Circuit breaker to prevent cascading failures
    """

    def __init__(self, failure_threshold: int = 5, timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open

    def can_execute(self) -> bool:
        """Check if operation can be executed"""
        if self.state == "closed":
            return True
        elif self.state == "open":
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout):
                self.state = "half-open"
                return True
            return False
        elif self.state == "half-open":
            return True

    def record_success(self):
        """Record successful operation"""
        self.failure_count = 0
        self.state = "closed"
        self.last_failure_time = None

    def record_failure(self):
        """Record failed operation"""
        self.failure_count += 1
        self.last_failure_time = datetime.now()

        if self.failure_count >= self.failure_threshold:
            self.state = "open"

Integration Factory Pattern

class PrinterIntegrationFactory:
    """
    Factory for creating printer integration instances
    """

    _integration_classes = {
        "bambu_lab": BambuLabIntegration,
        "prusa": PrusaIntegration
    }

    @classmethod
    def create_integration(
        cls, 
        printer_config: PrinterConfig, 
        event_publisher: EventPublisher
    ) -> BasePrinterIntegration:
        """
        Create appropriate integration instance based on printer type
        """
        printer_type = printer_config.type.lower()

        if printer_type not in cls._integration_classes:
            raise ValueError(f"Unsupported printer type: {printer_type}")

        integration_class = cls._integration_classes[printer_type]
        return integration_class(printer_config, event_publisher)

    @classmethod
    def get_supported_types(cls) -> List[str]:
        """Get list of supported printer types"""
        return list(cls._integration_classes.keys())

    @classmethod
    def register_integration(cls, printer_type: str, integration_class):
        """Register new integration type"""
        cls._integration_classes[printer_type] = integration_class

Integration Best Practices

1. Connection Management

  • Implement automatic reconnection with exponential backoff
  • Use circuit breaker pattern to prevent cascading failures
  • Monitor connection health with heartbeat/ping mechanisms
  • Log all connection events for debugging

2. Error Handling

  • Categorize errors (network, authentication, protocol, etc.)
  • Implement retry logic with appropriate delays
  • Graceful degradation when connections fail
  • Clear error messages for troubleshooting

3. Performance Optimization

  • Connection pooling for HTTP integrations
  • Efficient MQTT topic subscriptions
  • Minimize polling frequency while maintaining responsiveness
  • Cache frequently accessed data

4. Security Considerations

  • Secure credential storage and transmission
  • Input validation for all API responses
  • Network timeout enforcement
  • Certificate validation for HTTPS connections

5. Monitoring and Logging

  • Comprehensive logging for all integration events
  • Performance metrics collection
  • Health monitoring dashboards
  • Alert systems for integration failures

This integration pattern design provides a robust foundation for communicating with different printer types while maintaining consistency and reliability across the Printernizer system.