123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- import sys
- import unittest
- import greenlet
- class SomeError(Exception):
- pass
- class GreenletTracer(object):
- oldtrace = None
- def __init__(self, error_on_trace=False):
- self.actions = []
- self.error_on_trace = error_on_trace
- def __call__(self, *args):
- self.actions.append(args)
- if self.error_on_trace:
- raise SomeError
- def __enter__(self):
- self.oldtrace = greenlet.settrace(self)
- return self.actions
- def __exit__(self, *args):
- greenlet.settrace(self.oldtrace)
- class TestGreenletTracing(unittest.TestCase):
- """
- Tests of ``greenlet.settrace()``
- """
- def test_greenlet_tracing(self):
- main = greenlet.getcurrent()
- def dummy():
- pass
- def dummyexc():
- raise SomeError()
- with GreenletTracer() as actions:
- g1 = greenlet.greenlet(dummy)
- g1.switch()
- g2 = greenlet.greenlet(dummyexc)
- self.assertRaises(SomeError, g2.switch)
- self.assertEqual(actions, [
- ('switch', (main, g1)),
- ('switch', (g1, main)),
- ('switch', (main, g2)),
- ('throw', (g2, main)),
- ])
- def test_exception_disables_tracing(self):
- main = greenlet.getcurrent()
- def dummy():
- main.switch()
- g = greenlet.greenlet(dummy)
- g.switch()
- with GreenletTracer(error_on_trace=True) as actions:
- self.assertRaises(SomeError, g.switch)
- self.assertEqual(greenlet.gettrace(), None)
- self.assertEqual(actions, [
- ('switch', (main, g)),
- ])
- class PythonTracer(object):
- oldtrace = None
- def __init__(self):
- self.actions = []
- def __call__(self, frame, event, arg):
- # Record the co_name so we have an idea what function we're in.
- self.actions.append((event, frame.f_code.co_name))
- def __enter__(self):
- self.oldtrace = sys.setprofile(self)
- return self.actions
- def __exit__(self, *args):
- sys.setprofile(self.oldtrace)
- def tpt_callback():
- return 42
- class TestPythonTracing(unittest.TestCase):
- """
- Tests of the interaction of ``sys.settrace()``
- with greenlet facilities.
- NOTE: Most of this is probably CPython specific.
- """
- maxDiff = None
- def test_trace_events_trivial(self):
- with PythonTracer() as actions:
- tpt_callback()
- # If we use the sys.settrace instead of setprofile, we get
- # this:
- # self.assertEqual(actions, [
- # ('call', 'tpt_callback'),
- # ('call', '__exit__'),
- # ])
- self.assertEqual(actions, [
- ('return', '__enter__'),
- ('call', 'tpt_callback'),
- ('return', 'tpt_callback'),
- ('call', '__exit__'),
- ('c_call', '__exit__'),
- ])
- def _trace_switch(self, glet):
- with PythonTracer() as actions:
- glet.switch()
- return actions
- def _check_trace_events_func_already_set(self, glet):
- actions = self._trace_switch(glet)
- self.assertEqual(actions, [
- ('return', '__enter__'),
- ('c_call', '_trace_switch'),
- ('call', 'run'),
- ('call', 'tpt_callback'),
- ('return', 'tpt_callback'),
- ('return', 'run'),
- ('c_return', '_trace_switch'),
- ('call', '__exit__'),
- ('c_call', '__exit__'),
- ])
- def test_trace_events_into_greenlet_func_already_set(self):
- def run():
- return tpt_callback()
- self._check_trace_events_func_already_set(greenlet.greenlet(run))
- def test_trace_events_into_greenlet_subclass_already_set(self):
- class X(greenlet.greenlet):
- def run(self):
- return tpt_callback()
- self._check_trace_events_func_already_set(X())
- def _check_trace_events_from_greenlet_sets_profiler(self, g, tracer):
- g.switch()
- tpt_callback()
- tracer.__exit__()
- self.assertEqual(tracer.actions, [
- ('return', '__enter__'),
- ('call', 'tpt_callback'),
- ('return', 'tpt_callback'),
- ('return', 'run'),
- ('call', 'tpt_callback'),
- ('return', 'tpt_callback'),
- ('call', '__exit__'),
- ('c_call', '__exit__'),
- ])
- def test_trace_events_from_greenlet_func_sets_profiler(self):
- tracer = PythonTracer()
- def run():
- tracer.__enter__()
- return tpt_callback()
- self._check_trace_events_from_greenlet_sets_profiler(greenlet.greenlet(run),
- tracer)
- def test_trace_events_from_greenlet_subclass_sets_profiler(self):
- tracer = PythonTracer()
- class X(greenlet.greenlet):
- def run(self):
- tracer.__enter__()
- return tpt_callback()
- self._check_trace_events_from_greenlet_sets_profiler(X(), tracer)
- def test_trace_events_multiple_greenlets_switching(self):
- tracer = PythonTracer()
- g1 = None
- g2 = None
- def g1_run():
- tracer.__enter__()
- tpt_callback()
- g2.switch()
- tpt_callback()
- return 42
- def g2_run():
- tpt_callback()
- tracer.__exit__()
- tpt_callback()
- g1.switch()
- g1 = greenlet.greenlet(g1_run)
- g2 = greenlet.greenlet(g2_run)
- x = g1.switch()
- self.assertEqual(x, 42)
- tpt_callback() # ensure not in the trace
- self.assertEqual(tracer.actions, [
- ('return', '__enter__'),
- ('call', 'tpt_callback'),
- ('return', 'tpt_callback'),
- ('c_call', 'g1_run'),
- ('call', 'g2_run'),
- ('call', 'tpt_callback'),
- ('return', 'tpt_callback'),
- ('call', '__exit__'),
- ('c_call', '__exit__'),
- ])
- def test_trace_events_multiple_greenlets_switching_siblings(self):
- # Like the first version, but get both greenlets running first
- # as "siblings" and then establish the tracing.
- tracer = PythonTracer()
- g1 = None
- g2 = None
- def g1_run():
- greenlet.getcurrent().parent.switch()
- tracer.__enter__()
- tpt_callback()
- g2.switch()
- tpt_callback()
- return 42
- def g2_run():
- greenlet.getcurrent().parent.switch()
- tpt_callback()
- tracer.__exit__()
- tpt_callback()
- g1.switch()
- g1 = greenlet.greenlet(g1_run)
- g2 = greenlet.greenlet(g2_run)
- # Start g1
- g1.switch()
- # And it immediately returns control to us.
- # Start g2
- g2.switch()
- # Which also returns. Now kick of the real part of the
- # test.
- x = g1.switch()
- self.assertEqual(x, 42)
- tpt_callback() # ensure not in the trace
- self.assertEqual(tracer.actions, [
- ('return', '__enter__'),
- ('call', 'tpt_callback'),
- ('return', 'tpt_callback'),
- ('c_call', 'g1_run'),
- ('call', 'tpt_callback'),
- ('return', 'tpt_callback'),
- ('call', '__exit__'),
- ('c_call', '__exit__'),
- ])
|