Table of Contents
ProcessStatesanic.worker.constantsRestartOrdersanic.worker.constantsInspectorsanic.worker.inspectorAppLoadersanic.worker.loaderCertLoadersanic.worker.loaderMonitorCyclesanic.worker.managerWorkerManagersanic.worker.managerWorkerMultiplexersanic.worker.multiplexerWorkersanic.worker.processWorkerProcesssanic.worker.processget_nowsanic.worker.processReloadersanic.worker.reloaderRestartersanic.worker.restarterworker_servesanic.worker.serveWorkerStatesanic.worker.statesanic.worker.constants.ProcessState#
Process states.
class ProcessState(value, names = None, module = None, qualname = None, type = None, start = 1, boundary = None)
sanic.worker.constants.RestartOrder#
Available restart orders.
class RestartOrder(value, names = None, module = None, qualname = None, type = None, start = 1, boundary = None)
sanic.worker.inspector.Inspector#
Inspector for Sanic workers.
class Inspector(publisher: Connection, app_info: Dict[str, Any], worker_state: Mapping[str, Any], host: str, port: int, api_key: str, tls_key: Union[Path, str, Default], tls_cert: Union[Path, str, Default])
This class is used to create an inspector for Sanic workers. It is instantiated by the worker class and is used to create a Sanic app that can be used to inspect and control the workers and the server.
It is not intended to be used directly by the user.
See Inspector for more information.
Parameters
- publisher
Connection The connection to the worker.
- app_info
Dict[str, Any] Information about the app.
- worker_state
Mapping[str, Any] The state of the worker.
- host
str The host to bind the inspector to.
- port
int The port to bind the inspector to.
- api_key
str The API key to use for authentication.
- tls_key
Union[Path, str, Default] The path to the TLS key file.
- tls_cert
Union[Path, str, Default] The path to the TLS cert file.
reload#
Reload the workers
def reload(self, zero_downtime: bool = False): -> None
Parameters
- zero_downtime
bool Whether to use zero downtime reload. Defaults to
False
.
scale#
Scale the number of workers
def scale(self, replicas: Union[str, int]): -> str
Parameters
- replicas
Union[str, int] The number of workers to scale to.
Return
- str
A log message.
sanic.worker.loader.AppLoader#
A helper to load application instances.
class AppLoader(module_input: str = "", as_factory: bool = False, as_simple: bool = False, args: Any = None, factory: Optional[Callable[[], SanicApp]] = None): -> None
Generally used by the worker to load the application instance.
See Dynamic Applications for information on when you may need to use this.
Parameters
- module_input
str The module to load the application from.
- as_factory
bool Whether the application is a factory.
- as_simple
bool Whether the application is a simple server.
- args
Any Arguments to pass to the application factory.
- factory
Callable[[], SanicApp] A callable that returns a Sanic application instance.
load#
def load(self): -> SanicApp
sanic.worker.loader.CertLoader#
class CertLoader(ssl_data: Optional[Union[SSLContext, Dict[str, Union[str, os.PathLike]]]])
load#
def load(self, app: SanicApp)
sanic.worker.manager.MonitorCycle#
Enum where members are also (and must be) ints
class MonitorCycle(value, names = None, module = None, qualname = None, type = None, start = 1, boundary = None)
sanic.worker.manager.WorkerManager#
Manage all of the processes.
class WorkerManager(number: <class 'int'>, serve, server_settings, context, monitor_pubsub, worker_state)
This class is used to manage all of the processes. It is instantiated by Sanic when in multiprocess mode (which is OOTB default) and is used to start, stop, and restart the worker processes.
You can access it to interact with it ONLY when on the main process.
Therefore, you should really only access it from within the
main_process_ready
event listener.
from sanic import Sanic
app = Sanic("MyApp")
all_workers#
@property
def all_workers(self): -> typing.Iterable[typing.Tuple[str, sanic.worker.process.Worker]]
create_server#
Create a new server process.
def create_server(self): -> <class 'sanic.worker.process.Worker'>
Return
- Worker
The Worker instance
durable_processes#
@property
def durable_processes(self)
manage#
Instruct Sanic to manage a custom process.
def manage(self, ident: <class 'str'>, func: typing.Callable[..., typing.Any], kwargs: typing.Dict[str, typing.Any], transient: <class 'bool'> = False, restartable: typing.Optional[bool] = None, tracked: <class 'bool'> = True, auto_start: <class 'bool'> = True, workers: <class 'int'> = 1): -> <class 'sanic.worker.process.Worker'>
Parameters
- ident
str A name for the worker process
- func
Callable[..., Any] The function to call in the background process
- kwargs
Dict[str, Any] Arguments to pass to the function
- transient
bool Whether to mark the process as transient. If
True
then the Worker Manager will restart the process along with any global restart (ex: auto-reload), defaults toFalse
- restartable
Optional[bool] Whether to mark the process as restartable. If
True
then the Worker Manager will be able to restart the process if prompted. Iftransient=True
, this property will be implied to beTrue
, defaults toNone
- tracked
bool Whether to track the process after completion, defaults to
True
- auto_start
bool Whether to start the process immediately, defaults to
True
- workers
int The number of worker processes to run. Defaults to
1
.
Return
- Worker
The Worker instance
monitor#
Monitor the worker processes.
def monitor(self)
First, wait for all of the workers to acknowledge that they are ready. Then, wait for messages from the workers. If a message is received then it is processed and the state of the worker is updated.
Also used to restart, shutdown, and scale the workers.
Raises
- ServerKilled
Raised when a worker fails to come online.
remove_worker#
def remove_worker(self, worker: <class 'sanic.worker.process.Worker'>): -> None
restart#
Restart the worker processes.
def restart(self, process_names: typing.Optional[typing.List[str]] = None, restart_order = SHUTDOWN_FIRST, kwargs)
Parameters
- process_names
Optional[List[str]] The names of the processes to restart. If
None
then all processes will be restarted. Defaults toNone
.
- restart_order
RestartOrder The order in which to restart the processes. Defaults to
RestartOrder.SHUTDOWN_FIRST
.
scale#
def scale(self, num_worker: <class 'int'>)
shutdown_server#
Shutdown a server process.
def shutdown_server(self, ident: typing.Optional[str] = None): -> None
Parameters
- ident
Optional[str] The name of the server process to shutdown. If
None
then a random server will be chosen. Defaults toNone
.
workers#
Get all of the workers.
@property
def workers(self): -> typing.List[sanic.worker.process.Worker]
sanic.worker.multiplexer.WorkerMultiplexer#
Multiplexer for Sanic workers.
class WorkerMultiplexer(monitor_publisher: <class 'multiprocessing.connection.Connection'>, worker_state: typing.Dict[str, typing.Any])
This is instantiated inside of worker porocesses only. It is used to communicate with the monitor process.
Parameters
- monitor_publisher
Connection The connection to the monitor.
- worker_state
Dict[str, Any] The state of the worker.
manage#
Manages the initiation and monitoring of a worker process.
def manage(self, ident: <class 'str'>, func: typing.Callable[..., typing.Any], kwargs: typing.Dict[str, typing.Any], transient: <class 'bool'> = False, restartable: typing.Optional[bool] = None, tracked: <class 'bool'> = False, auto_start: <class 'bool'> = True, workers: <class 'int'> = 1): -> None
Parameters
- ident
str A unique identifier for the worker process.
- func
Callable[..., Any] The function to be executed in the worker process.
- kwargs
Dict[str, Any] A dictionary of arguments to be passed to
func
.
- transient
bool Flag to mark the process as transient. If
True
, the Worker Manager will restart the process with any global restart (e.g., auto-reload). Defaults toFalse
.
- restartable
Optional[bool] Flag to mark the process as restartable. If
True
, the Worker Manager can restart the process if prompted. Defaults toNone
.
- tracked
bool Flag to indicate whether the process should be tracked after its completion. Defaults to
False
.
- auto_start
bool Flag to indicate whether the process should be started
- workers
int The number of worker processes to run. Defaults to 1.
reload#
Restart the worker.
def reload(self, name: <class 'str'> = , all_workers: <class 'bool'> = False, zero_downtime: <class 'bool'> = False)
Parameters
- name
str The name of the process to restart.
- all_workers
bool Whether to restart all workers.
- zero_downtime
bool Whether to restart with zero downtime.
restart#
Restart the worker.
def restart(self, name: <class 'str'> = , all_workers: <class 'bool'> = False, zero_downtime: <class 'bool'> = False)
Parameters
- name
str The name of the process to restart.
- all_workers
bool Whether to restart all workers.
- zero_downtime
bool Whether to restart with zero downtime.
scale#
Scale the number of workers.
def scale(self, num_workers: <class 'int'>)
Parameters
- num_workers
int The number of workers to scale to.
set_serving#
Set the worker to serving.
def set_serving(self, serving: <class 'bool'>): -> None
Parameters
- serving
bool Whether the worker is serving.
terminate#
Terminate the worker.
def terminate(self, early: <class 'bool'> = False)
Parameters
- early
bool Whether to terminate early.
sanic.worker.process.Worker#
class Worker(ident: <class 'str'>, serve, server_settings, context: <class 'multiprocessing.context.BaseContext'>, worker_state: typing.Dict[str, typing.Any], num: <class 'int'> = 1, restartable: <class 'bool'> = False, tracked: <class 'bool'> = True, auto_start: <class 'bool'> = True)
create_process#
def create_process(self): -> <class 'sanic.worker.process.WorkerProcess'>
has_alive_processes#
def has_alive_processes(self): -> <class 'bool'>
sanic.worker.process.WorkerProcess#
A worker process.
class WorkerProcess(factory, name, target, kwargs, worker_state, restartable: <class 'bool'> = False)
exit#
def exit(self)
exitcode#
@property
def exitcode(self)
is_alive#
def is_alive(self)
join#
def join(self)
pid#
@property
def pid(self)
restart#
def restart(self, restart_order = SHUTDOWN_FIRST, kwargs)
set_state#
def set_state(self, state: <enum 'ProcessState'>, force = False)
spawn#
def spawn(self)
start#
def start(self)
terminate#
def terminate(self)
sanic.worker.process.get_now#
def get_now()
sanic.worker.reloader.Reloader#
class Reloader(publisher: Connection, interval: float, reload_dirs: Set[Path], app_loader: AppLoader)
check_file#
@staticmethod
def check_file(filename, mtimes): -> bool
files#
def files(self)
python_files#
This iterates over all relevant Python files.
def python_files(self)
It goes through all loaded files from modules, all files in folders of already loaded modules as well as all files reachable through a package.
reload#
def reload(self, reloaded_files)
stop#
def stop(self, _)
sanic.worker.restarter.Restarter#
class Restarter()
restart#
Restart the worker processes.
def restart(self, transient_processes: typing.List[sanic.worker.process.WorkerProcess], durable_processes: typing.List[sanic.worker.process.WorkerProcess], process_names: typing.Optional[typing.List[str]] = None, restart_order = SHUTDOWN_FIRST, kwargs): -> None
Parameters
- process_names
Optional[List[str]] The names of the processes to restart. If
None
, then all processes will be restarted. Defaults toNone
.
- restart_order
RestartOrder The order in which to restart the processes. Defaults to
RestartOrder.SHUTDOWN_FIRST
.
sanic.worker.serve.worker_serve#
def worker_serve(host, port, app_name: <class 'str'>, monitor_publisher: typing.Optional[multiprocessing.connection.Connection], app_loader: <class 'sanic.worker.loader.AppLoader'>, worker_state: typing.Optional[typing.Dict[str, typing.Any]] = None, server_info: typing.Optional[typing.Dict[str, typing.List[sanic.application.state.ApplicationServerInfo]]] = None, ssl: typing.Union[ssl.SSLContext, typing.Dict[str, typing.Union[str, os.PathLike]], NoneType] = None, sock: typing.Optional[socket.socket] = None, unix: typing.Optional[str] = None, reuse_port: <class 'bool'> = False, loop = None, protocol: typing.Type[asyncio.protocols.Protocol] = <class 'sanic.server.protocols.http_protocol.HttpProtocol'>, backlog: <class 'int'> = 100, register_sys_signals: <class 'bool'> = True, run_multiple: <class 'bool'> = False, run_async: <class 'bool'> = False, connections = None, signal = <sanic.models.server_types.Signal object at 0x7ed7be3af8d0>, state = None, asyncio_server_kwargs = None, version = 1, config = None, passthru: typing.Optional[typing.Dict[str, typing.Any]] = None)
sanic.worker.state.WorkerState#
A Mapping is a generic container for associating key/value
class WorkerState(state: typing.Dict[str, typing.Any], current: <class 'str'>): -> None
pairs.
This class provides concrete generic implementations of all methods except for getitem, iter, and len.
full#
def full(self): -> typing.Dict[str, typing.Any]
items#
D.items() -> a set-like object providing a view on D's items
def items(self): -> typing.ItemsView[str, typing.Any]
keys#
D.keys() -> a set-like object providing a view on D's keys
def keys(self): -> typing.KeysView[str]
pop#
def pop(self): -> None
update#
def update(self, mapping: typing.Mapping[str, typing.Any]): -> None
values#
D.values() -> an object providing a view on D's values
def values(self): -> typing.ValuesView[typing.Any]