/usr/share/pyshared/gwibber/microblog/uploader/ubuntuone_uploader/__init__.py is in gwibber-service 3.4.0-0ubuntu4.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"Ubuntu One uploader and publisher"
import dbus, os
from gi.repository import Gio
try:
from ubuntuone.platform.tools import SyncDaemonTool, DBusClient, DBUS_IFACE_STATUS_NAME
except ImportError:
SyncDaemonTool = None
class Uploader(object):
    """Upload a local file to Ubuntu One and publish it.

    The file is copied into ``~/Ubuntu One/Gwibber Uploads``; once the
    ``UploadFinished`` signal reports that copy, public access is requested
    for it.  The outcome is reported through the callbacks given to the
    constructor:

    * ``success_callback(path, public_url)``
    * ``failure_callback(path, message)``

    ``path`` is always the path that was passed to :meth:`uploadFile`.
    """

    def __init__(self, success_callback, failure_callback):
        """Store the callbacks and, when the library is present, create the
        ``SyncDaemonTool`` used to talk to Ubuntu One over the session bus.
        """
        self.success_callback = success_callback
        self.failure_callback = failure_callback
        self.path = None
        # Path of the copy inside ~/Ubuntu One.  Initialised here so that an
        # UploadFinished signal arriving before the copy has been made is
        # simply ignored instead of raising AttributeError.
        self.u1path = None
        self.sig_status_changed = None
        self.sig_upload_finished = None
        self.sd = None
        if SyncDaemonTool:
            self.sd = SyncDaemonTool(bus=dbus.SessionBus())

    def uploadFile(self, path):
        """Start uploading ``path``.

        Returns nothing; the result is delivered later through the success
        or failure callback.
        """
        self.path = path
        if not SyncDaemonTool:
            self.failure_callback(path, "Could not find Ubuntu One library (python-ubuntuone-client)")
            return
        # First, confirm Ubuntu One is connected
        d = self.sd.get_status()
        # The callback is registered before the errback.  Assuming Twisted
        # Deferred semantics, an errback that returns normally passes its
        # (None) result on to the next callback, so errback-first would call
        # __got_status(None) right after reporting the failure.
        d.addCallback(self.__got_status)
        d.addErrback(self.__failure)

    def __failure(self, *args):
        """Report a failure for the current file through ``failure_callback``."""
        self.failure_callback(self.path, "Problem uploading to Ubuntu One: %s" % str(args))

    def __got_status(self, state):
        """Continue with the copy if online, otherwise ask to connect first."""
        if state["is_online"]:
            self.__copy_file()
        else:
            # not online, so try to connect
            self.sig_status_changed = self.sd.bus.add_signal_receiver(
                handler_function=self.__status_changed, signal_name="StatusChanged",
                dbus_interface=DBUS_IFACE_STATUS_NAME, path='/status')
            d = self.sd.connect()
            d.addErrback(self.__failure)

    def __status_changed(self, status):
        """Handle ``StatusChanged`` while waiting for the connection."""
        if status["is_online"]:
            # We are connected; continue
            self.sig_status_changed.remove()
            self.__copy_file()
            return
        if status["is_error"]:
            # We are not connected, and not going to be without user fixes
            self.sig_status_changed.remove()
            self.__failure("Could not connect to Ubuntu One")
            return

    def __copy_file(self):
        """Copy ``self.path`` into the Gwibber Uploads folder.

        If a file with the same name already exists there, " (N)" is
        inserted before the extension, with N counting up from 1 until the
        copy succeeds.  Any other copy error is reported through the failure
        callback.  On success ``self.u1path`` holds the path of the copy.
        """
        # Imported locally: the module header only imports Gio, and GError
        # lives in GLib when using gi.repository.
        from gi.repository import GLib

        # First, create a folder to put the copy of the specified file in
        fol = os.path.expanduser("~/Ubuntu One/Gwibber Uploads")
        try:
            os.makedirs(fol)
        except OSError:
            # OSError is OK if the folder already existed
            if not os.path.isdir(fol):
                self.__failure("Could not create Gwibber Uploads folder in Ubuntu One")
                return

        ffullname = os.path.basename(self.path)
        fname, fext = os.path.splitext(ffullname)
        # Gio.File is an interface under gi.repository and cannot be
        # instantiated directly; files are created via new_for_path().
        src = Gio.File.new_for_path(self.path)

        # Forget any path left over from a previous upload before we start
        # listening, so a stale value can never match an incoming signal.
        self.u1path = None
        # We connect to the UploadFinished signal from syncdaemon here,
        # before we even copy the file, so we know that it's right.
        self.sig_upload_finished = self.sd.bus.add_signal_receiver(
            handler_function=self.__file_uploaded, signal_name="UploadFinished",
            dbus_interface=DBUS_IFACE_STATUS_NAME, path='/status')

        name_clash = int(Gio.IOErrorEnum.EXISTS)
        differentiator = 0
        while True:
            if differentiator:
                candidate = "%s (%s)%s" % (fname, differentiator, fext)
            else:
                candidate = ffullname
            dest = Gio.File.new_for_path(os.path.join(fol, candidate))
            try:
                # flags, cancellable, progress callback and its user data are
                # all spelled out for PyGObject versions that require them.
                src.copy(dest, Gio.FileCopyFlags.NONE, None, None, None)
            except GLib.GError as err:
                if err.code != name_clash:
                    # Renaming cannot fix this (missing source, permissions,
                    # full disk, ...), so give up rather than loop forever.
                    self.sig_upload_finished.remove()
                    self.__failure("Could not copy file into Ubuntu One: %s" % err)
                    return
                # file with this name exists. Try the next numbered name
                differentiator += 1
            else:
                break
        self.u1path = dest.get_path()  # the actual path in ~/Ubuntu One

    def __file_uploaded(self, path, info):
        """Handle ``UploadFinished``: publish the file once our copy is up."""
        if path == self.u1path:
            # stop listening to the signal
            self.sig_upload_finished.remove()
            # publish the file
            d = self.sd.change_public_access(path, True)
            d.addCallback(self.__published)
            d.addErrback(self.__failure)

    def __published(self, info):
        """Report success with the public URL found in ``info``."""
        self.success_callback(self.path, info["public_url"])
|