/usr/lib/python3/dist-packages/opcua/common/events.py is in python3-opcua 0.90.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 | import copy
from opcua import ua
import opcua
from opcua.ua.uaerrors import UaError
from opcua.common import ua_utils
class Event(object):
    """
    OPC UA Event object.

    This class is inherited by the common event objects such as BaseEvent,
    other auto standard events and custom events.
    Events are used to trigger events on the server side and are sent to
    clients for every event from the server.

    Developer warning:
    On the server side the data type of attributes should be known, thus
    add properties using the add_property method!

    Only attributes that have a data type registered in ``data_types`` are
    treated as event properties when fields are generated.
    """

    def __init__(self):
        self.server_handle = None
        self.select_clauses = None
        self.event_fields = None
        # maps property name -> data type given to add_property
        self.data_types = {}
        # Remember the bookkeeping attributes present so far so that they
        # can be told apart from event properties added later on.
        self.internal_properties = list(self.__dict__) + ["internal_properties"]

    def __str__(self):
        props = [
            str(k) + ":" + str(v)
            for k, v in self.__dict__.items()
            if k not in self.internal_properties
        ]
        return "{0}({1})".format(self.__class__.__name__, props)

    __repr__ = __str__

    def add_property(self, name, val, datatype):
        """
        Add a property to the event and store its data type.

        :param name: attribute name under which the value is stored
        :param val: value of the property
        :param datatype: data type recorded in ``data_types`` for this name
        """
        setattr(self, name, val)
        self.data_types[name] = datatype

    def get_event_props_as_fields_dict(self):
        """
        Convert all properties of the Event to a dict of variants.

        Internal attributes, names starting with a double underscore and
        attributes without a registered data type (i.e. not added through
        add_property) are left out.

        :return: dict mapping property name to ua.Variant
        """
        field_vars = {}
        for key, value in vars(self).items():
            if key.startswith("__") or key in self.internal_properties:
                continue
            if key not in self.data_types:
                # Not an event property (e.g. a flag set directly on the
                # instance); indexing data_types would raise KeyError.
                continue
            field_vars[key] = ua.Variant(value, self.data_types[key])
        return field_vars

    @staticmethod
    def from_field_dict(fields):
        """
        Create an Event object from a dict of name and variants.

        :param fields: dict mapping property name to an object exposing
            ``Value`` and ``VariantType``
        :return: new Event with one property per dict entry
        """
        ev = Event()
        for k, v in fields.items():
            ev.add_property(k, v.Value, v.VariantType)
        return ev

    def to_event_fields_using_subscription_fields(self, select_clauses):
        """
        Using a new select_clauses and the original select_clauses
        used during subscription, return a field list.

        A requested clause matches an original clause when both BrowsePath
        and AttributeId are equal; the first match wins. Clauses without a
        match contribute no field.

        :param select_clauses: requested select clauses
        :return: list of the stored event fields of the matching clauses
        """
        fields = []
        for sattr in select_clauses:
            for idx, o_sattr in enumerate(self.select_clauses):
                if sattr.BrowsePath == o_sattr.BrowsePath and sattr.AttributeId == o_sattr.AttributeId:
                    fields.append(self.event_fields[idx])
                    break
        return fields

    def to_event_fields(self, select_clauses):
        """
        Return a field list using a select clause and the object properties.

        The field name is the name of the first BrowsePath element, or the
        name of the attribute id when the BrowsePath is empty. A null variant
        is returned for names that are not event properties of this object.

        :param select_clauses: select clauses to evaluate
        :return: list of ua.Variant, one per select clause
        """
        fields = []
        for sattr in select_clauses:
            if not sattr.BrowsePath:
                name = ua.AttributeIds(sattr.AttributeId).name
            else:
                name = sattr.BrowsePath[0].Name
            if name not in self.data_types:
                # Names without a registered data type (internal attributes,
                # methods, unknown names) are not event properties.
                fields.append(ua.Variant(None))
                continue
            try:
                val = getattr(self, name)
            except AttributeError:
                field = ua.Variant(None)
            else:
                # deep copy so the returned field does not share the value
                # object stored on the event
                field = ua.Variant(copy.deepcopy(val), self.data_types[name])
            fields.append(field)
        return fields

    @staticmethod
    def from_event_fields(select_clauses, fields):
        """
        Instantiate an Event object from a select_clauses and fields.

        The select clauses and fields are stored on the event, and one
        property is added per clause, named like in to_event_fields.

        :param select_clauses: select clauses the fields were generated from
        :param fields: list of variants, in the order of select_clauses
        :return: new Event
        """
        ev = Event()
        ev.select_clauses = select_clauses
        ev.event_fields = fields
        for idx, sattr in enumerate(select_clauses):
            if not sattr.BrowsePath:
                # Same resolution as to_event_fields: works for enum members
                # as well as plain integer attribute ids.
                name = ua.AttributeIds(sattr.AttributeId).name
            else:
                name = sattr.BrowsePath[0].Name
            field = fields[idx]
            ev.add_property(name, field.Value, field.VariantType)
        return ev
