OmniEvents
pullsupp.cc
Go to the documentation of this file.
1// -*- Mode: C++; -*-
2// Package : omniEvents
3// pullsupp.cc Created : 1/4/98
4// Author : Paul Nader (pwn)
5//
6// Copyright (C) 1998 Paul Nader, 2003-2004 Alex Tingle
7//
8// This file is part of the omniEvents application.
9//
10// omniEvents is free software; you can redistribute it and/or
11// modify it under the terms of the GNU Lesser General Public
12// License as published by the Free Software Foundation; either
13// version 2.1 of the License, or (at your option) any later version.
14//
15// omniEvents is distributed in the hope that it will be useful,
16// but WITHOUT ANY WARRANTY; without even the implied warranty of
17// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18// Lesser General Public License for more details.
19//
20// You should have received a copy of the GNU Lesser General Public
21// License along with this library; if not, write to the Free Software
22// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23//
24// Description:
25// Pull Model supplier implementation.
26//
27
28/*
29 $Log: pullsupp.cc,v $
30 Revision 1.9 2004/10/08 09:06:08 alextingle
31 More robust exception minor code handling.
32
33 Revision 1.8 2004/08/18 17:49:45 alextingle
34 Added check for SIGPIPE before trying to use it.
35
36 Revision 1.7 2004/08/06 16:19:23 alextingle
37 -k & -K options removed.
38 Naming service names may now be as complex as you like.
39
40 Revision 1.6 2004/04/20 16:52:05 alextingle
41 All examples updated for latest version on omniEvents. Server may now be
42 specified as a 'corbaloc' string or IOR, instead of as naming service id/kind.
43
44 Revision 1.5 2004/02/04 22:29:55 alextingle
45 Reworked all C++ examples.
46 Removed catch(...) as it tends to make it harder to see what's going on.
47 Now uses POA instead of BOA.
48 Uses omniORB4's Exception name probing.
49 No longer uses 'naming.h/cc' utility code.
50
51 Revision 1.4 2003/11/17 08:47:09 alextingle
52 Corrected typo.
53
54 Revision 1.2 2003/11/16 23:24:22 alex
55 Corrected typo.
56
57 Revision 1.1.1.1 2003/11/11 22:28:56 alex
58 Import of testing 2.3.0
59
60 Revision 1.3 2003/11/03 22:20:26 alextingle
61 Removed all platform specific switches. Now uses autoconf, config.h.
62 Removed stub header in order to allow makefile dependency checking to work
63 correctly.
64
65 Revision 1.1.1.1.2.1 2002/09/28 22:20:51 shamus13
66 Added ifdefs to enable omniEvents to compile
67 with both omniORB3 and omniORB4. If __OMNIORB4__
68 is defined during compilation, omniORB4 headers
69 and command line option syntax is used, otherwise
70 fall back to omniORB3 style.
71
72 Revision 1.1.1.1 2002/09/25 19:00:26 shamus13
73 Import of OmniEvents source tree from release 2.1.1
74
75 Revision 0.11 2000/08/30 04:39:48 naderp
76 Port to omniORB 3.0.1.
77
78 Revision 0.10 2000/03/16 05:37:27 naderp
79 Added stdlib.h for getopt.
80
81 Revision 0.9 2000/03/06 13:26:32 naderp
82 Using util getRootNamingContext function.
83 Using stub headers.
84 Fixed error messages.
85
86 Revision 0.8 2000/03/02 03:16:26 naderp
87 Added retry resiliency for handling COMM_FAUILURE exceptions.
88 Replaced condition variable by counting semaphore.
89
90 Revision 0.7 1999/11/02 13:39:13 naderp
91 Added <signal.h>
92
93 Revision 0.6 1999/11/02 07:57:22 naderp
94 Updated usage.
95
96Revision 0.5 99/11/01 16:10:12 16:10:12 naderp (Paul Nader)
97omniEvents 2.0 Release.
98Ignoring SIGPIPE for UNIX platforms.
99
100Revision 0.4 99/04/23 16:05:44 16:05:44 naderp (Paul Nader)
101gcc port.
102
103Revision 0.3 99/04/23 09:34:01 09:34:01 naderp (Paul Nader)
104Windows Port.
105
106Revision 0.2 99/04/21 18:06:25 18:06:25 naderp (Paul Nader)
107*** empty log message ***
108
109Revision 0.1.1.1 98/11/27 16:59:33 16:59:33 naderp (Paul Nader)
110Added -s option to sleep after disconnecting.
111
112Revision 0.1 98/11/25 14:08:07 14:08:07 naderp (Paul Nader)
113Initial Revision
114
115*/
116
117#ifdef HAVE_CONFIG_H
118# include "config.h"
119#endif
120
121#ifdef HAVE_GETOPT
122# include <unistd.h>
123extern char* optarg;
124extern int optind;
125#else
126# include "getopt.h"
127#endif
128
129#ifdef HAVE_IOSTREAM
130# include <iostream>
131#else
132# include <iostream.h>
133#endif
134
135#ifdef HAVE_STD_IOSTREAM
136using namespace std;
137#endif
138
139#ifdef HAVE_STDLIB_H
140# include <stdlib.h>
141#endif
142
143#ifdef HAVE_SIGNAL_H
144# include <signal.h>
145#endif
146
147#include <cstdio>
148
149#include "CosEventComm.hh"
150#include "CosEventChannelAdmin.hh"
151#include "naming.h"
152
153static omni_semaphore connect_cond(0);
154static void usage(int argc, char **argv);
155
156class Supplier_i : virtual public POA_CosEventComm::PullSupplier {
157public:
158 Supplier_i (long disconnect = 0) : i(0), _disconnect(disconnect), l(0) {};
159 CORBA::Any *pull();
160 CORBA::Any *try_pull(CORBA::Boolean &has_event);
162
163private:
164 long i;
166 CORBA::ULong l;
167};
168
169void
171 cout << "Pull Supplier: disconnected by channel." << endl;
172}
173
174CORBA::Any *
176 cout << "Pull Supplier: pull() called. Data : ";
177 CORBA::Any *any = new CORBA::Any();
178 *any <<= l++;
179 cout << l-1 << endl;
180
181 // Exercise Disconnect
182 if ((_disconnect > 0) && (i == _disconnect)) {
183 i = 0;
184 // Signal main thread to disconnect and re-connect.
185 connect_cond.post();
186 }
187 i++;
188 return (any);
189}
190
191CORBA::Any *
192Supplier_i::try_pull(CORBA::Boolean &has_event)
193{
194 cout << "Pull Supplier: try_pull() called. Data : ";
195 CORBA::Any *any = new CORBA::Any();
196 *any <<= l++;
197 cout << l-1 << endl;
198 has_event = 1;
199
200 // Exercise Disconnect
201 if ((_disconnect > 0) && (i == _disconnect)) {
202 i = 0;
203 // Signal main thread to disconnect and re-connect.
204 connect_cond.post();
205 }
206 i++;
207 return (any);
208}
209
210
211int
212main (int argc, char** argv)
213{
214#if defined(HAVE_OMNIORB4)
215 CORBA::ORB_var orb =CORBA::ORB_init(argc,argv,"omniORB4");
216#else
217 CORBA::ORB_var orb =CORBA::ORB_init(argc,argv,"omniORB3");
218#endif
219
220 // Process Options
221 int discnum =0;
222 int sleepInterval =0;
223 const char* channelName ="EventChannel";
224
225 int c;
226 while ((c = getopt(argc,argv,"d:s:n:h")) != EOF)
227 {
228 switch (c)
229 {
230 case 'd': discnum = atoi(optarg);
231 break;
232
233 case 's': sleepInterval = atoi(optarg);
234 break;
235
236 case 'n': channelName = optarg;
237 break;
238
239 case 'h':
240 default : usage(argc,argv);
241 exit(-1);
242 break;
243 }
244 }
245
246#if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
247 // Ignore broken pipes
248 signal(SIGPIPE, SIG_IGN);
249#endif
250
251 Supplier_i* supplier = new Supplier_i (discnum);
252 CosEventChannelAdmin::EventChannel_var channel;
253
254 const char* action=""; // Use this variable to help report errors.
255 try {
256 CORBA::Object_var obj;
257
258 action="resolve initial reference 'RootPOA'";
259 obj=orb->resolve_initial_references("RootPOA");
260 PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj);
261 if(CORBA::is_nil(rootPoa))
262 throw CORBA::OBJECT_NOT_EXIST();
263
264 action="activate the RootPOA's POAManager";
265 PortableServer::POAManager_var pman =rootPoa->the_POAManager();
266 pman->activate();
267
268 //
269 // Obtain object reference to EventChannel
270 // (from command-line argument or from the Naming Service).
271 if(optind<argc)
272 {
273 action="convert URI from command line into object reference";
274 obj=orb->string_to_object(argv[optind]);
275 }
276 else
277 {
278 action="resolve initial reference 'NameService'";
279 obj=orb->resolve_initial_references("NameService");
280 CosNaming::NamingContext_var rootContext=
281 CosNaming::NamingContext::_narrow(obj);
282 if(CORBA::is_nil(rootContext))
283 throw CORBA::OBJECT_NOT_EXIST();
284
285 action="find EventChannel in NameService";
286 cout << action << endl;
287 obj=rootContext->resolve(str2name(channelName));
288 }
289
290 action="narrow object reference to event channel";
291 channel=CosEventChannelAdmin::EventChannel::_narrow(obj);
292 if(CORBA::is_nil(channel))
293 {
294 cerr << "Failed to narrow Event Channel reference." << endl;
295 exit(1);
296 }
297
298 }
299 catch(CORBA::ORB::InvalidName& ex) { // resolve_initial_references
300 cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl;
301 exit(1);
302 }
303 catch(CosNaming::NamingContext::InvalidName& ex) { // resolve
304 cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl;
305 exit(1);
306 }
307 catch(CosNaming::NamingContext::NotFound& ex) { // resolve
308 cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl;
309 exit(1);
310 }
311 catch(CosNaming::NamingContext::CannotProceed& ex) { // resolve
312 cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl;
313 exit(1);
314 }
315 catch(CORBA::TRANSIENT& ex) { // _narrow()
316 cerr<<"Failed to "<<action<<". TRANSIENT"<<endl;
317 exit(1);
318 }
319 catch(CORBA::OBJECT_NOT_EXIST& ex) { // _narrow()
320 cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl;
321 exit(1);
322 }
323 catch(CORBA::SystemException& ex) {
324 cerr<<"Failed to "<<action<<".";
325#if defined(HAVE_OMNIORB4)
326 cerr<<" "<<ex._name();
327 if(ex.NP_minorString())
328 cerr<<" ("<<ex.NP_minorString()<<")";
329#endif
330 cerr<<endl;
331 exit(1);
332 }
333 catch(CORBA::Exception& ex) {
334 cerr<<"Failed to "<<action<<"."
335#if defined(HAVE_OMNIORB4)
336 " "<<ex._name()
337#endif
338 <<endl;
339 exit(1);
340 }
341
342 //
343 // Get Supplier Admin interface - retrying on Comms Failure.
344 CosEventChannelAdmin::SupplierAdmin_var supplier_admin;
345 while (1)
346 {
347 try {
348 supplier_admin = channel->for_suppliers ();
349 if (CORBA::is_nil(supplier_admin))
350 {
351 cerr << "Event Channel returned nil Supplier Admin!"
352 << endl;
353 exit(1);
354 }
355 break;
356 }
357 catch (CORBA::COMM_FAILURE& ex) {
358 cerr << "Caught COMM_FAILURE exception "
359 << "obtaining Supplier Admin! Retrying..."
360 << endl;
361 continue;
362 }
363 }
364 cout << "Obtained SupplierAdmin." << endl;
365
366 while (1)
367 {
368 //
369 // Get proxy consumer - retrying on Comms Failure.
370 CosEventChannelAdmin::ProxyPullConsumer_var proxy_consumer;
371 while (1)
372 {
373 try {
374 proxy_consumer = supplier_admin->obtain_pull_consumer ();
375 if (CORBA::is_nil(proxy_consumer))
376 {
377 cerr << "Supplier Admin returned nil proxy_consumer!"
378 << endl;
379 exit(1);
380 }
381 break;
382 }
383 catch (CORBA::COMM_FAILURE& ex) {
384 cerr << "Caught COMM_FAILURE exception "
385 << "obtaining Proxy Pull Consumer! Retrying..."
386 << endl;
387 continue;
388 }
389 }
390 cout << "Obtained ProxyPullConsumer." << endl;
391
392 // Connect Pull Supplier - retrying on Comms Failure.
393 CosEventComm::PullSupplier_var supplierRef =supplier->_this();
394 while (1)
395 {
396 try {
397 proxy_consumer->connect_pull_supplier(supplierRef.in());
398 break;
399 }
400 catch (CORBA::BAD_PARAM& ex) {
401 cerr<<"Caught BAD_PARAM Exception connecting Pull Supplier!"<<endl;
402 exit(1);
403 }
404 catch (CosEventChannelAdmin::AlreadyConnected& ex) {
405 cerr << "Pull Supplier already connected!"
406 << endl;
407 break;
408 }
409 catch (CORBA::COMM_FAILURE& ex) {
410 cerr << "Caught COMM_FAILURE exception "
411 << "connecting Pull Supplier! Retrying..."
412 << endl;
413 continue;
414 }
415 }
416 cout << "Connected Pull Supplier." << endl;
417
418 // Wait for indication to disconnect before re-connecting.
419 connect_cond.wait();
420
421 // Disconnect - retrying on Comms Failure.
422 while (1)
423 {
424 try {
425 proxy_consumer->disconnect_pull_consumer();
426 break;
427 }
428 catch (CORBA::COMM_FAILURE& ex) {
429 cerr << "Caught COMM_FAILURE exception "
430 << "disconnecting Pull Supplier! Retrying..."
431 << endl;
432 continue;
433 }
434 }
435 cout << "Disconnected Pull Supplier." << endl;
436
437 // Yawn.
438 cout << "Sleeping " << sleepInterval << " seconds." << endl;
439 omni_thread::sleep(sleepInterval);
440 }
441
442 // Not Reached
443 return 0;
444}
445
446static void
447usage(int argc, char **argv)
448{
449 cerr<<
450"\nCreate a PullSupplier to send events to a channel.\n"
451"syntax: "<<(argc?argv[0]:"pullsupp")<<" OPTIONS [CHANNEL_URI]\n"
452"\n"
453"CHANNEL_URI: The event channel may be specified as a URI.\n"
454" This may be an IOR, or a corbaloc::: or corbaname::: URI.\n"
455"\n"
456"OPTIONS: DEFAULT:\n"
457" -d NUM disconnect after sending NUM events [0 - never disconnect]\n"
458" -s SECS sleep SECS seconds after disconnecting [0]\n"
459" -n NAME channel name (if URI is not specified) [\"EventChannel\"]\n"
460" -h display this help text\n" << endl;
461}
CosNaming::Name str2name(const char *namestr)
Converts stringified name to naming service name.
Definition naming.cc:117
int optind
Definition getopt.cc:82
char * optarg
Definition getopt.cc:83
int getopt(int argc, char *argv[], const char *optionS)
Definition getopt.cc:88
CORBA::ORB_ptr orb
Definition eventf.cc:60
int main(int argc, char **argv)
The main process entry point.
Definition pullsupp.cc:212
static void usage(int argc, char **argv)
Definition pullsupp.cc:447
static omni_semaphore connect_cond(0)
long _disconnect
Definition pullsupp.cc:165
Supplier_i(long disconnect=0)
Definition pullsupp.cc:158
CORBA::Any * pull()
Definition pullsupp.cc:175
CORBA::Any * try_pull(CORBA::Boolean &has_event)
Definition pullsupp.cc:192
CORBA::ULong l
Definition pullsupp.cc:166
void disconnect_pull_supplier()
Definition pullsupp.cc:170