This file is indexed.

/usr/lib/python2.7/dist-packages/scoop/_comm/scoopzmq.py is in python-scoop 0.7.1-3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
#
#    This file is part of Scalable COncurrent Operations in Python (SCOOP).
#
#    SCOOP is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Lesser General Public License as
#    published by the Free Software Foundation, either version 3 of
#    the License, or (at your option) any later version.
#
#    SCOOP is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#    GNU Lesser General Public License for more details.
#
#    You should have received a copy of the GNU Lesser General Public
#    License along with SCOOP. If not, see <http://www.gnu.org/licenses/>.
#
import time
import sys
import random
import socket
import copy
import logging
try:
    import cPickle as pickle
except ImportError:
    import pickle

import zmq

import scoop
from .. import shared, encapsulation, utils
from ..shared import SharedElementEncapsulation
from .scoopexceptions import Shutdown, ReferenceBroken


def CreateZMQSocket(sock_type):
    """Create a socket of the given sock_type and deactivate message dropping"""
    sock = ZMQCommunicator.context.socket(sock_type)
    sock.setsockopt(zmq.LINGER, 1000)

    # Remove message dropping
    sock.setsockopt(zmq.SNDHWM, 0)
    sock.setsockopt(zmq.RCVHWM, 0)

    # Don't accept unroutable messages
    if sock_type == zmq.ROUTER:
        sock.setsockopt(zmq.ROUTER_BEHAVIOR, 1)
    return sock


