/usr/lib/ruby/vendor_ruby/yapra/pipeline.rb is in yapra 0.1.2-7.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
require 'yapra'
require 'yapra/runtime'
require 'yapra/inflector'
require 'yapra/legacy_plugin/base'
class Yapra::Pipeline
# yapra is the runtime object passed to the constructor; context is the
# per-pipeline hash, created with a 'pipeline_name' entry.
attr_reader :yapra, :context
# Optional logger override; the logger reader falls back to
# Yapra::Runtime.logger while this is unset.
attr_writer :logger
# Registry whose #get(module_name) is used by run_legacy_plugin to look up
# legacy plugins.
attr_accessor :legacy_plugin_registry
# Matches one ASCII upper-case letter; class_based_plugin? tests the first
# character of the last '::' segment of a module name against it.
UPPER_CASE = /[A-Z]/
# Builds a pipeline bound to a runtime.
#
# pipeline_name:: stored in the context hash under 'pipeline_name'.
# yapra:: runtime object, a new Yapra::Runtime by default. Its #env is read
#         here to work out the module name prefixes used for class lookup.
def initialize(pipeline_name, yapra = Yapra::Runtime.new)
  @yapra = yapra
  @context = { 'pipeline_name' => pipeline_name }
  # No pipeline-specific logger until one is assigned through logger=.
  @logger = nil
  @module_name_prefix = construct_module_name_prefix(yapra.env)
end
# Returns the pipeline name kept in the context hash under 'pipeline_name'.
def name
  context['pipeline_name']
end
# Returns the logger assigned through logger=, or Yapra::Runtime.logger
# when none has been set.
def logger
  @logger || Yapra::Runtime.logger
end
# Runs the commands of a pipeline in order, threading data through them.
#
# Each command is a hash with a 'module' entry naming the plugin and an
# optional 'config' entry. Every step is given a clone of the previous
# step's result (the first step gets a clone of +data+). The result of the
# last step is returned; with no commands +data+ itself is returned.
#
# When a step raises a StandardError, on_error(ex) is called on every
# class-based plugin instantiated so far that responds to it. An error
# raised by such a handler is only logged, and the original error is
# re-raised afterwards.
#
# example:
#
#   pipeline.run([
#     {
#       'module' => 'Config::agent',
#       'config' => {
#         'user_agent_alias' => 'Windows IE 6'
#       }
#     },
#     {
#       'module' => 'RSS::load',
#       'config' => {
#         'uri' => 'http://www.example.com/hoge.rdf'
#       }
#     },
#     {
#       'module' => 'print'
#     }
#   ])
def run(pipeline_command, data = [])
  @plugins = []
  pipeline_command.inject(data) do |current, command|
    execute_plugin(command, current.clone)
  end
rescue => failure
  @plugins.each do |plugin|
    begin
      plugin.on_error(failure) if plugin.respond_to?('on_error')
    rescue => handler_failure
      # A broken error handler must not hide the original failure.
      logger.error("error is occured when error handling: #{handler_failure.message}")
    end
  end
  raise failure
end
# Executes a single pipeline command and returns the plugin's result.
#
# command:: hash whose 'module' entry names the plugin.
# data:: input handed to the plugin.
#
# Names accepted by class_based_plugin? are run as class based plugins;
# everything else goes through the legacy plugin path.
def execute_plugin(command, data)
  module_name = command['module']
  logger.info("exec plugin #{module_name}")
  return run_class_based_plugin(command, data) if class_based_plugin?(module_name)

  run_legacy_plugin(command, data)
end
protected
# Tells whether a module name refers to a class based plugin.
#
# Only the first character of the last '::'-separated segment is examined.
# Returns 0 (the match index) when it is an upper-case letter, nil otherwise.
def class_based_plugin?(command_name)
  last_segment = command_name.split('::').last
  UPPER_CASE =~ last_segment[0, 1]
end
# Runs a command through the legacy plugin registry.
#
# The plugin is fetched with legacy_plugin_registry.get(command['module'])
# and invoked with command['config'] and data; its result is returned.
def run_legacy_plugin(command, data)
  logger.debug("evaluate plugin as legacy")
  legacy_plugin = legacy_plugin_registry.get(command['module'])
  legacy_plugin._yapra_run_as_legacy_plugin(command['config'], data)
end
# Resolves, instantiates and runs a class based plugin.
#
# command['module'] is tried with each configured module name prefix in
# turn through Yapra.load_class_constant; the first truthy result wins.
# LoadError and NameError raised along the way are collected and passed to
# raise_load_error if no prefix yields a class.
#
# The new plugin instance is remembered in @plugins before it runs.
# Returns whatever the plugin's run(data) returns.
def run_class_based_plugin(command, data)
  logger.debug("evaluate plugin as class based")
  failures = []
  plugin_class = nil
  @module_name_prefix.each do |prefix|
    begin
      plugin_class = Yapra.load_class_constant("#{prefix}#{command['module']}")
    rescue LoadError, NameError => load_failure
      failures << load_failure
    end
    break if plugin_class
  end
  raise_load_error(failures, command) unless plugin_class

  plugin = initialize_plugin(plugin_class, command)
  @plugins << plugin
  plugin.run(data)
end
# Raises a LoadError saying that command['module'] could not be found.
#
# load_error_stack:: exceptions collected while trying to load the module.
# command:: pipeline command hash; only its 'module' entry is read.
#
# The raised error carries a synthesised backtrace: for every collected
# exception, a "<Class> in '<message>'" header line followed by that
# exception's own backtrace. An exception that was never raised has a nil
# backtrace and contributes only its header line.
def raise_load_error(load_error_stack, command)
  load_error = LoadError.new("#{command['module']} module is not found.")
  backtrace = load_error_stack.flat_map do |e|
    # e.backtrace is nil for exceptions that were built but never raised.
    ["#{e.class.name} in '#{e.message}'"] + (e.backtrace || [])
  end
  load_error.set_backtrace(backtrace)
  raise load_error
end
# Instantiates a plugin class and injects what it is able to accept.
#
# The runtime, this pipeline and command['config'] are assigned through
# yapra=, pipeline= and plugin_config= respectively, each only when the
# new instance responds to that writer. Returns the plugin instance.
def initialize_plugin(plugin_class, command)
  plugin_class.new.tap do |plugin|
    plugin.yapra = yapra if plugin.respond_to?('yapra=')
    plugin.pipeline = self if plugin.respond_to?('pipeline=')
    plugin.plugin_config = command['config'] if plugin.respond_to?('plugin_config=')
  end
end
# Works out the list of prefixes tried in front of a plugin module name.
#
# env:: configuration hash; only 'module_name_prefix' is read.
#
# Without that entry the defaults 'Yapra::Plugin::' and '' are returned.
# An Array value is returned as is; any other value is wrapped in a
# one-element array.
def construct_module_name_prefix(env)
  configured = env['module_name_prefix']
  return ['Yapra::Plugin::', ''] unless configured

  configured.kind_of?(Array) ? configured : [configured]
end
end
|