123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- from weakref import ref
- from blinker._saferef import BoundMethodWeakref
# Probe for the ``callable`` builtin; only when the bare name lookup fails
# is a replacement defined below.
try:
    callable
except NameError:
    def callable(object):
        """Return True when *object* exposes a ``__call__`` attribute.

        Stand-in used only on interpreters where the ``callable`` builtin
        is missing.
        """
        return hasattr(object, '__call__')
try:
    from collections import defaultdict
except ImportError:
    # Only a failed import should trigger the fallback; a bare ``except``
    # here would also swallow KeyboardInterrupt/SystemExit.
    class defaultdict(dict):
        """Pure-Python stand-in for ``collections.defaultdict``.

        Defined only when ``collections.defaultdict`` cannot be imported.
        Looking up a missing key calls ``default_factory`` with no
        arguments, stores the result under that key and returns it.  With
        a ``None`` factory a missing key raises ``KeyError``.
        """

        def __init__(self, default_factory=None, *a, **kw):
            """Create the mapping.

            :param default_factory: zero-argument callable producing values
              for missing keys, or None to disable defaulting.
            :param a: positional arguments forwarded to ``dict.__init__``.
            :param kw: keyword arguments forwarded to ``dict.__init__``.
            :raises TypeError: if *default_factory* is neither None nor
              an object with a ``__call__`` attribute.
            """
            if (default_factory is not None and
                    not hasattr(default_factory, '__call__')):
                raise TypeError('first argument must be callable')
            dict.__init__(self, *a, **kw)
            self.default_factory = default_factory

        def __getitem__(self, key):
            """Return ``self[key]``, deferring to ``__missing__`` on a miss."""
            try:
                return dict.__getitem__(self, key)
            except KeyError:
                return self.__missing__(key)

        def __missing__(self, key):
            """Create, store and return the default value for *key*.

            :raises KeyError: if ``default_factory`` is None.
            """
            if self.default_factory is None:
                raise KeyError(key)
            self[key] = value = self.default_factory()
            return value

        def __reduce__(self):
            """Support pickling: rebuild via the factory, then re-add items."""
            if self.default_factory is None:
                args = ()
            else:
                args = (self.default_factory,)
            # The fifth slot of a reduce tuple must be an *iterator* of
            # (key, value) pairs; a list or items-view is not accepted by
            # every pickler.
            return type(self), args, None, None, iter(self.items())

        def copy(self):
            """Return a shallow copy (same as ``copy.copy(self)``)."""
            return self.__copy__()

        def __copy__(self):
            """Return a shallow copy sharing the same ``default_factory``."""
            return type(self)(self.default_factory, self)

        def __deepcopy__(self, memo):
            """Return a deep copy of the items, keeping ``default_factory``.

            The factory itself is passed through uncopied.
            """
            import copy
            # Snapshot the items into a list (an items-view cannot be
            # deep-copied) and thread ``memo`` through so shared or cyclic
            # references are handled by the copy machinery.
            return type(self)(self.default_factory,
                              copy.deepcopy(list(self.items()), memo))

        def __repr__(self):
            """Return ``defaultdict(<factory>, <dict repr>)``."""
            return 'defaultdict(%s, %s)' % (self.default_factory,
                                            dict.__repr__(self))
try:
    from contextlib import contextmanager
except ImportError:
    def contextmanager(fn):
        """Placeholder decorator used when ``contextlib`` is unavailable.

        The decorated function is replaced by a stub that carries the
        original function's ``__name__`` and raises ``RuntimeError``
        whenever it is called.
        """
        def oops(*_args, **_kwargs):
            raise RuntimeError("Python 2.5 or above is required to use "
                               "context managers.")

        # Keep the decorated function's name on the stub.
        oops.__name__ = fn.__name__
        return oops
class _symbol(object):
    """Concrete object handed out by :class:`symbol`."""

    def __init__(self, name):
        """Construct a new named symbol."""
        self.__name__ = name
        self.name = name

    def __reduce__(self):
        # Pickle as a call to the ``symbol`` factory with this name.
        return (symbol, (self.name,))

    def __repr__(self):
        return self.name


