/usr/lib/python3/dist-packages/opcua/server/subscription_service.py is in python3-opcua 0.90.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 | """
server side implementation of subscription service
"""
from threading import RLock
import logging
from opcua import ua
from opcua.server.internal_subscription import InternalSubscription
class SubscriptionService(object):
def __init__(self, loop, aspace):
self.logger = logging.getLogger(__name__)
self.loop = loop
self.aspace = aspace
self.subscriptions = {}
self._sub_id_counter = 77
self._lock = RLock()
def create_subscription(self, params, callback):
self.logger.info("create subscription with callback: %s", callback)
result = ua.CreateSubscriptionResult()
result.RevisedPublishingInterval = params.RequestedPublishingInterval
result.RevisedLifetimeCount = params.RequestedLifetimeCount
result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount
with self._lock:
self._sub_id_counter += 1
result.SubscriptionId = self._sub_id_counter
sub = InternalSubscription(self, result, self.aspace, callback)
sub.start()
self.subscriptions[result.SubscriptionId] = sub
return result
def delete_subscriptions(self, ids):
self.logger.info("delete subscriptions: %s", ids)
res = []
for i in ids:
with self._lock:
if i not in self.subscriptions:
res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
else:
sub = self.subscriptions.pop(i)
sub.stop()
res.append(ua.StatusCode())
return res
def publish(self, acks):
self.logger.info("publish request with acks %s", acks)
with self._lock:
for subid, sub in self.subscriptions.items():
sub.publish([ack.SequenceNumber for ack in acks if ack.SubscriptionId == subid])
def create_monitored_items(self, params):
self.logger.info("create monitored items")
with self._lock:
if params.SubscriptionId not in self.subscriptions:
res = []
for _ in params.ItemsToCreate:
response = ua.MonitoredItemCreateResult()
response.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
res.append(response)
return res
return self.subscriptions[params.SubscriptionId].monitored_item_srv.create_monitored_items(params)
def modify_monitored_items(self, params):
self.logger.info("modify monitored items")
with self._lock:
if params.SubscriptionId not in self.subscriptions:
res = []
for _ in params.ItemsToModify:
result = ua.MonitoredItemModifyResult()
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
res.append(result)
return res
return self.subscriptions[params.SubscriptionId].monitored_item_srv.modify_monitored_items(params)
def delete_monitored_items(self, params):
self.logger.info("delete monitored items")
with self._lock:
if params.SubscriptionId not in self.subscriptions:
res = []
for _ in params.MonitoredItemIds:
res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
return res
return self.subscriptions[params.SubscriptionId].monitored_item_srv.delete_monitored_items(
params.MonitoredItemIds)
def republish(self, params):
with self._lock:
if params.SubscriptionId not in self.subscriptions:
# TODO: what should I do?
return ua.NotificationMessage()
return self.subscriptions[params.SubscriptionId].republish(params.RetransmitSequenceNumber)
def trigger_event(self, event):
with self._lock:
for sub in self.subscriptions.values():
sub.monitored_item_srv.trigger_event(event)
|