OmniEvents
pushcons.cc
Go to the documentation of this file.
1// -*- Mode: C++; -*-
2// Package : omniEvents
3// pushcons.cc Created : 1/4/98
4// Author : Paul Nader (pwn)
5//
6// Copyright (C) 1998 Paul Nader, 2003-2004 Alex Tingle
7//
8// This file is part of the omniEvents application.
9//
10// omniEvents is free software; you can redistribute it and/or
11// modify it under the terms of the GNU Lesser General Public
12// License as published by the Free Software Foundation; either
13// version 2.1 of the License, or (at your option) any later version.
14//
15// omniEvents is distributed in the hope that it will be useful,
16// but WITHOUT ANY WARRANTY; without even the implied warranty of
17// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18// Lesser General Public License for more details.
19//
20// You should have received a copy of the GNU Lesser General Public
21// License along with this library; if not, write to the Free Software
22// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23//
24// Description:
25// Push Model consumer implementation
26//
27
28/*
29 $Log: pushcons.cc,v $
30 Revision 1.12.2.1 2005/06/16 09:39:49 alextingle
31 Fixed theoretical race caused by sloppy use of condition variable.
32
33 Revision 1.12 2004/10/08 09:06:08 alextingle
34 More robust exception minor code handling.
35
36 Revision 1.11 2004/08/18 17:49:45 alextingle
37 Added check for SIGPIPE before trying to use it.
38
39 Revision 1.10 2004/08/06 16:19:23 alextingle
40 -k & -K options removed.
41 Naming service names may now be as complex as you like.
42
43 Revision 1.9 2004/04/30 17:54:47 alextingle
44 Corrected handling of CORBA::Any.
45
46 Revision 1.8 2004/04/20 16:52:17 alextingle
47 All examples updated for latest version on omniEvents. Server may now be
48 specified as a 'corbaloc' string or IOR, instead of as naming service id/kind.
49
50 Revision 1.7 2004/04/01 22:28:36 alextingle
51 Corrected usage message.
52
53 Revision 1.6 2004/03/23 19:09:26 alextingle
54 Fixed typos.
55
56 Revision 1.5 2004/02/21 19:07:45 alextingle
57 Corrected servants to use POA instead of BOA.
58
59 Revision 1.4 2004/02/04 22:29:55 alextingle
60 Reworked all C++ examples.
61 Removed catch(...) as it tends to make it harder to see what's going on.
62 Now uses POA instead of BOA.
63 Uses omniORB4's Exception name probing.
64 No longer uses 'naming.h/cc' utility code.
65
66 Revision 1.3 2003/11/03 22:19:56 alextingle
67 Removed all platform specific switches. Now uses autoconf, config.h.
68 Removed stub header in order to allow makefile dependency checking to work
69 correctly.
70 Corrected usage of omni_condition/omni_mutex. Mutexes are now always unlocked by
71 the same thread that locked them.
72
73 Revision 1.1.1.1.2.1 2002/09/28 22:20:51 shamus13
74 Added ifdefs to enable omniEvents to compile
75 with both omniORB3 and omniORB4. If __OMNIORB4__
76 is defined during compilation, omniORB4 headers
77 and command line option syntax is used, otherwise
78 fall back to omniORB3 style.
79
80 Revision 1.1.1.1 2002/09/25 19:00:26 shamus13
81 Import of OmniEvents source tree from release 2.1.1
82
83 Revision 0.13 2000/08/30 04:39:48 naderp
84 Port to omniORB 3.0.1.
85
86 Revision 0.12 2000/03/16 05:37:27 naderp
87 Added stdlib.h for getopt.
88
89 Revision 0.11 2000/03/06 13:27:02 naderp
90 Using util getRootNamingContext function.
91 Using stub headers.
92 Fixed error messages.
93
94 Revision 0.10 2000/03/02 03:20:24 naderp
95 Added retry resiliency for handling COMM_FAUILURE exceptions.
96
97 Revision 0.9 1999/11/02 13:39:15 naderp
98 Added <signal.h>
99
100 Revision 0.8 1999/11/02 07:57:04 naderp
101 Updated usage.
102
103Revision 0.7 99/11/01 18:10:29 18:10:29 naderp (Paul Nader)
104Added ahndling of COMM_FAILURE exception for connect_push_consumer.
105
106Revision 0.6 99/11/01 16:11:03 16:11:03 naderp (Paul Nader)
107omniEvents 2.0 Release.
108
109Revision 0.5 99/10/27 19:46:01 19:46:01 naderp (Paul Nader)
110Ignoring Unix SIGPIPE signal.
111Catching COMM_FAILURE exception for obtain_push_supplier.
112Continuing if it fails to obtain Proxy Supplier.
113Try/Catch block for disconnect_push_supplier.
114
115Revision 0.4 99/04/23 16:05:46 16:05:46 naderp (Paul Nader)
116gcc port.
117
118Revision 0.3 99/04/23 09:34:03 09:34:03 naderp (Paul Nader)
119Windows Port.
120
121Revision 0.2 99/04/21 18:06:26 18:06:26 naderp (Paul Nader)
122*** empty log message ***
123
124Revision 0.1.1.1 98/11/27 16:59:37 16:59:37 naderp (Paul Nader)
125Added -s option to sleep after disconnecting.
126
127Revision 0.1 98/11/25 14:08:21 14:08:21 naderp (Paul Nader)
128Initial Revision
129
130*/
131
132#ifdef HAVE_CONFIG_H
133# include "config.h"
134#endif
135
136#ifdef HAVE_GETOPT
137# include <unistd.h>
138extern char* optarg;
139extern int optind;
140#else
141# include "getopt.h"
142#endif
143
144#ifdef HAVE_IOSTREAM
145# include <iostream>
146#else
147# include <iostream.h>
148#endif
149
150#ifdef HAVE_STD_IOSTREAM
151using namespace std;
152#endif
153
154#ifdef HAVE_STDLIB_H
155# include <stdlib.h>
156#endif
157
158#ifdef HAVE_SIGNAL_H
159# include <signal.h>
160#endif
161
162#include <cstdio>
163
164#include "CosEventComm.hh"
165#include "CosEventChannelAdmin.hh"
166#include "naming.h"
167
168static omni_mutex mutex;
169static omni_condition connect_cond(&mutex);
170static void usage(int argc, char **argv);
171
172class Consumer_i : virtual public POA_CosEventComm::PushConsumer {
173public:
174 Consumer_i(long disconnect=0): _disconnect(disconnect) {}
175
176 void push(const CORBA::Any& data);
178
179private:
181};
182
183void Consumer_i::push(const CORBA::Any& data) {
184 CORBA::ULong l;
185 static int i = 0;
186
187 i++;
188 if( data>>=l )
189 {
190 cout<<"Push Consumer: push() called. Data : "<< l <<endl;
191
192 // Exercise Disconnect
193 if (i == _disconnect)
194 {
195 i = 0;
196 // NOTE : The proxy_supplier object is disposed at the server
197 // during the disconnect_push_supplier call. Do NOT
198 // use the proxy_supplier reference after disconnecting.
199
200 // Signal main thread to disconnect and re-connect.
201 omni_mutex_lock condition_lock(mutex); // ensure main thread in wait()
202 connect_cond.signal();
203 }
204 }
205 else
206 {
207 cerr<<"Push Consumer: push() called. UNEXPECTED TYPE"<<endl;
208 }
209}
210
212 cout << "Push Consumer: disconnected." << endl;
213}
214
215int
216main(int argc, char **argv)
217{
218 //
219 // Start orb.
220 CORBA::ORB_ptr orb = CORBA::ORB_init(argc,argv);
221
222 // Process Options
223 int discnum =0;
224 int sleepInterval =0;
225 const char* channelName ="EventChannel";
226
227 int c;
228 while ((c = getopt(argc,argv,"hd:s:n:")) != EOF)
229 {
230 switch (c)
231 {
232 case 'd': discnum = atoi(optarg);
233 break;
234
235 case 's': sleepInterval = atoi(optarg);
236 break;
237
238 case 'n': channelName = optarg;
239 break;
240
241 case 'h': usage(argc,argv);
242 exit(0);
243 default : usage(argc,argv);
244 exit(-1);
245 }
246 }
247
248#if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
249 // Ignore broken pipes
250 signal(SIGPIPE, SIG_IGN);
251#endif
252
253 Consumer_i* consumer = new Consumer_i (discnum);
254 CosEventChannelAdmin::EventChannel_var channel;
255
256 const char* action=""; // Use this variable to help report errors.
257 try {
258 CORBA::Object_var obj;
259
260 action="resolve initial reference 'RootPOA'";
261 obj=orb->resolve_initial_references("RootPOA");
262 PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj);
263 if(CORBA::is_nil(rootPoa))
264 throw CORBA::OBJECT_NOT_EXIST();
265
266 action="activate the RootPOA's POAManager";
267 PortableServer::POAManager_var pman =rootPoa->the_POAManager();
268 pman->activate();
269
270 //
271 // Obtain object reference to EventChannel
272 // (from command-line argument or from the Naming Service).
273 if(optind<argc)
274 {
275 action="convert URI from command line into object reference";
276 obj=orb->string_to_object(argv[optind]);
277 }
278 else
279 {
280 action="resolve initial reference 'NameService'";
281 obj=orb->resolve_initial_references("NameService");
282 CosNaming::NamingContext_var rootContext=
283 CosNaming::NamingContext::_narrow(obj);
284 if(CORBA::is_nil(rootContext))
285 throw CORBA::OBJECT_NOT_EXIST();
286
287 action="find EventChannel in NameService";
288 cout << action << endl;
289 obj=rootContext->resolve(str2name(channelName));
290 }
291
292 action="narrow object reference to event channel";
293 channel=CosEventChannelAdmin::EventChannel::_narrow(obj);
294 if(CORBA::is_nil(channel))
295 {
296 cerr << "Failed to narrow Event Channel reference." << endl;
297 exit(1);
298 }
299
300 }
301 catch(CORBA::ORB::InvalidName& ex) { // resolve_initial_references
302 cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl;
303 exit(1);
304 }
305 catch(CosNaming::NamingContext::InvalidName& ex) { // resolve
306 cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl;
307 exit(1);
308 }
309 catch(CosNaming::NamingContext::NotFound& ex) { // resolve
310 cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl;
311 exit(1);
312 }
313 catch(CosNaming::NamingContext::CannotProceed& ex) { // resolve
314 cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl;
315 exit(1);
316 }
317 catch(CORBA::TRANSIENT& ex) { // _narrow()
318 cerr<<"Failed to "<<action<<". TRANSIENT"<<endl;
319 exit(1);
320 }
321 catch(CORBA::OBJECT_NOT_EXIST& ex) { // _narrow()
322 cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl;
323 exit(1);
324 }
325 catch(CORBA::SystemException& ex) {
326 cerr<<"Failed to "<<action<<".";
327#if defined(HAVE_OMNIORB4)
328 cerr<<" "<<ex._name();
329 if(ex.NP_minorString())
330 cerr<<" ("<<ex.NP_minorString()<<")";
331#endif
332 cerr<<endl;
333 exit(1);
334 }
335 catch(CORBA::Exception& ex) {
336 cerr<<"Failed to "<<action<<"."
337#if defined(HAVE_OMNIORB4)
338 " "<<ex._name()
339#endif
340 <<endl;
341 exit(1);
342 }
343
344 //
345 // Get Consumer admin interface - retrying on Comms Failure.
346 CosEventChannelAdmin::ConsumerAdmin_var consumer_admin;
347 while (1)
348 {
349 try {
350 consumer_admin = channel->for_consumers ();
351 if (CORBA::is_nil (consumer_admin))
352 {
353 cerr << "Event Channel returned nil Consumer Admin!" << endl;
354 exit(1);
355 }
356 break;
357 }
358 catch (CORBA::COMM_FAILURE& ex) {
359 cerr << "Caught COMM_FAILURE exception "
360 << "obtaining Consumer Admin! Retrying..."
361 << endl;
362 continue;
363 }
364 }
365 cout << "Obtained ConsumerAdmin." << endl;
366
367 omni_mutex_lock condition_lock(mutex);
368 while (1) {
369 //
370 // Get proxy supplier - retrying on Comms Failure.
371 CosEventChannelAdmin::ProxyPushSupplier_var proxy_supplier;
372 while (1)
373 {
374 try {
375 proxy_supplier = consumer_admin->obtain_push_supplier ();
376 if (CORBA::is_nil (proxy_supplier))
377 {
378 cerr << "Consumer Admin returned nil proxy_supplier!"
379 << endl;
380 exit (1);
381 }
382 break;
383 }
384 catch (CORBA::COMM_FAILURE& ex) {
385 cerr << "Caught COMM_FAILURE Exception "
386 << "obtaining Push Supplier! Retrying..."
387 << endl;
388 continue;
389 }
390 }
391 cout << "Obtained ProxyPushSupplier." << endl;
392
393 //
394 // Connect Push Consumer - retrying on Comms Failure.
395 while (1)
396 {
397 try {
398 proxy_supplier->connect_push_consumer(consumer->_this());
399 break;
400 }
401 catch (CORBA::BAD_PARAM& ex) {
402 cerr << "Caught BAD_PARAM Exception connecting Push Consumer!"
403 << endl;
404 exit (1);
405 }
406 catch (CosEventChannelAdmin::AlreadyConnected& ex) {
407 cerr << "Proxy Push Supplier already connected!"
408 << endl;
409 break;
410 }
411 catch (CORBA::COMM_FAILURE& ex) {
412 cerr << "Caught COMM_FAILURE exception "
413 << "connecting Push Consumer! Retrying..."
414 << endl;
415 continue;
416 }
417 }
418 cout << "Connected Push Consumer." << endl;
419
420 // Wait for indication to disconnect before re-connecting.
421 connect_cond.wait();
422
423 // Disconnect - retrying on Comms Failure.
424 while (1)
425 {
426 try {
427 proxy_supplier->disconnect_push_supplier();
428 break;
429 }
430 catch (CORBA::COMM_FAILURE& ex) {
431 cerr << "Caught COMM_FAILURE Exception "
432 << "disconnecting Push Consumer! Retrying..."
433 << endl;
434 continue;
435 }
436 }
437 cout << "Disconnected Push Consumer." << endl;
438
439 // Yawn
440 cout << "Sleeping " << sleepInterval << " seconds." << endl;
441 omni_thread::sleep(sleepInterval);
442 }
443
444 // NEVER GET HERE
445 return 0;
446}
447
448static void
449usage(int argc, char **argv)
450{
451 cerr<<
452"\nCreate a PushConsumer to receive events from a channel.\n"
453"syntax: "<<(argc?argv[0]:"pushcons")<<" OPTIONS [CHANNEL_URI]\n"
454"\n"
455"CHANNEL_URI: The event channel may be specified as a URI.\n"
456" This may be an IOR, or a corbaloc::: or corbaname::: URI.\n"
457"\n"
458"OPTIONS: DEFAULT:\n"
459" -d NUM disconnect after receiving NUM events [0 - never disconnect]\n"
460" -s SECS sleep SECS seconds after disconnecting [0]\n"
461" -n NAME channel name (if URI is not specified) [\"EventChannel\"]\n"
462" -h display this help text\n" << endl;
463}
CosNaming::Name str2name(const char *namestr)
Converts stringified name to naming service name.
Definition naming.cc:117
int optind
Definition getopt.cc:82
char * optarg
Definition getopt.cc:83
int getopt(int argc, char *argv[], const char *optionS)
Definition getopt.cc:88
CORBA::ORB_ptr orb
Definition eventf.cc:60
static omni_semaphore connect_cond(0)
int main(int argc, char **argv)
The main process entry point.
Definition pushcons.cc:216
static omni_mutex mutex
Definition pushcons.cc:168
static void usage(int argc, char **argv)
Definition pushcons.cc:449
void push(const CORBA::Any &data)
void disconnect_push_consumer()
Consumer_i(long disconnect=0)
Definition pushcons.cc:174
long _disconnect
Definition pushcons.cc:180