/usr/share/pyshared/tp/netlib/discover/avahi_browser.py is in python-tp-netlib 0.2.5-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 | import avahi_disabled
import os, sys
import traceback
import time
import avahi, avahi.ServiceTypeDatabase
import gobject
import dbus
from dbus.mainloop.glib import DBusGMainLoop
service_type_browsers = {}
service_browsers = {}
service_type_db = avahi.ServiceTypeDatabase.ServiceTypeDatabase()
service_seen = {}
from browse import ZeroConfBrowser as ZeroConfBrowserBase
import threading
class ZeroConfBrowser(ZeroConfBrowserBase):
def check():
bus = dbus.SystemBus(mainloop=DBusGMainLoop(set_as_default=False))
try:
server = dbus.Interface(bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER), avahi.DBUS_INTERFACE_SERVER)
print "avahi version", server.GetVersionString()
except dbus.DBusException, e:
print e
return False
return True
check = staticmethod(check)
######################################
# Helper functions
######################################
def protoname(self, protocol):
if protocol == avahi.PROTO_INET:
return "IPv4"
if protocol == avahi.PROTO_INET6:
return "IPv6"
return "n/a"
def siocgifname(self, interface):
if interface <= 0:
return "n/a"
else:
return self.server.GetNetworkInterfaceNameByIndex(interface)
def get_interface_name(self, interface, protocol):
if interface == avahi.IF_UNSPEC and protocol == avahi.PROTO_UNSPEC:
return "Wide Area"
else:
return str(self.siocgifname(interface)) + " " + str(self.protoname(protocol))
def lookup_type(self, stype):
global service_type_db
try:
return service_type_db[stype]
except KeyError:
return stype
def print_error(self, err):
print "Error:", str(err)
def pair_to_dict(self, l):
res = dict()
for el in l:
if "=" not in el:
res[el]=''
else:
tmp = el.split('=',1)
if len(tmp[0]) > 0:
res[tmp[0]] = tmp[1]
return res
######################################
# Callback functions
######################################
def callback(self, function):
def wrapper(*args, **kw):
self.pending.append((function, args, kw))
return wrapper
def service_resolved(self, interface, protocol, name, stype, domain, host, aprotocol, address, port, txt, flags):
"""\
Called when all the information about a service is avaliable.
"""
assert threading.currentThread() == self.dbusthread
if len(txt) != 0:
details = self.pair_to_dict(avahi.txt_array_to_string_array(txt))
else:
details = []
self.ServiceFound(name, stype.split('.')[0][1:], (host, address, port), details, details)
def new_service(self, interface, protocol, name, stype, domain, flags):
"""\
Called when a new service is found.
"""
assert threading.currentThread() == self.dbusthread
self.server.ResolveService( interface, protocol, name, stype, domain, \
avahi.PROTO_UNSPEC, dbus.UInt32(0), reply_handler=self.callback(self.service_resolved), error_handler=self.callback(self.print_error))
def remove_service(self, interface, protocol, name, stype, domain, flags):
"""\
Called when a service disappears.
"""
assert threading.currentThread() == self.dbusthread
self.ServiceGone(name, stype.split('.')[0][1:], None)
def new_domain(self, interface, protocol, domain, flags):
"""\
Called when a new domain appears.
"""
assert threading.currentThread() == self.dbusthread
ifn = self.get_interface_name(interface, protocol)
if domain != "local":
self.browse_domain(interface, protocol, domain)
def browse_domain(self, interface, protocol, domain):
"""
Register to browse a given domain.
"""
assert threading.currentThread() == self.dbusthread
# FIXME: This is probably quite bad!
global service_type_browsers
# Are we already browsing this domain?
if service_type_browsers.has_key((interface, protocol, domain)):
return
try:
b = dbus.Interface( \
self.bus.get_object(avahi.DBUS_NAME, \
self.server.ServiceTypeBrowserNew(interface, protocol, domain, dbus.UInt32(0))
), avahi.DBUS_INTERFACE_SERVICE_TYPE_BROWSER)
except dbus.DBusException, e:
print e
traceback.print_exc()
for stype in ['_tp', '_tps', '_tp-http', '_tp-https']:
stype = stype+'._tcp'
b = dbus.Interface( \
self.bus.get_object(avahi.DBUS_NAME, \
self.server.ServiceBrowserNew(interface, protocol, stype, domain, dbus.UInt32(0))
), avahi.DBUS_INTERFACE_SERVICE_BROWSER)
service_browsers[(interface, protocol, stype, domain)] = b
b.connect_to_signal('ItemNew', self.callback(self.new_service))
b.connect_to_signal('ItemRemove', self.callback(self.remove_service))
def __init__(self):
ZeroConfBrowserBase.__init__(self)
import dbus.mainloop.glib
dbus.mainloop.glib.threads_init()
self.pending = []
def run(self):
self.dbusthread = threading.currentThread()
from dbus.mainloop.glib import DBusGMainLoop
mainloop = DBusGMainLoop(set_as_default=True)
self.bus = dbus.SystemBus(mainloop=mainloop)
self.server = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER), avahi.DBUS_INTERFACE_SERVER)
# Explicitly browse .local
self.browse_domain(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, "local")
# Browse for other browsable domains
db = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, self.server.DomainBrowserNew(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, "", avahi.DOMAIN_BROWSER_BROWSE, dbus.UInt32(0))), avahi.DBUS_INTERFACE_DOMAIN_BROWSER)
db.connect_to_signal('ItemNew', self.new_domain)
self.mainloop = gobject.MainLoop()
gcontext = self.mainloop.get_context()
while not self.mainloop is None:
if len(self.pending) > 0:
command, args, kw = self.pending.pop(0)
command(*args, **kw)
if gcontext.pending():
gcontext.iteration()
else:
time.sleep(0.01)
def exit(self):
if hasattr(self, 'mainloop'):
self.mainloop.quit()
self.mainloop = None
def main():
a = ZeroConfBrowser()
a.run()
if __name__ == "__main__":
main()
|