API Reference¶
This section provides the auto-generated API reference for the library’s public components.
Server & Client¶
- class webshocket.websocket.client(uri, on_receive=None, *, ssl_context=None, max_packet_qsize=128)¶
Bases:
objectRepresents a WebSocket client for connecting to a WebSocket server.
This class provides functionality to connect, send, receive, and close WebSocket connections, supporting both secure (WSS) and unsecure (WS) protocols. It manages the underlying connection life-cycle and handles automatic reconnection and message queuing.
async with WebSocketClient("ws://localhost:5000") as client: result = await client.send_rpc("ping") print(result.data)
- Variables:
uri (str) – The URI of the WebSocket server.
state (ConnectionState) – The current state of the connection.
ssl_context (ssl.SSLContext | None) – The SSL context used for the connection.
cert (str | None) – Path to the CA certificate file (deprecated).
- async close()¶
Closes the WebSocket client connection gracefully.
Cancels the listener task and closes the underlying protocol. Updates the connection state to DISCONNECTED.
- Return type:
None
- async connect(retry=False, max_retry_attempt=3, retry_interval=2, **kwargs)¶
Connects the WebSocket client to the server.
Supports optional retry logic with exponential backoff.
client = WebSocketClient("ws://localhost:5000") await client.connect(retry=True, max_retry_attempt=5)
- Parameters:
retry (bool) – If True, attempts to reconnect multiple times on failure. Defaults to False.
max_retry_attempt (int) – The maximum number of retry attempts. Defaults to 3.
retry_interval (int) – The base interval in seconds between retry attempts. Defaults to 2.
**kwargs – Keyword arguments to pass to the underlying connection helper.
- Raises:
ConnectionFailedError – If all connection attempts fail when retry is True.
- Return type:
Self
- logger¶
- on_receive_callback¶
- async recv(timeout=30)¶
Receives data from the WebSocket connection.
- Parameters:
timeout (int | None) – The maximum time in seconds to wait for a message. Defaults to 30 seconds. Set to None for no timeout.
- Returns:
The received data
- Return type:
str | bytes
- Raises:
WebSocketError – If the client is not connected.
TimeoutError – If the receive operation times out.
- send(data)¶
Sends data over the WebSocket connection.
- Parameters:
data (Serializable) – The data to send. Must be a serializable type.
- Raises:
WebSocketError – If the client is not connected.
WebSocketError – If the client is not connected.
- Return type:
None
- async send_rpc(method_name, /, *args, raise_on_rate_limit=False, **kwargs)¶
Sends an RPC message to the WebSocket server and waits for the response.
response = await client.send_rpc("add", 10, 20) print(response.data) # 30
- Parameters:
method_name (str) – The name of the RPC method to call.
*args – Positional arguments for the RPC method.
raise_on_rate_limit (bool) – If True, raises RateLimitError if exceeded. Defaults to False.
**kwargs – Keyword arguments for the RPC method.
- Raises:
WebSocketError – If the client is not connected.
RPCTimeoutError – If the request times out.
RateLimitError – If the server returns a RATE_LIMIT_EXCEEDED error and raise_on_rate_limit is True.
- Returns:
The response object from the server.
- Return type:
- ssl_context¶
- state¶
- async stream_rpc(method_name, *args, raise_on_rate_limit=True, **kwargs)¶
Calls an RPC method that streams results via async generator.
async for response in client.stream_rpc("get_updates", max=5): print(response.data)
- Parameters:
method_name (str) – The name of the RPC method to call.
*args – Positional arguments for the RPC method.
raise_on_rate_limit (bool) – Whether to raise RateLimitError if exceeded. Defaults to True.
**kwargs – Keyword arguments for the RPC method.
- Raises:
WebSocketError – If the client is not connected.
RateLimitError – If the server returns a RATE_LIMIT_EXCEEDED error and raise_on_rate_limit is True.
- Yields:
RPCResponse – Each chunk of data yielded by the server.
- Return type:
AsyncGenerator[RPCResponse,None]
- uri¶
- class webshocket.websocket.server(host, port, *, clientHandler=<class 'webshocket.handler.DefaultWebSocketHandler'>, ssl_context=None, max_connection=None, packet_qsize=512, rpc_task_limit=1024, max_subscribed_channels=5)¶
Bases:
Generic[H]Represents a WebSocket server that handles incoming connections and messages.
This class provides functionality to start, manage, and close a WebSocket server, integrating with a custom WebSocketHandler for application-specific logic. It supports both secure (WSS) and unsecure (WS) connections.
server = WebSocketServer("localhost", 5000, clientHandler=MyHandler) async with server: await server.serve_forever()
- async accept()¶
Accepts a new WebSocket connection and returns the ClientConnection object.
This method is only available when using the DefaultWebSocketHandler. Breakdown of the method:
If no client is connected, it will wait until a client is connected and returns the ClientConnection object.
If the handler callback is active, this method will raise a TypeError.
If the server is not started, this method will raise a WebSocketError.
- Returns:
The ClientConnection object for the accepted connection.
- Return type:
- async close()¶
Closes the WebSocket server gracefully.
This method stops the server from accepting new connections and waits for all existing connections to close.
- Return type:
None
- handler¶
- host¶
- logger¶
- max_connection¶
- port¶
- async serve_forever(**kwargs)¶
Starts the WebSocket server and keeps it running indefinitely.
This method calls start() and then waits for the server to be closed.
server = WebSocketServer("localhost", 5000) await server.serve_forever()
- Parameters:
**kwargs – Keyword arguments to pass to picows_server.PicowsServer.
- Return type:
None
- ssl_context¶
- async start(**kwargs)¶
Starts the WebSocket server.
This method initializes the server and makes it ready to accept connections.
- Parameters:
**kwargs – Keyword arguments to pass to picows_server.PicowsServer.
- Return type:
Self
-
state:
ServerState¶
Connection Object¶
- class webshocket.connection.ClientConnection(websocket_protocol, handler, client_type, packet_qsize=128, max_subscribed_channels=5)¶
Bases:
Generic[TState]Represents a single client connection to the WebSocket server.
This class wraps the underlying picows.WSTransport and provides convenient access structure to session-specific state, channel management, and communication methods. It supports dynamic attribute access which maps to an internal session state dictionary.
async def on_receive(self, connection: ClientConnection, packet: Packet): # Access session state directly connection.username = "alice" # Subscribe to a channel connection.subscribe("chat-room") # Send data connection.send({"status": "ok"})
- Variables:
client_type (ClientType) – The type of client (e.g., FRAMEWORK, GENERIC).
connection_state (ConnectionState) – The current state of the connection (CONNECTED, CLOSED, etc.).
session_state (dict) – A dictionary holding arbitrary user-defined state for this connection.
max_subscribed_channels (int) – The maximum number of channels this client can subscribe to.
uid (UUID) – A unique identifier for this connection instance.
logger (Logger) – A logger instance for this connection.
remote_address (tuple[str, int]) – The (host, port) of the connected client.
subscribed_channel (set[str]) – A set of channel names this client is subscribed to.
- client_type¶
- close(code=<WSCloseCode.OK: 1000>, reason=b'')¶
Closes the connection.
- Return type:
None
- connection_state¶
- logger¶
-
max_subscribed_channels:
int¶
- async recv(timeout=30.0)¶
Receives the next message and parses it into a validated Packet object.
This method receives incoming data and parses it into a validated Packet. If the data is plain JSON or msgpack, it’s wrapped in a Packet with source set to CUSTOM.
# Only works if using DefaultWebSocketHandler (no on_receive callback) try: packet = await connection.recv(timeout=10.0) print(packet.data) except ReceiveTimeoutError: print("No message received within timeout")
- Parameters:
timeout (float | None) – Max seconds to wait for a message. Defaults to 30.0. Use None to wait indefinitely.
- Raises:
ConnectionClosedError – If the client is not connected.
ReceiveTimeoutError – If no message is received within the timeout period.
- Returns:
A validated Packet object containing the received data.
- Return type:
- property remote_address: tuple[str, int]¶
A property that gets the remote address of the connection.
- send(data, chunk_size=65536)¶
Sends data over the connection.
Non-Packet payloads are automatically wrapped in a Packet before serialization. Framework clients receive fast msgpack bytes; generic clients receive JSON strings.
# Send a raw string or dict connection.send("Hello World!") connection.send({"status": "success"}) # Or send a pre-constructed Packet packet = Packet(data="Custom", source=PacketSource.CUSTOM) connection.send(packet)
- Parameters:
data (Serializable) – The data to send.
chunk_size (int) – Max WebSocket frame size before fragmentation. Defaults to 64 KB.
- Return type:
None
- session_state¶
- subscribe(channel)¶
A shortcut method for this connection to join one or more channels.
connection.subscribe("room1") connection.subscribe(["room2", "room3"]) connection.subscribe("news.*") # Wildcard pattern
- Parameters:
channel (str | Iterable[str]) – A string or iterable that contains lists of channel to join.
- Returns:
True if subscribed successfully, False if the max channel limit is reached.
- Return type:
bool
- property subscribed_channel: set[str]¶
Returns the set of channel names this client is subscribed to.
- Returns:
A copy of the internal subscribed channels set.
- uid¶
- unsubscribe(channel)¶
A shortcut method for this connection to leave one or more channels.
connection.unsubscribe("room1") connection.unsubscribe(["room2", "room3"])
- Parameters:
channel (str | Iterable[str]) – A string or iterable that contains lists of channel to leave.
- Return type:
None
Handler Interface¶
- class webshocket.handler.DefaultWebSocketHandler¶
Bases:
WebSocketHandlerA minimal, built-in handler that performs no actions on events.
This is used as the default by the webshocket.server if no custom handler is provided by the user. It simply queues received packets for manual retrieval via accept()/recv().
-
channels:
dict[str,set[ClientConnection]]¶
-
clients:
set[ClientConnection]¶
- async on_receive(connection, packet)¶
Called when a client sends a confirmed packet.
- Parameters:
connection (ClientConnection[TState]) – The connection object sending the packet.
packet (Packet) – The received data packet.
-
patterns:
dict[str,set[ClientConnection]]¶
-
channels:
- class webshocket.handler.WebSocketHandler¶
Bases:
Generic[TState]Defines the interface for handling server-side WebSocket logic.
This class serves as the base for implementing custom application logic. Subclasses should override the lifecycle methods (on_connect, on_receive, on_disconnect) to handle WebSocket events.
class MyHandler(WebSocketHandler): async def on_connect(self, connection: ClientConnection): print(f"Client {connection.uid} connected") async def on_receive(self, connection: ClientConnection, packet: Packet): print(f"Received: {packet.data}") @rpc_method() async def ping(self, connection: ClientConnection): return "pong"
- Variables:
clients (set[ClientConnection]) – A set of all currently connected clients managed by this handler.
channels (dict[str, set[ClientConnection]]) – A dictionary mapping channel names to sets of subscribed clients.
- broadcast(data, exclude=None, predicate=None, **kwargs)¶
Broadcasts a message to all connected clients.
This method sends the provided data to every client currently connected to this handler. You can optionally exclude specific clients or filter recipients using a predicate.
# Broadcast to everyone self.broadcast("Server is restarting!") # Broadcast to everyone EXCEPT the sender self.broadcast("Someone joined", exclude=(connection,)) # Broadcast only to admins using a predicate self.broadcast("Admin alert", predicate=Is("is_admin"))
- Parameters:
data (Serializable) – The message data to broadcast.
exclude (Optional[tuple[ClientConnection, ...]]) – Clients to exclude from the broadcast.
predicate (Optional[RPC_Predicate]) – A predicate function to filter recipients.
**kwargs – Additional arguments to pass to the Packet constructor.
- Raises:
PacketError – If attempting to broadcast a packet with a source other than PacketSource.BROADCAST.
- Return type:
None
-
channels:
dict[str,set[ClientConnection]]¶
-
clients:
set[ClientConnection]¶
- async on_connect(connection)¶
Called when a new client connects (after handshake).
- Parameters:
connection (ClientConnection[TState]) – The connection object for the new client.
- async on_disconnect(connection)¶
Called when a client disconnects.
- Parameters:
connection (ClientConnection[TState]) – The connection object for the disconnected client.
- async on_receive(connection, packet)¶
Called when a client sends a confirmed packet.
- Parameters:
connection (ClientConnection[TState]) – The connection object sending the packet.
packet (Packet) – The received data packet.
-
patterns:
dict[str,set[ClientConnection]]¶
- publish(channel, data, exclude=None, predicate=None)¶
Publishes a message to all clients subscribed to a specific channel.
This method sends data only to clients that have explicitly subscribed to the target channel (or a wildcard pattern matching the channel).
# Publish to a single room self.publish("room1", "Hello room 1") # Publish to multiple rooms, excluding the sender self.publish(["room1", "room2"], "Hello!", exclude=(connection,)) # Publish to a room, but only to editors self.publish("docs.123", "Edit made", predicate=Is("is_editor"))
- Parameters:
channel (str | Iterable[str]) – The name of the channel(s) to publish the message to.
data (Serializable) – The message data to publish.
exclude (Optional[tuple[ClientConnection, ...]]) – Clients to exclude from the publication.
predicate (Optional[RPC_Predicate]) – A predicate function to filter recipients.
- Raises:
PacketError – If attempting to publish a packet with a source other than PacketSource.CHANNEL.
- Return type:
None
- register_rpc_method(func, alias_name=None)¶
Registers a function as an RPC method dynamically.
- Parameters:
func (RPC_Function) – The function to register. Must be marked with @rpc_method.
alias_name (Optional[str]) – An optional alias for the method name. If provided, the client will use this name to call the method.
- Raises:
ValueError – If the function is not marked as an RPC method.
- Return type:
None
- subscribe(client, channel)¶
Subscribes a client to one or more channels.
- Parameters:
client (ClientConnection) – The client connection to subscribe.
channel (str | Iterable) – The channel name(s) to subscribe the client to.
- Return type:
None
- unsubscribe(client, channel)¶
Unsubscribes a client from one or more channels.
- Parameters:
client (ClientConnection) – The client connection to unsubscribe.
channel (str | Iterable[str]) – The channel name(s) to unsubscribe the client from.
- Return type:
None
RPC & Rate Limiting¶
- webshocket.rpc.rate_limit(*, limit, period='1s', disconnect_on_limit_exceeded=False)¶
Decorator to mark a method in a WebSocketHandler as a rate-limited method. This limits the number of times a method can be called within a specific time period.
class MyHandler(WebSocketHandler): @rate_limit(limit=5, period="1m") # 5 calls per minute @rpc_method() async def expensive_call(self, connection: ClientConnection): ...
- Parameters:
limit (int) – The maximum number of times the method can be called within the specified time unit.
period (str) – The time unit for the rate limit (e.g., “10s”, “1m”, “1h”).
disconnect_on_limit_exceeded (bool) – Whether to disconnect the client when the rate limit is exceeded.
- Returns:
The wrapped function.
- Return type:
Callable
- webshocket.rpc.rpc_method(alias_name=None, requires=None)¶
Decorator to mark a function as an RPC method in a WebSocketHandler.
class MyHandler(WebSocketHandler): @rpc_method() async def my_method(self, connection: ClientConnection, data: Any): ... @rpc_method(alias_name="custom-name") async def another_method(self, connection: ClientConnection): ... @rpc_method(requires=IsEqual("admin", True)) async def admin_only(self, connection: ClientConnection): ...
- Parameters:
alias_name (str | None) – Optional alias to expose the method under a different name.
requires (Any | None) – Optional permission or requirement (predicate) for the method.
- Returns:
The decorator function.
- Return type:
RPCDecorator
Predicates¶
- class webshocket.predicate.All(*predicates)¶
Bases:
objectLogical AND: Returns True if ALL of the provided predicates are True.
@rpc_method(requires=All(Has("username"), IsEqual("is_active", True)))
- predicates¶
- class webshocket.predicate.Any(*predicates)¶
Bases:
objectLogical OR: Returns True if ANY of the provided predicates are True.
@rpc_method(requires=Any(IsEqual("role", "admin"), IsEqual("role", "editor")))
- predicates¶
- class webshocket.predicate.Has(key)¶
Bases:
objectChecks if the connection has a specific state attribute.
@rpc_method(requires=Has("username"))
- key¶
Data Structures¶
- class webshocket.packets.Packet(source: PacketSource, data: Any = None, rpc: RType | None = None, channel: str | None = None, timestamp: float | None = None, correlation_id: str | None = None)¶
Bases:
Struct,Generic[RType]A structured data packet for WebSocket communication.
- Variables:
data (Any) – The data payload.
source (PacketSource) – The source of the packet.
channel (str | None) – The channel associated with the packet.
timestamp (float) – The timestamp when the packet was created.
correlation_id (uuid.UUID | None) – The correlation ID associated with the packet.
rpc (RType | None) – Optional RPC request or response data.
-
channel:
str|None¶
-
correlation_id:
str|None¶
-
data:
Any¶
-
rpc:
Optional[TypeVar(RType, bound=RPCRequest|RPCResponse)]¶
-
source:
PacketSource¶
-
timestamp:
float|None¶
- class webshocket.packets.RPCRequest(method: str, args: ~collections.abc.Sequence[~typing.Any] = (), kwargs: dict[str, ~typing.Any] = <factory>, call_id: str = <factory>)¶
Bases:
StructRepresents an RPC (Remote Procedure Call) request.
-
args:
Sequence[Any]¶
-
call_id:
str¶
-
kwargs:
dict[str,Any]¶
-
method:
str¶
-
args:
- class webshocket.packets.RPCResponse(call_id: str, response: Any = None, error: None | RPCErrorCode = None, is_stream: bool = False, is_end: bool = False)¶
Bases:
StructRepresents an RPC (Remote Procedure Call) response.
-
call_id:
str¶
-
error:
None|RPCErrorCode¶
-
is_end:
bool¶
-
is_stream:
bool¶
-
response:
Any¶
-
call_id:
- webshocket.packets.deserialize(data)¶
Deserializes a byte array into a BaseModel object.
Decode the given byte data using Msgpack and validate it into the specified BaseModel object.
- Parameters:
data (
bytes) – The byte array to be deserialized.base_model – The BaseModel type to deserialize the data into.
- Return type:
- Returns:
A BaseModel object of the specified type if deserialization and validation are successful.
- webshocket.packets.serialize(base_model)¶
Serializes a BaseModel object into msgpack bytes.
Encode the given BaseModel object into a byte string using Msgpack.
- Parameters:
base_model (
Struct) – The BaseModel object to be serialized.- Return type:
bytes- Returns:
A bytes object containing the serialized data.
Exceptions¶
- exception webshocket.exceptions.ConnectionClosedError¶
Bases:
ConnectionErrorRaised when attempting an operation on a closed connection.
- exception webshocket.exceptions.ConnectionError¶
Bases:
WebSocketErrorBase class for connection-related errors.
- exception webshocket.exceptions.ConnectionFailedError¶
Bases:
ConnectionErrorRaised when a client fails to establish a connection with the server.
- exception webshocket.exceptions.InvalidURIError¶
Bases:
ConnectionErrorRaised when an invalid WebSocket URI is provided.
- exception webshocket.exceptions.MessageError¶
Bases:
WebSocketErrorBase class for message processing errors.
- exception webshocket.exceptions.PacketError¶
Bases:
MessageErrorRaised when a packet is malformed or invalid.
- exception webshocket.exceptions.PacketValidationError¶
Bases:
PacketErrorRaised when packet data fails validation.
- exception webshocket.exceptions.RPCError¶
Bases:
WebSocketErrorBase class for RPC-related errors.
- exception webshocket.exceptions.RPCMethodNotFoundError¶
Bases:
RPCErrorRaised when an RPC method is not found.
- exception webshocket.exceptions.RPCTimeoutError¶
Bases:
TimeoutErrorRaised when an RPC request times out.
- exception webshocket.exceptions.RateLimitError¶
Bases:
RPCErrorRaised when an RPC call rate limit is exceeded.
- exception webshocket.exceptions.ReceiveTimeoutError¶
Bases:
TimeoutErrorRaised when a receive operation times out.
- exception webshocket.exceptions.TimeoutError¶
Bases:
WebSocketErrorBase class for timeout-related errors.
- exception webshocket.exceptions.WebSocketError¶
Bases:
ExceptionBase exception class for all errors raised by the webshocket library.
Enums & Constants¶
- class webshocket.enum.ClientType(*values)¶
Bases:
EnumRepresents the type of client that is connected to the server.
- Variables:
FRAMEWORK – The client uses the webshocket framework.
GENERIC – The client is a generic client.
- FRAMEWORK = 'Framework'¶
- GENERIC = 'Generic'¶
- class webshocket.enum.ConnectionState(*values)¶
Bases:
IntEnumRepresents the various states of a WebSocket connection.
- Variables:
DISCONNECTED – The connection is currently not active.
CONNECTING – The connection is in the process of being established.
CONNECTED – The connection is successfully established and active.
CLOSED – The connection has been explicitly closed.
- CLOSED = 4¶
- CONNECTED = 3¶
- CONNECTING = 2¶
- DISCONNECTED = 1¶
- class webshocket.enum.PacketSource(*values)¶
Bases:
IntEnumRepresents the source of a packet that is being sent.
- Variables:
BROADCAST – A packet broadcast to all connected clients.
CHANNEL – A packet published to a specific subscribed channel of the client.
UNKNOWN – A packet with an unknown source.
CUSTOM – A packet manually sent by the server.
RPC – A packet sent in response to an RPC request.
- BROADCAST = 1¶
- CHANNEL = 2¶
- CUSTOM = 4¶
- RPC = 5¶
- UNKNOWN = 3¶
- class webshocket.enum.RPCErrorCode(*values)¶
Bases:
IntEnumDefines standard error codes for the RPC system, inspired by the JSON-RPC 2.0 spec.
- ACCESS_DENIED = -32002¶
The client is not authorized to execute the requested method.
- APPLICATION_ERROR = -32001¶
The RPC method was executed successfully but raised an intentional, application-specific exception.
- INTERNAL_SERVER_ERROR = -32603¶
A generic, unexpected error occurred on the server during the execution of the RPC method.
- INVALID_PARAMS = -32602¶
Invalid method parameters were provided (e.g., wrong type, wrong number of arguments).
- METHOD_NOT_FOUND = -32601¶
The requested RPC method does not exist on the handler.
- RATE_LIMIT_EXCEEDED = -32000¶
The client has exceeded the rate limit for the requested method.