def get_filter_from_event_type(eventtypes):
    """
    Create an event filter for the given event types.

    :param eventtypes: event type nodes, handed unchanged to
        select_clauses_from_evtype and where_clause_from_evtype
    :return: ua.EventFilter with SelectClauses and WhereClause filled in
    """
    event_filter = ua.EventFilter()
    selected = select_clauses_from_evtype(eventtypes)
    event_filter.SelectClauses = selected
    condition = where_clause_from_evtype(eventtypes)
    event_filter.WhereClause = condition
    return event_filter
def select_clauses_from_evtype(evtypes):
    """
    Build the select clauses for a list of event type nodes.

    One SimpleAttributeOperand selecting the Value attribute is created for
    every distinct property browse name found on the given event types, as
    collected by get_event_properties_from_type_node.

    :param evtypes: iterable of event type nodes
    :return: list of ua.SimpleAttributeOperand without duplicated browse names
    :raise UaError: if the properties of an event type cannot be collected
    """
    clauses = []
    selected_paths = []
    for evtype in evtypes:
        properties = get_event_properties_from_type_node(evtype)
        if properties is None:
            # The helper returns None when it cannot walk the type
            # hierarchy; report that instead of failing with a TypeError.
            raise UaError("Properties of event type could not be retrieved")
        for prop in properties:
            # fetch the browse name once per property
            browse_name = prop.get_browse_name()
            if browse_name in selected_paths:
                continue
            op = ua.SimpleAttributeOperand()
            op.AttributeId = ua.AttributeIds.Value
            op.BrowsePath = [browse_name]
            clauses.append(op)
            selected_paths.append(browse_name)
    return clauses
def where_clause_from_evtype(evtypes):
    """
    Build a content filter on the EventType property of generated events.

    The filter holds a single InList element: its first operand selects the
    Value of the "EventType" property, the remaining operands are the node
    ids returned by ua_utils.get_node_subtypes for every given event type.

    :param evtypes: iterable of event type nodes
    :return: ua.ContentFilter with one element
    """
    content_filter = ua.ContentFilter()
    element = ua.ContentFilterElement()

    # operands can be ElementOperand, LiteralOperand, AttributeOperand, SimpleAttribute
    # the first operand is the attribute event type
    type_operand = ua.SimpleAttributeOperand()
    type_operand.BrowsePath.append(ua.QualifiedName("EventType", 0))
    type_operand.AttributeId = ua.AttributeIds.Value
    element.FilterOperands.append(type_operand)

    # gather the node ids of all subtypes we want to accept
    candidate_ids = []
    for evtype in evtypes:
        candidate_ids.extend([st.nodeid for st in ua_utils.get_node_subtypes(evtype)])

    # going through a set removes duplicates
    for subtypeid in set(candidate_ids):
        literal = ua.LiteralOperand()
        literal.Value = ua.Variant(subtypeid)
        element.FilterOperands.append(literal)

    element.FilterOperator = ua.FilterOperator.InList
    content_filter.Elements.append(element)
    return content_filter