# Present the implementation class under the public name.
_symbol.__name__ = 'symbol'


class symbol(object):
    """A constant symbol.

    >>> symbol('foo') is symbol('foo')
    True
    >>> symbol('foo')
    foo

    A slight refinement of the MAGICCOOKIE=object() pattern.  The primary
    advantage of symbol() is its repr().  They are also singletons.

    Repeated calls of symbol('name') will all return the same instance.

    """

    # Registry of every symbol created so far, keyed by name.
    symbols = {}

    def __new__(cls, name):
        registry = cls.symbols
        if name in registry:
            return registry[name]
        # setdefault returns whichever object ends up stored under ``name``.
        return registry.setdefault(name, _symbol(name))
# ``text`` is the type (or tuple of types) treated as strings below: it
# includes ``unicode`` only on interpreters where that name exists.
try:
    unicode
except NameError:
    text = str
else:
    text = (str, unicode)


def hashable_identity(obj):
    """Return a hashable value identifying *obj*.

    * Objects with ``__func__`` (or, failing that, ``im_func``) yield a
      ``(id(function), id(instance))`` tuple.
    * Strings are returned unchanged.
    * Anything else yields ``id(obj)``.
    """
    if hasattr(obj, '__func__'):
        func, owner = obj.__func__, obj.__self__
        return (id(func), id(owner))
    if hasattr(obj, 'im_func'):
        func, owner = obj.im_func, obj.im_self
        return (id(func), id(owner))
    if isinstance(obj, text):
        return obj
    return id(obj)
# The weak-reference types this module deals in: ``weakref.ref`` (which
# also covers subclasses such as annotatable_weakref) and
# BoundMethodWeakref.  Presumably used for isinstance() checks by callers
# -- it is not referenced elsewhere in this part of the file.
WeakTypes = (ref, BoundMethodWeakref)
class annotatable_weakref(ref):
    """A ``weakref.ref`` subclass whose instances accept extra attributes.

    Instances of this subclass can have arbitrary attributes assigned to
    them after creation.
    """
def reference(object, callback=None, **annotations):
    """Return an annotated weak ref.

    Callables are wrapped via ``callable_reference``; everything else gets
    an ``annotatable_weakref``.  Each keyword in *annotations* is then set
    as an attribute on the resulting reference.

    :param object: the object to reference weakly.
    :param callback: passed through to the weak reference constructor.
    :param annotations: attribute names and values to set on the result.
    """
    # Choose the constructor first, then build the reference in one place.
    factory = annotatable_weakref
    if callable(object):
        factory = callable_reference
    weak = factory(object, callback)

    for key, value in annotations.items():
        setattr(weak, key, value)
    return weak
def callable_reference(object, callback=None):
    """Return an annotated weak ref, supporting bound instance methods.

    If *object* has a non-None ``im_self`` or ``__self__`` attribute it is
    wrapped in a ``BoundMethodWeakref`` (with *callback* as ``on_delete``);
    otherwise an ``annotatable_weakref`` is returned.
    """
    # Same test for either attribute spelling, checked in the same order.
    for owner_attr in ('im_self', '__self__'):
        if (hasattr(object, owner_attr) and
                getattr(object, owner_attr) is not None):
            return BoundMethodWeakref(target=object, on_delete=callback)
    return annotatable_weakref(object, callback)
class lazy_property(object):
    """A @property that is only evaluated once."""

    def __init__(self, deferred):
        """Wrap *deferred*, the one-argument function that computes the value."""
        self.__doc__ = deferred.__doc__
        self._deferred = deferred

    def __get__(self, obj, cls):
        """Compute the value for *obj* and store it on the instance.

        Accessed on the class (``obj is None``) the descriptor itself is
        returned.  Otherwise the wrapped function is called and its result
        is set on *obj* under the function's own name before being returned.
        """
        if obj is None:
            return self
        compute = self._deferred
        result = compute(obj)
        setattr(obj, compute.__name__, result)
        return result
|