This file is indexed.

/usr/lib/python2.7/dist-packages/pyqtgraph/multiprocess/remoteproxy.py is in python-pyqtgraph 0.9.10-5.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

   1
   2
   3
   4
   5
   6
   7
   8
   9
  10
  11
  12
  13
  14
  15
  16
  17
  18
  19
  20
  21
  22
  23
  24
  25
  26
  27
  28
  29
  30
  31
  32
  33
  34
  35
  36
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
import os, time, sys, traceback, weakref
import numpy as np
import threading
try:
    import __builtin__ as builtins
    import cPickle as pickle
except ImportError:
    import builtins
    import pickle

# color printing for debugging
from ..util import cprint

class ClosedError(Exception):
    """Signals that the connection to the remote process is closed.

    Raised when an event handler is asked to close the connection, or when it
    finds out that the connection has already been closed.
    """

class NoResultError(Exception):
    """Signals that a remote call's return value is not available yet.

    Raised when the result of a remote call is requested before the call has
    returned.
    """

    
class RemoteEventHandler(object):
    """
    This class handles communication between two processes. One instance is present on 
    each process and listens for communication from the other process. This enables
    (amongst other things) ObjectProxy instances to look up their attributes and call 
    their methods.
    
    This class is responsible for carrying out actions on behalf of the remote process.
    Each instance holds one end of a Connection which allows python
    objects to be passed between processes.
    
    For the most common operations, see _import(), close(), and transfer()
    
    To handle and respond to incoming requests, RemoteEventHandler requires that its
    processRequests method is called repeatedly (this is usually handled by the Process
    classes defined in multiprocess.processes).
    
    
    
    
    """
    handlers = {}   ## maps {process ID : handler}. This allows unpickler to determine which process
                    ## an object proxy belongs to
                         
    def __init__(self, connection, name, pid, debug=False):
        """
        Set up one end of the inter-process link and register it under *pid*.

        ==============  ========================================================
        **Arguments:**
        connection      Connection object used to exchange messages with the
                        remote process (poll/recv/recv_bytes/send/send_bytes
                        are called on it by this class).
        name            Name used when printing error messages.
        pid             Key under which this handler is stored in
                        RemoteEventHandler.handlers.
        debug           If not false, debugMsg() prints messages; the value is
                        passed through to cprint.cout.
        ==============  ========================================================
        """
        self.conn = connection
        self.name = name
        self.debug = debug

        self.exited = False
        self.nextRequestId = 0

        ## Cache of request results received from the remote process:
        ##   {reqId: (status, result)}
        ## status is either 'result' or 'error'.
        ##   If 'error', then result is (exception, formatted exception),
        ##   where exception may be None if it could not be passed through
        ##   the Connection.
        self.results = {}

        ## Maps {weakref(proxy): proxyId}; used to inform the remote process
        ## when a proxy has been deleted.
        self.proxies = {}

        ## Attributes that affect the behavior of the proxy.
        ## See ObjectProxy._setProxyOptions for description.
        self.proxyOptions = {
            'callSync': 'sync',      ## 'sync', 'async', 'off'
            'timeout': 10,           ## float
            'returnType': 'auto',    ## 'proxy', 'value', 'auto'
            'autoProxy': False,      ## bool
            'deferGetattr': False,   ## True, False
            'noProxyTypes': [
                type(None), str, int, float, tuple, list, dict,
                LocalObjectProxy, ObjectProxy,
            ],
        }

        ## One re-entrant lock per piece of shared state, plus two mutexes that
        ## help prevent issues when multiple threads access the same
        ## RemoteEventHandler.
        self.resultLock = threading.RLock()
        self.proxyLock = threading.RLock()
        self.optsLock = threading.RLock()
        self.processLock = threading.RLock()
        self.sendLock = threading.RLock()

        ## Register this handler as the one communicating with pid.
        RemoteEventHandler.handlers[pid] = self
    
    @classmethod
    def getHandler(cls, pid):
        try:
            return cls.handlers[pid]
        except:
            print(pid, cls.handlers)
            raise
    
    def debugMsg(self, msg):
        if not self.debug:
            return
        cprint.cout(self.debug, "[%d] %s\n" % (os.getpid(), str(msg)), -1) 
    
    def getProxyOption(self, opt):
        with self.optsLock:
            return self.proxyOptions[opt]
        
    def setProxyOptions(self, **kwds):
        """
        Set the default behavior options for object proxies.
        See ObjectProxy._setProxyOptions for more info.
        """
        with self.optsLock:
            self.proxyOptions.update(kwds)
    
    def processRequests(self):
        """Process all pending requests from the pipe, return
        after no more events are immediately available. (non-blocking)
        Returns the number of events processed.
        """
        with self.processLock:
            
            if self.exited:
                self.debugMsg('  processRequests: exited already; raise ClosedError.')
                raise ClosedError()
            
            numProcessed = 0
            
            while self.conn.poll():
                #try:
                    #poll = self.conn.poll()
                    #if not poll:
                        #break
                #except IOError:  # this can happen if the remote process dies.
                                ## might it also happen in other circumstances?
                    #raise ClosedError()
                        
                try:
                    self.handleRequest()
                    numProcessed += 1
                except ClosedError:
                    self.debugMsg('processRequests: got ClosedError from handleRequest; setting exited=True.')
                    self.exited = True
                    raise
                #except IOError as err:  ## let handleRequest take care of this.
                    #self.debugMsg('  got IOError from handleRequest; try again.')
                    #if err.errno == 4:  ## interrupted system call; try again
                        #continue
                    #else:
                        #raise
                except:
                    print("Error in process %s" % self.name)
                    sys.excepthook(*sys.exc_info())
                    
            if numProcessed > 0:
                self.debugMsg('processRequests: finished %d requests' % numProcessed)
            return numProcessed
    
    def handleRequest(self):
        """Handle a single request from the remote process. 
        Blocks until a request is available."""
        result = None
        while True:
            try:
                ## args, kwds are double-pickled to ensure this recv() call never fails                
                cmd, reqId, nByteMsgs, optStr = self.conn.recv() 
                break
            except EOFError:
                self.debugMsg('  handleRequest: got EOFError from recv; raise ClosedError.')
                ## remote process has shut down; end event loop
                raise ClosedError()
            except IOError as err:
                if err.errno == 4:  ## interrupted system call; try again
                    self.debugMsg('  handleRequest: got IOError 4 from recv; try again.')
                    continue
                else:
                    self.debugMsg('  handleRequest: got IOError %d from recv (%s); raise ClosedError.' % (err.errno, err.strerror))
                    raise ClosedError()
        
        self.debugMsg("  handleRequest: received %s %s" % (str(cmd), str(reqId)))
            
        ## read byte messages following the main request
        byteData = []
        if nByteMsgs > 0:
            self.debugMsg("    handleRequest: reading %d byte messages" % nByteMsgs)
        for i in range(nByteMsgs):
            while True:
                try:
                    byteData.append(self.conn.recv_bytes())
                    break
                except EOFError:
                    self.debugMsg("    handleRequest: got EOF while reading byte messages; raise ClosedError.")
                    raise ClosedError()
                except IOError as err:
                    if err.errno == 4:
                        self.debugMsg("    handleRequest: got IOError 4 while reading byte messages; try again.")
                        continue
                    else:
                        self.debugMsg("    handleRequest: got IOError while reading byte messages; raise ClosedError.")
                        raise ClosedError()
            
        
        try:
            if cmd == 'result' or cmd == 'error':
                resultId = reqId
                reqId = None  ## prevents attempt to return information from this request
                              ## (this is already a return from a previous request)
            
            opts = pickle.loads(optStr)
            self.debugMsg("    handleRequest: id=%s opts=%s" % (str(reqId), str(opts)))
            #print os.getpid(), "received request:", cmd, reqId, opts
            returnType = opts.get('returnType', 'auto')
            
            if cmd == 'result':
                with self.resultLock:
                    self.results[resultId] = ('result', opts['result'])
            elif cmd == 'error':
                with self.resultLock:
                    self.results[resultId] = ('error', (opts['exception'], opts['excString']))
            elif cmd == 'getObjAttr':
                result = getattr(opts['obj'], opts['attr'])
            elif cmd == 'callObj':
                obj = opts['obj']
                fnargs = opts['args']
                fnkwds = opts['kwds']
                
                ## If arrays were sent as byte messages, they must be re-inserted into the 
                ## arguments
                if len(byteData) > 0:
                    for i,arg in enumerate(fnargs):
                        if isinstance(arg, tuple) and len(arg) > 0 and arg[0] == '__byte_message__':
                            ind = arg[1]
                            dtype, shape = arg[2]
                            fnargs[i] = np.fromstring(byteData[ind], dtype=dtype).reshape(shape)
                    for k,arg in fnkwds.items():
                        if isinstance(arg, tuple) and len(arg) > 0 and arg[0] == '__byte_message__':
                            ind = arg[1]
                            dtype, shape = arg[2]
                            fnkwds[k] = np.fromstring(byteData[ind], dtype=dtype).reshape(shape)
                
                if len(fnkwds) == 0:  ## need to do this because some functions do not allow keyword arguments.
                    try:
                        result = obj(*fnargs)
                    except:
                        print("Failed to call object %s: %d, %s" % (obj, len(fnargs), fnargs[1:]))
                        raise
                else:
                    result = obj(*fnargs, **fnkwds)
                    
            elif cmd == 'getObjValue':
                result = opts['obj']  ## has already been unpickled into its local value
                returnType = 'value'
            elif cmd == 'transfer':
                result = opts['obj']
                returnType = 'proxy'
            elif cmd == 'transferArray':
                ## read array data from next message:
                result = np.fromstring(byteData[0], dtype=opts['dtype']).reshape(opts['shape'])
                returnType = 'proxy'
            elif cmd == 'import':
                name = opts['module']
                fromlist = opts.get('fromlist', [])
                mod = builtins.__import__(name, fromlist=fromlist)
                
                if len(fromlist) == 0:
                    parts = name.lstrip('.').split('.')
                    result = mod
                    for part in parts[1:]:
                        result = getattr(result, part)
                else:
                    result = map(mod.__getattr__, fromlist)
                
            elif cmd == 'del':
                LocalObjectProxy.releaseProxyId(opts['proxyId'])
                #del self.proxiedObjects[opts['objId']]
                
            elif cmd == 'close':
                if reqId is not None:
                    result = True
                    returnType = 'value'
                    
            exc = None
        except:
            exc = sys.exc_info()

            
            
        if reqId is not None:
            if exc is None:
                self.debugMsg("    handleRequest: sending return value for %d: %s" % (reqId, str(result))) 
                #print "returnValue:", returnValue, result
                if returnType == 'auto':
                    with self.optsLock:
                        noProxyTypes = self.proxyOptions['noProxyTypes']
                    result = self.autoProxy(result, noProxyTypes)
                elif returnType == 'proxy':
                    result = LocalObjectProxy(result)
                
                try:
                    self.replyResult(reqId, result)
                except:
                    sys.excepthook(*sys.exc_info())
                    self.replyError(reqId, *sys.exc_info())
            else:
                self.debugMsg("    handleRequest: returning exception for %d" % reqId) 
                self.replyError(reqId, *exc)
                    
        elif exc is not None:
            sys.excepthook(*exc)
    
        if cmd == 'close':
            if opts.get('noCleanup', False) is True:
                os._exit(0)  ## exit immediately, do not pass GO, do not collect $200.
                             ## (more importantly, do not call any code that would
                             ## normally be invoked at exit)
            else:
                raise ClosedError()
        
    
    
    def replyResult(self, reqId, result):
        self.send(request='result', reqId=reqId, callSync='off', opts=dict(result=result))
    
    def replyError(self, reqId, *exc):
        print("error: %s %s %s" % (self.name, str(reqId), str(exc[1])))
        excStr = traceback.format_exception(*exc)
        try:
            self.send(request='error', reqId=reqId, callSync='off', opts=dict(exception=exc[1], excString=excStr))
        except:
            self.send(request='error', reqId=reqId, callSync='off', opts=dict(exception=None, excString=excStr))
    
    def send(self, request, opts=None, reqId=None, callSync='sync', timeout=10, returnType=None, byteData=None, **kwds):
        """Send a request or return packet to the remote process.
        Generally it is not necessary to call this method directly; it is for internal use.
        (The docstring has information that is nevertheless useful to the programmer
        as it describes the internal protocol used to communicate between processes)
        
        ==============  ====================================================================
        **Arguments:**
        request         String describing the type of request being sent (see below)
        reqId           Integer uniquely linking a result back to the request that generated
                        it. (most requests leave this blank). If given, *request* must be
                        'result' or 'error'.
        callSync        'sync':  return the actual result of the request; if the result
                                 does not arrive in time, the Request object is returned
                                 instead
                        'async': return a Request object which can be used to look up the
                                 result later
                        'off':   return no result
        timeout         Time in seconds to wait for a response when callSync=='sync'
        opts            Extra arguments sent to the remote process that determine the way
                        the request will be handled (see below). If *returnType* is given
                        it is stored into this dict under the key 'returnType'.
        returnType      'proxy', 'value', or 'auto'
        byteData        If specified, this is a list of objects to be sent as byte messages
                        to the remote process.
                        This is used to send large arrays without the cost of pickling.
        kwds            Any additional keyword arguments are accepted and ignored.
        ==============  ====================================================================
        
        Raises ClosedError if this handler has already exited.
        
        Description of request strings and options allowed for each:
        
        =============  =============  ========================================================
        request        option         description
        -------------  -------------  --------------------------------------------------------
        getObjAttr                    Request the remote process return (proxy to) an
                                      attribute of an object.
                       obj            reference to object whose attribute should be 
                                      returned
                       attr           string name of attribute to return
                       returnType     'proxy', 'value', or 'auto' indicating whether to
                                      return a proxy or the actual value. 
                       
        callObj                       Request the remote process call a function or 
                                      method. If a request ID is given, then the call's
                                      return value will be sent back (or information
                                      about the error that occurred while running the
                                      function)
                       obj            the (reference to) object to call
                       args           list of arguments to pass to callable
                       kwds           dict of keyword arguments to pass to callable
                       returnType     'proxy', 'value', or 'auto' indicating whether to
                                      return a proxy or the actual value. 
                       
        getObjValue                   Request the remote process return the value of
                                      a proxied object (must be picklable)
                       obj            reference to object whose value should be returned
                       
        transfer                      Copy an object to the remote process and request
                                      it return a proxy for the new object.
                       obj            The object to transfer.
                       
        transferArray                 Copy an array to the remote process as a byte 
                                      message and request it return a proxy for the 
                                      new array.
                       dtype          dtype of the array sent in the byte message
                       shape          shape of the array sent in the byte message
                       
        import                        Request the remote process import new symbols
                                      and return proxy(ies) to the imported objects
                       module         the string name of the module to import
                       fromlist       optional list of string names to import from module
                       
        del                           Inform the remote process that a proxy has been 
                                      released (thus the remote process may be able to 
                                      release the original object)
                       proxyId        id of proxy which is no longer referenced by 
                                      remote host
                                      
        close                         Instruct the remote process to stop its event loop
                                      and exit. Optionally, this request may return a 
                                      confirmation.
                       noCleanup      if True, the remote process exits immediately
                                      without running its normal exit code
            
        result                        Inform the remote process that its request has 
                                      been processed                        
                       result         return value of a request
                       
        error                         Inform the remote process that its request failed
                       exception      the Exception that was raised (or None if the 
                                      exception could not be pickled)
                       excString      string-formatted version of the exception and 
                                      traceback
        =============  =============  ========================================================
        """
        if self.exited:
            self.debugMsg('  send: exited already; raise ClosedError.')
            raise ClosedError()
        
        ## Hold sendLock for the whole message (primary request plus byte messages)
        ## so that messages from different threads are not interleaved.
        with self.sendLock:
            ## NOTE: extra keyword arguments in kwds are silently ignored.
                
            if opts is None:
                opts = {}
            
            assert callSync in ['off', 'sync', 'async'], 'callSync must be one of "off", "sync", or "async"'
            if reqId is None:
                if callSync != 'off': ## requested return value; use the next available request ID
                    reqId = self.nextRequestId
                    self.nextRequestId += 1
            else:
                ## If requestId is provided, this _must_ be a response to a previously received request.
                assert request in ['result', 'error']
            
            if returnType is not None:
                ## NOTE: this writes into the dict supplied by the caller.
                opts['returnType'] = returnType
                
            ## double-pickle args to ensure that at least status and request ID get through
            try:
                optStr = pickle.dumps(opts)
            except:
                print("====  Error pickling this object:  ====")
                print(opts)
                print("=======================================")
                raise
            
            nByteMsgs = 0
            if byteData is not None:
                nByteMsgs = len(byteData)
                
            ## Send primary request
            ## (from here on, 'request' is the full message tuple, not the command string)
            request = (request, reqId, nByteMsgs, optStr)
            self.debugMsg('send request: cmd=%s nByteMsgs=%d id=%s opts=%s' % (str(request[0]), nByteMsgs, str(reqId), str(opts)))
            self.conn.send(request)
            
            ## follow up by sending byte messages
            if byteData is not None:
                for obj in byteData:  ## Remote process _must_ be prepared to read the same number of byte messages!
                    self.conn.send_bytes(obj)
                self.debugMsg('  sent %d byte messages' % len(byteData))
            
            self.debugMsg('  call sync: %s' % callSync)
            if callSync == 'off':
                return
            
        ## The lock is released before waiting so other threads can keep sending.
        req = Request(self, reqId, description=str(request), timeout=timeout)
        if callSync == 'async':
            return req
            
        if callSync == 'sync':
            try:
                return req.result()
            except NoResultError:
                ## No result arrived in time; hand back the Request so the
                ## caller can ask again later.
                return req
        
    def close(self, callSync='off', noCleanup=False, **kwds):
        try:
            self.send(request='close', opts=dict(noCleanup=noCleanup), callSync=callSync, **kwds)
            self.exited = True
        except ClosedError:
            pass
    
    def getResult(self, reqId):
        ## raises NoResultError if the result is not available yet
        #print self.results.keys(), os.getpid()
        with self.resultLock:
            haveResult = reqId in self.results
        
        if not haveResult:
            try:
                self.processRequests()
            except ClosedError:  ## even if remote connection has closed, we may have 
                                 ## received new data during this call to processRequests()
                pass
        
        with self.resultLock:
            if reqId not in self.results:
                raise NoResultError()
            status, result = self.results.pop(reqId)
        
        if status == 'result': 
            return result
        elif status == 'error':
            #print ''.join(result)
            exc, excStr = result
            if exc is not None:
                print("===== Remote process raised exception on request: =====")
                print(''.join(excStr))
                print("===== Local Traceback to request follows: =====")
                raise exc
            else:
                print(''.join(excStr))
                raise Exception("Error getting result. See above for exception from remote process.")
                
        else:
            raise Exception("Internal error.")
    
    def _import(self, mod, **kwds):
        """
        Request the remote process import a module (or symbols from a module)
        and return the proxied results. Uses built-in __import__() function, but 
        adds a bit more processing:
        
            _import('module')  =>  returns module
            _import('module.submodule')  =>  returns submodule 
                                             (note this differs from behavior of __import__)
            _import('module', fromlist=[name1, name2, ...])  =>  returns [module.name1, module.name2, ...]
                                             (this also differs from behavior of __import__)
            
        """
        return self.send(request='import', callSync='sync', opts=dict(module=mod), **kwds)
        
    def getObjAttr(self, obj, attr, **kwds):
        return self.send(request='getObjAttr', opts=dict(obj=obj, attr=attr), **kwds)
        
    def getObjValue(self, obj, **kwds):
        return self.send(request='getObjValue', opts=dict(obj=obj), **kwds)
        
    def callObj(self, obj, args, kwds, **opts):
        opts = opts.copy()
        args = list(args)
        
        ## Decide whether to send arguments by value or by proxy
        with self.optsLock:
            noProxyTypes = opts.pop('noProxyTypes', None)
            if noProxyTypes is None:
                noProxyTypes = self.proxyOptions['noProxyTypes']
                
            autoProxy = opts.pop('autoProxy', self.proxyOptions['autoProxy'])
        
        if autoProxy is True:
            args = [self.autoProxy(v, noProxyTypes) for v in args]
            for k, v in kwds.iteritems():
                opts[k] = self.autoProxy(v, noProxyTypes)
        
        byteMsgs = []
        
        ## If there are arrays in the arguments, send those as byte messages.
        ## We do this because pickling arrays is too expensive.
        for i,arg in enumerate(args):
            if arg.__class__ == np.ndarray:
                args[i] = ("__byte_message__", len(byteMsgs), (arg.dtype, arg.shape))
                byteMsgs.append(arg)
        for k,v in kwds.items():
            if v.__class__ == np.ndarray:
                kwds[k] = ("__byte_message__", len(byteMsgs), (v.dtype, v.shape))
                byteMsgs.append(v)
        
        return self.send(request='callObj', opts=dict(obj=obj, args=args, kwds=kwds), byteData=byteMsgs, **opts)

    def registerProxy(self, proxy):
        with self.proxyLock:
            ref = weakref.ref(proxy, self.deleteProxy)
            self.proxies[ref] = proxy._proxyId
    
    def deleteProxy(self, ref):
        with self.proxyLock:
            proxyId = self.proxies.pop(ref)
            
        try:
            self.send(request='del', opts=dict(proxyId=proxyId), callSync='off')
        except IOError:  ## if remote process has closed down, there is no need to send delete requests anymore
            pass

    def transfer(self, obj, **kwds):
        """
        Transfer an object by value to the remote host (the object must be picklable) 
        and return a proxy for the new remote object.
        """
        if obj.__class__ is np.ndarray:
            opts = {'dtype': obj.dtype, 'shape': obj.shape}
            return self.send(request='transferArray', opts=opts, byteData=[obj], **kwds)            
        else:
            return self.send(request='transfer', opts=dict(obj=obj), **kwds)
        
    def autoProxy(self, obj, noProxyTypes):
        ## Return object wrapped in LocalObjectProxy _unless_ its type is in noProxyTypes.
        for typ in noProxyTypes:
            if isinstance(obj, typ):
                return obj
        return LocalObjectProxy(obj)
        
        
