/usr/share/pyshared/circuits/io/serial.py is in python-circuits 2.1.0-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 | # Module: serial
# Date: 4th August 2004
# Author: James Mills <prologic@shortcircuit.net.au>
"""Serial I/O
This module implements basic Serial (RS232) I/O.
"""
import os
import select
from collections import deque
from circuits.core import Component
from circuits.tools import tryimport
from .events import Closed, Error, Opened, Read
serial = tryimport("serial")
TIMEOUT = 0.2
BUFSIZE = 4096
class Serial(Component):
channel = "serial"
def __init__(self, port, baudrate=115200, bufsize=BUFSIZE,
timeout=TIMEOUT, channel=channel):
super(Serial, self).__init__(channel=channel)
if serial is None:
raise RuntimeError("No serial support available")
self.port = port
self.baudrate = baudrate
self._serial = serial.Serial()
self._serial.port = port
self._serial.baudrate = baudrate
if os.name == "posix":
self._serial.timeout = 0 # non-blocking (POSIX)
else:
self._serial.timeout = timeout
self._buffer = deque()
self._bufsize = bufsize
self._timeout = timeout
self._read = []
self._write = []
self._serial.open()
self._fd = self._serial.fileno()
self._read.append(self._fd)
self.fire(Opened(self.port))
def __tick__(self):
r, w, e = select.select(self._read, self._write, [], self._timeout)
if w and self._buffer:
data = self._buffer.popleft()
try:
bytes = os.write(self._fd, data)
if bytes < len(data):
self._buffer.append(data[bytes:])
else:
if not self._buffer and self._fd in self._write:
self._write.remove(self._fd)
except OSError as error:
self.fire(Error(error))
if r:
data = os.read(self._fd, self._bufsize)
if data:
self.fire(Read(data))
def write(self, data):
if self._fd not in self._write:
self._write.append(self._fd)
self._buffer.append(data)
def close(self):
self._fd = None
self._read = []
self._write = []
self._serial.close()
self.fire(Closed(self.port))
|