12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081 |
- """A WSGI and HTTP server for use **during development only**. This
- server is convenient to use, but is not designed to be particularly
- stable, secure, or efficient. Use a dedicated WSGI server and HTTP
- server when deploying to production.
- It provides features like interactive debugging and code reloading. Use
- ``run_simple`` to start the server. Put this in a ``run.py`` script:
- .. code-block:: python
- from myapp import create_app
- from werkzeug import run_simple
- """
- import io
- import os
- import platform
- import signal
- import socket
- import socketserver
- import sys
- import typing as t
- import warnings
- from datetime import datetime as dt
- from datetime import timedelta
- from datetime import timezone
- from http.server import BaseHTTPRequestHandler
- from http.server import HTTPServer
- from ._internal import _log
- from ._internal import _wsgi_encoding_dance
- from .exceptions import InternalServerError
- from .urls import uri_to_iri
- from .urls import url_parse
- from .urls import url_unquote
try:
    import ssl
except ImportError:

    class _SslDummy:
        """Placeholder bound to ``ssl`` when the real module cannot be imported.

        Any attribute access raises ``RuntimeError``, so importing this
        module still works and only code that actually touches SSL fails.
        """

        def __getattr__(self, name: str) -> t.Any:
            raise RuntimeError("SSL support unavailable")  # noqa: B904

    ssl = _SslDummy()  # type: ignore
# Whether request log lines get ANSI styling. On Windows this is only
# enabled when colorama can be imported.
_log_add_style = True

if os.name == "nt":
    try:
        __import__("colorama")
    except ImportError:
        _log_add_style = False

# Forking servers are only available where ``os.fork`` exists.
can_fork = hasattr(os, "fork")

if can_fork:
    ForkingMixIn = socketserver.ForkingMixIn
else:

    class ForkingMixIn:  # type: ignore
        pass


# ``None`` on platforms whose socket module has no ``AF_UNIX``.
try:
    af_unix = socket.AF_UNIX
except AttributeError:
    af_unix = None  # type: ignore

# Backlog used for listening sockets.
LISTEN_QUEUE = 128
can_open_by_fd = platform.system() != "Windows" and hasattr(socket, "fromfd")

# Accepted forms of the ``ssl_context`` argument: a ready context, a
# ``(cert_file, pkey_file)`` tuple, or the literal string ``'adhoc'``.
_TSSLContextArg = t.Optional[
    t.Union["ssl.SSLContext", t.Tuple[str, t.Optional[str]], "te.Literal['adhoc']"]
]
- if t.TYPE_CHECKING:
- import typing_extensions as te # noqa: F401
- from _typeshed.wsgi import WSGIApplication
- from _typeshed.wsgi import WSGIEnvironment
- from cryptography.hazmat.primitives.asymmetric.rsa import (
- RSAPrivateKeyWithSerialization,
- )
- from cryptography.x509 import Certificate
class DechunkedInput(io.RawIOBase):
    """An input stream that handles Transfer-Encoding 'chunked'.

    Wraps a binary stream positioned at the first chunk header and
    exposes the decoded body through the raw I/O ``readinto`` protocol.

    :param rfile: The underlying stream to read the chunked body from.
    """

    def __init__(self, rfile: t.IO[bytes]) -> None:
        self._rfile = rfile
        # Set once the zero-sized final chunk has been seen.
        self._done = False
        # Bytes of the current chunk that have not been handed out yet.
        self._len = 0

    def readable(self) -> bool:
        return True

    def read_chunk_len(self) -> int:
        """Read one chunk header line and return the chunk size.

        Chunk extensions (anything after the first ``;``) are ignored.

        :raises OSError: If the size is not valid hexadecimal or is
            negative.
        """
        try:
            line = self._rfile.readline().decode("latin1")
            # The size may be followed by ";name=value" chunk extensions,
            # which a recipient is expected to ignore.
            size_field = line.split(";", 1)[0]
            _len = int(size_field.strip(), 16)
        except ValueError as e:
            raise OSError("Invalid chunk header") from e
        if _len < 0:
            raise OSError("Negative chunk length not allowed")
        return _len

    def readinto(self, buf: bytearray) -> int:  # type: ignore
        """Fill ``buf`` with decoded body bytes and return the count.

        Returns ``0`` once the final chunk has been consumed.

        :raises OSError: If a chunk header is invalid or a chunk is not
            followed by a newline.
        """
        read = 0
        while not self._done and read < len(buf):
            if self._len == 0:
                # This is the first chunk or we fully consumed the previous
                # one. Read the next length of the next chunk
                self._len = self.read_chunk_len()

            if self._len == 0:
                # Found the final chunk of size 0. The stream is now exhausted,
                # but there is still a final newline that should be consumed
                self._done = True

            if self._len > 0:
                # Copy as much of the current chunk as still fits into
                # buf. Never request more than the remaining room,
                # otherwise the slice assignment could grow buf.
                n = min(len(buf) - read, self._len)
                buf[read : read + n] = self._rfile.read(n)
                self._len -= n
                read += n

            if self._len == 0:
                # Skip the terminating newline of a chunk that has been fully
                # consumed. This also applies to the 0-sized final chunk
                terminator = self._rfile.readline()
                if terminator not in (b"\n", b"\r\n", b"\r"):
                    raise OSError("Missing chunk terminating newline")

        return read