class Request(object):
    """
    Request objects are returned when calling an ObjectProxy in asynchronous mode
    or if a synchronous call has timed out. Use hasResult() to ask whether
    the result of the call has been returned yet. Use result() to get
    the returned value.
    """
    def __init__(self, process, reqId, description=None, timeout=10):
        """
        ==============  ========================================================
        process         Object that owns the request. It must provide
                        getResult(reqId) and an *exited* attribute.
        reqId           Identifier passed to process.getResult().
        description     Text printed when a blocking result() call times out.
        timeout         Default number of seconds result() waits when blocking.
                        None or a negative value means wait indefinitely.
        ==============  ========================================================
        """
        self.proc = process
        self.description = description
        self.reqId = reqId
        self.gotResult = False
        self._result = None
        self.timeout = timeout
        
    def result(self, block=True, timeout=None):
        """
        Return the result for this request. 
        
        If block is True, wait until the result has arrived or *timeout* seconds passes.
        If the timeout is reached, raise NoResultError. 
        If *timeout* is None, the default timeout given to the constructor is used.
        A negative timeout (or a default timeout of None) disables the time limit.
        If block is False, raise NoResultError immediately if the result has not arrived yet.
        
        If the process's connection has closed before the result arrives, raise ClosedError.
        """
        
        if self.gotResult:
            return self._result
            
        if timeout is None:
            timeout = self.timeout 
        
        if block:
            ## Comparing None with a number raises TypeError on python 3, so a
            ## missing timeout is handled explicitly and means "no limit".
            limited = timeout is not None and timeout >= 0
            start = time.time()
            while not self.hasResult():
                if self.proc.exited:
                    raise ClosedError()
                time.sleep(0.005)
                if limited and time.time() - start > timeout:
                    print("Request timed out: %s" % self.description)
                    import traceback
                    traceback.print_stack()
                    raise NoResultError()
            return self._result
        else:
            self._result = self.proc.getResult(self.reqId)  ## raises NoResultError if result is not available yet
            self.gotResult = True
            return self._result
        
    def hasResult(self):
        """Returns True if the result for this request has arrived."""
        try:
            ## A successful non-blocking fetch stores the result and sets gotResult.
            self.result(block=False)
        except NoResultError:
            pass
        
        return self.gotResult

