This file is indexed.

/usr/lib/python3/dist-packages/opcua/common/events.py is in python3-opcua 0.90.3-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
import copy

from opcua import ua
import opcua
from opcua.ua.uaerrors import UaError
from opcua.common import ua_utils


class Event(object):
    """
    OPC UA Event object.
    This is class in inherited by the common event objects such as BaseEvent,
    other auto standard events and custom events
    Events are used to trigger events on server side and are
    sent to clients for every events from server

    Developper Warning:
    On server side the data type of attributes should be known, thus
    add properties using the add_property method!!!
    """

    def __init__(self):
        self.server_handle = None
        self.select_clauses = None
        self.event_fields = None
        self.data_types = {}
        # save current attributes
        self.internal_properties = list(self.__dict__.keys())[:] + ["internal_properties"]

    def __str__(self):
        return "{0}({1})".format(
            self.__class__.__name__, 
            [str(k) + ":" + str(v) for k, v in self.__dict__.items() if k not in self.internal_properties])
    __repr__ = __str__

    def add_property(self, name, val, datatype):
        """
        Add a property to event and tore its data type
        """
        setattr(self, name, val)
        self.data_types[name] = datatype

    def get_event_props_as_fields_dict(self):
        """
        convert all properties of the Event class to a dict of variants
        """
        field_vars = {}
        for key, value in vars(self).items():
            if not key.startswith("__") and key not in self.internal_properties:
                field_vars[key] = ua.Variant(value, self.data_types[key])
        return field_vars

    @staticmethod
    def from_field_dict(fields):
        """
        Create an Event object from a dict of name and variants
        """
        ev = Event()
        for k, v in fields.items():
            ev.add_property(k, v.Value, v.VariantType)
        return ev

    def to_event_fields_using_subscription_fields(self, select_clauses):
        """
        Using a new select_clauses and the original select_clauses
        used during subscription, return a field list 
        """
        fields = []
        for sattr in select_clauses:
            for idx, o_sattr in enumerate(self.select_clauses):
                if sattr.BrowsePath == o_sattr.BrowsePath and sattr.AttributeId == o_sattr.AttributeId:
                    fields.append(self.event_fields[idx])
                    break
        return fields

    def to_event_fields(self, select_clauses):
        """
        return a field list using a select clause and the object properties
        """
        fields = []
        for sattr in select_clauses:
            if not sattr.BrowsePath:
                name = ua.AttributeIds(sattr.AttributeId).name
            else:
                name = sattr.BrowsePath[0].Name
            try:
                val = getattr(self, name)
            except AttributeError:
                field = ua.Variant(None)
            else:
                field = ua.Variant(copy.deepcopy(val), self.data_types[name])
            fields.append(field)
        return fields

    @staticmethod
    def from_event_fields(select_clauses, fields):
        """
        Instantiate an Event object from a select_clauses and fields
        """
        ev = Event()
        ev.select_clauses = select_clauses
        ev.event_fields = fields
        for idx, sattr in enumerate(select_clauses):
            if len(sattr.BrowsePath) == 0:
                name = sattr.AttributeId.name
            else:
                name = sattr.BrowsePath[0].Name
            ev.add_property(name, fields[idx].Value, fields[idx].VariantType)
        return ev


def get_filter_from_event_type(eventtypes):
    evfilter = ua.EventFilter()
    evfilter.SelectClauses = select_clauses_from_evtype(eventtypes)
    evfilter.WhereClause = where_clause_from_evtype(eventtypes)
    return evfilter


def select_clauses_from_evtype(evtypes):
    clauses = []

    selected_paths = []
    for evtype in evtypes:
        for prop in get_event_properties_from_type_node(evtype):
            if prop.get_browse_name() not in selected_paths:
                op = ua.SimpleAttributeOperand()
                op.AttributeId = ua.AttributeIds.Value
                op.BrowsePath = [prop.get_browse_name()]
                clauses.append(op)
                selected_paths.append(prop.get_browse_name())
    return clauses


def where_clause_from_evtype(evtypes):
    cf = ua.ContentFilter()
    el = ua.ContentFilterElement()
    
    # operands can be ElementOperand, LiteralOperand, AttributeOperand, SimpleAttribute
    # Create a clause where the generate event type property EventType
    # must be a subtype of events in evtypes argument

    # the first operand is the attribute event type
    op = ua.SimpleAttributeOperand()
    # op.TypeDefinitionId = evtype.nodeid
    op.BrowsePath.append(ua.QualifiedName("EventType", 0))
    op.AttributeId = ua.AttributeIds.Value
    el.FilterOperands.append(op)

    # now create a list of all subtypes we want to accept
    subtypes = []
    for evtype in evtypes:
        subtypes += [st.nodeid for st in ua_utils.get_node_subtypes(evtype)]
    subtypes = list(set(subtypes))  # remove duplicates
    for subtypeid in subtypes:
        op = ua.LiteralOperand()
        op.Value = ua.Variant(subtypeid)
        el.FilterOperands.append(op)

    el.FilterOperator = ua.FilterOperator.InList
    cf.Elements.append(el)

    return cf


def get_event_properties_from_type_node(node):
    properties = []
    curr_node = node

    while True:
        properties.extend(curr_node.get_properties())

        if curr_node.nodeid.Identifier == ua.ObjectIds.BaseEventType:
            break

        parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=True)
        if len(parents) != 1:  # Something went wrong
            return None
        curr_node = parents[0]

    return properties


def get_event_obj_from_type_node(node):
    """
    return an Event object from an event type node
    """
    if node.nodeid.Identifier in opcua.common.event_objects.IMPLEMENTED_EVENTS.keys():
        return opcua.common.event_objects.IMPLEMENTED_EVENTS[node.nodeid.Identifier]()
    else:
        parent_identifier, parent_eventtype = _find_parent_eventtype(node)

        class CustomEvent(parent_eventtype):

            def __init__(self):
                parent_eventtype.__init__(self)
                self.EventType = node.nodeid
                curr_node = node

                while curr_node.nodeid.Identifier != parent_identifier:
                    for prop in curr_node.get_properties():
                        name = prop.get_browse_name().Name
                        val = prop.get_data_value()
                        self.add_property(name, val.Value.Value, val.Value.VariantType)
                    parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=True)

                    if len(parents) != 1:  # Something went wrong
                        raise UaError("Parent of event type could notbe found")
                    curr_node = parents[0]

                self._freeze = True

    return CustomEvent()


def _find_parent_eventtype(node):
    """
    """
    parents = node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=True)

    if len(parents) != 1:   # Something went wrong
        raise UaError("Parent of event type could notbe found")
    if parents[0].nodeid.Identifier in opcua.common.event_objects.IMPLEMENTED_EVENTS.keys():
        return parents[0].nodeid.Identifier, opcua.common.event_objects.IMPLEMENTED_EVENTS[parents[0].nodeid.Identifier]
    else:
        return _find_parent_eventtype(parents[0])