class WSGIRequestHandler(BaseHTTPRequestHandler):
    """A request handler that implements WSGI dispatching."""

    server: "BaseWSGIServer"

    @property
    def server_version(self) -> str:  # type: ignore
        """The ``Werkzeug/<version>`` token used as server software."""
        from . import __version__

        return f"Werkzeug/{__version__}"

    def make_environ(self) -> "WSGIEnvironment":
        """Build the WSGI environ dict for the request being handled.

        As a side effect ``self.client_address`` is normalised to a
        ``(host, port)`` tuple when it is empty or a plain string.
        """
        request_url = url_parse(self.path)

        def shutdown_server() -> None:
            warnings.warn(
                "The 'environ['werkzeug.server.shutdown']' function is"
                " deprecated and will be removed in Werkzeug 2.1.",
                stacklevel=2,
            )
            self.server.shutdown_signal = True

        url_scheme = "http" if self.server.ssl_context is None else "https"

        if not self.client_address:
            self.client_address = ("<local>", 0)
        elif isinstance(self.client_address, str):
            self.client_address = (self.client_address, 0)

        # If there was no scheme but the path started with two slashes,
        # the first segment may have been incorrectly parsed as the
        # netloc, prepend it to the path again.
        if not request_url.scheme and request_url.netloc:
            path_info = f"/{request_url.netloc}{request_url.path}"
        else:
            path_info = request_url.path

        path_info = url_unquote(path_info)

        environ: "WSGIEnvironment" = {
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": url_scheme,
            "wsgi.input": self.rfile,
            "wsgi.errors": sys.stderr,
            "wsgi.multithread": self.server.multithread,
            "wsgi.multiprocess": self.server.multiprocess,
            "wsgi.run_once": False,
            "werkzeug.server.shutdown": shutdown_server,
            "werkzeug.socket": self.connection,
            "SERVER_SOFTWARE": self.server_version,
            "REQUEST_METHOD": self.command,
            "SCRIPT_NAME": "",
            "PATH_INFO": _wsgi_encoding_dance(path_info),
            "QUERY_STRING": _wsgi_encoding_dance(request_url.query),
            # Non-standard, added by mod_wsgi, uWSGI
            "REQUEST_URI": _wsgi_encoding_dance(self.path),
            # Non-standard, added by gunicorn
            "RAW_URI": _wsgi_encoding_dance(self.path),
            "REMOTE_ADDR": self.address_string(),
            "REMOTE_PORT": self.port_integer(),
            "SERVER_NAME": self.server.server_address[0],
            "SERVER_PORT": str(self.server.server_address[1]),
            "SERVER_PROTOCOL": self.request_version,
        }

        for key, value in self.headers.items():
            key = key.upper().replace("-", "_")
            value = value.replace("\r\n", "")
            if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"):
                key = f"HTTP_{key}"
                # Repeated headers are folded into one comma-separated value.
                if key in environ:
                    value = f"{environ[key]},{value}"
            environ[key] = value

        if environ.get("HTTP_TRANSFER_ENCODING", "").strip().lower() == "chunked":
            environ["wsgi.input_terminated"] = True
            environ["wsgi.input"] = DechunkedInput(environ["wsgi.input"])

        # Per RFC 2616, if the URL is absolute, use that as the host.
        # We're using "has a scheme" to indicate an absolute URL.
        if request_url.scheme and request_url.netloc:
            environ["HTTP_HOST"] = request_url.netloc

        try:
            # binary_form=False gives nicer information, but wouldn't be compatible with
            # what Nginx or Apache could return.
            peer_cert = self.connection.getpeercert(binary_form=True)
            if peer_cert is not None:
                # Nginx and Apache use PEM format.
                environ["SSL_CLIENT_CERT"] = ssl.DER_cert_to_PEM_cert(peer_cert)
        except ValueError:
            # SSL handshake hasn't finished.
            self.server.log("error", "Cannot fetch SSL peer certificate info")
        except AttributeError:
            # Not using TLS, the socket will not have getpeercert().
            pass

        return environ

    def run_wsgi(self) -> None:
        """Run the server's WSGI application for the current request.

        Dropped connections are passed to :meth:`connection_dropped`.
        Any other exception is re-raised when the server has
        ``passthrough_errors`` set; otherwise an
        ``InternalServerError`` response is attempted and the traceback
        is logged.
        """
        if self.headers.get("Expect", "").lower().strip() == "100-continue":
            self.wfile.write(b"HTTP/1.1 100 Continue\r\n\r\n")

        self.environ = environ = self.make_environ()
        # ``*_set`` is what the app passed to start_response; ``*_sent``
        # is what has actually been written to the client.
        status_set: t.Optional[str] = None
        headers_set: t.Optional[t.List[t.Tuple[str, str]]] = None
        status_sent: t.Optional[str] = None
        headers_sent: t.Optional[t.List[t.Tuple[str, str]]] = None

        def write(data: bytes) -> None:
            nonlocal status_sent, headers_sent
            assert status_set is not None, "write() before start_response"
            assert headers_set is not None, "write() before start_response"
            if status_sent is None:
                # First write: emit the status line and headers.
                status_sent = status_set
                headers_sent = headers_set
                try:
                    code_str, msg = status_sent.split(None, 1)
                except ValueError:
                    code_str, msg = status_sent, ""
                code = int(code_str)
                self.send_response(code, msg)
                header_keys = set()
                for key, value in headers_sent:
                    self.send_header(key, value)
                    key = key.lower()
                    header_keys.add(key)
                # Without a Content-Length on a response that is not
                # HEAD / 1xx / 204 / 304, close the connection after it.
                if not (
                    "content-length" in header_keys
                    or environ["REQUEST_METHOD"] == "HEAD"
                    or code < 200
                    or code in (204, 304)
                ):
                    self.close_connection = True
                    self.send_header("Connection", "close")
                if "server" not in header_keys:
                    self.send_header("Server", self.version_string())
                if "date" not in header_keys:
                    self.send_header("Date", self.date_time_string())
                self.end_headers()

            assert isinstance(data, bytes), "applications must write bytes"
            self.wfile.write(data)
            self.wfile.flush()

        def start_response(status, headers, exc_info=None):  # type: ignore
            nonlocal status_set, headers_set
            if exc_info:
                try:
                    if headers_sent:
                        raise exc_info[1].with_traceback(exc_info[2])
                finally:
                    exc_info = None
            elif headers_set:
                raise AssertionError("Headers already set")
            status_set = status
            headers_set = headers
            return write

        def execute(app: "WSGIApplication") -> None:
            application_iter = app(environ, start_response)
            try:
                for data in application_iter:
                    write(data)
                # Make sure headers go out even for an empty body.
                if not headers_sent:
                    write(b"")
            finally:
                if hasattr(application_iter, "close"):
                    application_iter.close()  # type: ignore

        try:
            execute(self.server.app)
        except (ConnectionError, socket.timeout) as e:
            self.connection_dropped(e, environ)
        except Exception:
            if self.server.passthrough_errors:
                raise
            from .debug.tbtools import get_current_traceback

            traceback = get_current_traceback(ignore_system_exceptions=True)
            try:
                # if we haven't yet sent the headers but they are set
                # we roll back to be able to set them again.
                if status_sent is None:
                    status_set = None
                    headers_set = None
                execute(InternalServerError())
            except Exception:
                # Best effort only: the original error is logged below.
                pass
            self.server.log("error", "Error on request:\n%s", traceback.plaintext)

    def handle(self) -> None:
        """Handles a request ignoring dropped connections."""
        try:
            BaseHTTPRequestHandler.handle(self)
        except (ConnectionError, socket.timeout) as e:
            self.connection_dropped(e)
        except Exception as e:
            if self.server.ssl_context is not None and is_ssl_error(e):
                self.log_error("SSL error occurred: %s", e)
            else:
                raise
        if self.server.shutdown_signal:
            self.initiate_shutdown()

    def initiate_shutdown(self) -> None:
        """Stop the server after the deprecated shutdown hook was called."""
        if is_running_from_reloader():
            # Windows does not provide SIGKILL, go with SIGTERM then.
            sig = getattr(signal, "SIGKILL", signal.SIGTERM)
            os.kill(os.getpid(), sig)

        # Sets socketserver.BaseServer's name-mangled private flag directly.
        self.server._BaseServer__shutdown_request = True  # type: ignore

    def connection_dropped(
        self, error: BaseException, environ: t.Optional["WSGIEnvironment"] = None
    ) -> None:
        """Called if the connection was closed by the client. By default
        nothing happens.
        """

    def handle_one_request(self) -> None:
        """Handle a single HTTP request."""
        self.raw_requestline = self.rfile.readline()
        if not self.raw_requestline:
            self.close_connection = True
        elif self.parse_request():
            self.run_wsgi()

    def send_response(self, code: int, message: t.Optional[str] = None) -> None:
        """Send the response header and log the response code."""
        self.log_request(code)
        if message is None:
            message = self.responses[code][0] if code in self.responses else ""
        if self.request_version != "HTTP/0.9":
            hdr = f"{self.protocol_version} {code} {message}\r\n"
            self.wfile.write(hdr.encode("ascii"))

    def version_string(self) -> str:
        return super().version_string().strip()

    def address_string(self) -> str:
        """Return the client address used in the environ and log lines."""
        if getattr(self, "environ", None):
            return self.environ["REMOTE_ADDR"]  # type: ignore

        if not self.client_address:
            return "<local>"

        return self.client_address[0]

    def port_integer(self) -> int:
        """Return the port element of ``self.client_address``."""
        return self.client_address[1]

    def log_request(
        self, code: t.Union[int, str] = "-", size: t.Union[int, str] = "-"
    ) -> None:
        """Log the request line, styled by status code when styling is on."""
        try:
            path = uri_to_iri(self.path)
            msg = f"{self.command} {path} {self.request_version}"
        except AttributeError:
            # path isn't set if the requestline was bad
            msg = self.requestline

        code = str(code)

        if _log_add_style:
            if code[0] == "1":  # 1xx - Informational
                msg = _ansi_style(msg, "bold")
            elif code == "200":  # 200 - OK (other 2xx fall through to the default)
                pass
            elif code == "304":  # 304 - Resource Not Modified
                msg = _ansi_style(msg, "cyan")
            elif code[0] == "3":  # 3xx - Redirection
                msg = _ansi_style(msg, "green")
            elif code == "404":  # 404 - Resource Not Found
                msg = _ansi_style(msg, "yellow")
            elif code[0] == "4":  # 4xx - Client Error
                msg = _ansi_style(msg, "bold", "red")
            else:  # 5xx, or any other response
                msg = _ansi_style(msg, "bold", "magenta")

        self.log("info", '"%s" %s %s', msg, code, size)

    def log_error(self, format: str, *args: t.Any) -> None:
        self.log("error", format, *args)

    def log_message(self, format: str, *args: t.Any) -> None:
        self.log("info", format, *args)

    def log(self, type: str, message: str, *args: t.Any) -> None:
        """Emit ``message`` through ``_log`` with address and timestamp."""
        _log(
            type,
            f"{self.address_string()} - - [{self.log_date_time_string()}] {message}\n",
            *args,
        )
