/usr/share/pyshared/rope/base/taskhandle.py is in python-rope 0.9.2-1ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
import warnings
from rope.base import exceptions
class TaskHandle(object):
    """Handle used to observe and interrupt a running task.

    The handle keeps the job sets created for the task and a list of
    observer callables.  Observers are called with no arguments when
    the task is stopped or a job set is created.
    """

    def __init__(self, name='Task', interrupts=True):
        """Create a handle called `name`.

        When `interrupts` is `False`, `TaskHandle.stop()` has no
        effect on the task.
        """
        self.observers = []
        self.job_sets = []
        self.stopped = False
        self.interrupts = interrupts
        self.name = name

    def stop(self):
        """Flag the task as stopped and notify the observers.

        Nothing happens for a handle created with `interrupts=False`.
        """
        if not self.interrupts:
            return
        self.stopped = True
        self._inform_observers()

    def current_jobset(self):
        """Return the most recently created `JobSet`, or `None`."""
        return self.job_sets[-1] if self.job_sets else None

    def add_observer(self, observer):
        """Append `observer` to the callables notified by this handle.

        Observers are invoked without arguments when the task is
        stopped or a job gets finished.
        """
        self.observers.append(observer)

    def is_stopped(self):
        """Return the current value of the `stopped` flag."""
        return self.stopped

    def get_jobsets(self):
        """Return the list holding every job set of this handle."""
        return self.job_sets

    def create_jobset(self, name='JobSet', count=None):
        """Create a `JobSet`, register it, notify observers, return it."""
        jobset = JobSet(self, name=name, count=count)
        self.job_sets.append(jobset)
        self._inform_observers()
        return jobset

    def _inform_observers(self):
        """Call every registered observer once."""
        # Walk a snapshot so an observer may alter the registration list
        # while the notification round is still in progress.
        for notify in tuple(self.observers):
            notify()
class JobSet(object):
    """A named group of jobs whose progress is reported through a handle.

    `handle` must provide `is_stopped()` and `_inform_observers()`.
    `count` is the expected number of jobs, or `None` when unknown.
    """

    def __init__(self, handle, name, count):
        self.handle = handle
        self.name = name
        self.count = count
        # Number of jobs finished so far.
        self.done = 0
        # Name of the job currently running, `None` between jobs.
        self.job_name = None

    def started_job(self, name):
        """Record `name` as the active job and notify observers.

        Raises `exceptions.InterruptedTaskError` if the handle is stopped.
        """
        self.check_status()
        self.job_name = name
        self.handle._inform_observers()

    def finished_job(self):
        """Record the active job as done and notify observers.

        Raises `exceptions.InterruptedTaskError` if the handle is stopped.
        """
        self.check_status()
        # Update the state *before* notifying, so observers that query
        # `get_percent_done()` / `get_active_job_name()` from inside the
        # callback see the job as finished (same order as `started_job`).
        self.done += 1
        self.job_name = None
        self.handle._inform_observers()

    def check_status(self):
        """Raise `exceptions.InterruptedTaskError` if the handle is stopped."""
        if self.handle.is_stopped():
            raise exceptions.InterruptedTaskError()

    def get_active_job_name(self):
        """Return the name of the running job, or `None` between jobs."""
        return self.job_name

    def get_percent_done(self):
        """Return finished jobs as an integer percentage capped at 100.

        Returns `None` when `count` is `None` or not positive.
        """
        if self.count is not None and self.count > 0:
            percent = self.done * 100 // self.count
            return min(percent, 100)
        return None

    def get_name(self):
        """Return the name of this job set."""
        return self.name
class NullTaskHandle(object):
    """Do-nothing stand-in exposing the same methods as `TaskHandle`.

    It is never stopped, keeps no job sets and ignores observers.
    """

    def __init__(self):
        pass

    def is_stopped(self):
        """Always return `False`; a null handle cannot be stopped."""
        return False

    def stop(self):
        """Ignore the stop request."""
        pass

    def current_jobset(self):
        """Return `None`; a null handle never holds a job set.

        Present so that callers written against `TaskHandle` can call
        `current_jobset()` on a null handle without an `AttributeError`.
        """
        return None

    def create_jobset(self, *args, **kwds):
        """Return a fresh `NullJobSet`; all arguments are ignored."""
        return NullJobSet()

    def get_jobsets(self):
        """Return a new empty list; no job sets are recorded."""
        return []

    def add_observer(self, observer):
        """Discard `observer`; a null handle never notifies anyone."""
        pass
class NullJobSet(object):
    """Do-nothing stand-in exposing the same methods as `JobSet`.

    Every method accepts the same arguments as its `JobSet`
    counterpart and returns `None`.
    """

    def started_job(self, name):
        """Ignore the start of the job called `name`."""
        return None

    def finished_job(self):
        """Ignore the end of a job."""
        return None

    def check_status(self):
        """Never raise; a null job set cannot be interrupted."""
        return None

    def get_active_job_name(self):
        """Return `None`; no job name is tracked."""
        return None

    def get_percent_done(self):
        """Return `None`; no progress is tracked."""
        return None

    def get_name(self):
        """Return `None`; a null job set has no name."""
        return None
|