OmniEvents
events.cc
Go to the documentation of this file.
1// -*- Mode: C++; -*-
2// Package : omniEvents
3// events.cc Created : 2004/05/02
4// Author : Alex Tingle
5//
6// Copyright (C) 2004 Alex Tingle
7//
8// This file is part of the omniEvents application.
9//
10// omniEvents is free software; you can redistribute it and/or
11// modify it under the terms of the GNU Lesser General Public
12// License as published by the Free Software Foundation; either
13// version 2.1 of the License, or (at your option) any later version.
14//
15// omniEvents is distributed in the hope that it will be useful,
16// but WITHOUT ANY WARRANTY; without even the implied warranty of
17// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18// Lesser General Public License for more details.
19//
20// You should have received a copy of the GNU Lesser General Public
21// License along with this library; if not, write to the Free Software
22// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23//
24// Description:
25// Push Model streamer.
26//
27
28#ifdef HAVE_CONFIG_H
29# include "config.h"
30#endif
31
32#ifdef HAVE_GETOPT
33# include <unistd.h>
34extern char* optarg;
35extern int optind;
36#else
37# include "getopt.h"
38#endif
39
40#ifdef HAVE_IOSTREAM
41# include <iostream>
42#else
43# include <iostream.h>
44#endif
45
46#ifdef HAVE_STD_IOSTREAM
47using namespace std;
48#endif
49
50#ifdef HAVE_STDLIB_H
51# include <stdlib.h>
52#endif
53
54#include <stdio.h>
55
56#if defined HAVE_UNISTD_H
57# include <unistd.h> // read(), write()
58#elif defined __WIN32__
59# include <io.h>
60# define write(fd,buf,count) _write(fd,buf,count)
61# define read(fd,buf,count) _read(fd,buf,count)
62# define ssize_t int
63#endif
64
65#ifdef HAVE_SIGNAL_H
66# include <signal.h>
67#endif
68
69#include "CosEventComm.hh"
70#include "CosEventChannelAdmin.hh"
71#include "naming.h"
72
73#ifndef STDIN_FILENO
74# define STDIN_FILENO 0
75# define STDOUT_FILENO 1
76#endif
77
// Global ORB reference. It is assigned in main() from CORBA::ORB_init() and is
// used by Consumer_i to shut the ORB down when the consumer is disconnected.
78CORBA::ORB_ptr orb;
79
// Writes the command-line help text to cerr; defined after main().
80static void usage(int argc, char **argv);
81
82//
83// Time
84//
85
86#define BILLION 1000000000
87
88class Time;
89class Time
90{
91private:
92 CORBA::ULong _sec;
93 CORBA::ULong _nano;
94public:
95 static Time current()
96 {
97 Time result;
98 unsigned long sec,nano;
99 omni_thread::get_time(&sec,&nano);
100 result._sec=sec;
101 result._nano=nano;
102 return result;
103 }
104 static void sleepUntil(const Time& futureTime)
105 {
106 Time now =current();
107 if(now<futureTime)
108 {
109 Time offset=futureTime-now;
110 omni_thread::sleep(offset._sec,offset._nano);
111 }
112 }
113 //
114 Time():_sec(0),_nano(0){}
115 Time(CORBA::ULong sec,CORBA::ULong nano):_sec(sec),_nano(nano){}
116 Time(const Time& right):_sec(right._sec),_nano(right._nano){}
117 Time& operator=(const Time& right)
118 {
119 if(this!=&right)
120 {
121 _sec =right._sec;
122 _nano=right._nano;
123 }
124 return *this;
125 }
126 bool operator<(const Time& right) const
127 {
128 if(_sec==right._sec)
129 return _nano<right._nano;
130 else
131 return _sec<right._sec;
132 }
133 Time& operator+=(const Time& right)
134 {
135 _sec +=right._sec;
136 _nano+=right._nano;
137 if(_nano>BILLION)
138 {
140 ++_sec;
141 }
142 return *this;
143 }
144 Time operator+(const Time& right) const
145 {
146 Time result(*this);
147 result+=right;
148 return result;
149 }
150 Time& operator-=(const Time& right)
151 {
152 if(operator<(right))
153 {
154 cerr<<"Negative time!"<<endl;
155 throw CORBA::BAD_PARAM();
156 }
157 _sec-=right._sec;
158 if(_nano<right._nano)
159 {
160 _nano+=BILLION;
161 --_sec;
162 }
163 _nano-=right._nano;
164 return *this;
165 }
166 Time operator-(const Time& right) const
167 {
168 Time result(*this);
169 result-=right;
170 return result;
171 }
172 void operator>>=(cdrMemoryStream& s) const
173 {
174 _sec>>=s;
175 _nano>>=s;
176 }
177 void operator<<=(cdrMemoryStream& s)
178 {
179 _sec<<=s;
180 _nano<<=s;
181 }
182 bool is_nil() const { return(_sec==0 && _nano==0); }
183}; // end class Time
184
185
186//
187// Consumer_i
188//
189
190class Consumer_i : virtual public POA_CosEventComm::PushConsumer
191{
192public:
193 Consumer_i(long disconnect=0): _memstream() {}
194 void push(const CORBA::Any& data)
195 {
196 // Record the event timestamp.
197 Time now=Time::current();
198 now>>=_memstream;
199 // stream event data.
200 data>>=_memstream;
201 // Write to file.
202 write(STDOUT_FILENO,_memstream.bufPtr(),_memstream.bufSize());
203 // Reset.
204 _memstream.rewindPtrs();
205 }
207 {
208 cout<<"disconnected"<<endl;
209 orb->shutdown(0);
210 }
212 CosEventChannelAdmin::EventChannel_ptr channel,
213 const char*& action)
214 {
215 action="get ConsumerAdmin";
216 CosEventChannelAdmin::ConsumerAdmin_var consumer_admin =
217 channel->for_consumers();
218
219 action="get ProxyPushSupplier";
220 CosEventChannelAdmin::ProxyPushSupplier_var proxy_supplier =
221 consumer_admin->obtain_push_supplier();
222
223 action="connect to ProxyPushSupplier";
224 proxy_supplier->connect_push_consumer(_this());
225 }
226private:
227 cdrMemoryStream _memstream;
228};
229
230
231//
232// Supplier_i
233//
234
235class Supplier_i : virtual public POA_CosEventComm::PushSupplier
236{
237public:
240 {
241 cout<<"disconnected"<<endl;
242 _connected=false;
243 }
244 void supply(
245 CosEventChannelAdmin::EventChannel_ptr channel,
246 const char*& action)
247 {
248 action="get SupplierAdmin";
249 CosEventChannelAdmin::SupplierAdmin_var supplier_admin =
250 channel->for_suppliers();
251
252 action="get ProxyPushConsumer";
253 CosEventChannelAdmin::ProxyPushConsumer_var proxy_consumer =
254 supplier_admin->obtain_push_consumer();
255
256 action="connect to ProxyPushConsumer";
257 proxy_consumer->connect_push_supplier(_this());
258
259 char buf[1024];
260 ssize_t len;
261 action="read standard input";
262 // Stream start time (seconds,nanoseconds)
263 Time offsetTime;
264 while(_connected && (len=read(STDIN_FILENO,buf,1024)))
265 {
266 CORBA::Any any;
267 cdrMemoryStream memstr;
268 action="put_octet_array";
269 memstr.put_octet_array( (_CORBA_Octet*)buf, (int)len );
270 while(_connected && memstr.currentInputPtr()<memstr.bufSize())
271 {
272 action="unmarshal";
273 Time eventTime;
274 eventTime<<=memstr;
275 any<<=memstr;
276
277 if(offsetTime.is_nil()) // first time special.
278 offsetTime=Time::current()-eventTime;
279 Time::sleepUntil(eventTime+offsetTime);
280
281 action="push";
282 proxy_consumer->push(any);
283 }
284 }
285 }
286private:
288};
289
290
291//
292// main()
293//
294
295int main(int argc, char **argv)
296{
297 //
298 // Start orb.
299#if defined(HAVE_OMNIORB4)
300 orb=CORBA::ORB_init(argc,argv,"omniORB4");
301#else
302 orb=CORBA::ORB_init(argc,argv,"omniORB3");
303#endif
304
305 // Process Options
306 bool supplierMode =false;
307 const char* channelName ="EventChannel";
308
309 int c;
310 while ((c = getopt(argc,argv,"shn:")) != EOF)
311 {
312 switch (c)
313 {
314 case 's': supplierMode=true;
315 break;
316
317 case 'n': channelName = optarg;
318 break;
319
320 case 'h': usage(argc,argv);
321 exit(0);
322 default : usage(argc,argv);
323 exit(-1);
324 }
325 }
326
327#if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
328 // Ignore broken pipes
329 signal(SIGPIPE, SIG_IGN);
330#endif
331
332 const char* action=""; // Use this variable to help report errors.
333 try {
334 CORBA::Object_var obj;
335
336 action="resolve initial reference 'RootPOA'";
337 obj=orb->resolve_initial_references("RootPOA");
338 PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj);
339 if(CORBA::is_nil(rootPoa))
340 throw CORBA::OBJECT_NOT_EXIST();
341
342 action="activate the RootPOA's POAManager";
343 PortableServer::POAManager_var pman =rootPoa->the_POAManager();
344 pman->activate();
345
346 //
347 // Obtain object reference to EventChannel
348 // (from command-line argument or from the Naming Service).
349 if(optind<argc)
350 {
351 action="convert URI from command line into object reference";
352 obj=orb->string_to_object(argv[optind]);
353 }
354 else
355 {
356 action="resolve initial reference 'NameService'";
357 obj=orb->resolve_initial_references("NameService");
358 CosNaming::NamingContext_var rootContext=
359 CosNaming::NamingContext::_narrow(obj);
360 if(CORBA::is_nil(rootContext))
361 throw CORBA::OBJECT_NOT_EXIST();
362
363 action="find EventChannel in NameService";
364 cout << action << endl;
365 obj=rootContext->resolve(str2name(channelName));
366 }
367
368 action="narrow object reference to event channel";
369 CosEventChannelAdmin::EventChannel_var channel =
370 CosEventChannelAdmin::EventChannel::_narrow(obj);
371 if(CORBA::is_nil(channel))
372 {
373 cerr << "Failed to narrow Event Channel reference." << endl;
374 exit(1);
375 }
376
377 if(supplierMode)
378 {
379 action="construct PushSupplier";
380 Supplier_i* supplier =new Supplier_i();
381 supplier->supply(channel,action);
382 }
383 else
384 {
385 action="construct PushConsumer";
386 Consumer_i* consumer =new Consumer_i();
387 consumer->consume(channel,action);
388
389 action="run ORB";
390 orb->run();
391 }
392
393 return 0;
394
395 }
396 catch(CORBA::ORB::InvalidName& ex) { // resolve_initial_references
397 cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl;
398 }
399 catch(CosNaming::NamingContext::InvalidName& ex) { // resolve
400 cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl;
401 }
402 catch(CosNaming::NamingContext::NotFound& ex) { // resolve
403 cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl;
404 }
405 catch(CosNaming::NamingContext::CannotProceed& ex) { // resolve
406 cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl;
407 }
408 catch(CORBA::TRANSIENT& ex) { // _narrow()
409 cerr<<"Failed to "<<action<<". TRANSIENT"<<endl;
410 }
411 catch(CORBA::OBJECT_NOT_EXIST& ex) { // _narrow()
412 cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl;
413 }
414 catch(CORBA::SystemException& ex) {
415 cerr<<"Failed to "<<action<<"."
416#if defined(HAVE_OMNIORB4)
417 " "<<ex._name()<<" ("<<ex.NP_minorString()<<")"
418#endif
419 <<endl;
420 }
421 catch(CORBA::Exception& ex) {
422 cerr<<"Failed to "<<action<<"."
423#if defined(HAVE_OMNIORB4)
424 " "<<ex._name()
425#endif
426 <<endl;
427 }
428
429 return 1;
430}
431
/**
 * Writes the command-line help text to cerr.
 *
 * @param argc  Argument count; when zero the program name falls back to
 *              "events".
 * @param argv  Argument vector; argv[0] is shown as the program name when
 *              argc is non-zero.
 */