def _ansi_style(value: str, *styles: str) -> str:
    """Wrap ``value`` in ANSI escape codes for the named styles.

    Each style's escape sequence is prepended in turn, so the last
    style named ends up outermost, and a reset sequence is appended.

    :param value: The text to style.
    :param styles: Any of ``bold``, ``red``, ``green``, ``yellow``,
        ``magenta``, ``cyan``.
    :raises KeyError: If a style name is not known.
    """
    codes = {
        "bold": 1,
        "red": 31,
        "green": 32,
        "yellow": 33,
        "magenta": 35,
        "cyan": 36,
    }

    for style in styles:
        value = f"\x1b[{codes[style]}m{value}"

    return f"{value}\x1b[0m"
def generate_adhoc_ssl_pair(
    cn: t.Optional[str] = None,
) -> t.Tuple["Certificate", "RSAPrivateKeyWithSerialization"]:
    """Generate a self-signed certificate and its 2048-bit RSA key.

    The certificate is valid for 365 days from now and carries ``cn``
    as both common name and DNS subject alternative name.

    :param cn: The common name to use. Defaults to ``"*"``.
    :return: A ``(certificate, private_key)`` tuple.
    :raises TypeError: If the ``cryptography`` library is not installed.
    """
    try:
        from cryptography import x509
        from cryptography.x509.oid import NameOID
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.asymmetric import rsa
    except ImportError:
        raise TypeError(
            "Using ad-hoc certificates requires the cryptography library."
        ) from None

    backend = default_backend()
    pkey = rsa.generate_private_key(
        public_exponent=65537, key_size=2048, backend=backend
    )

    # pretty damn sure that this is not actually accepted by anyone
    if cn is None:
        cn = "*"

    # Self-signed: the same name is used as subject and issuer.
    subject = x509.Name(
        [
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Dummy Certificate"),
            x509.NameAttribute(NameOID.COMMON_NAME, cn),
        ]
    )

    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(subject)
        .public_key(pkey.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(dt.now(timezone.utc))
        .not_valid_after(dt.now(timezone.utc) + timedelta(days=365))
        .add_extension(x509.ExtendedKeyUsage([x509.OID_SERVER_AUTH]), critical=False)
        .add_extension(x509.SubjectAlternativeName([x509.DNSName(cn)]), critical=False)
        .sign(pkey, hashes.SHA256(), backend)
    )
    return cert, pkey
