/usr/lib/python2.7/dist-packages/zmq/utils/garbage.py is in python-zmq 14.0.1-1build2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 | """Garbage collection thread for representing zmq refcount of Python objects
used in zero-copy sends.
"""
#-----------------------------------------------------------------------------
# Copyright (c) 2013 Brian E. Granger & Min Ragan-Kelley
#
# This file is part of pyzmq
#
# Distributed under the terms of the New BSD License. The full license is in
# the file COPYING.BSD, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
import atexit
import struct
from os import getpid
from collections import namedtuple
from threading import Thread, Event, Lock
import zmq
#-----------------------------------------------------------------------------
# Code
#-----------------------------------------------------------------------------
gcref = namedtuple('gcref', ['obj', 'event'])
class GarbageCollectorThread(Thread):
"""Thread in which garbage collection actually happens."""
def __init__(self, gc):
super(GarbageCollectorThread, self).__init__()
self.gc = gc
self.daemon = True
self.pid = getpid()
self.ready = Event()
def run(self):
s = self.gc.context.socket(zmq.PULL)
s.linger = 0
s.bind(self.gc.url)
self.ready.set()
while True:
# detect fork
if getpid is None or getpid() != self.pid:
return
msg = s.recv()
if msg == b'DIE':
break
fmt = 'L' if len(msg) == 4 else 'Q'
key = struct.unpack(fmt, msg)[0]
tup = self.gc.refs.pop(key, None)
if tup and tup.event:
tup.event.set()
del tup
s.close()
class GarbageCollector(object):
"""PyZMQ Garbage Collector
Used for representing the reference held by libzmq during zero-copy sends.
This object holds a dictionary, keyed by Python id,
of the Python objects whose memory are currently in use by zeromq.
When zeromq is done with the memory, it sends a message on an inproc PUSH socket
containing the packed size_t (32 or 64-bit unsigned int),
which is the key in the dict.
When the PULL socket in the gc thread receives that message,
the reference is popped from the dict,
and any tracker events that should be signaled fire.
"""
context = None
refs = None
_finished = False
_lock = None
url = "inproc://pyzmq.gc.01"
def __init__(self):
super(GarbageCollector, self).__init__()
self.refs = {}
self.pid = None
self.thread = None
self.context = None
self._lock = Lock()
atexit.register(self.stop)
def stop(self):
"""stop the garbage-collection thread"""
if not self.is_alive():
return
push = self.context.socket(zmq.PUSH)
push.connect(self.url)
push.send(b'DIE')
push.close()
self.thread.join()
self.context.term()
self.refs.clear()
def start(self):
"""Start a new garbage collection thread.
Creates a new zmq Context used for garbage collection.
Under most circumstances, this will only be called once per process.
"""
self.pid = getpid()
self.context = zmq.Context()
self.refs = {}
self.thread = GarbageCollectorThread(self)
self.thread.start()
self.thread.ready.wait()
def is_alive(self):
"""Is the garbage collection thread currently running?
Includes checks for process shutdown or fork.
"""
if (getpid is None or
getpid() != self.pid or
self.thread is None or
not self.thread.is_alive()
):
return False
return True
def store(self, obj, event=None):
"""store an object and (optionally) event for zero-copy"""
if not self.is_alive():
# safely start the gc thread
# use lock and double check,
# so we don't start multiple threads
with self._lock:
if not self.is_alive():
self.start()
tup = gcref(obj, event)
theid = id(tup)
self.refs[theid] = tup
return theid
def __del__(self):
if not self.is_alive():
return
try:
self.stop()
except Exception as e:
raise (e)
gc = GarbageCollector()
|