/usr/share/pyshared/zope/server/dualmodechannel.py is in python-zope.server 3.8.6-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Dual-mode channel
"""
import asyncore
import socket
from time import time
from zope.server import trigger
from zope.server.adjustments import default_adj
from zope.server.buffers import OverflowableBuffer
# Create the main trigger if it doesn't exist yet.
# This module-level object is shared by every channel defined in this
# module: DualModeChannel.pull_trigger() calls the_trigger.pull_trigger()
# to wake up the main loop.
the_trigger = trigger.trigger()
class DualModeChannel(asyncore.dispatcher):
    """Channel that switches between asynchronous and synchronous mode.

    Call set_sync() before using a channel in a thread other than
    the thread handling the main loop.

    Call set_async() to give the channel back to the thread handling
    the main loop.
    """

    # will_close is set to True to close the socket.
    will_close = False

    # boolean: async or sync mode
    async_mode = True

    def __init__(self, conn, addr, adj=None):
        """Set up the channel around an existing connection.

        conn is handed to asyncore.dispatcher.__init__; addr is stored
        as-is.  adj supplies the tuning values read by this class
        (outbuf_overflow, recv_bytes, send_bytes) and defaults to
        default_adj.
        """
        self.addr = addr
        if adj is None:
            adj = default_adj
        self.adj = adj
        self.outbuf = OverflowableBuffer(adj.outbuf_overflow)
        self.creation_time = time()
        # handle_read(), handle_write() and set_async() all update
        # last_activity; seed it here so that it can be read safely
        # before the first I/O event.
        self.last_activity = self.creation_time
        asyncore.dispatcher.__init__(self, conn)

    #
    # ASYNCHRONOUS METHODS
    #

    def handle_close(self):
        """Close the channel when asyncore reports a close event."""
        self.close()

    def writable(self):
        """Return a true value if the main loop should watch for writes.

        Never writable in synchronous mode.  In asynchronous mode the
        channel is writable when a close is pending or output is
        buffered.
        """
        if not self.async_mode:
            return 0
        return self.will_close or self.outbuf

    def handle_write(self):
        """Send some buffered output, or close once the buffer is empty.

        Does nothing in synchronous mode.
        """
        if not self.async_mode:
            return
        if self.outbuf:
            try:
                self._flush_some()
            except socket.error:
                self.handle_comm_error()
        elif self.will_close:
            self.close()
        self.last_activity = time()

    def readable(self):
        """Return a true value if the main loop should watch for reads.

        Never readable in synchronous mode or once a close is pending.
        """
        if not self.async_mode:
            return 0
        return not self.will_close

    def handle_read(self):
        """Read up to adj.recv_bytes and pass the data to received().

        Does nothing in synchronous mode or when a close is pending.
        Socket errors are routed to handle_comm_error().
        """
        if not self.async_mode or self.will_close:
            return
        try:
            data = self.recv(self.adj.recv_bytes)
        except socket.error:
            self.handle_comm_error()
            return
        self.last_activity = time()
        self.received(data)

    def received(self, data):
        """
        Override to receive data in async mode.
        """
        pass

    def handle_comm_error(self):
        """
        Designed for handling communication errors that occur
        during asynchronous operations *only*.  Probably should log
        this, but in a different place.
        """
        self.handle_error()

    def set_sync(self):
        """Switches to synchronous mode.

        The main thread will stop calling received().
        """
        self.async_mode = False

    #
    # SYNCHRONOUS METHODS
    #

    def flush(self, block=True):
        """Sends pending data.

        If block is set, this pauses the application.  If it is turned
        off, only the amount of data that can be sent without blocking
        is sent.

        In blocking mode the loop stops as soon as the channel is no
        longer connected: _flush_some() cannot make progress then, and
        looping on a non-empty buffer would never terminate.
        """
        if not block:
            while self._flush_some():
                pass
            return
        blocked = False
        try:
            while self.outbuf and self.connected:
                # We propagate errors to the application on purpose.
                if not blocked:
                    self.socket.setblocking(1)
                    blocked = True
                self._flush_some()
        finally:
            # Always hand the socket back in non-blocking mode.
            if blocked:
                self.socket.setblocking(0)

    def set_async(self):
        """Switches to asynchronous mode.

        The main thread will begin calling received() again.
        """
        self.async_mode = True
        self.pull_trigger()
        self.last_activity = time()

    #
    # METHODS USED IN BOTH MODES
    #

    def write(self, data):
        """Queue data for sending and return the number of items queued.

        data is either a single string or an iterable of strings;
        empty pieces are skipped.  The return value is the summed
        length of everything appended to the output buffer.  Once the
        buffer holds at least adj.send_bytes, as much as possible is
        sent without blocking.
        """
        wrote = 0
        if isinstance(data, str):
            if data:
                self.outbuf.append(data)
                wrote = len(data)
        else:
            for v in data:
                if v:
                    self.outbuf.append(v)
                    wrote += len(v)

        while len(self.outbuf) >= self.adj.send_bytes:
            # Send what we can without blocking.
            # We propagate errors to the application on purpose
            # (to stop the application if the connection closes).
            if not self._flush_some():
                break

        return wrote

    def pull_trigger(self):
        """Wakes up the main loop.
        """
        the_trigger.pull_trigger()

    def _flush_some(self):
        """Flushes data.

        Returns 1 if some data was sent, 0 otherwise (nothing
        buffered, not connected, or send() reported nothing sent)."""
        outbuf = self.outbuf
        if outbuf and self.connected:
            chunk = outbuf.get(self.adj.send_bytes)
            num_sent = self.send(chunk)
            if num_sent:
                # Drop what was sent from the buffer.  The meaning of
                # the second argument is defined by the buffer class
                # (NOTE(review): presumably allow_prune -- confirm
                # against zope.server.buffers).
                outbuf.skip(num_sent, 1)
                return 1
        return 0

    def close_when_done(self):
        """Flush what can be sent now and schedule the channel to close.

        The socket itself is closed later from handle_write().
        """
        # Flush all possible.
        while self._flush_some():
            pass
        self.will_close = True
        if not self.async_mode:
            # For safety, don't close the socket until the
            # main thread calls handle_write().
            self.async_mode = True
            self.pull_trigger()

    def close(self):
        """Mark the channel disconnected and close the dispatcher."""
        # Always close in asynchronous mode.  If the connection is
        # closed in a thread, the main loop can end up with a bad file
        # descriptor.
        assert self.async_mode
        self.connected = False
        asyncore.dispatcher.close(self)
|