This file is indexed.

/usr/share/pyshared/desktopcouch/application/replication.py is in python-desktopcouch-application 1.0.8-0ubuntu3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# Copyright 2009 Canonical Ltd.
#
# This file is part of desktopcouch.
#
# desktopcouch is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License version 3
# as published by the Free Software Foundation.
#
# desktopcouch is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with desktopcouch.  If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Chad Miller <chad.miller@canonical.com>

"""Replication."""

import logging
log = logging.getLogger("replication")  # pylint: disable=C0103

import dbus.exceptions

from desktopcouch.application.pair.couchdb_pairing import couchdb_io
from desktopcouch.application.pair.couchdb_pairing import dbus_io
from desktopcouch.application import replication_services

# pylint: disable=E0611,F0401
try:
    import urlparse
except ImportError:
    import urllib.parse as urlparse
# pylint: enable=E0611,F0401

from twisted.internet import task


# Service names that do_all_replication() found missing from
# dir(replication_services).  A name is recorded the first time it is seen so
# that later replication passes skip that service without further processing.
known_bad_service_names = set()         # pylint: disable=C0103
# Checked by do_all_replication() before each individual replication; when it
# is false the current pass returns early.  Nothing in the visible part of
# this module reassigns it -- presumably it is cleared from outside at
# shutdown (confirm against callers).
is_running = True                       # pylint: disable=C0103


def db_targetprefix_for_service(service_name):
    """Use the service name to look up what the prefix should be on the
    databases.  This gives an egalitarian way for non-UbuntuOne servers to have
    their own remote-db-name scheme.

    The service is fetched as attribute `service_name` of the
    desktopcouch.application.replication_services package and its
    `db_name_prefix` attribute is returned unchanged.

    Returns the empty string, after logging the problem, when the package
    cannot be imported or when the lookup fails for any other reason; no
    exception is propagated to the caller."""
    container = "desktopcouch.application.replication_services"
    try:
        log.debug("Looking up prefix for service %r", service_name)
        mod = __import__(container, fromlist=[service_name])
        return getattr(mod, service_name).db_name_prefix
    except ImportError:
        # Hand the values to the logger instead of pre-formatting, so the
        # string is only built when the record is actually emitted.
        log.error("The service %r is unknown.  It is not a "
                "module in the %s package .", service_name, container)
        return ""
    except Exception:  # pylint: disable=W0703
        # Deliberately broad: a broken service module must not stop the
        # replication pass; the caller falls back to an empty prefix.
        log.exception("Not changing remote db name.")
        return ""


def oauth_info_for_service(service_name):
    """Use the service name to look up what oauth information we should use
    when talking to that service.

    The service is fetched as attribute `service_name` of the
    desktopcouch.application.replication_services package and the result of
    its `get_oauth_data()` is returned unchanged.

    Returns None, after logging an error, when the package cannot be
    imported.  Any other failure (for instance the attribute being absent, or
    `get_oauth_data()` itself raising) propagates to the caller."""
    container = "desktopcouch.application.replication_services"
    try:
        # This message used to say "prefix", copied from the sibling lookup;
        # it now names what is really being looked up.
        log.debug("Looking up oauth info for service %r", service_name)
        mod = __import__(container, fromlist=[service_name])
        return getattr(mod, service_name).get_oauth_data()
    except ImportError:
        # Lazy logger arguments rather than eager %-formatting.
        log.error("The service %r is unknown.  It is not a "
                "module in the %s package .", service_name, container)
        return None


