sanic.server.async_server.AsyncioServer#

Wraps an asyncio server with functionality that might be useful to a user who needs to manage the server lifecycle manually.

class AsyncioServer(app: Sanic, loop, serve_coro, connections)

after_start#

Trigger "after_server_start" events

def after_start(self)

after_stop#

Trigger "after_server_stop" events

def after_stop(self)

before_start#

Trigger "before_server_start" events

def before_start(self)

before_stop#

Trigger "before_server_stop" events

def before_stop(self)

close#

Close the server

def close(self)

is_serving#

Returns True if the server is running, False otherwise

def is_serving(self): -> bool

serve_forever#

Serve requests until the server is stopped

def serve_forever(self)

start_serving#

Start serving requests

def start_serving(self)

startup#

Trigger "startup" operations on the app

def startup(self)

wait_closed#

Wait until the server is closed

def wait_closed(self)
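
Since AsyncioServer wraps an asyncio server, start_serving, is_serving, close, and wait_closed appear to follow the lifecycle of the standard asyncio.Server. The sketch below walks through that lifecycle with the standard library only. The handle callback, the loopback address, and port 0 are illustrative assumptions, and no Sanic listener events are triggered here.

```python
import asyncio


async def handle(reader, writer):
    # Hypothetical connection handler: close every connection immediately.
    writer.close()
    await writer.wait_closed()


async def lifecycle():
    states = []
    # start_serving=False creates the listening socket without accepting yet.
    server = await asyncio.start_server(
        handle, "127.0.0.1", 0, start_serving=False
    )
    states.append(server.is_serving())  # not accepting connections yet
    await server.start_serving()
    states.append(server.is_serving())  # now accepting connections
    server.close()
    await server.wait_closed()
    states.append(server.is_serving())  # closed again
    return states


states = asyncio.run(lifecycle())
```

With Sanic itself, the before_start, after_start, before_stop, and after_stop methods would be awaited around these steps so that the matching listeners run.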

sanic.server.events.trigger_events#

Trigger event callbacks (functions or async)

def trigger_events(events: Optional[Iterable[Callable[..., Any]]], loop, app: Optional[Sanic] = None, **kwargs)

Parameters
events
Optional[Iterable[Callable[..., Any]]]

Callbacks to trigger; each may be a plain function or an async function.

loop
[type]

[description]

app
Optional[Sanic]

[description]. Defaults to None.
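
A minimal sketch of how a mix of plain and async callbacks can be triggered on a given loop. The helper name run_callbacks and the listeners are hypothetical, and the real function may differ in details such as keyword-argument handling.

```python
import asyncio
from inspect import isawaitable


def run_callbacks(events, loop, app=None):
    """Call each callback; run it to completion if it returns an awaitable."""
    for event in events or ():
        result = event(app) if app is not None else event()
        if isawaitable(result):
            loop.run_until_complete(result)


calls = []


def sync_listener(app):
    calls.append(("sync", app))


async def async_listener(app):
    calls.append(("async", app))


loop = asyncio.new_event_loop()
try:
    # "demo-app" is a placeholder standing in for a Sanic instance.
    run_callbacks([sync_listener, async_listener], loop, app="demo-app")
    run_callbacks(None, loop)  # an empty or missing iterable is a no-op
finally:
    loop.close()
```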

sanic.server.goodbye.get_goodbye#

def get_goodbye(): -> str

sanic.server.loop.try_use_uvloop#

Use uvloop instead of the default asyncio loop.

def try_use_uvloop(): -> None
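
The idea of preferring uvloop when it is available can be sketched as follows. This is not the documented function: instead of installing an event loop policy, the hypothetical helper pick_loop_factory returns a loop factory, falling back to asyncio when uvloop is not installed.

```python
import asyncio


def pick_loop_factory():
    """Return uvloop's loop factory when importable, else asyncio's."""
    try:
        import uvloop  # optional third-party dependency
    except ImportError:
        return asyncio.new_event_loop
    return uvloop.new_event_loop


async def answer():
    await asyncio.sleep(0)
    return 42


loop = pick_loop_factory()()
try:
    result = loop.run_until_complete(answer())
finally:
    loop.close()
```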

sanic.server.loop.try_windows_loop#

Try to use the WindowsSelectorEventLoopPolicy instead of the default event loop policy.

def try_windows_loop()

sanic.server.protocols.base_protocol.SanicProtocol#

Interface for stream protocol.

Inherits from: Protocol, BaseProtocol

class SanicProtocol(loop, app: Sanic, signal = None, connections = None, unix = None, **kwargs)

The user should implement this interface. They can inherit from this class but don't need to. The implementations here do nothing (they don't raise exceptions).

When the user wants to request a transport, they pass a protocol factory to a utility function (e.g., EventLoop.create_connection()).

When the connection is made successfully, connection_made() is called with a suitable transport object. Then data_received() will be called 0 or more times with data (bytes) received from the transport; finally, connection_lost() will be called exactly once with either an exception object or None as an argument.

State machine of calls:

start -> CM [-> DR*] [-> ER?] -> CL -> end

  • CM: connection_made()
  • DR: data_received()
  • ER: eof_received()
  • CL: connection_lost()
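
The call sequence above can be observed with a plain asyncio.Protocol on a loopback connection. This is a sketch using only the standard library; RecordingProtocol, the address, and the payload are illustrative assumptions, not part of Sanic.

```python
import asyncio


class RecordingProtocol(asyncio.Protocol):
    """Records which callbacks fire, using the abbreviations from the list."""

    def __init__(self, calls, done):
        self.calls = calls
        self.done = done

    def connection_made(self, transport):
        self.calls.append("CM")

    def data_received(self, data):
        self.calls.append("DR")

    def eof_received(self):
        self.calls.append("ER")
        return None  # falsy, so the transport closes itself

    def connection_lost(self, exc):
        self.calls.append("CL")
        self.done.set_result(None)


async def main():
    loop = asyncio.get_running_loop()
    calls, done = [], loop.create_future()
    server = await loop.create_server(
        lambda: RecordingProtocol(calls, done), "127.0.0.1", 0
    )
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(b"ping")
    await writer.drain()
    writer.write_eof()  # triggers eof_received() on the server side
    await done          # wait until connection_lost() has run
    writer.close()
    await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return calls


calls = asyncio.run(main())
```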

abort#

Force close the connection.

def abort(self)

close#

Attempt to close the connection.

def close(self, timeout: Optional[float] = None)

connection_lost#

Called when the connection is lost or closed.

def connection_lost(self, exc)

The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).

connection_made#

Generic connection-made, with no connection_task, and no recv_buffer.

def connection_made(self, transport)

Override this for protocol-specific connection implementations.

ctx#

@property
def ctx(self)

data_received#

Called when some data is received.

def data_received(self, data: bytes)

The argument is a bytes object.

eof_received#

Called when the other end calls write_eof() or equivalent.

def eof_received(self)

If this returns a false value (including None), the transport will close itself. If it returns a true value, closing the transport is up to the protocol.

pause_writing#

Called when the transport's buffer goes over the high-water mark.

def pause_writing(self)

Pause and resume calls are paired -- pause_writing() is called once when the buffer goes strictly over the high-water mark (even if subsequent writes increase the buffer size even more), and eventually resume_writing() is called once when the buffer size reaches the low-water mark.

Note that if the buffer size equals the high-water mark, pause_writing() is not called -- it must go strictly over. Conversely, resume_writing() is called when the buffer size is equal or lower than the low-water mark. These end conditions are important to ensure that things go as expected when either mark is zero.

NOTE: This is the only Protocol callback that is not called through EventLoop.call_soon() -- if it were, it would have no effect when it's most needed (when the app keeps writing without yielding until pause_writing() is called).
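
One common way to turn the paired pause/resume callbacks into backpressure for a coroutine-based send() is to gate writes on an asyncio.Event. The sketch below assumes that approach. BackpressureProtocol and StubTransport are hypothetical names, and the stub only records writes instead of using a real socket.

```python
import asyncio


class StubTransport:
    """Hypothetical transport stand-in that records what was written."""

    def __init__(self):
        self.written = []

    def write(self, data):
        self.written.append(data)


class BackpressureProtocol(asyncio.Protocol):
    def __init__(self):
        self.transport = None
        self._can_write = asyncio.Event()
        self._can_write.set()  # writing is allowed until paused

    def connection_made(self, transport):
        self.transport = transport

    def pause_writing(self):
        self._can_write.clear()

    def resume_writing(self):
        self._can_write.set()

    async def send(self, data):
        await self._can_write.wait()  # blocks while the buffer is too full
        self.transport.write(data)


async def demo():
    proto = BackpressureProtocol()
    transport = StubTransport()
    proto.connection_made(transport)
    await proto.send(b"first")
    proto.pause_writing()  # simulate the buffer passing the high-water mark
    pending = asyncio.create_task(proto.send(b"second"))
    await asyncio.sleep(0)  # let the task start and hit the gate
    blocked = not pending.done()
    proto.resume_writing()  # simulate draining to the low-water mark
    await pending
    return blocked, transport.written


blocked, written = asyncio.run(demo())
```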

receive_more#

Wait until more data is received into the Server protocol's buffer

async def receive_more(self)

resume_writing#

Called when the transport's buffer drains below the low-water mark.

def resume_writing(self)

See pause_writing() for details.

send#

Generic data write implementation with backpressure control.

async def send(self, data)

sanic.server.protocols.http_protocol.ConnectionProtocol#

class ConnectionProtocol()

sanic.server.protocols.http_protocol.Http3Protocol#

Inherits from: HttpProtocolMixin, ConnectionProtocol

class Http3Protocol(*args, app: Sanic, **kwargs): -> None

connection#

@property
def connection(self): -> Optional[H3Connection]

http#

@property
def http(self)

quic_event_received#

def quic_event_received(self, event: QuicEvent): -> None

version#

@property
def version(self)

sanic.server.protocols.http_protocol.HttpProtocol#

This class implements the HTTP 1.1 protocol on top of our Sanic Server transport.

Inherits from: HttpProtocolMixin, SanicProtocol, Protocol, BaseProtocol

class HttpProtocol(loop, app: Sanic, signal = None, connections = None, state = None, unix = None, **kwargs)

abort#

Force close the connection.

def abort(self)

check_timeouts#

Runs itself periodically to enforce any expired timeouts.

def check_timeouts(self)
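
A callback that "runs itself periodically" is typically one that reschedules itself with loop.call_later. The sketch below shows only that pattern under assumed names (TimeoutWatcher, deadline, interval). What the real method does when a timeout expires is not shown.

```python
import asyncio


class TimeoutWatcher:
    """Hypothetical helper that polls a deadline by rescheduling itself."""

    def __init__(self, loop, deadline, interval=0.01):
        self.loop = loop
        self.deadline = deadline
        self.interval = interval
        self.expired = loop.create_future()
        self.checks = 0

    def check_timeouts(self):
        self.checks += 1
        if self.loop.time() >= self.deadline:
            self.expired.set_result(True)  # deadline passed: stop polling
            return
        # Not expired yet: run this same method again after the interval.
        self.loop.call_later(self.interval, self.check_timeouts)


async def demo():
    loop = asyncio.get_running_loop()
    watcher = TimeoutWatcher(loop, deadline=loop.time() + 0.05)
    watcher.check_timeouts()
    await asyncio.wait_for(watcher.expired, timeout=2)
    return watcher.checks


checks = asyncio.run(demo())
```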

close#

Required to prevent checking timeouts for closed connections

def close(self, timeout: Optional[float] = None)

close_if_idle#

Close the connection if a request is not being sent or received

def close_if_idle(self): -> bool

connection_lost#

Called when the connection is lost or closed.

def connection_lost(self, exc)

The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).

connection_made#

HTTP-protocol-specific new connection handler

def connection_made(self, transport)

connection_task#

Run an HTTP connection.

async def connection_task(self)

Timeouts and some additional error handling occur here, while most of everything else happens in class Http or in code called from there.

ctx#

@property
def ctx(self)

data_received#

Called when some data is received.

def data_received(self, data: bytes)

The argument is a bytes object.

eof_received#

Called when the other end calls write_eof() or equivalent.

def eof_received(self)

If this returns a false value (including None), the transport will close itself. If it returns a true value, closing the transport is up to the protocol.

http#

@property
def http(self)

pause_writing#

Called when the transport's buffer goes over the high-water mark.

def pause_writing(self)

Pause and resume calls are paired -- pause_writing() is called once when the buffer goes strictly over the high-water mark (even if subsequent writes increase the buffer size even more), and eventually resume_writing() is called once when the buffer size reaches the low-water mark.

Note that if the buffer size equals the high-water mark, pause_writing() is not called -- it must go strictly over. Conversely, resume_writing() is called when the buffer size is equal or lower than the low-water mark. These end conditions are important to ensure that things go as expected when either mark is zero.

NOTE: This is the only Protocol callback that is not called through EventLoop.call_soon() -- if it were, it would have no effect when it's most needed (when the app keeps writing without yielding until pause_writing() is called).

receive_more#

Wait until more data is received into the Server protocol's buffer

async def receive_more(self)

resume_writing#

Called when the transport's buffer drains below the low-water mark.

def resume_writing(self)

See pause_writing() for details.

send#

Writes HTTP data with backpressure control.

async def send(self, data)

version#

@property
def version(self)

sanic.server.protocols.http_protocol.HttpProtocolMixin#

class HttpProtocolMixin()

http#

@property
def http(self)

version#

@property
def version(self)

sanic.server.protocols.websocket_protocol.WebSocketProtocol#

This class implements the HTTP 1.1 protocol on top of our Sanic Server transport.

Inherits from: HttpProtocol, HttpProtocolMixin, SanicProtocol, Protocol, BaseProtocol

class WebSocketProtocol(*args, websocket_timeout: float = 10.0, websocket_max_size: typing.Optional[int] = None, websocket_ping_interval: typing.Optional[float] = 20.0, websocket_ping_timeout: typing.Optional[float] = 20.0, **kwargs)

abort#

Force close the connection.

def abort(self)

check_timeouts#

Runs itself periodically to enforce any expired timeouts.

def check_timeouts(self)

close#

Required to prevent checking timeouts for closed connections

def close(self, timeout: typing.Optional[float] = None)

close_if_idle#

Close the connection if a request is not being sent or received

def close_if_idle(self)

connection_lost#

Called when the connection is lost or closed.

def connection_lost(self, exc)

The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).

connection_made#

HTTP-protocol-specific new connection handler

def connection_made(self, transport)

connection_task#

Run an HTTP connection.

async def connection_task(self)

Timeouts and some additional error handling occur here, while most of everything else happens in class Http or in code called from there.

ctx#

@property
def ctx(self)

data_received#

Called when some data is received.

def data_received(self, data)

The argument is a bytes object.

eof_received#

Called when the other end calls write_eof() or equivalent.

def eof_received(self): -> typing.Optional[bool]

If this returns a false value (including None), the transport will close itself. If it returns a true value, closing the transport is up to the protocol.

http#

@property
def http(self)

pause_writing#

Called when the transport's buffer goes over the high-water mark.

def pause_writing(self)

Pause and resume calls are paired -- pause_writing() is called once when the buffer goes strictly over the high-water mark (even if subsequent writes increase the buffer size even more), and eventually resume_writing() is called once when the buffer size reaches the low-water mark.

Note that if the buffer size equals the high-water mark, pause_writing() is not called -- it must go strictly over. Conversely, resume_writing() is called when the buffer size is equal or lower than the low-water mark. These end conditions are important to ensure that things go as expected when either mark is zero.

NOTE: This is the only Protocol callback that is not called through EventLoop.call_soon() -- if it were, it would have no effect when it's most needed (when the app keeps writing without yielding until pause_writing() is called).

receive_more#

Wait until more data is received into the Server protocol's buffer

async def receive_more(self)

resume_writing#

Called when the transport's buffer drains below the low-water mark.

def resume_writing(self)

See pause_writing() for details.

sanic_request_to_ws_request#

@staticmethod
def sanic_request_to_ws_request(request: sanic.request.types.Request)

send#

Writes HTTP data with backpressure control.

async def send(self, data)

version#

@property
def version(self)

websocket_handshake#

async def websocket_handshake(self, request, subprotocols: typing.Optional[typing.Sequence[str]] = None)

sanic.server.runners.serve#

Start asynchronous HTTP Server on an individual process.

def serve(host, port, app: Sanic, ssl: Optional[SSLContext] = None, sock: Optional[socket.socket] = None, unix: Optional[str] = None, reuse_port: bool = False, loop = None, protocol: Type[asyncio.Protocol] = HttpProtocol, backlog: int = 100, register_sys_signals: bool = True, run_multiple: bool = False, run_async: bool = False, connections = None, signal = Signal(), state = None, asyncio_server_kwargs = None, version = 1)

  • host: Address to host on
  • port: Port to host on
  • before_start: function to be executed before the server starts listening. Takes arguments app instance and loop
  • after_start: function to be executed after the server starts listening. Takes arguments app instance and loop
  • before_stop: function to be executed when a stop signal is received before it is respected. Takes arguments app instance and loop
  • after_stop: function to be executed when a stop signal is received after it is respected. Takes arguments app instance and loop
  • ssl: SSLContext
  • sock: Socket for the server to accept connections from
  • unix: Unix socket to listen on instead of TCP port
  • reuse_port: True for multiple workers
  • loop: asyncio compatible event loop
  • run_async: bool: Do not create a new event loop for the server, and return an AsyncioServer object rather than running it
  • asyncio_server_kwargs: key-value args for asyncio/uvloop create_server method
  • return: Nothing

Parameters
host
str

Address to host on

port
int

Port to host on

app
Sanic

Sanic app instance

ssl
Optional[SSLContext]

SSLContext. Defaults to None.

sock
Optional[socket.socket]

Socket for the server to accept connections from. Defaults to None.

unix
Optional[str]

Unix socket to listen on instead of TCP port. Defaults to None.

reuse_port
bool

True for multiple workers. Defaults to False.

loop

asyncio compatible event loop. Defaults to None.

protocol
Type[asyncio.Protocol]

Protocol to use. Defaults to HttpProtocol.

backlog
int

The maximum number of queued connections passed to socket.listen(). Defaults to 100.

register_sys_signals
bool

Register SIGINT and SIGTERM. Defaults to True.

run_multiple
bool

Run multiple workers. Defaults to False.

run_async
bool

Return an AsyncioServer object. Defaults to False.

connections

Connections. Defaults to None.

signal
Signal

Signal. Defaults to Signal().

state

State. Defaults to None.

asyncio_server_kwargs
Optional[Dict[str, Union[int, float]]]

key-value args for asyncio/uvloop create_server method. Defaults to None.

version
str

HTTP version. Defaults to HTTP.VERSION_1.

Return
AsyncioServer

AsyncioServer object if run_async is True.

Raises
ServerError

Cannot run HTTP/3 server without aioquic installed.

sanic.server.socket.bind_socket#

Create TCP server socket.

def bind_socket(host: str, port: int, backlog = 100): -> socket.socket

Parameters
host

IPv4, IPv6 or hostname may be specified

port

TCP port number

backlog

Maximum number of connections to queue
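
A minimal sketch of creating a listening TCP socket from a host, port, and backlog with the standard socket module. The helper name bind_tcp, the address-family detection, and the SO_REUSEADDR option are assumptions made for illustration, not a description of the real function's internals.

```python
import ipaddress
import socket


def bind_tcp(host, port, backlog=100):
    """Create, bind, and listen on a TCP socket (illustrative helper)."""
    try:
        ip = ipaddress.ip_address(host)
        family = socket.AF_INET6 if ip.version == 6 else socket.AF_INET
    except ValueError:
        family = socket.AF_INET  # not an IP literal: treat as a hostname
    sock = socket.socket(family, socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, port))
        sock.listen(backlog)  # backlog caps the queue of pending connections
    except OSError:
        sock.close()
        raise
    return sock


sock = bind_tcp("127.0.0.1", 0)  # port 0 lets the OS pick a free port
bound_port = sock.getsockname()[1]
sock.close()
```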

sanic.server.socket.bind_unix_socket#

Create unix socket.

def bind_unix_socket(path: str, mode = 0o666, backlog = 100): -> socket.socket

Parameters
path

filesystem path

backlog

Maximum number of connections to queue
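
The default mode is 438 in decimal, which is 0o666 in octal (read and write for everyone). A simplified sketch of binding a Unix socket with that mode and cleaning it up afterwards, assuming a POSIX system. The helper name bind_unix and the temporary path are illustrative, and the real function may take extra precautions that are not shown.

```python
import os
import socket
import stat
import tempfile


def bind_unix(path, mode=0o666, backlog=100):
    """Bind a Unix stream socket at path and set its permissions."""
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        sock.bind(path)
        os.chmod(path, mode)  # chmod is not affected by the process umask
        sock.listen(backlog)
    except OSError:
        sock.close()
        raise
    return sock


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "demo.sock")
    sock = bind_unix(path)
    info = os.stat(path)
    is_socket = stat.S_ISSOCK(info.st_mode)
    perms = stat.S_IMODE(info.st_mode)
    sock.close()
    os.unlink(path)  # remove the socket file so no dead socket is left behind
    still_exists = os.path.exists(path)
```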

sanic.server.socket.configure_socket#

def configure_socket(server_settings: Dict[str, Any]): -> Optional[socket.SocketType]

sanic.server.socket.remove_unix_socket#

Remove dead unix socket during server exit.

def remove_unix_socket(path: Optional[str]): -> None

sanic.server.websockets.connection.WebSocketConnection#

This is for ASGI Connections.

class WebSocketConnection(send: typing.Callable[[typing.MutableMapping[str, typing.Any]], typing.Awaitable[NoneType]], receive: typing.Callable[[], typing.Awaitable[typing.MutableMapping[str, typing.Any]]], subprotocols: typing.Optional[typing.List[str]] = None): -> None

It provides an interface similar to WebSocketProtocol, but sends/receives over an ASGI connection.

accept#

async def accept(self, subprotocols: typing.Optional[typing.List[str]] = None): -> None

close#

async def close(self, code: int = 1000, reason: str = ""): -> None

receive#

async def receive(self, *args, **kwargs): -> typing.Union[str, bytes, NoneType]

recv#

async def recv(self, *args, **kwargs): -> typing.Union[str, bytes, NoneType]

send#

async def send(self, data: typing.Union[str, bytes], *args, **kwargs): -> None

subprotocols#

@property
def subprotocols(self)

sanic.server.websockets.frame.WebsocketFrameAssembler#

Assemble a message from frames.

class WebsocketFrameAssembler(protocol): -> None

Code borrowed from aaugustin/websockets project: https://github.com/aaugustin/websockets/blob/6eb98dd8fa5b2c896b9f6be7e8d117708da82a39/src/websockets/sync/messages.py

get#

Read the next message.

async def get(self, timeout: typing.Optional[float] = None): -> typing.Union[str, bytes, NoneType]

get_iter#

Stream the next message.

def get_iter(self): -> typing.AsyncIterator[typing.Union[str, bytes]]

Iterating the return value of get_iter() yields a str or bytes for each frame in the message.

put#

Add frame to the next message.

async def put(self, frame: websockets.frames.Frame): -> None

When frame is the final frame in a message, put() waits until the message is fetched, either by calling get() or by iterating the return value of get_iter().
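
The hand-off between put() and get() can be sketched with two asyncio.Event objects: one signalling that a full message is buffered, the other that the consumer has fetched it. MiniAssembler and the Frame dataclass are simplified, hypothetical stand-ins. The real assembler also handles text decoding, timeouts, and streaming.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Frame:
    """Hypothetical stand-in for websockets.frames.Frame."""
    data: bytes
    fin: bool


class MiniAssembler:
    def __init__(self):
        self.chunks = []
        self.message_complete = asyncio.Event()
        self.message_fetched = asyncio.Event()

    async def put(self, frame):
        self.chunks.append(frame.data)
        if frame.fin:
            # Final frame: publish the message, then wait for the reader.
            self.message_complete.set()
            await self.message_fetched.wait()
            self.message_fetched.clear()

    async def get(self):
        await self.message_complete.wait()
        self.message_complete.clear()
        message = b"".join(self.chunks)
        self.chunks = []
        self.message_fetched.set()  # unblock the pending put()
        return message


async def demo():
    assembler = MiniAssembler()

    async def producer():
        await assembler.put(Frame(b"hel", fin=False))
        await assembler.put(Frame(b"lo", fin=True))

    task = asyncio.create_task(producer())
    message = await assembler.get()
    await task
    return message


message = asyncio.run(demo())
```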

sanic.server.websockets.impl.WebsocketImplProtocol#

class WebsocketImplProtocol(ws_proto, max_queue = None, ping_interval: typing.Optional[float] = 20, ping_timeout: typing.Optional[float] = 20, close_timeout: float = 10, loop = None)

abort_pings#

Raise ConnectionClosed in pending keepalive pings.

def abort_pings(self): -> None

They'll never receive a pong once the connection is closed.

async_data_received#

async def async_data_received(self, data_to_send, events_to_process)

async_eof_received#

async def async_eof_received(self, data_to_send, events_to_process)

auto_close_connection#

Close the WebSocket Connection

async def auto_close_connection(self): -> None

When the opening handshake succeeds, connection_open() starts this coroutine in a task. It waits for the data transfer phase to complete, then closes the TCP connection cleanly. When the opening handshake fails, fail_connection() does the same. There's no data transfer phase in that case.

close#

Perform the closing handshake.

async def close(self, code: int = 1000, reason: str = ""): -> None

This is a websocket-protocol level close.

Parameters
code

WebSocket close code

reason

WebSocket close reason

connection_lost#

The WebSocket Connection is Closed.

def connection_lost(self, exc)

connection_made#

async def connection_made(self, io_proto: sanic.server.protocols.base_protocol.SanicProtocol, loop: typing.Optional[asyncio.events.AbstractEventLoop] = None)

data_received#

def data_received(self, data)

end_connection#

def end_connection(self, code = 1000, reason = "")

eof_received#

def eof_received(self): -> typing.Optional[bool]

fail_connection#

Fail the WebSocket Connection

def fail_connection(self, code: int = 1006, reason: str = ""): -> bool

This requires:

  1. Stopping all processing of incoming data, which means pausing the underlying io protocol. The close code will be 1006 unless a close frame was received earlier.
  2. Sending a close frame with an appropriate code if the opening handshake succeeded and the other side is likely to process it.
  3. Closing the connection. auto_close_connection() takes care of this. (The specification describes these steps in the opposite order.)

keepalive_ping#

Send a Ping frame and wait for a Pong frame at regular intervals.

async def keepalive_ping(self): -> None

This coroutine exits when the connection terminates and one of the following happens:

  • ping() raises ConnectionClosed, or
  • auto_close_connection() cancels keepalive_ping_task.

pause_frames#

def pause_frames(self)

ping#

Send a ping.

async def ping(self, data: typing.Union[str, bytes, NoneType] = None): -> asyncio.Future

Return an asyncio.Future that will be resolved when the corresponding pong is received. You can ignore it if you don't intend to wait.

A ping may serve as a keepalive or as a check that the remote endpoint received all messages up to this point:

    pong_event = await ws.ping()
    await pong_event  # only if you want to wait for the pong

By default, the ping contains four random bytes. This payload may be overridden with the optional data argument, which must be a string (which will be encoded to UTF-8) or a bytes-like object.
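
The relationship between ping(), process_pong(), and abort_pings() can be sketched as a table of pending futures keyed by ping payload. PingTracker is a hypothetical, synchronous simplification. It sends nothing over the network and uses the built-in ConnectionError where the real code raises ConnectionClosed.

```python
import asyncio
import os


class PingTracker:
    """Hypothetical bookkeeping for pings awaiting a pong."""

    def __init__(self, loop):
        self.loop = loop
        self.pending = {}

    def ping(self, data=None):
        # Default payload: four random bytes, as described above.
        payload = data if data is not None else os.urandom(4)
        waiter = self.loop.create_future()
        self.pending[payload] = waiter
        return waiter

    def process_pong(self, payload):
        waiter = self.pending.pop(payload, None)
        if waiter is not None and not waiter.done():
            waiter.set_result(None)  # matching pong arrived

    def abort_pings(self):
        # No pong will ever arrive once the connection is closed.
        for waiter in self.pending.values():
            if not waiter.done():
                waiter.set_exception(ConnectionError("connection closed"))
        self.pending.clear()


async def demo():
    tracker = PingTracker(asyncio.get_running_loop())
    answered = tracker.ping(b"abcd")
    tracker.process_pong(b"abcd")
    await answered  # resolves because the pong was processed

    lost = tracker.ping(b"lost")
    tracker.abort_pings()
    try:
        await lost
        aborted = False
    except ConnectionError:
        aborted = True
    return answered.done(), aborted, len(tracker.pending)


answered_done, aborted, remaining = asyncio.run(demo())
```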

pong#

Send a pong.

async def pong(self, data: typing.Union[str, bytes] = b''): -> None

An unsolicited pong may serve as a unidirectional heartbeat. The payload may be set with the optional data argument which must be a string (which will be encoded to UTF-8) or a bytes-like object.

process_events#

Process a list of incoming events.

async def process_events(self, events: typing.Sequence[typing.Union[websockets.http11.Request, websockets.http11.Response, websockets.frames.Frame]]): -> None

process_pong#

async def process_pong(self, frame: websockets.frames.Frame): -> None

recv#

Receive the next message.

async def recv(self, timeout: typing.Optional[float] = None): -> typing.Union[str, bytes, NoneType]

Return a str for a text frame and bytes for a binary frame. When the end of the message stream is reached, recv() raises websockets.exceptions.ConnectionClosed.

Raises
websockets.exceptions.ConnectionClosed

when the connection is closed

asyncio.CancelledError

if the websocket closes while waiting

ServerError

if two tasks call recv() or recv_streaming() concurrently

recv_burst#

Receive the messages which have arrived since last checking.

async def recv_burst(self, max_recv = 256): -> typing.Sequence[typing.Union[str, bytes]]

Return a list containing str for a text frame and bytes for a binary frame. When the end of the message stream is reached, recv_burst() raises websockets.exceptions.ConnectionClosed. Specifically, it raises websockets.exceptions.ConnectionClosedOK after a normal connection closure.

Raises
websockets.exceptions.ConnectionClosed

when the connection is closed

ServerError

if two tasks call recv_burst() or recv_streaming() concurrently
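
The "everything that has arrived since last checking, up to max_recv" behaviour can be illustrated with an asyncio.Queue standing in for the connection's message buffer. This is an assumption made for the sketch. The real method reads from the frame assembler, and the function below is a free-standing illustration, not the method itself.

```python
import asyncio


async def recv_burst(queue, max_recv=256):
    """Wait for one message, then drain whatever else is already queued."""
    messages = [await queue.get()]
    while len(messages) < max_recv:
        try:
            messages.append(queue.get_nowait())
        except asyncio.QueueEmpty:
            break  # nothing more has arrived yet
    return messages


async def demo():
    queue = asyncio.Queue()
    for item in ("a", b"b", "c"):
        queue.put_nowait(item)
    first = await recv_burst(queue, max_recv=2)   # capped by max_recv
    second = await recv_burst(queue, max_recv=2)  # only one message left
    return first, second


first, second = asyncio.run(demo())
```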

recv_streaming#

Receive the next message frame by frame.

def recv_streaming(self): -> typing.AsyncIterator[typing.Union[str, bytes]]

Return an iterator of str for a text frame and bytes for a binary frame. The iterator should be exhausted, or else the connection will become unusable. With the exception of the return value, recv_streaming() behaves like recv().

resume_frames#

def resume_frames(self)

send#

Send a message.

async def send(self, message: typing.Union[str, bytes, typing.Iterable[typing.Union[str, bytes]]]): -> None

A string (str) is sent as a Text frame. A bytestring or bytes-like object (such as bytes or bytearray) is sent as a Binary frame.

Raises
TypeError

for unsupported inputs

send_data#

async def send_data(self, data_to_send)

subprotocol#

@property
def subprotocol(self)

wait_for_connection_lost#

Wait until the TCP connection is closed or timeout elapses.

async def wait_for_connection_lost(self, timeout = None): -> bool

If timeout is None, wait forever. Passing self.close_timeout as the timeout is recommended.

Return True if the connection is closed and False otherwise.
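
A sketch of the "wait with a timeout, then report whether the connection is closed" behaviour, assuming the closed state is tracked by a future. The waiter argument is hypothetical, and the function is a free-standing illustration, not the method itself. asyncio.shield keeps the timeout from cancelling the shared future.

```python
import asyncio


async def wait_for_connection_lost(waiter, timeout=None):
    """Return True if waiter completed within timeout, False otherwise."""
    if not waiter.done():
        try:
            # shield() ensures a timeout cancels only this wait, not waiter.
            await asyncio.wait_for(asyncio.shield(waiter), timeout)
        except asyncio.TimeoutError:
            pass
    return waiter.done()


async def demo():
    loop = asyncio.get_running_loop()
    waiter = loop.create_future()
    before = await wait_for_connection_lost(waiter, timeout=0.01)
    waiter.set_result(None)  # simulate connection_lost() being called
    after = await wait_for_connection_lost(waiter, timeout=0.01)
    return before, after


before, after = asyncio.run(demo())
```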