def make_ssl_devcert(
    base_path: str, host: t.Optional[str] = None, cn: t.Optional[str] = None
) -> t.Tuple[str, str]:
    """Creates an SSL key for development. This should be used instead of
    the ``'adhoc'`` key which generates a new cert on each server start.
    It accepts a path for where it should store the key and cert and
    either a host or CN. If a host is given it will use the CN
    ``*.host/CN=host``.

    For more information see :func:`run_simple`.

    .. versionadded:: 0.9

    :param base_path: the path to the certificate and key. The extension
                      ``.crt`` is added for the certificate, ``.key`` is
                      added for the key.
    :param host: the name of the host. This can be used as an alternative
                 for the `cn`.
    :param cn: the `CN` to use.
    :return: the ``(cert_file, pkey_file)`` paths that were written.
    """
    # A given host takes precedence over an explicitly passed cn.
    if host is not None:
        cn = f"*.{host}/CN={host}"

    cert, pkey = generate_adhoc_ssl_pair(cn=cn)

    from cryptography.hazmat.primitives import serialization

    cert_file = f"{base_path}.crt"
    pkey_file = f"{base_path}.key"

    with open(cert_file, "wb") as f:
        f.write(cert.public_bytes(serialization.Encoding.PEM))

    # The private key is stored unencrypted in PEM form.
    with open(pkey_file, "wb") as f:
        f.write(
            pkey.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=serialization.NoEncryption(),
            )
        )

    return cert_file, pkey_file
