/usr/include/tango/eventsupplier.h is in libtango8-dev 8.1.2c+dfsg-5.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 | ////////////////////////////////////////////////////////////////////////////////
///
/// file eventsupplier.h
///
/// C++ include file for implementing the TANGO event server and
/// client singleton classes - EventSupplier and EventConsumer.
/// These classes are used to send events from the server
/// to the notification service and to receive events from
/// the notification service.
///
/// author(s) : A.Gotz (goetz@esrf.fr)
///
/// original : 7 April 2003
//
// Copyright (C) : 2003,2004,2005,2006,2007,2008,2009,2010,2011,2012,2013
// European Synchrotron Radiation Facility
// BP 220, Grenoble 38043
// FRANCE
//
// This file is part of Tango.
//
// Tango is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Tango is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with Tango. If not, see <http://www.gnu.org/licenses/>.
///
/// $Revision: 22302 $
///
/// copyright : European Synchrotron Radiation Facility
/// BP 220, Grenoble 38043
/// FRANCE
///
////////////////////////////////////////////////////////////////////////////////
#ifndef _EVENT_SUPPLIER_API_H
#define _EVENT_SUPPLIER_API_H
#include <except.h>
#if defined (_TG_WINDOWS_) && defined (_USRDLL) && !defined(_TANGO_LIB)
#define USE_stub_in_nt_dll
#endif
#include <COS/CosNotification.hh>
#include <COS/CosNotifyChannelAdmin.hh>
#include <COS/CosNotifyComm.hh>
#include <zmq.hpp>
#if defined (_TG_WINDOWS_) && defined (_USRDLL) && !defined(_TANGO_LIB)
#undef USE_stub_in_nt_dll
#endif
#include <eventconsumer.h>
#include <omnithread.h>
#ifndef _TG_WINDOWS_
#include <sys/time.h>
#endif
#include <math.h>
namespace Tango
{
typedef struct _NotifService
{
CosNotifyChannelAdmin::SupplierAdmin_var SupAdm;
CosNotifyChannelAdmin::ProxyID pID;
CosNotifyChannelAdmin::ProxyConsumer_var ProCon;
CosNotifyChannelAdmin::StructuredProxyPushConsumer_var StrProPush;
CosNotifyChannelAdmin::EventChannelFactory_var EveChaFac;
CosNotifyChannelAdmin::EventChannel_var EveCha;
string ec_ior;
} NotifService;
//---------------------------------------------------------------------
//
// EventSupplier base class
//
//---------------------------------------------------------------------
class EventSupplier
{
public :
EventSupplier(Util *);
virtual ~EventSupplier() {}
void push_att_data_ready_event(DeviceImpl *,const string &,long,DevLong);
struct AttributeData
{
const AttributeValue *attr_val;
const AttributeValue_3 *attr_val_3;
const AttributeValue_4 *attr_val_4;
const AttributeConfig_2 *attr_conf_2;
const AttributeConfig_3 *attr_conf_3;
const AttDataReady *attr_dat_ready;
};
SendEventType detect_and_push_events(DeviceImpl *,struct AttributeData &,DevFailed *,string &,struct timeval *);
//------------------ Change event ---------------------------
bool detect_change(Attribute &,struct AttributeData &,bool,double &,double &,DevFailed *,bool &,DeviceImpl *);
//------------------ Detect, push change event --------------
bool detect_and_push_change_event(DeviceImpl *,struct AttributeData &,Attribute &,string &,DevFailed *,bool user_push = false);
//------------------ Detect, push archive event --------------
bool detect_and_push_archive_event(DeviceImpl *,struct AttributeData &,Attribute &,string &,DevFailed *,struct timeval *,bool user_push = false);
//------------------ Detect, push periodic event -------------
bool detect_and_push_periodic_event(DeviceImpl *,struct AttributeData &,Attribute &,string &,DevFailed *,struct timeval *);
//------------------ Push event -------------------------------
virtual void push_event(DeviceImpl *,string,vector<string> &,vector<double> &,vector<string> &,vector<long> &,struct AttributeData &,string &,DevFailed *) = 0;
virtual void push_heartbeat_event() = 0;
//------------------- Attribute conf change event ---------------------
void push_att_conf_events(DeviceImpl *device_impl,AttributeData &,DevFailed *,string &);
omni_mutex &get_push_mutex() {return push_mutex;}
static omni_mutex &get_event_mutex() {return event_mutex;}
string &get_fqdn_prefix() {return fqdn_prefix;}
bool get_one_subscription_cmd() {return one_subscription_cmd;}
void set_one_subscription_cmd(bool val) {one_subscription_cmd = val;}
protected :
inline int timeval_diff(TimeVal before, TimeVal after)
{
return ((after.tv_sec-before.tv_sec)*1000000 + after.tv_usec - before.tv_usec);
}
static string fqdn_prefix;
// Added a mutex to synchronize the access to
// detect_and_push_change_event and
// detect_and_push_archive_event which are used
// from different threads
static omni_mutex event_mutex;
// Added a mutex to synchronize the access to
// push_event which is used
// from different threads
static omni_mutex push_mutex;
// Added a mutex to synchronize the access to
// detect_event which is used
// from different threads
static omni_mutex detect_mutex;
private:
bool one_subscription_cmd;
};
//---------------------------------------------------------------------
//
// NotifdEventSupplier class
//
//---------------------------------------------------------------------
class NotifdEventSupplier : public EventSupplier, public POA_CosNotifyComm::StructuredPushSupplier
{
public :
TANGO_IMP_EXP static NotifdEventSupplier *create(CORBA::ORB_var,string,Util *);
void connect();
void disconnect_structured_push_supplier();
void disconnect_from_notifd();
void subscription_change(const CosNotification::EventTypeSeq& added,const CosNotification::EventTypeSeq& deled);
void push_heartbeat_event();
string &get_event_channel_ior() {return event_channel_ior;}
void file_db_svr();
//------------------ Push event -------------------------------
virtual void push_event(DeviceImpl *,string,vector<string> &,vector<double> &,vector<string> &,vector<long> &,struct AttributeData &,string &,DevFailed *);
protected :
NotifdEventSupplier(CORBA::ORB_var,
CosNotifyChannelAdmin::SupplierAdmin_var,
CosNotifyChannelAdmin::ProxyID,
CosNotifyChannelAdmin::ProxyConsumer_var,
CosNotifyChannelAdmin::StructuredProxyPushConsumer_var,
CosNotifyChannelAdmin::EventChannelFactory_var,
CosNotifyChannelAdmin::EventChannel_var,
string &,
Util *);
private :
static NotifdEventSupplier *_instance;
CosNotifyChannelAdmin::EventChannel_var eventChannel;
CosNotifyChannelAdmin::SupplierAdmin_var supplierAdmin;
CosNotifyChannelAdmin::ProxyID proxyId;
CosNotifyChannelAdmin::ProxyConsumer_var proxyConsumer;
CosNotifyChannelAdmin::StructuredProxyPushConsumer_var structuredProxyPushConsumer;
CosNotifyChannelAdmin::EventChannelFactory_var eventChannelFactory;
CORBA::ORB_var orb_;
string event_channel_ior;
void reconnect_notifd();
TANGO_IMP_EXP static void connect_to_notifd(NotifService &,CORBA::ORB_var &,string &,Util *);
};
//---------------------------------------------------------------------
//
// ZmqEventSupplier class
//
//---------------------------------------------------------------------
#define LARGE_DATA_THRESHOLD 2048
#define LARGE_DATA_THRESHOLD_ENCODED LARGE_DATA_THRESHOLD * 4
#ifndef HAS_LAMBDA_FUNC
template <typename A1,typename A2,typename R>
struct WantedClient : public binary_function<A1,A2,R>
{
R operator() (A1 conn_client, A2 client) const
{
return conn_client.clnt == client;
}
};
template <typename A1,typename A2,typename R>
struct OldClient : public binary_function<A1,A2,R>
{
R operator() (A1 conn_client, A2 ti) const
{
if (ti > (conn_client.date + 500))
{
return true;
}
else
return false;
}
};
#endif
class ZmqEventSupplier : public EventSupplier
{
public :
TANGO_IMP_EXP static ZmqEventSupplier *create(Util *);
virtual ~ZmqEventSupplier();
//------------------ Push event -------------------------------
void push_heartbeat_event();
virtual void push_event(DeviceImpl *,string,vector<string> &,vector<double> &,vector<string> &,vector<long> &,struct AttributeData &,string &,DevFailed *);
string &get_heartbeat_endpoint() {return heartbeat_endpoint;}
string &get_event_endpoint() {return event_endpoint;}
void create_event_socket();
void create_mcast_event_socket(string &,string &,int,bool);
bool is_event_mcast(string &);
string &get_mcast_event_endpoint(string &);
void init_event_cptr(string &event_name);
size_t get_mcast_event_nb() {return event_mcast.size();}
bool update_connected_client(client_addr *);
void set_double_send() {double_send=true;double_send_heartbeat=true;}
int get_zmq_release() {return zmq_release;}
protected :
ZmqEventSupplier(Util *);
private :
static ZmqEventSupplier *_instance;
struct McastSocketPub
{
string endpoint;
zmq::socket_t *pub_socket;
bool local_client;
bool double_send;
};
struct ConnectedClient
{
client_addr clnt;
time_t date;
};
zmq::context_t zmq_context; // ZMQ context
zmq::socket_t *heartbeat_pub_sock; // heartbeat publisher socket
zmq::socket_t *event_pub_sock; // events publisher socket
map<string,McastSocketPub> event_mcast; // multicast socket(s) map
// The key is the full event name
// ie: tango://kidiboo.esrf.fr:1000/dev/test/10/att.change
string heartbeat_endpoint; // heartbeat publisher endpoint
string host_ip; // Host IP address
string heartbeat_event_name; // The event name used for the heartbeat
ZmqCallInfo heartbeat_call; // The heartbeat call info
cdrMemoryStream heartbeat_call_cdr; //
TangoCdrMemoryStream data_call_cdr;
string event_name;
zmq::message_t endian_mess; // Zmq messages
zmq::message_t endian_mess_2; //
zmq::message_t endian_mess_heartbeat; //
zmq::message_t endian_mess_heartbeat_2;//
zmq::message_t heartbeat_call_mess; //
zmq::message_t heartbeat_call_mess_2; //
unsigned char host_endian; // the host endianess
bool heartbeat_name_init;
bool ip_specified; // The user has specified an IP address
string user_ip; // The specified IP address
string event_endpoint; // event publisher endpoint
map<string,unsigned int> event_cptr; // event counter map
list<ConnectedClient> con_client; // Connected clients
bool double_send; // Double send flag
bool double_send_heartbeat;
int zmq_release; // ZMQ lib release
void tango_bind(zmq::socket_t *,string &);
unsigned char test_endian();
void create_mcast_socket(string &,int,McastSocketPub &);
};
} // End of namespace
#endif // _EVENT_SUPPLIER_API_H
|