class ZMQCommunicator(object):
    """This class encapsulates the communication features toward the broker."""
    context = zmq.Context()

    def __init__(self):
        # TODO number of broker
        self.number_of_broker = float('inf')
        self.broker_set = set()

        # Get the current address of the interface facing the broker
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect((scoop.BROKER.externalHostname, scoop.BROKER.task_port))
        external_addr = s.getsockname()[0]
        s.close()

        if external_addr in utils.loopbackReferences:
            external_addr = scoop.BROKER.externalHostname

        # Create an inter-worker socket
        self.direct_socket_peers = []
        self.direct_socket = CreateZMQSocket(zmq.ROUTER)
        # TODO: This doesn't seems to be respected in the ROUTER socket
        self.direct_socket.setsockopt(zmq.SNDTIMEO, 0)
        # Code stolen from pyzmq's bind_to_random_port() from sugar/socket.py
        for i in range(100):
            try:
                self.direct_socket_port = random.randrange(49152, 65536)
                # Set current worker inter-worker socket name to its addr:port
                scoop.worker = "{addr}:{port}".format(
                    addr=external_addr,
                    port=self.direct_socket_port,
                ).encode()
                self.direct_socket.setsockopt(zmq.IDENTITY, scoop.worker)
                self.direct_socket.bind("tcp://*:{0}".format(
                    self.direct_socket_port,
                ))
            except:
                # Except on ZMQError with a check on EADDRINUSE should go here
                # but its definition is not consistent in pyzmq over multiple
                # versions
                pass
            else:
                break
        else:
            raise Exception("Could not create direct connection socket")

        # Update the logger to display our name
        try:
            scoop.logger.handlers[0].setFormatter(
                logging.Formatter(
                    "[%(asctime)-15s] %(module)-9s ({0}) %(levelname)-7s "
                    "%(message)s".format(scoop.worker)
                )
            )
        except IndexError:
            scoop.logger.debug(
                "Could not set worker name into logger ({0})".format(
                    scoop.worker
                )
            )

        # socket for the futures, replies and request
        self.socket = CreateZMQSocket(zmq.DEALER)
        self.socket.setsockopt(zmq.IDENTITY, scoop.worker)

        # socket for the shutdown signal
        self.infoSocket = CreateZMQSocket(zmq.SUB)
        
        # Set poller
        self.poller = zmq.Poller()
        self.poller.register(self.socket, zmq.POLLIN)
        self.poller.register(self.direct_socket, zmq.POLLIN)
        self.poller.register(self.infoSocket, zmq.POLLIN)

        self._addBroker(scoop.BROKER)

        # Send an INIT to get all previously set variables and share
        # current configuration to broker
        self.socket.send_multipart([
            b"INIT",
            pickle.dumps(scoop.CONFIGURATION)
        ])
        scoop.CONFIGURATION.update(pickle.loads(self.socket.recv()))
        inboundVariables = pickle.loads(self.socket.recv())
        shared.elements = dict([
            (pickle.loads(key),
                dict([(pickle.loads(varName),
                       pickle.loads(varValue))
                    for varName, varValue in value.items()
                ]))
                for key, value in inboundVariables.items()
        ])
        for broker in pickle.loads(self.socket.recv()):
            # Skip already connected brokers
            if broker in self.broker_set:
                continue
            self._addBroker(broker)

        self.OPEN = True

    def addPeer(self, peer):
        if peer not in self.direct_socket_peers:
            self.direct_socket_peers.append(peer)
            new_peer = "tcp://{0}".format(peer.decode("utf-8"))
            self.direct_socket.connect(new_peer)

    def _addBroker(self, brokerEntry):
        # Add a broker to the socket and the infosocket.
        broker_address = "tcp://{hostname}:{port}".format(
            hostname=brokerEntry.hostname,
            port=brokerEntry.task_port,
        )
        meta_address = "tcp://{hostname}:{port}".format(
            hostname=brokerEntry.hostname,
            port=brokerEntry.info_port,
        )
        self.socket.connect(broker_address)

        self.infoSocket.connect(meta_address)
        self.infoSocket.setsockopt(zmq.SUBSCRIBE, b"")

        self.broker_set.add(brokerEntry)

    def _poll(self, timeout):
        self.pumpInfoSocket()
        return self.poller.poll(timeout)

    def _recv(self):
        # Prioritize answers over new tasks
        if self.direct_socket.poll(0):
            router_msg = self.direct_socket.recv_multipart()
            # Remove the sender address
            msg = router_msg[1:] + [router_msg[0]]
        else:
            msg = self.socket.recv_multipart()
        
        try:
            thisFuture = pickle.loads(msg[1])
        except AttributeError as e:
            scoop.logger.error(
                "An instance could not find its base reference on a worker. "
                "Ensure that your objects have their definition available in "
                "the root scope of your program.\n{error}".format(
                    error=e,
                )
            )
            raise ReferenceBroken(e)

        if msg[0] == b"TASK":
            # Try to connect directly to this worker to send the result
            # afterwards if Future is from a map.
            if thisFuture.sendResultBack:
                self.addPeer(thisFuture.id.worker)

        isCallable = callable(thisFuture.callable)
        isDone = thisFuture._ended()
        if not isCallable and not isDone:
            # TODO: Also check in root module globals for fully qualified name
            try:
                module_found = hasattr(sys.modules["__main__"],
                                       thisFuture.callable)
            except TypeError:
                module_found = False
            if module_found:
                thisFuture.callable = getattr(sys.modules["__main__"],
                                              thisFuture.callable)
            else:
                raise ReferenceBroken("This element could not be pickled: "
                                      "{0}.".format(thisFuture))
        return thisFuture

    def pumpInfoSocket(self):
        while self.infoSocket.poll(0):
            msg = self.infoSocket.recv_multipart()
            if msg[0] == b"SHUTDOWN":
                if scoop.IS_ORIGIN is False:
                    raise Shutdown("Shutdown received")
                if not scoop.SHUTDOWN_REQUESTED:
                    scoop.logger.error(
                        "A worker exited unexpectedly. Read the worker logs "
                        "for more information. SCOOP pool will now shutdown."
                    )
                    raise Shutdown("Unexpected shutdown received")
            elif msg[0] == b"VARIABLE":
                key = pickle.loads(msg[3])
                varValue = pickle.loads(msg[2])
                varName = pickle.loads(msg[1])
                shared.elements.setdefault(key, {}).update({varName: varValue})
                self.convertVariable(key, varName, varValue)
            elif msg[0] == b"BROKER_INFO":
                # TODO: find out what to do here ...
                if len(self.broker_set) == 0: # The first update
                    self.broker_set.add(pickle.loads(msg[1]))
                if len(self.broker_set) < self.number_of_broker:
                    brokers = pickle.loads(msg[2])
                    needed = self.number_of_broker - len(self.broker_set)
                    try:
                        new_brokers = random.sample(brokers, needed)
                    except ValueError:
                        new_brokers = brokers
                        self.number_of_broker = len(self.broker_set) + len(new_brokers)
                        scoop.logger.warning(("The number of brokers could not be set"
                                        " on worker {0}. A total of {1} worker(s)"
                                        " were set.".format(scoop.worker,
                                                            self.number_of_broker)))

                    for broker in new_brokers:
                        broker_address = "tcp://" + broker.hostname + broker.task_port
                        meta_address = "tcp://" + broker.hostname + broker.info_port
                        self._addBroker(broker_address, meta_address)
                    self.broker_set.update(new_brokers)

    def convertVariable(self, key, varName, varValue):
        """Puts the function in the globals() of the main module."""
        if isinstance(varValue, encapsulation.FunctionEncapsulation):
            result = varValue.getFunction()

            # Update the global scope of the function to match the current module
            mainModule = sys.modules["__main__"]
            result.__name__ = varName
            result.__globals__.update(mainModule.__dict__)
            setattr(mainModule, varName, result)
            shared.elements[key].update({
                varName: result,
            })

    def recvFuture(self):
        while self._poll(0):
            received = self._recv()
            if received:
                yield received

    def sendFuture(self, future):
        """Send a Future to be executed remotely."""
        try:
            if shared.getConst(hash(future.callable),
                               timeout=0):
                # Enforce name reference passing if already shared
                future.callable = SharedElementEncapsulation(hash(future.callable))
            self.socket.send_multipart([b"TASK",
                                        pickle.dumps(future,
                                                     pickle.HIGHEST_PROTOCOL)])
        except pickle.PicklingError as e:
            # If element not picklable, pickle its name
            # TODO: use its fully qualified name
            scoop.logger.warn("Pickling Error: {0}".format(e))
            previousCallable = future.callable
            future.callable = hash(future.callable)
            self.socket.send_multipart([b"TASK",
                                        pickle.dumps(future,
                                                     pickle.HIGHEST_PROTOCOL)])
            future.callable = previousCallable

    def sendResult(self, future):
        """Send a terminated future back to its parent."""
        future = copy.copy(future)

        # Remove the (now) extraneous elements from future class
        future.callable = future.args = future.kargs = future.greenlet = None

        if not future.sendResultBack:
            # Don't reply back the result if it isn't asked
            future.resultValue = None

        self._sendReply(
            future.id.worker,
            pickle.dumps(
                future,
                pickle.HIGHEST_PROTOCOL,
            ),
        )

    def _sendReply(self, destination, *args):
        """Send a REPLY directly to its destination. If it doesn't work, launch
        it back to the broker."""
        # Try to send the result directly to its parent
        self.addPeer(destination)

        try:
            self.direct_socket.send_multipart([
                destination,
                b"REPLY",
            ] + list(args),
                flags=zmq.NOBLOCK)
        except zmq.error.ZMQError as e:
            # Fallback on Broker routing if no direct connection possible
            scoop.logger.debug(
                "{0}: Could not send result directly to peer {1}, routing through "
                "broker.".format(scoop.worker, destination)
            )
            self.socket.send_multipart([
                b"REPLY", 
                ] + list(args) + [
                destination,
            ])

    def sendVariable(self, key, value):
        self.socket.send_multipart([b"VARIABLE",
                                    pickle.dumps(key),
                                    pickle.dumps(value,
                                                 pickle.HIGHEST_PROTOCOL),
                                    pickle.dumps(scoop.worker,
                                                 pickle.HIGHEST_PROTOCOL)])

    def taskEnd(self, groupID, askResults=False):
        self.socket.send_multipart([
            b"TASKEND",
            pickle.dumps(
                askResults,
                pickle.HIGHEST_PROTOCOL
            ),
            pickle.dumps(
                groupID,
                pickle.HIGHEST_PROTOCOL
            ),
        ])

    def sendRequest(self):
        for _ in range(len(self.broker_set)):
            self.socket.send(b"REQUEST")

    def workerDown(self):
        self.socket.send(b"WORKERDOWN")

    def shutdown(self):
        """Sends a shutdown message to other workers."""
        if self.OPEN:
            self.OPEN = False
            scoop.SHUTDOWN_REQUESTED = True
            self.socket.send(b"SHUTDOWN")
            self.socket.close()
            self.infoSocket.close()
            time.sleep(0.3)