def do_all_replication(local_port):     # pylint: disable=R0914,R0912,R0915
    """Perform all replication tasks for one pass.

    The pass has two phases.  Each phase runs inside its own try/except, so
    an exception in one is logged and does not prevent the other from
    running:

    1. Hosts yielded by dbus_io.get_seen_paired_hosts().  For a host flagged
       as unpaired, couchdb_io.expunge_pairing() is called against the remote
       host once per local identifier and then, if at least one of those
       calls succeeded, once locally for the remote host id.  Every other
       host gets each replicatable local database pushed to it.
    2. Services yielded by couchdb_io.get_static_paired_hosts().  The
       service's location and oauth data are looked up by service name, then
       databases are pushed and/or pulled according to the pairing's
       `to_push` / `to_pull` flags.

    The module-level `is_running` flag is checked before each individual
    replication; as soon as it is false the function returns.

    local_port -- port of the local couchdb; used to build the local URI and
        passed as the local end of every replication.

    Returns None.  Exceptions raised inside either phase are logged rather
    than propagated.
    """
    log.debug("started replicating")
    local_uri = couchdb_io.mkuri("localhost", local_port)
    try:
        try:
            # All machines running desktopcouch must advertise themselves with
            # zeroconf.  We collect those elsewhere and filter out the ones
            # that we have paired with.  Now, it's time to send our changes to
            # all those.

            for remote_hostid, addr, port, is_unpaired, remote_oauth in \
                    dbus_io.get_seen_paired_hosts(uri=local_uri):

                if is_unpaired:
                    # The far end doesn't know we want to break up.
                    count = 0
                    # Must be bound before the loop: with no local
                    # identifiers the loop body never runs, and the check
                    # below would otherwise hit an unbound (or stale) name.
                    last_exception = None
                    for local_identifier in couchdb_io.get_my_host_unique_id():
                        try:
                            # Tell her gently, using each pseudonym.
                            # pylint: disable=E1121
                            couchdb_io.expunge_pairing(local_identifier,
                                    couchdb_io.mkuri(addr, port), remote_oauth)
                            # pylint: enable=E1121
                            count += 1
                        except Exception as e:  # pylint: disable=W0703
                            last_exception = e
                    if count == 0:
                        if last_exception is not None:
                            # If she didn't recognize us, something's wrong.
                            try:
                                # push caught exception back...
                                raise last_exception  # pylint: disable=E0702
                            except Exception:  # pylint: disable=W0703
                                # ... so that we log it here.
                                log.exception(
                                    "failed to unpair from other end.")
                    else:
                        # Finally, find your inner peace...
                        couchdb_io.expunge_pairing(remote_hostid)
                    # ...and move on.
                    continue

                # Ah, good, this is an active relationship.  Be a giver.
                log.debug("want to replipush to discovered host %r @ %s",
                        remote_hostid, addr)
                for db_name in couchdb_io.get_database_names_replicatable(
                        local_uri):
                    if not is_running:
                        return
                    couchdb_io.replicate(db_name, db_name,
                            target_host=addr, target_port=port,
                            source_port=local_port, target_oauth=remote_oauth,
                            local_uri=local_uri)
            log.debug("replication of discovered hosts finished")
        except Exception:            # pylint: disable=W0703
            log.exception("replication of discovered hosts aborted")

        try:
            # There may be services we send data to.  Use the service name (sn)
            # to look up what the service needs from us.

            for remote_hostid, sn, to_pull, to_push in \
                        couchdb_io.get_static_paired_hosts(port=local_port):

                if sn not in dir(replication_services):
                    if not is_running:
                        return
                    if sn in known_bad_service_names:
                        continue  # Don't nag.
                    # NOTE(review): the first sighting is only recorded here
                    # and processing continues below; the lookups that follow
                    # import the service by name, which presumably makes it
                    # visible in dir() on later passes -- confirm.
                    known_bad_service_names.add(sn)

                remote_oauth_data = oauth_info_for_service(sn)

                # TODO: push all this into service module.
                try:
                    prefix_getter = db_targetprefix_for_service(sn)
                    remote_location = str(prefix_getter)
                    if hasattr(prefix_getter, 'user_id'):
                        # pylint: disable=E1103
                        user_id = prefix_getter.user_id
                        # pylint: enable=E1103
                    else:
                        user_id = None
                    urlinfo = urlparse.urlsplit(remote_location)
                except ValueError as e:
                    log.warning("Can't reach service %s.  %s", sn, e)
                    continue
                # pylint: disable=E1103
                if ":" in urlinfo.netloc:
                    addr, port = urlinfo.netloc.rsplit(":", 1)
                else:
                    addr = urlinfo.netloc
                    port = 443 if urlinfo.scheme == "https" else 80
                remote_db_name_prefix = urlinfo.path.strip("/")
                # pylint: enable=E1103
                # TODO: end ^
                if to_push:
                    for db_name in couchdb_io.get_database_names_replicatable(
                            local_uri):
                        if not is_running:
                            return

                        remote_db_name = remote_db_name_prefix + "/" + db_name

                        log.debug(
                            "want to replipush %r to static host %r @ %s",
                            remote_db_name, remote_hostid, addr)

                        couchdb_io.replicate(db_name, remote_db_name,
                                target_host=addr, target_port=port,
                                source_port=local_port, target_ssl=True,
                                target_oauth=remote_oauth_data,
                                local_uri=local_uri)
                if to_pull:
                    # pylint: disable=E1103
                    remote_uri = couchdb_io.mkuri(
                            addr, int(port),
                            has_ssl=(urlinfo.scheme == 'https'))
                    for remote_db_name in \
                            couchdb_io.get_database_names_replicatable(
                                remote_uri, oauth_tokens=remote_oauth_data,
                                service=True, user_id=user_id):
                        if not is_running:
                            return
                        try:
                            # Only databases under our prefix are ours.
                            if not remote_db_name.startswith(
                                    str(remote_db_name_prefix + "/")):
                                continue
                        except ValueError as e:
                            # Report the remote name being skipped; the local
                            # db_name has not been derived yet at this point.
                            log.error("skipping %r on %s.  %s",
                                    remote_db_name, sn, e)
                            continue
                        # Drop "<prefix>/" to get the local database name.
                        prefix_len = len(str(remote_db_name_prefix))
                        db_name = remote_db_name[prefix_len + 1:]
                        if db_name.strip("/") == "management":
                            continue  # be paranoid about what we accept.
                        log.debug(
                            "want to replipull %r from static host %r @ %s",
                            db_name, remote_hostid, addr)
                        couchdb_io.replicate(remote_db_name, db_name,
                                source_host=addr, source_port=port,
                                target_port=local_port, source_ssl=True,
                                source_oauth=remote_oauth_data,
                                local_uri=local_uri)
                    # pylint: enable=E1103

        except Exception:               # pylint: disable=W0703
            log.exception("replication of services aborted")
    finally:
        log.debug("finished replicating")


