/usr/share/pyshared/txaws/client/tests/test_base.py is in python-txaws 0.2.3-1ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 | import os
from twisted.internet import reactor
from twisted.internet.error import ConnectionRefusedError
from twisted.protocols.policies import WrappingFactory
from twisted.python import log
from twisted.python.filepath import FilePath
from twisted.python.failure import Failure
from twisted.test.test_sslverify import makeCertificate
from twisted.web import server, static
from twisted.web.client import HTTPClientFactory
from twisted.web.error import Error as TwistedWebError
from txaws.client import ssl
from txaws.client.base import BaseClient, BaseQuery, error_wrapper
from txaws.service import AWSServiceEndpoint
from txaws.testing.base import TXAWSTestCase
class ErrorWrapperTestCase(TXAWSTestCase):
def test_204_no_content(self):
failure = Failure(TwistedWebError(204, "No content"))
wrapped = error_wrapper(failure, None)
self.assertEquals(wrapped, "204 No content")
def test_302_found(self):
# XXX I'm not sure we want to raise for 300s...
failure = Failure(TwistedWebError(302, "found"))
error = self.assertRaises(
Exception, error_wrapper, failure, None)
self.assertEquals(failure.type, type(error))
self.assertTrue(isinstance(error, TwistedWebError))
self.assertEquals(str(error), "302 found")
def test_500(self):
failure = Failure(TwistedWebError(500, "internal error"))
error = self.assertRaises(
Exception, error_wrapper, failure, None)
self.assertTrue(isinstance(error, TwistedWebError))
self.assertEquals(str(error), "500 internal error")
def test_timeout_error(self):
failure = Failure(Exception("timeout"))
error = self.assertRaises(Exception, error_wrapper, failure, None)
self.assertTrue(isinstance(error, Exception))
self.assertEquals(str(error), "timeout")
def test_connection_error(self):
failure = Failure(ConnectionRefusedError("timeout"))
error = self.assertRaises(
Exception, error_wrapper, failure, ConnectionRefusedError)
self.assertTrue(isinstance(error, ConnectionRefusedError))
class BaseClientTestCase(TXAWSTestCase):
def test_creation(self):
client = BaseClient("creds", "endpoint", "query factory", "parser")
self.assertEquals(client.creds, "creds")
self.assertEquals(client.endpoint, "endpoint")
self.assertEquals(client.query_factory, "query factory")
self.assertEquals(client.parser, "parser")
class BaseQueryTestCase(TXAWSTestCase):
def setUp(self):
self.cleanupServerConnections = 0
name = self.mktemp()
os.mkdir(name)
FilePath(name).child("file").setContent("0123456789")
r = static.File(name)
self.site = server.Site(r, timeout=None)
self.wrapper = WrappingFactory(self.site)
self.port = self._listen(self.wrapper)
self.portno = self.port.getHost().port
def tearDown(self):
# If the test indicated it might leave some server-side connections
# around, clean them up.
connections = self.wrapper.protocols.keys()
# If there are fewer server-side connections than requested,
# that's okay. Some might have noticed that the client closed
# the connection and cleaned up after themselves.
for n in range(min(len(connections), self.cleanupServerConnections)):
proto = connections.pop()
log.msg("Closing %r" % (proto,))
proto.transport.loseConnection()
if connections:
log.msg("Some left-over connections; this test is probably buggy.")
return self.port.stopListening()
def _listen(self, site):
return reactor.listenTCP(0, site, interface="127.0.0.1")
def _get_url(self, path):
return "http://127.0.0.1:%d/%s" % (self.portno, path)
def test_creation(self):
query = BaseQuery("an action", "creds", "http://endpoint")
self.assertEquals(query.factory, HTTPClientFactory)
self.assertEquals(query.action, "an action")
self.assertEquals(query.creds, "creds")
self.assertEquals(query.endpoint, "http://endpoint")
def test_init_requires_action(self):
self.assertRaises(TypeError, BaseQuery)
def test_init_requires_creds(self):
self.assertRaises(TypeError, BaseQuery, None)
def test_get_page(self):
query = BaseQuery("an action", "creds", "http://endpoint")
d = query.get_page(self._get_url("file"))
d.addCallback(self.assertEquals, "0123456789")
return d
def test_get_request_headers_no_client(self):
query = BaseQuery("an action", "creds", "http://endpoint")
results = query.get_request_headers()
self.assertEquals(results, None)
def test_get_request_headers_with_client(self):
def check_results(results):
self.assertEquals(results.keys(), [])
self.assertEquals(results.values(), [])
query = BaseQuery("an action", "creds", "http://endpoint")
d = query.get_page(self._get_url("file"))
d.addCallback(query.get_request_headers)
return d.addCallback(check_results)
def test_get_response_headers_no_client(self):
query = BaseQuery("an action", "creds", "http://endpoint")
results = query.get_response_headers()
self.assertEquals(results, None)
def test_get_response_headers_with_client(self):
def check_results(results):
self.assertEquals(sorted(results.keys()), [
"accept-ranges", "content-length", "content-type", "date",
"last-modified", "server"])
self.assertEquals(len(results.values()), 6)
query = BaseQuery("an action", "creds", "http://endpoint")
d = query.get_page(self._get_url("file"))
d.addCallback(query.get_response_headers)
return d.addCallback(check_results)
# XXX for systems that don't have certs in the DEFAULT_CERT_PATH, this test
# will fail; instead, let's create some certs in a temp directory and set
# the DEFAULT_CERT_PATH to point there.
def test_ssl_hostname_verification(self):
"""
If the endpoint passed to L{BaseQuery} has C{ssl_hostname_verification}
sets to C{True}, a L{VerifyingContextFactory} is passed to
C{connectSSL}.
"""
class FakeReactor(object):
def __init__(self):
self.connects = []
def connectSSL(self, host, port, client, factory):
self.connects.append((host, port, client, factory))
certs = makeCertificate(O="Test Certificate", CN="something")[1]
self.patch(ssl, "_ca_certs", certs)
fake_reactor = FakeReactor()
endpoint = AWSServiceEndpoint(ssl_hostname_verification=True)
query = BaseQuery("an action", "creds", endpoint, fake_reactor)
query.get_page("https://example.com/file")
[(host, port, client, factory)] = fake_reactor.connects
self.assertEqual("example.com", host)
self.assertEqual(443, port)
self.assertTrue(isinstance(factory, ssl.VerifyingContextFactory))
self.assertEqual("example.com", factory.host)
self.assertNotEqual([], factory.caCerts)
|