/usr/lib/python2.7/dist-packages/txwinrm/subscribe.py is in python-txwinrm 1.3.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
##############################################################################
#
# Copyright (C) Zenoss, Inc. 2013, all rights reserved.
#
# This content is made available according to terms specified in the LICENSE
# file at the top-level directory of this package.
#
##############################################################################
import logging
from collections import namedtuple
from twisted.internet import defer
from . import constants as c
from .util import create_etree_request_sender, get_datetime
# Logger registered under the name 'winrm'.
log = logging.getLogger('winrm')

# Ceiling on the number of pull requests a single pull() call may issue
# before it raises instead of continuing.
_MAX_PULL_REQUESTS_PER_BATCH = 999999

# Event query template; {path} and {select} are filled in with str.format().
_EVENT_QUERY_FMT = (
    '<QueryList><Query Path="{path}">'
    '<Select>{select}</Select></Query></QueryList>'
)
# One parsed event: its System record, the text of its first Data element,
# and its RenderingInfo record (None when the event carried none).
Event = namedtuple('Event', ['system', 'data', 'rendering_info'])

# Values read from the event's System element.
System = namedtuple(
    'System',
    'provider event_id event_id_qualifiers level task keywords '
    'time_created event_record_id channel computer user_id')

# Values read from the event's RenderingInfo element.
RenderingInfo = namedtuple(
    'RenderingInfo',
    'culture message level opcode keywords')
def _find_subscription_id(subscribe_resp_elem):
    """Return the stripped text of the first eventing Identifier element.

    The element is looked up anywhere below ``subscribe_resp_elem`` in the
    c.XML_NS_EVENTING namespace.  No check is made for a missing element.
    """
    identifier = subscribe_resp_elem.findtext(
        './/{%s}Identifier' % c.XML_NS_EVENTING)
    return identifier.strip()
def _find_enumeration_context(resp_elem):
    """Return the stripped text of the first EnumerationContext element.

    The element is looked up anywhere below ``resp_elem`` in the
    c.XML_NS_ENUMERATION namespace.  No check is made for a missing element.
    """
    context = resp_elem.findtext(
        './/{%s}EnumerationContext' % c.XML_NS_ENUMERATION)
    return context.strip()
def _event_attr(elem, localname, attr):
    """Return attribute ``attr`` of the first ``localname`` descendant.

    The descendant is searched for in the c.XML_NS_MSEVENT namespace.
    Returns None when no such descendant exists; otherwise whatever the
    element's get() returns for ``attr``.
    """
    match = elem.find('.//{%s}%s' % (c.XML_NS_MSEVENT, localname))
    if match is None:
        return None
    return match.get(attr)
def _event_text(elem, localname):
    """Return the stripped text of the first ``localname`` descendant.

    The descendant is searched for in the c.XML_NS_MSEVENT namespace.
    Returns None when findtext() reports nothing for that path.
    """
    raw = elem.findtext('.//{%s}%s' % (c.XML_NS_MSEVENT, localname))
    if raw is None:
        return None
    return raw.strip()
def _event_datetime(elem, localname, attr):
    """Read an attribute via _event_attr() and convert it with get_datetime().

    The attribute value (possibly None) is handed to get_datetime()
    unchanged; its result is returned as-is.
    """
    return get_datetime(_event_attr(elem, localname, attr))
def _event_list(elem, localname):
    """Return the ``.text`` of every ``localname`` descendant, in order.

    Descendants are searched for in the c.XML_NS_MSEVENT namespace.  Texts
    are returned untouched (not stripped), so entries may be None.
    """
    pattern = './/{%s}%s' % (c.XML_NS_MSEVENT, localname)
    return [node.text for node in elem.findall(pattern)]
def _safe_int(text, base=10):
    """Convert ``text`` to an integer, mapping absent values to None.

    ``text`` is the string to convert, or None.  ``base`` is forwarded to
    int().

    Returns None when ``text`` is None, empty, or whitespace-only.  An
    element that is present but has no text content produces an empty
    string from findtext(); treating it as missing keeps one blank field
    from raising ValueError in the middle of parsing a batch of events.

    Raises ValueError when ``text`` is non-blank but is not a valid integer
    literal in ``base``.
    """
    if text is None or not text.strip():
        return None
    return int(text, base)