def generate_adhoc_ssl_context() -> "ssl.SSLContext":
    """Generates an adhoc SSL context for the development server.

    A fresh certificate and key are written to two temporary files,
    which are registered for removal at interpreter exit, and loaded
    through :func:`load_ssl_context`.
    """
    import tempfile
    import atexit

    cert, pkey = generate_adhoc_ssl_pair()

    from cryptography.hazmat.primitives import serialization

    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)

    # Wrapping the descriptors in file objects guarantees both are
    # closed even if serialising or writing raises.
    with os.fdopen(cert_handle, "wb") as cert_out, os.fdopen(
        pkey_handle, "wb"
    ) as pkey_out:
        cert_out.write(cert.public_bytes(serialization.Encoding.PEM))
        pkey_out.write(
            pkey.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=serialization.NoEncryption(),
            )
        )

    ctx = load_ssl_context(cert_file, pkey_file)
    return ctx
def load_ssl_context(
    cert_file: str, pkey_file: t.Optional[str] = None, protocol: t.Optional[int] = None
) -> "ssl.SSLContext":
    """Loads SSL context from cert/private key files and optional protocol.
    Many parameters are directly taken from the API of
    :py:class:`ssl.SSLContext`.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
                      will be obtained from the certificate file.
    :param protocol: A ``PROTOCOL`` constant from the :mod:`ssl` module.
        Defaults to :data:`ssl.PROTOCOL_TLS_SERVER`.
    """
    if protocol is None:
        protocol = ssl.PROTOCOL_TLS_SERVER

    ctx = ssl.SSLContext(protocol)
    ctx.load_cert_chain(cert_file, pkey_file)
    return ctx
def is_ssl_error(error: t.Optional[Exception] = None) -> bool:
    """Checks if the given error (or the current one) is an SSL error.

    :param error: The exception to inspect. If ``None``, the exception
        currently being handled (``sys.exc_info()``) is used.
    """
    if error is None:
        error = t.cast(Exception, sys.exc_info()[1])
    return isinstance(error, ssl.SSLError)
def select_address_family(host: str, port: int) -> socket.AddressFamily:
    """Return ``AF_INET``, ``AF_INET6``, or ``AF_UNIX`` depending on
    the host.

    A host starting with ``unix://`` selects ``AF_UNIX``; a host that
    contains ``:`` selects ``AF_INET6`` where the platform defines it;
    anything else selects ``AF_INET``.

    :param host: The hostname, address, or ``unix://`` path to bind to.
    :param port: Accepted for interface symmetry; not consulted.
    """
    if host.startswith("unix://"):
        return socket.AF_UNIX
    elif ":" in host and hasattr(socket, "AF_INET6"):
        return socket.AF_INET6
    return socket.AF_INET
def get_sockaddr(
    host: str, port: int, family: socket.AddressFamily
) -> t.Union[t.Tuple[str, int], str]:
    """Return a fully qualified socket address that can be passed to
    :func:`socket.bind`.

    For the Unix family this is the path after ``unix://``. Otherwise
    it is the first address returned by :func:`socket.getaddrinfo`, or
    ``(host, port)`` unchanged if resolution fails.
    """
    if family == af_unix:
        return host.split("://", 1)[1]
    try:
        res = socket.getaddrinfo(
            host, port, family, socket.SOCK_STREAM, socket.IPPROTO_TCP
        )
    except socket.gaierror:
        # Let bind() report the problem with the original values.
        return host, port
    return res[0][4]  # type: ignore
def get_interface_ip(family: socket.AddressFamily) -> str:
    """Get the IP address of an external interface. Used when binding to
    0.0.0.0 or :: to show a more useful URL.

    Falls back to the loopback address of ``family`` if the datagram
    socket cannot be connected.

    :meta private:
    """
    # arbitrary private address
    host = "fd31:f903:5ab5:1::1" if family == socket.AF_INET6 else "10.253.155.219"

    with socket.socket(family, socket.SOCK_DGRAM) as s:
        try:
            s.connect((host, 58162))
        except OSError:
            return "::1" if family == socket.AF_INET6 else "127.0.0.1"

        # The local address the socket was given for that connection.
        return s.getsockname()[0]  # type: ignore
