/usr/lib/python3.6/multiprocessing/resource_sharer.py is in libpython3.6-stdlib 3.6.5-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 | #
# We use a background thread for sharing fds on Unix, and for sharing sockets on
# Windows.
#
# A client which wants to pickle a resource registers it with the resource
# sharer and gets an identifier in return. The unpickling process will connect
# to the resource sharer, sends the identifier and its pid, and then receives
# the resource.
#
import os
import signal
import socket
import sys
import threading
from . import process
from .context import reduction
from . import util
__all__ = ['stop']
if sys.platform == 'win32':
__all__ += ['DupSocket']
class DupSocket(object):
'''Picklable wrapper for a socket.'''
def __init__(self, sock):
new_sock = sock.dup()
def send(conn, pid):
share = new_sock.share(pid)
conn.send_bytes(share)
self._id = _resource_sharer.register(send, new_sock.close)
def detach(self):
'''Get the socket. This should only be called once.'''
with _resource_sharer.get_connection(self._id) as conn:
share = conn.recv_bytes()
return socket.fromshare(share)
else:
__all__ += ['DupFd']
class DupFd(object):
'''Wrapper for fd which can be used at any time.'''
def __init__(self, fd):
new_fd = os.dup(fd)
def send(conn, pid):
reduction.send_handle(conn, new_fd, pid)
def close():
os.close(new_fd)
self._id = _resource_sharer.register(send, close)
def detach(self):
'''Get the fd. This should only be called once.'''
with _resource_sharer.get_connection(self._id) as conn:
return reduction.recv_handle(conn)
class _ResourceSharer(object):
'''Manager for resouces using background thread.'''
def __init__(self):
self._key = 0
self._cache = {}
self._old_locks = []
self._lock = threading.Lock()
self._listener = None
self._address = None
self._thread = None
util.register_after_fork(self, _ResourceSharer._afterfork)
def register(self, send, close):
'''Register resource, returning an identifier.'''
with self._lock:
if self._address is None:
self._start()
self._key += 1
self._cache[self._key] = (send, close)
return (self._address, self._key)
@staticmethod
def get_connection(ident):
'''Return connection from which to receive identified resource.'''
from .connection import Client
address, key = ident
c = Client(address, authkey=process.current_process().authkey)
c.send((key, os.getpid()))
return c
def stop(self, timeout=None):
'''Stop the background thread and clear registered resources.'''
from .connection import Client
with self._lock:
if self._address is not None:
c = Client(self._address,
authkey=process.current_process().authkey)
c.send(None)
c.close()
self._thread.join(timeout)
if self._thread.is_alive():
util.sub_warning('_ResourceSharer thread did '
'not stop when asked')
self._listener.close()
self._thread = None
self._address = None
self._listener = None
for key, (send, close) in self._cache.items():
close()
self._cache.clear()
def _afterfork(self):
for key, (send, close) in self._cache.items():
close()
self._cache.clear()
# If self._lock was locked at the time of the fork, it may be broken
# -- see issue 6721. Replace it without letting it be gc'ed.
self._old_locks.append(self._lock)
self._lock = threading.Lock()
if self._listener is not None:
self._listener.close()
self._listener = None
self._address = None
self._thread = None
def _start(self):
from .connection import Listener
assert self._listener is None
util.debug('starting listener and thread for sending handles')
self._listener = Listener(authkey=process.current_process().authkey)
self._address = self._listener.address
t = threading.Thread(target=self._serve)
t.daemon = True
t.start()
self._thread = t
def _serve(self):
if hasattr(signal, 'pthread_sigmask'):
signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
while 1:
try:
with self._listener.accept() as conn:
msg = conn.recv()
if msg is None:
break
key, destination_pid = msg
send, close = self._cache.pop(key)
try:
send(conn, destination_pid)
finally:
close()
except:
if not util.is_exiting():
sys.excepthook(*sys.exc_info())
_resource_sharer = _ResourceSharer()
stop = _resource_sharer.stop
|