API Reference: Connection
socketspec.connection.Connection (dataclass)
A single live WebSocket connection passed to every event handler.
Note: emit and disconnect are placeholders until ConnectionManager.connect() rebinds them to bound callables. Calling either one before that raises RuntimeError.
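The rebinding described in the note can be pictured with a small stand-in. This is only a sketch: DemoConnection, _unbound and bind are hypothetical names, not SocketSpec's implementation. The single assumption carried over from the note is that both methods start out raising RuntimeError and are later replaced with working callables.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable


async def _unbound(*args: Any, **kwargs: Any) -> None:
    # Placeholder used until a manager installs the real callable.
    raise RuntimeError("connection is not bound to a manager yet")


@dataclass
class DemoConnection:
    """Hypothetical stand-in for Connection (not the library class)."""

    id: str
    emit: Callable[..., Awaitable[None]] = _unbound
    disconnect: Callable[..., Awaitable[None]] = _unbound
    sent: list = field(default_factory=list)  # records what was "sent"


def bind(conn: DemoConnection) -> None:
    """Mimic what a manager might do at connect time (assumption)."""

    async def emit(event: str, payload: dict) -> None:
        conn.sent.append((event, payload))

    async def disconnect(reason: str = "server_close") -> None:
        conn.sent.append(("__closed__", {"reason": reason}))

    # Replace the placeholders with callables tied to this connection.
    conn.emit = emit
    conn.disconnect = disconnect


async def demo() -> list:
    conn = DemoConnection(id="c1")
    try:
        await conn.emit("welcome", {})  # still the placeholder
    except RuntimeError:
        conn.sent.append(("unbound", {}))
    bind(conn)
    await conn.emit("welcome", {"ok": True})
    await conn.disconnect()
    return conn.sent
```

Running asyncio.run(demo()) exercises both states: the first emit fails because nothing is bound yet, and the calls made after bind() succeed.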
Source code in src\socketspec\connection.py
async disconnect(reason='server_close')
Forcibly close this connection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| reason | str | Human-readable disconnect reason for logging. | 'server_close' |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If called before ConnectionManager.connect() has bound this method. |
async emit(event, payload)
Emit an event directly to this connection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | EventName | Target event name. | required |
| payload | PayloadDict \| BaseModel | JSON-serializable payload dict or model dump. | required |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If called before ConnectionManager.connect() has bound this method. |
Attributes

| Attribute | Type | Description |
|---|---|---|
| id | str | Unique connection ID (UUID4) |
| identity | Identity | Authenticated identity (user_id, role, token_expires_at) |
| session | SessionInfo | Session metadata (started_at, expires_at) |
| connected_at | datetime | UTC timestamp of connection establishment |
| last_active | datetime | UTC timestamp of last received event (updated on every inbound message) |
| rooms | set[str] | Room names this connection is currently in |
| headers | dict[str, str] | HTTP upgrade request headers |
| query_params | dict[str, str] | HTTP upgrade request query parameters |
| namespace | str | The namespace this connection belongs to |
Methods
await conn.emit(event, payload)
Send a message directly to this connection.
| Parameter | Type | Description |
|---|---|---|
| event | str | Event name |
| payload | dict | JSON-serializable payload |
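Since the payload has to be JSON-serializable, values such as datetime objects or sets (for example conn.rooms) need converting before they are passed to emit. The helper below is a sketch using only the standard library; to_wire_payload is a hypothetical name and not part of SocketSpec.

```python
import json
from datetime import datetime, timezone
from typing import Any


def _convert(value: Any) -> Any:
    # Called by json.dumps only for values it cannot serialize itself.
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, (set, frozenset)):
        return sorted(value)
    raise TypeError(f"{type(value).__name__} is not JSON-serializable")


def to_wire_payload(payload: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of `payload` containing only JSON-native types.

    datetime values become ISO 8601 strings and sets become sorted
    lists; any other unsupported type raises TypeError.
    """
    return json.loads(json.dumps(payload, default=_convert))
```

A call would then look like await conn.emit("state", to_wire_payload({"rooms": conn.rooms})). Round-tripping through json.dumps also surfaces serialization problems before the message reaches the socket.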
await conn.disconnect(reason)
Close this connection gracefully.
| Parameter | Type | Default | Description |
|---|---|---|---|
| reason | str | "server_close" | Reason string logged and passed to on_disconnect hooks |
Identity
conn.identity is populated by the AuthBackend at connect time.
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class Identity:
    user_id: str | None = None
    role: str | None = None
    token_expires_at: datetime | None = None
    extra: dict = field(default_factory=dict)
For unauthenticated connections (no auth= configured), user_id, role, and token_expires_at are None and extra is an empty dict.
Access identity in a handler:
@socket.on("protected_action")
async def protected(conn, payload) -> None:
    if not conn.identity.user_id:
        await conn.emit("__error__", {"code": "AUTH_ERROR"})
        return
    # proceed
SessionInfo
conn.session contains timing metadata for the current session.
from dataclasses import dataclass
from datetime import datetime

@dataclass
class SessionInfo:
    started_at: datetime               # when the connection was established
    expires_at: datetime | None        # absolute expiry (None = no max_duration)
    token_expires_at: datetime | None  # from identity
Usage in Handlers
from socketspec.connection import Connection

@socket.on("whoami")
async def whoami(conn: Connection) -> None:
    await conn.emit("identity", {
        "conn_id": conn.id,
        "user_id": conn.identity.user_id,
        "rooms": list(conn.rooms),
        "connected": conn.connected_at.isoformat(),
        "last_active": conn.last_active.isoformat(),
    })
Usage in Lifecycle Hooks
import logging

logger = logging.getLogger(__name__)

@socket.on_connect
async def on_connect(conn: Connection) -> None:
    # Send a welcome message immediately after connect
    await conn.emit("welcome", {
        "conn_id": conn.id,
        "message": "Connected to SocketSpec",
    })

@socket.on_disconnect
async def on_disconnect(conn: Connection, reason: str) -> None:
    # Log the disconnect reason passed to the hook
    logger.info("Client %s disconnected: %s", conn.id, reason)