Skip to content

API Reference: Connection

socketspec.connection.Connection dataclass

A single live WebSocket connection passed to every event handler.

Note

ConnectionManager.connect() supplies the callables that back emit and disconnect. Until it has run, calling either method raises RuntimeError.

Source code in src\socketspec\connection.py
@dataclass
class Connection:
    """State and server-side controls for one live WebSocket connection.

    An instance is handed to every event handler.

    Note:
        ``ConnectionManager.connect()`` supplies the callables that back
        ``emit`` and ``disconnect``. Calling either method before that point
        raises ``RuntimeError``.
    """

    # Unique connection ID.
    id: ConnectionId
    # NOTE(review): presumably the underlying transport socket -- confirm
    # against ConnectionManager.
    raw_socket: RawSocket
    # Authenticated identity for this connection.
    identity: Identity
    # Session metadata for this connection.
    session: SessionInfo
    # UTC timestamp of connection establishment.
    connected_at: datetime
    # UTC timestamp of the last received event.
    last_active: datetime
    # Room names this connection is currently in.
    rooms: set[RoomName] = field(default_factory=set)
    # Free-form per-connection data; its usage is not shown in this file.
    metadata: dict[str, Any] = field(default_factory=dict)
    # HTTP upgrade request headers.
    headers: dict[str, str] = field(default_factory=dict)
    # HTTP upgrade request query parameters.
    query_params: dict[str, str] = field(default_factory=dict)
    # Namespace this connection belongs to.
    namespace: Namespace = "/"
    # Private callables behind ``emit``/``disconnect``; ``None`` until set.
    # Both are excluded from ``repr`` and from equality comparison.
    _emit_fn: EmitCallable | None = field(
        default=None,
        repr=False,
        compare=False,
    )
    _disconnect_fn: DisconnectCallable | None = field(
        default=None,
        repr=False,
        compare=False,
    )

    async def emit(self, event: EventName, payload: PayloadDict | BaseModel) -> None:
        """Send ``event`` with ``payload`` to this connection only.

        Args:
            event: Name of the event to deliver.
            payload: JSON-serializable payload dict, or a ``BaseModel``.

        Raises:
            RuntimeError: If no emit callable has been attached yet, i.e.
                ``ConnectionManager.connect()`` has not run.
        """
        sender = self._emit_fn
        if sender is None:
            # Not wired up yet: fail loudly instead of dropping the event.
            raise RuntimeError(
                "Connection.emit is not available until "
                "ConnectionManager.connect() is called"
            )
        await sender(event, payload)

    async def disconnect(self, reason: str = "server_close") -> None:
        """Close this connection from the server side.

        Args:
            reason: Human-readable reason, forwarded to the disconnect
                callable.

        Raises:
            RuntimeError: If no disconnect callable has been attached yet,
                i.e. ``ConnectionManager.connect()`` has not run.
        """
        closer = self._disconnect_fn
        if closer is None:
            # Not wired up yet: there is nothing that could close the socket.
            raise RuntimeError(
                "Connection.disconnect is not available until "
                "ConnectionManager.connect() is called"
            )
        await closer(reason)

disconnect(reason='server_close') async

Forcibly close this connection.

Parameters:

Name Type Description Default
reason str

Human-readable disconnect reason for logging.

'server_close'

Raises:

Type Description
RuntimeError

If ConnectionManager.connect() has not run yet.

Source code in src\socketspec\connection.py
async def disconnect(self, reason: str = "server_close") -> None:
    """Close this connection from the server side.

    Args:
        reason: Human-readable reason, forwarded to the disconnect callable.

    Raises:
        RuntimeError: If no disconnect callable has been attached yet, i.e.
            ``ConnectionManager.connect()`` has not run.
    """
    closer = self._disconnect_fn
    if closer is None:
        # Not wired up yet: there is nothing that could close the socket.
        raise RuntimeError(
            "Connection.disconnect is not available until "
            "ConnectionManager.connect() is called"
        )
    await closer(reason)

emit(event, payload) async

Emit an event directly to this connection.

