/usr/share/voctomix/voctocore/lib/tcpsingleconnection.py is in voctomix-core 1.0+git4-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 | import logging
import socket
import time
from abc import ABCMeta, abstractmethod
from gi.repository import GObject
class TCPSingleConnection(object, metaclass=ABCMeta):
def __init__(self, port):
if not hasattr(self, 'log'):
self.log = logging.getLogger('TCPSingleConnection')
self.log.debug('Binding to Source-Socket on [::]:%u', port)
self.boundSocket = socket.socket(socket.AF_INET6)
self.boundSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.boundSocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY,
False)
self.boundSocket.bind(('::', port))
self.boundSocket.listen(1)
self.currentConnection = None
self.log.debug('Setting GObject io-watch on Socket')
GObject.io_add_watch(self.boundSocket, GObject.IO_IN, self.on_connect)
def on_connect(self, sock, *args):
conn, addr = sock.accept()
self.log.info('Incomming Connection from %s', addr)
if self.currentConnection is not None:
self.log.warning('Another Source is already connected, '
'closing existing pipeline')
self.disconnect()
time.sleep(1)
self.on_accepted(conn, addr)
self.currentConnection = conn
return True
def close_connection(self):
if self.currentConnection:
self.currentConnection.close()
self.currentConnection = None
self.log.info('Connection closed')
@abstractmethod
def on_accepted(self, conn, addr):
raise NotImplementedError(
"child classes of TCPSingleConnection must implement on_accepted()"
)
@abstractmethod
def disconnect(self):
raise NotImplementedError(
"child classes of TCPSingleConnection must implement disconnect()"
)
|