/usr/share/pyshared/asterisk/astemu.py is in python-pyst 0.6.50-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 | import socket
from signal import SIGTERM
from os import fork, kill, waitpid
from time import sleep
class Event(dict):
""" Events are encoded as dicts with a header fieldname to
content-list map. Normally (for all typical asterisk events) the
content-list only has one element. For multiple elements
multiple lines with the same header (but different content) are
sent. This tests cases where asterisk events contain multiple
instances of the same header.
The key 'CONTENT' is special, it denotes text that is appended
to an event (e.g. for testing the output of the command action)
"""
sort_order = dict ((x, n) for n, x in enumerate
(( 'Event'
, 'Response'
, 'Username'
, 'Privilege'
, 'Secret'
, 'Command'
, 'Channel'
, 'ChannelState'
, 'ChannelStateDesc'
, 'CallerIDNum'
, 'CallerIDName'
, 'AccountCode'
, 'Context'
, 'Exten'
, 'Reason'
, 'Uniqueid'
, 'ActionID'
, 'OldAccountCode'
, 'Cause'
, 'Cause-txt'
)))
sort_order ['CONTENT'] = 100000
def sort(self, x):
return self.sort_order.get(x[0], 10000)
def as_string(self, id):
ret = []
if 'Response' in self:
self ['ActionID'] = [id]
for k,v in sorted(self.iteritems(), key=self.sort):
if k == 'CONTENT':
ret.append(v)
else :
if isinstance(v, str):
ret.append (": ".join ((k, v)))
else:
for x in v:
ret.append (": ".join ((k, x)))
ret.append ('')
ret.append ('')
return '\r\n'.join (ret)
@property
def name(self):
return self.get('Event','')
@property
def headers(self):
return self
class AsteriskEmu(object):
""" Emulator for asterisk management interface.
Used for unittests of asterisk.manager.
Now factored into a standalone module for others to use in
unittests of programs that build on pyst's asterisk.manager.
By default let the operating system decide the port number to
bind to, resulting port is stored in self.port.
"""
default_events = dict \
( Login =
( Event
( Response = ('Success',)
, Message = ('Authentication accepted',)
)
,
)
, Logoff =
( Event
( Response = ('Goodbye',)
, Message = ('Thanks for all the fish.',)
)
,
)
)
def __init__(self, chatscript, port = 0):
s = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
s.bind(('localhost', port))
s.listen(1)
pid = fork()
if not pid:
# won't return
self.asterisk_emu(s, chatscript)
self.childpid = pid
host, self.port = s.getsockname()
s.close()
def asterisk_emu(self, sock, chatscript):
""" Emulate asterisk management interface on a socket.
Chatscript is a dict of command names to event list mapping.
The event list contains events to send when the given
command is recognized.
"""
while True:
conn, addr = sock.accept()
f = conn.makefile('rw')
conn.close()
f.write('Asterisk Call Manager/1.1\r\n')
f.flush()
cmd = lastid = ''
try:
for l in f:
if l.startswith ('ActionID:'):
lastid = l.split(':', 1)[1].strip()
elif l.startswith ('Action:'):
cmd = l.split(':', 1)[1].strip()
elif not l.strip():
for d in chatscript, self.default_events:
if cmd in d:
for event in d[cmd]:
f.write(event.as_string(id = lastid))
f.flush()
if cmd == 'Logoff':
f.close()
break
except:
pass
sleep(10000) # wait for being killed
def close(self):
if self.childpid:
kill(self.childpid, SIGTERM)
waitpid(self.childpid, 0)
self.childpid = None
|