This file is indexed.

/usr/lib/python3/dist-packages/UM/Signal.py is in python3-uranium 3.1.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
# Copyright (c) 2017 Ultimaker B.V.
# Copyright (c) Thiago Marcos P. Santos
# Copyright (c) Christopher S. Case
# Copyright (c) David H. Bronke
# Uranium is released under the terms of the LGPLv3 or higher.

import enum #For the compress parameter of postponeSignals.
import inspect
import threading
import os
import weakref
from weakref import ReferenceType
from typing import Any, Union, Callable, TypeVar, Generic, List, Tuple, Iterable, cast, Optional
import contextlib
import traceback

import functools

from UM.Event import CallFunctionEvent
from UM.Decorators import deprecated, call_if_enabled
from UM.Logger import Logger
from UM.Platform import Platform
from UM import FlameProfiler

MYPY = False
if MYPY:
    from UM.Application import Application


# Helper functions for tracing signal emission.
def _traceEmit(signal: Any, *args: Any, **kwargs: Any) -> None:
    """Log, at debug level, a description of an emit() call on `signal`.

    Logs the signal name and arguments, whether the emit is postponed to the
    event loop (as far as this helper can tell), and every connected
    function, method and signal.
    """
    arguments_text = str(args) + str(kwargs)
    Logger.log("d", "Emitting %s with arguments %s", str(signal.getName()), arguments_text)

    signal_type = signal._Signal__type
    if signal_type == Signal.Queued:
        Logger.log("d", "> Queued signal, postponing emit until next event loop run")

    if signal_type == Signal.Auto:
        queue = Signal._signalQueue
        # Only reported when a signal queue has been registered on the Signal class.
        if queue is not None and threading.current_thread() is not queue.getMainThread():
            Logger.log("d", "> Auto signal and not on main thread, postponing emit until next event loop run")

    for connected_function in signal._Signal__functions:
        Logger.log("d", "> Calling %s", str(connected_function))

    for receiver, connected_method in signal._Signal__methods:
        Logger.log("d", "> Calling %s on %s", str(connected_method), str(receiver))

    for connected_signal in signal._Signal__signals:
        Logger.log("d", "> Emitting %s", str(connected_signal._Signal__name))


def _traceConnect(signal: Any, *args: Any, **kwargs: Any) -> None:
    """Log, at debug level, that the first positional argument is being connected to `signal`."""
    signal_name = str(signal._Signal__name)
    connector_text = str(args[0])
    Logger.log("d", "Connecting signal %s to %s", signal_name, connector_text)


def _traceDisconnect(signal: Any, *args: Any, **kwargs: Any) -> None:
    """Log, at debug level, that the first positional argument is being disconnected from `signal`.

    The connector being disconnected is expected as `args[0]`; any further
    arguments are ignored.
    """
    # The message previously read "Connecting ... from ...", which made
    # disconnects look like connects in the trace log.
    Logger.log("d", "Disconnecting signal %s from %s", str(signal._Signal__name), str(args[0]))


def _isTraceEnabled() -> bool:
    """Return True when the URANIUM_TRACE_SIGNALS environment variable is set (to any value)."""
    trace_setting = os.environ.get("URANIUM_TRACE_SIGNALS")
    return trace_setting is not None


class SignalQueue:
    """Placeholder interface with two no-op methods.

    Both methods do nothing and return None here; presumably a concrete
    event loop owner overrides them (confirm against the implementing class).
    """

    def functionEvent(self, event):
        """Accept `event` and do nothing with it."""
        return None

    def getMainThread(self):
        """Return None; this stub does not know about any thread."""
        return None

###########################################################################
# Integration with the Flame Profiler.


def _recordSignalNames():
    """Tell whether signals should record a name; this is simply FlameProfiler.enabled()."""
    profiler_enabled = FlameProfiler.enabled()
    return profiler_enabled


