summary refs log tree commit diff stats
path: root/venv/lib/python3.9/site-packages/pympler/classtracker.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.9/site-packages/pympler/classtracker.py')
-rw-r--r-- venv/lib/python3.9/site-packages/pympler/classtracker.py 590
1 files changed, 590 insertions, 0 deletions
diff --git a/venv/lib/python3.9/site-packages/pympler/classtracker.py b/venv/lib/python3.9/site-packages/pympler/classtracker.py
new file mode 100644
index 00000000..b187e4b3
--- /dev/null
+++ b/venv/lib/python3.9/site-packages/pympler/classtracker.py
@@ -0,0 +1,590 @@
+"""
+The `ClassTracker` is a facility delivering insight into the memory
+distribution of a Python program. It can introspect memory consumption of
+certain classes and objects. Facilities are provided to track and size
+individual objects or all instances of certain classes. Tracked objects are
+sized recursively to provide an overview of memory distribution between the
+different tracked objects.
+"""
+
+from typing import Any, Callable, Dict, IO, List, Optional, Tuple
+
+from collections import defaultdict
+from functools import partial
+from inspect import stack, isclass
+from threading import Thread, Lock
+from time import sleep, time
+from weakref import ref as weakref_ref
+
+from pympler.classtracker_stats import ConsoleStats
+from pympler.util.stringutils import safe_repr
+
+import pympler.asizeof as asizeof
+import pympler.process
+
+
+__all__ = ["ClassTracker"]
+
+# Reference point for timestamps that are relative to program start; it is
+# captured once, when this module is first imported.
+_local_start = time()
+
+
+class _ClassObserver(object):
+    """
+    Per-class tracking configuration.
+
+    Besides the tracking options, the observer holds on to the constructor
+    that was passed in as `init` (the original constructor of the observed
+    class).
+    """
+    __slots__ = ('init', 'name', 'detail', 'keep', 'trace')
+
+    def __init__(self, init: Callable, name: str, detail: int, keep: bool,
+                 trace: bool):
+        # The saved constructor never changes after creation; the remaining
+        # options are assigned through the same code path as later updates.
+        self.init = init
+        self.modify(name, detail, keep, trace)
+
+    def modify(self, name: str, detail: int, keep: bool, trace: bool) -> None:
+        """Replace the tracking options; the saved constructor is kept."""
+        self.name, self.detail = name, detail
+        self.keep, self.trace = keep, trace
+
+
+def _get_time() -> float:
+    """
+    Return the number of seconds elapsed since `_local_start` was recorded.
+    """
+    now = time()
+    return now - _local_start
+
+
+class TrackedObject(object):
+    """
+    Size and lifetime record of one tracked object.
+
+    The object itself is only referenced weakly, so tracking it does not
+    prevent its deletion.
+    """
+    __slots__ = ("ref", "id", "repr", "name", "birth", "death", "trace",
+                 "snapshots", "_resolution_level", "__dict__")
+
+    def __init__(self, instance: Any, name: str, resolution_level: int = 0,
+                 trace: bool = False, on_delete: Optional[Callable] = None):
+        """
+        Start observing 'instance' through a weak reference whose callback
+        is `finalize`. The first entry of 'snapshots', a (timestamp, size)
+        tuple, is built from the basic size of the instance.
+
+        :param instance: object to observe; must be weakly referenceable
+        :param name: name under which the object is reported
+        :param resolution_level: detail level passed to the sizer
+        :param trace: if true, record the creation stack trace
+        :param on_delete: optional callable invoked from `finalize`
+        """
+        self.ref = weakref_ref(instance, self.finalize)
+        self.id = id(instance)
+        self.repr = ''
+        self.name = name
+        self.birth = _get_time()
+        self.death = None  # type: Optional[float]
+        self._resolution_level = resolution_level
+        self.trace = None  # type: Optional[List[Tuple]]
+
+        if trace:
+            self._save_trace()
+
+        basic = asizeof.basicsize(instance) or 0
+        self.snapshots = [(self.birth, asizeof.Asized(basic, basic))]
+        # 'on_delete' is not a slot; it ends up in the instance __dict__.
+        self.on_delete = on_delete
+
+    def __getstate__(self) -> Dict:
+        """
+        Build the pickled state from the slots that are currently set.
+
+        'ref' is left out because references cannot be serialized, and
+        '__dict__' is left out because the derived values stored there can
+        be reconstructed from the other data.
+        """
+        skipped = ('ref', '__dict__')
+        return {
+            slot: getattr(self, slot)
+            for slot in getattr(TrackedObject, '__slots__', ())
+            if slot not in skipped and hasattr(self, slot)
+        }
+
+    def __setstate__(self, state: Dict) -> None:
+        """
+        Restore attributes from pickled data; required for a slotted class.
+        """
+        for attribute, value in tuple(state.items()):
+            setattr(self, attribute, value)
+
+    def _save_trace(self) -> None:
+        """
+        Store the current call stack in 'trace', outermost frame first.
+        """
+        # stack() must be called directly in this method: the slice below
+        # depends on the exact call depth.
+        frames = stack()
+        try:
+            # Skip the innermost five frames (our own overhead) and drop the
+            # frame object (element 0) from every remaining record.
+            self.trace = [record[1:] for record in reversed(frames[5:])]
+        finally:
+            del frames
+
+    def track_size(self, ts: float, sizer: asizeof.Asizer) -> None:
+        """
+        Append a (timestamp, size) sample measured with 'sizer'.
+
+        The 'sizer' is a stateful sizing facility that excludes other
+        tracked objects. While the object is alive its clipped repr is
+        refreshed as well.
+        """
+        target = self.ref()
+        measured = sizer.asized(target, detail=self._resolution_level)
+        self.snapshots.append((ts, measured))
+        if target is not None:
+            self.repr = safe_repr(target, clip=128)
+
+    def get_max_size(self) -> int:
+        """
+        Return the largest size among all recorded samples.
+        """
+        return max(sample.size for _, sample in self.snapshots)
+
+    def get_size_at_time(self, timestamp: float) -> int:
+        """
+        Return the size recorded for the snapshot taken at 'timestamp'.
+
+        If several samples carry that timestamp the most recent one wins;
+        if none does, 0 is returned.
+        """
+        for taken_at, sample in reversed(self.snapshots):
+            if taken_at == timestamp:
+                return sample.size
+        return 0
+
+    def set_resolution_level(self, resolution_level: int) -> None:
+        """
+        Change the resolution level used by subsequent size estimations.
+        This allows different levels for different instances of a tracked
+        class.
+        """
+        self._resolution_level = resolution_level
+
+    def finalize(self, ref: weakref_ref) -> None:
+        """
+        Weak reference callback: record the time of death and notify
+        'on_delete' if one was given.
+
+        The pre-destruction size cannot be measured here because the
+        object is already gone when the callback runs. Weakref callbacks
+        still tell us when a tracked object died without requiring
+        destructors.
+
+        If the object is destroyed at the end of the program execution,
+        it's not possible to import modules anymore. Any exception raised
+        here is therefore ignored on purpose (self.death may stay None).
+        """
+        try:
+            self.death = _get_time()
+            callback = self.on_delete
+            if callback:
+                callback()
+        except Exception:  # pragma: no cover
+            pass
+
+
+def track_object_creation(time_series: List[Tuple[float, int]]) -> None:
+    """
+    Append a sample whose instance count is one higher than the last one.
+
+    An empty series counts as zero instances.
+    """
+    if time_series:
+        previous = time_series[-1][1]
+    else:
+        previous = 0
+    time_series.append((_get_time(), previous + 1))
+
+
+def track_object_deletion(time_series: List[Tuple[float, int]]) -> None:
+    """
+    Append a sample whose instance count is one lower than the last one.
+
+    The series must already contain a sample; an empty series raises
+    `IndexError`.
+    """
+    last_sample = time_series[-1]
+    time_series.append((_get_time(), last_sample[1] - 1))
+
+
+class PeriodicThread(Thread):
+    """
+    Thread object to take snapshots periodically.
+
+    Setting the public attribute `stop` to True asks the loop in `run` to
+    end; the request is noticed after the current sleep has finished.
+    """
+
+    def __init__(self, tracker: 'ClassTracker', interval: float, *args: Any,
+                 **kwargs: Any):
+        """
+        Create thread with given interval and associated with the given
+        tracker.
+
+        :param tracker: object whose `create_snapshot` method is called on
+            every iteration
+        :param interval: seconds to sleep between two snapshots
+        :param args: forwarded to `Thread.__init__`
+        :param kwargs: forwarded to `Thread.__init__`
+        """
+        super(PeriodicThread, self).__init__(*args, **kwargs)
+        self.interval = interval
+        self.tracker = tracker
+        self.stop = False
+
+    def run(self) -> None:
+        """
+        Loop until a stop signal is set.
+        """
+        # The flag is initialised in __init__ only. Resetting it here would
+        # discard a stop request posted between start() returning and this
+        # method beginning to execute, leaving the loop running forever and
+        # blocking any caller that joins the thread.
+        while not self.stop:
+            self.tracker.create_snapshot()
+            sleep(self.interval)
+
+
+class Snapshot(object):
+    """Sizes of the tracked objects and of the process at one instant."""
+
+    def __init__(self, timestamp: float, description: str = '') -> None:
+        """Store timestamp and description; query process-wide memory."""
+        self.timestamp = timestamp
+        self.desc = description
+        # Size totals start at zero; the code taking the snapshot fills
+        # them in afterwards.
+        self.tracked_total = 0
+        self.asizeof_total = 0
+        self.overhead = 0
+        self.system_total = pympler.process.ProcessMemoryInfo()
+        self.classes = None  # type: Optional[Dict[str, Dict[str, Any]]]
+
+    @property
+    def total(self) -> int:
+        """
+        Return the total (virtual) size of the process in bytes.
+
+        Without process information the best number available is
+        returned, even if it is a poor approximation of reality:
+        `asizeof_total` when it is non-zero, otherwise `tracked_total`.
+        """
+        if self.system_total.available:
+            return self.system_total.vsz
+        return self.asizeof_total or self.tracked_total  # pragma: no cover
+
+    @property
+    def label(self) -> str:
+        """Return the description with the timestamp, or just the latter."""
+        if self.desc:
+            return "%s (%.3fs)" % (self.desc, self.timestamp)
+        return "%.3fs" % self.timestamp
+
+
+class ClassTracker(object):
+    """
+    Track size and lifetime of individual objects and of all instances of
+    registered classes, and collect the measurements in snapshots.
+    """
+
+    def __init__(self, stream: Optional[IO] = None):
+        """
+        Creates a new `ClassTracker` object.
+
+        :param stream: Output stream to use when printing statistics via
+            ``stats``.
+        """
+        # Dictionaries of TrackedObject objects associated with the actual
+        # objects that are tracked. 'index' uses the class name as the key and
+        # associates a list of tracked objects. It contains all TrackedObject
+        # instances, including those of dead objects.
+        self.index = defaultdict(list)  # type: Dict[str, List[TrackedObject]]
+
+        # 'objects' uses the id (address) as the key and associates the tracked
+        # object with it. TrackedObject's referring to dead objects are
+        # replaced lazily, i.e. when the id is recycled by another tracked
+        # object.
+        self.objects = {}  # type: Dict[int, Any]
+
+        # List of `Snapshot` objects.
+        self.snapshots = []  # type: List[Snapshot]
+
+        # Time series of instance count for each tracked class.
+        self.history = defaultdict(list) \
+            # type: Dict[str, List[Tuple[float, int]]]
+
+        # Keep objects alive by holding a strong reference.
+        self._keepalive = []  # type: List[Any]
+
+        # Dictionary of class observers keyed by the tracked class object.
+        self._observers = {}  # type: Dict[type, _ClassObserver]
+
+        # Thread object responsible for background monitoring
+        self._periodic_thread = None  # type: Optional[PeriodicThread]
+
+        self._stream = stream
+
+    @property
+    def stats(self) -> ConsoleStats:
+        """
+        Return a ``ConsoleStats`` instance initialized with the current state
+        of the class tracker.
+        """
+        return ConsoleStats(tracker=self, stream=self._stream)
+
+    def _tracker(self, _observer_: _ClassObserver, _self_: Any, *args: Any,
+                 **kwds: Any) -> None:
+        """
+        Injected constructor for tracked classes.
+        Call the actual constructor of the object and track the object. Attach
+        to the object before calling the constructor to track the object with
+        the parameters of the most specialized class.
+        """
+        self.track_object(_self_,
+                          name=_observer_.name,
+                          resolution_level=_observer_.detail,
+                          keep=_observer_.keep,
+                          trace=_observer_.trace)
+        _observer_.init(_self_, *args, **kwds)
+
+    def _inject_constructor(self, cls: type, func: Callable, name: str,
+                            resolution_level: int, keep: bool, trace: bool,
+                            ) -> None:
+        """
+        Modifying Methods in Place - after the recipe 15.7 in the Python
+        Cookbook by Ken Seehof. The original constructors may be restored
+        later.
+
+        :param cls: class whose ``__init__`` is replaced
+        :param func: callable invoked as ``func(observer, *args, **kwargs)``
+            whenever an instance of `cls` is constructed
+        :param name: name under which instances are reported
+        :param resolution_level: detail level stored in the observer
+        :param keep: whether instances are kept alive
+        :param trace: whether instantiation traces are recorded
+        """
+        try:
+            constructor = cls.__init__  # type: ignore
+        except AttributeError:
+            def constructor(self: Any, *_args: Any, **_kwargs: Any) -> None:
+                pass
+
+        # Possible name clash between keyword arguments of the tracked class'
+        # constructor and the curried arguments of the injected constructor.
+        # Therefore, the additional argument has a 'magic' name to make it less
+        # likely that an argument name clash occurs.
+        observer = _ClassObserver(constructor,
+                                  name,
+                                  resolution_level,
+                                  keep,
+                                  trace)
+        self._observers[cls] = observer
+
+        def new_constructor(*args: Any, **kwargs: Any) -> None:
+            return func(observer, *args, **kwargs)
+
+        cls.__init__ = new_constructor  # type: ignore
+
+    def _is_tracked(self, cls: type) -> bool:
+        """
+        Determine if the class is tracked.
+        """
+        return cls in self._observers
+
+    def _track_modify(self, cls: type, name: str, detail: int, keep: bool,
+                      trace: bool) -> None:
+        """
+        Modify settings of a tracked class. Raises `KeyError` if `cls` is
+        not tracked.
+        """
+        self._observers[cls].modify(name, detail, keep, trace)
+
+    def _restore_constructor(self, cls: type) -> None:
+        """
+        Restore the original constructor, lose track of class. Raises
+        `KeyError` if `cls` is not tracked.
+        """
+        cls.__init__ = self._observers[cls].init  # type: ignore
+        del self._observers[cls]
+
+    def track_change(self, instance: Any, resolution_level: int = 0) -> None:
+        """
+        Change tracking options for the already tracked object 'instance'.
+        If instance is not tracked, a KeyError will be raised.
+        """
+        tobj = self.objects[id(instance)]
+        tobj.set_resolution_level(resolution_level)
+
+    def track_object(self, instance: Any, name: Optional[str] = None,
+                     resolution_level: int = 0, keep: bool = False,
+                     trace: bool = False) -> None:
+        """
+        Track object 'instance' and sample size and lifetime information. Not
+        all objects can be tracked; trackable objects are class instances and
+        other objects that can be weakly referenced. When an object cannot be
+        tracked, a `TypeError` is raised.
+
+        :param name: name under which the object is reported, default is
+            the name of the object's class
+        :param resolution_level: The recursion depth up to which referents are
+            sized individually. Resolution level 0 (default) treats the object
+            as an opaque entity, 1 sizes all direct referents individually, 2
+            also sizes the referents of the referents and so forth.
+        :param keep: Prevent the object's deletion by keeping a (strong)
+            reference to the object.
+        :param trace: Save the stack trace of the tracking call
+        """
+
+        # Check if object is already tracked. This happens if track_object is
+        # called multiple times for the same object or if an object inherits
+        # from multiple tracked classes. In the latter case, the most
+        # specialized class wins. To detect id recycling, the weak reference
+        # is checked. If it is 'None' a tracked object is dead and another one
+        # takes the same 'id'.
+        if id(instance) in self.objects and \
+                self.objects[id(instance)].ref() is not None:
+            return
+
+        name = name if name else instance.__class__.__name__
+
+        track_object_creation(self.history[name])
+        on_delete = partial(track_object_deletion, self.history[name])
+
+        tobj = TrackedObject(instance,
+                             name,
+                             resolution_level=resolution_level,
+                             trace=trace,
+                             on_delete=on_delete)
+
+        self.index[name].append(tobj)
+        self.objects[id(instance)] = tobj
+
+        if keep:
+            self._keepalive.append(instance)
+
+    def track_class(self, cls: type, name: Optional[str] = None,
+                    resolution_level: int = 0, keep: bool = False,
+                    trace: bool = False) -> None:
+        """
+        Track all objects of the class `cls`. Objects of that type that already
+        exist are *not* tracked. If `track_class` is called for a class already
+        tracked, the tracking parameters are modified. Instantiation traces can
+        be generated by setting `trace` to True.
+        A constructor is injected to begin instance tracking on creation
+        of the object. The constructor calls `track_object` internally.
+
+        :param cls: class to be tracked; a `TypeError` is raised for
+            anything that is not a class
+        :param name: reference the class by a name, default is the
+            concatenation of module and class name
+        :param resolution_level: The recursion depth up to which referents are
+            sized individually. Resolution level 0 (default) treats the object
+            as an opaque entity, 1 sizes all direct referents individually, 2
+            also sizes the referents of the referents and so forth.
+        :param keep: Prevent the object's deletion by keeping a (strong)
+            reference to the object.
+        :param trace: Save instantiation stack trace for each instance
+        """
+        if not isclass(cls):
+            raise TypeError("only class objects can be tracked")
+        if name is None:
+            name = cls.__module__ + '.' + cls.__name__
+        if self._is_tracked(cls):
+            self._track_modify(cls, name, resolution_level, keep, trace)
+        else:
+            self._inject_constructor(cls, self._tracker, name,
+                                     resolution_level, keep, trace)
+
+    def detach_class(self, cls: type) -> None:
+        """
+        Stop tracking class 'cls'. Any new objects of that type are not
+        tracked anymore. Existing objects are still tracked.
+        """
+        self._restore_constructor(cls)
+
+    def detach_all_classes(self) -> None:
+        """
+        Detach from all tracked classes.
+        """
+        # Iterate over a copy: detaching removes entries from the mapping.
+        for cls in list(self._observers):
+            self.detach_class(cls)
+
+    def detach_all(self) -> None:
+        """
+        Detach from all tracked classes and objects.
+        Restore the original constructors and cleanse the tracking lists.
+        """
+        self.detach_all_classes()
+        self.objects.clear()
+        self.index.clear()
+        self._keepalive[:] = []
+
+    def clear(self) -> None:
+        """
+        Clear all gathered data and detach from all tracked objects/classes.
+        """
+        self.detach_all()
+        self.snapshots[:] = []
+
+    def close(self) -> None:
+        """
+        Detach from tracked classes by removing injected constructors. Makes it
+        possible to use ClassTracker in `contextlib.closing` to safely remove
+        profiling hooks when the tracker goes out of scope::
+
+            import contextlib
+            with contextlib.closing(ClassTracker()) as tracker:
+                tracker.track_class(Foo)
+
+        """
+        self.detach_all_classes()
+
+#
+# Background Monitoring
+#
+
+    def start_periodic_snapshots(self, interval: float = 1.0) -> None:
+        """
+        Start a thread which takes snapshots periodically. The `interval`
+        specifies the time in seconds the thread waits between taking
+        snapshots. The thread is started as a daemon allowing the program to
+        exit. If periodic snapshots are already active, the interval is
+        updated.
+        """
+        thread = self._periodic_thread
+        if thread is not None and thread.is_alive():
+            thread.interval = interval
+            return
+
+        # No thread yet, or the previous one has terminated (for instance
+        # because taking a snapshot raised): start a fresh one instead of
+        # silently updating the interval of a dead thread.
+        thread = PeriodicThread(self, interval, name='BackgroundMonitor')
+        # The 'daemon' attribute replaces the deprecated setDaemon().
+        thread.daemon = True
+        self._periodic_thread = thread
+        thread.start()
+
+    def stop_periodic_snapshots(self) -> None:
+        """
+        Post a stop signal to the thread that takes the periodic snapshots. The
+        function waits for the thread to terminate which can take some time
+        depending on the configured interval.
+        """
+        thread = self._periodic_thread
+        if thread is None:
+            return
+        if thread.is_alive():
+            thread.stop = True
+            thread.join()
+        # Forget the thread even if it had already terminated on its own,
+        # so that periodic snapshots can be started again later.
+        self._periodic_thread = None
+
+#
+# Snapshots
+#
+
+    # Defined on the class, hence shared by all ClassTracker instances.
+    snapshot_lock = Lock()
+
+    def create_snapshot(self, description: str = '',
+                        compute_total: bool = False) -> None:
+        """
+        Collect current per instance statistics and saves total amount of
+        memory associated with the Python process.
+
+        If `compute_total` is `True`, the total consumption of all objects
+        known to *asizeof* is computed. The latter might be very slow if many
+        objects are mapped into memory at the time the snapshot is taken.
+        Therefore, `compute_total` is set to `False` by default.
+
+        The overhead of the `ClassTracker` structure is also computed.
+
+        Snapshots can be taken asynchronously. The function is protected with a
+        lock to prevent race conditions.
+
+        :param description: label stored with the snapshot (converted with
+            `str`)
+        :param compute_total: also size all objects known to *asizeof*
+        """
+        # The 'with' statement releases the lock only if it was actually
+        # acquired; an explicit acquire() inside try/finally would attempt
+        # to release an unheld lock when acquisition itself is interrupted.
+        with self.snapshot_lock:
+            # TODO: It is not clear what happens when memory is allocated or
+            # released while this function is executed but it will likely lead
+            # to inconsistencies. Either pause all other threads or don't size
+            # individual objects in asynchronous mode.
+            timestamp = _get_time()
+
+            sizer = asizeof.Asizer()
+            # Work on a single copy of the tracked objects so that the set
+            # excluded from sizing and the set being sized are identical
+            # even if objects get tracked concurrently.
+            tracked_objects = list(self.objects.values())
+            objs = [tobj.ref() for tobj in tracked_objects]
+            sizer.exclude_refs(*objs)
+
+            # The objects need to be sized in a deterministic order. Sort the
+            # objects by its creation date which should at least work for
+            # non-parallel execution. The "proper" fix would be to handle
+            # shared data separately.
+            tracked_objects.sort(key=lambda x: x.birth)
+            for tobj in tracked_objects:
+                tobj.track_size(timestamp, sizer)
+
+            snapshot = Snapshot(timestamp, str(description))
+            snapshot.tracked_total = sizer.total
+            if compute_total:
+                snapshot.asizeof_total = asizeof.asizeof(all=True, code=True)
+
+            # Compute overhead of all structures, use sizer to exclude tracked
+            # objects(!)
+            snapshot.overhead = 0
+            if snapshot.tracked_total:
+                snapshot.overhead = sizer.asizeof(self)
+                if snapshot.asizeof_total:
+                    snapshot.asizeof_total -= snapshot.overhead
+
+            self.snapshots.append(snapshot)