OmniEvents
EventChannel.cc
Go to the documentation of this file.
1// Package : omniEvents
2// EventChannel.cc Created : 2003/12/04
3// Author : Alex Tingle
4//
5// Copyright (C) 2003-2005 Alex Tingle.
6//
7// This file is part of the omniEvents application.
8//
9// omniEvents is free software; you can redistribute it and/or
10// modify it under the terms of the GNU Lesser General Public
11// License as published by the Free Software Foundation; either
12// version 2.1 of the License, or (at your option) any later version.
13//
14// omniEvents is distributed in the hope that it will be useful,
15// but WITHOUT ANY WARRANTY; without even the implied warranty of
16// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17// Lesser General Public License for more details.
18//
19// You should have received a copy of the GNU Lesser General Public
20// License along with this library; if not, write to the Free Software
21// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22//
23
24#include "EventChannel.h"
25#include "ConsumerAdmin.h"
26#include "SupplierAdmin.h"
27#include "omniEventsLog.h"
28#include "Orb.h"
29
30#include <list>
31
32namespace OmniEvents {
33
34// CORBA interface methods
35CosEventChannelAdmin::ConsumerAdmin_ptr EventChannel_i::for_consumers()
36{
38 throw CORBA::OBJECT_NOT_EXIST();
39 return _consumerAdmin->_this();
40}
41
42
43CosEventChannelAdmin::SupplierAdmin_ptr EventChannel_i::for_suppliers()
44{
46 throw CORBA::OBJECT_NOT_EXIST();
47 return _supplierAdmin->_this();
48}
49
50
52{
54 throw CORBA::OBJECT_NOT_EXIST();
55
56 // Prevent further incoming connections.
58
59 DB(5,"EventChannel_i::destroy()")
60
61 // Send disconnect messages to connected clients.
66}
67
68
70: Servant(PortableServer::POA::_nil()),
71 _eventChannelStore(store),
72 _consumerAdmin(NULL),
73 _supplierAdmin(NULL),
74 _poaManager(),
75 _shutdownRequested(false),
76 _properties(),
77 _mapper(NULL),
78 _lock(),
79 _refCount(1)
80{}
81
82
84 const char* channelName,
85 const PersistNode* node
86)
87{
88 // The order of these various initialization methods is very important.
89 // I've documented dependencies as 'REQUIRES' comments.
90
91 createPoa(channelName);
92
93 if(node)
95
96 // REQUIRES: _properties
98
99 // REQUIRES: _consumerAdmin, _properties
101
102 if(node)
103 {
104 PersistNode* saNode =node->child("SupplierAdmin");
105 if(saNode)
106 _supplierAdmin->reincarnate(*saNode);
107
108 PersistNode* caNode =node->child("ConsumerAdmin");
109 if(caNode)
110 _consumerAdmin->reincarnate(*caNode);
111 }
112
113 activateObjectWithId("EventChannel");
114
115 // Remove the constructor's reference. This object will now be destroyed when
116 // the POA releases it.
117 _remove_ref();
118
119 // REQUIRES: activate() ...since it uses _this().
121
122 // Start the channel's thread running.
123 start_undetached();
124}
125
126
128{
129 DB(20,"~EventChannel_i()")
130 // Destroy the mapper object, even when the EventChannel is being shut down
131 // without a call to destroy(). This can happen if the channel is
132 // implemented through libomniEvents - the channel could be shut down and
133 // later reincarnated in the same process. The Mapper's lifecycle should
134 // match that of the EventChannel.
135 if(_mapper)
136 {
137 _mapper->destroy();
138 _mapper=NULL;
139 }
141 {
142 _consumerAdmin->_remove_ref();
143 _consumerAdmin=NULL;
144 }
146 {
147 _supplierAdmin->_remove_ref();
148 _supplierAdmin=NULL;
149 }
150}
151
152
154{
155 // Ensure that activate() is called before start()/run().
156 assert(!CORBA::is_nil(_poa));
157
158 const char* action="";
159 try
160 {
162 {
163 action="add this object to the store";
165 }
166
168 {
169 action="create this object in the persistency database";
170 WriteLock log;
171 output(log.os);
172 }
173
174 // Process events until the channel is destroyed.
175 action="run main loop";
176 mainLoop();
177
179 {
180 action="remove this object from the store";
182 }
183
185 {
187 {
188 action="remove record from persistency database";
189 CORBA::String_var poaName =_poa->the_name();
190 WriteLock log;
191 log.os<<"-ecf/"<<poaName.in()<<'\n';
192 }
193 action="destroy POA";
194 _poa->destroy(
195 CORBA::Boolean(1) /* etherealize_objects */,
196 CORBA::Boolean(0) /* wait_for_completion */
197 );
198 _poa=PortableServer::POA::_nil();
199
200 } // end if(_shutdownRequested)
201
202 }
203 catch(PortableServer::POAManager::AdapterInactive& ex) {
204 DB(0,"EventChannel_i::run_undetached() - failed to "<<action<<
205 ", POA deactivated from the outside.")
206 }
207 catch (CORBA::SystemException& ex) {
208 DB(0,"EventChannel_i::run_undetached() - failed to "<<action<<
209 ", System exception: "<<ex._name()<<" ("<<NP_MINORSTRING(ex)<<")")
210 }
211 catch (CORBA::Exception& ex) {
212 DB(0,"EventChannel_i::run_undetached() - failed to "<<action<<
213 ", CORBA exception: "<<ex._name())
214 }
215
216 // Thread now exits, and this object is deleted.
217 return NULL;
218}
219
220
222{
223 _poaManager->activate();
224 unsigned long localCyclePeriod_ns=cyclePeriod_ns();
225 while(_refCount>0 && !_shutdownRequested)
226 {
227 //
228 // TRANSFER PHASE - transfer events from SupplierAdmin to ConsumerAdmin.
229 _poaManager->hold_requests(CORBA::Boolean(1) /* wait_for_completion */);
230
231 if(_shutdownRequested) break;
232
233 list<CORBA::Any*> events;
234 _supplierAdmin->collect(events);
235 _consumerAdmin->send(events);
236 assert(events.empty());
237
238 _poaManager->activate();
239
240 //
241 // COMMUNICATION PHASE - talk with clients' suppliers & consumers.
242 // Note: On Linux the resolution of nanosleep is a huge 10ms.
243 omni_thread::sleep(0,localCyclePeriod_ns);
244 }
245}
246
247
249{
250#if OMNIEVENTS__DEBUG_REF_COUNTS
251 DB(20,"EventChannel_i::_add_ref()")
252#endif
253 omni_mutex_lock pause(_lock);
254 ++_refCount;
255}
256
257
259{
260#if OMNIEVENTS__DEBUG_REF_COUNTS
261 DB(20,"EventChannel_i::_remove_ref()")
262#endif
263 int myref;
264 {
265 omni_mutex_lock pause(_lock);
266 myref = --_refCount;
267 }
268
269 if(myref<0)
270 {
271 DB(2,"EventChannel has negative ref count! "<<myref)
272 }
273 else if(myref==0)
274 {
275 DB(15,"EventChannel has zero ref count -- shutdown.")
276 join(NULL);
277 }
278}
279
280
281void EventChannel_i::output(ostream& os)
282{
283 CORBA::String_var poaName =_poa->the_name();
284 string name =string("ecf/")+poaName.in();
285 _properties.output(os,name);
290}
291
292
293void EventChannel_i::setInsName(const string v)
294{
295 Mapper* newMapper =NULL;
296 try
297 {
298
299 // If _insName is set, then create a mapper object to allow clients to
300 // find this object with a `corbaloc' string.
301 if(!v.empty())
302 {
303 // !! Throws when there is already an object named 'v' in the INSPOA.
304 CORBA::Object_var obj( _this() );
305 newMapper=new Mapper(v.c_str(),obj.in());
306 }
307 // Deactivate the old _mapper object.
308 if(_mapper)
309 _mapper->destroy();
310 _mapper=newMapper;
311
312 }
313 catch(...)
314 {
315 // Can't use an auto_ptr, because MS VC++ 6 has no auto_ptr::reset()
316 delete newMapper;
317 throw;
318 }
319}
320
321
322void EventChannel_i::createPoa(const char* channelName)
323{
324 using namespace PortableServer;
325 POA_ptr p=Orb::inst()._RootPOA.in();
326
327 // POLICIES:
328 // Lifespan =PERSISTENT // we can persist
329 // Assignment =USER_ID // write our own oid
330 // Uniqueness =[default] UNIQUE_ID // one servant per object
331 // ImplicitActivation=[default] IMPLICIT_ACTIVATION // auto activation
332 // RequestProcessing =[default] USE_ACTIVE_OBJECT_MAP_ONLY
333 // ServantRetention =[default] RETAIN // stateless POA
334 // Thread =SINGLE_THREAD_MODEL // keep it simple
335
336 CORBA::PolicyList policies;
337 policies.length(3);
338 policies[0]=p->create_lifespan_policy(PERSISTENT);
339 policies[1]=p->create_id_assignment_policy(USER_ID);
340 policies[2]=p->create_thread_policy(SINGLE_THREAD_MODEL);
341
342 try // finally
343 {
344 try
345 {
346 // Create a new POA (and new POAManager) for this channel.
347 // The POAManager will be used for all of this channel's POAs.
348 _poa=p->create_POA(channelName,POAManager::_nil(),policies);
349 _poaManager=_poa->the_POAManager();
350 }
351 catch(POA::AdapterAlreadyExists& ex) // create_POA
352 {
353 DB(0,"EventChannel_i::createPoa() - POA::AdapterAlreadyExists")
354 throw;
355 }
356 catch(POA::InvalidPolicy& ex) // create_POA
357 {
358 DB(0,"EventChannel_i::createPoa() - POA::InvalidPolicy: "<<ex.index)
359 throw;
360 }
361 }
362 catch(...) // finally
363 {
364 // Destroy the policy objects (Not strictly necessary in omniORB)
365 for(CORBA::ULong i=0; i<policies.length(); ++i)
366 policies[i]->destroy();
367 throw;
368 }
369
370 // Destroy the policy objects (Not strictly necessary in omniORB)
371 for(CORBA::ULong i=0; i<policies.length(); ++i)
372 policies[i]->destroy();
373}
374
375
376//
377// class EventChannelStore
378//
379
380
382:_channels(),_lock()
383{}
384
386{
387 // ?? IMPLEMENT ME
388}
389
391{
392 omni_mutex_lock l(_lock);
393 bool insertOK =_channels.insert(channel).second;
394 if(!insertOK)
395 DB(2,"Attempted to store an EventChannel, when it is already stored.");
396}
397
399{
400 omni_mutex_lock l(_lock);
401 set<EventChannel_i*>::iterator pos =_channels.find(channel);
402 if(pos==_channels.end())
403 DB(2,"Failed to erase unknown EventChannel.")
404 else
405 _channels.erase(pos);
406}
407
409{
410 omni_mutex_lock l(_lock);
411 for(set<EventChannel_i*>::iterator i=_channels.begin();
412 i!=_channels.end();
413 ++i)
414 {
415 (*i)->output(os);
416 }
417}
418
419
420}; // end namespace OmniEvents
421
#define DB(l, x)
Definition Orb.h:49
#define NP_MINORSTRING(systemException)
Definition Orb.h:52
OMNIEVENTS__DEBUG_REF_COUNTS__DECL void send(CORBA::Any *event)
Queues a single event for sending to consumers.
void reincarnate(const PersistNode &node)
Populate this servant from log information.
void disconnect()
Send disconnect_XXX_consumer() to all connected consumers.
void output(ostream &os)
Save this object's state to a stream.
Servant for CosEventChannelAdmin::EventChannel objects, also inherits from omni_thread.
void activate(const char *channelName, const PersistNode *node=NULL)
Creates the channel's POA, and any child objects.
~EventChannel_i()
Cleans up the _poa, if this object is deleted before its thread starts.
void _remove_ref()
Shutdown the thread when refCount reaches zero.
void mainLoop()
The main loop for a channel.
void * run_undetached(void *)
Entry point for the channel's thread.
EventChannelStore * _eventChannelStore
ConsumerAdmin_i * _consumerAdmin
PortableServer::POAManager_var _poaManager
void createPoa(const char *channelName)
Constructs the main POA for this channel.
unsigned long cyclePeriod_ns() const
CosEventChannelAdmin::ConsumerAdmin_ptr for_consumers()
CosEventChannelAdmin::SupplierAdmin_ptr for_suppliers()
SupplierAdmin_i * _supplierAdmin
EventChannel_i(EventChannelStore *store=NULL)
void setInsName(const string v)
Construct a new Mapper object, and registers it in the INSPOA.
Container for Event Channels.
void insert(EventChannel_i *channel)
set< EventChannel_i * > _channels
void erase(EventChannel_i *channel)
A dummy servant that installs itself into the INSPOA and redirects all calls to the real destination.
Definition Mapper.h:36
static bool exists()
Library code may create Event Service objects without the need for persistency.
Obtains an output stream to the active persistancy logfile, and locks it for exclusive access.
PortableServer::POA_var _RootPOA
Definition Orb.h:89
static Orb & inst()
Definition Orb.h:81
void output(ostream &os, string name) const
map< string, string > _attr
Definition PersistNode.h:72
string attrString(const string &key, const string &fallback="") const
PersistNode * child(const string &key) const
Base class for servants.
Definition Servant.h:114
PortableServer::POA_var _poa
Definition Servant.h:131
void activateObjectWithId(const char *oidStr)
Calls activate_object_with_id() to activate this servant in its POA.
Definition Servant.cc:125
void disconnect()
Send disconnect_XXX_supplier() to all connected consumers.
void output(ostream &os)
Save this object's state to a stream.
OMNIEVENTS__DEBUG_REF_COUNTS__DECL void collect(list< CORBA::Any * > &events)
Collects all events that have arrived since the last call.
void reincarnate(const PersistNode &node)
Populate this servant from log information.