00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043 #include "gear_sender.hh"
00044 #include <unistd.h>
00045
00046 #ifdef NS_DIFFUSION
00047 static class GearSenderAppClass : public TclClass {
00048 public:
00049 GearSenderAppClass() : TclClass("Application/DiffApp/GearSenderApp") {}
00050 TclObject * create(int argc, const char*const* argv) {
00051 return (new GearSenderApp());
00052 }
00053 } class_gear_sender_app_class;
00054
00055 void GearSendDataTimer::expire(Event *e) {
00056 a_->send();
00057 }
00058
00059 void GearSenderApp::send()
00060 {
00061 struct timeval tmv;
00062 int retval;
00063
00064
00065 if ((num_subscriptions_ > 0) || using_push_)
00066 {
00067
00068 GetTime(&tmv);
00069 lastEventTime_->seconds_ = tmv.tv_sec;
00070 lastEventTime_->useconds_ = tmv.tv_usec;
00071
00072
00073 DiffPrint(DEBUG_ALWAYS, "Node%d: Sending Data %d\n", ((DiffusionRouting *)dr_)->getNodeId(), last_seq_sent_);
00074 retval = dr_->send(pubHandle_, &data_attr_);
00075
00076
00077 last_seq_sent_++;
00078 counterAttr_->setVal(last_seq_sent_);
00079 }
00080
00081
00082 sdt_.resched(SEND_DATA_INTERVAL);
00083 }
00084
00085 int GearSenderApp::command(int argc, const char*const* argv) {
00086 if (argc == 2) {
00087 if (strcmp(argv[1], "publish") == 0) {
00088 run();
00089 return TCL_OK;
00090 }
00091 }
00092 else if (argc >= 6) {
00093
00094 if (strcmp(argv[1], "push-pull-options") == 0) {
00095 if (strcmp(argv[2], "push") == 0)
00096 using_push_ = true;
00097 else
00098 using_push_ = false;
00099
00100 if (strcmp(argv[3], "point") == 0)
00101 {
00102 using_points_ = true;
00103 if (argv[4] != NULL && argv[5] != NULL)
00104 {
00105 lat_pt_ = atoi(argv[4]);
00106 long_pt_ = atoi(argv[5]);
00107 }
00108 }
00109 else
00110 {
00111 using_points_ = false;
00112 if (argv[4] != NULL && argv[5] != NULL && argv[6] != NULL && argv[7] != NULL)
00113 {
00114 lat_min_ = atoi(argv[4]);
00115 lat_max_ = atoi(argv[5]);
00116 long_min_ = atoi(argv[6]);
00117 long_max_ = atoi(argv[7]);
00118 }
00119 }
00120 return TCL_OK;
00121 }
00122 }
00123 return DiffApp::command(argc, argv);
00124 }
00125
00126 #endif //NS_DIFFUSION
00127
00128 void GearSenderReceive::recv(NRAttrVec *data, NR::handle my_handle)
00129 {
00130 app_->recv(data, my_handle);
00131 }
00132
00133 void GearSenderApp::recv(NRAttrVec *data, NR::handle my_handle)
00134 {
00135 NRSimpleAttribute<int> *nrclass = NULL;
00136
00137 nrclass = NRClassAttr.find(data);
00138
00139 if (!nrclass){
00140 DiffPrint(DEBUG_ALWAYS, "Received a BAD packet !\n");
00141 return;
00142 }
00143
00144 switch (nrclass->getVal()){
00145
00146 case NRAttribute::INTEREST_CLASS:
00147
00148 DiffPrint(DEBUG_ALWAYS, "Received an Interest message !\n");
00149 num_subscriptions_++;
00150 break;
00151
00152 case NRAttribute::DISINTEREST_CLASS:
00153
00154 DiffPrint(DEBUG_ALWAYS, "Received a Disinterest message !\n");
00155 num_subscriptions_--;
00156 break;
00157
00158 default:
00159
00160 DiffPrint(DEBUG_ALWAYS, "Received an unknown message (%d)!\n", nrclass->getVal());
00161 break;
00162
00163 }
00164 }
00165
00166 handle GearSenderApp::setupSubscription()
00167 {
00168 NRAttrVec attrs;
00169
00170 attrs.push_back(NRClassAttr.make(NRAttribute::NE,
00171 NRAttribute::DATA_CLASS));
00172 attrs.push_back(NRScopeAttr.make(NRAttribute::IS,
00173 NRAttribute::NODE_LOCAL_SCOPE));
00174 attrs.push_back(GearTargetAttr.make(NRAttribute::EQ, "F117A"));
00175 attrs.push_back(LatitudeAttr.make(NRAttribute::IS, lat_pt_));
00176 attrs.push_back(LongitudeAttr.make(NRAttribute::IS, long_pt_));
00177
00178 handle h = dr_->subscribe(&attrs, mr_);
00179
00180 ClearAttrs(&attrs);
00181
00182 return h;
00183 }
00184
00185 handle GearSenderApp::setupPublication()
00186 {
00187 NRAttrVec attrs;
00188
00189 attrs.push_back(NRClassAttr.make(NRAttribute::IS,
00190 NRAttribute::DATA_CLASS));
00191
00192 if (using_push_){
00193 attrs.push_back(NRScopeAttr.make(NRAttribute::IS,
00194 NRAttribute::GLOBAL_SCOPE));
00195
00196
00197 if (using_points_){
00198 attrs.push_back(LatitudeAttr.make(NRAttribute::EQ, lat_pt_));
00199 attrs.push_back(LongitudeAttr.make(NRAttribute::EQ, long_pt_));
00200 }
00201 else{
00202 attrs.push_back(LatitudeAttr.make(NRAttribute::GE, lat_min_));
00203 attrs.push_back(LatitudeAttr.make(NRAttribute::LE, lat_max_));
00204 attrs.push_back(LongitudeAttr.make(NRAttribute::GE, long_min_));
00205 attrs.push_back(LongitudeAttr.make(NRAttribute::LE, long_max_));
00206 }
00207 }
00208 else{
00209 attrs.push_back(NRScopeAttr.make(NRAttribute::IS,
00210 NRAttribute::NODE_LOCAL_SCOPE));
00211
00212
00213 attrs.push_back(LatitudeAttr.make(NRAttribute::IS, lat_pt_));
00214 attrs.push_back(LongitudeAttr.make(NRAttribute::IS, long_pt_));
00215 }
00216
00217 attrs.push_back(GearTargetAttr.make(NRAttribute::IS, "F117A"));
00218
00219 handle h = dr_->publish(&attrs);
00220
00221 ClearAttrs(&attrs);
00222
00223 return h;
00224 }
00225
00226 void GearSenderApp::run()
00227 {
00228 struct timeval tmv;
00229 #ifndef NS_DIFFUSION
00230 int retval;
00231 #endif // !NS_DIFFUSION
00232
00233 #ifdef INTERACTIVE
00234 char input;
00235 fd_set FDS;
00236 #endif // INTERATIVE
00237
00238
00239 if (!using_push_)
00240 subHandle_ = setupSubscription();
00241 pubHandle_ = setupPublication();
00242
00243
00244 GetTime(&tmv);
00245 lastEventTime_ = new EventTime;
00246 lastEventTime_->seconds_ = tmv.tv_sec;
00247 lastEventTime_->useconds_ = tmv.tv_usec;
00248 timeAttr_ = GearTimeAttr.make(NRAttribute::IS, (void *) &lastEventTime_,
00249 sizeof(EventTime));
00250 data_attr_.push_back(timeAttr_);
00251
00252
00253 delete lastEventTime_;
00254 lastEventTime_ = (EventTime *) timeAttr_->getVal();
00255
00256
00257 counterAttr_ = GearCounterAttr.make(NRAttribute::IS, last_seq_sent_);
00258 data_attr_.push_back(counterAttr_);
00259
00260 #ifndef NS_DIFFUSION
00261
00262 while(1){
00263 #ifdef INTERACTIVE
00264 FD_SET(0, &FDS);
00265 fprintf(stdout, "Press <Enter> to send a ping probe...");
00266 fflush(NULL);
00267 select(1, &FDS, NULL, NULL, NULL);
00268 input = getc(stdin);
00269 #else
00270 sleep(SEND_DATA_INTERVAL);
00271 #endif // INTERACTIVE
00272
00273
00274 if ((num_subscriptions_ > 0) || using_push_){
00275
00276 GetTime(&tmv);
00277 lastEventTime_->seconds_ = tmv.tv_sec;
00278 lastEventTime_->useconds_ = tmv.tv_usec;
00279
00280
00281 DiffPrint(DEBUG_ALWAYS, "Node%d: Sending Data %d\n", ((DiffusionRouting *)dr_)->getNodeId(), last_seq_sent_);
00282 retval = dr_->send(pubHandle_, &data_attr_);
00283
00284
00285 last_seq_sent_++;
00286 counterAttr_->setVal(last_seq_sent_);
00287 }
00288 }
00289 #else
00290 send();
00291 #endif // !NS_DIFFUSION
00292 }
00293
00294 void GearSenderApp::usage(char *s){
00295 DiffPrint(DEBUG_ALWAYS, "Usage: %s [-d debug] [-p port] [-s] [-r] [-h]\n\n", s);
00296 DiffPrint(DEBUG_ALWAYS, "\t-d - Sets debug level (0-10)\n");
00297 DiffPrint(DEBUG_ALWAYS, "\t-p - Uses port 'port' to talk to diffusion\n");
00298 DiffPrint(DEBUG_ALWAYS, "\t-s - Uses push semantics (default: pull)\n");
00299 DiffPrint(DEBUG_ALWAYS, "\t-r - Uses regions (default: points)\n");
00300 DiffPrint(DEBUG_ALWAYS, "\t-h - Prints this information\n");
00301 DiffPrint(DEBUG_ALWAYS, "\n");
00302 exit(0);
00303 }
00304
00305 void GearSenderApp::parseCommandLine(int argc, char **argv)
00306 {
00307 u_int16_t diff_port = DEFAULT_DIFFUSION_PORT;
00308 int debug_level;
00309 int opt;
00310
00311 config_file_ = NULL;
00312 using_points_ = true;
00313 using_push_ = false;
00314 opterr = 0;
00315
00316 while (1){
00317 opt = getopt(argc, argv, "srhd:p:");
00318 switch (opt){
00319
00320 case 'p':
00321
00322 diff_port = (u_int16_t) atoi(optarg);
00323 if ((diff_port < 1024) || (diff_port >= 65535)){
00324 DiffPrint(DEBUG_ALWAYS, "Error: Diffusion port must be between 1024 and 65535 !\n");
00325 exit(-1);
00326 }
00327
00328 break;
00329
00330 case 'h':
00331
00332 usage(argv[0]);
00333
00334 break;
00335
00336 case 'd':
00337
00338 debug_level = atoi(optarg);
00339
00340 if (debug_level < 1 || debug_level > 10){
00341 DiffPrint(DEBUG_ALWAYS, "Error: Debug level outside range or missing !\n");
00342 usage(argv[0]);
00343 }
00344
00345 global_debug_level = debug_level;
00346
00347 break;
00348
00349 case 's':
00350
00351 using_push_ = true;
00352
00353 break;
00354
00355 case 'r':
00356
00357 using_points_ = false;
00358
00359 break;
00360
00361 case '?':
00362
00363 DiffPrint(DEBUG_ALWAYS,
00364 "Error: %c isn't a valid option or its parameter is missing !\n", optopt);
00365 usage(argv[0]);
00366
00367 break;
00368
00369 case ':':
00370
00371 DiffPrint(DEBUG_ALWAYS, "Parameter missing !\n");
00372 usage(argv[0]);
00373
00374 break;
00375
00376 }
00377
00378 if (opt == -1)
00379 break;
00380 }
00381
00382 diffusion_port_ = diff_port;
00383 }
00384
00385 void GearSenderApp::readGeographicCoordinates()
00386 {
00387 char *gear_coord_env;
00388
00389 if (using_points_ || !using_push_){
00390
00391 gear_coord_env = getenv("lat_pt");
00392 if (!gear_coord_env){
00393 DiffPrint(DEBUG_ALWAYS, "Cannot read lat_pt !\n");
00394 exit(-1);
00395 }
00396 lat_pt_ = atoi(gear_coord_env);
00397
00398
00399 gear_coord_env = getenv("long_pt");
00400 if (!gear_coord_env){
00401 DiffPrint(DEBUG_ALWAYS, "Cannot read long_pt !\n");
00402 exit(-1);
00403 }
00404 long_pt_ = atoi(gear_coord_env);
00405 }
00406 else{
00407
00408 gear_coord_env = getenv("lat_min");
00409 if (!gear_coord_env){
00410 DiffPrint(DEBUG_ALWAYS, "Cannot read lat_min !\n");
00411 exit(-1);
00412 }
00413 lat_min_ = atoi(gear_coord_env);
00414
00415
00416 gear_coord_env = getenv("lat_max");
00417 if (!gear_coord_env){
00418 DiffPrint(DEBUG_ALWAYS, "Cannot read lat_max !\n");
00419 exit(-1);
00420 }
00421 lat_max_ = atoi(gear_coord_env);
00422
00423
00424 gear_coord_env = getenv("long_min");
00425 if (!gear_coord_env){
00426 DiffPrint(DEBUG_ALWAYS, "Cannot read long_min !\n");
00427 exit(-1);
00428 }
00429 long_min_ = atoi(gear_coord_env);
00430
00431
00432 gear_coord_env = getenv("long_max");
00433 if (!gear_coord_env){
00434 DiffPrint(DEBUG_ALWAYS, "Cannot read long_max !\n");
00435 exit(-1);
00436 }
00437 long_max_ = atoi(gear_coord_env);
00438 }
00439 }
00440
00441 #ifdef NS_DIFFUSION
00442 GearSenderApp::GearSenderApp() : sdt_(this)
00443 #else
00444 GearSenderApp::GearSenderApp(int argc, char **argv)
00445 #endif
00446 {
00447 last_seq_sent_ = 0;
00448 num_subscriptions_ = 0;
00449
00450 mr_ = new GearSenderReceive(this);
00451
00452 #ifndef NS_DIFFUSION
00453 parseCommandLine(argc, argv);
00454 readGeographicCoordinates();
00455 dr_ = NR::createNR(diffusion_port_);
00456 #endif
00457 }
00458
00459 #ifndef USE_SINGLE_ADDRESS_SPACE
00460 int main(int argc, char **argv)
00461 {
00462 GearSenderApp *app;
00463
00464 app = new GearSenderApp(argc, argv);
00465 app->run();
00466
00467 return 0;
00468 }
00469 #endif