This file is indexed.

/usr/share/pyshared/async/test/test_channel.py is in python-async 0.6.1-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
"""Channel testing"""
from lib import *
from async.channel import *

import time

class TestChannels(TestBase):
	"""Checks for the plain, callback and iterator channel endpoints."""

	def test_base(self):
		# mkchannel() with default arguments hands back a (writer, reader) pair
		writer, reader = mkchannel()
		assert isinstance(writer, ChannelWriter)
		assert isinstance(reader, ChannelReader)

		# --- unlimited-size channel: items come back in the order written ---
		first, second = 1, 2
		writer.write(first)
		writer.write(second)

		# Read everything. The channel is still open for writing, so the call
		# is required to take at least the full timeout before it returns.
		timeout = 0.2
		started = time.time()
		assert reader.read(timeout=timeout) == [first, second]
		assert time.time() - started >= timeout

		# Nothing is left in the channel: asking for one item yields an empty
		# result, again only after at least `timeout` (0.2) seconds.
		# NOTE(review): the positional arguments look like (count, block,
		# timeout) - confirm against the reader's read() signature.
		started = time.time()
		assert len(reader.read(1, True, timeout)) == 0
		assert time.time() - started >= timeout

		# --- closing the writer ---
		assert not writer.closed()
		writer.close()
		assert writer.closed()
		writer.close()	# a second close must be accepted as well
		assert writer.closed()

		# a closed writer refuses further items
		self.failUnlessRaises(ReadOnly, writer.write, 1)

		# reads on the closed channel return empty results; no timeout is
		# passed here, so these calls are expected not to block
		assert len(reader.read()) == 0
		assert len(reader.read(5)) == 0
		assert len(reader.read(1)) == 0

		# --- callback channels ---
		writer, reader = mkchannel(wtype = CallbackChannelWriter, rtype = CallbackChannelReader)

		# one flag per callback, flipped to 1 as soon as that callback ran:
		# [pre-write, pre-read, post-read]
		fired = [0, 0, 0]

		def on_write(item):
			fired[0] = 1
			return item + 1

		def before_read(count):
			fired[1] = 1

		def after_read(items):
			assert isinstance(items, list)
			fired[2] = 1
			bumped = []
			for entry in items:
				bumped.append(entry + 1)
			return bumped

		# installing a callback returns the previously installed one: None on
		# the first installation, the very same function on the second
		assert writer.set_pre_cb(on_write) is None
		assert reader.set_pre_cb(before_read) is None
		assert reader.set_post_cb(after_read) is None
		assert writer.set_pre_cb(on_write) is on_write
		assert reader.set_pre_cb(before_read) is before_read
		assert reader.set_post_cb(after_read) is after_read

		# writing triggers the writer callback only
		payload = 5
		writer.write(payload)
		assert fired[:2] == [1, 0]

		# reading a single item must not block and triggers both reader
		# callbacks; the value was incremented once by on_write and once by
		# after_read
		received = reader.read(1)[0]
		assert fired == [1, 1, 1]
		assert received == payload + 1 + 1

		# --- iterator reader ---
		source = IteratorReader(iter(range(10)))
		assert len(source.read(2)) == 2
		# a count of 0 is expected to return everything that remains
		assert len(source.read(0)) == 8
		# the iterator is exhausted from here on
		assert len(source.read(0)) == 0
		assert len(source.read(5)) == 0

		# a plain list is not an iterator and must be rejected
		self.failUnlessRaises(ValueError, IteratorReader, [])

		# --- generic read-iteration, which all readers are meant to support ---
		source = IteratorReader(iter(range(10)))
		assert len(list(source)) == 10

		# NOTE: its thread-safety is tested by the pool