/usr/lib/telepathy-gabble-tests/twisted/tubes/close-muc-with-closed-tube.py is in telepathy-gabble-tests 0.18.2-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""Test closing a MUC text channel after a D-Bus tube offered in it was closed."""
import dbus
from servicetest import call_async, EventPattern, unwrap, assertEquals
from gabbletest import exec_test, make_result_iq, acknowledge_iq, make_muc_presence
import constants as cs
import ns
import tubetestutil as t
# D-Bus dictionary (signature 'sv': string keys, variant values) holding one
# sample value per parameter type.  The names and values match the
# <parameter> elements that test() puts into bob's tube presence
# ('aGVsbG8=' there is the base64 form of 'hello').
# NOTE(review): nothing in the code visible here reads this constant; it is
# kept because other parts of the test suite may rely on it.
sample_parameters = dbus.Dictionary(
    {
        's': 'hello',
        # Byte literal: the same value as 'hello' on Python 2.6+, and the
        # only form dbus.ByteArray accepts on Python 3.
        'ay': dbus.ByteArray(b'hello'),
        'u': dbus.UInt32(123),
        'i': dbus.Int32(-123),
    },
    signature='sv')
def _make_dbus_tube_presence(tube_id, stream_id):
    """Build bob's MUC presence stanza advertising a D-Bus tube.

    tube_id and stream_id are written (as strings) into the <tube/>
    element's 'id' and 'stream-id' attributes.  Returns the presence
    element, ready to be passed to stream.send().
    """
    presence = make_muc_presence(
        'owner', 'moderator', 'chat@conf.localhost', 'bob')
    tubes = presence.addElement((ns.TUBES, 'tubes'))

    tube = tubes.addElement((None, 'tube'))
    tube['type'] = 'dbus'
    tube['service'] = 'org.telepathy.freedesktop.test'
    tube['id'] = str(tube_id)
    tube['stream-id'] = str(stream_id)
    tube['dbus-name'] = ':2.Y2Fzc2lkeS10ZXN0MgAA'
    tube['initiator'] = 'chat@conf.localhost/bob'

    # One <parameter/> child per (name, type, text content), in this order.
    parameters = tube.addElement((None, 'parameters'))
    for name, type_name, content in (
            ('s', 'str', 'hello'),
            ('ay', 'bytes', 'aGVsbG8='),    # base64 of 'hello'
            ('u', 'uint', '123'),
            ('i', 'int', '-123')):
        parameter = parameters.addElement((None, 'parameter'))
        parameter['name'] = name
        parameter['type'] = type_name
        parameter.addContent(content)

    return presence


def test(q, bus, conn, stream):
    """Close a MUC text channel after the tube channel in it was closed.

    Scenario: join chat@conf.localhost, let bob offer a D-Bus tube through
    his MUC presence, close the tube channel that appears, wait for
    ChannelClosed, then close the MUC text channel itself.

    q      -- event queue; expect()/expect_many() wait for matching events
    bus    -- D-Bus connection used to obtain proxies for channel objects
    conn   -- proxy for the connection under test
    stream -- XMPP stream on which the test sends stanzas to the connection

    (Parameter roles are described as they are used below; the objects are
    supplied by exec_test.)
    """
    # Answer the vCard query before doing anything else.
    vcard_iq = q.expect('stream-iq', to=None, query_ns='vcard-temp',
            query_name='vCard')
    acknowledge_iq(stream, vcard_iq.stanza)

    call_async(q, conn, 'RequestHandles', cs.HT_ROOM, ['chat@conf.localhost'])
    event = q.expect('dbus-return', method='RequestHandles')
    room_handle = event.value[0][0]

    # join the muc
    call_async(q, conn, 'RequestChannel',
        cs.CHANNEL_TYPE_TEXT, cs.HT_ROOM, room_handle, True)

    # Handle 2 is our own room JID (checked with InspectHandles below).
    # NOTE(review): the fifth MembersChanged argument should be the
    # remote-pending list per the Telepathy Group interface -- confirm.
    q.expect_many(
        EventPattern('dbus-signal', signal='MembersChanged',
            args=[u'', [], [], [], [2], 0, 0]),
        EventPattern('stream-presence', to='chat@conf.localhost/test'))

    # Send presence for other member of room.
    stream.send(make_muc_presence(
        'owner', 'moderator', 'chat@conf.localhost', 'bob'))

    # Send presence for own membership of room.
    stream.send(make_muc_presence(
        'none', 'participant', 'chat@conf.localhost', 'test'))

    q.expect('dbus-signal', signal='MembersChanged',
            args=[u'', [2, 3], [], [], [], 0, 0])

    # assertEquals (expected first) instead of a bare assert: it matches the
    # other checks in this test and is not stripped under -O.
    assertEquals(
        ['chat@conf.localhost/test', 'chat@conf.localhost/bob'],
        conn.InspectHandles(cs.HT_CONTACT, [2, 3]))

    event = q.expect('dbus-return', method='RequestChannel')
    text_chan = bus.get_object(conn.bus_name, event.value[0])

    # Bob offers a muc tube
    tube_id = 666
    stream_id = 1234
    stream.send(_make_dbus_tube_presence(tube_id, stream_id))

    def new_chan_predicate(e):
        # Only the first channel announced by the signal is inspected.
        _, props = e.args[0][0]
        return props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_DBUS_TUBE

    # tube channel is automatically created
    event = q.expect('dbus-signal', signal='NewChannels',
        predicate=new_chan_predicate)

    path, props = event.args[0][0]
    assertEquals(cs.CHANNEL_TYPE_DBUS_TUBE, props[cs.CHANNEL_TYPE])
    assertEquals(cs.HT_ROOM, props[cs.TARGET_HANDLE_TYPE])
    assertEquals(room_handle, props[cs.TARGET_HANDLE])
    assertEquals('chat@conf.localhost', props[cs.TARGET_ID])
    assertEquals(False, props[cs.REQUESTED])

    tube_chan = bus.get_object(conn.bus_name, path)
    tube_iface = dbus.Interface(tube_chan, cs.CHANNEL_TYPE_DBUS_TUBE)

    # reject the tube
    # Close() is invoked on the generic Channel interface, hence the
    # explicit dbus_interface override on the tube-typed proxy.
    tube_iface.Close(dbus_interface=cs.CHANNEL)
    q.expect('dbus-signal', signal='ChannelClosed')

    # close the text channel
    text_chan.Close()
# Run the scenario only when this file is executed as a script; exec_test
# (imported from gabbletest) is given the test callable to drive.
if __name__ == "__main__":
    exec_test(test)
|