class LocalObjectProxy(object):
    """
    Wrapper that forces a local object to be sent to a remote host by proxy
    rather than by value. The module-level name 'proxy' is a shorter alias 
    for this class.
    
    For example::
    
        data = [1,2,3,4,5]
        remotePlot.plot(data)         ## by default, lists are pickled and sent by value
        remotePlot.plot(proxy(data))  ## force the object to be sent by proxy
    
    """
    nextProxyId = 0      ## ID handed out by the next call to registerObject()
    proxiedObjects = {}  ## maps {proxyId: object}
    
    @classmethod
    def registerObject(cls, obj):
        """
        Store *obj* under a newly allocated proxy ID and return that ID.
        Holding the object in proxiedObjects keeps a reference to it until
        releaseProxyId() is called.
        """
        newId = cls.nextProxyId
        cls.proxiedObjects[newId] = obj
        cls.nextProxyId = newId + 1
        return newId
    
    @classmethod
    def lookupProxyId(cls, pid):
        """Return the object registered under proxy ID *pid* (KeyError if unknown)."""
        return cls.proxiedObjects[pid]
    
    @classmethod
    def releaseProxyId(cls, pid):
        """Drop the object registered under proxy ID *pid* (KeyError if unknown)."""
        cls.proxiedObjects.pop(pid)
    
    def __init__(self, obj, **opts):
        """
        Create a 'local' proxy object that, when sent to a remote host,
        will appear as a normal ObjectProxy to *obj*. 
        Any extra keyword arguments are passed to proxy._setProxyOptions()
        on the remote side.
        """
        self.obj = obj
        self.opts = opts
        self.typeStr = repr(obj)
        self.processId = os.getpid()
        
    def __reduce__(self):
        """
        Pickle support: describe this wrapper as a call to unpickleObjectProxy.
        
        Each pickling registers the wrapped object again under a new ID, 
        because every transmission produces a separate proxy in the remote 
        process and each one must be tracked and released on its own.
        """
        freshId = LocalObjectProxy.registerObject(self.obj)
        state = (self.processId, freshId, self.typeStr, None, self.opts)
        return (unpickleObjectProxy, state)
        
