00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043 #include "gear_receiver.hh"
00044
00045 #ifdef NS_DIFFUSION
00046 static class GearReceiverAppClass : public TclClass {
00047 public:
00048 GearReceiverAppClass() : TclClass("Application/DiffApp/GearReceiverApp") {}
00049 TclObject * create(int argc, const char*const* argv) {
00050 return (new GearReceiverApp());
00051 }
00052 } class_gear_receiver_app_class;
00053
00054 int GearReceiverApp::command(int argc, const char*const* argv) {
00055 if (argc == 2) {
00056 if (strcmp(argv[1], "subscribe") == 0) {
00057 run();
00058 return TCL_OK;
00059 }
00060 }
00061 else if (argc >= 6) {
00062
00063 if (strcmp(argv[1], "push-pull-options") == 0) {
00064 if (strcmp(argv[2], "push") == 0)
00065 using_push_ = true;
00066 else
00067 using_push_ = false;
00068
00069 if (strcmp(argv[3], "point") == 0)
00070 {
00071 using_points_ = true;
00072 if (argv[4] != NULL && argv[5] != NULL)
00073 {
00074 lat_pt_ = atoi(argv[4]);
00075 long_pt_ = atoi(argv[5]);
00076 }
00077 }
00078 else
00079 {
00080 using_points_ = false;
00081 if (argv[4] != NULL && argv[5] != NULL && argv[6] != NULL && argv[7] != NULL)
00082 {
00083 lat_min_ = atoi(argv[4]);
00084 lat_max_ = atoi(argv[5]);
00085 long_min_ = atoi(argv[6]);
00086 long_max_ = atoi(argv[7]);
00087 }
00088 }
00089 return TCL_OK;
00090 }
00091 }
00092 return DiffApp::command(argc, argv);
00093 }
00094
00095 #endif //NS_DIFFUSION
00096
00097 void GearReceiverReceive::recv(NRAttrVec *data, NR::handle my_handle)
00098 {
00099 app_->recv(data, my_handle);
00100 }
00101
00102 void GearReceiverApp::recv(NRAttrVec *data, NR::handle my_handle)
00103 {
00104 NRSimpleAttribute<int> *counterAttr = NULL;
00105 NRSimpleAttribute<void *> *timeAttr = NULL;
00106 EventTime *probe_event;
00107 long delay_seconds;
00108 long delay_useconds;
00109 float total_delay;
00110 struct timeval tmv;
00111
00112 GetTime(&tmv);
00113
00114 counterAttr = GearCounterAttr.find(data);
00115 timeAttr = GearTimeAttr.find(data);
00116
00117 if (!counterAttr || !timeAttr){
00118 DiffPrint(DEBUG_ALWAYS, "Received a BAD packet !\n");
00119 PrintAttrs(data);
00120 return;
00121 }
00122
00123
00124 probe_event = (EventTime *) timeAttr->getVal();
00125 delay_seconds = tmv.tv_sec;
00126 delay_useconds = tmv.tv_usec;
00127
00128 if ((delay_seconds < probe_event->seconds_) ||
00129 ((delay_seconds == probe_event->seconds_) &&
00130 (delay_useconds < probe_event->useconds_))){
00131
00132 delay_seconds = -1;
00133 delay_useconds = 0;
00134 DiffPrint(DEBUG_ALWAYS, "Error calculating delay !\n");
00135 }
00136 else{
00137 delay_seconds = delay_seconds - probe_event->seconds_;
00138 if (delay_useconds < probe_event->useconds_){
00139 delay_seconds--;
00140 delay_useconds = delay_useconds + 1000000;
00141 }
00142 delay_useconds = delay_useconds - probe_event->useconds_;
00143 }
00144 total_delay = (float) (1.0 * delay_seconds) + ((float) delay_useconds / 1000000.0);
00145
00146
00147 if (first_msg_recv_ < 0){
00148 first_msg_recv_ = counterAttr->getVal();
00149 }
00150
00151
00152 if (last_seq_recv_ >= 0){
00153 if (counterAttr->getVal() < last_seq_recv_){
00154
00155 last_seq_recv_ = -1;
00156 DiffPrint(DEBUG_ALWAYS, "Node%d: Received data %d, total latency = %f!\n",
00157 ((DiffusionRouting *)dr_)->getNodeId(),
00158 counterAttr->getVal(), total_delay);
00159 }
00160 else{
00161 last_seq_recv_ = counterAttr->getVal();
00162 num_msg_recv_++;
00163 DiffPrint(DEBUG_ALWAYS, "Node%d: Received data: %d, total latency = %f, %% messages received: %f !\n",
00164 ((DiffusionRouting *)dr_)->getNodeId(),
00165 last_seq_recv_, total_delay,
00166 (float) ((num_msg_recv_ * 100.00) /
00167 ((last_seq_recv_ - first_msg_recv_) + 1)));
00168 }
00169 }
00170 else{
00171 DiffPrint(DEBUG_ALWAYS, "Node%d: Received data %d, total latency = %f !\n",
00172 ((DiffusionRouting *)dr_)->getNodeId(),
00173 counterAttr->getVal(), total_delay);
00174 }
00175 }
00176
00177 handle GearReceiverApp::setupSubscription()
00178 {
00179 NRAttrVec attrs;
00180
00181 attrs.push_back(NRClassAttr.make(NRAttribute::IS,
00182 NRAttribute::INTEREST_CLASS));
00183
00184 if (using_push_){
00185 attrs.push_back(NRScopeAttr.make(NRAttribute::IS,
00186 NRAttribute::NODE_LOCAL_SCOPE));
00187
00188
00189 attrs.push_back(LatitudeAttr.make(NRAttribute::IS, lat_pt_));
00190 attrs.push_back(LongitudeAttr.make(NRAttribute::IS, long_pt_));
00191 }
00192 else{
00193 attrs.push_back(NRScopeAttr.make(NRAttribute::IS,
00194 NRAttribute::GLOBAL_SCOPE));
00195
00196
00197 if (using_points_){
00198 attrs.push_back(LatitudeAttr.make(NRAttribute::EQ, lat_pt_));
00199 attrs.push_back(LongitudeAttr.make(NRAttribute::EQ, long_pt_));
00200 }
00201 else{
00202 attrs.push_back(LatitudeAttr.make(NRAttribute::GE, lat_min_));
00203 attrs.push_back(LatitudeAttr.make(NRAttribute::LE, lat_max_));
00204 attrs.push_back(LongitudeAttr.make(NRAttribute::GE, long_min_));
00205 attrs.push_back(LongitudeAttr.make(NRAttribute::LE, long_max_));
00206 }
00207 }
00208
00209 attrs.push_back(GearTargetAttr.make(NRAttribute::IS, "F117A"));
00210
00211 handle h = dr_->subscribe(&attrs, mr_);
00212
00213 ClearAttrs(&attrs);
00214
00215 return h;
00216 }
00217
00218 void GearReceiverApp::run()
00219 {
00220 subHandle_ = setupSubscription();
00221
00222 #ifndef NS_DIFFUSION
00223
00224 while (1){
00225 sleep(1000);
00226 }
00227 #endif // !NS_DIFFUSION
00228 }
00229
00230 void GearReceiverApp::usage(char *s){
00231 DiffPrint(DEBUG_ALWAYS, "Usage: %s [-d debug] [-p port] [-s] [-r] [-h]\n\n", s);
00232 DiffPrint(DEBUG_ALWAYS, "\t-d - Sets debug level (0-10)\n");
00233 DiffPrint(DEBUG_ALWAYS, "\t-p - Uses port 'port' to talk to diffusion\n");
00234 DiffPrint(DEBUG_ALWAYS, "\t-s - Uses push semantics (default: pull)\n");
00235 DiffPrint(DEBUG_ALWAYS, "\t-r - Uses regions (default: points)\n");
00236 DiffPrint(DEBUG_ALWAYS, "\t-h - Prints this information\n");
00237 DiffPrint(DEBUG_ALWAYS, "\n");
00238 exit(0);
00239 }
00240
00241 void GearReceiverApp::parseCommandLine(int argc, char **argv)
00242 {
00243 u_int16_t diff_port = DEFAULT_DIFFUSION_PORT;
00244 int debug_level;
00245 int opt;
00246
00247 config_file_ = NULL;
00248 using_points_ = true;
00249 using_push_ = false;
00250 opterr = 0;
00251
00252 while (1){
00253 opt = getopt(argc, argv, "srhd:p:");
00254 switch (opt){
00255
00256 case 'p':
00257
00258 diff_port = (u_int16_t) atoi(optarg);
00259 if ((diff_port < 1024) || (diff_port >= 65535)){
00260 DiffPrint(DEBUG_ALWAYS, "Error: Diffusion port must be between 1024 and 65535 !\n");
00261 exit(-1);
00262 }
00263
00264 break;
00265
00266 case 'h':
00267
00268 usage(argv[0]);
00269
00270 break;
00271
00272 case 'd':
00273
00274 debug_level = atoi(optarg);
00275
00276 if (debug_level < 1 || debug_level > 10){
00277 DiffPrint(DEBUG_ALWAYS, "Error: Debug level outside range or missing !\n");
00278 usage(argv[0]);
00279 }
00280
00281 global_debug_level = debug_level;
00282
00283 break;
00284
00285 case 's':
00286
00287 using_push_ = true;
00288
00289 break;
00290
00291 case 'r':
00292
00293 using_points_ = false;
00294
00295 break;
00296
00297 case '?':
00298
00299 DiffPrint(DEBUG_ALWAYS,
00300 "Error: %c isn't a valid option or its parameter is missing !\n", optopt);
00301 usage(argv[0]);
00302
00303 break;
00304
00305 case ':':
00306
00307 DiffPrint(DEBUG_ALWAYS, "Parameter missing !\n");
00308 usage(argv[0]);
00309
00310 break;
00311
00312 }
00313
00314 if (opt == -1)
00315 break;
00316 }
00317
00318 diffusion_port_ = diff_port;
00319 }
00320
00321 void GearReceiverApp::readGeographicCoordinates()
00322 {
00323 char *gear_coord_env;
00324
00325 if (using_points_ || using_push_){
00326
00327 gear_coord_env = getenv("lat_pt");
00328 if (!gear_coord_env){
00329 DiffPrint(DEBUG_ALWAYS, "Cannot read lat_pt !\n");
00330 exit(-1);
00331 }
00332 lat_pt_ = atoi(gear_coord_env);
00333
00334
00335 gear_coord_env = getenv("long_pt");
00336 if (!gear_coord_env){
00337 DiffPrint(DEBUG_ALWAYS, "Cannot read long_pt !\n");
00338 exit(-1);
00339 }
00340 long_pt_ = atoi(gear_coord_env);
00341 }
00342 else{
00343
00344 gear_coord_env = getenv("lat_min");
00345 if (!gear_coord_env){
00346 DiffPrint(DEBUG_ALWAYS, "Cannot read lat_min !\n");
00347 exit(-1);
00348 }
00349 lat_min_ = atoi(gear_coord_env);
00350
00351
00352 gear_coord_env = getenv("lat_max");
00353 if (!gear_coord_env){
00354 DiffPrint(DEBUG_ALWAYS, "Cannot read lat_max !\n");
00355 exit(-1);
00356 }
00357 lat_max_ = atoi(gear_coord_env);
00358
00359
00360 gear_coord_env = getenv("long_min");
00361 if (!gear_coord_env){
00362 DiffPrint(DEBUG_ALWAYS, "Cannot read long_min !\n");
00363 exit(-1);
00364 }
00365 long_min_ = atoi(gear_coord_env);
00366
00367
00368 gear_coord_env = getenv("long_max");
00369 if (!gear_coord_env){
00370 DiffPrint(DEBUG_ALWAYS, "Cannot read long_max !\n");
00371 exit(-1);
00372 }
00373 long_max_ = atoi(gear_coord_env);
00374 }
00375 }
00376
00377 #ifdef NS_DIFFUSION
00378 GearReceiverApp::GearReceiverApp()
00379 #else
00380 GearReceiverApp::GearReceiverApp(int argc, char **argv)
00381 #endif // NS_DIFFUSION
00382 {
00383 last_seq_recv_ = 0;
00384 num_msg_recv_ = 0;
00385 first_msg_recv_ = -1;
00386
00387 mr_ = new GearReceiverReceive(this);
00388
00389 #ifndef NS_DIFFUSION
00390 parseCommandLine(argc, argv);
00391 readGeographicCoordinates();
00392 dr_ = NR::createNR(diffusion_port_);
00393 #endif // !NS_DIFFUSION
00394 }
00395
00396 #ifndef NS_ADDRESS
00397 #ifndef USE_SINGLE_ADDRESS_SPACE
00398 int main(int argc, char **argv)
00399 {
00400 GearReceiverApp *app;
00401
00402 app = new GearReceiverApp(argc, argv);
00403 app->run();
00404
00405 return 0;
00406 }
00407 #endif // !USE_SINGLE_ADDRESS_SPACE
00408 #endif // !NS_ADDRESS