/usr/lib/python2.7/dist-packages/celery/worker/autoscale.py is in python-celery 3.1.20-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 | # -*- coding: utf-8 -*-
"""
celery.worker.autoscale
~~~~~~~~~~~~~~~~~~~~~~~
This module implements the internal thread responsible
for growing and shrinking the pool according to the
current autoscale settings.
The autoscale thread is only enabled if :option:`--autoscale`
has been enabled on the command-line.
"""
from __future__ import absolute_import
import os
import threading
from time import sleep
from kombu.async.semaphore import DummyLock
from celery import bootsteps
from celery.five import monotonic
from celery.utils.log import get_logger
from celery.utils.threads import bgThread
from . import state
from .components import Pool
# Public names exported by this module.
__all__ = ['Autoscaler', 'WorkerComponent']

# Module logger plus short aliases for the levels used below.
logger = get_logger(__name__)
debug = logger.debug
info = logger.info
error = logger.error

# Default ``keepalive`` for :class:`Autoscaler`: the pool is only scaled
# down once more than this much time (compared against ``monotonic()``
# differences, presumably seconds) has passed since the last scaling
# action.  Can be overridden with the ``AUTOSCALE_KEEPALIVE`` environment
# variable.
AUTOSCALE_KEEPALIVE = float(os.environ.get('AUTOSCALE_KEEPALIVE', 30))
class WorkerComponent(bootsteps.StartStopStep):
    """Worker bootstep that creates the autoscaler.

    The step is enabled only when the worker has ``autoscale`` set, and it
    requires the :class:`Pool` step.
    """

    label = 'Autoscaler'
    conditional = True
    requires = (Pool, )

    def __init__(self, w, **kwargs):
        """Record whether autoscaling is on and reset ``w.autoscaler``."""
        self.enabled = w.autoscale
        w.autoscaler = None

    def create(self, w):
        """Instantiate the autoscaler and attach it to the worker.

        When the worker uses the event loop the autoscaler gets a
        ``DummyLock`` as mutex and ``None`` is returned from this method
        (scaling is then driven by :meth:`register_with_event_loop`);
        otherwise the autoscaler instance itself is returned.
        """
        mutex = DummyLock() if w.use_eventloop else None
        scaler = self.instantiate(
            w.autoscaler_cls,
            w.pool, w.max_concurrency, w.min_concurrency,
            worker=w, mutex=mutex,
        )
        w.autoscaler = scaler
        if w.use_eventloop:
            return None
        return scaler

    def register_with_event_loop(self, w, hub):
        """Hook ``maybe_scale`` into the consumer and the event loop hub.

        ``maybe_scale`` is added to the consumer's ``on_task_message``
        callbacks and additionally scheduled on the hub with the
        autoscaler's ``keepalive`` as the repeat interval.
        """
        scaler = w.autoscaler
        w.consumer.on_task_message.add(scaler.maybe_scale)
        hub.call_repeatedly(scaler.keepalive, scaler.maybe_scale)
