/usr/lib/python2.7/dist-packages/framework/patterns/tasklet.py is in fso-frameworkd 0.10.1-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 | # Tichy
# copyright 2008 Guillaume Chereau (charlie@openmoko.org)
#
# This file is part of Tichy.
#
# Tichy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Tichy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Tichy. If not, see <http://www.gnu.org/licenses/>.
"""
The tasklet module is a very powerfull tool that allow us to write
functions that look like thread (with blocking call), but are in fact using
callback.
"""
__docformat__ = "restructuredtext en"
import sys, traceback
from types import GeneratorType
import gobject # Only used for the Sleep tasklet
import dbus # Only used for the WaitDBusName tasklet
import logging
logger = logging.getLogger( "tasklet" )
# TODO:
# - better stack printing in case of error
def tasklet(func):
"""
A decorator that turns a generator function into a tasklet instance.
"""
def ret(*args, **kargs):
return Tasklet( generator=func(*args, **kargs) )
ret.__dict__ = func.__dict__
ret.__name__ = func.__name__
ret.__doc__ = func.__doc__
return ret
class Tasklet(object):
"""
This class can be used to write easy callback style functions using the 'yield'
python expression.
It is usefull in some cases where callback functions are the right thing to do,
but make the code too messy
This class is largely inspired by python PEP 0342:
http://www.python.org/dev/peps/pep-0342/
See the examples below to understand how to use it.
There is a very simple comunication mechanisme between tasklets :
A tasklet can wait for an incoming message using `yield WaitMessage()`,
an other tasklet can then send a message to this tasklet using the send_message method.
See the example 8 to see how to use this.
"""
def __init__(self, *args, **kargs):
if 'generator' in kargs:
self.generator = kargs['generator']
else:
self.generator = self.do_run(*args, **kargs)
assert isinstance(self.generator, GeneratorType), type(self.generator)
self.stack = traceback.extract_stack()[:-2]
# The tasklet we are waiting for...
self.waiting = None
self.closed = False
# The two lists used for messages passing between tasklets
self.waiting_to_send_message = []
self.waiting_for_message = []
def __del__(self):
if not self.closed and self.generator:
logger.error(
"Tasklet deleted without being executed\nTraceback to instantiation (most recent call last):\n%s",
''.join(traceback.format_list(self.stack)).rstrip()
)
def do_run(self, *args, **kargs):
return self.run(*args, **kargs)
def run(self):
"""The default task run by the tasklet"""
yield
def start(self, callback = None, err_callback = None, *args, **kargs):
"""Start the tasklet, connected to a callback and an error callback
:Parameters:
- `callback`: a function that will be called with the
returned value as argument
- `err_callback`: a function that is called if the tasklet raises an exception.
The function take 3 arguments as parameters, that are the standard python exception arguments.
- `*args`: any argument that will be passed to the callback function as well
- `**kargs`: any kargs argument that will be passed to the callback function as well
"""
self.callback = callback or self.default_callback
self.err_callback = err_callback or self.default_err_callback
self.args = args # possible additional args that will be passed to the callback
self.kargs = kargs # possible additional keywords args that will be passed to the callback
self.send(None) # And now we can initiate the task
def start_from(self, tasklet):
"""Start the tasklet from an other tasklet"""
self.start(tasklet.send, tasklet.throw)
def start_dbus(self, on_ok, on_err, *args, **kargs):
"""Like start, except that the callback methods comply to the dbus async signature
We should use this method instead of start when we want to connect to the callbacks
defined in the dbus async_callbacks keyword.
"""
# If the returned value is None, then we don't pass it to the callback.
def callback(value):
if value is None:
on_ok()
else:
on_ok(value)
# DBus error callback take only one argument.
def err_callback(type, e, trace):
on_err(e)
self.start(callback=callback, err_callback=err_callback, *args, **kargs)
def default_callback(self, value):
"""The default callback if None is specified"""
pass
def default_err_callback(self, type, value, traceback):
"""The default error call back if None is specified"""
if type is GeneratorExit:
return
# If a task generates a exception without having an error callback we kill the app.
# It is not very nice, but the only way to avoid blocking.
import traceback as tb
import sys
tb.print_exception(*sys.exc_info())
sys.exit(-1)
def close(self):
if self.closed:
return
self.callback = None
self.err_callback = None
if self.waiting:
self.waiting.close()
self.generator.close()
self.closed = True
def exit(self): # TODO: is this really useful, or should we use close here ?
e = GeneratorExit()
self.err_callback(*sys.exc_info())
def send(self, value = None, *args):
"""Resume and send a value into the tasklet generator
"""
# This somehow complicated try switch is used to handle all possible return and exception
# from the generator function
### <ML addition FIXME FIXME>
if self.generator == None:
logger.error( "generator has vanished!" )
return
### </ML addition FIXME FIXME>
assert self.closed == False, "Trying to send to a closed tasklet"
try:
value = self.generator.send(value)
except StopIteration:
# We don't propagate StopIteration
value = None
except Exception:
self.err_callback(*sys.exc_info())
self.close() # This is very important, cause we need to make sure we free the memory of the callback !
return
self.handle_yielded_value(value)
def throw(self, type, value = None, traceback = None):
"""Throw an exeption into the tasklet generator"""
try:
value = self.generator.throw(type, value, traceback)
except StopIteration:
# We don't propagate StopIteration
value = None
except Exception:
self.err_callback(*sys.exc_info())
self.close() # This is very important, cause we need to make sure we free the memory of the callback !
return
self.handle_yielded_value(value)
def handle_yielded_value(self, value):
"""This method is called after the waiting tasklet yielded a value
We have to take care of two cases:
- If the value is a Tasklet : we start it and connect the call back
to the 'parent' Tasklet send and throw hooks
- Otherwise, we consider that the tasklet finished, and we can call
our callback function
"""
if isinstance(value, GeneratorType):
value = Tasklet(generator = value)
if isinstance(value, Tasklet):
self.waiting = value
value.start_from(self)
else:
assert self.callback, "%s has no callback !" % self
self.callback(value, *self.args, **self.kargs)
self.close()
@tasklet
def send_message(self, value = None):
"""Block until the tasklet accepts the incoming message"""
if self.waiting_for_message:
listener = self.waiting_for_message.pop(0)
listener.trigger(value)
else:
sender = WaitTrigger()
self.waiting_to_send_message.append((sender, value))
yield sender
@tasklet
def wait_message(self):
"""Block until the tasklet receive an incoming message
Since we usually don't have access to the tasklet `self` argument (when using generators based tasklets)
it is easier to use the WaitMessage class for this.
"""
if self.waiting_to_send_message:
sender, value = self.waiting_to_send_message.pop(0)
sender.trigger(value)
yield value
else:
waiter = WaitTrigger()
self.waiting_for_message.append(waiter)
ret = yield waiter
yield ret
class WaitTrigger(Tasklet):
"""Special tasklet that will block until its `trigger` method is called
This is mostly used by the send_message and WaitMessage tasklet.
"""
def start(self, callback = None, err_callback = None, *args, **kargs):
self.callback = callback
def trigger(self, v = None):
if self.callback:
self.callback(v)
self.close()
def close(self):
self.callback = None
class WaitMessage(Tasklet):
"""Special tasklet that will block until the caller tasklet receive a message."""
def start_from(self, tasklet):
tasklet.wait_message().start(tasklet.send)
def close(self):
pass
class Wait(Tasklet):
"""
A special tasklet that wait for an event to be emitted
If o is an Object that can emit a signal 'signal', then we can create a
tasklet that waits for this event like this : Wait(o, 'signal')
"""
def __init__(self, obj, event):
assert obj is not None
super(Wait, self).__init__()
self.obj = obj
self.event = event
self.connect_id = None
def _callback(self, o, *args):
"""This is the callback that is triggered by the signal"""
assert o is self.obj
if not self.connect_id:
return # We have been closed already
# We need to remember to disconnect to the signal
o.disconnect(self.connect_id)
self.connect_id = None
# We can finally call our real callback
try:
self.callback(*args)
except:
self.err_callback(*sys.exc_info())
# We give a hint to the garbage collector
self.obj = self.callback = None
return False
def start(self, callback, err_callback, *args):
assert hasattr(self.obj, 'connect'), self.obj
self.callback = callback
self.err_callback = err_callback
self.connect_id = self.obj.connect(self.event, self._callback, *args)
def close(self):
# It is very important to disconnect the callback here !
if self.connect_id:
self.obj.disconnect(self.connect_id)
self.obj = self.callback = self.connect_id = None
class WaitFirst(Tasklet):
"""
A special tasklet that waits for the first to return of a list of tasklets.
"""
def __init__(self, *tasklets):
super(WaitFirst, self).__init__()
self.done = None
self.tasklets = tasklets
def _callback(self, *args):
i = args[-1]
values = args[:-1]
if self.done:
return
self.done = True
self.callback((i,values))
for t in self.tasklets:
t.close()
self.callback = None
self.tasklets = None
def start(self, callback = None, err_callback = None):
self.callback = callback
self.err_callback = Tasklet.default_err_callback
# We connect all the tasklets
for (i,t) in enumerate(self.tasklets):
t.start(self._callback, err_callback, i)
class WaitDBus(Tasklet):
"""Special tasket that wait for a DBus call"""
def __init__(self, method, *args):
super(WaitDBus, self).__init__()
self.method = method
self.args = args
def start(self, callback, err_callback):
self.callback = callback
self.err_callback = err_callback
kargs = {'reply_handler':self._callback, 'error_handler':self._err_callback}
self.method(*self.args, **kargs)
def _callback(self, *args):
self.callback(*args)
def _err_callback(self, e):
self.err_callback(type(e), e, sys.exc_info()[2])
class WaitDBusSignal(Tasklet):
"""A special tasklet that wait for a DBUs event to be emited"""
def __init__(self, obj, event, time_out = None):
super(WaitDBusSignal, self).__init__()
self.obj = obj
self.event = event
self.time_out = time_out
self.connection = None
self.timeout_connection = None
def _callback(self, *args):
if not self.connection:
return # We have been closed already
self.connection.remove()
# don't forget to remove the timeout callback
if self.timeout_connection:
gobject.source_remove(self.timeout_connection)
self.timeout_connection = None
if len(args) == 1: # What is going on here is that if we have a single value, we return it directly,
args = args[0] # but if we have several value we pack them in a tuple for the callback
# because the callback only accpet a single argument
try:
self.callback(args)
except:
import sys
self.err_callback(*sys.exc_info())
self.obj = self.callback = None
return False
def _err_callback(self):
# can only be called on timeout
self.timout_connection = None
e = Exception("TimeOut")
self.err_callback(type(e), e, sys.exc_info()[2])
def start(self, callback, err_callback):
self.callback = callback
self.err_callback = err_callback
self.connection = self.obj.connect_to_signal(self.event, self._callback)
if self.time_out:
self.timeout_connection = gobject.timeout_add_seconds(self.time_out, self._err_callback)
def close(self):
# Note : it is not working very well !!!! Why ? I don't know...
if self.connection:
self.connection.remove()
if self.timeout_connection:
gobject.source_remove(self.timeout_connection)
self.obj = self.callback = self.connection = self.timeout_connection = None
class WaitDBusName(Tasklet):
"""Special tasklet that blocks until a given DBus name is available on the system bus"""
def run(self, name):
bus_obj = dbus.SystemBus().get_object('org.freedesktop.DBus', '/org/freedesktop/DBus')
bus_obj_iface = dbus.proxies.Interface(bus_obj, 'org.freedesktop.DBus')
all_bus_names = bus_obj_iface.ListNames()
if name in all_bus_names:
yield None
while True:
var = yield WaitDBusSignal( bus_obj_iface, 'NameOwnerChanged' )
if var[0] == name:
yield None
class WaitFunc(Tasklet):
"""A special tasklet that will wait for a function to call a callback.
This is useful to reuse old style callback function.
The function should take 2 parameters that are the callback to call
"""
def __init__(self, func):
"""Create the tasklet using a given function
`func` should have this signature : func(on_ok, on_err)
where :
on_ok is a callback to call on return.
on_err is a callback to call in case of an error, that take one single error argument.
"""
super(WaitFun, self).__init__()
self.func = func
def __callback(self, ret = None):
self._callback(ret)
def __err_callback(self, e):
self._err_callback(type(e), e, sys.exc_info()[2])
def start(self, callback, err_callback):
self._callback = callback
self._err_callback = err_callback
self.func(self.__callback, self.__err_callback)
def close(self):
pass
class Producer(Tasklet):
"""
A Producer is a modified Tasklet that is not automatically closed after
returing a value.
This is still expermimental...
"""
def send(self, value = None, *args):
"""Resume and send a value into the tasklet generator
"""
# This somehow complicated try switch is used to handle all possible return and exception
# from the generator function
try:
value = self.generator.send(value)
except Exception:
self.err_callback(*sys.exc_info())
self.close() # This is very important, cause we need to make sure we free the memory of the callback !
return
self.handle_yielded_value(value)
def throw(self, type, value, traceback):
"""Throw an exeption into the tasklet generator"""
try:
value = self.generator.throw(type, value, traceback)
except Exception:
self.err_callback(*sys.exc_info())
self.close() # This is very important, cause we need to make sure we free the memory of the callback !
return
self.handle_yielded_value(value)
def handle_yielded_value(self, value):
"""This method is called after the waiting tasklet yielded a value
We have to take care of two cases:
- If the value is a Tasklet : we start it and connect the call back
to the 'parent' Tasklet send and throw hooks
- Otherwise, we consider that the tasklet finished, and we can call
our callback function
"""
if isinstance(value, GeneratorType):
value = Tasklet(generator = value)
if isinstance(value, Tasklet):
self.waiting = value
value.start_from(self)
else:
assert self.callback, "%s has no callback !" % self
self.callback(value, *self.args, **self.kargs)
class Sleep(Tasklet):
""" This is a 'primitive' tasklet that will trigger our call back after a short time
"""
def __init__(self, time):
"""This tasklet has one parameter"""
super(Sleep, self).__init__()
self.time = time
def start(self, callback, err_callback, *args):
self.event_id = gobject.timeout_add_seconds(self.time, callback, None, *args)
def close(self):
# We cancel the event
gobject.source_remove(self.event_id)
class WaitFileReady(Tasklet):
"""This special Tasklet will block until a file descriptor is ready for reading or sending"""
def __init__(self, fd, cond):
super(WaitFileReady, self).__init__()
self.fd = fd
self.cond = cond
self.event_id = None
def _callback(self, *args):
self.event_id = None
self.callback(*args)
return False
def start(self, callback, err_callback, *args):
self.callback = callback
self.event_id = gobject.io_add_watch(self.fd, self.cond, self._callback, *args)
def close(self):
if self.event_id:
gobject.source_remove(self.event_id)
self.event_id = None
if __name__ == '__main__':
# And here is a simple example application using our tasklet class
import gobject
def example1():
print "== Simple example that waits two times for an input event =="
loop = gobject.MainLoop()
@tasklet
def task1(x):
"""An example Tasklet generator function"""
print "task1 started with value %s" % x
yield Sleep(1)
print "tick"
yield Sleep(1)
print "task1 stopped"
loop.quit()
task1(10).start()
print 'I do other things'
loop.run()
def example2():
print "== We can call a tasklet form an other tasklet =="
@tasklet
def task1():
print "task1 started"
value = yield task2(10)
print "rask2 returned value %s" % value
print "task1 stopped"
@tasklet
def task2(x):
print "task2 started"
print "task2 returns"
yield 2 * x # Return value
task1().start()
def example3():
print "== We can pass exception through tasklets =="
@tasklet
def task1():
try:
yield task2()
except TypeError:
print "task2 raised a TypeError"
yield task4()
@tasklet
def task2():
try:
yield task3()
except TypeError:
print "task3 raised a TypeError"
raise
@tasklet
def task3():
raise TypeError
yield 10
@tasklet
def task4():
print 'task4'
yield 10
task1().start()
def example4():
print "== We can cancel execution of a task before it ends =="
loop = gobject.MainLoop()
@tasklet
def task():
print "task started"
yield Sleep(10)
print "task stopped"
loop.quit()
task = task()
task.start()
# At this point, we decide to cancel the task
task.close()
print "task canceled"
def example5():
print "== A task can choose to perform specific action if it is canceld =="
loop = gobject.MainLoop()
@tasklet
def task():
print "task started"
try:
yield Sleep(1)
except GeneratorExit:
print "Executed before the task is canceled"
raise
print "task stopped"
loop.quit()
task = task()
task.start()
# At this point, we decide to cancel the task
task.close()
print "task canceled"
def example6():
print "== Using WaitFirst, we can wait for several tasks at the same time =="
loop = gobject.MainLoop()
@tasklet
def task1(x):
print "Wait for the first task to return"
value = yield WaitFirst(Sleep(2), Sleep(1))
print value
loop.quit()
task1(10).start()
loop.run()
def example7():
print "== Using Producer, we can create pipes =="
class MyProducer(Producer):
def run(self):
for i in range(3):
yield Sleep(1)
print "producing %d" % i
yield i
class MyConsumer(Tasklet):
def run(self, input):
print "start"
try:
while True:
value = yield input
print "get value %s" % value
except StopIteration:
print "Stop"
loop.quit()
loop = gobject.MainLoop()
MyConsumer(MyProducer()).start()
print "We can do other things in the meanwhile"
loop.run()
def example8():
print "== Using messages to comunicate between tasklets =="
loop = gobject.MainLoop()
@tasklet
def task1():
while True:
msg = yield WaitMessage()
if msg == 'end':
break
print "got message %s" % msg
print "end task1"
loop.quit()
@tasklet
def task2(task):
for i in range(4):
print "sending message %d" % i
yield task.send_message(i)
yield Sleep(1)
yield task.send_message('end')
print "end task2"
task1 = task1()
task1.start()
task2(task1).start()
loop.run()
def test():
print "== Checking memory usage =="
def task1():
yield None
import gc
gc.collect()
n = len(gc.get_objects())
for i in range(1000):
t = Tasklet(generator=task1())
t.start()
del t
gc.collect()
print len(gc.get_objects()) - n
# test()
# example1()
# example2()
# example3()
# example4()
# example6()
# example7()
example8()
|