def profileEmit(func):
    """Decorate a signal method so its calls show up in the Flame Profiler.

    When FlameProfiler.enabled() is false at decoration time, `func` is
    returned unchanged. Otherwise a wrapper is returned that refreshes the
    profiler configuration on every call and, while a profile is being
    recorded, runs `func` inside a profileCall() section labelled
    "[SIG] <signal name>".

    The wrapper passes on whatever `func` returns; previously the return
    value was silently dropped.

    :param func: A method whose first argument provides getName().
    :return: `func` itself or the profiling wrapper around it.
    """
    if not FlameProfiler.enabled():
        return func

    @functools.wraps(func)
    def wrapped(self, *args, **kwargs):
        FlameProfiler.updateProfileConfig()
        if FlameProfiler.isRecordingProfile():
            with FlameProfiler.profileCall("[SIG] " + self.getName()):
                return func(self, *args, **kwargs)
        return func(self, *args, **kwargs)

    return wrapped


###########################################################################

##  Simple implementation of signals and slots.
#
#   Signals and slots can be used as a light weight event system. A class can
#   define signals that other classes can connect functions or methods to, called slots.
#   Whenever the signal is called, it will proceed to call the connected slots.
#
#   To create a signal, create an instance variable of type Signal. Other objects can then
#   use that variable's `connect()` method to connect methods, callable objects or signals
#   to the signal. To emit the signal, call `emit()` on the signal. Arguments can be passed
#   along to the signal, but slots will be required to handle them. When connecting signals
#   to other signals, the connected signal will be emitted whenever the signal is emitted.
#
#   Signal-slot connections are weak references and as such will not prevent objects
#   from being destroyed. In addition, all slots will be implicitly disconnected when
#   the signal is destroyed.
#
#   \warning It is imperative that the signals are created as instance variables, otherwise
#   emitting signals will get confused. To help with this, see the SignalEmitter class.
#
#   Loosely based on http://code.activestate.com/recipes/577980-improved-signalsslots-implementation-in-python/ #pylint: disable=wrong-spelling-in-comment
#   \sa SignalEmitter
class Signal:
    ##  Signal types.
    #   These indicate the type of a signal, that is, how the signal handles calling the connected
    #   slots.
    #   - Direct connections immediately call the connected slots from the thread that called emit().
    #   - Auto connections will push the call onto the event loop if the current thread is
    #     not the main thread, but make a direct call if it is.
    #   - Queued connections will always push
    #     the call on to the event loop.
    Direct = 1
    Auto = 2
    Queued = 3

    ##  Initialize the instance.
    #
    #   \param type The signal type: Direct, Auto or Queued. Defaults to Auto.
    def __init__(self, type: int = Auto) -> None:
        # These collections must be treated as immutable otherwise we lose thread safety.
        self.__functions = WeakImmutableList()      # type: "WeakImmutableList"
        self.__methods = WeakImmutablePairList()    # type: "WeakImmutablePairList"
        self.__signals = WeakImmutableList()        # type: "WeakImmutableList"

        self.__lock = threading.Lock()  # Guards access to the fields above.
        self.__type = type

        # State written by postponeSignals() and consulted by emit(), connect() and disconnect().
        self._postpone_emit = False
        self._postpone_thread = None    # type: Optional[threading.Thread]
        self._compress_postpone = False
        self._postponed_emits = None    # type: Any

        if _recordSignalNames():
            # Name the signal after the local variable called "key" in the frame that is
            # constructing it; when the caller has no such local, fall back to "Signal".
            try:
                if Platform.isWindows():
                    self.__name = inspect.stack()[1][0].f_locals["key"]
                else:
                    self.__name = inspect.stack()[1].frame.f_locals["key"]
            except KeyError:
                self.__name = "Signal"
        else:
            self.__name = "Anon"

    ##  Get the name recorded for this signal.
    #   \return The name chosen in __init__: the creator's "key" local or "Signal" when names
    #           are being recorded, "Anon" otherwise.
    def getName(self):
        return self.__name

    ##  Signals are not callable; this only exists to point people at emit().
    #
    #   Any arguments are accepted so that `signal(arg)` produces this hint as well, rather
    #   than a TypeError about the number of arguments.
    #   \exception NotImplementedError Always.
    def __call__(self, *args: Any, **kwargs: Any) -> None:
        raise NotImplementedError("Call emit() to emit a signal")

    ##  Get type of the signal
    #   \return \type{int} Direct(1), Auto(2) or Queued(3)
    def getType(self) -> int:
        return self.__type

    ##  Emit the signal which indirectly calls all of the connected slots.
    #
    #   \param args The positional arguments to pass along.
    #   \param kwargs The keyword arguments to pass along.
    #
    #   \note While emits are postponed (see postponeSignals) the call is only recorded and
    #   nothing is called.
    #   \note If the Signal type is Queued, or it is Auto and this is not called from the
    #   application main thread, the call is posted as a CallFunctionEvent through
    #   Signal._app.functionEvent() instead of being performed here. If that raises an
    #   AttributeError (for example because Signal._app is not set) the emit is dropped.
    @call_if_enabled(_traceEmit, _isTraceEnabled())
    @profileEmit
    def emit(self, *args: Any, **kwargs: Any) -> None:
        # Check to see if we need to postpone emits
        if self._postpone_emit:
            if threading.current_thread() != self._postpone_thread:
                Logger.log("w", "Tried to emit signal from thread %s while emits are being postponed by %s. Traceback:", threading.current_thread(), self._postpone_thread)
                tb = traceback.format_stack()
                for line in tb:
                    Logger.log("w", line)

            if self._compress_postpone == CompressTechnique.CompressSingle:
                # If emits should be compressed, we only emit the last emit that was called
                self._postponed_emits = (args, kwargs)
            else:
                # If emits should not be compressed or compressed per parameter value, we catch all calls to emit and put them in a list to be called later.
                if not self._postponed_emits:
                    self._postponed_emits = []
                self._postponed_emits.append((args, kwargs))
            return

        try:
            if self.__type == Signal.Queued:
                Signal._app.functionEvent(CallFunctionEvent(self.__performEmit, args, kwargs))
                return
            if self.__type == Signal.Auto:
                if threading.current_thread() is not Signal._app.getMainThread():
                    Signal._app.functionEvent(CallFunctionEvent(self.__performEmit, args, kwargs))
                    return
        except AttributeError: # If Signal._app is not set
            return

        # Direct signals, and Auto signals emitted on the main thread, call their slots right away.
        self.__performEmit(*args, **kwargs)

    ##  Connect to this signal.
    #
    #   Signals are stored as connected signals, bound methods as (object, function) pairs and
    #   anything else as a plain function. All of them are held through weak references.
    #   Connecting a signal to itself is ignored. While emits are postponed the request is
    #   refused with a warning in the log.
    #   \param connector The signal or slot (function) to connect.
    @call_if_enabled(_traceConnect, _isTraceEnabled())
    def connect(self, connector: Union['Signal', Callable[..., Any]]) -> None:
        if self._postpone_emit:
            Logger.log("w", "Tried to connect to signal %s that is currently being postponed, this is not possible", self.__name)
            return

        with self.__lock:
            # Every branch replaces the collection by a new one instead of changing it in place.
            if isinstance(connector, Signal):
                if connector == self:
                    return
                self.__signals = self.__signals.append(connector)
            elif inspect.ismethod(connector):
                self.__methods = self.__methods.append(cast(Any, connector).__self__, cast(Any, connector).__func__)
            else:
                self.__functions = self.__functions.append(connector)

    ##  Disconnect from this signal.
    #
    #   While emits are postponed the request is refused with a warning in the log.
    #   \param connector The signal or slot (function) to disconnect.
    @call_if_enabled(_traceDisconnect, _isTraceEnabled())
    def disconnect(self, connector):
        if self._postpone_emit:
            Logger.log("w", "Tried to disconnect from signal %s that is currently being postponed, this is not possible", self.__name)
            return

        with self.__lock:
            if isinstance(connector, Signal):
                self.__signals = self.__signals.remove(connector)
            elif inspect.ismethod(connector):
                self.__methods = self.__methods.remove(connector.__self__, connector.__func__)
            else:
                self.__functions = self.__functions.remove(connector)

    ##  Disconnect all connected slots.
    #
    #   While emits are postponed the request is refused with a warning in the log.
    def disconnectAll(self):
        if self._postpone_emit:
            Logger.log("w", "Tried to disconnect from signal %s that is currently being postponed, this is not possible", self.__name)
            return

        with self.__lock:
            self.__functions = WeakImmutableList()
            self.__methods = WeakImmutablePairList()
            self.__signals = WeakImmutableList()

    ##  To support Pickle
    #
    #   Since Weak containers cannot be serialized by Pickle we just return an empty dict as state.
    def __getstate__(self):
        return {}

    ##  To properly handle deepcopy in combination with __getstate__
    #
    #   Apparently deepcopy uses __getstate__ internally, which is not documented. The reimplementation
    #   of __getstate__ then breaks deepcopy. On the other hand, if we do not reimplement it like that,
    #   we break pickle. So instead make sure to also reimplement __deepcopy__.
    #
    #   \param memo The deepcopy memo dictionary; not used here.
    #   \return A new Signal of the same type that shares this signal's current connection lists.
    def __deepcopy__(self, memo):
        # Snapshot these fields
        with self.__lock:
            functions = self.__functions
            methods = self.__methods
            signals = self.__signals

        signal = Signal(type = self.__type)
        signal.__functions = functions
        signal.__methods = methods
        signal.__signals = signals
        return signal

    ##  private:

    #   To avoid circular references when importing Application, this should be
    #   set by the Application instance.
    _app = None  # type: Application

    _signalQueue = None  # type: SignalQueue

    # Private implementation of the actual emit.
    # This is done to make it possible to freely push function events without needing to maintain state.
    def __performEmit(self, *args, **kwargs):
        # Quickly make some private references to the collections we need to process.
        # Although these fields are always safe to read and use with regards to threading,
        # we want to operate on a consistent snapshot of the whole set of fields.
        with self.__lock:
            functions = self.__functions
            methods = self.__methods
            signals = self.__signals

        if not FlameProfiler.isRecordingProfile():
            # Call handler functions
            for func in functions:
                func(*args, **kwargs)

            # Call handler methods
            for dest, func in methods:
                func(dest, *args, **kwargs)

            # Emit connected signals
            for signal in signals:
                signal.emit(*args, **kwargs)
        else:
            # Same three passes, but every call gets its own section in the recorded profile.
            # Call handler functions
            for func in functions:
                with FlameProfiler.profileCall(func.__qualname__):
                    func(*args, **kwargs)

            # Call handler methods
            for dest, func in methods:
                with FlameProfiler.profileCall(func.__qualname__):
                    func(dest, *args, **kwargs)

            # Emit connected signals
            for signal in signals:
                with FlameProfiler.profileCall("[SIG]" + signal.getName()):
                    signal.emit(*args, **kwargs)

    # This __str__() is useful for debugging.
    # def __str__(self):
    #     function_str = ", ".join([repr(f) for f in self.__functions])
    #     method_str = ", ".join([ "{dest: " + str(dest) + ", funcs: " + strMethodSet(funcs) + "}" for dest, funcs in self.__methods])
    #     signal_str = ", ".join([str(signal) for signal in self.__signals])
    #     return "Signal<{}> {{ __functions={{ {} }}, __methods={{ {} }}, __signals={{ {} }} }}".format(id(self), function_str, method_str, signal_str)