## Shorter alias for LocalObjectProxy, so that callers can write proxy(obj)
## to force an object to be sent by proxy.
proxy = LocalObjectProxy

def unpickleObjectProxy(processId, proxyId, typeStr, attributes=None, opts=None):
    """
    Rebuild the object described by pickled proxy data.
    
    If *processId* is the ID of the current process, the proxied object lives
    here: it is looked up by *proxyId* in LocalObjectProxy and the names in
    *attributes* (if given) are resolved on it one after another; the 
    resulting object is returned (*opts* is not used in this case).
    
    Otherwise an ObjectProxy for (*processId*, *proxyId*, *typeStr*) is 
    created and, if *opts* is given, it is passed to the new proxy's 
    _setProxyOptions().
    """
    if processId != os.getpid():
        remote = ObjectProxy(processId, proxyId=proxyId, typeStr=typeStr)
        if opts is not None:
            remote._setProxyOptions(**opts)
        return remote
    
    target = LocalObjectProxy.lookupProxyId(proxyId)
    if attributes is None:
        return target
    for name in attributes:
        target = getattr(target, name)
    return target
    
class ObjectProxy(object):
    """
    Proxy to an object stored by the remote process. Proxies are created
    by calling Process._import(), Process.transfer(), or by requesting/calling
    attributes on existing proxy objects.
    
    For the most part, this object can be used exactly as if it
    were a local object::
    
        rsys = proc._import('sys')   # returns proxy to sys module on remote process
        rsys.stdout                  # proxy to remote sys.stdout
        rsys.stdout.write            # proxy to remote sys.stdout.write
        rsys.stdout.write('hello')   # calls sys.stdout.write('hello') on remote machine
                                     # and returns the result (None)
    
    When calling a proxy to a remote function, the call can be made synchronous
    (result of call is returned immediately), asynchronous (result is returned later),
    or return can be disabled entirely::
    
        ros = proc._import('os')
        
        ## synchronous call; result is returned immediately
        pid = ros.getpid()
        
        ## asynchronous call
        request = ros.getpid(_callSync='async')
        while not request.hasResult():
            time.sleep(0.01)
        pid = request.result()
        
        ## disable return when we know it isn't needed
        rsys.stdout.write('hello', _callSync='off')
    
    Additionally, values returned from a remote function call are automatically
    returned either by value (must be picklable) or by proxy. 
    This behavior can be forced::
    
        rnp = proc._import('numpy')
        arrProxy = rnp.array([1,2,3,4], _returnType='proxy')
        arrValue = rnp.array([1,2,3,4], _returnType='value')
    
    The default callSync and returnType behaviors (as well as others) can be set 
    for each proxy individually using ObjectProxy._setProxyOptions() or globally using 
    proc.setProxyOptions(). 
    
    """
    def __init__(self, processId, proxyId, typeStr='', parent=None):
        """
        Create a proxy for the object identified by *proxyId* in the process
        *processId*. *typeStr* is only used for display in __repr__. 
        *parent* is accepted but not used here.
        
        The proxy looks up the RemoteEventHandler for *processId* and 
        registers itself with it.
        """
        object.__init__(self)
        ## can't set attributes directly because setattr is overridden.
        self.__dict__['_processId'] = processId
        self.__dict__['_typeStr'] = typeStr
        self.__dict__['_proxyId'] = proxyId
        self.__dict__['_attributes'] = ()
        ## attributes that affect the behavior of the proxy. 
        ## in all cases, a value of None causes the proxy to ask
        ## its parent event handler to make the decision
        self.__dict__['_proxyOptions'] = {
            'callSync': None,      ## 'sync', 'async', None 
            'timeout': None,       ## float, None
            'returnType': None,    ## 'proxy', 'value', 'auto', None
            'deferGetattr': None,  ## True, False, None
            'noProxyTypes': None,  ## list of types to send by value instead of by proxy
        }
        
        self.__dict__['_handler'] = RemoteEventHandler.getHandler(processId)
        self.__dict__['_handler'].registerProxy(self)  ## handler will watch proxy; inform remote process when the proxy is deleted.
    
    def _setProxyOptions(self, **kwds):
        """
        Change the behavior of this proxy. For all options, a value of None
        will cause the proxy to instead use the default behavior defined
        by its parent Process.
        
        Options are:
        
        =============  =============================================================
        callSync       'sync', 'async', 'off', or None. 
                       If 'async', then calling methods will return a Request object
                       which can be used to inquire later about the result of the 
                       method call.
                       If 'sync', then calling a method
                       will block until the remote process has returned its result
                       or the timeout has elapsed (in this case, a Request object
                       is returned instead).
                       If 'off', then the remote process is instructed _not_ to 
                       reply and the method call will return None immediately.
        returnType     'auto', 'proxy', 'value', or None. 
                       If 'proxy', then the value returned when calling a method
                       will be a proxy to the object on the remote process.
                       If 'value', then attempt to pickle the returned object and
                       send it back.
                       If 'auto', then the decision is made by consulting the
                       'noProxyTypes' option.
        autoProxy      bool or None. If True, arguments to __call__ are 
                       automatically converted to proxy unless their type is 
                       listed in noProxyTypes (see below). If False, arguments
                       are left untouched. Use proxy(obj) to manually convert
                       arguments before sending. 
        timeout        float or None. Length of time to wait during synchronous 
                       requests before returning a Request object instead.
        deferGetattr   True, False, or None. 
                       If False, all attribute requests will be sent to the remote 
                       process immediately and will block until a response is
                       received (or timeout has elapsed).
                       If True, requesting an attribute from the proxy returns a
                       new proxy immediately. The remote process is _not_ contacted
                       to make this request. This is faster, but it is possible to 
                       request an attribute that does not exist on the proxied
                       object. In this case, AttributeError will not be raised
                       until an attempt is made to look up the attribute on the
                       remote process.
        noProxyTypes   List of object types that should _not_ be proxied when
                       sent to the remote process.
        =============  =============================================================
        """
        self._proxyOptions.update(kwds)
    
    def _getValue(self):
        """
        Return the value of the proxied object
        (the remote object must be picklable)
        """
        return self._handler.getObjValue(self)
        
    def _getProxyOption(self, opt):
        """
        Return the effective value of option *opt*: the value set on this 
        proxy, or the handler's value if this proxy has None for it.
        """
        val = self._proxyOptions[opt]
        if val is None:
            return self._handler.getProxyOption(opt)
        return val
    
    def _getProxyOptions(self):
        """Return a dict with the effective value of every option of this proxy."""
        return {k: self._getProxyOption(k) for k in self._proxyOptions}
    
    def __reduce__(self):
        """
        Pickle support: describe this proxy as a call to unpickleObjectProxy 
        with its process ID, proxy ID, type string and attribute chain.
        """
        return (unpickleObjectProxy, (self._processId, self._proxyId, self._typeStr, self._attributes))
    
    def __repr__(self):
        return "<ObjectProxy for process %d, object 0x%x: %s >" % (self._processId, self._proxyId, self._typeStr)
        
        
    def __getattr__(self, attr, **kwds):
        """
        Calls __getattr__ on the remote object and returns the attribute
        by value or by proxy depending on the options set (see
        ObjectProxy._setProxyOptions and RemoteEventHandler.setProxyOptions)
        
        Options may be overridden for this one request by passing them as 
        keyword arguments prefixed with an underscore (e.g. _deferGetattr=False).
        
        If the option 'deferGetattr' is True for this proxy, then a new proxy object
        is returned _without_ asking the remote object whether the named attribute exists.
        This can save time when making multiple chained attribute requests,
        but may also defer a possible AttributeError until later, making
        them more difficult to debug.
        """
        opts = self._getProxyOptions()
        for k in opts:
            if '_'+k in kwds:
                opts[k] = kwds.pop('_'+k)
        if opts['deferGetattr'] is True:
            return self._deferredAttr(attr)
        else:
            return self._handler.getObjAttr(self, attr, **opts)
    
    def _deferredAttr(self, attr):
        """Return a DeferredObjectProxy for attribute *attr* of this proxy."""
        return DeferredObjectProxy(self, attr)
    
    def __call__(self, *args, **kwds):
        """
        Attempts to call the proxied object from the remote process.
        Accepts extra keyword arguments:
        
            _callSync    'off', 'sync', or 'async'
            _returnType   'value', 'proxy', or 'auto'
        
        Any other proxy option may be overridden the same way (option name 
        prefixed with an underscore); these keywords are removed before the 
        remaining arguments are sent to the remote object.
        
        If the remote call raises an exception on the remote process,
        it will be re-raised on the local process.
        
        """
        opts = self._getProxyOptions()
        for k in opts:
            if '_'+k in kwds:
                opts[k] = kwds.pop('_'+k)
        return self._handler.callObj(obj=self, args=args, kwds=kwds, **opts)
    
    
    ## Explicitly proxy special methods. Is there a better way to do this??
    ## Each method below fetches the special method of the same name from the 
    ## proxied object and calls it. In-place operators and setters are sent 
    ## with _callSync='off'; __str__ requests its result by value.
    
    def _getSpecialAttr(self, attr):
        ## this just gives us an easy way to change the behavior of the special methods
        return self._deferredAttr(attr)
    
    def __getitem__(self, *args):
        return self._getSpecialAttr('__getitem__')(*args)
    
    def __setitem__(self, *args):
        return self._getSpecialAttr('__setitem__')(*args, _callSync='off')
        
    def __setattr__(self, *args):
        return self._getSpecialAttr('__setattr__')(*args, _callSync='off')
        
    def __str__(self, *args):
        return self._getSpecialAttr('__str__')(*args, _returnType='value')
        
    def __len__(self, *args):
        return self._getSpecialAttr('__len__')(*args)
    
    def __add__(self, *args):
        return self._getSpecialAttr('__add__')(*args)
    
    def __sub__(self, *args):
        return self._getSpecialAttr('__sub__')(*args)
        
    def __div__(self, *args):
        return self._getSpecialAttr('__div__')(*args)
        
    def __truediv__(self, *args):
        return self._getSpecialAttr('__truediv__')(*args)
        
    def __floordiv__(self, *args):
        return self._getSpecialAttr('__floordiv__')(*args)
        
    def __mul__(self, *args):
        return self._getSpecialAttr('__mul__')(*args)
        
    def __pow__(self, *args):
        return self._getSpecialAttr('__pow__')(*args)
        
    def __iadd__(self, *args):
        return self._getSpecialAttr('__iadd__')(*args, _callSync='off')
    
    def __isub__(self, *args):
        return self._getSpecialAttr('__isub__')(*args, _callSync='off')
        
    def __idiv__(self, *args):
        return self._getSpecialAttr('__idiv__')(*args, _callSync='off')
        
    def __itruediv__(self, *args):
        return self._getSpecialAttr('__itruediv__')(*args, _callSync='off')
        
    def __ifloordiv__(self, *args):
        return self._getSpecialAttr('__ifloordiv__')(*args, _callSync='off')
        
    def __imul__(self, *args):
        return self._getSpecialAttr('__imul__')(*args, _callSync='off')
        
    def __ipow__(self, *args):
        return self._getSpecialAttr('__ipow__')(*args, _callSync='off')
        
    def __rshift__(self, *args):
        return self._getSpecialAttr('__rshift__')(*args)
        
    def __lshift__(self, *args):
        return self._getSpecialAttr('__lshift__')(*args)
        
    def __irshift__(self, *args):
        return self._getSpecialAttr('__irshift__')(*args, _callSync='off')
        
    def __ilshift__(self, *args):
        return self._getSpecialAttr('__ilshift__')(*args, _callSync='off')
        
    def __eq__(self, *args):
        return self._getSpecialAttr('__eq__')(*args)
    
    def __ne__(self, *args):
        return self._getSpecialAttr('__ne__')(*args)
        
    def __lt__(self, *args):
        return self._getSpecialAttr('__lt__')(*args)
    
    def __gt__(self, *args):
        return self._getSpecialAttr('__gt__')(*args)
        
    def __le__(self, *args):
        return self._getSpecialAttr('__le__')(*args)
    
    def __ge__(self, *args):
        return self._getSpecialAttr('__ge__')(*args)
        
    def __and__(self, *args):
        return self._getSpecialAttr('__and__')(*args)
        
    def __or__(self, *args):
        return self._getSpecialAttr('__or__')(*args)
        
    def __xor__(self, *args):
        return self._getSpecialAttr('__xor__')(*args)
        
    def __iand__(self, *args):
        return self._getSpecialAttr('__iand__')(*args, _callSync='off')
        
    def __ior__(self, *args):
        return self._getSpecialAttr('__ior__')(*args, _callSync='off')
        
    def __ixor__(self, *args):
        return self._getSpecialAttr('__ixor__')(*args, _callSync='off')
        
    def __mod__(self, *args):
        return self._getSpecialAttr('__mod__')(*args)
        
    def __radd__(self, *args):
        return self._getSpecialAttr('__radd__')(*args)
    
    def __rsub__(self, *args):
        return self._getSpecialAttr('__rsub__')(*args)
        
    def __rdiv__(self, *args):
        return self._getSpecialAttr('__rdiv__')(*args)
        
    def __rfloordiv__(self, *args):
        return self._getSpecialAttr('__rfloordiv__')(*args)
        
    def __rtruediv__(self, *args):
        return self._getSpecialAttr('__rtruediv__')(*args)
        
    def __rmul__(self, *args):
        return self._getSpecialAttr('__rmul__')(*args)
        
    def __rpow__(self, *args):
        return self._getSpecialAttr('__rpow__')(*args)
        
    def __rrshift__(self, *args):
        return self._getSpecialAttr('__rrshift__')(*args)
        
    def __rlshift__(self, *args):
        return self._getSpecialAttr('__rlshift__')(*args)
        
    def __rand__(self, *args):
        return self._getSpecialAttr('__rand__')(*args)
        
    def __ror__(self, *args):
        return self._getSpecialAttr('__ror__')(*args)
        
    def __rxor__(self, *args):
        ## must dispatch to the remote __rxor__ (not __ror__), otherwise a 
        ## reflected XOR would be evaluated as a reflected OR.
        return self._getSpecialAttr('__rxor__')(*args)
        
    def __rmod__(self, *args):
        return self._getSpecialAttr('__rmod__')(*args)
        
    def __hash__(self):
        ## Required for python3 since __eq__ is defined.
        return id(self)
        
