/usr/lib/python2.7/dist-packages/billiard/reduction.py is in python-billiard 3.5.0.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 | #
# Module which deals with pickling of objects.
#
# multiprocessing/reduction.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#
from __future__ import absolute_import
import functools
import io
import os
import pickle
import socket
import sys
from . import context
__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']
PY3 = sys.version_info[0] == 3
HAVE_SEND_HANDLE = (sys.platform == 'win32' or
(hasattr(socket, 'CMSG_LEN') and
hasattr(socket, 'SCM_RIGHTS') and
hasattr(socket.socket, 'sendmsg')))
#
# Pickler subclass
#
if PY3:
import copyreg
class ForkingPickler(pickle.Pickler):
'''Pickler subclass used by multiprocessing.'''
_extra_reducers = {}
_copyreg_dispatch_table = copyreg.dispatch_table
def __init__(self, *args):
super(ForkingPickler, self).__init__(*args)
self.dispatch_table = self._copyreg_dispatch_table.copy()
self.dispatch_table.update(self._extra_reducers)
@classmethod
def register(cls, type, reduce):
'''Register a reduce function for a type.'''
cls._extra_reducers[type] = reduce
@classmethod
def dumps(cls, obj, protocol=None):
buf = io.BytesIO()
cls(buf, protocol).dump(obj)
return buf.getbuffer()
@classmethod
def loadbuf(cls, buf, protocol=None):
return cls.loads(buf.getbuffer())
loads = pickle.loads
else:
class ForkingPickler(pickle.Pickler): # noqa
'''Pickler subclass used by multiprocessing.'''
dispatch = pickle.Pickler.dispatch.copy()
@classmethod
def register(cls, type, reduce):
'''Register a reduce function for a type.'''
def dispatcher(self, obj):
rv = reduce(obj)
self.save_reduce(obj=obj, *rv)
cls.dispatch[type] = dispatcher
@classmethod
def dumps(cls, obj, protocol=None):
buf = io.BytesIO()
cls(buf, protocol).dump(obj)
return buf.getvalue()
@classmethod
def loadbuf(cls, buf, protocol=None):
return cls.loads(buf.getvalue())
@classmethod
def loads(cls, buf, loads=pickle.loads):
if isinstance(buf, io.BytesIO):
buf = buf.getvalue()
return loads(buf)
register = ForkingPickler.register
def dump(obj, file, protocol=None):
'''Replacement for pickle.dump() using ForkingPickler.'''
ForkingPickler(file, protocol).dump(obj)
#
# Platform specific definitions
#
if sys.platform == 'win32':
# Windows
__all__ += ['DupHandle', 'duplicate', 'steal_handle']
from .compat import _winapi
def duplicate(handle, target_process=None, inheritable=False):
'''Duplicate a handle. (target_process is a handle not a pid!)'''
if target_process is None:
target_process = _winapi.GetCurrentProcess()
return _winapi.DuplicateHandle(
_winapi.GetCurrentProcess(), handle, target_process,
0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)
def steal_handle(source_pid, handle):
'''Steal a handle from process identified by source_pid.'''
source_process_handle = _winapi.OpenProcess(
_winapi.PROCESS_DUP_HANDLE, False, source_pid)
try:
return _winapi.DuplicateHandle(
source_process_handle, handle,
_winapi.GetCurrentProcess(), 0, False,
_winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
finally:
_winapi.CloseHandle(source_process_handle)
def send_handle(conn, handle, destination_pid):
'''Send a handle over a local connection.'''
dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
conn.send(dh)
def recv_handle(conn):
'''Receive a handle over a local connection.'''
return conn.recv().detach()
class DupHandle(object):
'''Picklable wrapper for a handle.'''
def __init__(self, handle, access, pid=None):
if pid is None:
# We just duplicate the handle in the current process and
# let the receiving process steal the handle.
pid = os.getpid()
proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
try:
self._handle = _winapi.DuplicateHandle(
_winapi.GetCurrentProcess(),
handle, proc, access, False, 0)
finally:
_winapi.CloseHandle(proc)
self._access = access
self._pid = pid
def detach(self):
'''Get the handle. This should only be called once.'''
# retrieve handle from process which currently owns it
if self._pid == os.getpid():
# The handle has already been duplicated for this process.
return self._handle
# We must steal the handle from the process whose pid is self._pid.
proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
self._pid)
try:
return _winapi.DuplicateHandle(
proc, self._handle, _winapi.GetCurrentProcess(),
self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
finally:
_winapi.CloseHandle(proc)
else:
# Unix
__all__ += ['DupFd', 'sendfds', 'recvfds']
import array
# On macOS we should acknowledge receipt of fds -- see Issue14669
ACKNOWLEDGE = sys.platform == 'darwin'
def sendfds(sock, fds):
'''Send an array of fds over an AF_UNIX socket.'''
fds = array.array('i', fds)
msg = bytes([len(fds) % 256])
sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
if ACKNOWLEDGE and sock.recv(1) != b'A':
raise RuntimeError('did not receive acknowledgement of fd')
def recvfds(sock, size):
'''Receive an array of fds over an AF_UNIX socket.'''
a = array.array('i')
bytes_size = a.itemsize * size
msg, ancdata, flags, addr = sock.recvmsg(
1, socket.CMSG_LEN(bytes_size),
)
if not msg and not ancdata:
raise EOFError
try:
if ACKNOWLEDGE:
sock.send(b'A')
if len(ancdata) != 1:
raise RuntimeError(
'received %d items of ancdata' % len(ancdata),
)
cmsg_level, cmsg_type, cmsg_data = ancdata[0]
if (cmsg_level == socket.SOL_SOCKET and
cmsg_type == socket.SCM_RIGHTS):
if len(cmsg_data) % a.itemsize != 0:
raise ValueError
a.frombytes(cmsg_data)
assert len(a) % 256 == msg[0]
return list(a)
except (ValueError, IndexError):
pass
raise RuntimeError('Invalid data received')
def send_handle(conn, handle, destination_pid): # noqa
'''Send a handle over a local connection.'''
fd = conn.fileno()
with socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM) as s:
sendfds(s, [handle])
def recv_handle(conn): # noqa
'''Receive a handle over a local connection.'''
fd = conn.fileno()
with socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM) as s:
return recvfds(s, 1)[0]
def DupFd(fd):
'''Return a wrapper for an fd.'''
popen_obj = context.get_spawning_popen()
if popen_obj is not None:
return popen_obj.DupFd(popen_obj.duplicate_for_child(fd))
elif HAVE_SEND_HANDLE:
from . import resource_sharer
return resource_sharer.DupFd(fd)
else:
raise ValueError('SCM_RIGHTS appears not to be available')
#
# Try making some callable types picklable
#
def _reduce_method(m):
if m.__self__ is None:
return getattr, (m.__class__, m.__func__.__name__)
else:
return getattr, (m.__self__, m.__func__.__name__)
class _C:
def f(self):
pass
register(type(_C().f), _reduce_method)
def _reduce_method_descriptor(m):
return getattr, (m.__objclass__, m.__name__)
register(type(list.append), _reduce_method_descriptor)
register(type(int.__add__), _reduce_method_descriptor)
def _reduce_partial(p):
return _rebuild_partial, (p.func, p.args, p.keywords or {})
def _rebuild_partial(func, args, keywords):
return functools.partial(func, *args, **keywords)
register(functools.partial, _reduce_partial)
#
# Make sockets picklable
#
if sys.platform == 'win32':
def _reduce_socket(s):
from .resource_sharer import DupSocket
return _rebuild_socket, (DupSocket(s),)
def _rebuild_socket(ds):
return ds.detach()
register(socket.socket, _reduce_socket)
else:
def _reduce_socket(s): # noqa
df = DupFd(s.fileno())
return _rebuild_socket, (df, s.family, s.type, s.proto)
def _rebuild_socket(df, family, type, proto): # noqa
fd = df.detach()
return socket.socket(family, type, proto, fileno=fd)
register(socket.socket, _reduce_socket)
|