/usr/share/pyshared/txzookeeper/tests/test_conn_failure.py is in python-txzookeeper 0.9.5-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#
# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved
#
# This file is part of txzookeeper.
#
# Authors:
# Kapil Thangavelu
#
# txzookeeper is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# txzookeeper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with txzookeeper. If not, see <http://www.gnu.org/licenses/>.
#
import zookeeper
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from txzookeeper import ZookeeperClient
from txzookeeper.tests import ZookeeperTestCase, utils
from txzookeeper.tests.proxy import ProxyFactory
class WatchDeliveryConnectionFailedTest(ZookeeperTestCase):
    """Watches are still sent on reconnect.

    Every test drives two clients: ``proxied_client`` connects through a
    local TCP proxy that the test can block or drop at will, while
    ``direct_client`` connects straight to 127.0.0.1:2181 and is used to
    modify the tree so that the proxied client's watches fire.
    """

    def setUp(self):
        """Start the proxy, create both clients and connect the direct one.

        Session events seen by the proxied client are recorded, in order,
        in ``self.session_events``. The proxied client itself is connected
        by each individual test.

        Returns the result of ``direct_client.connect()`` so the test
        runner can wait on it.
        """
        super(WatchDeliveryConnectionFailedTest, self).setUp()

        # Listen on port 0 so the OS picks a free port for the proxy.
        self.proxy = ProxyFactory("127.0.0.1", 2181)
        self.proxy_port = reactor.listenTCP(0, self.proxy)
        host = self.proxy_port.getHost()

        self.proxied_client = ZookeeperClient(
            "%s:%s" % (host.host, host.port))
        self.direct_client = ZookeeperClient("127.0.0.1:2181", 3000)

        self.session_events = []

        def session_event_collector(conn, event):
            self.session_events.append(event)

        self.proxied_client.set_session_callback(session_event_collector)
        return self.direct_client.connect()

    @inlineCallbacks
    def tearDown(self):
        """Close both clients, wipe the test tree and stop the proxy."""
        zookeeper.set_debug_level(0)

        if self.proxied_client.connected:
            yield self.proxied_client.close()

        # The direct client is needed to clean the tree, so make sure it is
        # connected before using its handle.
        if not self.direct_client.connected:
            yield self.direct_client.connect()
        utils.deleteTree(handle=self.direct_client.handle)
        yield self.direct_client.close()

        self.proxy.lose_connection()
        yield self.proxy_port.stopListening()

    def verify_events(self, events, expected):
        """Verify the state of the session events encountered.

        The ``state_name`` of every item in ``events`` is compared with
        ``expected`` as a whole sequence, so a missing or surplus event
        fails the assertion instead of being silently ignored by a
        pairwise comparison that stops at the shorter sequence.
        """
        self.assertEqual([e.state_name for e in events], list(expected))

    @inlineCallbacks
    def test_child_watch_fires_upon_reconnect(self):
        """A child watch set before a dropped connection still fires."""
        yield self.proxied_client.connect()

        # Setup tree
        cpath = "/test-tree"
        yield self.direct_client.create(cpath)

        # Setup watch
        child_d, watch_d = self.proxied_client.get_children_and_watch(cpath)
        self.assertEqual((yield child_d), [])

        # Kill the connection and fire the watch
        self.proxy.lose_connection()
        yield self.direct_client.create(
            cpath + "/abc", flags=zookeeper.SEQUENCE)

        # We should still get the child event.
        yield watch_d

        # We get two pairs of (connecting, connected) for the conn and watch
        self.assertEqual(len(self.session_events), 4)
        self.verify_events(
            self.session_events,
            ("connecting", "connecting", "connected", "connected"))

    @inlineCallbacks
    def test_exists_watch_fires_upon_reconnect(self):
        """An exists watch set before a dropped connection still fires."""
        yield self.proxied_client.connect()
        cpath = "/test"

        # Setup watch
        exists_d, watch_d = self.proxied_client.exists_and_watch(cpath)
        self.assertEqual((yield exists_d), None)

        # Kill the connection and fire the watch
        self.proxy.lose_connection()
        yield self.direct_client.create(cpath)

        # We should still get the exists event.
        yield watch_d

        # We get two pairs of (connecting, connected) for the conn and watch
        self.assertEqual(len(self.session_events), 4)
        self.verify_events(
            self.session_events,
            ("connecting", "connecting", "connected", "connected"))

    @inlineCallbacks
    def test_get_watch_fires_upon_reconnect(self):
        """A get watch set before a dropped connection still fires."""
        yield self.proxied_client.connect()

        # Setup tree
        cpath = "/test"
        yield self.direct_client.create(cpath, "abc")

        # Setup watch
        get_d, watch_d = self.proxied_client.get_and_watch(cpath)
        content, stat = yield get_d
        self.assertEqual(content, "abc")

        # Kill the connection and fire the watch
        self.proxy.lose_connection()
        yield self.direct_client.set(cpath, "xyz")

        # We should still get the watch event for the modified node.
        yield watch_d

        # We also get two pairs of (connecting, connected) for the conn
        # and watch
        self.assertEqual(len(self.session_events), 4)
        self.verify_events(
            self.session_events,
            ("connecting", "connecting", "connected", "connected"))

    @inlineCallbacks
    def test_watch_delivery_failure_resends(self):
        """Simulate a network failure for the watch delivery

        The zk server effectively sends the watch delivery to the client,
        but the client never receives it.
        """
        yield self.proxied_client.connect()
        cpath = "/test"

        # Setup watch
        exists_d, watch_d = self.proxied_client.exists_and_watch(cpath)
        self.assertEqual((yield exists_d), None)

        # Pause the connection, fire the watch, and blackhole the data.
        self.proxy.set_blocked(True)
        yield self.direct_client.create(cpath)
        self.proxy.set_blocked(False)
        self.proxy.lose_connection()

        # We should still get the exists event.
        yield watch_d

    @inlineCallbacks
    def test_session_exception(self):
        """Test session expiration.

        On a single server, the best way to produce a session expiration is
        timing out the client.
        """
        yield self.proxied_client.connect()
        data = yield self.proxied_client.exists("/")
        self.assertTrue(data)

        self.proxy.set_blocked(True)
        # Wait for session expiration, on a single server options are limited
        yield self.sleep(15)

        # Unblock the proxy for next connect, and then drop the connection.
        self.proxy.set_blocked(False)
        self.proxy.lose_connection()

        # Wait for a reconnect (see below why we can't just use a watch here)
        yield self.sleep(2)

        yield self.assertFailure(
            self.proxied_client.get("/a"),
            zookeeper.SessionExpiredException)
        self.assertEqual(self.session_events[-1].state_name, "expired")

    @inlineCallbacks
    def xtest_binding_bug_session_exception(self):
        """This test triggers an exception in the python-zookeeper binding.

        File "txzookeeper/client.py", line 491, in create
            self.handle, path, data, acls, flags, callback)
        exceptions.SystemError: error return without exception set

        The ``x`` prefix keeps the test runner from collecting this method.
        """
        yield self.proxied_client.connect()
        # exists_and_watch hands back a pair of deferreds; unpack it
        # directly, as the other tests in this class do.
        data_d, watch_d = self.proxied_client.exists_and_watch("/")
        self.assertTrue((yield data_d))

        self.proxy.set_blocked(True)
        # Wait for session expiration, on a single server options are limited
        yield self.sleep(15)

        # Unblock the proxy for next connect, and then drop the connection.
        self.proxy.set_blocked(False)
        self.proxy.lose_connection()

        # Wait for a reconnect
        yield self.assertFailure(watch_d, zookeeper.SessionExpiredException)

        # Leads to bindings bug failure
        yield self.assertFailure(
            self.proxied_client.get("/a"),
            zookeeper.SessionExpiredException)
        self.assertEqual(self.session_events[-1].state_name, "expired")
|