/usr/lib/python3/dist-packages/livestreamer/buffers.py is in python3-livestreamer 1.12.2-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 | from collections import deque
from io import BytesIO
from threading import Event, Lock
class Chunk(BytesIO):
"""A single chunk, part of the buffer."""
def __init__(self, buf):
self.length = len(buf)
BytesIO.__init__(self, buf)
@property
def empty(self):
return self.tell() == self.length
class Buffer(object):
"""Simple buffer for use in single-threaded consumer/filler.
Stores chunks in a deque to avoid inefficient reallocating
of large buffers.
"""
def __init__(self):
self.chunks = deque()
self.current_chunk = None
self.closed = False
self.length = 0
def _iterate_chunks(self, size):
bytes_left = size
while bytes_left:
try:
current_chunk = (self.current_chunk or
Chunk(self.chunks.popleft()))
except IndexError:
break
data = current_chunk.read(bytes_left)
bytes_left -= len(data)
if current_chunk.empty:
self.current_chunk = None
else:
self.current_chunk = current_chunk
yield data
def write(self, data):
if not self.closed:
data = bytes(data) # Copy so that original buffer may be reused
self.chunks.append(data)
self.length += len(data)
def read(self, size=-1):
if size < 0 or size > self.length:
size = self.length
if not size:
return b""
data = b"".join(self._iterate_chunks(size))
self.length -= len(data)
return data
def close(self):
self.closed = True
class RingBuffer(Buffer):
"""Circular buffer for use in multi-threaded consumer/filler."""
def __init__(self, size=8192*4):
Buffer.__init__(self)
self.buffer_size = size
self.buffer_lock = Lock()
self.event_free = Event()
self.event_free.set()
self.event_used = Event()
def _check_events(self):
if self.length > 0:
self.event_used.set()
else:
self.event_used.clear()
if self.is_full:
self.event_free.clear()
else:
self.event_free.set()
def _read(self, size=-1):
with self.buffer_lock:
data = Buffer.read(self, size)
self._check_events()
return data
def read(self, size=-1, block=True, timeout=None):
if block and not self.closed:
self.event_used.wait(timeout)
# If the event is still not set it's a timeout
if not self.event_used.is_set() and self.length == 0:
raise IOError("Read timeout")
return self._read(size)
def write(self, data):
if self.closed:
return
data_left = len(data)
data_total = len(data)
while data_left > 0:
self.event_free.wait()
if self.closed:
return
with self.buffer_lock:
write_len = min(self.free, data_left)
written = data_total - data_left
Buffer.write(self, data[written:written+write_len])
data_left -= write_len
self._check_events()
def resize(self, size):
with self.buffer_lock:
self.buffer_size = size
self._check_events()
def wait_free(self, timeout=None):
self.event_free.wait(timeout)
def wait_used(self, timeout=None):
self.event_used.wait(timeout)
def close(self):
Buffer.close(self)
# Make sure we don't let a .write() and .read() block forever
self.event_free.set()
self.event_used.set()
@property
def free(self):
return max(self.buffer_size - self.length, 0)
@property
def is_full(self):
return self.free == 0
__all__ = ["Buffer", "RingBuffer"]
|