class BaseWSGIServer(HTTPServer):
    """Simple single-threaded, single-process WSGI server.

    :param host: Host, address, or ``unix://`` path to bind to.
    :param port: Port to bind to. Ignored for binding when ``fd`` is
        given.
    :param app: The WSGI application to serve.
    :param handler: Request handler class. Defaults to
        :class:`WSGIRequestHandler`.
    :param passthrough_errors: Re-raise errors instead of handling them.
    :param ssl_context: ``None``, an SSL context, a
        ``(cert_file, pkey_file)`` tuple, or ``'adhoc'``.
    :param fd: File descriptor of an existing socket to serve on instead
        of the one bound by the base class.
    """

    multithread = False
    multiprocess = False
    request_queue_size = LISTEN_QUEUE

    def __init__(
        self,
        host: str,
        port: int,
        app: "WSGIApplication",
        handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
        passthrough_errors: bool = False,
        ssl_context: t.Optional[_TSSLContextArg] = None,
        fd: t.Optional[int] = None,
    ) -> None:
        if handler is None:
            handler = WSGIRequestHandler

        self.address_family = select_address_family(host, port)

        if fd is not None:
            real_sock = socket.fromfd(fd, self.address_family, socket.SOCK_STREAM)
            # The base class still binds a socket of its own; send it to
            # port 0 so it does not collide with the passed-in socket.
            port = 0

        server_address = get_sockaddr(host, int(port), self.address_family)

        # remove socket file if it already exists
        if self.address_family == af_unix:
            server_address = t.cast(str, server_address)

            if os.path.exists(server_address):
                os.unlink(server_address)

        super().__init__(server_address, handler)  # type: ignore

        self.app = app
        self.passthrough_errors = passthrough_errors
        self.shutdown_signal = False
        self.host = host

        # Patch in the original socket.
        if fd is not None:
            self.socket.close()
            self.socket = real_sock
            self.server_address = self.socket.getsockname()

        # Read the port only after the socket swap so that it describes
        # the socket actually served, not the discarded temporary one.
        self.port = self.socket.getsockname()[1]

        if ssl_context is not None:
            if isinstance(ssl_context, tuple):
                ssl_context = load_ssl_context(*ssl_context)
            if ssl_context == "adhoc":
                ssl_context = generate_adhoc_ssl_context()

            self.socket = ssl_context.wrap_socket(self.socket, server_side=True)
            self.ssl_context: t.Optional["ssl.SSLContext"] = ssl_context
        else:
            self.ssl_context = None

    def log(self, type: str, message: str, *args: t.Any) -> None:
        _log(type, message, *args)

    def serve_forever(self, poll_interval: float = 0.5) -> None:
        """Serve until shut down; ``KeyboardInterrupt`` ends the loop
        quietly and the server socket is always closed afterwards.
        """
        self.shutdown_signal = False
        try:
            super().serve_forever(poll_interval=poll_interval)
        except KeyboardInterrupt:
            pass
        finally:
            self.server_close()

    def handle_error(self, request: t.Any, client_address: t.Tuple[str, int]) -> None:
        # Called while an exception is being handled, so a bare raise
        # re-raises that exception.
        if self.passthrough_errors:
            raise

        return super().handle_error(request, client_address)
class ThreadedWSGIServer(socketserver.ThreadingMixIn, BaseWSGIServer):
    """A WSGI server that does threading."""

    multithread = True
    daemon_threads = True
class ForkingWSGIServer(ForkingMixIn, BaseWSGIServer):
    """A WSGI server that does forking.

    :param processes: Stored as ``max_children``, the mix-in's limit on
        child processes.
    :raises ValueError: If the platform has no ``os.fork``.

    The remaining parameters are passed to :class:`BaseWSGIServer`.
    """

    multiprocess = True

    def __init__(
        self,
        host: str,
        port: int,
        app: "WSGIApplication",
        processes: int = 40,
        handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
        passthrough_errors: bool = False,
        ssl_context: t.Optional[_TSSLContextArg] = None,
        fd: t.Optional[int] = None,
    ) -> None:
        if not can_fork:
            raise ValueError("Your platform does not support forking.")
        BaseWSGIServer.__init__(
            self, host, port, app, handler, passthrough_errors, ssl_context, fd
        )
        self.max_children = processes
def make_server(
    host: str,
    port: int,
    app: "WSGIApplication",
    threaded: bool = False,
    processes: int = 1,
    request_handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
    passthrough_errors: bool = False,
    ssl_context: t.Optional[_TSSLContextArg] = None,
    fd: t.Optional[int] = None,
) -> BaseWSGIServer:
    """Create a new server instance that is either threaded, or forks
    or just processes one request after another.

    :return: A :class:`ThreadedWSGIServer` if ``threaded`` is set, a
        :class:`ForkingWSGIServer` if ``processes`` is greater than 1,
        otherwise a :class:`BaseWSGIServer`.
    :raises ValueError: If both ``threaded`` and ``processes > 1`` are
        requested.
    """
    if threaded and processes > 1:
        raise ValueError("cannot have a multithreaded and multi process server.")
    elif threaded:
        return ThreadedWSGIServer(
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        )
    elif processes > 1:
        return ForkingWSGIServer(
            host,
            port,
            app,
            processes,
            request_handler,
            passthrough_errors,
            ssl_context,
            fd=fd,
        )
    else:
        return BaseWSGIServer(
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        )
def is_running_from_reloader() -> bool:
    """Checks if the application is running from within the Werkzeug
    reloader subprocess.

    This is the case when the ``WERKZEUG_RUN_MAIN`` environment variable
    is exactly ``"true"``.

    .. versionadded:: 0.10
    """
    return os.environ.get("WERKZEUG_RUN_MAIN") == "true"