def strMethodSet(method_set):
    """Render the members of `method_set` with str(), comma-separated and wrapped in braces."""
    rendered_members = ", ".join(map(str, method_set))
    return "{" + rendered_members + "}"

##  The ways in which postponeSignals can reduce the number of emits it replays.
#   - NoCompression: every postponed emit is replayed.
#   - CompressSingle: only the last postponed emit is replayed.
#   - CompressPerParameterValue: one emit is replayed per distinct set of arguments.
class CompressTechnique(enum.Enum):
    NoCompression = 0
    CompressSingle = 1
    CompressPerParameterValue = 2


# Take one signal out of the postponed state and replay the emits it collected.
# The postpone state is cleared before anything is replayed, so it is reset even when a
# connected slot raises and slots never observe a half-restored signal.
def _flushPostponedSignal(signal) -> None:
    technique = signal._compress_postpone
    postponed = signal._postponed_emits

    signal._postpone_emit = False
    signal._postponed_emits = None
    signal._postpone_thread = None
    signal._compress_postpone = False

    if not postponed:
        return

    if technique == CompressTechnique.CompressSingle:
        # emit() stored just the most recent (args, kwargs) pair.
        last_args, last_kwargs = postponed
        signal.emit(*last_args, **last_kwargs)
    elif technique == CompressTechnique.CompressPerParameterValue:
        # Keep the first emit of every distinct argument set, in the order they were made.
        seen = set()
        unique_emits = []
        for args, kwargs in postponed:
            fingerprint = (tuple(args), tuple(kwargs.items())) # Tuples, because they have to be hashable.
            if fingerprint not in seen:
                seen.add(fingerprint)
                unique_emits.append((args, kwargs))
        for args, kwargs in unique_emits:
            signal.emit(*args, **kwargs)
    else:
        for args, kwargs in postponed:
            signal.emit(*args, **kwargs)