static void usage(int argc, char **argv)
{
  // Pick the name to show on the syntax line.
  const char* progName ="events";
  if(argc)
      progName=argv[0];

  cerr
    <<"\nStream events from a channel to stdout, or (-s) from stdin to a channel.\n"
    <<"syntax: "<<progName<<" OPTIONS [CHANNEL_URI]\n"
    <<"\n"
    <<"CHANNEL_URI: The event channel may be specified as a URI.\n"
    <<" This may be an IOR, or a corbaloc::: or corbaname::: URI.\n"
    <<"\n"
    <<"OPTIONS: DEFAULT:\n"
    <<" -s supply mode. Read events from stdin.\n"
    <<" -n NAME channel name (if URI is not specified) [\"EventChannel\"]\n"
    <<" -h display this help text\n"
    <<endl;
}
CosNaming::Name str2name(const char *namestr)
Converts stringified name to naming service name.
Definition naming.cc:117
int optind
Definition getopt.cc:82
char * optarg
Definition getopt.cc:83
int getopt(int argc, char *argv[], const char *optionS)
Definition getopt.cc:88
CORBA::ORB_ptr orb
Definition eventf.cc:60
#define BILLION
Definition events.cc:86
int main(int argc, char **argv)
The main process entry point.
Definition events.cc:295
CORBA::ORB_ptr orb
Definition events.cc:78
#define STDOUT_FILENO
Definition events.cc:75
static void usage(int argc, char **argv)
Definition events.cc:432
#define STDIN_FILENO
Definition events.cc:74
Time
Definition events.cc:90
bool is_nil() const
Definition events.cc:182
bool operator<(const Time &right) const
Definition events.cc:126
static Time current()
Definition events.cc:95
Time & operator-=(const Time &right)
Definition events.cc:150
Time(const Time &right)
Definition events.cc:116
Time operator+(const Time &right) const
Definition events.cc:144
Time()
Definition events.cc:114
CORBA::ULong _nano
Definition events.cc:93
Time(CORBA::ULong sec, CORBA::ULong nano)
Definition events.cc:115
static void sleepUntil(const Time &futureTime)
Definition events.cc:104
Time & operator=(const Time &right)
Definition events.cc:117
Time operator-(const Time &right) const
Definition events.cc:166
void operator>>=(cdrMemoryStream &s) const
Definition events.cc:172
Time & operator+=(const Time &right)
Definition events.cc:133
void operator<<=(cdrMemoryStream &s)
Definition events.cc:177
CORBA::ULong _sec
Definition events.cc:92
void consume(CosEventChannelAdmin::EventChannel_ptr channel, const char *&action)
Definition events.cc:211
void push(const CORBA::Any &data)
Definition events.cc:194
cdrMemoryStream _memstream
Definition events.cc:227
void disconnect_push_consumer()
Definition events.cc:206
Consumer_i(long disconnect=0)
Definition events.cc:193
void disconnect_push_supplier()
Definition events.cc:239
void supply(CosEventChannelAdmin::EventChannel_ptr channel, const char *&action)
Definition events.cc:244
bool _connected
Definition events.cc:287