jws.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. import hashlib
  2. import time
  3. import warnings
  4. from datetime import datetime
  5. from datetime import timezone
  6. from decimal import Decimal
  7. from numbers import Real
  8. from ._json import _CompactJSON
  9. from .encoding import base64_decode
  10. from .encoding import base64_encode
  11. from .encoding import want_bytes
  12. from .exc import BadData
  13. from .exc import BadHeader
  14. from .exc import BadPayload
  15. from .exc import BadSignature
  16. from .exc import SignatureExpired
  17. from .serializer import Serializer
  18. from .signer import HMACAlgorithm
  19. from .signer import NoneAlgorithm
  20. class JSONWebSignatureSerializer(Serializer):
  21. """This serializer implements JSON Web Signature (JWS) support. Only
  22. supports the JWS Compact Serialization.
  23. .. deprecated:: 2.0
  24. Will be removed in ItsDangerous 2.1. Use a dedicated library
  25. such as authlib.
  26. """
  27. jws_algorithms = {
  28. "HS256": HMACAlgorithm(hashlib.sha256),
  29. "HS384": HMACAlgorithm(hashlib.sha384),
  30. "HS512": HMACAlgorithm(hashlib.sha512),
  31. "none": NoneAlgorithm(),
  32. }
  33. #: The default algorithm to use for signature generation
  34. default_algorithm = "HS512"
  35. default_serializer = _CompactJSON
  36. def __init__(
  37. self,
  38. secret_key,
  39. salt=None,
  40. serializer=None,
  41. serializer_kwargs=None,
  42. signer=None,
  43. signer_kwargs=None,
  44. algorithm_name=None,
  45. ):
  46. warnings.warn(
  47. "JWS support is deprecated and will be removed in"
  48. " ItsDangerous 2.1. Use a dedicated JWS/JWT library such as"
  49. " authlib.",
  50. DeprecationWarning,
  51. stacklevel=2,
  52. )
  53. super().__init__(
  54. secret_key,
  55. salt=salt,
  56. serializer=serializer,
  57. serializer_kwargs=serializer_kwargs,
  58. signer=signer,
  59. signer_kwargs=signer_kwargs,
  60. )
  61. if algorithm_name is None:
  62. algorithm_name = self.default_algorithm
  63. self.algorithm_name = algorithm_name
  64. self.algorithm = self.make_algorithm(algorithm_name)
  65. def load_payload(self, payload, serializer=None, return_header=False):
  66. payload = want_bytes(payload)
  67. if b"." not in payload:
  68. raise BadPayload('No "." found in value')
  69. base64d_header, base64d_payload = payload.split(b".", 1)
  70. try:
  71. json_header = base64_decode(base64d_header)
  72. except Exception as e:
  73. raise BadHeader(
  74. "Could not base64 decode the header because of an exception",
  75. original_error=e,
  76. )
  77. try:
  78. json_payload = base64_decode(base64d_payload)
  79. except Exception as e:
  80. raise BadPayload(
  81. "Could not base64 decode the payload because of an exception",
  82. original_error=e,
  83. )
  84. try:
  85. header = super().load_payload(json_header, serializer=_CompactJSON)
  86. except BadData as e:
  87. raise BadHeader(
  88. "Could not unserialize header because it was malformed",
  89. original_error=e,
  90. )
  91. if not isinstance(header, dict):
  92. raise BadHeader("Header payload is not a JSON object", header=header)
  93. payload = super().load_payload(json_payload, serializer=serializer)
  94. if return_header:
  95. return payload, header
  96. return payload
  97. def dump_payload(self, header, obj):
  98. base64d_header = base64_encode(
  99. self.serializer.dumps(header, **self.serializer_kwargs)
  100. )
  101. base64d_payload = base64_encode(
  102. self.serializer.dumps(obj, **self.serializer_kwargs)
  103. )
  104. return base64d_header + b"." + base64d_payload
  105. def make_algorithm(self, algorithm_name):
  106. try:
  107. return self.jws_algorithms[algorithm_name]
  108. except KeyError:
  109. raise NotImplementedError("Algorithm not supported")
  110. def make_signer(self, salt=None, algorithm=None):
  111. if salt is None:
  112. salt = self.salt
  113. key_derivation = "none" if salt is None else None
  114. if algorithm is None:
  115. algorithm = self.algorithm
  116. return self.signer(
  117. self.secret_keys,
  118. salt=salt,
  119. sep=".",
  120. key_derivation=key_derivation,
  121. algorithm=algorithm,
  122. )
  123. def make_header(self, header_fields):
  124. header = header_fields.copy() if header_fields else {}
  125. header["alg"] = self.algorithm_name
  126. return header
  127. def dumps(self, obj, salt=None, header_fields=None):
  128. """Like :meth:`.Serializer.dumps` but creates a JSON Web
  129. Signature. It also allows for specifying additional fields to be
  130. included in the JWS header.
  131. """
  132. header = self.make_header(header_fields)
  133. signer = self.make_signer(salt, self.algorithm)
  134. return signer.sign(self.dump_payload(header, obj))
  135. def loads(self, s, salt=None, return_header=False):
  136. """Reverse of :meth:`dumps`. If requested via ``return_header``
  137. it will return a tuple of payload and header.
  138. """
  139. payload, header = self.load_payload(
  140. self.make_signer(salt, self.algorithm).unsign(want_bytes(s)),
  141. return_header=True,
  142. )
  143. if header.get("alg") != self.algorithm_name:
  144. raise BadHeader("Algorithm mismatch", header=header, payload=payload)
  145. if return_header:
  146. return payload, header
  147. return payload
  148. def loads_unsafe(self, s, salt=None, return_header=False):
  149. kwargs = {"return_header": return_header}
  150. return self._loads_unsafe_impl(s, salt, kwargs, kwargs)
  151. class TimedJSONWebSignatureSerializer(JSONWebSignatureSerializer):
  152. """Works like the regular :class:`JSONWebSignatureSerializer` but
  153. also records the time of the signing and can be used to expire
  154. signatures.
  155. JWS currently does not specify this behavior but it mentions a
  156. possible extension like this in the spec. Expiry date is encoded
  157. into the header similar to what's specified in `draft-ietf-oauth
  158. -json-web-token <http://self-issued.info/docs/draft-ietf-oauth-json
  159. -web-token.html#expDef>`_.
  160. """
  161. DEFAULT_EXPIRES_IN = 3600
  162. def __init__(self, secret_key, expires_in=None, **kwargs):
  163. super().__init__(secret_key, **kwargs)
  164. if expires_in is None:
  165. expires_in = self.DEFAULT_EXPIRES_IN
  166. self.expires_in = expires_in
  167. def make_header(self, header_fields):
  168. header = super().make_header(header_fields)
  169. iat = self.now()
  170. exp = iat + self.expires_in
  171. header["iat"] = iat
  172. header["exp"] = exp
  173. return header
  174. def loads(self, s, salt=None, return_header=False):
  175. payload, header = super().loads(s, salt, return_header=True)
  176. if "exp" not in header:
  177. raise BadSignature("Missing expiry date", payload=payload)
  178. int_date_error = BadHeader("Expiry date is not an IntDate", payload=payload)
  179. try:
  180. header["exp"] = int(header["exp"])
  181. except ValueError:
  182. raise int_date_error
  183. if header["exp"] < 0:
  184. raise int_date_error
  185. if header["exp"] < self.now():
  186. raise SignatureExpired(
  187. "Signature expired",
  188. payload=payload,
  189. date_signed=self.get_issue_date(header),
  190. )
  191. if return_header:
  192. return payload, header
  193. return payload
  194. def get_issue_date(self, header):
  195. """If the header contains the ``iat`` field, return the date the
  196. signature was issued, as a timezone-aware
  197. :class:`datetime.datetime` in UTC.
  198. .. versionchanged:: 2.0
  199. The timestamp is returned as a timezone-aware ``datetime``
  200. in UTC rather than a naive ``datetime`` assumed to be UTC.
  201. """
  202. rv = header.get("iat")
  203. if isinstance(rv, (Real, Decimal)):
  204. return datetime.fromtimestamp(int(rv), tz=timezone.utc)
  205. def now(self):
  206. return int(time.time())