##  A context manager that allows postponing of signal emissions
#
#   This context manager will collect any calls to emit() made for the provided signals
#   and only emit them after exiting. This ensures more batched processing of signals.
#
#   The optional "compress" argument selects how the collected calls are reduced:
#   - CompressTechnique.NoCompression (the default) replays every call.
#   - CompressTechnique.CompressSingle replays only the last call.
#   - CompressTechnique.CompressPerParameterValue replays one call per distinct set of
#     arguments, in the order in which those argument sets were first seen. The arguments
#     have to be hashable for this.
#
#   Signals that are already being postponed by an enclosing postponeSignals are left alone.
#
#   The signals are always released when the block is left, also when it raises: the
#   collected emits are replayed and the exception then continues to propagate.
#
#   \warning With CompressSingle only the **last** call will be emitted. This means
#   that any other calls will be ignored, _including their arguments_.
#
#   \param signals The signals to postpone emits for.
#   \param compress The CompressTechnique to apply to the postponed emits.
@contextlib.contextmanager
def postponeSignals(*signals, compress: CompressTechnique = CompressTechnique.NoCompression):
    # To allow for nested postpones on the same signals, we should check if signals are not already
    # postponed and only change those that are not yet postponed.
    restore_emit = []
    for signal in signals:
        if not signal._postpone_emit: # Do nothing if the signal has already been changed
            signal._postpone_emit = True
            signal._postpone_thread = threading.current_thread()
            signal._compress_postpone = compress
            # Since we made changes, make sure to restore the signal after exiting the context manager
            restore_emit.append(signal)

    # An ExitStack runs every registered callback, even if the block or an earlier callback
    # raised, so no signal can be left postponed forever. Callbacks run last-in-first-out,
    # hence they are registered in reverse to release the signals in their original order.
    with contextlib.ExitStack() as stack:
        for signal in reversed(restore_emit):
            stack.callback(_flushPostponedSignal, signal)

        # Execute the code block in the "with" statement
        yield


