This file is indexed.

/usr/share/pyshared/desktopcouch/application/replication_services/ubuntuone.py is in python-desktopcouch-application 1.0.8-0ubuntu3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
"""Ubuntu One."""

from oauth import oauth
import logging
import urllib2

import dbus
import ubuntu_sso

try:
    # Python 2.5
    import simplejson as json
except ImportError:
    # Python 2.6+
    import json


NAME = "Ubuntu One"
DESCRIPTION = "The Ubuntu One cloud service"


def undbusify(value):
    """Convert dbus types back to native types."""
    for singleton in (None, True, False):
        if value == singleton:
            return singleton
    for val_type in (long, int, float, complex,
                     unicode, str,
                     list, tuple, dict, set):
        if isinstance(value, val_type):
            return val_type(value)
    raise TypeError(value)


def in_main_thread(function, *args, **kwargs):
    """Check if we are in the main thread."""
    from twisted.python.threadable import isInIOThread
    if isInIOThread():  # We may not be in main thread right now.
        from twisted.internet.threads import blockingCallFromThread
        from twisted.internet import reactor
        return blockingCallFromThread(reactor, function, *args, **kwargs)
    else:  # If in main, just do it.
        return function(*args, **kwargs)


def is_active():
    """Can we deliver information?"""
    return get_oauth_data() is not None

oauth_data = None                       # pylint: disable=C0103


def get_oauth_data():
    """Information needed to replicate to a server."""
    global oauth_data                   # pylint: disable=W0603
    if oauth_data is not None:
        return oauth_data

    bus = dbus.SessionBus()
    proxy = bus.get_object(ubuntu_sso.DBUS_BUS_NAME,
                           ubuntu_sso.DBUS_CRED_PATH,
                           follow_name_owner_changes=True)
    logging.info('get_oauth_data: asking for credentials to Ubuntu SSO. '
                 'App name: %s', NAME)
    oauth_data = dict(
                      (undbusify(k), undbusify(v))
                      for k, v
                      in proxy.find_credentials(NAME).iteritems())
    logging.info('Got credentials from Ubuntu SSO. Non emtpy credentials? %s',
                 len(oauth_data) > 0)
    return oauth_data


def get_oauth_request_header(consumer, access_token, http_url):
    """Get an oauth request header given the token and the url"""
    signature_method = oauth.OAuthSignatureMethod_PLAINTEXT()
    assert http_url.startswith("https")
    oauth_request = oauth.OAuthRequest.from_consumer_and_token(
        http_url=http_url,
        http_method="GET",
        oauth_consumer=consumer,
        token=access_token)
    oauth_request.sign_request(signature_method, consumer, access_token)
    return oauth_request.to_header()


class PrefixGetter():
    """Prefix getter class."""

    def __init__(self):
        self.str = None
        self.oauth_header = None
        self.user_id = None

    def __str__(self):
        if self.str is not None:
            return self.str

        url = "https://one.ubuntu.com/api/account/"
        if self.oauth_header is None:
            try:
                get_oauth_data()
                consumer = oauth.OAuthConsumer(oauth_data['consumer_key'],
                                               oauth_data['consumer_secret'])
                access_token = oauth.OAuthToken(oauth_data['token'],
                                                oauth_data['token_secret'])
                self.oauth_header = get_oauth_request_header(
                        consumer, access_token, url)
            except:
                logging.exception("Could not get access token from sso.")
                raise ValueError("No access token.")

        req = urllib2.Request(url, None, self.oauth_header)
        result = urllib2.urlopen(req, None, 30)

        code = result.getcode()
        if code != 200:
            logging.error("Server returned HTTP %d", code)
            raise ValueError("server unavailable, code %d" % (code,))

        document = json.load(result)
        if "couchdb_root" not in document:
            raise ValueError("couchdb_root not found in %s" % (document,))
        self.str = document["couchdb_root"]
        self.user_id = document["id"]

        return self.str

# Access to this as a string fires off functions.
db_name_prefix = PrefixGetter()         # pylint: disable=C0103


from desktopcouch.application.server import DesktopDatabase
from desktopcouch.application.pair.couchdb_pairing import couchdb_io
from desktopcouch.application import local_files

#
#  One day, if we decide this is a good idea, we can generalize this to
#  any pairing.
#


class ReplicationExclusion(object):
    """Set some databases to be replicatable or non-replicatable to
    Ubuntu One cloud couchdb servers."""
    def __init__(self, url=None, ctx=local_files.DEFAULT_CONTEXT):
        super(ReplicationExclusion, self).__init__()
        self.management_db = DesktopDatabase(
            "management", uri=url, create=True, ctx=ctx)
        all_pairing_records = self.management_db.get_all_records(
                record_type=couchdb_io.PAIRED_SERVER_RECORD_TYPE)
        for result in all_pairing_records:
            if result.get("service_name", None) == "ubuntuone":
                self.pairing_record = result
                return
        # else
        raise ValueError("No pairing record for ubuntuone.")

    def all_exclusions(self):
        """Gets all exclusions that can be removed, which omits 'management'
        and 'users', as those are private to this machine always."""

        return set(self.pairing_record.get("excluded_names", []))

    def _update(self, verb, db_name):
        """Update the record."""
        current = self.all_exclusions()
        try:
            getattr(current, verb)(db_name)
        except KeyError:
            pass  # Not an error to remove something already gone.
        if len(current) > 0:
            self.pairing_record["excluded_names"] = list(current)
        else:
            if "excluded_names" in self.pairing_record:
                del self.pairing_record["excluded_names"]
        self.management_db.put_record(self.pairing_record)

    def exclude(self, db_name):
        """Sets a database never to be replicatable to Ubuntu One."""
        self._update("add", db_name)

    def replicate(self, db_name):
        """Sets a database to be replicatable to Ubuntu One."""
        self._update("remove", db_name)

if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG, format="%(message)s")
    print str(db_name_prefix)