# Source code for sidekick.core.communication_manager

"""Defines the abstract base class for managing low-level communication channels.

This module provides the `CommunicationManager` Abstract Base Class (ABC).
Concrete implementations of this class will handle the specifics of different
communication transports, such as WebSockets (for CPython environments) or
JavaScript `postMessage` (for Pyodide environments).

The `CommunicationManager` is responsible for:

- Establishing and tearing down the raw communication link (e.g., WebSocket connection).
- Sending raw string-based messages over the established link.
- Receiving raw string-based messages from the link.
- Reporting changes in the connection status (using `CoreConnectionStatus`).
- Notifying about underlying communication errors that might occur.
- Allowing higher-level services (like `ConnectionService`) to provide callback
  functions during connection to handle incoming messages, status changes, and errors.

This abstraction allows the rest of the Sidekick system to interact with different
communication methods through a consistent interface.
"""

import asyncio
from abc import ABC, abstractmethod
from typing import Awaitable, Callable, Union, Any, Optional # Ensure Optional is imported

from .status import CoreConnectionStatus


# Define type aliases for handler functions to improve readability and maintainability.
# These handlers can be either synchronous functions or asynchronous coroutine functions.

MessageHandlerType = Callable[
    [str],
    Union[None, Awaitable[None]],
]
"""Signature of a callback that consumes one incoming raw message.

The callback receives the message as a ``str``. It may be a plain function
(returning ``None``) or a coroutine function (returning an awaitable that
resolves to ``None``).
"""

StatusChangeHandlerType = Callable[
    [CoreConnectionStatus],
    Union[None, Awaitable[None]],
]
"""Signature of a callback that reacts to a connection status change.

The callback receives the new ``CoreConnectionStatus`` member. It may be a
plain function (returning ``None``) or a coroutine function (returning an
awaitable that resolves to ``None``).
"""

ErrorHandlerType = Callable[
    [Exception],
    Union[None, Awaitable[None]],
]
"""Signature of a callback that is told about a low-level communication error.

The callback receives the ``Exception`` instance. It may be a plain function
(returning ``None``) or a coroutine function (returning an awaitable that
resolves to ``None``).
"""


class CommunicationManager(ABC):
    """Abstract Base Class for managing a raw communication channel.

    Implementations of this class will abstract the specifics of the
    underlying transport mechanism (e.g., WebSockets, Pyodide's JavaScript
    message passing) and provide a standardized interface for:

    - Connecting to and disconnecting from a remote endpoint, accepting
      handlers at connect time.
    - Sending and receiving string-based messages.
    - Monitoring the connection status.

    Every method declared here is abstract, so a concrete subclass must
    override all of them before it can be instantiated.
    """

    @abstractmethod
    async def connect_async(
        self,
        message_handler: Optional[MessageHandlerType] = None,
        status_change_handler: Optional[StatusChangeHandlerType] = None,
        error_handler: Optional[ErrorHandlerType] = None,
    ) -> None:
        """Establishes the connection to the remote endpoint asynchronously.

        Implementations should handle the complete process of setting up the
        communication link according to the chosen transport protocol (e.g.,
        performing a WebSocket handshake). The provided handlers should be
        stored and used by the CommunicationManager for the duration of this
        connection.

        Upon successful connection, the manager's status should transition to
        `CoreConnectionStatus.CONNECTED`. If the connection attempt fails, an
        appropriate `CoreConnectionError` (such as
        `CoreConnectionRefusedError` or `CoreConnectionTimeoutError`) should
        be raised to indicate the failure.

        Args:
            message_handler (Optional[MessageHandlerType]): Callback for
                incoming messages.
            status_change_handler (Optional[StatusChangeHandlerType]):
                Callback for status changes.
            error_handler (Optional[ErrorHandlerType]): Callback for
                communication errors.

        Raises:
            CoreConnectionError: If the connection cannot be established due
                to refusal, timeout, or other transport-level issues.
        """

    @abstractmethod
    async def close_async(self) -> None:
        """Closes the communication channel asynchronously.

        Implementations should gracefully terminate the connection. This
        might involve sending close frames (for WebSockets) or releasing
        resources. The status should typically transition through
        `CoreConnectionStatus.CLOSING` and eventually to
        `CoreConnectionStatus.DISCONNECTED`.

        This method should be idempotent; calling it multiple times on an
        already closed or closing connection should not cause errors.
        """

    @abstractmethod
    async def send_message_async(self, message_str: str) -> None:
        """Sends a string message over the communication channel asynchronously.

        The caller is responsible for ensuring the `message_str` is formatted
        according to the expected protocol (e.g., as a JSON string).

        Args:
            message_str (str): The raw string message to send.

        Raises:
            CoreDisconnectedError: If the channel is not currently connected
                or if the send operation fails due to a broken or closed
                connection.
            Exception: Other transport-specific exceptions might be raised if
                the send fails for reasons other than disconnection (e.g.,
                message too large for buffer, underlying socket errors).
        """

    @abstractmethod
    def is_connected(self) -> bool:
        """Checks if the channel is currently in an active, connected state.

        A "connected" state typically means the status is
        `CoreConnectionStatus.CONNECTED` and the underlying transport link is
        open and usable for sending/receiving data.

        Returns:
            bool: True if the channel is actively connected, False otherwise.
        """

    @abstractmethod
    def get_current_status(self) -> CoreConnectionStatus:
        """Returns the current `CoreConnectionStatus` of the channel.

        This provides the most up-to-date known state of the connection
        according to the manager.

        Returns:
            CoreConnectionStatus: The current connection status.
        """