direct_url.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. """ PEP 610 """
  2. import json
  3. import re
  4. import urllib.parse
  5. from typing import Any, Dict, Iterable, Optional, Type, TypeVar, Union
  6. __all__ = [
  7. "DirectUrl",
  8. "DirectUrlValidationError",
  9. "DirInfo",
  10. "ArchiveInfo",
  11. "VcsInfo",
  12. ]
  13. T = TypeVar("T")
  14. DIRECT_URL_METADATA_NAME = "direct_url.json"
  15. ENV_VAR_RE = re.compile(r"^\$\{[A-Za-z0-9-_]+\}(:\$\{[A-Za-z0-9-_]+\})?$")
  16. class DirectUrlValidationError(Exception):
  17. pass
  18. def _get(
  19. d: Dict[str, Any], expected_type: Type[T], key: str, default: Optional[T] = None
  20. ) -> Optional[T]:
  21. """Get value from dictionary and verify expected type."""
  22. if key not in d:
  23. return default
  24. value = d[key]
  25. if not isinstance(value, expected_type):
  26. raise DirectUrlValidationError(
  27. "{!r} has unexpected type for {} (expected {})".format(
  28. value, key, expected_type
  29. )
  30. )
  31. return value
  32. def _get_required(
  33. d: Dict[str, Any], expected_type: Type[T], key: str, default: Optional[T] = None
  34. ) -> T:
  35. value = _get(d, expected_type, key, default)
  36. if value is None:
  37. raise DirectUrlValidationError(f"{key} must have a value")
  38. return value
  39. def _exactly_one_of(infos: Iterable[Optional["InfoType"]]) -> "InfoType":
  40. infos = [info for info in infos if info is not None]
  41. if not infos:
  42. raise DirectUrlValidationError(
  43. "missing one of archive_info, dir_info, vcs_info"
  44. )
  45. if len(infos) > 1:
  46. raise DirectUrlValidationError(
  47. "more than one of archive_info, dir_info, vcs_info"
  48. )
  49. assert infos[0] is not None
  50. return infos[0]
  51. def _filter_none(**kwargs: Any) -> Dict[str, Any]:
  52. """Make dict excluding None values."""
  53. return {k: v for k, v in kwargs.items() if v is not None}
  54. class VcsInfo:
  55. name = "vcs_info"
  56. def __init__(
  57. self,
  58. vcs: str,
  59. commit_id: str,
  60. requested_revision: Optional[str] = None,
  61. resolved_revision: Optional[str] = None,
  62. resolved_revision_type: Optional[str] = None,
  63. ) -> None:
  64. self.vcs = vcs
  65. self.requested_revision = requested_revision
  66. self.commit_id = commit_id
  67. self.resolved_revision = resolved_revision
  68. self.resolved_revision_type = resolved_revision_type
  69. @classmethod
  70. def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["VcsInfo"]:
  71. if d is None:
  72. return None
  73. return cls(
  74. vcs=_get_required(d, str, "vcs"),
  75. commit_id=_get_required(d, str, "commit_id"),
  76. requested_revision=_get(d, str, "requested_revision"),
  77. resolved_revision=_get(d, str, "resolved_revision"),
  78. resolved_revision_type=_get(d, str, "resolved_revision_type"),
  79. )
  80. def _to_dict(self) -> Dict[str, Any]:
  81. return _filter_none(
  82. vcs=self.vcs,
  83. requested_revision=self.requested_revision,
  84. commit_id=self.commit_id,
  85. resolved_revision=self.resolved_revision,
  86. resolved_revision_type=self.resolved_revision_type,
  87. )
  88. class ArchiveInfo:
  89. name = "archive_info"
  90. def __init__(
  91. self,
  92. hash: Optional[str] = None,
  93. ) -> None:
  94. self.hash = hash
  95. @classmethod
  96. def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["ArchiveInfo"]:
  97. if d is None:
  98. return None
  99. return cls(hash=_get(d, str, "hash"))
  100. def _to_dict(self) -> Dict[str, Any]:
  101. return _filter_none(hash=self.hash)
  102. class DirInfo:
  103. name = "dir_info"
  104. def __init__(
  105. self,
  106. editable: bool = False,
  107. ) -> None:
  108. self.editable = editable
  109. @classmethod
  110. def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["DirInfo"]:
  111. if d is None:
  112. return None
  113. return cls(
  114. editable=_get_required(d, bool, "editable", default=False)
  115. )
  116. def _to_dict(self) -> Dict[str, Any]:
  117. return _filter_none(editable=self.editable or None)
  118. InfoType = Union[ArchiveInfo, DirInfo, VcsInfo]
  119. class DirectUrl:
  120. def __init__(
  121. self,
  122. url: str,
  123. info: InfoType,
  124. subdirectory: Optional[str] = None,
  125. ) -> None:
  126. self.url = url
  127. self.info = info
  128. self.subdirectory = subdirectory
  129. def _remove_auth_from_netloc(self, netloc: str) -> str:
  130. if "@" not in netloc:
  131. return netloc
  132. user_pass, netloc_no_user_pass = netloc.split("@", 1)
  133. if (
  134. isinstance(self.info, VcsInfo) and
  135. self.info.vcs == "git" and
  136. user_pass == "git"
  137. ):
  138. return netloc
  139. if ENV_VAR_RE.match(user_pass):
  140. return netloc
  141. return netloc_no_user_pass
  142. @property
  143. def redacted_url(self) -> str:
  144. """url with user:password part removed unless it is formed with
  145. environment variables as specified in PEP 610, or it is ``git``
  146. in the case of a git URL.
  147. """
  148. purl = urllib.parse.urlsplit(self.url)
  149. netloc = self._remove_auth_from_netloc(purl.netloc)
  150. surl = urllib.parse.urlunsplit(
  151. (purl.scheme, netloc, purl.path, purl.query, purl.fragment)
  152. )
  153. return surl
  154. def validate(self) -> None:
  155. self.from_dict(self.to_dict())
  156. @classmethod
  157. def from_dict(cls, d: Dict[str, Any]) -> "DirectUrl":
  158. return DirectUrl(
  159. url=_get_required(d, str, "url"),
  160. subdirectory=_get(d, str, "subdirectory"),
  161. info=_exactly_one_of(
  162. [
  163. ArchiveInfo._from_dict(_get(d, dict, "archive_info")),
  164. DirInfo._from_dict(_get(d, dict, "dir_info")),
  165. VcsInfo._from_dict(_get(d, dict, "vcs_info")),
  166. ]
  167. ),
  168. )
  169. def to_dict(self) -> Dict[str, Any]:
  170. res = _filter_none(
  171. url=self.redacted_url,
  172. subdirectory=self.subdirectory,
  173. )
  174. res[self.info.name] = self.info._to_dict()
  175. return res
  176. @classmethod
  177. def from_json(cls, s: str) -> "DirectUrl":
  178. return cls.from_dict(json.loads(s))
  179. def to_json(self) -> str:
  180. return json.dumps(self.to_dict(), sort_keys=True)