This file is indexed.

/usr/share/pyshared/desktopcouch/application/pair/couchdb_pairing/dbus_io.py is in python-desktopcouch-application 1.0.8-0ubuntu3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
# Copyright 2009 Canonical Ltd.
#
# This file is part of desktopcouch.
#
# desktopcouch is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License version 3
# as published by the Free Software Foundation.
#
# desktopcouch is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with desktopcouch.  If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Chad Miller <chad.miller@canonical.com>
"""Communicate with DBUS and also the APIs it proxies, like Zeroconf."""

import logging

import dbus
import avahi

from desktopcouch.application.pair.couchdb_pairing import couchdb_io

INVITATIONS_DISCOVERY_SERVICE_TYPE = "_couchdb_pairing_invitations._tcp"
LOCATION_DISCOVERY_SERVICE_TYPE = "_couchdb_location._tcp"
DESKTOPCOUCH_DBUS_INTERFACE = "org.desktopcouch.CouchDB"


def get_local_hostname():
    """Get the name of this host, as Unicode host and domain parts."""
    _, server = get_dbus_bus_server()
    return server.GetHostName(), server.GetDomainName()


def get_remote_hostname(ip_address_as_ascii, iface=avahi.IF_UNSPEC):
    """Look up the name of a host by its address.  This is like DNS
    PTR in-arpa, but all zeroconf-y."""
    _, server = get_dbus_bus_server()
    try:
        hostname = server.ResolveAddress(
            iface, avahi.PROTO_UNSPEC, ip_address_as_ascii, dbus.UInt32(0))[4]
    except dbus.exceptions.DBusException:
        return ip_address_as_ascii

    return hostname


def get_dbus_bus_server():
    """Common sequence of steps to get a Bus and Server object from DBUS."""
    bus = dbus.SystemBus()
    root_name = bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER)
    server = dbus.Interface(root_name, avahi.DBUS_INTERFACE_SERVER)
    return bus, server


class Advertisement(object):
    """Represents an advertised service that exists on this host."""

    def __init__(self, port, name, stype="", domain="", host="", text=None):
        if text is None:
            text = {}
        super(Advertisement, self).__init__()

        self.logging = logging.getLogger(self.__class__.__name__)
        self.name = name
        self.stype = stype
        self.domain = domain
        self.host = host
        self.port = int(port)
        if hasattr(text, "keys"):
            self.text = avahi.dict_to_txt_array(text)
        else:
            self.text = text

        self.group = None

    def publish(self):
        """Start the advertisement."""
        bus, server = get_dbus_bus_server()

        entry_group = bus.get_object(avahi.DBUS_NAME, server.EntryGroupNew())
        g = dbus.Interface(entry_group, avahi.DBUS_INTERFACE_ENTRY_GROUP)

        g.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, dbus.UInt32(0),
                self.name, self.stype, self.domain, self.host,
                dbus.UInt16(self.port), self.text)

        g.Commit()
        self.logging.info("starting advertising %s on port %d",
                self.stype, self.port)
        self.group = g

    def unpublish(self):
        """End the advertisement."""
        try:
            self.logging.info("ending advertising %s on port %d",
                    self.stype, self.port)
            self.group.Reset()
        except dbus.DBusException:
            self.logging.exception("Couldn't reset DBus at shutdown.")
        self.group = None

    def die(self):
        """Quit."""
        self.unpublish()


class LocationAdvertisement(Advertisement):
    """An advertised couchdb location.  See Advertisement class."""
    def __init__(self, *args, **kwargs):
        kwargs["stype"] = LOCATION_DISCOVERY_SERVICE_TYPE
        super(LocationAdvertisement, self).__init__(*args, **kwargs)


class PairAdvertisement(Advertisement):
    """An advertised couchdb pairing opportunity.  See Advertisement class."""
    def __init__(self, *args, **kwargs):
        kwargs["stype"] = INVITATIONS_DISCOVERY_SERVICE_TYPE
        super(PairAdvertisement, self).__init__(*args, **kwargs)


def avahitext_to_dict(avahitext):
    """Turn avahi text into a dictionary."""
    text = {}
    for integers in avahitext:
        try:
            key, value = "".join(chr(i) for i in integers).split("=", 1)
            text[key] = value
        except ValueError:
            logging.exception("k/v field could not be decoded.")
    return text

# k=uuid, v=(addr, port)
nearby_desktop_couch_instances = dict()  # pylint: disable=C0103


def cb_found_desktopcouch_server(uuid, host_address, port):
    """Check if nearby server is found."""
    nearby_desktop_couch_instances[uuid] = (unicode(host_address), int(port))


def cb_lost_desktopcouch_server(uuid):
    """Check if nearby server is lost."""
    try:
        del nearby_desktop_couch_instances[uuid]
    except KeyError:
        pass