##  Convenience class to simplify signal creation.
#
#   This class is a Convenience class to simplify signal creation. Since signals
#   need to be instance variables, normally you would need to create all signals
#   in the class' `__init__` method. However, this makes them rather awkward to
#   document. This class instead makes it possible to declare them as class variables,
#   which makes documenting them near the function they are used possible.
#   During the call to `__init__()`, this class will then search through all the
#   properties of the instance and create instance variables for each class variable
#   that is an instance of Signal.
class SignalEmitter:
    ##  Give this instance its own copy of every Signal that can be found on it.
    #
    #   \param kwargs Accepted, but not used by this method.
    @deprecated("Please use the new @signalemitter decorator", "2.2")
    def __init__(self, **kwargs):
        super().__init__()

        def isSignal(member):
            return isinstance(member, Signal)

        # getmembers() returns a list, so setting attributes while looping over it is safe.
        for attribute_name, shared_signal in inspect.getmembers(self, isSignal):
            own_signal = Signal(type = shared_signal.getType()) #pylint: disable=bad-whitespace
            setattr(self, attribute_name, own_signal)


##  Class decorator that ensures a class has unique instances of signals.
#
#   Since signals need to be instance variables, normally you would need to create all
#   signals in the class' `__init__` method. However, this makes them rather awkward to
#   document. This decorator instead makes it possible to declare them as class variables,
#   which makes documenting them near the function they are used possible. This decorator
#   adjusts the class' __new__ method to create new signal instances for all class signals.
def signalemitter(cls):
    """Class decorator that gives every new instance of `cls` its own Signal objects.

    Raises TypeError when `cls` has no Signal members at all. Otherwise
    `cls.__new__` is replaced by a version that creates the instance as
    before and then sets, for each Signal found on `cls`, a fresh Signal of
    the same type on that instance. Returns `cls`.
    """
    def isSignal(member):
        return isinstance(member, Signal)

    # A signal emitter without signals is most likely a mistake, so refuse it.
    if not inspect.getmembers(cls, isSignal):
        raise TypeError("Class {0} is marked as signal emitter but no signal were found".format(cls))

    original_new = cls.__new__

    def new_new(subclass, *args, **kwargs):
        # object.__new__ does not accept the constructor arguments, so they are only
        # forwarded to a custom __new__.
        if original_new == object.__new__:
            instance = object.__new__(subclass)
        else:
            instance = original_new(subclass, *args, **kwargs)

        # The loop variable has to be called "key": Signal.__init__ looks up a local of
        # that name in its caller's frame to name the signal when names are recorded.
        for key, class_signal in inspect.getmembers(cls, isSignal):
            setattr(instance, key, Signal(type = class_signal.getType()))

        return instance

    cls.__new__ = new_new
    return cls

