/usr/share/voctomix/voctocore/lib/controlserver.py is in voctomix-core 1.0+git4-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 | import logging
from queue import Queue
from gi.repository import GObject
from lib.commands import ControlServerCommands
from lib.tcpmulticonnection import TCPMultiConnection
from lib.response import NotifyResponse
class ControlServer(TCPMultiConnection):
def __init__(self, pipeline):
'''Initialize server and start listening.'''
self.log = logging.getLogger('ControlServer')
super().__init__(port=9999)
self.command_queue = Queue()
self.commands = ControlServerCommands(pipeline)
def on_accepted(self, conn, addr):
'''Asynchronous connection listener.
Starts a handler for each connection.'''
self.log.debug('setting gobject io-watch on connection')
GObject.io_add_watch(conn, GObject.IO_IN, self.on_data, [''])
def on_data(self, conn, _, leftovers, *args):
'''Asynchronous connection handler.
Pushes data from socket into command queue linewise'''
close_after = False
try:
while True:
try:
leftovers.append(conn.recv(4096).decode(errors='replace'))
if len(leftovers[-1]) == 0:
self.log.info("Socket was closed")
leftovers.pop()
close_after = True
break
except UnicodeDecodeError as e:
continue
except BlockingIOError:
pass
data = "".join(leftovers)
del leftovers[:]
lines = data.split('\n')
for line in lines[:-1]:
self.log.debug("got line: %r", line)
line = line.strip()
# 'quit' = remote wants us to close the connection
if line == 'quit' or line == 'exit':
self.log.info("Client asked us to close the Connection")
self.close_connection(conn)
return False
self.log.debug('re-starting on_loop scheduling')
GObject.idle_add(self.on_loop)
self.command_queue.put((line, conn))
if close_after:
self.close_connection(conn)
return False
if lines[-1] != '':
self.log.debug("remaining %r", lines[-1])
leftovers.append(lines[-1])
return True
def on_loop(self):
'''Command handler. Processes commands in the command queue whenever
nothing else is happening (registered as GObject idle callback)'''
self.log.debug('on_loop called')
if self.command_queue.empty():
self.log.debug('command_queue is empty again, '
'stopping on_loop scheduling')
return False
line, requestor = self.command_queue.get()
words = line.split()
if len(words) < 1:
self.log.debug('command_queue is empty again, '
'stopping on_loop scheduling')
return True
command = words[0]
args = words[1:]
self.log.info("processing command %r with args %s", command, args)
response = None
try:
# deny calling private methods
if command[0] == '_':
self.log.info('private methods are not callable')
raise KeyError()
command_function = self.commands.__class__.__dict__[command]
except KeyError as e:
self.log.info("received unknown command %s", command)
response = "error unknown command %s\n" % command
else:
try:
responseObject = command_function(self.commands, *args)
except Exception as e:
message = str(e) or "<no message>"
response = "error %s\n" % message
else:
if isinstance(responseObject, NotifyResponse):
responseObject = [responseObject]
if isinstance(responseObject, list):
for obj in responseObject:
signal = "%s\n" % str(obj)
for conn in self.currentConnections:
self._schedule_write(conn, signal)
else:
response = "%s\n" % str(responseObject)
finally:
if response is not None and requestor in self.currentConnections:
self._schedule_write(requestor, response)
return False
def _schedule_write(self, conn, message):
queue = self.currentConnections[conn]
self.log.debug('re-starting on_write[%u] scheduling', conn.fileno())
GObject.io_add_watch(conn, GObject.IO_OUT, self.on_write)
queue.put(message)
def on_write(self, conn, *args):
self.log.debug('on_write[%u] called', conn.fileno())
try:
queue = self.currentConnections[conn]
except KeyError:
return False
if queue.empty():
self.log.debug('write_queue[%u] is empty again, '
'stopping on_write scheduling',
conn.fileno())
return False
message = queue.get()
try:
conn.send(message.encode())
except Exception as e:
self.log.warning('failed to send message', exc_info=True)
return True
|