OmniEvents
SupplierAdmin.cc
Go to the documentation of this file.
1// Package : omniEvents
2// SupplierAdmin.h Created : 2003/12/04
3// Author : Alex Tingle
4//
5// Copyright (C) 2003-2005 Alex Tingle.
6//
7// This file is part of the omniEvents application.
8//
9// omniEvents is free software; you can redistribute it and/or
10// modify it under the terms of the GNU Lesser General Public
11// License as published by the Free Software Foundation; either
12// version 2.1 of the License, or (at your option) any later version.
13//
14// omniEvents is distributed in the hope that it will be useful,
15// but WITHOUT ANY WARRANTY; without even the implied warranty of
16// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17// Lesser General Public License for more details.
18//
19// You should have received a copy of the GNU Lesser General Public
20// License along with this library; if not, write to the Free Software
21// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22//
23
24#include "SupplierAdmin.h"
25
26#include "EventChannel.h"
27#include "ProxyPushConsumer.h"
28#include "ProxyPullConsumer.h"
29#include "Orb.h"
30#include "PersistNode.h"
31
32#define MILLION 1000000
33#define BILLION 1000000000
34
35namespace OmniEvents {
36
37CosEventChannelAdmin::ProxyPushConsumer_ptr
42
43
44CosEventChannelAdmin::ProxyPullConsumer_ptr
51
52
55 PortableServer::POA_ptr poa
56)
57: Servant(poa),
58 _channel(channel),
59 _pushConsumer(NULL),
60 _pullConsumer(NULL),
61 _queue(),
62 _nextPull(0,0)
63{
64 // Initialise _nextPull. Only set it if the cycle period is LESS than the
65 // pull retry period - otherwise just pull every cycle.
67 {
68 omni_thread::get_time(&(_nextPull.first),&(_nextPull.second));
69 }
70
71 // Always create the ProxyPushConsumer_i default servant. This allows
72 // lazy clients to connect suppliers without having to go through the
73 // proper procedure - they can make up an appropriate ObjectId, call push()
74 // and it will just work (TM).
75 // Note: A SupplierAdmin_i is always created by the EventChannel to allow this
76 // behaviour.
78
79 activateObjectWithId("SupplierAdmin");
80}
81
82
84{
85 DB(20,"~SupplierAdmin_i()")
87 {
88 _pullConsumer->_remove_ref();
90 }
92 {
93 delete _pushConsumer;
95 }
96 for(list<CORBA::Any*>::iterator i=_queue.begin(); i!=_queue.end(); ++i)
97 delete *i;
98}
99
100
102
103
105{
106 if(_pullConsumer)
107 {
108 _pullConsumer->collect();
109 if(0==_nextPull.first)
110 { // No delay between pulls.
111 _pullConsumer->triggerRequest();
112 }
113 else
114 { // Only trigger new pull() calls if `pullRetry' ms have passed.
116 omni_thread::get_time(&(now.first),&(now.second));
117 if(now>=_nextPull)
118 {
119 _pullConsumer->triggerRequest();
120
121 CORBA::ULong p =_channel.pullRetryPeriod_ms();
122 do{
123 _nextPull.second += (p%1000)*MILLION; // nsec
124 _nextPull.first += p/1000 + _nextPull.second/BILLION; // sec
125 _nextPull.second %= BILLION; // nsec
126 } while(now>=_nextPull);
127 }
128 }
129 }
130 _pushConsumer->trigger();
131 // Pick up events from both pull & push consumers.
132 events=_queue;
133 _queue.clear();
134}
135
136
144
145
147{
148 // Build Push Consumer proxies
149 PersistNode* pushcNode =node.child("ProxyPushConsumer");
150 if(pushcNode && !pushcNode->_child.empty())
151 {
154 }
155
156 // Build Pull Consumer proxies
157 PersistNode* pullcNode =node.child("ProxyPullConsumer");
158 if(pullcNode && !pullcNode->_child.empty())
159 {
160 if(!_pullConsumer)
163 }
164}
165
166
174
175
176}; // end namespace OmniEvents
#define OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(C)
Defines debug versions of _add/remove_ref() for class C.
Definition Servant.h:70
#define BILLION
#define MILLION
#define DB(l, x)
Definition Orb.h:49
T::_ptr_type createNarrowedReference(PortableServer::POA_ptr poa, const char *repositoryId)
Helper method that creates a new CORBA object and then narrows it to the appropriate type.
Definition Servant.h:96
Servant for CosEventChannelAdmin::EventChannel objects, also inherits from omni_thread.
ConsumerAdmin_i & consumerAdmin() const
CORBA::ULong pullRetryPeriod_ms() const
unsigned long cyclePeriod_ns() const
PersistNode * child(const string &key) const
void reincarnate(const PersistNode &node)
Re-create servants from information saved in the log file.
void output(ostream &os)
Save this object's state to a stream.
void disconnect()
Send disconnect_pull_supplier() to all connected PullSuppliers.
OMNIEVENTS__DEBUG_REF_COUNTS__DECL CosEventChannelAdmin::ProxyPullConsumer_ptr createObject()
Default servant for ProxyPushConsumer objects.
void output(ostream &os) const
Save this object's state to a stream.
CosEventChannelAdmin::ProxyPushConsumer_ptr createObject()
Constructs a new object.
void reincarnate(const PersistNode &node)
Re-create all servants from information saved in the log file.
void disconnect()
Send disconnect_push_supplier() to all connected PushSuppliers.
Base class for servants.
Definition Servant.h:114
PortableServer::POA_var _poa
Definition Servant.h:131
void activateObjectWithId(const char *oidStr)
Calls activate_object_with_id() to activate this servant in its POA.
Definition Servant.cc:125
list< CORBA::Any * > _queue
Incoming queue for the PushConsumer.
pair< unsigned long, unsigned long > _nextPull
Next time to retry pull (sec,nsec).
CosEventChannelAdmin::ProxyPushConsumer_ptr obtain_push_consumer()
SupplierAdmin_i(const EventChannel_i &channel, PortableServer::POA_ptr poa)
const EventChannel_i & _channel
void disconnect()
Send disconnect_XXX_supplier() to all connected consumers.
void output(ostream &os)
Save this object's state to a stream.
ProxyPushConsumer_i * _pushConsumer
ProxyPullConsumerManager * _pullConsumer
OMNIEVENTS__DEBUG_REF_COUNTS__DECL void collect(list< CORBA::Any * > &events)
Collects all events that have arrived since the last call.
void reincarnate(const PersistNode &node)
Populate this servant from log information.
CosEventChannelAdmin::ProxyPullConsumer_ptr obtain_pull_consumer()