This file is indexed.

/usr/share/pyshared/rope/base/taskhandle.py is in python-rope 0.9.2-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import warnings

from rope.base import exceptions


class TaskHandle(object):

    def __init__(self, name='Task', interrupts=True):
        """Construct a TaskHandle

        If `interrupts` is `False` the task won't be interrupted by
        calling `TaskHandle.stop()`.

        """
        self.name = name
        self.interrupts = interrupts
        self.stopped = False
        self.job_sets = []
        self.observers = []

    def stop(self):
        """Interrupts the refactoring"""
        if self.interrupts:
            self.stopped = True
            self._inform_observers()

    def current_jobset(self):
        """Return the current `JobSet`"""
        if self.job_sets:
            return self.job_sets[-1]

    def add_observer(self, observer):
        """Register an observer for this task handle

        The observer is notified whenever the task is stopped or
        a job gets finished.

        """
        self.observers.append(observer)

    def is_stopped(self):
        return self.stopped

    def get_jobsets(self):
        return self.job_sets

    def create_jobset(self, name='JobSet', count=None):
        result = JobSet(self, name=name, count=count)
        self.job_sets.append(result)
        self._inform_observers()
        return result

    def _inform_observers(self):
        for observer in list(self.observers):
            observer()


class JobSet(object):

    def __init__(self, handle, name, count):
        self.handle = handle
        self.name = name
        self.count = count
        self.done = 0
        self.job_name = None

    def started_job(self, name):
        self.check_status()
        self.job_name = name
        self.handle._inform_observers()

    def finished_job(self):
        self.check_status()
        self.handle._inform_observers()
        self.job_name = None
        self.done += 1

    def check_status(self):
        if self.handle.is_stopped():
            raise exceptions.InterruptedTaskError()

    def get_active_job_name(self):
        return self.job_name

    def get_percent_done(self):
        if self.count is not None and self.count > 0:
            percent = self.done * 100 // self.count
            return min(percent, 100)

    def get_name(self):
        return self.name


class NullTaskHandle(object):

    def __init__(self):
        pass

    def is_stopped(self):
        return False

    def stop(self):
        pass

    def create_jobset(self, *args, **kwds):
        return NullJobSet()

    def get_jobsets(self):
        return []

    def add_observer(self, observer):
        pass


class NullJobSet(object):

    def started_job(self, name):
        pass

    def finished_job(self):
        pass

    def check_status(self):
        pass

    def get_active_job_name(self):
        pass

    def get_percent_done(self):
        pass

    def get_name(self):
        pass