def get_event_properties_from_type_node(node):
    """
    Collect the properties of an event type node and of its parent types.

    Starting at node, the inverse HasSubtype reference is followed until the
    node whose identifier is ua.ObjectIds.BaseEventType has been processed.

    :param node: event type node to start from
    :return: list of property nodes, or None if a node on the way does not
        have exactly one parent
    """
    cursor = node
    collected = []
    collected.extend(cursor.get_properties())
    while cursor.nodeid.Identifier != ua.ObjectIds.BaseEventType:
        supertypes = cursor.get_referenced_nodes(
            refs=ua.ObjectIds.HasSubtype,
            direction=ua.BrowseDirection.Inverse,
            includesubtypes=True,
        )
        if len(supertypes) != 1:
            # Something went wrong
            return None
        cursor = supertypes[0]
        collected.extend(cursor.get_properties())
    return collected
def get_event_obj_from_type_node(node):
    """
    Return an Event object from an event type node.

    If the identifier of the node is a key of
    opcua.common.event_objects.IMPLEMENTED_EVENTS, the class registered
    there is instantiated. Otherwise a CustomEvent class is derived from the
    closest registered parent event type (see _find_parent_eventtype); on
    construction it adds the properties of node and of every type between
    node and that parent.

    :param node: event type node
    :return: instance of the registered event class or of a new CustomEvent
    :raise UaError: if a node on the way up does not have exactly one parent
    """
    implemented = opcua.common.event_objects.IMPLEMENTED_EVENTS
    identifier = node.nodeid.Identifier
    if identifier in implemented:
        return implemented[identifier]()

    parent_identifier, parent_eventtype = _find_parent_eventtype(node)

    class CustomEvent(parent_eventtype):

        def __init__(self):
            parent_eventtype.__init__(self)
            # NOTE(review): set directly rather than through add_property;
            # presumably the parent class already registered the data type
            # of EventType - confirm against event_objects.
            self.EventType = node.nodeid
            curr_node = node
            # walk up from the custom type until the registered parent type
            while curr_node.nodeid.Identifier != parent_identifier:
                for prop in curr_node.get_properties():
                    name = prop.get_browse_name().Name
                    val = prop.get_data_value()
                    self.add_property(name, val.Value.Value, val.Value.VariantType)
                parents = curr_node.get_referenced_nodes(
                    refs=ua.ObjectIds.HasSubtype,
                    direction=ua.BrowseDirection.Inverse,
                    includesubtypes=True,
                )
                if len(parents) != 1:  # Something went wrong
                    raise UaError("Parent of event type could not be found")
                curr_node = parents[0]
            self._freeze = True

    return CustomEvent()
def _find_parent_eventtype(node):
    """
    Find the closest parent event type that has a registered event class.

    The inverse HasSubtype reference is followed upwards from node until a
    parent is reached whose identifier is a key of
    opcua.common.event_objects.IMPLEMENTED_EVENTS.

    :param node: event type node to start from (node itself is not checked)
    :return: tuple (identifier of the parent, registered event class)
    :raise UaError: if a node on the way up does not have exactly one parent
    """
    curr_node = node
    # iterative walk, so the depth of the type hierarchy is not bounded by
    # the recursion limit
    while True:
        parents = curr_node.get_referenced_nodes(
            refs=ua.ObjectIds.HasSubtype,
            direction=ua.BrowseDirection.Inverse,
            includesubtypes=True,
        )
        if len(parents) != 1:  # Something went wrong
            raise UaError("Parent of event type could not be found")
        parent = parents[0]
        implemented = opcua.common.event_objects.IMPLEMENTED_EVENTS
        identifier = parent.nodeid.Identifier
        if identifier in implemented:
            return identifier, implemented[identifier]
        curr_node = parent
|