T = TypeVar('T')


##  Minimal implementation of a weak reference list with immutable tendencies.
#
#   Strictly speaking this isn't immutable because the garbage collector can modify
#   it, but no application code can. Also, this class doesn't implement the Python
#   list API, only the handful of methods we actually need in the code above.
class WeakImmutableList(Generic[T], Iterable):
    def __init__(self):
        self.__list = []    # type: List[ReferenceType[Optional[T]]]

    ## Append an item and return a new list
    #
    #  This list itself is left untouched. References whose item has disappeared are
    #  not copied to the new list.
    #  \param item the item to append
    #  \return a new list
    def append(self, item: T) -> "WeakImmutableList[T]":
        new_instance = WeakImmutableList()  # type: WeakImmutableList[T]
        new_instance.__list = self.__cleanList()
        new_instance.__list.append(ReferenceType(item))
        return new_instance

    ## Remove an item and return a list
    #
    #  Items are matched by identity (`is`), never by equality, and only the first
    #  occurrence is removed.
    #  Note that unlike the normal Python list.remove() method, this one
    #  doesn't throw a ValueError if the item isn't in the list.
    #  \param item item to remove
    #  \return a list which does not have the item; this very list if the item was not in it.
    def remove(self, item: T) -> "WeakImmutableList[T]":
        for index, item_ref in enumerate(self.__list):
            if item_ref() is item:
                new_instance = WeakImmutableList()   # type: WeakImmutableList[T]
                # Drop the entry by position. list.remove() would compare the weak references,
                # which compares their live referents with ==, and could therefore drop an
                # earlier item that is merely equal to the requested one.
                new_instance.__list = [
                    other_ref for position, other_ref in enumerate(self.__list)
                    if position != index and other_ref() is not None
                ]
                return new_instance

        return self  # No changes needed

    # Create a new list with the missing values removed.
    def __cleanList(self) -> "List[ReferenceType[Optional[T]]]":
        return [item_ref for item_ref in self.__list if item_ref() is not None]

    def __iter__(self):
        return WeakImmutableListIterator(self.__list)