Parameters:

Name Type Description Default
event EventName

Target event name.

required
payload PayloadDict | BaseModel

JSON-serializable payload dict or model dump.

required

Raises:

Type Description
RuntimeError

If ConnectionManager.connect() has not run yet.

Source code in src\socketspec\connection.py
async def emit(self, event: EventName, payload: PayloadDict | BaseModel) -> None:
    """Send ``event`` with ``payload`` to this connection only.

    Args:
        event: Name of the event to deliver.
        payload: JSON-serializable payload dict, or a ``BaseModel``.

    Raises:
        RuntimeError: If no emit callable has been attached yet, i.e.
            ``ConnectionManager.connect()`` has not run.
    """
    sender = self._emit_fn
    if sender is None:
        # Not wired up yet: fail loudly instead of dropping the event.
        raise RuntimeError(
            "Connection.emit is not available until "
            "ConnectionManager.connect() is called"
        )
    await sender(event, payload)

Attributes

Attribute Type Description
id str Unique connection ID (UUID4)
identity Identity Authenticated identity (user_id, role, token_expires_at, extra)
session SessionInfo Session metadata (started_at, expires_at, token_expires_at)
connected_at datetime UTC timestamp of connection establishment
last_active datetime UTC timestamp of last received event (updated on every inbound message)
rooms set[str] Room names this connection is currently in
headers dict[str, str] HTTP upgrade request headers
query_params dict[str, str] HTTP upgrade request query parameters
namespace str The namespace this connection belongs to

Methods

await conn.emit(event, payload)

Send a message directly to this connection.

await conn.emit("notification", {"text": "You have a new message"})
Parameter Type Description
event str Event name
payload dict | BaseModel JSON-serializable payload dict, or a BaseModel instance

await conn.disconnect(reason)

Forcibly close this connection from the server side.

await conn.disconnect("banned")
Parameter Type Default Description
reason str "server_close" Reason string logged and passed to on_disconnect hooks

Identity

conn.identity is populated by the AuthBackend at connect time.

@dataclass
class Identity:
    user_id: str | None = None
    role: str | None = None
    token_expires_at: datetime | None = None
    extra: dict = field(default_factory=dict)

For unauthenticated connections (no auth= configured), user_id, role, and token_expires_at are None and extra is an empty dict.

Access identity in a handler:

@socket.on("protected_action")
async def protected(conn, payload) -> None:
    """Handle ``protected_action``, rejecting callers without a user ID."""
    has_user = bool(conn.identity.user_id)
    if not has_user:
        # No user ID on this connection: report the error and stop here.
        rejection = {"code": "AUTH_ERROR"}
        await conn.emit("__error__", rejection)
        return
    # proceed

SessionInfo

conn.session contains timing metadata for the current session.

@dataclass
class SessionInfo:
    started_at: datetime       # when the connection was established
    expires_at: datetime | None  # absolute expiry (None = no max_duration)
    token_expires_at: datetime | None  # from identity

Usage in Handlers

from socketspec.connection import Connection

@socket.on("whoami")
async def whoami(conn: Connection) -> None:
    """Reply to the caller with a snapshot of its own connection state."""
    snapshot = {
        "conn_id": conn.id,
        "user_id": conn.identity.user_id,
        # Sets are not JSON-serializable, so send the rooms as a list.
        "rooms": list(conn.rooms),
        "connected": conn.connected_at.isoformat(),
        "last_active": conn.last_active.isoformat(),
    }
    await conn.emit("identity", snapshot)

Usage in Lifecycle Hooks

@socket.on_connect
async def on_connect(conn: Connection) -> None:
    """Greet the client with a welcome event right after it connects."""
    greeting = {
        "conn_id": conn.id,
        "message": "Connected to SocketSpec",
    }
    await conn.emit("welcome", greeting)

@socket.on_disconnect
async def on_disconnect(conn: Connection, reason: str) -> None:
    """Log which client disconnected and the reason that was given."""
    client_id = conn.id
    logger.info("Client %s disconnected: %s", client_id, reason)