def run_simple(
    hostname: str,
    port: int,
    application: "WSGIApplication",
    use_reloader: bool = False,
    use_debugger: bool = False,
    use_evalex: bool = True,
    extra_files: t.Optional[t.Iterable[str]] = None,
    exclude_patterns: t.Optional[t.Iterable[str]] = None,
    reloader_interval: int = 1,
    reloader_type: str = "auto",
    threaded: bool = False,
    processes: int = 1,
    request_handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
    static_files: t.Optional[t.Dict[str, t.Union[str, t.Tuple[str, str]]]] = None,
    passthrough_errors: bool = False,
    ssl_context: t.Optional[_TSSLContextArg] = None,
) -> None:
    """Start a WSGI application. Optional features include a reloader,
    multithreading and fork support.

    This function has a command-line interface too::

        python -m werkzeug.serving --help

    .. versionchanged:: 2.0
        Added ``exclude_patterns`` parameter.

    .. versionadded:: 0.5
       `static_files` was added to simplify serving of static files as well
       as `passthrough_errors`.

    .. versionadded:: 0.6
       support for SSL was added.

    .. versionadded:: 0.8
       Added support for automatically loading a SSL context from certificate
       file and private key.

    .. versionadded:: 0.9
       Added command-line interface.

    .. versionadded:: 0.10
       Improved the reloader and added support for changing the backend
       through the `reloader_type` parameter.  See :ref:`reloader`
       for more information.

    .. versionchanged:: 0.15
        Bind to a Unix socket by passing a path that starts with
        ``unix://`` as the ``hostname``.

    :param hostname: The host to bind to, for example ``'localhost'``.
        If the value is a path that starts with ``unix://`` it will bind
        to a Unix socket instead of a TCP socket.
    :param port: The port for the server.  eg: ``8080``
    :param application: the WSGI application to execute
    :param use_reloader: should the server automatically restart the python
                         process if modules were changed?
    :param use_debugger: should the werkzeug debugging system be used?
    :param use_evalex: should the exception evaluation feature be enabled?
    :param extra_files: a list of files the reloader should watch
                        additionally to the modules.  For example configuration
                        files.
    :param exclude_patterns: List of :mod:`fnmatch` patterns to ignore
        when running the reloader. For example, ignore cache files that
        shouldn't reload when updated.
    :param reloader_interval: the interval for the reloader in seconds.
    :param reloader_type: the type of reloader to use.  The default is
                          auto detection.  Valid values are ``'stat'`` and
                          ``'watchdog'``. See :ref:`reloader` for more
                          information.
    :param threaded: should the process handle each request in a separate
                     thread?
    :param processes: if greater than 1 then handle each request in a new process
                      up to this maximum number of concurrent processes.
    :param request_handler: optional parameter that can be used to replace
                            the default one.  You can use this to replace it
                            with a different
                            :class:`~BaseHTTPServer.BaseHTTPRequestHandler`
                            subclass.
    :param static_files: a list or dict of paths for static files.  This works
                         exactly like :class:`SharedDataMiddleware`, it's actually
                         just wrapping the application in that middleware before
                         serving.
    :param passthrough_errors: set this to `True` to disable the error catching.
                               This means that the server will die on errors but
                               it can be useful to hook debuggers in (pdb etc.)
    :param ssl_context: an SSL context for the connection. Either an
                        :class:`ssl.SSLContext`, a tuple in the form
                        ``(cert_file, pkey_file)``, the string ``'adhoc'`` if
                        the server should automatically create one, or ``None``
                        to disable SSL (which is the default).
    :raises TypeError: if ``port`` is not an integer.
    :raises ValueError: if the reloader is used with port ``0`` on an
        interpreter that cannot open sockets by file descriptor.
    """
    if not isinstance(port, int):
        raise TypeError("port must be an integer")

    if use_debugger:
        from .debug import DebuggedApplication

        application = DebuggedApplication(application, use_evalex)

    if static_files:
        from .middleware.shared_data import SharedDataMiddleware

        application = SharedDataMiddleware(application, static_files)

    def log_startup(sock: socket.socket) -> None:
        all_addresses_message = (
            " * Running on all addresses.\n"
            "   WARNING: This is a development server. Do not use it in"
            " a production deployment."
        )

        if sock.family == af_unix:
            _log("info", " * Running on %s (Press CTRL+C to quit)", hostname)
        else:
            # For wildcard binds, show a concrete interface address.
            if hostname == "0.0.0.0":
                _log("warning", all_addresses_message)
                display_hostname = get_interface_ip(socket.AF_INET)
            elif hostname == "::":
                _log("warning", all_addresses_message)
                display_hostname = get_interface_ip(socket.AF_INET6)
            else:
                display_hostname = hostname

            # IPv6 literals need brackets inside a URL.
            if ":" in display_hostname:
                display_hostname = f"[{display_hostname}]"

            _log(
                "info",
                " * Running on %s://%s:%d/ (Press CTRL+C to quit)",
                "http" if ssl_context is None else "https",
                display_hostname,
                sock.getsockname()[1],
            )

    def inner() -> None:
        try:
            fd: t.Optional[int] = int(os.environ["WERKZEUG_SERVER_FD"])
        except (LookupError, ValueError):
            fd = None
        srv = make_server(
            hostname,
            port,
            application,
            threaded,
            processes,
            request_handler,
            passthrough_errors,
            ssl_context,
            fd=fd,
        )
        # With an inherited socket the startup message was already
        # logged by the process that created it.
        if fd is None:
            log_startup(srv.socket)
        srv.serve_forever()

    if use_reloader:
        # If we're not running already in the subprocess that is the
        # reloader we want to open up a socket early to make sure the
        # port is actually available.
        if not is_running_from_reloader():
            if port == 0 and not can_open_by_fd:
                raise ValueError(
                    "Cannot bind to a random port with enabled "
                    "reloader if the Python interpreter does "
                    "not support socket opening by fd."
                )

            # Create and destroy a socket so that any exceptions are
            # raised before we spawn a separate Python interpreter and
            # lose this ability.
            address_family = select_address_family(hostname, port)
            server_address = get_sockaddr(hostname, port, address_family)
            s = socket.socket(address_family, socket.SOCK_STREAM)

            try:
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                s.bind(server_address)
                s.set_inheritable(True)
            except BaseException:
                # Do not leave the probe socket open when setup fails.
                s.close()
                raise

            # If we can open the socket by file descriptor, then we can just
            # reuse this one and our socket will survive the restarts.
            if can_open_by_fd:
                os.environ["WERKZEUG_SERVER_FD"] = str(s.fileno())
                s.listen(LISTEN_QUEUE)
                log_startup(s)
            else:
                s.close()
                if address_family == af_unix:
                    server_address = t.cast(str, server_address)
                    _log("info", "Unlinking %s", server_address)
                    os.unlink(server_address)

        from ._reloader import run_with_reloader as _rwr

        _rwr(
            inner,
            extra_files=extra_files,
            exclude_patterns=exclude_patterns,
            interval=reloader_interval,
            reloader_type=reloader_type,
        )
    else:
        inner()
