timed.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. import time
  2. import typing
  3. import typing as _t
  4. from datetime import datetime
  5. from datetime import timezone
  6. from .encoding import base64_decode
  7. from .encoding import base64_encode
  8. from .encoding import bytes_to_int
  9. from .encoding import int_to_bytes
  10. from .encoding import want_bytes
  11. from .exc import BadSignature
  12. from .exc import BadTimeSignature
  13. from .exc import SignatureExpired
  14. from .serializer import Serializer
  15. from .signer import Signer
# Shared annotation aliases used by the signatures in this module.
# A value that may be given either as text or as bytes.
_t_str_bytes = _t.Union[str, bytes]
# Same as ``_t_str_bytes`` but ``None`` is also accepted.
_t_opt_str_bytes = _t.Optional[_t_str_bytes]
# An integer or ``None``; used for the ``max_age`` parameters.
_t_opt_int = _t.Optional[int]

if _t.TYPE_CHECKING:
    # Only imported while type checking. ``_te`` is referenced solely
    # from string annotations (``"_te.Literal[...]"``), so it is not
    # needed at runtime.
    import typing_extensions as _te
  21. class TimestampSigner(Signer):
  22. """Works like the regular :class:`.Signer` but also records the time
  23. of the signing and can be used to expire signatures. The
  24. :meth:`unsign` method can raise :exc:`.SignatureExpired` if the
  25. unsigning failed because the signature is expired.
  26. """
  27. def get_timestamp(self) -> int:
  28. """Returns the current timestamp. The function must return an
  29. integer.
  30. """
  31. return int(time.time())
  32. def timestamp_to_datetime(self, ts: int) -> datetime:
  33. """Convert the timestamp from :meth:`get_timestamp` into an
  34. aware :class`datetime.datetime` in UTC.
  35. .. versionchanged:: 2.0
  36. The timestamp is returned as a timezone-aware ``datetime``
  37. in UTC rather than a naive ``datetime`` assumed to be UTC.
  38. """
  39. return datetime.fromtimestamp(ts, tz=timezone.utc)
  40. def sign(self, value: _t_str_bytes) -> bytes:
  41. """Signs the given string and also attaches time information."""
  42. value = want_bytes(value)
  43. timestamp = base64_encode(int_to_bytes(self.get_timestamp()))
  44. sep = want_bytes(self.sep)
  45. value = value + sep + timestamp
  46. return value + sep + self.get_signature(value)
  47. # Ignore overlapping signatures check, return_timestamp is the only
  48. # parameter that affects the return type.
  49. @typing.overload
  50. def unsign( # type: ignore
  51. self,
  52. signed_value: _t_str_bytes,
  53. max_age: _t_opt_int = None,
  54. return_timestamp: "_te.Literal[False]" = False,
  55. ) -> bytes:
  56. ...
  57. @typing.overload
  58. def unsign(
  59. self,
  60. signed_value: _t_str_bytes,
  61. max_age: _t_opt_int = None,
  62. return_timestamp: "_te.Literal[True]" = True,
  63. ) -> _t.Tuple[bytes, datetime]:
  64. ...
  65. def unsign(
  66. self,
  67. signed_value: _t_str_bytes,
  68. max_age: _t_opt_int = None,
  69. return_timestamp: bool = False,
  70. ) -> _t.Union[_t.Tuple[bytes, datetime], bytes]:
  71. """Works like the regular :meth:`.Signer.unsign` but can also
  72. validate the time. See the base docstring of the class for
  73. the general behavior. If ``return_timestamp`` is ``True`` the
  74. timestamp of the signature will be returned as an aware
  75. :class:`datetime.datetime` object in UTC.
  76. .. versionchanged:: 2.0
  77. The timestamp is returned as a timezone-aware ``datetime``
  78. in UTC rather than a naive ``datetime`` assumed to be UTC.
  79. """
  80. try:
  81. result = super().unsign(signed_value)
  82. sig_error = None
  83. except BadSignature as e:
  84. sig_error = e
  85. result = e.payload or b""
  86. sep = want_bytes(self.sep)
  87. # If there is no timestamp in the result there is something
  88. # seriously wrong. In case there was a signature error, we raise
  89. # that one directly, otherwise we have a weird situation in
  90. # which we shouldn't have come except someone uses a time-based
  91. # serializer on non-timestamp data, so catch that.
  92. if sep not in result:
  93. if sig_error:
  94. raise sig_error
  95. raise BadTimeSignature("timestamp missing", payload=result)
  96. value, ts_bytes = result.rsplit(sep, 1)
  97. ts_int: _t_opt_int = None
  98. ts_dt: _t.Optional[datetime] = None
  99. try:
  100. ts_int = bytes_to_int(base64_decode(ts_bytes))
  101. except Exception:
  102. pass
  103. # Signature is *not* okay. Raise a proper error now that we have
  104. # split the value and the timestamp.
  105. if sig_error is not None:
  106. if ts_int is not None:
  107. ts_dt = self.timestamp_to_datetime(ts_int)
  108. raise BadTimeSignature(str(sig_error), payload=value, date_signed=ts_dt)
  109. # Signature was okay but the timestamp is actually not there or
  110. # malformed. Should not happen, but we handle it anyway.
  111. if ts_int is None:
  112. raise BadTimeSignature("Malformed timestamp", payload=value)
  113. # Check timestamp is not older than max_age
  114. if max_age is not None:
  115. age = self.get_timestamp() - ts_int
  116. if age > max_age:
  117. raise SignatureExpired(
  118. f"Signature age {age} > {max_age} seconds",
  119. payload=value,
  120. date_signed=self.timestamp_to_datetime(ts_int),
  121. )
  122. if age < 0:
  123. raise SignatureExpired(
  124. f"Signature age {age} < 0 seconds",
  125. payload=value,
  126. date_signed=self.timestamp_to_datetime(ts_int),
  127. )
  128. if return_timestamp:
  129. return value, self.timestamp_to_datetime(ts_int)
  130. return value
  131. def validate(self, signed_value: _t_str_bytes, max_age: _t_opt_int = None) -> bool:
  132. """Only validates the given signed value. Returns ``True`` if
  133. the signature exists and is valid."""
  134. try:
  135. self.unsign(signed_value, max_age=max_age)
  136. return True
  137. except BadSignature:
  138. return False
  139. class TimedSerializer(Serializer):
  140. """Uses :class:`TimestampSigner` instead of the default
  141. :class:`.Signer`.
  142. """
  143. default_signer: _t.Type[TimestampSigner] = TimestampSigner
  144. def iter_unsigners(
  145. self, salt: _t_opt_str_bytes = None
  146. ) -> _t.Iterator[TimestampSigner]:
  147. return _t.cast("_t.Iterator[TimestampSigner]", super().iter_unsigners(salt))
  148. # TODO: Signature is incompatible because parameters were added
  149. # before salt.
  150. def loads( # type: ignore
  151. self,
  152. s: _t_str_bytes,
  153. max_age: _t_opt_int = None,
  154. return_timestamp: bool = False,
  155. salt: _t_opt_str_bytes = None,
  156. ) -> _t.Any:
  157. """Reverse of :meth:`dumps`, raises :exc:`.BadSignature` if the
  158. signature validation fails. If a ``max_age`` is provided it will
  159. ensure the signature is not older than that time in seconds. In
  160. case the signature is outdated, :exc:`.SignatureExpired` is
  161. raised. All arguments are forwarded to the signer's
  162. :meth:`~TimestampSigner.unsign` method.
  163. """
  164. s = want_bytes(s)
  165. last_exception = None
  166. for signer in self.iter_unsigners(salt):
  167. try:
  168. base64d, timestamp = signer.unsign(
  169. s, max_age=max_age, return_timestamp=True
  170. )
  171. payload = self.load_payload(base64d)
  172. if return_timestamp:
  173. return payload, timestamp
  174. return payload
  175. except SignatureExpired:
  176. # The signature was unsigned successfully but was
  177. # expired. Do not try the next signer.
  178. raise
  179. except BadSignature as err:
  180. last_exception = err
  181. raise _t.cast(BadSignature, last_exception)
  182. def loads_unsafe( # type: ignore
  183. self,
  184. s: _t_str_bytes,
  185. max_age: _t_opt_int = None,
  186. salt: _t_opt_str_bytes = None,
  187. ) -> _t.Tuple[bool, _t.Any]:
  188. return self._loads_unsafe_impl(s, salt, load_kwargs={"max_age": max_age})