/usr/share/pyshared/couchdbkit/consumer/cgevent.py is in python-couchdbkit 0.6.5-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
#
# This file is part of couchdbkit released under the MIT license.
# See the NOTICE for more information.
from __future__ import with_statement
import traceback
import gevent
from gevent import event
from gevent import monkey
from .base import check_callable
from .sync import SyncConsumer
from ..utils import json
class ChangeConsumer(object):
    """Base class that reads one ``_changes`` response inside a greenlet.

    Subclasses implement :meth:`consume` to decode the response and pass
    each change to the callback stored in ``process_change``.
    """

    def __init__(self, db, callback, **params):
        """Remember the database, the change callback and the request params.

        :param db: object whose ``res.get`` is used to request ``_changes``.
        :param callback: stored as ``process_change``; called by
            :meth:`consume` implementations.
        :param params: keyword arguments forwarded to the ``_changes``
            request.
        """
        self.process_change = callback
        self.params = params
        self.db = db
        # Set when the consumer finishes or stop() is called; wait() blocks
        # on it.
        self.stop_event = event.Event()

    def stop(self):
        """Set the stop event, releasing any caller blocked in :meth:`wait`.

        This only signals the event; it does not interrupt a running
        greenlet by itself.
        """
        self.stop_event.set()

    def wait(self):
        """Spawn the consumer greenlet and block until the stop event is set."""
        gevent.spawn(self._run)
        self.stop_event.wait()

    def wait_async(self):
        """Spawn the consumer greenlet without blocking.

        :returns: the spawned greenlet, so the caller can join or kill it.
        """
        return gevent.spawn(self._run)

    def _run(self):
        """Request ``_changes`` once and hand the response to :meth:`consume`.

        The stop event is set on every exit path, including normal
        completion, so a caller blocked in :meth:`wait` is always released.

        :returns: whatever :meth:`consume` returns, or ``None`` when an
            error was handled.
        """
        try:
            resp = self.db.res.get("_changes", **self.params)
            return self.consume(resp)
        except (SystemExit, KeyboardInterrupt):
            # Swallowed on purpose: pause, then fall through to signal stop.
            gevent.sleep(5)
        except:  # noqa: E722 - top-level boundary of the greenlet
            # Report the failure instead of losing it, pause, then signal
            # stop so waiters are not left hanging.
            traceback.print_exc()
            gevent.sleep(5)
        finally:
            self.stop_event.set()

    def consume(self, resp):
        """Process the ``_changes`` response; must be provided by subclasses.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError
class ContinuousChangeConsumer(ChangeConsumer):
    """Consumer that treats the response body as a stream of lines."""

    def consume(self, resp):
        """Pass every non-empty line of the response body to the callback.

        Each line is handed to ``process_change`` as raw text with its
        line terminator removed; no JSON decoding is done here.

        :param resp: response object whose ``body_stream()`` yields a
            context manager exposing ``readline()``.
        """
        with resp.body_stream() as body:
            while True:
                line = body.readline()
                if not line:
                    # readline() returned nothing: end of the stream.
                    break

                # Remove only a terminator that is really there. The last
                # line of a stream may be unterminated, and blindly chopping
                # one character would corrupt or drop it.
                if line.endswith("\r\n"):
                    line = line[:-2]
                elif line.endswith("\n"):
                    line = line[:-1]

                if not line:
                    # Blank line: nothing to deliver.
                    continue
                self.process_change(line)
class LongPollChangeConsumer(ChangeConsumer):
    """Consumer that reads the whole response body as a single change."""

    def consume(self, resp):
        """Read the full body, decode it and invoke the callback once.

        The body is read until ``read()`` returns nothing, joined, and
        parsed with ``json.loads``. If parsing raises ``ValueError`` the
        undecoded text is passed to ``process_change`` instead.

        :param resp: response object whose ``body_stream()`` yields a
            context manager exposing ``read()``.
        """
        with resp.body_stream() as stream:
            chunks = []
            chunk = stream.read()
            while chunk:
                chunks.append(chunk)
                chunk = stream.read()

            payload = "".join(chunks)
            try:
                payload = json.loads(payload)
            except ValueError:
                # Not decodable: fall back to delivering the raw text.
                pass
            self.process_change(payload)
class GeventConsumer(SyncConsumer):
    """``_changes`` consumer that runs its feed readers in gevent greenlets."""

    def __init__(self, db):
        """Patch sockets through ``gevent.monkey`` and initialise the base.

        :param db: database object handed to the parent constructor.
        """
        monkey.patch_socket()
        super(GeventConsumer, self).__init__(db)

    def _fetch(self, cb, **params):
        """Request ``_changes`` and call *cb* with the response's JSON body."""
        resp = self.db.res.get("_changes", **params)
        cb(resp.json_body)

    def _make_change_consumer(self, consumer_class, feed, cb, params):
        """Validate *cb* and build a *consumer_class* bound to *feed*."""
        check_callable(cb)
        params["feed"] = feed
        return consumer_class(self.db, callback=cb, **params)

    def fetch(self, cb=None, **params):
        """Fetch changes, either synchronously or in a new greenlet.

        :param cb: optional callable receiving the JSON body. When omitted
            the call is delegated to the parent class and its result is
            returned.
        :param params: keyword arguments forwarded to the request.
        :returns: the parent's result when *cb* is ``None``, otherwise the
            greenlet running the request.
        """
        if cb is None:
            # NOTE(review): this delegates to the parent's wait_once(), not
            # to a parent fetch(). It looks like a copy/paste slip, but the
            # parent class is not visible here - confirm before changing.
            return super(GeventConsumer, self).wait_once(**params)
        # Validate up front, like every other callback-taking method here,
        # so a bad callback fails at the call site and not in the greenlet.
        check_callable(cb)
        return gevent.spawn(self._fetch, cb, **params)

    def wait_once(self, cb=None, **params):
        """Wait on the ``longpoll`` feed, blocking until the consumer stops.

        :param cb: optional callable receiving the change. When omitted the
            call is delegated to the parent class and its result returned.
        :param params: keyword arguments forwarded to the request.
        """
        if cb is None:
            return super(GeventConsumer, self).wait_once(**params)
        consumer = self._make_change_consumer(
            LongPollChangeConsumer, "longpoll", cb, params)
        consumer.wait()

    def wait(self, cb, **params):
        """Consume the ``continuous`` feed, blocking until the consumer stops.

        :param cb: callable receiving each change line.
        :param params: keyword arguments forwarded to the request.
        """
        consumer = self._make_change_consumer(
            ContinuousChangeConsumer, "continuous", cb, params)
        consumer.wait()

    def wait_once_async(self, cb, **params):
        """Start a ``longpoll`` consumer without blocking.

        :param cb: callable receiving the change.
        :param params: keyword arguments forwarded to the request.
        :returns: the result of the consumer's ``wait_async()``.
        """
        consumer = self._make_change_consumer(
            LongPollChangeConsumer, "longpoll", cb, params)
        return consumer.wait_async()

    def wait_async(self, cb, **params):
        """Start a ``continuous`` consumer without blocking.

        :param cb: callable receiving each change line.
        :param params: keyword arguments forwarded to the request.
        :returns: the result of the consumer's ``wait_async()``.
        """
        consumer = self._make_change_consumer(
            ContinuousChangeConsumer, "continuous", cb, params)
        return consumer.wait_async()
|