OmniEvents
ProxyPushSupplier.cc
Go to the documentation of this file.
1// Package : omniEvents
2// ProxyPushSupplier.cc Created : 2003/12/04
3// Author : Alex Tingle
4//
5// Copyright (C) 2003,2005 Alex Tingle.
6//
7// This file is part of the omniEvents application.
8//
9// omniEvents is free software; you can redistribute it and/or
10// modify it under the terms of the GNU Lesser General Public
11// License as published by the Free Software Foundation; either
12// version 2.1 of the License, or (at your option) any later version.
13//
14// omniEvents is distributed in the hope that it will be useful,
15// but WITHOUT ANY WARRANTY; without even the implied warranty of
16// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17// Lesser General Public License for more details.
18//
19// You should have received a copy of the GNU Lesser General Public
20// License along with this library; if not, write to the Free Software
21// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22//
23
24#include "ProxyPushSupplier.h"
25#include "Orb.h"
26#include "omniEventsLog.h"
27#include "PersistNode.h"
28#include <assert.h>
29
30namespace OmniEvents {
31
37public:
39 ~omni_mutex_kcol(void) { mutex.lock(); }
40private:
41 // dummy copy constructor and operator= to prevent copying
44};
45
46
47//
48// ProxyPushSupplierManager
49//
50
51PortableServer::Servant
53 const PortableServer::ObjectId& oid,
54 PortableServer::POA_ptr poa
55)
56{
58 PauseThenWake p(this);
59 _servants.insert(result);
60 return result;
61}
62
63void
65 const PortableServer::ObjectId& oid,
66 PortableServer::POA_ptr adapter,
67 PortableServer::Servant serv,
68 CORBA::Boolean cleanup_in_progress,
69 CORBA::Boolean remaining_activations
70)
71{
72 // This etherealize method needs a special implementation because
73 // ProxyPushSupplier_i objects are freed with _remove_ref() rather than
74 // delete.
75 // Otherwise, this method strongly resembles ProxyManager::etherealize().
79 set<Proxy*>::iterator pos =_servants.find(narrowed);
80 if(pos!=_servants.end())
81 {
82 _servants.erase(pos);
83 narrowed->_remove_ref();
84 }
85 else
86 {
87 DB(1,"\t\teh? - POA attempted to etherealize unknown servant.");
88 }
89}
90
92 PortableServer::POA_ptr parentPoa,
94)
96 omni_thread(NULL,PRIORITY_HIGH),
97 _queue(q),
98 _lock(),_condition(&_lock),
99 _refCount(1)
100{
101 ProxyManager::activate("ProxyPushSupplier");
103}
104
106{
107 DB(20,"~ProxyPushSupplierManager()")
108}
109
110CosEventChannelAdmin::ProxyPushSupplier_ptr
112{
114 _managedPoa.in(),
115 CosEventChannelAdmin::_tc_ProxyPushSupplier->id()
116 );
117}
118
120{
121 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
122 {
123 Proxy* p =*i; // Sun's CC requires this temporary.
125 // We are in the EventChannel's thread.
126 // Make sure all calls go through the ProxyPushSupplier POA.
127 CosEventChannelAdmin::ProxyPushSupplier_var ppsv =pps->_this();
129 }
130}
131
132void*
134{
135 // This loop repeatedly triggers all of the servants in turn. As long as
136 // something happens each time, then we loop as fast as we can.
137 // As soon as activity dries up, we start to wait longer and longer between
138 // loops (up to a maximum). When there is no work to do, just block until
139 // a new event arrives.
140 //
141 // Rationale: The faster we loop the more events we can deliver to each
142 // consumer per second. However, when nothing is happening, this busy loop
143 // just soaks up CPU and kills performance. The optimum sleep time varies
144 // wildly from platform to platform, and also depends upon the typical ping
145 // time to the consumers.
146 //
147 // This dynamic approach should deliver reasonable performance when things
148 // are hectic, but not soak up too much CPU when not much is happening.
149 //
150 const unsigned long sleepTimeNanosec0 =0x8000; // 33us (doubled before use)
151 const unsigned long maxSleepNanosec =0x800000; // 8.4ms
152 unsigned long sleepTimeNanosec =sleepTimeNanosec0;
153
155 while(true)
156 {
157 try {
158 if(_refCount<1)
159 break;
160
161 bool busy=false;
162 bool waiting=false;
163
164 // Trigger each servant in turn.
165 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
166 {
167 Proxy* p =*i; // Sun's CC requires this temporary.
170 }
171
172 if(busy)
173 {
174 // Something happened last time round. So we'll be optimistic and
175 // immediately go round for another go. Briefly unlock the mutex first,
176 // just to let the other kids get in if they need to.
177 omni_mutex_kcol l(_lock); // 'lock' reversed!
178 // Reset the sleep time.
180 }
181 else if(waiting)
182 {
183 // Nothing happened, so we'll wait for a bit and then give it another
184 // go. Each time we wait for twice as long, up to the maximum.
186 sleepTimeNanosec<<=1; // (multiply by 2)
187 unsigned long sec,nsec;
188 omni_thread::get_time(&sec,&nsec,0,sleepTimeNanosec);
189 _condition.timedwait(sec,nsec);
190 }
191 else
192 {
193 // There is nothing to do, so block until a new event arrives.
194 _condition.wait();
195 }
196
197 }
198 catch (CORBA::SystemException& ex) {
199 DB(2,"ProxyPushSupplierManager ignoring CORBA system exception"
200 IF_OMNIORB4(": "<<ex._name()<<" ("<<NP_MINORSTRING(ex)<<")") ".")
201 }
202 catch (CORBA::Exception& ex) {
203 DB(2,"ProxyPushSupplierManager ignoring CORBA exception"
204 IF_OMNIORB4(": "<<ex._name()<<) ".")
205 }
206 catch(...) {
207 DB(2,"ProxyPushSupplierManager thread killed by unknown exception.")
208 break;
209 }
210 }
211 return NULL;
212}
213
215{
216#if OMNIEVENTS__DEBUG_REF_COUNTS
217 DB(20,"ProxyPushSupplierManager::_add_ref()")
218#endif
220 ++_refCount;
221}
222
224{
225#if OMNIEVENTS__DEBUG_REF_COUNTS
226 DB(20,"ProxyPushSupplierManager::_remove_ref()")
227#endif
228 int myref;
229 {
230 PauseThenWake p(this);
231 myref = --_refCount;
232 }
233 if(myref<0)
234 {
235 DB(2,"ProxyPushSupplierManager has negative ref count! "<<myref)
236 }
237 else if(myref==0)
238 {
239 DB(15,"ProxyPushSupplierManager has zero ref count -- shutdown.")
240 join(NULL);
241 }
242}
243
244
245//
246// ProxyPushSupplier_i
247//
248
250 CosEventComm::PushConsumer_ptr pushConsumer)
251{
252 if(CORBA::is_nil(pushConsumer))
253 throw CORBA::BAD_PARAM();
254 if(!CORBA::is_nil(_target) || !CORBA::is_nil(_req))
255 throw CosEventChannelAdmin::AlreadyConnected();
256 _target=CosEventComm::PushConsumer::_duplicate(pushConsumer);
257
258 // Test to see whether pushConsumer is a ProxyPushConsumer.
259 // If so, then we will aggressively try to reconnect when we are reincarnated.
260 CORBA::Request_var req =_target->_request("_is_a");
261 req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushConsumer->id();
262 req->set_return_type(CORBA::_tc_boolean);
263 req->send_deferred();
264 Orb::inst().deferredRequest(req._retn(),this); // Register for callback
265
267 {
268 WriteLock log;
269 output(log.os);
270 }
271}
272
273
275{
276 DB(5,"ProxyPushSupplier_i::disconnect_push_supplier()");
277 eraseKey("ConsumerAdmin/ProxyPushSupplier");
279 if(CORBA::is_nil(_target))
280 {
281 throw CORBA::OBJECT_NOT_EXIST(
282 IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
283 CORBA::COMPLETED_NO
284 );
285 }
286 else
287 {
288 CORBA::Request_var req=_target->_request("disconnect_push_consumer");
289 _target=CosEventComm::PushConsumer::_nil();
290 req->send_deferred();
291 Orb::inst().deferredRequest(req._retn());
292 }
293}
294
295
297 PortableServer::POA_ptr poa,
299)
300: Proxy(poa),
301 EventQueue::Reader(q),
302 _target(CosEventComm::PushConsumer::_nil()),
303 _targetIsProxy(false)
304{
305 // pass
306}
307
309{
310 DB(20,"~ProxyPushSupplier_i()")
311}
312
314
// Performs one step of event delivery for this proxy, using the deferred
// request held in _req.
//
// busy    - set to true when this call did some work: a response to the
//           outstanding request was consumed, or a new "push" request was sent.
// waiting - set to true when a request is still outstanding on return.
//
// If the outstanding request completed with an exception, the connection is
// shut down (failure reported, a deferred "disconnect_push_consumer" sent,
// _target cleared, persistency key erased, servant deactivated) and the
// method returns immediately WITHOUT modifying busy or waiting.
315inline void ProxyPushSupplier_i::trigger(bool& busy, bool& waiting)
316{
317 if(!CORBA::is_nil(_req) && _req->poll_response()) // response has arrived
318 {
319 CORBA::Environment_ptr env=_req->env(); // No need to free environment.
320 if(!CORBA::is_nil(env) && env->exception())
321 {
322 // Shut down the connection
323 CORBA::Exception* ex =env->exception(); // No need to free exception.
324 DB(10,"ProxyPushSupplier got exception" IF_OMNIORB4(": "<<ex->_name()) );
325 Orb::inst().reportObjectFailure(HERE,_target.in(),ex);
 // NOTE(review): 'ex' is only used above, before _req is released here;
 // presumably the exception is owned by the request's environment - confirm
 // before reordering these statements.
326 _req=CORBA::Request::_nil();
327
328 // Try to notify the Consumer that the connection is closing.
 // _target must still be set here: it is only cleared after the
 // disconnect request has been created from it and handed to the Orb.
329 CORBA::Request_var req=_target->_request("disconnect_push_consumer");
330 req->send_deferred();
331 Orb::inst().deferredRequest(req._retn());
332
333 _target=CosEventComm::PushConsumer::_nil(); // disconnected.
334 eraseKey("ConsumerAdmin/ProxyPushSupplier");
335 deactivateObject();
336 return; // No more work to do
337 }
 // The response arrived without an exception: drop the finished request so
 // that the next event can be sent below.
338 _req=CORBA::Request::_nil();
339 busy=true;
340 }
 // No request is outstanding and a consumer is connected: if moreEvents()
 // reports a pending event, send nextEvent() as a deferred "push" request.
341 if(CORBA::is_nil(_req) && !CORBA::is_nil(_target) && moreEvents())
342 {
343 _req=_target->_request("push");
344 _req->add_in_arg() <<= *(nextEvent());
345 _req->send_deferred();
346 busy=true;
347 }
348 if(!CORBA::is_nil(_req)) // More work to do, if _req NOT nil.
349 waiting=true;
350}
351
352
353void ProxyPushSupplier_i::callback(CORBA::Request_ptr req)
354{
356 {
357 // There should only ever be one of these callbacks per proxy,
358 // because each proxy should only be connected once.
359 DB(2,"WARNING: Multiple connections to ProxyPushSupplier.");
360 }
361 else if(req->return_value()>>=CORBA::Any::to_boolean(_targetIsProxy))
362 {
364 {
365 WriteLock log;
366 output(log.os);
367 DB(15,"ProxyPushSupplier is federated.");
368 }
369 }
370 else
371 {
372 DB(2,"ProxyPushSupplier got unexpected callback.");
373 _targetIsProxy=false; // Reset it just to be sure.
374 }
375}
376
377
379 const string& oid,
380 const PersistNode& node
381)
382{
383 try
384 {
385 using namespace CosEventChannelAdmin;
386
387 string ior( node.attrString("IOR").c_str() );
388 CosEventComm::PushConsumer_var pushConsumer =
390 // Do not activate until we know that we have read a valid target.
391 activateObjectWithId(oid.c_str());
392 _remove_ref();
393 _target=pushConsumer._retn();
394 _targetIsProxy=bool(node.attrLong("proxy"));
395
396 // If pushConsumer is a proxy, then try to reconnect.
398 {
399 DB(15,"Attempting to reconnect ProxyPushSupplier: "<<oid.c_str())
400 // This will only work if the proxy is implemented in the same way as
401 // omniEvents, so connect_() automatically creates a proxy.
404 CosEventComm::PushSupplier_var thisSupp =_this();
405 proxyCons->connect_push_supplier(thisSupp);
406 DB(7,"Reconnected ProxyPushSupplier: "<<oid.c_str())
407 }
408 }
409 catch(CosEventChannelAdmin::AlreadyConnected&){ // connect_push_supplier()
410 // The supplier doesn't need to be reconnected.
411 DB(7,"Remote ProxyPushConsumer already connected: "<<oid.c_str())
412 }
413 catch(CosEventChannelAdmin::TypeError&){ // connect_push_supplier()
414 // Don't know what to make of this...
415 DB(2,"Remote ProxyPushConsumer threw TypeError: "<<oid.c_str())
416 }
417 catch(CORBA::OBJECT_NOT_EXIST&) {} // object 'pushConsumer' not responding.
418 catch(CORBA::TRANSIENT& ) {} // object 'pushConsumer' not responding.
419 catch(CORBA::COMM_FAILURE& ) {} // object 'pushConsumer' not responding.
420}
421
422
424{
426 os,"ConsumerAdmin/ProxyPushSupplier",
427 _target.in(),
428 _targetIsProxy? " proxy=1": NULL
429 );
430}
431
432
433}; // end namespace OmniEvents
#define HERE
Generates a string literal that describes the filename and line number.
#define OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(C)
Defines debug versions of _add/remove_ref() for class C.
Definition Servant.h:70
#define IFELSE_OMNIORB4(omniORB4_code, default_code)
Definition Orb.h:45
#define DB(l, x)
Definition Orb.h:49
#define IF_OMNIORB4(omniORB4_code)
Definition Orb.h:46
#define NP_MINORSTRING(systemException)
Definition Orb.h:52
T::_ptr_type createNarrowedReference(PortableServer::POA_ptr poa, const char *repositoryId)
Helper method that creates a new CORBA object and then narrows it to the appropriate type.
Definition Servant.h:96
The EventQueue is a circular buffer, that contains _size-1 events.
Definition EventQueue.h:57
static bool exists()
Library code may create Event Service objects without the need for persistency.
Obtains an output stream to the active persistency logfile, and locks it for exclusive access.
void deferredRequest(CORBA::Request_ptr req, Callback *callback=NULL)
Adopts the request and then stores it in _deferredRequests.
Definition Orb.cc:187
void reportObjectFailure(const char *here, CORBA::Object_ptr obj, CORBA::Exception *ex)
Called by omniEvents when an object has failed (fatal exception).
Definition Orb.cc:204
static Orb & inst()
Definition Orb.h:81
Base class for ServantActivator classes that manage Proxy servants.
void activate(const char *name)
Creates the Proxy-type's POA, and registers this object as its ServantManager.
set< Proxy * > _servants
The set of all active Proxies in this object's _managedPoa.
PortableServer::POA_var _managedPoa
The POA owned & managed by this object.
Base class for three of the four Proxy servants.
void basicOutput(ostream &os, const char *name, CORBA::Object_ptr target=CORBA::Object::_nil(), const char *extraAttributes=NULL)
Helper method for constructing persistency output.
CORBA::Request_var _req
void eraseKey(const char *name)
Helper method for constructing persistency output.
The opposite of omni_mutex_lock, unlocks the mutex upon construction and re-locks it upon destruction...
omni_mutex_kcol & operator=(const omni_mutex_kcol &)
omni_mutex_kcol(const omni_mutex_kcol &)
ProxyPushSupplierManager(PortableServer::POA_ptr parentPoa, EventQueue &q)
void disconnect()
Send disconnect_push_consumer() to all connected PushConsumers.
CosEventChannelAdmin::ProxyPushSupplier_ptr createObject()
void etherealize(const PortableServer::ObjectId &oid, PortableServer::POA_ptr adapter, PortableServer::Servant serv, CORBA::Boolean cleanup_in_progress, CORBA::Boolean remaining_activations)
Pauses the thread, and then calls the parent's implementation.
PortableServer::Servant incarnate(const PortableServer::ObjectId &oid, PortableServer::POA_ptr poa)
void _remove_ref()
Shutdown the thread when refCount reaches zero.
Helper class that locks ProxyPushSupplier upon construction, and wakes it up on destruction.
CosEventComm::PushConsumer_var _target
void connect_push_consumer(CosEventComm::PushConsumer_ptr pushConsumer)
ProxyPushSupplier_i(PortableServer::POA_ptr poa, EventQueue &q)
OMNIEVENTS__DEBUG_REF_COUNTS__DECL void trigger(bool &busy, bool &waiting)
Sets 'busy' if some work was done.
void reincarnate(const string &oid, const PersistNode &node)
Re-create a servant from information saved in the log file.
bool _targetIsProxy
TRUE if _target is a ProxyPushConsumer.
void output(ostream &os)
Save this object's state to a stream.
void callback(CORBA::Request_ptr req)
Sets _targetIsProxy, if it is.
void activateObjectWithId(const char *oidStr)
Calls activate_object_with_id() to activate this servant in its POA.
Definition Servant.cc:125
void deactivateObject()
Calls deactivate_object() to deactivate this servant in its POA.
Definition Servant.cc:160