## Iterator wrapper which filters out missing values.
#
# It dereferences each weak reference object and filters out the objects
# which have already disappeared via GC.
class WeakImmutableListIterator(Generic[T], Iterable):
    # \param list_ An iterable of weak references (callables returning the item or None).
    def __init__(self, list_):
        self.__it = iter(list_)

    def __iter__(self):
        return self

    # Return the next item that is still alive. The StopIteration of the underlying
    # iterator is what ends the iteration.
    def __next__(self):
        while True:
            candidate = next(self.__it)()
            if candidate is not None:   # Skip missing values
                return candidate


U = TypeVar('U')


##  A variation of WeakImmutableList which holds a pair of values using weak references.
class WeakImmutablePairList(Generic[T,U], Iterable):
    def __init__(self):
        self.__list = []    # type: List[Tuple[ReferenceType[T],ReferenceType[U]]]

    ## Append a pair of items and return a new list
    #
    #  This list itself is left untouched. Pairs of which either item has disappeared
    #  are not copied to the new list.
    #  \param left_item the first item of the pair to append
    #  \param right_item the second item of the pair to append
    #  \return a new list
    def append(self, left_item: T, right_item: U) -> "WeakImmutablePairList[T,U]":
        new_instance = WeakImmutablePairList()  # type: WeakImmutablePairList[T,U]
        new_instance.__list = self.__cleanList()
        new_instance.__list.append( (weakref.ref(left_item), weakref.ref(right_item)) )
        return new_instance

    ## Remove a pair of items and return a list
    #
    #  Both items are matched by identity (`is`), never by equality, and only the first
    #  matching pair is removed.
    #  Note that unlike the normal Python list.remove() method, this one
    #  doesn't throw a ValueError if the pair isn't in the list.
    #  \param left_item the first item of the pair to remove
    #  \param right_item the second item of the pair to remove
    #  \return a list which does not have the pair; this very list if the pair was not in it.
    def remove(self, left_item: T, right_item: U) -> "WeakImmutablePairList[T,U]":
        for index, pair in enumerate(self.__list):
            if pair[0]() is left_item and pair[1]() is right_item:
                new_instance = WeakImmutablePairList() # type: WeakImmutablePairList[T,U]
                # Drop the pair by position. list.remove() would compare the tuples of weak
                # references, which compares live referents with ==, and could therefore drop
                # an earlier pair whose object is merely equal to the requested one.
                new_instance.__list = [
                    other for position, other in enumerate(self.__list)
                    if position != index and other[0]() is not None and other[1]() is not None
                ]
                return new_instance

        return self # No changes needed

    # Create a new list with the missing values removed.
    def __cleanList(self) -> List[Tuple[ReferenceType,ReferenceType]]:
        return [pair for pair in self.__list if pair[0]() is not None and pair[1]() is not None]

    def __iter__(self):
        return WeakImmutablePairListIterator(self.__list)


# A small iterator wrapper which dereferences the weak ref objects and filters
# out the objects which have already disappeared via GC.
class WeakImmutablePairListIterator:
    # \param list_ An iterable of pairs of weak references.
    def __init__(self, list_) -> None:
        self.__it = iter(list_)

    def __iter__(self):
        return self

    # Return the next (left, right) tuple of which both items are still alive. The
    # StopIteration of the underlying iterator is what ends the iteration.
    def __next__(self):
        while True:
            entry = next(self.__it)
            left_item = entry[0]()
            right_item = entry[1]()
            if left_item is not None and right_item is not None:    # Skip missing values
                return (left_item, right_item)