def _find_events(pull_resp_elem):
    """Yield an Event for each Event element below ``pull_resp_elem``.

    Elements are matched in the c.XML_NS_MSEVENT namespace.  For every
    event the System child is turned into a System tuple and, when present,
    the RenderingInfo child into a RenderingInfo tuple (otherwise None).
    The event's ``data`` is the text of its first Data descendant.
    """
    ns = c.XML_NS_MSEVENT
    for event_elem in pull_resp_elem.findall('.//{%s}Event' % ns):
        system_elem = event_elem.find('.//{%s}System' % ns)
        ri_elem = event_elem.find('.//{%s}RenderingInfo' % ns)

        # Fields are read in the same order as the System tuple declares
        # them, so a conversion error surfaces for the earliest bad field.
        provider = _event_attr(system_elem, 'Provider', 'Name')
        event_id = _safe_int(_event_text(system_elem, 'EventID'))
        qualifiers = _safe_int(
            _event_attr(system_elem, 'EventID', 'Qualifiers'))
        level = _safe_int(_event_text(system_elem, 'Level'))
        task = _safe_int(_event_text(system_elem, 'Task'))
        # Keywords text is parsed as a base-16 number.
        keywords = _safe_int(_event_text(system_elem, 'Keywords'), 16)
        time_created = _event_datetime(
            system_elem, 'TimeCreated', 'SystemTime')
        record_id = _safe_int(_event_text(system_elem, 'EventRecordID'))
        channel = _event_text(system_elem, 'Channel')
        computer = _event_text(system_elem, 'Computer')
        user_id = _event_attr(system_elem, 'Security', 'UserID')

        system = System(
            provider=provider,
            event_id=event_id,
            event_id_qualifiers=qualifiers,
            level=level,
            task=task,
            keywords=keywords,
            time_created=time_created,
            event_record_id=record_id,
            channel=channel,
            computer=computer,
            user_id=user_id)

        rendering_info = None
        if ri_elem is not None:
            rendering_info = RenderingInfo(
                culture=ri_elem.get('Culture'),
                message=_event_text(ri_elem, 'Message'),
                level=_event_text(ri_elem, 'Level'),
                opcode=_event_text(ri_elem, 'Opcode'),
                keywords=_event_list(ri_elem, 'Keyword'))

        event_data = _event_text(event_elem, 'Data')
        yield Event(
            system=system,
            data=event_data,
            rendering_info=rendering_info)
class EventSubscription(object):
    """Event subscription driven through a request sender.

    Holds the subscription id and the current enumeration context obtained
    from responses.  Both are None while no subscription is active.
    """

    def __init__(self, sender):
        """Store ``sender``; start with no subscription."""
        self._sender = sender
        self._subscription_id = None
        self._enumeration_context = None

    @defer.inlineCallbacks
    def subscribe(self, path='Application', select='*'):
        """Send a subscribe request built from ``path`` and ``select``.

        On success the subscription id and enumeration context found in the
        response are recorded.  Raises Exception if a subscription is
        already active.
        """
        if self._subscription_id is not None:
            raise Exception('You must unsubscribe first.')
        query = _EVENT_QUERY_FMT.format(path=path, select=select)
        response = yield self._send_subscribe(query)
        self._subscription_id = _find_subscription_id(response)
        self._enumeration_context = _find_enumeration_context(response)

    @defer.inlineCallbacks
    def _send_subscribe(self, event_query):
        """Issue the 'subscribe' request and return its response element."""
        response = yield self._sender.send_request(
            'subscribe', event_query=event_query)
        defer.returnValue(response)

    @defer.inlineCallbacks
    def pull_once(self, process_event_func):
        """Issue one pull and pass each event to ``process_event_func``.

        The enumeration context is replaced with the one found in the
        response.  Raises Exception if there is no active subscription.
        """
        if self._subscription_id is None:
            raise Exception('You must subscribe first.')
        response = yield self._send_pull(self._enumeration_context)
        self._enumeration_context = _find_enumeration_context(response)
        for event in _find_events(response):
            process_event_func(event)

    @defer.inlineCallbacks
    def pull(self, process_event_func):
        """Pull repeatedly until a response contains no events.

        Every event found is passed to ``process_event_func``.  Raises
        Exception if there is no active subscription, or once
        _MAX_PULL_REQUESTS_PER_BATCH requests have been issued without
        reaching an empty response.
        """
        if self._subscription_id is None:
            raise Exception('You must subscribe first.')
        requests_made = 0
        while True:
            if requests_made >= _MAX_PULL_REQUESTS_PER_BATCH:
                raise Exception('Reached max pull requests per batch.')
            requests_made += 1
            response = yield self._send_pull(self._enumeration_context)
            self._enumeration_context = _find_enumeration_context(response)
            saw_event = False
            for event in _find_events(response):
                saw_event = True
                process_event_func(event)
            if not saw_event:
                # An empty response ends the batch.
                break

    @defer.inlineCallbacks
    def _send_pull(self, enumeration_context):
        """Issue the 'event_pull' request and return its response element."""
        response = yield self._sender.send_request(
            'event_pull', enumeration_context=enumeration_context)
        defer.returnValue(response)

    @defer.inlineCallbacks
    def unsubscribe(self):
        """Send an unsubscribe request and clear the stored state.

        Does nothing when no subscription is active.
        """
        if self._subscription_id is not None:
            yield self._send_unsubscribe(self._subscription_id)
            self._subscription_id = None
            self._enumeration_context = None

    @defer.inlineCallbacks
    def _send_unsubscribe(self, subscription_id):
        """Issue the 'unsubscribe' request and return its response element."""
        response = yield self._sender.send_request(
            'unsubscribe', subscription_id=subscription_id)
        defer.returnValue(response)
def create_event_subscription(conn_info):
    """Build an EventSubscription whose sender is created from ``conn_info``.

    The sender comes from create_etree_request_sender(conn_info).
    """
    return EventSubscription(create_etree_request_sender(conn_info))
|