/usr/lib/broctl/BroControl/events.py is in broctl 1.4-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 | import time
import logging
from BroControl import config
from BroControl import util
try:
import broccoli
except ImportError:
broccoli = None
# Broccoli communication with running nodes.
# Sends event to a set of nodes in parallel.
#
# events is a list of tuples of the form (node, event, args, result_event).
# node: the destination node.
# event: the name of the event to send (note that the receiver must subscribe
# to it as well).
# args: a list of event args; each arg must be a data type understood by
# the Broccoli module.
# result_event: name of an event the node sends back. None if no event is
# sent back.
#
# Returns a list of tuples (node, success, result_args).
# If success is True, result_args is a list of arguments as shipped with the
# result event, or [] if no result_event was specified.
# If success is False, result_args is a string with an error message.
def send_events_parallel(events):
    """Send a batch of events to their nodes and collect one result each.

    Every event is initiated first; only afterwards does the function wait
    on each opened connection in turn.

    events: a list of (node, event, args, result_event) tuples.

    Returns a list of (node, success, result_args) tuples. When success is
    True, result_args holds the arguments of the reply event, or [] if no
    result_event was given. When success is False, result_args is an error
    message string. Failures detected while initiating a send are listed
    before the results of the events that were waited on.
    """
    results = []
    pending = []

    for (node, event, args, result_event) in events:
        if not broccoli:
            results.append((node, False, "no Python bindings for Broccoli installed"))
            continue

        (success, bc) = _send_event_init(node, event, args, result_event)
        if success:
            # Wait on every initiated send, including those without a reply
            # event: _send_event_wait returns (True, []) for them once its
            # send loop finishes, which is the documented result. Handing
            # back the connection object itself would break that contract.
            pending.append((node, result_event, bc))
        else:
            # On failure the second element is the error message string.
            results.append((node, success, bc))

    for (node, result_event, bc) in pending:
        (success, result_args) = _send_event_wait(node, result_event, bc)
        results.append((node, success, result_args))

    return results
def _send_event_init(node, event, args, result_event):
    """Open a Broccoli connection to a node and hand the event to it.

    node: destination node; its addr, getPort() and name are used.
    event: name of the event to send.
    args: list of event arguments, passed on to the connection's send().
    result_event: name of the reply event to subscribe to, or None.

    Returns (True, connection) after the event has been passed to the
    connection, or (False, error message) if setting up the connection
    raised IOError.
    """
    host = util.scope_addr(node.addr)

    try:
        bc = broccoli.Connection("%s:%d" % (host, node.getPort()), broclass="control",
                                 flags=broccoli.BRO_CFLAG_ALWAYS_QUEUE, connect=False)
        # Make sure the flag exists before any callback could set it.
        bc.got_result = False
        # Without a reply event there is nothing to subscribe to.
        if result_event:
            bc.subscribe(result_event, _event_callback(bc))
        bc.connect()
    except IOError as e:
        logging.debug("broccoli: cannot connect to node %s", node.name)
        return (False, str(e))

    # Stringify each argument: the join is evaluated even when debug logging
    # is off, and str.join() raises TypeError on non-string items.
    arg_text = ", ".join(str(arg) for arg in args)
    logging.debug("broccoli: %s(%s) to node %s", event, arg_text, node.name)
    bc.send(event, *args)
    return (True, bc)
def _send_event_wait(node, result_event, bc):
    """Wait for a previously initiated send to finish and fetch its reply.

    node: destination node; only its name is used, for log messages.
    result_event: name of the expected reply event, or None if none.
    bc: the connection returned by _send_event_init.

    Returns (True, result_args) on success, where result_args is [] when no
    result_event was given and otherwise the arguments stored on bc by the
    reply callback. Returns (False, "time-out") if either wait loop runs for
    more than config.Config.commtimeout one-second rounds.
    """
    # Wait until we have sent the event out.
    rounds = 0
    while bc.processInput():
        time.sleep(1)
        rounds += 1
        if rounds > int(config.Config.commtimeout):
            logging.debug("broccoli: timeout during send to node %s", node.name)
            return (False, "time-out")

    if not result_event:
        return (True, [])

    # Wait for reply event.
    rounds = 0
    bc.processInput()
    while not bc.got_result:
        time.sleep(1)
        bc.processInput()
        rounds += 1
        if rounds > int(config.Config.commtimeout):
            logging.debug("broccoli: timeout during receive from node %s", node.name)
            return (False, "time-out")

    # Stringify each reply argument: the join is evaluated even when debug
    # logging is off, and str.join() raises TypeError on non-string items.
    reply_text = ", ".join(str(arg) for arg in bc.result_args)
    logging.debug("broccoli: %s(%s) from node %s", result_event, reply_text, node.name)
    return (True, bc.result_args)
def _event_callback(bc):
    """Build the reply handler for a connection.

    The returned function accepts any number of positional arguments,
    stores them as a tuple in bc.result_args and sets bc.got_result to True.
    """
    def save_results(*reply_args):
        bc.result_args = reply_args
        bc.got_result = True

    return save_results
|