url.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812
  1. # engine/url.py
  2. # Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. """Provides the :class:`~sqlalchemy.engine.url.URL` class which encapsulates
  8. information about a database connection specification.
  9. The URL object is created automatically when
  10. :func:`~sqlalchemy.engine.create_engine` is called with a string
  11. argument; alternatively, the URL is a public-facing construct which can
  12. be used directly and is also accepted directly by ``create_engine()``.
  13. """
  14. import re
  15. from .interfaces import Dialect
  16. from .. import exc
  17. from .. import util
  18. from ..dialects import plugins
  19. from ..dialects import registry
  20. from ..util import collections_abc
  21. from ..util import compat
  22. class URL(
  23. util.namedtuple(
  24. "URL",
  25. [
  26. "drivername",
  27. "username",
  28. "password",
  29. "host",
  30. "port",
  31. "database",
  32. "query",
  33. ],
  34. )
  35. ):
  36. """
  37. Represent the components of a URL used to connect to a database.
  38. This object is suitable to be passed directly to a
  39. :func:`_sa.create_engine` call. The fields of the URL are parsed
  40. from a string by the :func:`.make_url` function. The string
  41. format of the URL is an RFC-1738-style string.
  42. To create a new :class:`_engine.URL` object, use the
  43. :func:`_engine.url.make_url` function. To construct a :class:`_engine.URL`
  44. programmatically, use the :meth:`_engine.URL.create` constructor.
  45. .. versionchanged:: 1.4
  46. The :class:`_engine.URL` object is now an immutable object. To
  47. create a URL, use the :func:`_engine.make_url` or
  48. :meth:`_engine.URL.create` function / method. To modify
  49. a :class:`_engine.URL`, use methods like
  50. :meth:`_engine.URL.set` and
  51. :meth:`_engine.URL.update_query_dict` to return a new
  52. :class:`_engine.URL` object with modifications. See notes for this
  53. change at :ref:`change_5526`.
  54. :class:`_engine.URL` contains the following attributes:
  55. * :attr:`_engine.URL.drivername`: database backend and driver name, such as
  56. ``postgresql+psycopg2``
  57. * :attr:`_engine.URL.username`: username string
  58. * :attr:`_engine.URL.password`: password string, or object that includes
  59. a ``__str__()`` method that produces a password.
  60. .. note:: A password-producing object will be stringified only
  61. **once** per :class:`_engine.Engine` object. For dynamic password
  62. generation per connect, see :ref:`engines_dynamic_tokens`.
  63. * :attr:`_engine.URL.host`: string hostname
  64. * :attr:`_engine.URL.port`: integer port number
  65. * :attr:`_engine.URL.database`: string database name
  66. * :attr:`_engine.URL.query`: an immutable mapping representing the query
  67. string. contains strings for keys and either strings or tuples of
  68. strings for values.
  69. """
  70. def __new__(self, *arg, **kw):
  71. if kw.pop("_new_ok", False):
  72. return super(URL, self).__new__(self, *arg, **kw)
  73. else:
  74. util.warn_deprecated(
  75. "Calling URL() directly is deprecated and will be disabled "
  76. "in a future release. The public constructor for URL is "
  77. "now the URL.create() method.",
  78. "1.4",
  79. )
  80. return URL.create(*arg, **kw)
  81. @classmethod
  82. def create(
  83. cls,
  84. drivername,
  85. username=None,
  86. password=None,
  87. host=None,
  88. port=None,
  89. database=None,
  90. query=util.EMPTY_DICT,
  91. ):
  92. """Create a new :class:`_engine.URL` object.
  93. :param drivername: the name of the database backend. This name will
  94. correspond to a module in sqlalchemy/databases or a third party
  95. plug-in.
  96. :param username: The user name.
  97. :param password: database password. Is typically a string, but may
  98. also be an object that can be stringified with ``str()``.
  99. .. note:: A password-producing object will be stringified only
  100. **once** per :class:`_engine.Engine` object. For dynamic password
  101. generation per connect, see :ref:`engines_dynamic_tokens`.
  102. :param host: The name of the host.
  103. :param port: The port number.
  104. :param database: The database name.
  105. :param query: A dictionary of string keys to string values to be passed
  106. to the dialect and/or the DBAPI upon connect. To specify non-string
  107. parameters to a Python DBAPI directly, use the
  108. :paramref:`_sa.create_engine.connect_args` parameter to
  109. :func:`_sa.create_engine`. See also
  110. :attr:`_engine.URL.normalized_query` for a dictionary that is
  111. consistently string->list of string.
  112. :return: new :class:`_engine.URL` object.
  113. .. versionadded:: 1.4
  114. The :class:`_engine.URL` object is now an **immutable named
  115. tuple**. In addition, the ``query`` dictionary is also immutable.
  116. To create a URL, use the :func:`_engine.url.make_url` or
  117. :meth:`_engine.URL.create` function/ method. To modify a
  118. :class:`_engine.URL`, use the :meth:`_engine.URL.set` and
  119. :meth:`_engine.URL.update_query` methods.
  120. """
  121. return cls(
  122. cls._assert_str(drivername, "drivername"),
  123. cls._assert_none_str(username, "username"),
  124. password,
  125. cls._assert_none_str(host, "host"),
  126. cls._assert_port(port),
  127. cls._assert_none_str(database, "database"),
  128. cls._str_dict(query),
  129. _new_ok=True,
  130. )
  131. @classmethod
  132. def _assert_port(cls, port):
  133. if port is None:
  134. return None
  135. try:
  136. return int(port)
  137. except TypeError:
  138. raise TypeError("Port argument must be an integer or None")
  139. @classmethod
  140. def _assert_str(cls, v, paramname):
  141. if not isinstance(v, compat.string_types):
  142. raise TypeError("%s must be a string" % paramname)
  143. return v
  144. @classmethod
  145. def _assert_none_str(cls, v, paramname):
  146. if v is None:
  147. return v
  148. return cls._assert_str(v, paramname)
  149. @classmethod
  150. def _str_dict(cls, dict_):
  151. if dict_ is None:
  152. return util.EMPTY_DICT
  153. def _assert_value(val):
  154. if isinstance(val, compat.string_types):
  155. return val
  156. elif isinstance(val, collections_abc.Sequence):
  157. return tuple(_assert_value(elem) for elem in val)
  158. else:
  159. raise TypeError(
  160. "Query dictionary values must be strings or "
  161. "sequences of strings"
  162. )
  163. def _assert_str(v):
  164. if not isinstance(v, compat.string_types):
  165. raise TypeError("Query dictionary keys must be strings")
  166. return v
  167. if isinstance(dict_, collections_abc.Sequence):
  168. dict_items = dict_
  169. else:
  170. dict_items = dict_.items()
  171. return util.immutabledict(
  172. {
  173. _assert_str(key): _assert_value(
  174. value,
  175. )
  176. for key, value in dict_items
  177. }
  178. )
  179. def set(
  180. self,
  181. drivername=None,
  182. username=None,
  183. password=None,
  184. host=None,
  185. port=None,
  186. database=None,
  187. query=None,
  188. ):
  189. """return a new :class:`_engine.URL` object with modifications.
  190. Values are used if they are non-None. To set a value to ``None``
  191. explicitly, use the :meth:`_engine.URL._replace` method adapted
  192. from ``namedtuple``.
  193. :param drivername: new drivername
  194. :param username: new username
  195. :param password: new password
  196. :param host: new hostname
  197. :param port: new port
  198. :param query: new query parameters, passed a dict of string keys
  199. referring to string or sequence of string values. Fully
  200. replaces the previous list of arguments.
  201. :return: new :class:`_engine.URL` object.
  202. .. versionadded:: 1.4
  203. .. seealso::
  204. :meth:`_engine.URL.update_query_dict`
  205. """
  206. kw = {}
  207. if drivername is not None:
  208. kw["drivername"] = drivername
  209. if username is not None:
  210. kw["username"] = username
  211. if password is not None:
  212. kw["password"] = password
  213. if host is not None:
  214. kw["host"] = host
  215. if port is not None:
  216. kw["port"] = port
  217. if database is not None:
  218. kw["database"] = database
  219. if query is not None:
  220. kw["query"] = query
  221. return self._replace(**kw)
  222. def _replace(self, **kw):
  223. """Override ``namedtuple._replace()`` to provide argument checking."""
  224. if "drivername" in kw:
  225. self._assert_str(kw["drivername"], "drivername")
  226. for name in "username", "host", "database":
  227. if name in kw:
  228. self._assert_none_str(kw[name], name)
  229. if "port" in kw:
  230. self._assert_port(kw["port"])
  231. if "query" in kw:
  232. kw["query"] = self._str_dict(kw["query"])
  233. return super(URL, self)._replace(**kw)
  234. def update_query_string(self, query_string, append=False):
  235. """Return a new :class:`_engine.URL` object with the :attr:`_engine.URL.query`
  236. parameter dictionary updated by the given query string.
  237. E.g.::
  238. >>> from sqlalchemy.engine import make_url
  239. >>> url = make_url("postgresql://user:pass@host/dbname")
  240. >>> url = url.update_query_string("alt_host=host1&alt_host=host2&ssl_cipher=%2Fpath%2Fto%2Fcrt")
  241. >>> str(url)
  242. 'postgresql://user:pass@host/dbname?alt_host=host1&alt_host=host2&ssl_cipher=%2Fpath%2Fto%2Fcrt'
  243. :param query_string: a URL escaped query string, not including the
  244. question mark.
  245. :param append: if True, parameters in the existing query string will
  246. not be removed; new parameters will be in addition to those present.
  247. If left at its default of False, keys present in the given query
  248. parameters will replace those of the existing query string.
  249. .. versionadded:: 1.4
  250. .. seealso::
  251. :attr:`_engine.URL.query`
  252. :meth:`_engine.URL.update_query_dict`
  253. """ # noqa: E501
  254. return self.update_query_pairs(
  255. util.parse_qsl(query_string), append=append
  256. )
  257. def update_query_pairs(self, key_value_pairs, append=False):
  258. """Return a new :class:`_engine.URL` object with the
  259. :attr:`_engine.URL.query`
  260. parameter dictionary updated by the given sequence of key/value pairs
  261. E.g.::
  262. >>> from sqlalchemy.engine import make_url
  263. >>> url = make_url("postgresql://user:pass@host/dbname")
  264. >>> url = url.update_query_pairs([("alt_host", "host1"), ("alt_host", "host2"), ("ssl_cipher", "/path/to/crt")])
  265. >>> str(url)
  266. 'postgresql://user:pass@host/dbname?alt_host=host1&alt_host=host2&ssl_cipher=%2Fpath%2Fto%2Fcrt'
  267. :param key_value_pairs: A sequence of tuples containing two strings
  268. each.
  269. :param append: if True, parameters in the existing query string will
  270. not be removed; new parameters will be in addition to those present.
  271. If left at its default of False, keys present in the given query
  272. parameters will replace those of the existing query string.
  273. .. versionadded:: 1.4
  274. .. seealso::
  275. :attr:`_engine.URL.query`
  276. :meth:`_engine.URL.difference_update_query`
  277. :meth:`_engine.URL.set`
  278. """ # noqa: E501
  279. existing_query = self.query
  280. new_keys = {}
  281. for key, value in key_value_pairs:
  282. if key in new_keys:
  283. new_keys[key] = util.to_list(new_keys[key])
  284. new_keys[key].append(value)
  285. else:
  286. new_keys[key] = value
  287. if append:
  288. new_query = {}
  289. for k in new_keys:
  290. if k in existing_query:
  291. new_query[k] = util.to_list(
  292. existing_query[k]
  293. ) + util.to_list(new_keys[k])
  294. else:
  295. new_query[k] = new_keys[k]
  296. new_query.update(
  297. {
  298. k: existing_query[k]
  299. for k in set(existing_query).difference(new_keys)
  300. }
  301. )
  302. else:
  303. new_query = self.query.union(new_keys)
  304. return self.set(query=new_query)
  305. def update_query_dict(self, query_parameters, append=False):
  306. """Return a new :class:`_engine.URL` object with the
  307. :attr:`_engine.URL.query` parameter dictionary updated by the given
  308. dictionary.
  309. The dictionary typically contains string keys and string values.
  310. In order to represent a query parameter that is expressed multiple
  311. times, pass a sequence of string values.
  312. E.g.::
  313. >>> from sqlalchemy.engine import make_url
  314. >>> url = make_url("postgresql://user:pass@host/dbname")
  315. >>> url = url.update_query_dict({"alt_host": ["host1", "host2"], "ssl_cipher": "/path/to/crt"})
  316. >>> str(url)
  317. 'postgresql://user:pass@host/dbname?alt_host=host1&alt_host=host2&ssl_cipher=%2Fpath%2Fto%2Fcrt'
  318. :param query_parameters: A dictionary with string keys and values
  319. that are either strings, or sequences of strings.
  320. :param append: if True, parameters in the existing query string will
  321. not be removed; new parameters will be in addition to those present.
  322. If left at its default of False, keys present in the given query
  323. parameters will replace those of the existing query string.
  324. .. versionadded:: 1.4
  325. .. seealso::
  326. :attr:`_engine.URL.query`
  327. :meth:`_engine.URL.update_query_string`
  328. :meth:`_engine.URL.update_query_pairs`
  329. :meth:`_engine.URL.difference_update_query`
  330. :meth:`_engine.URL.set`
  331. """ # noqa: E501
  332. return self.update_query_pairs(query_parameters.items(), append=append)
  333. def difference_update_query(self, names):
  334. """
  335. Remove the given names from the :attr:`_engine.URL.query` dictionary,
  336. returning the new :class:`_engine.URL`.
  337. E.g.::
  338. url = url.difference_update_query(['foo', 'bar'])
  339. Equivalent to using :meth:`_engine.URL.set` as follows::
  340. url = url.set(
  341. query={
  342. key: url.query[key]
  343. for key in set(url.query).difference(['foo', 'bar'])
  344. }
  345. )
  346. .. versionadded:: 1.4
  347. .. seealso::
  348. :attr:`_engine.URL.query`
  349. :meth:`_engine.URL.update_query_dict`
  350. :meth:`_engine.URL.set`
  351. """
  352. if not set(names).intersection(self.query):
  353. return self
  354. return URL(
  355. self.drivername,
  356. self.username,
  357. self.password,
  358. self.host,
  359. self.port,
  360. self.database,
  361. util.immutabledict(
  362. {
  363. key: self.query[key]
  364. for key in set(self.query).difference(names)
  365. }
  366. ),
  367. _new_ok=True,
  368. )
  369. @util.memoized_property
  370. def normalized_query(self):
  371. """Return the :attr:`_engine.URL.query` dictionary with values normalized
  372. into sequences.
  373. As the :attr:`_engine.URL.query` dictionary may contain either
  374. string values or sequences of string values to differentiate between
  375. parameters that are specified multiple times in the query string,
  376. code that needs to handle multiple parameters generically will wish
  377. to use this attribute so that all parameters present are presented
  378. as sequences. Inspiration is from Python's ``urllib.parse.parse_qs``
  379. function. E.g.::
  380. >>> from sqlalchemy.engine import make_url
  381. >>> url = make_url("postgresql://user:pass@host/dbname?alt_host=host1&alt_host=host2&ssl_cipher=%2Fpath%2Fto%2Fcrt")
  382. >>> url.query
  383. immutabledict({'alt_host': ('host1', 'host2'), 'ssl_cipher': '/path/to/crt'})
  384. >>> url.normalized_query
  385. immutabledict({'alt_host': ('host1', 'host2'), 'ssl_cipher': ('/path/to/crt',)})
  386. """ # noqa: E501
  387. return util.immutabledict(
  388. {
  389. k: (v,) if not isinstance(v, tuple) else v
  390. for k, v in self.query.items()
  391. }
  392. )
  393. @util.deprecated(
  394. "1.4",
  395. "The :meth:`_engine.URL.__to_string__ method is deprecated and will "
  396. "be removed in a future release. Please use the "
  397. ":meth:`_engine.URL.render_as_string` method.",
  398. )
  399. def __to_string__(self, hide_password=True):
  400. """Render this :class:`_engine.URL` object as a string.
  401. :param hide_password: Defaults to True. The password is not shown
  402. in the string unless this is set to False.
  403. """
  404. return self.render_as_string(hide_password=hide_password)
  405. def render_as_string(self, hide_password=True):
  406. """Render this :class:`_engine.URL` object as a string.
  407. This method is used when the ``__str__()`` or ``__repr__()``
  408. methods are used. The method directly includes additional options.
  409. :param hide_password: Defaults to True. The password is not shown
  410. in the string unless this is set to False.
  411. """
  412. s = self.drivername + "://"
  413. if self.username is not None:
  414. s += _rfc_1738_quote(self.username)
  415. if self.password is not None:
  416. s += ":" + (
  417. "***"
  418. if hide_password
  419. else _rfc_1738_quote(str(self.password))
  420. )
  421. s += "@"
  422. if self.host is not None:
  423. if ":" in self.host:
  424. s += "[%s]" % self.host
  425. else:
  426. s += self.host
  427. if self.port is not None:
  428. s += ":" + str(self.port)
  429. if self.database is not None:
  430. s += "/" + self.database
  431. if self.query:
  432. keys = list(self.query)
  433. keys.sort()
  434. s += "?" + "&".join(
  435. "%s=%s" % (util.quote_plus(k), util.quote_plus(element))
  436. for k in keys
  437. for element in util.to_list(self.query[k])
  438. )
  439. return s
  440. def __str__(self):
  441. return self.render_as_string(hide_password=False)
  442. def __repr__(self):
  443. return self.render_as_string()
  444. def __copy__(self):
  445. return self.__class__.create(
  446. self.drivername,
  447. self.username,
  448. self.password,
  449. self.host,
  450. self.port,
  451. self.database,
  452. # note this is an immutabledict of str-> str / tuple of str,
  453. # also fully immutable. does not require deepcopy
  454. self.query,
  455. )
  456. def __deepcopy__(self, memo):
  457. return self.__copy__()
  458. def __hash__(self):
  459. return hash(str(self))
  460. def __eq__(self, other):
  461. return (
  462. isinstance(other, URL)
  463. and self.drivername == other.drivername
  464. and self.username == other.username
  465. and self.password == other.password
  466. and self.host == other.host
  467. and self.database == other.database
  468. and self.query == other.query
  469. and self.port == other.port
  470. )
  471. def __ne__(self, other):
  472. return not self == other
  473. def get_backend_name(self):
  474. """Return the backend name.
  475. This is the name that corresponds to the database backend in
  476. use, and is the portion of the :attr:`_engine.URL.drivername`
  477. that is to the left of the plus sign.
  478. """
  479. if "+" not in self.drivername:
  480. return self.drivername
  481. else:
  482. return self.drivername.split("+")[0]
  483. def get_driver_name(self):
  484. """Return the backend name.
  485. This is the name that corresponds to the DBAPI driver in
  486. use, and is the portion of the :attr:`_engine.URL.drivername`
  487. that is to the right of the plus sign.
  488. If the :attr:`_engine.URL.drivername` does not include a plus sign,
  489. then the default :class:`_engine.Dialect` for this :class:`_engine.URL`
  490. is imported in order to get the driver name.
  491. """
  492. if "+" not in self.drivername:
  493. return self.get_dialect().driver
  494. else:
  495. return self.drivername.split("+")[1]
  496. def _instantiate_plugins(self, kwargs):
  497. plugin_names = util.to_list(self.query.get("plugin", ()))
  498. plugin_names += kwargs.get("plugins", [])
  499. kwargs = dict(kwargs)
  500. loaded_plugins = [
  501. plugins.load(plugin_name)(self, kwargs)
  502. for plugin_name in plugin_names
  503. ]
  504. u = self.difference_update_query(["plugin", "plugins"])
  505. for plugin in loaded_plugins:
  506. new_u = plugin.update_url(u)
  507. if new_u is not None:
  508. u = new_u
  509. kwargs.pop("plugins", None)
  510. return u, loaded_plugins, kwargs
  511. def _get_entrypoint(self):
  512. """Return the "entry point" dialect class.
  513. This is normally the dialect itself except in the case when the
  514. returned class implements the get_dialect_cls() method.
  515. """
  516. if "+" not in self.drivername:
  517. name = self.drivername
  518. else:
  519. name = self.drivername.replace("+", ".")
  520. cls = registry.load(name)
  521. # check for legacy dialects that
  522. # would return a module with 'dialect' as the
  523. # actual class
  524. if (
  525. hasattr(cls, "dialect")
  526. and isinstance(cls.dialect, type)
  527. and issubclass(cls.dialect, Dialect)
  528. ):
  529. return cls.dialect
  530. else:
  531. return cls
  532. def get_dialect(self):
  533. """Return the SQLAlchemy :class:`_engine.Dialect` class corresponding
  534. to this URL's driver name.
  535. """
  536. entrypoint = self._get_entrypoint()
  537. dialect_cls = entrypoint.get_dialect_cls(self)
  538. return dialect_cls
  539. def translate_connect_args(self, names=None, **kw):
  540. r"""Translate url attributes into a dictionary of connection arguments.
  541. Returns attributes of this url (`host`, `database`, `username`,
  542. `password`, `port`) as a plain dictionary. The attribute names are
  543. used as the keys by default. Unset or false attributes are omitted
  544. from the final dictionary.
  545. :param \**kw: Optional, alternate key names for url attributes.
  546. :param names: Deprecated. Same purpose as the keyword-based alternate
  547. names, but correlates the name to the original positionally.
  548. """
  549. if names is not None:
  550. util.warn_deprecated(
  551. "The `URL.translate_connect_args.name`s parameter is "
  552. "deprecated. Please pass the "
  553. "alternate names as kw arguments.",
  554. "1.4",
  555. )
  556. translated = {}
  557. attribute_names = ["host", "database", "username", "password", "port"]
  558. for sname in attribute_names:
  559. if names:
  560. name = names.pop(0)
  561. elif sname in kw:
  562. name = kw[sname]
  563. else:
  564. name = sname
  565. if name is not None and getattr(self, sname, False):
  566. if sname == "password":
  567. translated[name] = str(getattr(self, sname))
  568. else:
  569. translated[name] = getattr(self, sname)
  570. return translated
  571. def make_url(name_or_url):
  572. """Given a string or unicode instance, produce a new URL instance.
  573. The given string is parsed according to the RFC 1738 spec. If an
  574. existing URL object is passed, just returns the object.
  575. """
  576. if isinstance(name_or_url, util.string_types):
  577. return _parse_rfc1738_args(name_or_url)
  578. else:
  579. return name_or_url
  580. def _parse_rfc1738_args(name):
  581. pattern = re.compile(
  582. r"""
  583. (?P<name>[\w\+]+)://
  584. (?:
  585. (?P<username>[^:/]*)
  586. (?::(?P<password>[^@]*))?
  587. @)?
  588. (?:
  589. (?:
  590. \[(?P<ipv6host>[^/\?]+)\] |
  591. (?P<ipv4host>[^/:\?]+)
  592. )?
  593. (?::(?P<port>[^/\?]*))?
  594. )?
  595. (?:/(?P<database>[^\?]*))?
  596. (?:\?(?P<query>.*))?
  597. """,
  598. re.X,
  599. )
  600. m = pattern.match(name)
  601. if m is not None:
  602. components = m.groupdict()
  603. if components["query"] is not None:
  604. query = {}
  605. for key, value in util.parse_qsl(components["query"]):
  606. if util.py2k:
  607. key = key.encode("ascii")
  608. if key in query:
  609. query[key] = util.to_list(query[key])
  610. query[key].append(value)
  611. else:
  612. query[key] = value
  613. else:
  614. query = None
  615. components["query"] = query
  616. if components["username"] is not None:
  617. components["username"] = _rfc_1738_unquote(components["username"])
  618. if components["password"] is not None:
  619. components["password"] = _rfc_1738_unquote(components["password"])
  620. ipv4host = components.pop("ipv4host")
  621. ipv6host = components.pop("ipv6host")
  622. components["host"] = ipv4host or ipv6host
  623. name = components.pop("name")
  624. if components["port"]:
  625. components["port"] = int(components["port"])
  626. return URL.create(name, **components)
  627. else:
  628. raise exc.ArgumentError(
  629. "Could not parse rfc1738 URL from string '%s'" % name
  630. )
  631. def _rfc_1738_quote(text):
  632. return re.sub(r"[:@/]", lambda m: "%%%X" % ord(m.group(0)), text)
  633. def _rfc_1738_unquote(text):
  634. return util.unquote(text)
  635. def _parse_keyvalue_args(name):
  636. m = re.match(r"(\w+)://(.*)", name)
  637. if m is not None:
  638. (name, args) = m.group(1, 2)
  639. opts = dict(util.parse_qsl(args))
  640. return URL(name, *opts)
  641. else:
  642. return None