OmniEvents
ProxyPullSupplier.cc
Go to the documentation of this file.
1// Package : omniEvents
2// ProxyPullSupplier.cc Created : 2003/12/04
3// Author : Alex Tingle
4//
5// Copyright (C) 2003-2005 Alex Tingle.
6//
7// This file is part of the omniEvents application.
8//
9// omniEvents is free software; you can redistribute it and/or
10// modify it under the terms of the GNU Lesser General Public
11// License as published by the Free Software Foundation; either
12// version 2.1 of the License, or (at your option) any later version.
13//
14// omniEvents is distributed in the hope that it will be useful,
15// but WITHOUT ANY WARRANTY; without even the implied warranty of
16// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17// Lesser General Public License for more details.
18//
19// You should have received a copy of the GNU Lesser General Public
20// License along with this library; if not, write to the Free Software
21// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22//
23
24#include "ProxyPullSupplier.h"
25#include "EventChannel.h"
26#include "Orb.h"
27#include "omniEventsLog.h"
28#include "PersistNode.h"
29#include <assert.h>
30
31namespace OmniEvents {
32
33//
34// ProxyPullSupplierManager
35//
36
38 const PortableServer::ObjectId& oid,
39 PortableServer::POA_ptr poa
40)
41{
42 // Evict the oldest proxy servant, if we have reached the maximum number.
43 if(_servants.size()>=_channel.maxNumProxies())
44 {
46 unsigned long age =0;
47 for(set<Proxy*>::iterator i=_servants.begin(); i!=_servants.end(); ++i)
48 if(!oldest || dynamic_cast<ProxyPullSupplier_i*>(*i)->timestamp()<age)
49 {
50 oldest=dynamic_cast<ProxyPullSupplier_i*>(*i);
52 }
53 DB(5,"Evicting oldest ProxyPullSupplier to make space for a new one")
54 try{ oldest->disconnect_pull_supplier(); }catch(CORBA::OBJECT_NOT_EXIST&){}
55 }
56 // Make a new servant.
58 _servants.insert(result);
59 return result;
60}
61
64 PortableServer::POA_ptr parentPoa,
66)
68 _queue(q),
69 _channel(channel)
70{
71 ProxyManager::activate("ProxyPullSupplier");
72}
73
75{
76 DB(20,"~ProxyPullSupplierManager()")
77}
78
80
81CosEventChannelAdmin::ProxyPullSupplier_ptr
83{
85 _managedPoa.in(),
86 CosEventChannelAdmin::_tc_ProxyPullSupplier->id()
87 );
88}
89
91{
92 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
93 {
94 ProxyPullSupplier_i* pps =dynamic_cast<ProxyPullSupplier_i*>(*i);
95 // We are in the EventChannel's thread.
96 // Make sure all calls go through the ProxyPullSupplier POA.
97 CosEventChannelAdmin::ProxyPullSupplier_var ppsv =pps->_this();
99
100 }
101}
102
103
104//
105// ProxyPullSupplier_i
106//
107
108// CORBA interface methods
109
111 CosEventComm::PullConsumer_ptr pullConsumer
112)
113{
114 if(_connected || !CORBA::is_nil(_target) || !CORBA::is_nil(_req))
115 throw CosEventChannelAdmin::AlreadyConnected();
116 touch();
117 _connected=true;
118 if(!CORBA::is_nil(pullConsumer))
119 _target=CosEventComm::PullConsumer::_duplicate(pullConsumer);
120
122 {
123 WriteLock log;
124 output(log.os);
125 }
126}
127
129{
130 DB(5,"ProxyPullSupplier_i::disconnect_pull_supplier()");
131 touch();
132 eraseKey("ConsumerAdmin/ProxyPullSupplier");
134 if(!_connected)
135 {
136 throw CORBA::OBJECT_NOT_EXIST(
137 IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
138 CORBA::COMPLETED_NO
139 );
140 }
141 else if(!CORBA::is_nil(_target))
142 {
143 CORBA::Request_var req=_target->_request("disconnect_pull_consumer");
144 _target=CosEventComm::PullConsumer::_nil();
145 req->send_deferred();
146 Orb::inst().deferredRequest(req._retn());
147 }
148}
149
151{
152 if(!_connected)
153 throw CosEventComm::Disconnected();
154 touch();
155 if(moreEvents())
156 return new CORBA::Any(*nextEvent());
157 else
158 throw CORBA::TRANSIENT(
159 IFELSE_OMNIORB4(omni::TRANSIENT_CallTimedout,0),
160 CORBA::COMPLETED_NO
161 );
162}
163
164CORBA::Any* ProxyPullSupplier_i::try_pull(CORBA::Boolean& has_event)
165{
166 if(!_connected)
167 throw CosEventComm::Disconnected();
168 touch();
169 if(moreEvents())
170 {
171 has_event=1;
172 return new CORBA::Any(*nextEvent());
173 }
174 else
175 {
176 has_event=0;
177 return new CORBA::Any();
178 }
179}
180
181//
182
184 PortableServer::POA_ptr poa,
186)
187: Proxy(poa),
188 EventQueue::Reader(q),
189 _target(CosEventComm::PullConsumer::_nil()),
190 _connected(false),
191 _timestamp(0)
192{
193 touch();
194}
195
197{
198 DB(20,"~ProxyPullSupplier_i()")
199}
200
202 const string& oid,
203 const PersistNode& node
204)
205{
206 CosEventComm::PullConsumer_var pullConsumer =
207 string_to_<CosEventComm::PullConsumer>(node.attrString("IOR").c_str());
208 // Do not activate until we know that we have read a valid target.
209 activateObjectWithId(oid.c_str());
211}
212
214{
215 basicOutput(os,"ConsumerAdmin/ProxyPullSupplier",_target.in());
216}
217
219{
220 unsigned long nsec; // dummy
221 omni_thread::get_time(&_timestamp,&nsec);
222}
223
224}; // end namespace OmniEvents
#define OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(C)
Defines debug versions of _add/remove_ref() for class C.
Definition Servant.h:70
#define IFELSE_OMNIORB4(omniORB4_code, default_code)
Definition Orb.h:45
#define DB(l, x)
Definition Orb.h:49
T::_ptr_type createNarrowedReference(PortableServer::POA_ptr poa, const char *repositoryId)
Helper method that creates a new CORBA object and then narrows it to the appropriate type.
Definition Servant.h:96
Servant for CosEventChannelAdmin::EventChannel objects, also inherits from omni_thread.
CORBA::ULong maxNumProxies() const
The EventQueue is a circular buffer that contains _size-1 events.
Definition EventQueue.h:57
static bool exists()
Library code may create Event Service objects without the need for persistency.
Obtains an output stream to the active persistency logfile, and locks it for exclusive access.
void deferredRequest(CORBA::Request_ptr req, Callback *callback=NULL)
Adopts the request and then stores it in _deferredRequests.
Definition Orb.cc:187
static Orb & inst()
Definition Orb.h:81
Base class for ServantActivator classes that manage Proxy servants.
void activate(const char *name)
Creates the Proxy-type's POA, and registers this object as its ServantManager.
set< Proxy * > _servants
The set of all active Proxies in this object's _managedPoa.
PortableServer::POA_var _managedPoa
The POA owned & managed by this object.
Base class for three of the four Proxy servants.
void basicOutput(ostream &os, const char *name, CORBA::Object_ptr target=CORBA::Object::_nil(), const char *extraAttributes=NULL)
Helper method for constructing persistency output.
CORBA::Request_var _req
void eraseKey(const char *name)
Helper method for constructing persistency output.
EventQueue & _queue
Reference to queue shared with ProxyPushSuppliers.
void disconnect()
Send disconnect_pull_consumer() to all connected PullConsumers.
OMNIEVENTS__DEBUG_REF_COUNTS__DECL CosEventChannelAdmin::ProxyPullSupplier_ptr createObject()
ProxyPullSupplierManager(const EventChannel_i &channel, PortableServer::POA_ptr parentPoa, EventQueue &q)
PortableServer::Servant incarnate(const PortableServer::ObjectId &oid, PortableServer::POA_ptr poa)
Servant for ProxyPullSupplier interface.
void output(ostream &os)
Save this object's state to a stream.
ProxyPullSupplier_i(PortableServer::POA_ptr poa, EventQueue &q)
void touch()
Update the _timestamp to the current moment.
CORBA::Any * try_pull(CORBA::Boolean &has_event)
CosEventComm::PullConsumer_var _target
unsigned long _timestamp
Keep track of when this proxy was last contacted.
bool _connected
Can't use _target to keep track of whether this object is connected, because it is legal to connect w...
void connect_pull_consumer(CosEventComm::PullConsumer_ptr pullConsumer)
void reincarnate(const string &oid, const PersistNode &node)
Re-create a servant from information saved in the log file.
void activateObjectWithId(const char *oidStr)
Calls activate_object_with_id() to activate this servant in its POA.
Definition Servant.cc:125
void deactivateObject()
Calls deactivate_object() to deactivate this servant in its POA.
Definition Servant.cc:160