/usr/share/backgroundrb/server/lib/master_worker.rb is in libbackgroundrb-ruby1.8 1.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 | #!/usr/bin/env ruby
module BackgrounDRb
# Class wraps a logger object for debugging internal errors within server
class DebugMaster
attr_accessor :log_mode,:logger,:log_flag
def initialize(log_mode,log_flag = true)
@log_mode = log_mode
@log_flag = log_flag
if @log_mode == :foreground
@logger = ::Logger.new(STDOUT)
else
@logger = ::Logger.new("#{RAILS_HOME}/log/backgroundrb_debug_#{BDRB_CONFIG[:backgroundrb][:port]}.log")
end
end
def info(data)
return unless @log_flag
@logger.info(data)
end
def debug(data)
return unless @log_flag
@logger.debug(data)
end
end
class MasterWorker
attr_accessor :debug_logger
include BackgrounDRb::BdrbServerHelper
# receives requests from rails and based on request type invoke appropriate method
def receive_data p_data
@tokenizer.extract(p_data) do |b_data|
begin
t_data = load_data b_data
if t_data
case t_data[:type]
# async method invocation
when :async_invoke: async_method_invoke(t_data)
# get status/result
when :get_result: get_result_object(t_data)
# sync method invocation
when :sync_invoke: method_invoke(t_data)
when :start_worker: start_worker_request(t_data)
when :delete_worker: delete_drb_worker(t_data)
when :worker_info: pass_worker_info(t_data)
when :all_worker_info: all_worker_info(t_data)
else; debug_logger.info("Invalid request")
end
end
rescue Exception => e
debug_logger.info(e)
debug_logger.info(e.backtrace.join("\n"))
send_object(nil)
end
end
end
# Send worker info to the user
def pass_worker_info(t_data)
worker_name_key = gen_worker_key(t_data[:worker],t_data[:worker_key])
worker_instance = reactor.live_workers[worker_name_key]
info_response = { :worker => t_data[:worker],:worker_key => t_data[:worker_key]}
worker_instance ? (info_response[:status] = :running) : (info_response[:status] = :stopped)
send_object(info_response)
end
# collect all worker info in an array and send to the user
def all_worker_info(t_data)
info_response = []
reactor.live_workers.each do |key,value|
worker_key = (value.worker_key.to_s).gsub(/#{value.worker_name}_?/,"")
info_response << { :worker => value.worker_name,:worker_key => worker_key,:status => :running }
end
send_object(info_response)
end
# Delete the worker. Sends TERM signal to the worker process and removes
# worker key from list of available workers
def delete_drb_worker(t_data)
worker_name = t_data[:worker]
worker_key = t_data[:worker_key]
worker_name_key = gen_worker_key(worker_name,worker_key)
begin
worker_instance = reactor.live_workers[worker_name_key]
raise Packet::InvalidWorker.new("Invalid worker with name #{worker_name} key #{worker_key}") unless worker_instance
Process.kill('TERM',worker_instance.pid)
# Warning: Change is temporary, may break things
reactor.live_workers.delete(worker_name_key)
rescue Packet::DisconnectError => sock_error
reactor.remove_worker(sock_error)
rescue
debug_logger.info($!.to_s)
debug_logger.info($!.backtrace.join("\n"))
end
end
# start a new worker
def start_worker_request(p_data)
start_worker(p_data)
end
# Invoke an asynchronous method on a worker
def async_method_invoke(t_data)
worker_name = t_data[:worker]
worker_name_key = gen_worker_key(worker_name,t_data[:worker_key])
t_data.delete(:worker)
t_data.delete(:type)
begin
ask_worker(worker_name_key,:data => t_data, :type => :request, :result => false)
rescue Packet::DisconnectError => sock_error
reactor.live_workers.delete(worker_name_key)
rescue
debug_logger.info($!.message)
debug_logger.info($!.backtrace.join("\n"))
return
end
end
# Given a cache key, ask the worker for result stored in it.
# If you are using Memcache for result storage, this method won't be
# called at all and bdrb client library will directly fetch
# the results from memcache and return
def get_result_object(t_data)
worker_name = t_data[:worker]
worker_name_key = gen_worker_key(worker_name,t_data[:worker_key])
t_data.delete(:worker)
t_data.delete(:type)
begin
ask_worker(worker_name_key,:data => t_data, :type => :get_result,:result => true)
rescue Packet::DisconnectError => sock_error
reactor.live_workers.delete(worker_name_key)
rescue
debug_logger.info($!.to_s)
debug_logger.info($!.backtrace.join("\n"))
return
end
end
# Invoke a synchronous/blocking method on a worker.
def method_invoke(t_data)
worker_name = t_data[:worker]
worker_name_key = gen_worker_key(worker_name,t_data[:worker_key])
t_data.delete(:worker)
t_data.delete(:type)
begin
ask_worker(worker_name_key,:data => t_data, :type => :request,:result => true)
rescue Packet::DisconnectError => sock_error
reactor.live_workers.delete(worker_name_key)
rescue
debug_logger.info($!.message)
debug_logger.info($!.backtrace.join("\n"))
return
end
end
# Receieve responses from workers and dispatch them back to the client
def worker_receive p_data
send_object(p_data)
end
def unbind; end
# called whenever a new connection is made.Initializes binary data parser
def post_init
@tokenizer = Packet::BinParser.new
end
def connection_completed; end
end
end
|