/usr/lib/python3/dist-packages/mido/sockets.py is in python3-mido 1.2.7-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""
MIDI over TCP/IP.
"""
import socket
import select
from .parser import Parser
from .ports import MultiPort, BaseIOPort
from .py2 import PY2
def _is_readable(socket):
    """Report whether the socket has data waiting to be read.

    Polls the socket's file descriptor with ``select`` using a zero
    timeout, so the call returns immediately instead of waiting.
    """
    descriptors = [socket.fileno()]
    ready_to_read, _, _ = select.select(descriptors, [], [], 0)
    return len(ready_to_read) > 0
class PortServer(MultiPort):
    """Listening TCP socket that collects client connections as ports.

    Each accepted connection is wrapped in a SocketPort. Connections
    picked up while receiving are kept in ``self.ports``; closed ports
    are pruned from that list before sending, receiving and accepting.
    """
    # Todo: queue size.

    def __init__(self, host, portno, backlog=1):
        """Bind to ``(host, portno)`` and start listening.

        ``backlog`` is passed straight to ``socket.listen()``.
        Any error raised while configuring, binding or listening is
        re-raised after the listening socket has been closed.
        """
        MultiPort.__init__(self, format_address(host, portno))
        self.ports = []
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self._socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
            self._socket.setblocking(True)
            self._socket.bind((host, portno))
            self._socket.listen(backlog)
        except Exception:
            # Do not leak the descriptor when setup fails (for example
            # when the address is already in use).
            self._socket.close()
            raise

    def _get_device_type(self):
        return 'server'

    def _close(self):
        """Close every client port and then the listening socket."""
        try:
            # Close all connections.
            for port in self.ports:
                port.close()
        finally:
            # The listening socket must be released even if closing one
            # of the client ports raised.
            self._socket.close()

    def _update_ports(self):
        """Remove closed ports."""
        self.ports = [port for port in self.ports if not port.closed]

    def accept(self, block=True):
        """
        Accept a connection from a client.

        Will block until there is a new connection, and then return a
        SocketPort object.

        If block=False, None will be returned if there is no
        new connection waiting.

        The returned port is not added to ``self.ports`` by this method.
        """
        if not block and not _is_readable(self._socket):
            return None

        self._update_ports()

        conn, (host, port) = self._socket.accept()
        return SocketPort(host, port, conn=conn)

    def _send(self, message):
        """Drop closed ports, then delegate to ``MultiPort._send``."""
        self._update_ports()
        return MultiPort._send(self, message)

    def _receive(self, block=True):
        """Pick up one pending connection, then receive from all ports.

        At most one waiting client is accepted per call, without
        blocking, and appended to ``self.ports``.
        """
        port = self.accept(block=False)
        if port:
            self.ports.append(port)

        self._update_ports()
        # NOTE(review): ``block`` is not forwarded here, so
        # MultiPort._receive runs with its own default. Confirm this is
        # intended.
        return MultiPort._receive(self)
class SocketPort(BaseIOPort):
    """MIDI port that sends and receives messages over a TCP socket.

    The port either opens its own connection to ``(host, portno)`` or
    wraps an already connected socket passed as ``conn``.
    """

    def __init__(self, host, portno, conn=None):
        """Create the port.

        host, portno -- remote address; also used to build the port name.
        conn -- an already connected socket to wrap. When None, a new
                connection to ``(host, portno)`` is opened.
        """
        BaseIOPort.__init__(self, name=format_address(host, portno))
        self.closed = False
        self._parser = Parser()
        self._messages = self._parser.messages

        if conn is None:
            self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                self._socket.setblocking(True)
                self._socket.connect((host, portno))
            except Exception:
                # Do not leak the descriptor when the connection fails.
                self._socket.close()
                raise
        else:
            self._socket = conn

        if PY2:
            read_kwargs = {'bufsize': 0}
            write_kwargs = {'bufsize': 0}
        else:
            # The read side must be unbuffered: _receive() polls the raw
            # socket with select(), and bytes already pulled into a
            # Python-level read buffer would be invisible to that poll,
            # leaving them unparsed until more data arrives.
            read_kwargs = {'buffering': 0}
            # The write side stays buffered; _send() flushes explicitly.
            write_kwargs = {'buffering': None}

        self._rfile = self._socket.makefile('rb', **read_kwargs)
        self._wfile = self._socket.makefile('wb', **write_kwargs)

    def _get_device_type(self):
        return 'socket'

    def _receive(self, block=True):
        """Feed every byte currently available on the socket to the parser.

        Never waits for data: the loop ends as soon as the socket has
        nothing to read. The port is closed when the peer disconnects.
        Raises IOError if reading from the socket fails.
        """
        while _is_readable(self._socket):
            try:
                byte = self._rfile.read(1)
            except socket.error as err:
                # strerror is missing on single-argument errors; fall
                # back to the plain message instead of failing on
                # args[1].
                raise IOError(err.strerror or str(err))

            if not byte:
                # A readable socket that yields no data means the other
                # end has disconnected. The read returns bytes on
                # Python 3, so comparing against '' would never match.
                self.close()
                break

            self._parser.feed_byte(ord(byte))

    def _send(self, message):
        """Write the binary form of ``message`` to the socket.

        Raises IOError if the write fails. On a broken pipe the port is
        closed before the error is raised.
        """
        try:
            self._wfile.write(message.bin())
            self._wfile.flush()
        except socket.error as err:
            if err.errno == 32:
                # Broken pipe. The other end has disconnected.
                self.close()
            raise IOError(err.strerror or str(err))

    def _close(self):
        """Close the file wrappers and the underlying socket."""
        try:
            for attr in ('_rfile', '_wfile'):
                # The wrappers do not exist if __init__ failed early.
                fileobj = getattr(self, attr, None)
                if fileobj is None:
                    continue
                try:
                    # The OS-level descriptor is only released once the
                    # makefile() objects are closed as well.
                    fileobj.close()
                except socket.error:
                    # Best effort: flushing unsent data to a dead peer
                    # may fail, which must not prevent closing.
                    pass
        finally:
            self._socket.close()
def connect(host, portno):
    """Open a client connection to a socket port server.

    Returns a SocketPort linked to a matching SocketPort on the server
    side. Messages can travel in both directions.
    """
    client_port = SocketPort(host, portno)
    return client_port
def parse_address(address):
    """Split a ``host:port`` string into its host and port number.

    Returns a ``(host, port)`` tuple with the port converted to an int.
    Raises ValueError if the string does not contain exactly one colon,
    if the port is not an integer, or if the port is outside 1..65535.
    """
    if address.count(':') != 1:
        raise ValueError('address must contain exactly one colon')

    host, _, port_text = address.partition(':')

    try:
        portno = int(port_text)
    except ValueError:
        raise ValueError('port number must be an integer')

    # Note: port 0 is not allowed.
    if portno <= 0 or portno >= 2 ** 16:
        raise ValueError('port number out of range')

    return (host, portno)
def format_address(host, portno):
    """Return the address as a ``host:port`` string.

    ``portno`` must be an integer; other types raise ValueError from
    the ``d`` format code.
    """
    # The colon separator keeps the result unambiguous and matches the
    # host:port form that parse_address() reads.
    return '{}:{:d}'.format(host, portno)
|