45 throw CosEventChannelAdmin::AlreadyConnected();
58 req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushSupplier->id();
59 req->set_return_type(CORBA::_tc_boolean);
74 DB(5,
"ProxyPushConsumer_i::disconnect_push_consumer()")
80 CORBA::Request_var
req =
81 pos->second->_target->_request(
"disconnect_push_supplier");
82 pos->second->_remove_ref();
93 log.
os<<
"/SupplierAdmin/ProxyPushConsumer/"<<
oidstr<<
'\n';
97 DB(5,
"Ignoring disconnect_push_consumer(). Upgrade to omniORB4!")
104#ifdef OMNIEVENTS_REAL_TIME_PUSH
117 PortableServer::POA_ptr
p,
124 _consumerAdmin(consumerAdmin),
126 _useLocalQueue(
false)
130 using namespace PortableServer;
158 catch(POA::AdapterAlreadyExists&)
160 DB(0,
"ProxyPushConsumer_i::ProxyPushConsumer_i() - "
161 "POA::AdapterAlreadyExists")
163 catch(POA::InvalidPolicy&
ex)
165 DB(0,
"ProxyPushConsumer_i::ProxyPushConsumer_i() - "
166 "POA::InvalidPolicy: "<<
ex.index)
170 for(CORBA::ULong i=0; i<
policies.length(); ++i)
174 _poa->set_servant(
this);
180 DB(20,
"~ProxyPushConsumer_i()")
185 i->second->_remove_ref();
193CosEventChannelAdmin::ProxyPushConsumer_ptr
198 CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
210 CORBA::Request_var
req =
211 curr->second->_target->_request(
"disconnect_push_supplier");
212 curr->second->_remove_ref();
216 req->send_deferred();
225 for(map<string,PersistNode*>::const_iterator i=
node._child.begin();
226 i!=
node._child.end();
229 const char*
oidstr =i->first.c_str();
230 string ior( i->second->attrString(
"IOR") );
231 bool isProxy( i->second->attrLong(
"proxy") );
243 DB(5,
"Reincarnated ProxyPushConsumer: "<<
oidstr)
248 DB(15,
"Attempting to reconnect ProxyPushConsumer: "<<
oidstr)
253 PortableServer::ObjectId_var
objectId =
254 PortableServer::string_to_ObjectId(
oidstr);
255 CORBA::Object_var
obj =
256 _poa->create_reference_with_id(
258 CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
262 DB(7,
"Reconnected ProxyPushConsumer: "<<
oidstr)
265 catch(CORBA::BAD_PARAM&) {
267 DB(5,
"Failed to reincarnate ProxyPushConsumer: "<<
oidstr)
269 catch(CosEventChannelAdmin::AlreadyConnected&){
271 DB(7,
"Remote ProxyPushSupplier already connected: "<<
oidstr)
273 catch(CosEventChannelAdmin::TypeError&){
275 DB(2,
"Remote ProxyPushSupplier threw TypeError: "<<
oidstr)
277 catch(CORBA::OBJECT_NOT_EXIST&) {}
278 catch(CORBA::TRANSIENT& ) {}
279 catch(CORBA::COMM_FAILURE& ) {}
286 for(Connections_t::const_iterator i=
_connections.begin();
290 i->second->output(os);
300 using namespace PortableServer;
305 catch(PortableServer::Current::NoContext&)
309 catch(CORBA::BAD_PARAM&)
316 throw CORBA::NO_IMPLEMENT();
325#if OMNIEVENTS__DEBUG_SERVANT
326int ProxyPushConsumer_i::Connection::_objectCount =0;
340#if OMNIEVENTS__DEBUG_SERVANT
342 DB(21,
"ProxyPushConsumer_i::Connection::Connection() count="<<
_objectCount)
348#if OMNIEVENTS__DEBUG_SERVANT
350 DB(20,
"ProxyPushConsumer_i::Connection::~Connection() count="<<
_objectCount)
352 DB(20,
"ProxyPushConsumer_i::Connection::~Connection()")
360 bool save =_targetIsProxy;
361 if(
req->return_value()>>=CORBA::Any::to_boolean(_targetIsProxy))
367 DB(15,
"ProxyPushConsumer is federated.");
372 DB(2,
"ProxyPushConsumer got unexpected callback.");
380 os<<
"/SupplierAdmin/ProxyPushConsumer/"<<_oidstr;
382 if(!CORBA::is_nil(_target.in()))
#define OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(C)
Defines debug versions of _add_ref() and _remove_ref() for class C.
T::_ptr_type createNarrowedReference(PortableServer::POA_ptr poa, const char *repositoryId)
Helper method that creates a new CORBA object and then narrows it to the appropriate type.
Interface for classes that wish to receive callbacks from deferred requests.
OMNIEVENTS__DEBUG_REF_COUNTS__DECL void send(CORBA::Any *event)
Queues a single event for sending to consumers.
static bool exists()
Library code may create Event Service objects without the need for persistency.
Obtains an output stream to the active persistency logfile, and locks it for exclusive access.
void deferredRequest(CORBA::Request_ptr req, Callback *callback=NULL)
Adopts the request and then stores it in _deferredRequests.
Default servant for ProxyPushConsumer objects.
ProxyPushConsumer_i(PortableServer::POA_ptr parentPoa, list< CORBA::Any * > &q, ConsumerAdmin_i &consumerAdmin)
void disconnect_push_consumer()
We may not have a record of the supplier, so this method must accept calls from any supplier without ...
list< CORBA::Any * > & _queue
ConsumerAdmin_i & _consumerAdmin
CORBA::String_var _channelName
Connections_t _connections
virtual ~ProxyPushConsumer_i()
void push(const CORBA::Any &event)
Accepts events from any supplier, not just those stored in _connections.
string currentObjectId() const
void output(ostream &os) const
Save this object's state to a stream.
CosEventChannelAdmin::ProxyPushConsumer_ptr createObject()
Constructs a new object.
bool _useLocalQueue
Switch between real-time (RT) and chunked modes.
void reincarnate(const PersistNode &node)
Re-create all servants from information saved in the log file.
void disconnect()
Send disconnect_push_supplier() to all connected PushSuppliers.
void connect_push_supplier(CosEventComm::PushSupplier_ptr pushSupplier)
If pushSupplier is provided, then it is stored in _connections.
Connection()
NO IMPLEMENTATION.
void output(ostream &os) const
Save this object's state to a stream.
PortableServer::POA_var _poa