/usr/lib/ruby/vendor_ruby/celluloid/pool_manager.rb is in ruby-celluloid 0.16.0-4.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 | require 'set'
module Celluloid
  # Manages a fixed-size pool of workers.
  # Delegates work (i.e. method calls) to the workers and supervises them.
  # Don't use this class directly. Instead use MyKlass.pool
  class PoolManager
    include Celluloid

    trap_exit :__crash_handler__
    finalizer :__shutdown__

    # Build the pool and eagerly spawn every worker, linked to this manager.
    #
    # worker_class - class whose instances do the work (spawned via new_link)
    # options      - Hash of settings:
    #                :size - number of workers; defaults to Celluloid.cores
    #                        (or 2 when that is nil), never less than 2
    #                :args - argument(s) handed to every worker's constructor
    #
    # Raises ArgumentError when the resulting size is below 2.
    def initialize(worker_class, options = {})
      @size = options[:size] || [Celluloid.cores || 2, 2].max
      raise ArgumentError, "minimum pool size is 2" if @size < 2

      @worker_class = worker_class
      @args = options[:args] ? Array(options[:args]) : []

      @idle = @size.times.map { worker_class.new_link(*@args) }

      # FIXME: Another data structure (e.g. Set) would be more appropriate
      # here except it causes MRI to crash :o
      @busy = []
    end

    # Finalizer: ask every worker (idle and busy) to terminate, then wait for
    # all of those requests to finish.
    def __shutdown__
      terminators = (@idle + @busy).map do |actor|
        begin
          actor.future(:terminate)
        rescue DeadActorError
          nil # already dead, nothing to wait for; dropped by compact below
        end
      end

      terminators.compact.each do |terminator|
        begin
          terminator.value
        rescue StandardError
          nil # best effort: a worker failing while terminating must not abort shutdown
        end
      end
    end

    # Check a worker out of the pool, forward the call to it and hand the
    # worker back to the idle list afterwards (only if it is still alive).
    #
    # method - name of the worker method to invoke
    # args   - arguments forwarded to the worker
    # block  - optional block forwarded to the worker
    #
    # Returns whatever the worker's method returns.
    def _send_(method, *args, &block)
      worker = __provision_worker__

      begin
        worker._send_ method, *args, &block
      rescue DeadActorError # if we get a dead actor out of the pool
        # __crash_handler__ signals :respawn_complete once a replacement exists
        wait :respawn_complete
        worker = __provision_worker__
        retry
      rescue Exception => ex
        # Any other failure is passed to Celluloid's `abort`
        abort ex
      ensure
        if worker.alive?
          @idle << worker
          @busy.delete worker
        end
      end
    end

    # Delegate `name` to a pooled worker, in the same way as the other
    # introspection delegators below. (`@mailbox` is never assigned in this
    # class, so it must not be passed as the method name.)
    def name
      _send_ :name
    end

    # Answered by a worker, so the pool looks like an instance of the worker class.
    def is_a?(klass)
      _send_ :is_a?, klass
    end

    # Answered by a worker; see is_a?.
    def kind_of?(klass)
      _send_ :kind_of?, klass
    end

    # List the methods of a worker rather than those of the manager.
    def methods(include_ancestors = true)
      _send_ :methods, include_ancestors
    end

    # String representation of a worker.
    def to_s
      _send_ :to_s
    end

    # Inspect output of a worker.
    def inspect
      _send_ :inspect
    end

    # Configured number of workers in the pool.
    def size
      @size
    end

    # Resize the pool. Negative values are treated as 0.
    #
    # Growing spawns additional linked workers straight into the idle list.
    # Shrinking checks workers out one at a time (waiting for an idle one if
    # necessary), unlinks them so their exit is not treated as a crash, and
    # terminates them.
    def size=(new_size)
      new_size = [0, new_size].max

      if new_size > size
        delta = new_size - size
        delta.times { @idle << @worker_class.new_link(*@args) }
      else
        (size - new_size).times do
          worker = __provision_worker__
          unlink worker
          @busy.delete worker
          worker.terminate
        end
      end

      @size = new_size
    end

    # Number of workers currently checked out.
    def busy_size
      @busy.length
    end

    # Number of workers currently available.
    def idle_size
      @idle.length
    end

    # Provision a new worker: take the first idle worker, mark it busy and
    # return it. While no worker is idle, process incoming responses until
    # one becomes available.
    def __provision_worker__
      Task.current.guard_warnings = true

      while @idle.empty?
        # Wait for responses from one of the busy workers
        response = exclusive { receive { |msg| msg.is_a?(Response) } }
        Thread.current[:celluloid_actor].handle_message(response)
      end

      worker = @idle.shift
      @busy << worker
      worker
    end

    # Spawn a new worker for every crashed one.
    #
    # actor  - the linked worker that exited
    # reason - the exit reason; when nil or false no replacement is spawned
    def __crash_handler__(actor, reason)
      @busy.delete actor
      @idle.delete actor
      return unless reason

      @idle << @worker_class.new_link(*@args)
      # Wake up any _send_ call waiting to retry after a DeadActorError
      signal :respawn_complete
    end

    # True when the manager itself responds to `method`, or when the worker
    # class defines it as a public or protected instance method.
    def respond_to?(method, include_private = false)
      # method_defined? gives the same answer as instance_methods.include?
      # without building the whole method list on every call
      super || @worker_class.method_defined?(method)
    end

    # Forward any call the worker class understands to a pooled worker.
    def method_missing(method, *args, &block)
      if respond_to?(method)
        _send_ method, *args, &block
      else
        super
      end
    end
  end
end
|