def set_up(port_getter):
    """Advertise this host and start the periodic replication task.

    port_getter -- zero-argument callable returning the local couchdb port.
        It is called once, here.

    One dbus_io.LocationAdvertisement is created and published per local
    host identifier (identifiers are created if missing), discovery of other
    servers is started, and do_all_replication() is scheduled with a
    twisted LoopingCall using an interval of 3600 seconds.

    Returns a `(beacons, looping_task)` tuple suitable for passing to
    tear_down(), or None when an advertisement could not be published.  In
    the failure case any advertisement that had already been published is
    withdrawn again, because the caller gets no handle with which to do so.
    """
    port = port_getter()
    unique_identifiers = couchdb_io.get_my_host_unique_id(
            couchdb_io.mkuri("localhost", int(port)), create=True)

    beacons = [dbus_io.LocationAdvertisement(port, "desktopcouch " + i)
            for i in unique_identifiers]
    published = []
    for beacon in beacons:
        try:
            beacon.publish()
        except dbus.exceptions.DBusException as e:
            log.error("We seem to be running already, or can't publish "
                    "our zeroconf advert.  %s", e)
            # Best effort: don't leave a partial set of adverts behind.
            for earlier in published:
                try:
                    earlier.unpublish()
                except Exception:       # pylint: disable=W0703
                    log.exception("could not withdraw zeroconf advert.")
            return None
        published.append(beacon)

    dbus_io.maintain_discovered_servers()

    task_running = task.LoopingCall(do_all_replication, int(port))
    task_running.start(3600)

    # TODO:  port may change, so every so often, check it and
    # perhaps refresh the beacons.  We return an array of beacons, so we could
    # keep a reference to that array and mutate it when the port-beacons
    # change.

    return beacons, task_running


def tear_down(beacons, looping_task):
    """Undo set_up(): withdraw the adverts and stop the replication task.

    beacons -- iterable of objects with an `unpublish()` method.
    looping_task -- object with a `stop()` method.

    `looping_task.stop()` is always attempted, even if withdrawing a beacon
    raises.  A failure to stop the task is logged at debug level and
    otherwise ignored; an exception from `unpublish()` still propagates to
    the caller.
    """
    try:
        for beacon in beacons:
            beacon.unpublish()
    finally:
        try:
            looping_task.stop()
        except Exception:               # pylint: disable=W0703
            # Stopping is best effort (the task may simply not be running),
            # but keep a trace and let KeyboardInterrupt/SystemExit through.
            log.debug("could not stop the replication task.", exc_info=True)