OmniEvents
ProxyPushConsumer.cc
Go to the documentation of this file.
1// Package : omniEvents
2// ProxyPushConsumer.cc Created : 2003/12/04
3// Author : Alex Tingle
4//
5// Copyright (C) 2003,2005 Alex Tingle.
6//
7// This file is part of the omniEvents application.
8//
9// omniEvents is free software; you can redistribute it and/or
10// modify it under the terms of the GNU Lesser General Public
11// License as published by the Free Software Foundation; either
12// version 2.1 of the License, or (at your option) any later version.
13//
14// omniEvents is distributed in the hope that it will be useful,
15// but WITHOUT ANY WARRANTY; without even the implied warranty of
16// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17// Lesser General Public License for more details.
18//
19// You should have received a copy of the GNU Lesser General Public
20// License along with this library; if not, write to the Free Software
21// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22//
23
24#include "ProxyPushConsumer.h"
25#include "ConsumerAdmin.h"
26#include "Orb.h"
27#include "omniEventsLog.h"
28#include "PersistNode.h"
29
30#include <assert.h>
31
32namespace OmniEvents {
33
// connect_push_supplier(): records the calling supplier as a connection, keyed
// by the object id of the invocation currently being served.
//  - A nil pushSupplier is accepted and nothing is recorded for it.
//  - Throws CosEventChannelAdmin::AlreadyConnected when that object id is
//    already present in _connections.
// NOTE(review): this listing skips its own source lines 34, 47 and 63: the
// opening of the signature, the declaration that receives the new Connection
// (used below as 'newConnection'), and the statement guarding the final log
// block. Recover them from the original file before compiling.
35 CosEventComm::PushSupplier_ptr pushSupplier)
36{
37 // pushSupplier is permitted to be nil.
38 if(CORBA::is_nil(pushSupplier))
39 return;
40
// The connection table is keyed by the stringified id of the target object.
41 string oidstr =currentObjectId();
42 Connections_t::iterator pos =_connections.find(oidstr);
43
44 if(pos!=_connections.end())
45 throw CosEventChannelAdmin::AlreadyConnected();
46
// The Connection is handed a duplicated reference, separate from the caller's.
48 new Connection(
49 _channelName.in(),
50 oidstr,
51 CosEventComm::PushSupplier::_duplicate(pushSupplier)
52 );
53 _connections.insert( Connections_t::value_type(oidstr,newConnection) );
54
55 // Test to see whether pushSupplier is a ProxyPushSupplier.
56 // If so, then we will aggressively try to reconnect, when we are reincarnated
// The "_is_a" query is sent as a deferred request; the request is handed to
// Orb together with newConnection as its callback.
57 CORBA::Request_var req =pushSupplier->_request("_is_a");
58 req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushSupplier->id();
59 req->set_return_type(CORBA::_tc_boolean);
60 req->send_deferred();
61 Orb::inst().deferredRequest(req._retn(),newConnection); // Register callback
62
// Write the new connection's record to the log stream held by WriteLock.
64 {
65 WriteLock log;
66 newConnection->output(log.os);
67 }
68}
69
70
// disconnect_push_consumer(): if the current object id has a stored
// connection, drops it and sends its supplier a deferred
// disconnect_push_supplier() request. Unknown ids are ignored.
// When HAVE_OMNIORB4 is not defined the call only emits a debug message.
// NOTE(review): source line 71 (the signature) and line 88 (the statement
// guarding the log block) are missing from this listing.
72{
73#ifdef HAVE_OMNIORB4
74 DB(5,"ProxyPushConsumer_i::disconnect_push_consumer()")
75 string oidstr =currentObjectId();
76 Connections_t::iterator pos =_connections.find(oidstr);
77
78 if(pos!=_connections.end())
79 {
// Build the request first, while the Connection and its _target are still
// referenced by the table entry.
80 CORBA::Request_var req =
81 pos->second->_target->_request("disconnect_push_supplier");
82 pos->second->_remove_ref();
83 _connections.erase(pos);
84 // The following line could result in a reentrant callback, if this call was
85 // not made through the POA => must erase the connection BEFORE this point.
86 req->send_deferred();
// No callback is passed here: only the request itself is handed over.
87 Orb::inst().deferredRequest(req._retn());
89 {
90 // Erase this connection from the log file.
// The record is the connection's log key prefixed with '-'.
91 WriteLock log;
92 log.os<<"-ecf/"<<_channelName.in();
93 log.os<<"/SupplierAdmin/ProxyPushConsumer/"<<oidstr<<'\n';
94 }
95 }
96#else /* Silently ignore disconnects with omniORB3 */
97 DB(5,"Ignoring disconnect_push_consumer(). Upgrade to omniORB4!")
98#endif
99}
100
101
// Accepts one event from a supplier. A heap-allocated copy of the event is
// either passed to _consumerAdmin.send() or appended to _queue; the caller's
// Any is not retained.
102void ProxyPushConsumer_i::push(const CORBA::Any& event)
103{
// Only when built with OMNIEVENTS_REAL_TIME_PUSH: an event that arrives while
// _useLocalQueue is false is sent straight to the ConsumerAdmin, and the flag
// is then set so that following events take the queue branch (whatever clears
// the flag again is not visible in this function).
104#ifdef OMNIEVENTS_REAL_TIME_PUSH
105 if(!_useLocalQueue)
106 {
107 _consumerAdmin.send(new CORBA::Any(event));
108 _useLocalQueue=true;
109 }
110 else
111#endif
// Without OMNIEVENTS_REAL_TIME_PUSH this statement is unconditional; with it,
// it is the body of the 'else' above.
112 _queue.push_back(new CORBA::Any(event));
113}
114
115
// Constructor: creates a child POA of 'p' named "<channel>.ProxyPushConsumer"
// and installs this object as that POA's default servant.
//   p             - parent POA; its name is stored as _channelName.
//   q             - event queue stored in _queue (its parameter line is
//                   missing from this listing).
//   consumerAdmin - admin object used by push(); a reference is added here.
// NOTE(review): source lines 116 and 118 (the opening of the signature and
// the declaration of parameter 'q') are missing from this listing.
117 PortableServer::POA_ptr p,
119 ConsumerAdmin_i& consumerAdmin
120)
// The Servant base starts with a nil POA; _poa is assigned in the try block.
121: Servant(PortableServer::POA::_nil()),
122 _connections(),
123 _channelName(p->the_name()),
124 _consumerAdmin(consumerAdmin),
125 _queue(q),
126 _useLocalQueue(false)
127{
128 _consumerAdmin._add_ref();
129
130 using namespace PortableServer;
131
132 // POLICIES:
133 // Lifespan =PERSISTENT // we can persist
134 // Assignment =USER_ID // write our own oid
135 // Uniqueness =MULTIPLE_ID // only one servant
136 // ImplicitActivation=NO_IMPLICIT_ACTIVATION // disable auto activation
137 // RequestProcessing =USE_DEFAULT_SERVANT // only one servant
138 // ServantRetention =NON_RETAIN // stateless POA
139 // Thread =SINGLE_THREAD_MODEL // keep it simple
140
141 CORBA::PolicyList policies;
142 policies.length(7);
143 policies[0]=p->create_lifespan_policy(PERSISTENT);
144 policies[1]=p->create_id_assignment_policy(USER_ID);
145 policies[2]=p->create_id_uniqueness_policy(MULTIPLE_ID);
146 policies[3]=p->create_implicit_activation_policy(NO_IMPLICIT_ACTIVATION);
147 policies[4]=p->create_request_processing_policy(USE_DEFAULT_SERVANT);
148 policies[5]=p->create_servant_retention_policy(NON_RETAIN);
149 policies[6]=p->create_thread_policy(SINGLE_THREAD_MODEL);
150
151 try
152 {
153 // Create a POA for this proxy type in this channel.
// The new POA shares the parent's POAManager.
154 string poaName =string(_channelName.in())+".ProxyPushConsumer";
155 POAManager_var parentManager =p->the_POAManager();
156 _poa=p->create_POA(poaName.c_str(),parentManager.in(),policies);
157 }
// Both handlers only emit a debug message; construction continues.
158 catch(POA::AdapterAlreadyExists&) // create_POA
159 {
160 DB(0,"ProxyPushConsumer_i::ProxyPushConsumer_i() - "
161 "POA::AdapterAlreadyExists")
162 }
163 catch(POA::InvalidPolicy& ex) // create_POA
164 {
165 DB(0,"ProxyPushConsumer_i::ProxyPushConsumer_i() - "
166 "POA::InvalidPolicy: "<<ex.index)
167 }
168
169 // Destroy the policy objects (Not strictly necessary in omniORB)
170 for(CORBA::ULong i=0; i<policies.length(); ++i)
171 policies[i]->destroy();
172
173 // This object is the POA's default servant.
// NOTE(review): if create_POA() threw above, _poa was never assigned in this
// constructor, yet it is dereferenced here -- confirm what a nil _poa does.
174 _poa->set_servant(this);
175}
176
177
// Destructor body: releases this object's reference on every stored
// Connection, empties the table, then releases the reference on _consumerAdmin.
// NOTE(review): source line 178 (the signature) is missing from this listing.
179{
180 DB(20,"~ProxyPushConsumer_i()")
181 for(Connections_t::iterator i =_connections.begin();
182 i!=_connections.end();
183 ++i)
184 {
185 i->second->_remove_ref();
186 }
187 _connections.clear();
188
189 _consumerAdmin._remove_ref();
190}
191
192
// createObject(): returns a new ProxyPushConsumer object reference; the
// visible arguments are this servant's POA (_poa) and the ProxyPushConsumer
// repository id.
// NOTE(review): source lines 194 (function name) and 196 (the start of the
// return statement) are missing from this listing; presumably the call is to
// the createNarrowedReference() helper -- confirm against the original file.
193CosEventChannelAdmin::ProxyPushConsumer_ptr
195{
197 _poa.in(),
198 CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
199 );
200}
201
202
// disconnect(): sends a deferred disconnect_push_supplier() request to the
// supplier of every stored connection, removing each connection from the
// table before its request is sent.
// NOTE(review): source line 203 (the signature) is missing from this listing.
204{
205 // Note. We are (probably) in the EventChannel's thread.
// 'next' is advanced before 'curr' is erased, so the loop never steps from an
// erased element.
206 Connections_t::iterator curr,next=_connections.begin();
207 while(next!=_connections.end())
208 {
209 curr=next++;
// Build the request while the Connection is still referenced by the table.
210 CORBA::Request_var req =
211 curr->second->_target->_request("disconnect_push_supplier");
212 curr->second->_remove_ref();
213 _connections.erase(curr);
214 // The following line could result in a reentrant callback
215 // => must erase the connection BEFORE this point.
216 req->send_deferred();
217 Orb::inst().deferredRequest(req._retn());
218 }
219}
220
221
// reincarnate(): rebuilds _connections from the children of 'node'. Each
// child's key is used as the object id and its "IOR" and "proxy" attributes
// describe the supplier. For children flagged as proxies, an attempt is made
// to reconnect this consumer to the remote proxy supplier.
// NOTE(review): source lines 222 (signature), 232, 238 and 251-252 are
// missing from this listing; the declarations of 'supp' and 'proxySupp' are
// among them, so how the IOR is converted/narrowed cannot be seen here.
223{
224 // Reincarnate all connections from node's children.
225 for(map<string,PersistNode*>::const_iterator i=node._child.begin();
226 i!=node._child.end();
227 ++i)
228 {
229 const char* oidstr =i->first.c_str();
230 string ior( i->second->attrString("IOR") );
231 bool isProxy( i->second->attrLong("proxy") );
// Every failure below is caught per child, so one bad record does not stop
// the remaining children from being processed.
233 try
234 {
235 using namespace CosEventComm;
236 using namespace CosEventChannelAdmin;
237
// The new Connection receives the reference released from 'supp'.
239 _connections.insert(Connections_t::value_type(
240 oidstr,
241 new Connection(_channelName.in(),oidstr,supp._retn(),isProxy)
242 ));
243 DB(5,"Reincarnated ProxyPushConsumer: "<<oidstr)
244
245 // If supp is a ProxyPushSupplier, then try to reconnect.
246 if(isProxy)
247 {
248 DB(15,"Attempting to reconnect ProxyPushConsumer: "<<oidstr)
249 // This will only work if the proxy is implemented in the same way as
250 // omniEvents, so connect_() automatically creates a proxy.
// Re-create this consumer's own object reference from the saved object id,
// then pass it to the remote proxy.
253 PortableServer::ObjectId_var objectId =
254 PortableServer::string_to_ObjectId(oidstr);
255 CORBA::Object_var obj =
256 _poa->create_reference_with_id(
257 objectId.in(),
258 CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
259 );
260 PushConsumer_var thisCons =CosEventComm::PushConsumer::_narrow(obj);
261 proxySupp->connect_push_consumer(thisCons.in());
262 DB(7,"Reconnected ProxyPushConsumer: "<<oidstr)
263 }
264 }
265 catch(CORBA::BAD_PARAM&) {
266 // This will happen when IOR fails to narrow.
267 DB(5,"Failed to reincarnate ProxyPushConsumer: "<<oidstr)
268 }
269 catch(CosEventChannelAdmin::AlreadyConnected&){ //connect_push_consumer()
270 // The supplier doesn't need to be reconnected.
271 DB(7,"Remote ProxyPushSupplier already connected: "<<oidstr)
272 }
273 catch(CosEventChannelAdmin::TypeError&){ // connect_push_consumer()
274 // Don't know what to make of this...
275 DB(2,"Remote ProxyPushSupplier threw TypeError: "<<oidstr)
276 }
// The three handlers below deliberately ignore an unreachable supplier.
277 catch(CORBA::OBJECT_NOT_EXIST&) {} // object 'supp' not responding.
278 catch(CORBA::TRANSIENT& ) {} // object 'supp' not responding.
279 catch(CORBA::COMM_FAILURE& ) {} // object 'supp' not responding.
280 } // end loop for(i)
281}
282
283
// output(): writes the record of every stored connection to 'os', in the
// iteration order of _connections, by delegating to each Connection's output().
// NOTE(review): source line 284 (the signature) is missing from this listing.
285{
286 for(Connections_t::const_iterator i=_connections.begin();
287 i!=_connections.end();
288 ++i)
289 {
290 i->second->output(os);
291 }
292}
293
294
// Returns the object id of the invocation currently being served, as a string.
// With HAVE_OMNIORB4: returns the literal "ERROR" when there is no invocation
// context (a debug message is emitted first).
// Without HAVE_OMNIORB4: always throws CORBA::NO_IMPLEMENT.
// NOTE(review): source line 295 (the signature) is missing from this listing;
// presumably this is currentObjectId(), which other methods in this file call
// to obtain their 'oidstr' -- confirm.
296{
297#ifdef HAVE_OMNIORB4
298 try
299 {
300 using namespace PortableServer;
301 ObjectId_var oid =Orb::inst()._POACurrent->get_object_id();
302 CORBA::String_var oidStr =ObjectId_to_string(oid.in());
303 return string(oidStr.in());
304 }
305 catch(PortableServer::Current::NoContext&) // get_object_id()
306 {
307 DB(0,"No context!!")
308 }
309 catch(CORBA::BAD_PARAM&) // ObjectId_to_string()
310 {
311 // Should never get here in omniORB, because ObjectID is a char*.
312 assert(0);
313 }
// Reached after either handler above (for BAD_PARAM, only when the assert
// is compiled out).
314 return "ERROR";
315#else
316 throw CORBA::NO_IMPLEMENT();
317#endif
318}
319
320
321//
322// ProxyPushConsumer_i::Connection
323//
324
// Debug-only counter of Connection objects; it is incremented in the
// Connection constructor and decremented in its destructor.
325#if OMNIEVENTS__DEBUG_SERVANT
326int ProxyPushConsumer_i::Connection::_objectCount =0;
327#endif
328
// Connection constructor: stores the channel name, the object id string, the
// supplier reference and the "target is a proxy" flag.
//   pushSupplier - stored in _target as passed; no _duplicate() is made here.
// NOTE(review): source line 329 (the opening of the signature) is missing
// from this listing.
330 const char* channelName,
331 const string& oidstr,
332 CosEventComm::PushSupplier_ptr pushSupplier,
333 bool isProxy
334):Callback(),
335 _channelName(channelName),
336 _oidstr(oidstr),
337 _target(pushSupplier),
338 _targetIsProxy(isProxy)
339{
// Instance counting is compiled in only for debug-servant builds.
340#if OMNIEVENTS__DEBUG_SERVANT
341 ++_objectCount;
342 DB(21,"ProxyPushConsumer_i::Connection::Connection() count="<<_objectCount)
343#endif
344}
345
// Connection destructor body: emits a debug message and, in debug-servant
// builds, decrements the instance counter first.
// NOTE(review): source line 346 (the signature) is missing from this listing.
347{
348#if OMNIEVENTS__DEBUG_SERVANT
349 --_objectCount;
350 DB(20,"ProxyPushConsumer_i::Connection::~Connection() count="<<_objectCount)
351#else
352 DB(20,"ProxyPushConsumer_i::Connection::~Connection()")
353#endif
354}
355
357
// Handles the reply to a deferred request: reads the boolean return value of
// 'req' into _targetIsProxy. When the value is true and a log exists, this
// connection's record is written to the log again.
// NOTE(review): source lines 356 and 358 (including the signature) are missing
// from this listing; presumably this is the Callback hook that receives the
// "_is_a" reply for the supplier -- confirm the name and parameter type.
359{
// Keep the previous flag so it can be restored if extraction fails.
360 bool save =_targetIsProxy;
361 if(req->return_value()>>=CORBA::Any::to_boolean(_targetIsProxy))
362 {
363 if(_targetIsProxy && omniEventsLog::exists())
364 {
365 WriteLock log;
366 output(log.os);
367 DB(15,"ProxyPushConsumer is federated.");
368 }
369 }
370 else
371 {
// The return value was not a boolean.
372 DB(2,"ProxyPushConsumer got unexpected callback.");
373 _targetIsProxy=save; // Reset it just to be sure.
374 }
375}
376
// Writes this connection's record to 'os' as one line:
//   ecf/<channel>/SupplierAdmin/ProxyPushConsumer/<oid>[ IOR=<ior>[ proxy=1]] ;;
// The IOR and proxy attributes are written only when _target is non-nil.
// NOTE(review): source line 377 (the signature) is missing from this listing.
378{
379 os<<"ecf/"<<_channelName;
380 os<<"/SupplierAdmin/ProxyPushConsumer/"<<_oidstr;
381
382 if(!CORBA::is_nil(_target.in()))
383 {
// The supplier reference is stringified by the ORB for the IOR attribute.
384 CORBA::String_var iorstr;
385 iorstr = Orb::inst()._orb->object_to_string(_target.in());
386 os<<" IOR="<<iorstr.in();
387 if(_targetIsProxy)
388 os<<" proxy=1";
389 }
390 os<<" ;;\n";
391}
392
393
394}; // end namespace OmniEvents
#define OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(C)
Defines debug versions of _add/remove_ref() for class C.
Definition Servant.h:70
#define DB(l, x)
Definition Orb.h:49
T::_ptr_type createNarrowedReference(PortableServer::POA_ptr poa, const char *repositoryId)
Helper method that creates a new CORBA object and then narrows it to the appropriate type.
Definition Servant.h:96
Interface for classes that wish to receive callbacks from deferred requests.
Definition Callback.h:46
OMNIEVENTS__DEBUG_REF_COUNTS__DECL void send(CORBA::Any *event)
Queues a single event for sending to consumers.
static bool exists()
Library code may create Event Service objects without the need for persistency.
Obtains an output stream to the active persistency logfile, and locks it for exclusive access.
CORBA::ORB_var _orb
Definition Orb.h:88
void deferredRequest(CORBA::Request_ptr req, Callback *callback=NULL)
Adopts the request and then stores it in _deferredRequests.
Definition Orb.cc:187
static Orb & inst()
Definition Orb.h:81
Default servant for ProxyPushConsumer objects.
ProxyPushConsumer_i(PortableServer::POA_ptr parentPoa, list< CORBA::Any * > &q, ConsumerAdmin_i &consumerAdmin)
void disconnect_push_consumer()
We may not have a record of the supplier, so this method must accept calls from any supplier without ...
void push(const CORBA::Any &event)
Accepts events from any supplier, not just those stored in _connections.
void output(ostream &os) const
Save this object's state to a stream.
CosEventChannelAdmin::ProxyPushConsumer_ptr createObject()
Constructs a new object.
bool _useLocalQueue
Switch between RT/chunked modes.
void reincarnate(const PersistNode &node)
Re-create all servants from information saved in the log file.
void disconnect()
Send disconnect_push_supplier() to all connected PushSuppliers.
void connect_push_supplier(CosEventComm::PushSupplier_ptr pushSupplier)
If pushSupplier is provided, then it is stored in _connections.
void output(ostream &os) const
Save this object's state to a stream.
Base class for servants.
Definition Servant.h:114
PortableServer::POA_var _poa
Definition Servant.h:131