123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178 |
- import unittest
- import sys
- import gc
- import time
- import weakref
- import threading
- import greenlet
- class TestLeaks(unittest.TestCase):
- def test_arg_refs(self):
- args = ('a', 'b', 'c')
- refcount_before = sys.getrefcount(args)
- # pylint:disable=unnecessary-lambda
- g = greenlet.greenlet(
- lambda *args: greenlet.getcurrent().parent.switch(*args))
- for _ in range(100):
- g.switch(*args)
- self.assertEqual(sys.getrefcount(args), refcount_before)
- def test_kwarg_refs(self):
- kwargs = {}
- # pylint:disable=unnecessary-lambda
- g = greenlet.greenlet(
- lambda **kwargs: greenlet.getcurrent().parent.switch(**kwargs))
- for _ in range(100):
- g.switch(**kwargs)
- self.assertEqual(sys.getrefcount(kwargs), 2)
- assert greenlet.GREENLET_USE_GC # Option to disable this was removed in 1.0
- def recycle_threads(self):
- # By introducing a thread that does sleep we allow other threads,
- # that have triggered their __block condition, but did not have a
- # chance to deallocate their thread state yet, to finally do so.
- # The way it works is by requiring a GIL switch (different thread),
- # which does a GIL release (sleep), which might do a GIL switch
- # to finished threads and allow them to clean up.
- def worker():
- time.sleep(0.001)
- t = threading.Thread(target=worker)
- t.start()
- time.sleep(0.001)
- t.join()
- def test_threaded_leak(self):
- gg = []
- def worker():
- # only main greenlet present
- gg.append(weakref.ref(greenlet.getcurrent()))
- for _ in range(2):
- t = threading.Thread(target=worker)
- t.start()
- t.join()
- del t
- greenlet.getcurrent() # update ts_current
- self.recycle_threads()
- greenlet.getcurrent() # update ts_current
- gc.collect()
- greenlet.getcurrent() # update ts_current
- for g in gg:
- self.assertIsNone(g())
- def test_threaded_adv_leak(self):
- gg = []
- def worker():
- # main and additional *finished* greenlets
- ll = greenlet.getcurrent().ll = []
- def additional():
- ll.append(greenlet.getcurrent())
- for _ in range(2):
- greenlet.greenlet(additional).switch()
- gg.append(weakref.ref(greenlet.getcurrent()))
- for _ in range(2):
- t = threading.Thread(target=worker)
- t.start()
- t.join()
- del t
- greenlet.getcurrent() # update ts_current
- self.recycle_threads()
- greenlet.getcurrent() # update ts_current
- gc.collect()
- greenlet.getcurrent() # update ts_current
- for g in gg:
- self.assertIsNone(g())
- def test_issue251_killing_cross_thread_leaks_list(self, manually_collect_background=True):
- # See https://github.com/python-greenlet/greenlet/issues/251
- # Killing a greenlet (probably not the main one)
- # in one thread from another thread would
- # result in leaking a list (the ts_delkey list).
- # For the test to be valid, even empty lists have to be tracked by the
- # GC
- assert gc.is_tracked([])
- def count_objects(kind=list):
- # pylint:disable=unidiomatic-typecheck
- # Collect the garbage.
- for _ in range(3):
- gc.collect()
- gc.collect()
- return sum(
- 1
- for x in gc.get_objects()
- if type(x) is kind
- )
- # XXX: The main greenlet of a dead thread is only released
- # when one of the proper greenlet APIs is used from a different
- # running thread. See #252 (https://github.com/python-greenlet/greenlet/issues/252)
- greenlet.getcurrent()
- greenlets_before = count_objects(greenlet.greenlet)
- background_glet_running = threading.Event()
- background_glet_killed = threading.Event()
- background_greenlets = []
- def background_greenlet():
- # Throw control back to the main greenlet.
- greenlet.getcurrent().parent.switch()
- def background_thread():
- glet = greenlet.greenlet(background_greenlet)
- background_greenlets.append(glet)
- glet.switch() # Be sure it's active.
- # Control is ours again.
- del glet # Delete one reference from the thread it runs in.
- background_glet_running.set()
- background_glet_killed.wait()
- # To trigger the background collection of the dead
- # greenlet, thus clearing out the contents of the list, we
- # need to run some APIs. See issue 252.
- if manually_collect_background:
- greenlet.getcurrent()
- t = threading.Thread(target=background_thread)
- t.start()
- background_glet_running.wait()
- lists_before = count_objects()
- assert len(background_greenlets) == 1
- self.assertFalse(background_greenlets[0].dead)
- # Delete the last reference to the background greenlet
- # from a different thread. This puts it in the background thread's
- # ts_delkey list.
- del background_greenlets[:]
- background_glet_killed.set()
- # Now wait for the background thread to die.
- t.join(10)
- del t
- # Free the background main greenlet by forcing greenlet to notice a difference.
- greenlet.getcurrent()
- greenlets_after = count_objects(greenlet.greenlet)
- lists_after = count_objects()
- # On 2.7, we observe that lists_after is smaller than
- # lists_before. No idea what lists got cleaned up. All the
- # Python 3 versions match exactly.
- self.assertLessEqual(lists_after, lists_before)
- self.assertEqual(greenlets_before, greenlets_after)
- @unittest.expectedFailure
- def test_issue251_issue252_need_to_collect_in_background(self):
- # This still fails because the leak of the list
- # still exists when we don't call a greenlet API before exiting the
- # thread. The proximate cause is that neither of the two greenlets
- # from the background thread are actually being destroyed, even though
- # the GC is in fact visiting both objects.
- # It's not clear where that leak is? For some reason the thread-local dict
- # holding it isn't being cleaned up.
- self.test_issue251_killing_cross_thread_leaks_list(manually_collect_background=False)
|