OmniEvents
ProxyPullConsumer.cc
Go to the documentation of this file.
1// Package : omniEvents
2// ProxyPullConsumer.cc Created : 2003/12/04
3// Author : Alex Tingle
4//
5// Copyright (C) 2003-2005 Alex Tingle.
6//
7// This file is part of the omniEvents application.
8//
9// omniEvents is free software; you can redistribute it and/or
10// modify it under the terms of the GNU Lesser General Public
11// License as published by the Free Software Foundation; either
12// version 2.1 of the License, or (at your option) any later version.
13//
14// omniEvents is distributed in the hope that it will be useful,
15// but WITHOUT ANY WARRANTY; without even the implied warranty of
16// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17// Lesser General Public License for more details.
18//
19// You should have received a copy of the GNU Lesser General Public
20// License along with this library; if not, write to the Free Software
21// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22//
23
24#include "ProxyPullConsumer.h"
25#include "Orb.h"
26#include "omniEventsLog.h"
27#include "PersistNode.h"
28#include <assert.h>
29
30namespace OmniEvents {
31
32//
33// ProxyPullConsumerManager
34//
35
36PortableServer::Servant
38 const PortableServer::ObjectId& oid,
39 PortableServer::POA_ptr poa
40)
41{
42 DB(20,"ProxyPullConsumerManager::incarnate()")
44 _servants.insert(result);
45 return result;
46}
47
49 PortableServer::POA_ptr parentPoa,
50 list<CORBA::Any*>& q
51)
52: ProxyManager(parentPoa),
53 _queue(q)
54{
55 ProxyManager::activate("ProxyPullConsumer");
56}
57
59{
60 DB(20,"~ProxyPullConsumerManager()")
61}
62
64
65CosEventChannelAdmin::ProxyPullConsumer_ptr
67{
69 _managedPoa.in(),
70 CosEventChannelAdmin::_tc_ProxyPullConsumer->id()
71 );
72}
73
75{
76 // Collect events from each servant in turn.
77 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
78 {
80 proxy->collect();
81 }
82}
83
85{
86 // Trigger each servant in turn.
87 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
88 {
91 }
92}
93
95{
96 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
97 {
98 Proxy* p =*i; // Sun's CC requires this temporary.
100 // We are in the EventChannel's thread.
101 // Make sure all calls go though the ProxyPullConsumer POA.
102 CosEventChannelAdmin::ProxyPullConsumer_var ppcv =ppc->_this();
104 }
105}
106
107
108//
109// ProxyPullConsumer_i
110//
111
112// CORBA interface methods
113
115 CosEventComm::PullSupplier_ptr pullSupplier
116)
117{
118 if(CORBA::is_nil(pullSupplier))
119 throw CORBA::BAD_PARAM();
120 if(!CORBA::is_nil(_target) || !CORBA::is_nil(_req))
121 throw CosEventChannelAdmin::AlreadyConnected();
122 _target=CosEventComm::PullSupplier::_duplicate(pullSupplier);
123
125 {
126 WriteLock log;
127 output(log.os);
128 }
129}
130
132{
133 DB(5,"ProxyPullConsumer_i::disconnect_pull_consumer()");
134 eraseKey("SupplierAdmin/ProxyPullConsumer");
136 if(CORBA::is_nil(_target))
137 {
138 throw CORBA::OBJECT_NOT_EXIST(
139 IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
140 CORBA::COMPLETED_NO
141 );
142 }
143 else
144 {
145 CORBA::Request_var req=_target->_request("disconnect_pull_supplier");
146 _target=CosEventComm::PullSupplier::_nil();
147 req->send_deferred();
148 Orb::inst().deferredRequest(req._retn());
149 }
150}
151
152//
153
155 PortableServer::POA_ptr poa,
157)
158: Proxy(poa),
159 _target(CosEventComm::PullSupplier::_nil()),
160 _queue(q),
161 _mode(Pull), // Prefer 'pull' method calls.
162 _exceptionCount(0)
163{}
164
166{
167 DB(20,"~ProxyPullConsumer_i()")
168}
169
171{
172 if(!CORBA::is_nil(_req) && _req->poll_response())
173 {
174 const char* opname =_req->operation();
175 assert(opname);
176 CORBA::Environment_ptr env =_req->env(); // No need to release environment.
177
178 if(!CORBA::is_nil(env) && env->exception())
179 {
180 CORBA::Exception* ex =env->exception(); // No need to free exception.
181 DB(10,"ProxyPullConsumer got exception"
182 IF_OMNIORB4(<<": "<<ex->_name())<<", op:"<<opname);
183 if(0==strcmp("pull",opname) || 0==strcmp("try_pull",opname))
184 {
186 _mode=( _mode==Pull? TryPull: Pull ); // Try something else next time.
187 }
188 else
189 DB(2,"Ignoring unrecognised response. operation:"<<opname);
190 if(_exceptionCount>=4)
191 {
193
194 // Try to notify the Supplier that the connection is closing.
195 CORBA::Request_var req=_target->_request("disconnect_pull_supplier");
196 req->send_deferred();
197 Orb::inst().deferredRequest(req._retn());
198
199 _target=CosEventComm::PullSupplier::_nil(); // disconnected
200 eraseKey("SupplierAdmin/ProxyPullConsumer");
202 }
203 }
204 else
205 {
206 // Do we have an event?
207 bool hasEvent=false;
208 if(0==strcmp("pull",opname))
209 {
210 hasEvent=true;
211 }
212 else if(0==strcmp("try_pull",opname))
213 {
214 CORBA::NVList_ptr args=_req->arguments(); // No need to release args.
215 if(args->count()==1)
216 {
217 CORBA::NamedValue_var hasEventArg=args->item(0);
218 if(0==strcmp(hasEventArg->name(),"has_event"))
219 {
220 CORBA::Any* a =hasEventArg->value();
221 CORBA::Boolean b;
222 CORBA::Any::to_boolean tb(b); //MS VC++6 is on drugs!
223 hasEvent=(((*a)>>=tb) && b);
224 }
225 }
226 }
227 // Pick up an event, if we have one.
228 if(hasEvent)
229 {
230 CORBA::Any* event =new CORBA::Any();
231 _req->return_value() >>= (*event);
232 _queue.push_back(event);
233 }
234 // Reset the exception count.
236 }
237 _req=CORBA::Request::_nil();
238 }
239} // ProxyPullConsumer_i::end collect()
240
242{
243 if(CORBA::is_nil(_req) && !CORBA::is_nil(_target))
244 {
245 switch(_mode)
246 {
247 case Pull:
248 _req=_target->_request("pull");
249 break;
250 case TryPull:
251 _req=_target->_request("try_pull");
252 _req->add_out_arg("has_event")<<=CORBA::Any::from_boolean(1);
253 break;
254 default:
255 assert(0);
256 }
257 _req->set_return_type(CORBA::_tc_any);
258 _req->send_deferred();
259 }
260}
261
263 const string& oid,
264 const PersistNode& node
265)
266{
267 CosEventComm::PullSupplier_var pullSupplier =
268 string_to_<CosEventComm::PullSupplier>(node.attrString("IOR").c_str());
269 // Do not activate until we know that we have read a valid target.
270 activateObjectWithId(oid.c_str());
272}
273
275{
276 basicOutput(os,"SupplierAdmin/ProxyPullConsumer",_target.in());
277}
278
279}; // end namespace OmniEvents
#define HERE
Generates a string literal that describes the filename and line number.
#define OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(C)
Defines debug versions of _add/remove_ref() for class C.
Definition Servant.h:70
#define IFELSE_OMNIORB4(omniORB4_code, default_code)
Definition Orb.h:45
#define DB(l, x)
Definition Orb.h:49
#define IF_OMNIORB4(omniORB4_code)
Definition Orb.h:46
T::_ptr_type createNarrowedReference(PortableServer::POA_ptr poa, const char *repositoryId)
Helper method that creates a new CORBA object and then narrows it to the appropriate type.
Definition Servant.h:96
static bool exists()
Library code may create Event Service objects without the need for persistency.
Obtains an output stream to the active persistancy logfile, and locks it for exclusive access.
void deferredRequest(CORBA::Request_ptr req, Callback *callback=NULL)
Adopts the request and then stores it in _deferredRequests.
Definition Orb.cc:187
void reportObjectFailure(const char *here, CORBA::Object_ptr obj, CORBA::Exception *ex)
Called by omniEvents when an object has failed (fatal exception).
Definition Orb.cc:204
static Orb & inst()
Definition Orb.h:81
Base class for ServantActivator classes that manage Proxy servants.
void activate(const char *name)
Creates the Proxy-type's POA, and registers this object as its ServantManager.
set< Proxy * > _servants
The set of all active Proxies in this object's _managedPoa.
PortableServer::POA_var _managedPoa
The POA owned & managed by this object.
Base class for three of the four Proxy servants.
void basicOutput(ostream &os, const char *name, CORBA::Object_ptr target=CORBA::Object::_nil(), const char *extraAttributes=NULL)
Helper method for constructing persistency output.
CORBA::Request_var _req
void eraseKey(const char *name)
Helper method for constructing persistency output.
void disconnect()
Send disconnect_pull_supplier() to all connected PullSuppliers.
void triggerRequest()
For each connected proxy, if there is no request in progress, send a new request to the current opera...
void collect()
Collects events that have arrived at connected proxies.
PortableServer::Servant incarnate(const PortableServer::ObjectId &oid, PortableServer::POA_ptr poa)
OMNIEVENTS__DEBUG_REF_COUNTS__DECL CosEventChannelAdmin::ProxyPullConsumer_ptr createObject()
ProxyPullConsumerManager(PortableServer::POA_ptr parentPoa, list< CORBA::Any * > &q)
Implementation of the ProxyPullConsumer interface.
void output(ostream &os)
Save this object's state to a stream.
CosEventComm::PullSupplier_var _target
int _exceptionCount
Only when two consecutive exceptions have been received from each mode, do we consider the connection...
void connect_pull_supplier(CosEventComm::PullSupplier_ptr pullSupplier)
ProxyPullConsumer_i(PortableServer::POA_ptr poa, list< CORBA::Any * > &q)
void reincarnate(const string &oid, const PersistNode &node)
Re-create a servant from information saved in the log file.
void triggerRequest()
When _req is NIL, sends out a new pull() or try_pull() call.
void collect()
Collects responses since the last trigger.
void activateObjectWithId(const char *oidStr)
Calls activate_object_with_id() to activate this servant in its POA.
Definition Servant.cc:125
void deactivateObject()
Calls deactivate_object() to deactivate this servant in its POA.
Definition Servant.cc:160