/usr/share/pyshared/desktopcouch/application/replication_services/ubuntuone.py is in python-desktopcouch-application 1.0.8-0ubuntu3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""Ubuntu One."""
from oauth import oauth
import logging
import urllib2
import dbus
import ubuntu_sso
try:
# Python 2.5
import simplejson as json
except ImportError:
# Python 2.6+
import json
# Name of this replication service.  get_oauth_data() also passes it to
# Ubuntu SSO as the application name when looking up credentials.
NAME = "Ubuntu One"
# Short human-readable description of this replication service.
DESCRIPTION = "The Ubuntu One cloud service"
def undbusify(value):
    """Convert a dbus-typed value back to the equivalent native type.

    None and booleans (native ``bool`` or ``dbus.Boolean``) map to the
    matching singleton.  Any other value is rebuilt with the first native
    type in the list below that it is an instance of.  The conversion is
    shallow: items held inside a container are not converted.

    Raises TypeError (carrying the value) when no known type matches.
    """
    if value is None:
        return None
    # Booleans must be recognised by type, before the numeric types, and
    # not by equality: "0 == False" and "1 == True" both hold, so an
    # equality test would turn the plain numbers 0 and 1 into booleans.
    # dbus.Boolean is an int subclass, hence the explicit check.
    if isinstance(value, (bool, dbus.Boolean)):
        return bool(value)
    for val_type in (long, int, float, complex,
                     unicode, str,
                     list, tuple, dict, set):
        if isinstance(value, val_type):
            return val_type(value)
    raise TypeError(value)
def in_main_thread(function, *args, **kwargs):
    """Run function(*args, **kwargs) in the Twisted reactor thread.

    Returns whatever the function returns.  When called from a thread
    other than the reactor's, the call is handed to the running reactor
    with blockingCallFromThread and this thread waits for the result.

    The function is simply called in place when there is nothing to hand
    it to (no reactor has been installed, or it is not running) or when
    we already are in the reactor thread.
    """
    import sys
    # Look the reactor up instead of importing it: importing
    # twisted.internet.reactor would install a default reactor as a side
    # effect when none has been chosen yet.
    reactor = sys.modules.get("twisted.internet.reactor")
    if reactor is None or not getattr(reactor, "running", False):
        # Nobody would ever service a blocking call; do it here.
        return function(*args, **kwargs)
    from twisted.python.threadable import isInIOThread
    if isInIOThread():
        # Already in the reactor thread.  blockingCallFromThread must not
        # be used here: it would wait for a result that only this (now
        # blocked) thread could produce.
        return function(*args, **kwargs)
    from twisted.internet.threads import blockingCallFromThread
    return blockingCallFromThread(reactor, function, *args, **kwargs)
def is_active():
    """Can we deliver information?

    True only when get_oauth_data() yields a non-empty set of
    credentials.  get_oauth_data() hands back a dict even when Ubuntu SSO
    has no credentials for us, so comparing its result against None would
    report the service as active without any credentials.
    """
    return bool(get_oauth_data())
# Cache of the credentials found by get_oauth_data(); None until a
# non-empty set of credentials has been fetched.
oauth_data = None # pylint: disable=C0103


def get_oauth_data():
    """Information needed to replicate to a server.

    Asks the Ubuntu SSO D-Bus service for the credentials stored under
    NAME and returns them as a dict whose keys and values went through
    undbusify().  A non-empty result is kept in the module-level
    ``oauth_data`` and returned directly by later calls.

    An empty result is returned but not cached, so that credentials which
    only become available later are picked up by the next call instead of
    being hidden behind a cached empty dict.
    """
    global oauth_data # pylint: disable=W0603
    if oauth_data is not None:
        return oauth_data
    bus = dbus.SessionBus()
    proxy = bus.get_object(ubuntu_sso.DBUS_BUS_NAME,
                           ubuntu_sso.DBUS_CRED_PATH,
                           follow_name_owner_changes=True)
    logging.info('get_oauth_data: asking for credentials to Ubuntu SSO. '
                 'App name: %s', NAME)
    credentials = dict(
        (undbusify(k), undbusify(v))
        for k, v
        in proxy.find_credentials(NAME).iteritems())
    logging.info('Got credentials from Ubuntu SSO. Non emtpy credentials? %s',
                 len(credentials) > 0)
    if credentials:
        # Only remember a result that is actually usable.
        oauth_data = credentials
    return credentials
def get_oauth_request_header(consumer, access_token, http_url):
    """Get an oauth request header given the token and the url.

    Builds a GET request for http_url from the consumer and access token,
    signs it with the PLAINTEXT signature method and returns the header
    produced by the request's to_header().

    Raises AssertionError when http_url is not an https URL.
    """
    # The PLAINTEXT method relies on the transport for protection, so
    # refuse anything but https.  This is an explicit check rather than an
    # "assert" statement so that it is still enforced under "python -O";
    # the exception type is the one the assert used to raise.
    if not http_url.startswith("https://"):
        raise AssertionError(
            "OAuth PLAINTEXT signing needs an https URL, got %r"
            % (http_url,))
    signature_method = oauth.OAuthSignatureMethod_PLAINTEXT()
    oauth_request = oauth.OAuthRequest.from_consumer_and_token(
        http_url=http_url,
        http_method="GET",
        oauth_consumer=consumer,
        token=access_token)
    oauth_request.sign_request(signature_method, consumer, access_token)
    return oauth_request.to_header()
class PrefixGetter(object):
    """Prefix getter class.

    Converting an instance to a string yields the "couchdb_root" value of
    the account document served at https://one.ubuntu.com/api/account/.
    The first conversion performs the credential lookup and the HTTPS
    request; the outcome is cached on the instance, so later conversions
    are free.
    """

    def __init__(self):
        self.str = None           # cached "couchdb_root" value
        self.oauth_header = None  # cached OAuth header for the request
        self.user_id = None       # "id" field of the account document

    def __str__(self):
        """Return the prefix, fetching it on first use.

        Raises ValueError when no access token can be obtained, when the
        server answers with a status other than 200, or when the reply
        has no "couchdb_root" entry.  Errors raised by urllib2.urlopen()
        propagate unchanged.
        """
        if self.str is not None:
            return self.str
        url = "https://one.ubuntu.com/api/account/"
        if self.oauth_header is None:
            try:
                credentials = get_oauth_data()
                consumer = oauth.OAuthConsumer(
                    credentials['consumer_key'],
                    credentials['consumer_secret'])
                access_token = oauth.OAuthToken(
                    credentials['token'],
                    credentials['token_secret'])
                self.oauth_header = get_oauth_request_header(
                    consumer, access_token, url)
            except Exception:
                # Deliberately broad (missing keys, D-Bus failures, ...),
                # but KeyboardInterrupt and SystemExit are let through.
                logging.exception("Could not get access token from sso.")
                raise ValueError("No access token.")
        req = urllib2.Request(url, None, self.oauth_header)
        result = urllib2.urlopen(req, None, 30)
        try:
            code = result.getcode()
            if code != 200:
                logging.error("Server returned HTTP %d", code)
                raise ValueError("server unavailable, code %d" % (code,))
            document = json.load(result)
        finally:
            # Always release the connection, whatever happened above.
            result.close()
        if "couchdb_root" not in document:
            raise ValueError("couchdb_root not found in %s" % (document,))
        self.str = document["couchdb_root"]
        self.user_id = document["id"]
        return self.str
# Converting this object to a string (e.g. str(db_name_prefix)) fires off
# the credential lookup and the HTTPS request in PrefixGetter.__str__; the
# value is cached on the instance after the first successful conversion.
db_name_prefix = PrefixGetter() # pylint: disable=C0103
from desktopcouch.application.server import DesktopDatabase
from desktopcouch.application.pair.couchdb_pairing import couchdb_io
from desktopcouch.application import local_files
#
# One day, if we decide this is a good idea, we can generalize this to
# any pairing.
#
class ReplicationExclusion(object):
    """Mark databases as replicatable or not replicatable to the Ubuntu
    One cloud couchdb servers.

    The list of excluded names lives in the "excluded_names" field of the
    "ubuntuone" pairing record stored in the management database.
    """

    def __init__(self, url=None, ctx=local_files.DEFAULT_CONTEXT):
        """Open the management database and find the ubuntuone pairing.

        Raises ValueError when no pairing record has a "service_name" of
        "ubuntuone".
        """
        super(ReplicationExclusion, self).__init__()
        self.management_db = DesktopDatabase(
            "management", uri=url, create=True, ctx=ctx)
        candidates = self.management_db.get_all_records(
            record_type=couchdb_io.PAIRED_SERVER_RECORD_TYPE)
        for record in candidates:
            if record.get("service_name", None) == "ubuntuone":
                # The first matching record wins.
                self.pairing_record = record
                break
        else:
            raise ValueError("No pairing record for ubuntuone.")

    def all_exclusions(self):
        """Return the set of database names listed as excluded in the
        pairing record (empty when the record has no such list).

        NOTE(review): 'management' and 'users' are presumably never
        listed, being always private to this machine; nothing here
        filters them out.
        """
        excluded = self.pairing_record.get("excluded_names", [])
        return set(excluded)

    def _update(self, verb, db_name):
        """Apply the set method named by verb to the exclusions, with
        db_name as argument, and store the changed record."""
        names = self.all_exclusions()
        change = getattr(names, verb)
        try:
            change(db_name)
        except KeyError:
            # Removing a name that is already absent is not an error.
            pass
        if names:
            self.pairing_record["excluded_names"] = list(names)
        elif "excluded_names" in self.pairing_record:
            # Nothing left to exclude: drop the field altogether.
            del self.pairing_record["excluded_names"]
        self.management_db.put_record(self.pairing_record)

    def exclude(self, db_name):
        """Never replicate the named database to Ubuntu One."""
        self._update("add", db_name)

    def replicate(self, db_name):
        """Allow the named database to be replicated to Ubuntu One."""
        self._update("remove", db_name)
# When run as a script: log at DEBUG level with bare messages, then print
# the database-name prefix.  The str() call is what triggers the lookup
# in PrefixGetter.__str__.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG, format="%(message)s")
    print str(db_name_prefix)
|