def run_with_reloader(*args: t.Any, **kwargs: t.Any) -> None:
    """Run a process with the reloader. This is not a public API, do
    not use this function.

    Emits a ``DeprecationWarning`` and forwards all arguments to the
    private reloader implementation.

    .. deprecated:: 2.0
        Will be removed in Werkzeug 2.1.
    """
    from ._reloader import run_with_reloader as _rwr

    warnings.warn(
        (
            "'run_with_reloader' is a private API, it will no longer be"
            " accessible in Werkzeug 2.1. Use 'run_simple' instead."
        ),
        DeprecationWarning,
        stacklevel=2,
    )
    _rwr(*args, **kwargs)
def main() -> None:
    """A simple command-line interface for :py:func:`run_simple`.

    Parses ``-b/--bind``, ``-d/--debug``, ``-r/--reload`` and the
    positional ``application`` import string, then serves the imported
    application. Host and port default to ``127.0.0.1`` and ``5000``.
    """
    import argparse
    from .utils import import_string

    _log("warning", "This CLI is deprecated and will be removed in version 2.1.")

    parser = argparse.ArgumentParser(
        description="Run the given WSGI application with the development server.",
        allow_abbrev=False,
    )
    parser.add_argument(
        "-b",
        "--bind",
        dest="address",
        help="The hostname:port the app should listen on.",
    )
    parser.add_argument(
        "-d",
        "--debug",
        action="store_true",
        help="Show the interactive debugger for unhandled exceptions.",
    )
    parser.add_argument(
        "-r",
        "--reload",
        action="store_true",
        help="Reload the process if modules change.",
    )
    parser.add_argument(
        "application", help="Application to import and serve, in the form module:app."
    )
    args = parser.parse_args()
    hostname, port = None, None

    if args.address:
        # Split at the first colon: "host:port", "host", or ":port".
        hostname, _, port = args.address.partition(":")

    run_simple(
        hostname=hostname or "127.0.0.1",
        port=int(port or 5000),
        application=import_string(args.application),
        use_reloader=args.reload,
        use_debugger=args.debug,
    )
# Run the command-line interface when this module is executed as a script.
if __name__ == "__main__":
    main()
|