/usr/lib/python3/dist-packages/boltons/queueutils.py is in python3-boltons 17.1.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 | # -*- coding: utf-8 -*-
"""Python comes with a many great data structures, from :class:`dict`
to :class:`collections.deque`, and no shortage of serviceable
algorithm implementations, from :func:`sorted` to :mod:`bisect`. But
priority queues are curiously relegated to an example documented in
:mod:`heapq`. Even there, the approach presented is not full-featured
and object-oriented. There is a built-in priority queue,
:class:`Queue.PriorityQueue`, but in addition to its austere API, it
carries the double-edged sword of threadsafety, making it fine for
multi-threaded, multi-consumer applications, but high-overhead for
cooperative/single-threaded use cases.
The ``queueutils`` module currently provides two Queue
implementations: :class:`HeapPriorityQueue`, based on a heap, and
:class:`SortedPriorityQueue`, based on a sorted list. Both use a
unified API based on :class:`BasePriortyQueue` to facilitate testing
the slightly different performance characteristics on various
application use cases.
>>> pq = PriorityQueue()
>>> pq.add('low priority task', 0)
>>> pq.add('high priority task', 2)
>>> pq.add('medium priority task 1', 1)
>>> pq.add('medium priority task 2', 1)
>>> len(pq)
4
>>> pq.pop()
'high priority task'
>>> pq.peek()
'medium priority task 1'
>>> len(pq)
3
"""
from heapq import heappush, heappop
from bisect import insort
import itertools
try:
from typeutils import make_sentinel
_REMOVED = make_sentinel(var_name='_REMOVED')
except ImportError:
_REMOVED = object()
try:
from listutils import BList
# see BarrelList docstring for notes
except ImportError:
BList = list
__all__ = ['PriorityQueue', 'BasePriorityQueue',
'HeapPriorityQueue', 'SortedPriorityQueue']
# TODO: make Base a real abstract class
# TODO: add uniqueification
class BasePriorityQueue(object):
"""The abstract base class for the other PriorityQueues in this
module. Override the ``_backend_type`` class attribute, as well as
the :meth:`_push_entry` and :meth:`_pop_entry` staticmethods for
custom subclass behavior. (Don't forget to use
:func:`staticmethod`).
Args:
priority_key (callable): A function that takes *priority* as
passed in by :meth:`add` and returns an integer
representing the effective priority.
"""
# negating priority means larger numbers = higher priority
_default_priority_key = staticmethod(lambda p: -int(p or 0))
_backend_type = list
def __init__(self, **kw):
self._pq = self._backend_type()
self._entry_map = {}
self._counter = itertools.count()
self._get_priority = kw.pop('priority_key', self._default_priority_key)
if kw:
raise TypeError('unexpected keyword arguments: %r' % kw.keys())
@staticmethod
def _push_entry(backend, entry):
pass # abstract
@staticmethod
def _pop_entry(backend):
pass # abstract
def add(self, task, priority=None):
"""Add a task to the queue, or change the *task*'s priority if
*task* is already in the queue. *task* can be any type, and
*priority* defaults to ``0``. Higher values representing
higher priority, but this behavior can be controlled by
setting *priority_key* in the constructor.
"""
priority = self._get_priority(priority)
if task in self._entry_map:
self.remove(task)
count = next(self._counter)
entry = [priority, count, task]
self._entry_map[task] = entry
self._push_entry(self._pq, entry)
def remove(self, task):
"""Remove a task from the priority queue. Raises :exc:`KeyError` if
the *task* is absent.
"""
entry = self._entry_map.pop(task)
entry[-1] = _REMOVED
def _cull(self, raise_exc=True):
"Remove entries marked as removed by previous :meth:`remove` calls."
while self._pq:
priority, count, task = self._pq[0]
if task is _REMOVED:
self._pop_entry(self._pq)
continue
return
if raise_exc:
raise IndexError('empty priority queue')
def peek(self, default=_REMOVED):
"""Read the next value in the queue without removing it. Returns
*default* on an empty queue, or raises :exc:`KeyError` if
*default* is not set.
"""
try:
self._cull()
_, _, task = self._pq[0]
except IndexError:
if default is not _REMOVED:
return default
raise IndexError('peek on empty queue')
return task
def pop(self, default=_REMOVED):
"""Remove and return the next value in the queue. Returns *default* on
an empty queue, or raises :exc:`KeyError` if *default* is not
set.
"""
try:
self._cull()
_, _, task = self._pop_entry(self._pq)
del self._entry_map[task]
except IndexError:
if default is not _REMOVED:
return default
raise IndexError('pop on empty queue')
return task
def __len__(self):
"Return the number of tasks in the queue."
return len(self._entry_map)
class HeapPriorityQueue(BasePriorityQueue):
"""A priority queue inherited from :class:`BasePriorityQueue`,
backed by a list and based on the :func:`heapq.heappop` and
:func:`heapq.heappush` functions in the built-in :mod:`heapq`
module.
"""
@staticmethod
def _pop_entry(backend):
return heappop(backend)
@staticmethod
def _push_entry(backend, entry):
heappush(backend, entry)
class SortedPriorityQueue(BasePriorityQueue):
"""A priority queue inherited from :class:`BasePriorityQueue`, based
on the :func:`bisect.insort` approach for in-order insertion into
a sorted list.
"""
_backend_type = BList
@staticmethod
def _pop_entry(backend):
return backend.pop(0)
@staticmethod
def _push_entry(backend, entry):
insort(backend, entry)
PriorityQueue = SortedPriorityQueue
|