OmniEvents
ConsumerAdmin.cc
Go to the documentation of this file.
1// Package : omniEvents
2// ConsumerAdmin.cc Created : 2003/12/04
3// Author : Alex Tingle
4//
5// Copyright (C) 2003-2005 Alex Tingle.
6//
7// This file is part of the omniEvents application.
8//
9// omniEvents is free software; you can redistribute it and/or
10// modify it under the terms of the GNU Lesser General Public
11// License as published by the Free Software Foundation; either
12// version 2.1 of the License, or (at your option) any later version.
13//
14// omniEvents is distributed in the hope that it will be useful,
15// but WITHOUT ANY WARRANTY; without even the implied warranty of
16// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17// Lesser General Public License for more details.
18//
19// You should have received a copy of the GNU Lesser General Public
20// License along with this library; if not, write to the Free Software
21// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22//
23
24#include "ConsumerAdmin.h"
25
26#include "EventChannel.h"
27#include "ProxyPushSupplier.h"
28#include "ProxyPullSupplier.h"
29#include "Orb.h"
30#include "PersistNode.h"
31#include "Filter.h"
32
33namespace OmniEvents {
34
35
36CosEventChannelAdmin::ProxyPushSupplier_ptr
43
44
45CosEventChannelAdmin::ProxyPullSupplier_ptr
52
53
55 const EventChannel_i& channel,
56 PortableServer::POA_ptr poa
57)
58: Servant(poa),
59 _channel(channel),
60 _queue(channel.maxQueueLength()),
61 _pushSupplier(NULL),
62 _pullSupplier(NULL)
63{
64 if(_channel.properties().hasAttr("FilterId"))
65 {
66 string rid =_channel.properties().attrString("FilterId");
67 _queue.setFilter(new FilterByRepositoryId(rid.c_str()));
68 }
69 else if(_channel.properties().hasAttr("FilterKind"))
70 {
71 CORBA::TCKind kind =
72 CORBA::TCKind(_channel.properties().attrLong("FilterKind"));
74 }
75
76 activateObjectWithId("ConsumerAdmin");
77}
78
79
81{
82 DB(20,"~ConsumerAdmin_i()")
84 {
85 _pushSupplier->_remove_ref(); // terminates thread.
86 _pushSupplier=NULL;
87 }
89 {
90 _pullSupplier->_remove_ref();
91 _pullSupplier=NULL;
92 }
93}
94
95
97
98
99void ConsumerAdmin_i::send(CORBA::Any* event)
100{
102 _queue.append(event);
103}
104
105
106void ConsumerAdmin_i::send(list<CORBA::Any*>& events)
107{
108 if(!events.empty())
109 {
111 for(list<CORBA::Any*>::iterator i=events.begin(); i!=events.end(); ++i)
112 _queue.append( *i );
113 events.clear();
114 }
115}
116
117
125
126
128{
129 // Build Push Supplier proxies
130 PersistNode* pushsNode =node.child("ProxyPushSupplier");
131 if(pushsNode && !pushsNode->_child.empty())
132 {
134 _pushSupplier->reincarnate(*pushsNode);
135 }
136
137 // Build Pull Supplier proxies
138 PersistNode* pullsNode =node.child("ProxyPullSupplier");
139 if(pullsNode && !pullsNode->_child.empty())
140 {
142 _pullSupplier->reincarnate(*pullsNode);
143 }
144}
145
146
147void ConsumerAdmin_i::output(ostream& os)
148{
149 if(_pushSupplier)
150 {
151 omni_mutex_lock l(_pushSupplier->_lock);
153 }
154 if(_pullSupplier)
155 {
157 }
158}
159
160
161}; // end namespace OmniEvents
#define OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(C)
Defines debug versions of _add/remove_ref() for class C.
Definition Servant.h:70
#define DB(l, x)
Definition Orb.h:49
OMNIEVENTS__DEBUG_REF_COUNTS__DECL void send(CORBA::Any *event)
Queues a single event for sending to consumers.
ConsumerAdmin_i(const EventChannel_i &channel, PortableServer::POA_ptr poa)
void reincarnate(const PersistNode &node)
Populate this servant from log information.
void disconnect()
Send disconnect_XXX_consumer() to all connected consumers.
ProxyPushSupplierManager * _pushSupplier
const EventChannel_i & _channel
CosEventChannelAdmin::ProxyPushSupplier_ptr obtain_push_supplier()
CosEventChannelAdmin::ProxyPullSupplier_ptr obtain_pull_supplier()
void output(ostream &os)
Save this object's state to a stream.
ProxyPullSupplierManager * _pullSupplier
Servant for CosEventChannelAdmin::EventChannel objects, also inherits from omni_thread.
const PersistNode & properties() const
void setFilter(Filter *filter)
Definition EventQueue.h:72
void append(CORBA::Any *event)
Definition EventQueue.h:78
The most basic event filter allows only events of a certain CORBA TCKind to pass.
Definition Filter.h:67
Allows only events of a certain CORBA RepositoryId to pass.
Definition Filter.h:85
map< string, PersistNode * > _child
Definition PersistNode.h:71
bool hasAttr(const string &key) const
string attrString(const string &key, const string &fallback="") const
long attrLong(const string &key, long fallback=0) const
PersistNode * child(const string &key) const
void reincarnate(const PersistNode &node)
Re-create servants from information saved in the log file.
void output(ostream &os)
Save this object's state to a stream.
void disconnect()
Send disconnect_pull_consumer() to all connected PullConsumers.
OMNIEVENTS__DEBUG_REF_COUNTS__DECL CosEventChannelAdmin::ProxyPullSupplier_ptr createObject()
void disconnect()
Send disconnect_push_consumer() to all connected PushConsumers.
CosEventChannelAdmin::ProxyPushSupplier_ptr createObject()
void _remove_ref()
Shutdown the thread when refCount reaches zero.
Helper class that locks ProxyPushSupplier upon construction, and wakes it up on destruction.
Base class for servants.
Definition Servant.h:114
PortableServer::POA_var _poa
Definition Servant.h:131
void activateObjectWithId(const char *oidStr)
Calls activate_object_with_id() to activate this servant in its POA.
Definition Servant.cc:125