mock.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. import contextlib
  2. import datetime
  3. import inspect
  4. import re
  5. import six
  6. import sqlalchemy as sa
  7. def create_mock_engine(bind, stream=None):
  8. """Create a mock SQLAlchemy engine from the passed engine or bind URL.
  9. :param bind: A SQLAlchemy engine or bind URL to mock.
  10. :param stream: Render all DDL operations to the stream.
  11. """
  12. if not isinstance(bind, six.string_types):
  13. bind_url = str(bind.url)
  14. else:
  15. bind_url = bind
  16. if stream is not None:
  17. def dump(sql, *args, **kwargs):
  18. class Compiler(type(sql._compiler(engine.dialect))):
  19. def visit_bindparam(self, bindparam, *args, **kwargs):
  20. return self.render_literal_value(
  21. bindparam.value, bindparam.type)
  22. def render_literal_value(self, value, type_):
  23. if isinstance(value, six.integer_types):
  24. return str(value)
  25. elif isinstance(value, (datetime.date, datetime.datetime)):
  26. return "'%s'" % value
  27. return super(Compiler, self).render_literal_value(
  28. value, type_)
  29. text = str(Compiler(engine.dialect, sql).process(sql))
  30. text = re.sub(r'\n+', '\n', text)
  31. text = text.strip('\n').strip()
  32. stream.write('\n%s;' % text)
  33. else:
  34. def dump(*args, **kw):
  35. return None
  36. try:
  37. engine = sa.create_mock_engine(bind_url, executor=dump)
  38. except AttributeError: # SQLAlchemy <1.4
  39. engine = sa.create_engine(bind_url, strategy='mock', executor=dump)
  40. return engine
  41. @contextlib.contextmanager
  42. def mock_engine(engine, stream=None):
  43. """Mocks out the engine specified in the passed bind expression.
  44. Note this function is meant for convenience and protected usage. Do NOT
  45. blindly pass user input to this function as it uses exec.
  46. :param engine: A python expression that represents the engine to mock.
  47. :param stream: Render all DDL operations to the stream.
  48. """
  49. # Create a stream if not present.
  50. if stream is None:
  51. stream = six.moves.cStringIO()
  52. # Navigate the stack and find the calling frame that allows the
  53. # expression to execuate.
  54. for frame in inspect.stack()[1:]:
  55. try:
  56. frame = frame[0]
  57. expression = '__target = %s' % engine
  58. six.exec_(expression, frame.f_globals, frame.f_locals)
  59. target = frame.f_locals['__target']
  60. break
  61. except Exception:
  62. pass
  63. else:
  64. raise ValueError('Not a valid python expression', engine)
  65. # Evaluate the expression and get the target engine.
  66. frame.f_locals['__mock'] = create_mock_engine(target, stream)
  67. # Replace the target with our mock.
  68. six.exec_('%s = __mock' % engine, frame.f_globals, frame.f_locals)
  69. # Give control back.
  70. yield stream
  71. # Put the target engine back.
  72. frame.f_locals['__target'] = target
  73. six.exec_('%s = __target' % engine, frame.f_globals, frame.f_locals)
  74. six.exec_('del __target', frame.f_globals, frame.f_locals)
  75. six.exec_('del __mock', frame.f_globals, frame.f_locals)