def get_seen_paired_hosts(uri=None):
    """Get the hosts that have been seen."""
    pairing_encyclopedia = couchdb_io.get_all_known_pairings(uri=uri)
    return (
            (
                uuid,
                addr,
                port,
                not pairing_encyclopedia[uuid]["active"],
                pairing_encyclopedia[uuid]["oauth"],
            )
            for uuid, (addr, port)
            in nearby_desktop_couch_instances.items()
            if uuid in pairing_encyclopedia)


def maintain_discovered_servers(add_cb=cb_found_desktopcouch_server,
                                del_cb=cb_lost_desktopcouch_server):
    """Maintain the discovered servers."""

    # pylint: disable=W0613
    def remove_item_handler(cb, interface, protocol, name, stype, domain,
                            flags):
        """A service disappeared."""

        if name.startswith("desktopcouch "):
            hostid = name[13:]
            logging.debug("lost sight of %r", hostid)
            cb(hostid)
        else:
            logging.error("annc doesn't look like one of ours.  %r", name)
    # pylint: enable=W0613

    # pylint: disable=W0613
    def new_item_handler(cb, interface, protocol, name, stype, domain, flags):
        """A service appeared."""

        def handle_error(*args):
            """An error in resolving a new service."""
            logging.error("zeroconf ItemNew error for services, %s", args)

        def handle_resolved(cb, *args):
            """Successfully resolved a new service, which we decode and send
            back to our calling environment with the callback function."""

            name, host, port = args[2], args[5], args[8]
            if name.startswith("desktopcouch "):
                cb(name[13:], host, port)
            else:
                logging.error("annc doesn't look like one of ours.  %r", name)
            return True

        server.ResolveService(interface, protocol, name, stype,
            domain, avahi.PROTO_UNSPEC, dbus.UInt32(0),
            reply_handler=lambda *a: handle_resolved(cb, *a),
            error_handler=handle_error)
    # pylint: enable=W0613

    bus, server = get_dbus_bus_server()
    domain_name = get_local_hostname()[1]
    browser = server.ServiceBrowserNew(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC,
            LOCATION_DISCOVERY_SERVICE_TYPE, domain_name, dbus.UInt32(0))
    browser_name = bus.get_object(avahi.DBUS_NAME, browser)

    sbrowser = dbus.Interface(browser_name,
            avahi.DBUS_INTERFACE_SERVICE_BROWSER)
    sbrowser.connect_to_signal("ItemNew",
            lambda *a: new_item_handler(add_cb, *a))
    sbrowser.connect_to_signal("ItemRemove",
            lambda *a: remove_item_handler(del_cb, *a))
    sbrowser.connect_to_signal("Failure",
            lambda *a: logging.error("avahi error %r", a))


def discover_services(add_commport_name_cb, del_commport_name_cb,
        show_local=False):
    """Start looking for services.  Use two callbacks to handle seeing
    new services and seeing services disappear."""

    # pylint: disable=W0613
    def remove_item_handler(cb, interface, protocol, name, stype, domain,
                            flags):
        """A service disappeared."""

        if not show_local and flags & avahi.LOOKUP_RESULT_LOCAL:
            return
        cb(name)
    # pylint: enable=W0613

    def new_item_handler(cb, interface, protocol, name, stype, domain, flags):
        """A service appeared."""

        def handle_error(*args):
            """An error in resolving a new service."""
            logging.error("zeroconf ItemNew error for services, %s", args)

        def handle_resolved(cb, *args):
            """Successfully resolved a new service, which we decode and send
            back to our calling environment with the callback function."""
            text = avahitext_to_dict(args[9])
            name, host, port = args[2], args[5], args[8]
            cb(name, text.get("description", "?"),
                    host, port, text.get("version", None))

        if not show_local and flags & avahi.LOOKUP_RESULT_LOCAL:
            return

        server.ResolveService(interface, protocol, name, stype,
            domain, avahi.PROTO_UNSPEC, dbus.UInt32(0),
            reply_handler=lambda *a: handle_resolved(cb, *a),
            error_handler=handle_error)

    bus, server = get_dbus_bus_server()
    domain_name = get_local_hostname()[1]

    browser = server.ServiceBrowserNew(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC,
            INVITATIONS_DISCOVERY_SERVICE_TYPE, domain_name, dbus.UInt32(0))
    browser_name = bus.get_object(avahi.DBUS_NAME, browser)

    sbrowser = dbus.Interface(browser_name,
            avahi.DBUS_INTERFACE_SERVICE_BROWSER)
    sbrowser.connect_to_signal("ItemNew",
            lambda *a: new_item_handler(add_commport_name_cb, *a))
    sbrowser.connect_to_signal("ItemRemove",
            lambda *a: remove_item_handler(del_commport_name_cb, *a))
    sbrowser.connect_to_signal("Failure",
            lambda *a: logging.error("avahi error %r", a))