/usr/share/pyshared/epsilon/test/utils.py is in python-epsilon 0.6.0-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 | from zope.interface import implements
from twisted.internet import interfaces
class FileWrapper:
    """A wrapper around a file-like object to make it behave as a Transport.

    This doesn't actually stream the file to the attached protocol,
    and is thus useful mainly as a utility for debugging protocols.
    """

    implements(interfaces.ITransport)

    # Status flags.  Within this class only ``disconnecting`` is ever
    # modified (by loseConnection); the others keep these defaults unless
    # outside code assigns them.
    closed = 0
    disconnecting = 0
    disconnected = 0

    # Producer most recently passed to registerProducer(), and the
    # ``streaming`` flag that was passed along with it.
    producer = None
    streamingProducer = 0

    def __init__(self, file):
        """Wrap *file*, an object whose ``write(data)`` method receives
        everything written to this transport.
        """
        self.file = file

    def write(self, data):
        """Write *data* to the wrapped file.

        An error raised by the file's ``write`` is not propagated; it is
        handed to handleException() instead.
        """
        try:
            self.file.write(data)
        except Exception:
            # Deliberately not a bare ``except:``: KeyboardInterrupt,
            # SystemExit and GeneratorExit must propagate rather than be
            # swallowed by the (default no-op) exception hook.
            self.handleException()
        # NOTE: the producer is not polled after a write; the
        # self._checkProducer() call that used to sit here is disabled.

    def _checkProducer(self):
        """Ask the registered producer, if any, to resume producing."""
        # Cheating; this is called at "idle" times to allow producers to be
        # found and dealt with
        if self.producer:
            self.producer.resumeProducing()

    def registerProducer(self, producer, streaming):
        """From abstract.FileDescriptor

        Remember *producer* and its *streaming* flag.  When *streaming* is
        false the producer's ``resumeProducing()`` is called once
        immediately.
        """
        self.producer = producer
        self.streamingProducer = streaming
        if not streaming:
            producer.resumeProducing()

    def unregisterProducer(self):
        """Forget the currently registered producer."""
        self.producer = None

    def stopConsuming(self):
        """Drop the registered producer, then call loseConnection()."""
        self.unregisterProducer()
        self.loseConnection()

    def writeSequence(self, iovec):
        """Join the strings in *iovec* and write them with a single
        write() call.
        """
        self.write("".join(iovec))

    def loseConnection(self):
        """Mark the transport as disconnecting.

        Only the ``disconnecting`` flag is set; the wrapped file is not
        closed here.
        """
        self.disconnecting = True

    def getPeer(self):
        """Return a placeholder peer address (a 2-tuple of strings)."""
        # XXX: According to ITransport, this should return an IAddress!
        return 'file', 'file'

    def getHost(self):
        """Return a placeholder host address (a plain string)."""
        # XXX: According to ITransport, this should return an IAddress!
        return 'file'

    def handleException(self):
        """Hook called by write() when the wrapped file raises.

        Does nothing by default, so write errors are silently ignored.
        """
        pass

    def resumeProducing(self):
        """Do nothing."""
        # Never sends data anyways
        pass

    def pauseProducing(self):
        """Do nothing."""
        # Never sends data anyways
        pass

    def stopProducing(self):
        """Call loseConnection()."""
        self.loseConnection()

    # Additional Q2Q Transport requirements

    def getQ2QPeer(self):
        """Return a fixed placeholder ``q2q.Q2QAddress``."""
        # Imported inside the method, so vertex is only needed when this
        # method is actually called.
        from vertex import q2q
        return q2q.Q2QAddress('file.domain', 'peer.resource')
|