This file is indexed.

/usr/share/pyshared/txzookeeper/tests/test_conn_failure.py is in python-txzookeeper 0.9.5-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
#
#  Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved
#
#  This file is part of txzookeeper.
#
#  Authors:
#   Kapil Thangavelu
#
#  txzookeeper is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  txzookeeper is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with txzookeeper.  If not, see <http://www.gnu.org/licenses/>.
#

import zookeeper

from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks

from txzookeeper import ZookeeperClient
from txzookeeper.tests import ZookeeperTestCase, utils
from txzookeeper.tests.proxy import ProxyFactory


class WatchDeliveryConnectionFailedTest(ZookeeperTestCase):
    """Watches are still sent on reconnect.

    Each test talks to zookeeper through two clients: ``proxied_client``
    goes through a local TCP proxy whose connection can be dropped or
    blocked on demand, while ``direct_client`` connects straight to the
    server and is used to make the changes that trigger watches.
    """

    # Session states recorded by the proxied client across one forced
    # reconnect: one (connecting, connected) pair each for the connection
    # and for the watch.
    RECONNECT_EVENTS = ("connecting", "connecting", "connected", "connected")

    def setUp(self):
        """Start the proxy and both clients; connect the direct client.

        Returns the deferred of the direct client's connect() so the test
        runner waits for it. The proxied client is connected by each test.
        """
        super(WatchDeliveryConnectionFailedTest, self).setUp()

        # Listen on port 0 so the OS picks a free port for the proxy.
        self.proxy = ProxyFactory("127.0.0.1", 2181)
        self.proxy_port = reactor.listenTCP(0, self.proxy)
        host = self.proxy_port.getHost()
        self.proxied_client = ZookeeperClient(
            "%s:%s" % (host.host, host.port))
        # NOTE(review): 3000 is presumably the session timeout in
        # milliseconds -- confirm against ZookeeperClient's signature.
        self.direct_client = ZookeeperClient("127.0.0.1:2181", 3000)
        self.session_events = []

        def session_event_collector(conn, event):
            self.session_events.append(event)

        self.proxied_client.set_session_callback(session_event_collector)
        return self.direct_client.connect()

    @inlineCallbacks
    def tearDown(self):
        """Close both clients, wipe the test tree and shut the proxy down.

        The proxy and its listening port are released in a ``finally``
        clause so that a failure while cleaning up the clients or the tree
        does not leave a listening socket behind for subsequent tests.
        """
        # NOTE(review): the base class tearDown is not invoked here --
        # confirm whether ZookeeperTestCase needs it.
        zookeeper.set_debug_level(0)
        try:
            if self.proxied_client.connected:
                yield self.proxied_client.close()
            # The direct client is needed (connected) to delete the tree.
            if not self.direct_client.connected:
                yield self.direct_client.connect()
            utils.deleteTree(handle=self.direct_client.handle)
            yield self.direct_client.close()
        finally:
            self.proxy.lose_connection()
            yield self.proxy_port.stopListening()

    def verify_events(self, events, expected):
        """Verify the state of the session events encountered.

        The ``state_name`` of every event in `events` must match the
        corresponding entry of `expected`, and both sequences must have
        the same length. (Comparing whole lists rather than zipping them
        means missing or surplus events are reported as failures instead
        of being silently ignored.)
        """
        self.assertEqual([e.state_name for e in events], list(expected))

    @inlineCallbacks
    def test_child_watch_fires_upon_reconnect(self):
        """A child watch fires even if the connection drops beforehand."""
        yield self.proxied_client.connect()

        # Setup tree
        cpath = "/test-tree"
        yield self.direct_client.create(cpath)

        # Setup watch
        child_d, watch_d = self.proxied_client.get_children_and_watch(cpath)

        self.assertEqual((yield child_d), [])

        # Kill the connection and fire the watch
        self.proxy.lose_connection()
        yield self.direct_client.create(
            cpath + "/abc", flags=zookeeper.SEQUENCE)

        # We should still get the child event.
        yield watch_d

        # We get two pairs of (connecting, connected) for the conn and watch
        self.verify_events(self.session_events, self.RECONNECT_EVENTS)

    @inlineCallbacks
    def test_exists_watch_fires_upon_reconnect(self):
        """An exists watch fires even if the connection drops beforehand."""
        yield self.proxied_client.connect()
        cpath = "/test"

        # Setup watch
        exists_d, watch_d = self.proxied_client.exists_and_watch(cpath)

        self.assertEqual((yield exists_d), None)

        # Kill the connection and fire the watch
        self.proxy.lose_connection()
        yield self.direct_client.create(cpath)

        # We should still get the exists event.
        yield watch_d

        # We get two pairs of (connecting, connected) for the conn and watch
        self.verify_events(self.session_events, self.RECONNECT_EVENTS)

    @inlineCallbacks
    def test_get_watch_fires_upon_reconnect(self):
        """A get watch fires even if the connection drops beforehand."""
        yield self.proxied_client.connect()
        # Setup tree
        cpath = "/test"
        yield self.direct_client.create(cpath, "abc")

        # Setup watch
        get_d, watch_d = self.proxied_client.get_and_watch(cpath)
        content, stat = yield get_d
        self.assertEqual(content, "abc")

        # Kill the connection and fire the watch
        self.proxy.lose_connection()
        yield self.direct_client.set(cpath, "xyz")

        # The watch set by get_and_watch should still fire.
        yield watch_d

        # We also get two pairs of (connecting, connected) for the conn
        # and watch
        self.verify_events(self.session_events, self.RECONNECT_EVENTS)

    @inlineCallbacks
    def test_watch_delivery_failure_resends(self):
        """Simulate a network failure for the watch delivery

        The zk server effectively sends the watch delivery to the client,
        but the client never receives it.
        """
        yield self.proxied_client.connect()
        cpath = "/test"

        # Setup watch
        exists_d, watch_d = self.proxied_client.exists_and_watch(cpath)

        self.assertEqual((yield exists_d), None)

        # Pause the connection, fire the watch, and blackhole the data.
        self.proxy.set_blocked(True)
        yield self.direct_client.create(cpath)
        self.proxy.set_blocked(False)
        self.proxy.lose_connection()

        # We should still get the exists event.
        yield watch_d

    @inlineCallbacks
    def test_session_exception(self):
        """Test session expiration.

        On a single server, the best way to produce a session expiration is
        timing out the client.
        """
        yield self.proxied_client.connect()
        data = yield self.proxied_client.exists("/")
        self.assertTrue(data)
        self.proxy.set_blocked(True)
        # Wait for session expiration, on a single server options are limited
        yield self.sleep(15)
        # Unblock the proxy for next connect, and then drop the connection.
        self.proxy.set_blocked(False)
        self.proxy.lose_connection()
        # Wait for a reconnect (see xtest_binding_bug_session_exception for
        # why we can't just use a watch here)
        yield self.sleep(2)
        yield self.assertFailure(
            self.proxied_client.get("/a"),
            zookeeper.SessionExpiredException)
        self.assertEqual(self.session_events[-1].state_name, "expired")

    @inlineCallbacks
    def xtest_binding_bug_session_exception(self):
        """This test triggers an exception in the python-zookeeper binding.

        File "txzookeeper/client.py", line 491, in create
           self.handle, path, data, acls, flags, callback)
        exceptions.SystemError: error return without exception set

        The ``x`` prefix keeps the test runner from collecting it.
        """
        yield self.proxied_client.connect()
        # exists_and_watch returns a pair of deferreds, not a deferred, so
        # it is unpacked directly rather than yielded.
        data_d, watch_d = self.proxied_client.exists_and_watch("/")
        self.assertTrue((yield data_d))
        self.proxy.set_blocked(True)
        # Wait for session expiration, on a single server options are limited
        yield self.sleep(15)
        # Unblock the proxy for next connect, and then drop the connection.
        self.proxy.set_blocked(False)
        self.proxy.lose_connection()
        # Wait for a reconnect
        yield self.assertFailure(watch_d, zookeeper.SessionExpiredException)
        # Leads to bindings bug failure
        yield self.assertFailure(
            self.proxied_client.get("/a"),
            zookeeper.SessionExpiredException)
        self.assertEqual(self.session_events[-1].state_name, "expired")