/usr/lib/telepathy-gabble-tests/twisted/sasl/complex.py is in telepathy-gabble-tests 0.18.2-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 | """
Test the server sasl channel with the PLAIN mechanism
"""
import dbus
from servicetest import EventPattern, assertEquals, assertSameSets, call_async
from gabbletest import exec_test
import constants as cs
from saslutil import SaslEventAuthenticator, connect_and_get_sasl_channel
JID = "test@example.org"
INITIAL_RESPONSE = 'Thunder and lightning. Enter three Witches.'
CR_PAIRS = [
('When shall we three meet again?', 'Ere the set of sun.'),
('Where the place?', 'Upon the heath.'),
]
SUCCESS_DATA = 'Exeunt.'
MECHANISMS = ["PLAIN", "DIGEST-MD5", "SCOTTISH-PLAY"]
def test_complex_success(q, bus, conn, stream, with_extra_data=True,
accept_early=False):
chan, props = connect_and_get_sasl_channel(q, bus, conn)
assertSameSets(MECHANISMS + ['X-TELEPATHY-PASSWORD'],
props.get(cs.SASL_AVAILABLE_MECHANISMS))
call_async(q, chan.SASLAuthentication, 'StartMechanismWithData',
"FOO", "")
q.expect('dbus-error', method='StartMechanismWithData',
name=cs.NOT_IMPLEMENTED)
if with_extra_data:
chan.SASLAuthentication.StartMechanismWithData("SCOTTISH-PLAY",
INITIAL_RESPONSE)
e = q.expect('sasl-auth', initial_response=INITIAL_RESPONSE)
else:
chan.SASLAuthentication.StartMechanism("SCOTTISH-PLAY")
e = q.expect('sasl-auth', has_initial_response=False)
authenticator = e.authenticator
q.expect('dbus-signal', signal='SASLStatusChanged',
interface=cs.CHANNEL_IFACE_SASL_AUTH,
args=[cs.SASL_STATUS_IN_PROGRESS, '', {}])
if not with_extra_data:
# send the stage directions in-band instead
authenticator.challenge('')
e = q.expect('dbus-signal', signal='NewChallenge',
interface=cs.CHANNEL_IFACE_SASL_AUTH)
# this ought to be '' but dbus-python has fd.o #28131
assert e.args in ([''], ['None'])
chan.SASLAuthentication.Respond(INITIAL_RESPONSE)
q.expect('sasl-response', response=INITIAL_RESPONSE)
for challenge, response in CR_PAIRS:
authenticator.challenge(challenge)
q.expect('dbus-signal', signal='NewChallenge',
interface=cs.CHANNEL_IFACE_SASL_AUTH,
args=[challenge])
chan.SASLAuthentication.Respond(response)
q.expect('sasl-response', response=response)
if with_extra_data:
authenticator.success(SUCCESS_DATA)
else:
# The success data is sent in-band as a challenge
authenticator.challenge(SUCCESS_DATA)
q.expect('dbus-signal', signal='NewChallenge',
interface=cs.CHANNEL_IFACE_SASL_AUTH,
args=[SUCCESS_DATA])
if accept_early:
# the UI can tell that this challenge isn't actually a challenge,
# it's a success in disguise
chan.SASLAuthentication.AcceptSASL()
q.expect('dbus-signal', signal='SASLStatusChanged',
interface=cs.CHANNEL_IFACE_SASL_AUTH,
args=[cs.SASL_STATUS_CLIENT_ACCEPTED, '', {}])
else:
chan.SASLAuthentication.Respond(dbus.ByteArray(''))
if with_extra_data:
# Wocky removes the distinction between a challenge containing
# success data followed by a plain success, and a success
# containing initial data, so we won't get to Server_Succeeded
# til we "respond" to the "challenge". However, at the XMPP level,
# we shouldn't get a response to a success.
q.forbid_events([EventPattern('sasl-response')])
else:
q.expect('sasl-response', response='')
authenticator.success(None)
if not accept_early:
# *now* we accept
q.expect('dbus-signal', signal='SASLStatusChanged',
interface=cs.CHANNEL_IFACE_SASL_AUTH,
args=[cs.SASL_STATUS_SERVER_SUCCEEDED, '', {}])
# We're willing to accept this SASL transaction
chan.SASLAuthentication.AcceptSASL()
q.expect('dbus-signal', signal='SASLStatusChanged',
interface=cs.CHANNEL_IFACE_SASL_AUTH,
args=[cs.SASL_STATUS_SUCCEEDED, '', {}])
q.expect('dbus-signal', signal='StatusChanged',
args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED])
chan.Close()
# ... and check that the Connection is still OK
conn.GetSelfHandle()
def test_complex_success_data(q, bus, conn, stream):
test_complex_success(q, bus, conn, stream, True)
def test_complex_success_no_data(q, bus, conn, stream):
test_complex_success(q, bus, conn, stream, False)
def test_complex_success_data_accept(q, bus, conn, stream):
test_complex_success(q, bus, conn, stream, True, True)
def test_complex_success_no_data_accept(q, bus, conn, stream):
test_complex_success(q, bus, conn, stream, False, True)
if __name__ == '__main__':
exec_test(
test_complex_success_data, {'password': None, 'account' : JID},
authenticator=SaslEventAuthenticator(JID.split('@')[0], MECHANISMS),
do_connect=False)
exec_test(
test_complex_success_no_data, {'password': None, 'account' : JID},
authenticator=SaslEventAuthenticator(JID.split('@')[0], MECHANISMS),
do_connect=False)
exec_test(
test_complex_success_data_accept, {'password': None, 'account' : JID},
authenticator=SaslEventAuthenticator(JID.split('@')[0], MECHANISMS),
do_connect=False)
exec_test(
test_complex_success_no_data_accept, {'password': None, 'account' : JID},
authenticator=SaslEventAuthenticator(JID.split('@')[0], MECHANISMS),
do_connect=False)
|