/usr/lib/python3/dist-packages/UM/JobQueue.py is in python3-uranium 3.1.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 | # Copyright (c) 2015 Ultimaker B.V.
# Uranium is released under the terms of the LGPLv3 or higher.
import multiprocessing
import threading
from UM.Signal import Signal, signalemitter
from UM.Logger import Logger
from typing import TYPE_CHECKING, List, Callable, Any
if TYPE_CHECKING:
from UM.Job import Job
## A thread pool and queue manager for Jobs.
#
# The JobQueue class manages a queue of Job objects and a set of threads that
# can take things from this queue to process them.
# \sa Job
@signalemitter
class JobQueue():
## Initialize.
#
# \param thread_count The amount of threads to use. Can be a positive integer or 'auto'.
# When 'auto', the number of threads is based on the number of processors and cores on the machine.
def __init__(self, thread_count: (str, int) = "auto"): #pylint: disable=bad-whitespace
if JobQueue._instance is None:
JobQueue._instance = self
else:
raise RuntimeError("Attempted to create multiple instances of JobQueue")
super().__init__()
if thread_count == "auto":
try:
thread_count = multiprocessing.cpu_count()
except NotImplementedError:
thread_count = 0
if thread_count <= 0:
thread_count = 2 # Assume we can run at least two threads in parallel.
self._threads = [_Worker(self) for t in range(thread_count)]
self._semaphore = threading.Semaphore(0)
self._jobs = [] # type: List[Job]
self._jobs_lock = threading.Lock()
for thread in self._threads:
thread.daemon = True
thread.start()
## Add a Job to the queue.
#
# \param job \type{Job} The Job to add.
def add(self, job: "Job"):
with self._jobs_lock:
self._jobs.append(job)
self._semaphore.release()
## Remove a waiting Job from the queue.
#
# \param job \type{Job} The Job to remove.
#
# \note If a job has already begun processing it is already removed from the queue
# and thus can no longer be cancelled.
def remove(self, job: "Job"):
with self._jobs_lock:
if job in self._jobs:
self._jobs.remove(job)
## Emitted whenever a job starts processing.
#
# \param job \type{Job} The job that has started processing.
jobStarted = Signal()
## Emitted whenever a job has finished processing.
#
# \param job \type{Job} The job that has finished processing.
jobFinished = Signal()
## protected:
# Get the next job off the queue.
# Note that this will block until a job is available.
def _nextJob(self):
self._semaphore.acquire()
with self._jobs_lock:
# Semaphore release() can apparently cause all waiting threads to continue.
# So to prevent issues, double check whether we actually have waiting jobs.
if not self._jobs:
return None
return self._jobs.pop(0)
## Get the singleton instance of the JobQueue.
@classmethod
def getInstance(cls) -> "JobQueue":
if not cls._instance:
cls._instance = JobQueue()
return cls._instance
_instance = None # type: JobQueue
## Internal
#
# A worker thread that can process jobs from the JobQueue.
class _Worker(threading.Thread):
def __init__(self, queue):
super().__init__()
self._queue = queue
def run(self):
while True:
# Get the next job from the queue. Note that this blocks until a new job is available.
job = self._queue._nextJob()
if not job:
continue
# Process the job.
self._queue.jobStarted.emit(job)
job._running = True
try:
job.run()
except Exception as e:
Logger.logException("e", "Job %s caused an exception", str(job))
job.setError(e)
job._running = False
job._finished = True
job.finished.emit(job)
self._queue.jobFinished.emit(job)
|