This file is indexed.

/usr/lib/python3/dist-packages/opcua/server/subscription_service.py is in python3-opcua 0.90.3-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
"""
server side implementation of subscription service
"""

from threading import RLock
import logging

from opcua import ua
from opcua.server.internal_subscription import InternalSubscription


class SubscriptionService(object):

    def __init__(self, loop, aspace):
        self.logger = logging.getLogger(__name__)
        self.loop = loop
        self.aspace = aspace
        self.subscriptions = {}
        self._sub_id_counter = 77
        self._lock = RLock()

    def create_subscription(self, params, callback):
        self.logger.info("create subscription with callback: %s", callback)
        result = ua.CreateSubscriptionResult()
        result.RevisedPublishingInterval = params.RequestedPublishingInterval
        result.RevisedLifetimeCount = params.RequestedLifetimeCount
        result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount
        with self._lock:
            self._sub_id_counter += 1
            result.SubscriptionId = self._sub_id_counter

            sub = InternalSubscription(self, result, self.aspace, callback)
            sub.start()
            self.subscriptions[result.SubscriptionId] = sub

            return result

    def delete_subscriptions(self, ids):
        self.logger.info("delete subscriptions: %s", ids)
        res = []
        for i in ids:
            with self._lock:
                if i not in self.subscriptions:
                    res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
                else:
                    sub = self.subscriptions.pop(i)
                    sub.stop()
                    res.append(ua.StatusCode())
        return res

    def publish(self, acks):
        self.logger.info("publish request with acks %s", acks)
        with self._lock:
            for subid, sub in self.subscriptions.items():
                sub.publish([ack.SequenceNumber for ack in acks if ack.SubscriptionId == subid])

    def create_monitored_items(self, params):
        self.logger.info("create monitored items")
        with self._lock:
            if params.SubscriptionId not in self.subscriptions:
                res = []
                for _ in params.ItemsToCreate:
                    response = ua.MonitoredItemCreateResult()
                    response.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
                    res.append(response)
                return res
            return self.subscriptions[params.SubscriptionId].monitored_item_srv.create_monitored_items(params)

    def modify_monitored_items(self, params):
        self.logger.info("modify monitored items")
        with self._lock:
            if params.SubscriptionId not in self.subscriptions:
                res = []
                for _ in params.ItemsToModify:
                    result = ua.MonitoredItemModifyResult()
                    result.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
                    res.append(result)
                return res
            return self.subscriptions[params.SubscriptionId].monitored_item_srv.modify_monitored_items(params)

    def delete_monitored_items(self, params):
        self.logger.info("delete monitored items")
        with self._lock:
            if params.SubscriptionId not in self.subscriptions:
                res = []
                for _ in params.MonitoredItemIds:
                    res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
                return res
            return self.subscriptions[params.SubscriptionId].monitored_item_srv.delete_monitored_items(
                params.MonitoredItemIds)

    def republish(self, params):
        with self._lock:
            if params.SubscriptionId not in self.subscriptions:
                # TODO: what should I do?
                return ua.NotificationMessage()
            return self.subscriptions[params.SubscriptionId].republish(params.RetransmitSequenceNumber)

    def trigger_event(self, event):
        with self._lock:
            for sub in self.subscriptions.values():
                sub.monitored_item_srv.trigger_event(event)