class DeferredObjectProxy(ObjectProxy):
    """
    Proxy for an attribute (or a chain of sub-attributes) of a proxied object
    whose lookup has not yet been sent to the remote process. It exists to
    make attribute requests cheaper. Consider::
    
        rsys = proc._import('sys')
        rsys.stdout.write('hello')
        
    This short example issues 4 synchronous requests to the remote process: 
    
    1) import sys
    2) getattr(sys, 'stdout')
    3) getattr(stdout, 'write')
    4) write('hello')
    
    That is much slower than running the same code locally. The two 
    attribute lookups can instead be 'deferred' so that they are carried
    out only when necessary::
    
        rsys = proc._import('sys')
        rsys._setProxyOptions(deferGetattr=True)
        rsys.stdout.write('hello')
        
    Now only two requests reach the remote process; each attribute lookup 
    returns a DeferredObjectProxy immediately without contacting the remote 
    process, and all attribute requests are processed together when write() 
    is finally called.
    
    Note that if a requested attribute does not exist on the remote object, 
    the AttributeError is raised only when write() is called.
    """
    def __init__(self, parentProxy, attribute):
        """
        Create a deferred proxy for *attribute* of *parentProxy*.
        
        Identity and handler are shared with the parent; the attribute chain
        is the parent's chain extended by *attribute*, and the proxy options
        start out as a copy of the parent's.
        """
        ## Values are written straight into __dict__ because __setattr__ is 
        ## overridden by ObjectProxy.
        state = self.__dict__
        for key in ('_processId', '_typeStr', '_proxyId', '_handler'):
            state[key] = getattr(parentProxy, key)
        state['_parent'] = parentProxy  ## holding the parent keeps it alive
        state['_attributes'] = parentProxy._attributes + (attribute,)
        state['_proxyOptions'] = parentProxy._proxyOptions.copy()
    
    def __repr__(self):
        attrPath = '.'.join(self._attributes)
        return ObjectProxy.__repr__(self) + '.' + attrPath
    
    def _undefer(self):
        """
        Return a non-deferred ObjectProxy referencing the same object, 
        obtained by requesting the last attribute in the chain from the 
        parent proxy with deferral switched off.
        """
        lastAttr = self._attributes[-1]
        return self._parent.__getattr__(lastAttr, _deferGetattr=False)