class Autoscaler(bgThread):
    """Grow and shrink the worker pool according to the current load.

    The wanted pool size is the number of reserved requests (:attr:`qty`),
    kept within ``[min_concurrency, max_concurrency]``.

    :param pool: pool object providing ``grow(n)``, ``shrink(n)``,
        ``maintain_pool()`` and ``num_processes``.
    :param max_concurrency: upper bound for automatic scaling.
    :param min_concurrency: lower bound for automatic scaling.
    :param worker: worker whose ``consumer`` gets its prefetch count
        adjusted whenever the pool size changes.
    :param keepalive: minimum time since the last scaling action before
        the pool may be scaled down automatically.  Must be truthy.
    :param mutex: lock guarding scaling operations; a new
        :class:`threading.Lock` is created when omitted.
    """

    def __init__(self, pool, max_concurrency,
                 min_concurrency=0, worker=None,
                 keepalive=AUTOSCALE_KEEPALIVE, mutex=None):
        super(Autoscaler, self).__init__()
        self.pool = pool
        self.mutex = mutex or threading.Lock()
        self.max_concurrency = max_concurrency
        self.min_concurrency = min_concurrency
        self.keepalive = keepalive
        # Time of the last automatic scaling action (None until the
        # first scale-up); see scale_down().
        self._last_action = None
        self.worker = worker

        assert self.keepalive, 'cannot scale down too fast.'

    def body(self):
        """Run one scaling check under the mutex, then pause a second."""
        with self.mutex:
            self.maybe_scale()
        # Sleep outside the lock so update()/force_scale_*() are not
        # blocked while this thread is idle.
        sleep(1.0)

    def _maybe_scale(self, req=None):
        """Scale the pool towards the current demand.

        Returns a true value if a scale-up or scale-down was requested,
        a false value if the pool already has the wanted size.
        """
        procs = self.processes
        qty = self.qty

        # Never grow beyond the configured maximum.
        wanted = min(qty, self.max_concurrency)
        if wanted > procs:
            self.scale_up(wanted - procs)
            return True

        # Never shrink below the configured minimum.  Clamping the target
        # (rather than subtracting min_concurrency from the difference)
        # keeps the amount positive and shrinks all the way to demand.
        wanted = max(qty, self.min_concurrency)
        if wanted < procs:
            self.scale_down(procs - wanted)
            return True
        return False

    def maybe_scale(self, req=None):
        """Scale if needed and let the pool maintain itself afterwards."""
        if self._maybe_scale(req):
            self.pool.maintain_pool()

    def update(self, max=None, min=None):
        """Change the concurrency limits, resizing the pool to fit.

        :param max: new maximum, or ``None`` to leave it unchanged.
        :param min: new minimum, or ``None`` to leave it unchanged.
        :returns: tuple of ``(max_concurrency, min_concurrency)``.
        """
        # NOTE: ``max``/``min`` shadow the builtins inside this method.
        with self.mutex:
            if max is not None:
                procs = self.processes
                # Compare with the live process count: only shrink when
                # the pool really exceeds the new maximum.
                if max < procs:
                    self._shrink(procs - max)
                self.max_concurrency = max
            if min is not None:
                procs = self.processes
                # Only grow when the pool is really below the new minimum.
                if min > procs:
                    self._grow(min - procs)
                self.min_concurrency = min
            return self.max_concurrency, self.min_concurrency

    def force_scale_up(self, n):
        """Grow the pool by ``n``, raising ``max_concurrency`` if needed."""
        with self.mutex:
            new = self.processes + n
            if new > self.max_concurrency:
                self.max_concurrency = new
            # NOTE(review): the minimum is bumped by one regardless of
            # ``n``; kept as-is for compatibility -- confirm intent.
            self.min_concurrency += 1
            self._grow(n)

    def force_scale_down(self, n):
        """Shrink the pool by ``n``, lowering ``min_concurrency`` if needed.

        At most the current number of processes is removed.
        """
        with self.mutex:
            new = self.processes - n
            if new < self.min_concurrency:
                self.min_concurrency = max(new, 0)
            self._shrink(min(n, self.processes))

    def scale_up(self, n):
        """Grow the pool by ``n`` and record the time of the action."""
        self._last_action = monotonic()
        return self._grow(n)

    def scale_down(self, n):
        """Shrink the pool by ``n`` if the keepalive period has expired.

        Nothing happens when ``n`` is not positive, when no scaling action
        has been recorded yet, or when the last action is more recent than
        ``keepalive``.
        """
        if not n or n <= 0:
            return
        if self._last_action is None:
            return
        if monotonic() - self._last_action > self.keepalive:
            self._last_action = monotonic()
            return self._shrink(n)

    def _grow(self, n):
        """Add ``n`` processes and raise the consumer prefetch count."""
        info('Scaling up %s processes.', n)
        self.pool.grow(n)
        self.worker.consumer._update_prefetch_count(n)

    def _shrink(self, n):
        """Try to remove ``n`` processes and lower the prefetch count.

        Errors from the pool are logged, not raised.  The prefetch count
        is only reduced by the number of processes actually removed.
        """
        info('Scaling down %s processes.', n)
        before = self.processes
        try:
            self.pool.shrink(n)
        except ValueError:
            debug("Autoscaler won't scale down: all processes busy.")
        except Exception as exc:
            error('Autoscaler: scale_down: %r', exc, exc_info=True)
        # The pool may have removed fewer than ``n`` processes (or none
        # at all when shrink() raised), so use the observed difference
        # instead of blindly decrementing by ``n``.
        removed = before - self.processes
        if removed > 0:
            self.worker.consumer._update_prefetch_count(-removed)

    def info(self):
        """Return a dict with the limits, pool size and current demand."""
        return {'max': self.max_concurrency,
                'min': self.min_concurrency,
                'current': self.processes,
                'qty': self.qty}

    @property
    def qty(self):
        """Number of currently reserved requests."""
        return len(state.reserved_requests)

    @property
    def processes(self):
        """Current number of processes in the pool."""
        return self.pool.num_processes
|