/usr/share/pyshared/async/test/test_channel.py is in python-async 0.6.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""Channel testing"""
from lib import *
from async.channel import *
import time
class TestChannels(TestBase):
    """Tests for the reader/writer channel types exported by async.channel.

    ``test_base`` is the single test entry point; it runs the plain channel,
    callback channel and iterator reader checks in that order.
    """

    def test_base(self):
        """Run all channel checks sequentially."""
        self._check_plain_channel()
        self._check_callback_channel()
        self._check_iterator_reader()
        # NOTE: its thread-safety is tested by the pool

    def _check_plain_channel(self):
        """Check FIFO order, timeouts and close semantics of a default channel."""
        # creating channel yields a write and a read channel
        wc, rc = mkchannel()
        assert isinstance(wc, ChannelWriter)        # default args
        assert isinstance(rc, ChannelReader)

        # TEST UNLIMITED SIZE CHANNEL - writing+reading is FIFO
        item = 1
        item2 = 2
        wc.write(item)
        wc.write(item2)

        # read all - it blocks as it's still open for writing
        to = 0.2
        st = time.time()
        assert rc.read(timeout=to) == [item, item2]
        assert time.time() - st >= to

        # next read blocks as well - it waits for the timeout and yields nothing
        st = time.time()
        assert len(rc.read(1, True, to)) == 0
        assert time.time() - st >= to

        # closing is idempotent, writing to a closed channel raises
        assert not wc.closed()
        wc.close()
        assert wc.closed()
        wc.close()      # fine
        assert wc.closed()
        self.assertRaises(ReadOnly, wc.write, 1)

        # reading from a closed channel never blocks
        assert len(rc.read()) == 0
        assert len(rc.read(5)) == 0
        assert len(rc.read(1)) == 0

    def _check_callback_channel(self):
        """Check that pre/post callbacks are invoked and may transform items."""
        wc, rc = mkchannel(wtype = CallbackChannelWriter, rtype = CallbackChannelReader)

        cb = [0, 0, 0]      # set slots to one if called

        def pre_write(item):
            cb[0] = 1
            return item + 1

        def pre_read(count):
            cb[1] = 1

        def post_read(items):
            assert isinstance(items, list)
            cb[2] = 1
            return [i + 1 for i in items]

        # set, verify it returns previous one
        assert wc.set_pre_cb(pre_write) is None
        assert rc.set_pre_cb(pre_read) is None
        assert rc.set_post_cb(post_read) is None
        assert wc.set_pre_cb(pre_write) is pre_write
        assert rc.set_pre_cb(pre_read) is pre_read
        assert rc.set_post_cb(post_read) is post_read

        # writer transforms input; only the write callback has fired so far
        val = 5
        wc.write(val)
        assert cb[0] == 1 and cb[1] == 0

        rval = rc.read(1)[0]        # read one item, must not block
        assert cb[0] == 1 and cb[1] == 1 and cb[2] == 1
        # incremented once by pre_write and once by post_read
        assert rval == val + 1 + 1

    def _check_iterator_reader(self):
        """Check counted reads, exhaustion and iteration of IteratorReader."""
        # ITERATOR READER
        reader = IteratorReader(iter(range(10)))
        assert len(reader.read(2)) == 2
        # a count of 0 returned the 8 remaining items
        assert len(reader.read(0)) == 8
        # it's empty now
        assert len(reader.read(0)) == 0
        assert len(reader.read(5)) == 0

        # doesn't work if item is not an iterator
        self.assertRaises(ValueError, IteratorReader, [])

        # test general read-iteration - it's supported by all readers
        reader = IteratorReader(iter(range(10)))
        assert len(list(reader)) == 10
|