/usr/lib/python3/dist-packages/serial/aio.py is in python3-serial 3.0.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#!/usr/bin/env python3
#
# Experimental implementation of asyncio support.
#
# This file is part of pySerial. https://github.com/pyserial/pyserial
# (C) 2015 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
"""\
Support asyncio with serial ports. EXPERIMENTAL
Posix platforms only, Python 3.4+ only.
Windows event loops can not wait for serial ports with the current
implementation. It should be possible to get that working though.
"""
import asyncio
import serial
import logging
class SerialTransport(asyncio.Transport):
    """Asyncio transport that connects a protocol to a serial port.

    The serial object is put into non-blocking mode and its file descriptor
    is registered with the event loop; whatever can be read is handed to
    ``protocol.data_received()``.  Writes are passed straight through to the
    serial object.

    :param loop: event loop used for scheduling callbacks and watching the
        serial port's file descriptor.
    :param protocol: object receiving ``connection_made()``,
        ``data_received()`` and ``connection_lost()`` callbacks.
    :param serial_instance: serial port object; this class uses its ``fd``
        and ``timeout`` attributes and its ``nonblocking()``, ``read()``,
        ``write()`` and ``close()`` methods.
    """

    def __init__(self, loop, protocol, serial_instance):
        # Initialise the base transport so inherited helpers such as
        # get_extra_info() have the state they rely on.
        super().__init__()
        self._loop = loop
        self._protocol = protocol
        self.serial = serial_instance
        self._closing = False
        self._paused = False
        # XXX how to support url handlers too
        self.serial.timeout = 0
        self.serial.nonblocking()
        loop.call_soon(protocol.connection_made, self)
        # only start reading when connection_made() has been called
        loop.call_soon(self._start_reading)

    def __repr__(self):
        return '{self.__class__.__name__}({self._loop}, {self._protocol}, {self.serial})'.format(self=self)

    def _start_reading(self):
        """Register the read callback unless the transport was closed or paused."""
        # connection_made() runs before this callback and may already have
        # closed or paused the transport; registering the descriptor then
        # would watch a closed port or silently undo the pause.
        if self._closing or self._paused:
            return
        self._loop.add_reader(self.serial.fd, self._read_ready)

    def close(self):
        """Stop reading, close the serial port and schedule ``connection_lost(None)``.

        Calling it again after the first time does nothing.
        """
        if self._closing:
            return
        self._closing = True
        # The descriptor must be unregistered before the port is closed.
        self._loop.remove_reader(self.serial.fd)
        self.serial.close()
        self._loop.call_soon(self._protocol.connection_lost, None)

    def _read_ready(self):
        """Event loop callback: read up to 1024 bytes and pass them to the protocol."""
        data = self.serial.read(1024)
        if data:
            self._protocol.data_received(data)

    def write(self, data):
        """Write ``data`` directly to the serial port (no buffering in the transport)."""
        self.serial.write(data)

    def can_write_eof(self):
        """Return False: this transport does not implement ``write_eof()``."""
        return False

    def pause_reading(self):
        """Stop delivering data to the protocol until ``resume_reading()``.

        :raises RuntimeError: if the transport is closing or already paused.
        """
        if self._closing:
            raise RuntimeError('Cannot pause_reading() when closing')
        if self._paused:
            raise RuntimeError('Already paused')
        self._paused = True
        # Use the serial port's descriptor, the same one close() unregisters.
        self._loop.remove_reader(self.serial.fd)
        if self._loop.get_debug():
            logging.debug("%r pauses reading", self)

    def resume_reading(self):
        """Resume delivering data to the protocol after ``pause_reading()``.

        :raises RuntimeError: if the transport is not paused.
        """
        if not self._paused:
            raise RuntimeError('Not paused')
        self._paused = False
        if self._closing:
            # Closed while paused: nothing left to read from.
            return
        self._loop.add_reader(self.serial.fd, self._read_ready)
        if self._loop.get_debug():
            logging.debug("%r resumes reading", self)

    # Optional transport methods that are not implemented yet:
    #~ def set_write_buffer_limits(self, high=None, low=None):
    #~ def get_write_buffer_size(self):
    #~ def writelines(self, list_of_data):
    #~ def write_eof(self):
    #~ def abort(self):
@asyncio.coroutine
def create_serial_connection(loop, protocol_factory, *args, **kwargs):
    """Open a serial port and attach a freshly created protocol to it.

    :param loop: event loop the transport will run on.
    :param protocol_factory: callable without arguments returning the
        protocol instance.
    :param args: positional arguments passed on to ``serial.Serial``.
    :param kwargs: keyword arguments passed on to ``serial.Serial``.
    :returns: ``(transport, protocol)`` tuple.
    """
    # The port is opened first, so a failure there means no protocol
    # object is created at all.
    port = serial.Serial(*args, **kwargs)
    proto = protocol_factory()
    return (SerialTransport(loop, proto, port), proto)
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# test
if __name__ == '__main__':
    class Output(asyncio.Protocol):
        """Demo protocol: send a greeting, print the first reply, then close."""

        def connection_made(self, transport):
            # Keep the transport so data_received() can close it later.
            self.transport = transport
            print('port opened', transport)
            self.transport.serial.rts = False
            self.transport.write(b'hello world\n')

        def data_received(self, data):
            print('data received', repr(data))
            # One chunk of data is enough for this demo.
            self.transport.close()

        def connection_lost(self, exc):
            print('port closed')
            # Ends the run_forever() call below.
            asyncio.get_event_loop().stop()

    loop = asyncio.get_event_loop()
    # First set up the connection, then serve events until the protocol
    # stops the loop from connection_lost().
    loop.run_until_complete(
        create_serial_connection(loop, Output, '/dev/ttyUSB0', baudrate=115200))
    loop.run_forever()
    loop.close()
|