/usr/share/pyshared/twisted/conch/stdio.py is in python-twisted-conch 1:11.1.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- test-case-name: twisted.conch.test.test_manhole -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Asynchronous local terminal input handling
@author: Jp Calderone
"""
import os, tty, sys, termios
from twisted.internet import reactor, stdio, protocol, defer
from twisted.python import failure, reflect, log
from twisted.conch.insults.insults import ServerProtocol
from twisted.conch.manhole import ColoredManhole
class UnexpectedOutputError(Exception):
    """
    Error reported by L{TerminalProcessProtocol.errReceived}.

    The exception is constructed with the bytes that were received and is
    handed, wrapped in a failure, to the wrapped protocol's
    C{connectionLost}.
    """
class TerminalProcessProtocol(protocol.ProcessProtocol):
    """
    Bridge between a child process and a wrapped protocol.

    Output received from the process is passed on to the wrapped protocol,
    and this object is handed to that protocol as its transport: anything
    the protocol writes is forwarded to C{self.transport}.

    @ivar proto: The wrapped protocol, or C{None} once it has been told that
        the connection was lost.
    @ivar onConnection: A L{defer.Deferred} fired with C{None} from
        L{connectionMade}; replaced by C{None} afterwards.
    """

    def __init__(self, proto):
        """
        @param proto: The protocol to hook up to the process.
        """
        self.onConnection = defer.Deferred()
        self.proto = proto

    def connectionMade(self):
        """
        Connect the wrapped protocol to this object, then fire
        C{onConnection} with C{None}.
        """
        self.proto.makeConnection(self)
        connected = self.onConnection
        connected.callback(None)
        # The Deferred has fired; drop the reference to it.
        self.onConnection = None

    def write(self, bytes):
        """
        Forward data written by the wrapped protocol to C{self.transport}.

        @param bytes: The data to write.
        """
        self.transport.write(bytes)

    def outReceived(self, bytes):
        """
        Deliver received output to the wrapped protocol.

        @param bytes: The data that was received.
        """
        self.proto.dataReceived(bytes)

    def errReceived(self, bytes):
        """
        Treat any error output as fatal: drop the connection and, if the
        wrapped protocol has not been notified yet, report an
        L{UnexpectedOutputError} carrying the received data to it.

        @param bytes: The unexpected data that was received.
        """
        self.transport.loseConnection()
        if self.proto is None:
            # The wrapped protocol was already told about the loss.
            return
        reason = failure.Failure(UnexpectedOutputError(bytes))
        self.proto.connectionLost(reason)
        self.proto = None

    def childConnectionLost(self, childFD):
        """
        Relay the notification to the wrapped protocol, if it is still
        attached.

        @param childFD: The child file descriptor that was closed.
        """
        if self.proto is None:
            return
        self.proto.childConnectionLost(childFD)

    def processEnded(self, reason):
        """
        Tell the wrapped protocol, at most once, that the connection is
        gone.

        @param reason: The reason passed on to the wrapped protocol's
            C{connectionLost}.
        """
        if self.proto is None:
            return
        self.proto.connectionLost(reason)
        self.proto = None
class ConsoleManhole(ColoredManhole):
    """
    A L{ColoredManhole} protocol intended specifically for use with
    L{stdio.StandardIO}.
    """

    def connectionLost(self, reason):
        """
        Stop the reactor: once the connection is gone there is nothing left
        to do, and stopping lets the process exit.

        @param reason: Why the connection was lost; not used here.
        """
        reactor.stop()
def runWithProtocol(klass):
    """
    Run the reactor with standard I/O attached to a terminal protocol.

    The terminal behind C{sys.__stdin__} is put into raw mode while the
    reactor runs. When the reactor returns (or raises), the previous
    terminal settings are restored and a reset sequence is written to the
    terminal.

    @param klass: The class passed to L{ServerProtocol}, which is in turn
        connected to L{stdio.StandardIO}.
    """
    fd = sys.__stdin__.fileno()
    # Remember the current settings so they can be restored on the way out.
    oldSettings = termios.tcgetattr(fd)
    tty.setraw(fd)
    try:
        stdio.StandardIO(ServerProtocol(klass))
        reactor.run()
    finally:
        termios.tcsetattr(fd, termios.TCSANOW, oldSettings)
        # os.write() needs a bytes object on Python 3; a bytes literal is
        # the same value as the old str literal on Python 2.
        # "\x1bc" is ESC c, the terminal reset sequence.
        os.write(fd, b"\r\x1bc\r")
def main(argv=None):
    """
    Command-line entry point: start logging and run a terminal protocol.

    Log output is written to the file C{child.log} in the current
    directory.

    @param argv: Command-line arguments without the program name. When
        C{None}, C{sys.argv[1:]} is used. If non-empty, the first item is
        the fully-qualified name of the class to run; otherwise
        L{ConsoleManhole} is used.
    """
    # The ``file`` builtin exists only on Python 2; ``open`` does the same
    # thing there and also works on Python 3.
    log.startLogging(open('child.log', 'w'))
    if argv is None:
        argv = sys.argv[1:]
    if argv:
        klass = reflect.namedClass(argv[0])
    else:
        klass = ConsoleManhole
    runWithProtocol(klass)
# Run the console only when this module is executed as a script.
if __name__ == '__main__':
    main()