00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043 #include "rmst_filter.hh"
00044
// Printable names of the diffusion message types.  The table is indexed
// with Message::msg_type_ when RmstFilter::processMessage() builds its
// debug output, so the order of the entries must not change.
char *rmstmsg_types[] = {
  "INTEREST",
  "POSITIVE REINFORCEMENT",
  "NEGATIVE REINFORCEMENT",
  "DATA",
  "EXPLORATORY DATA",
  "PUSH EXPLORATORY DATA",
  "CONTROL",
  "REDIRECT"
};
00049
#ifdef NS_DIFFUSION
class DiffAppAgent;

// Tcl class binding: registers RmstFilter under the OTcl name
// "Application/DiffApp/RmstFilter" and creates instances on request.
static class RmstFilterClass : public TclClass {
public:
  RmstFilterClass() : TclClass("Application/DiffApp/RmstFilter") {}

  // The creation arguments are ignored; a default RmstFilter is built.
  TclObject* create(int argc, const char*const* argv) {
    return new RmstFilter();
  }
} class_rmst_filter;

// Tcl command dispatcher.  The single-word command "start" runs the
// filter; everything else is delegated to DiffApp::command().
int RmstFilter::command(int argc, const char*const* argv)
{
  const bool is_start = (argc == 2) && (strcmp(argv[1], "start") == 0);

  if (!is_start)
    return DiffApp::command(argc, argv);

  run();
  return TCL_OK;
}
#endif // NS_DIFFUSION
00074
// Overlay for the value carried by the reinforcement attribute:
// RmstFilter::processMessage() casts the attribute's blob to this type and
// reads the two ids to look up the matching exploratory message.  The
// member order defines the layout of that overlay and must not change.
class ReinfMessage {
public:
  int32_t rdm_id_;   // random id of the exploratory message being reinforced
  int32_t pkt_num_;  // packet number of the exploratory message being reinforced
};
00080
00081
00082
00083
00084
00085
00086
00087 void RmstFilterCallback::recv(Message *msg, handle h)
00088 {
00089 app_->recv(msg, h);
00090 }
00091
00092
00093
00094
00095
00096
00097 void RmstFilter::recv(Message *msg, handle h)
00098 {
00099
00100
00101 if(processMessage(msg))
00102 ((DiffusionRouting *)dr_)->sendMessage(msg, h);
00103 }
00104
00105
00106
00107
00108
00109 bool RmstFilter::processMessage(Message *msg)
00110 {
00111 NRSimpleAttribute<int> *rmst_id_attr = NULL;
00112 NRSimpleAttribute<int> *frag_attr = NULL;
00113 NRSimpleAttribute<int> *pkts_sent_attr = NULL;
00114 NRSimpleAttribute<int> *tsprt_ctl_attr = NULL;
00115 NRSimpleAttribute<void *> *reinf_attr = NULL;
00116 NRSimpleAttribute<int> *nrscope = NULL;
00117 NRSimpleAttribute<int> *nr_class = NULL;
00118 NRAttrVec *data;
00119
00120 Key2ExpLog::iterator exp_iterator;
00121 Int2Rmst::iterator rmst_iterator;
00122 int rmst_no;
00123 int frag_no;
00124 int class_type;
00125 int rmst_ctl_type;
00126 union LlToInt key;
00127 Rmst *rmst_ptr;
00128
00129
00130
00131 tsprt_ctl_attr = RmstTsprtCtlAttr.find(msg->msg_attr_vec_);
00132 if (!tsprt_ctl_attr){
00133 DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter got non-transport message\n");
00134 return true;
00135 }
00136 rmst_ctl_type = tsprt_ctl_attr->getVal();
00137
00138 DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processMessage got a");
00139
00140 if (msg->new_message_)
00141 DiffPrint(DEBUG_IMPORTANT, " new (%d) ", msg->msg_type_);
00142 else
00143 DiffPrint(DEBUG_IMPORTANT, "n old (%d) ", msg->msg_type_);
00144
00145 if (msg->last_hop_ != LOCALHOST_ADDR)
00146 DiffPrint(DEBUG_IMPORTANT, "%s message from %d to %d\n",
00147 rmstmsg_types[msg->msg_type_],
00148 msg->last_hop_, msg->next_hop_);
00149 else
00150 DiffPrint(DEBUG_IMPORTANT, "%s message from local agent\n",
00151 rmstmsg_types[msg->msg_type_]);
00152
00153
00154
00155
00156
00157
00158
00159 if (!msg->new_message_ && msg->msg_type_ == DATA
00160 && rmst_ctl_type == RMST_RESP){
00161 DiffPrint(DEBUG_SOME_DETAILS,
00162 " Sink got an old DATA message from node %d\n", msg->last_hop_);
00163 data = msg->msg_attr_vec_;
00164 rmst_id_attr = RmstIdAttr.find(data);
00165 if (!rmst_id_attr){
00166 DiffPrint(DEBUG_SOME_DETAILS,
00167 " Filter received a bad transport packet!\n");
00168 return false;
00169 }
00170 rmst_no = rmst_id_attr->getVal();
00171
00172 rmst_iterator = rmst_map_.find(rmst_no);
00173 if(rmst_iterator == rmst_map_.end()){
00174 DiffPrint(DEBUG_IMPORTANT,
00175 " couldn't find DB entry for Rmst %d\n", rmst_no);
00176 return false;
00177 }
00178 else{
00179 rmst_ptr = (*rmst_iterator).second;
00180 if ( (local_sink_) && (msg->last_hop_ == rmst_ptr->last_hop_) ){
00181
00182
00183 DiffPrint(DEBUG_IMPORTANT,
00184 " We suppress old DATA message from smac retransmission!\n");
00185 return false;
00186 }
00187 else
00188 return true;
00189 }
00190 }
00191 else if (!msg->new_message_)
00192 return true;
00193
00194
00195 if ( (rmst_ctl_type == RMST_RESP) &&
00196 ((msg->msg_type_ == DATA) || (msg->msg_type_ == EXPLORATORY_DATA)) ){
00197 rmst_ptr = syncLocalCache(msg);
00198
00199
00200 if (rmst_ptr == NULL)
00201 return false;
00202 rmst_no = rmst_ptr->rmst_no_;
00203
00204 GetTime (&last_data_rec_);
00205 }
00206
00207
00208
00209
00210
00211
00212 switch (msg->msg_type_){
00213
00214 case(EXPLORATORY_DATA):
00215 ExpLog exp_msg;
00216
00217 DiffPrint(DEBUG_LOTS_DETAILS,
00218 " Exploratory_Msg: ptk_num = %x, rdm_id_ = %x, last_hop = %d\n",
00219 msg->pkt_num_, msg->rdm_id_, msg->last_hop_);
00220
00221
00222
00223
00224 DiffPrint(DEBUG_SOME_DETAILS,
00225 " Exploratory message for Reliable transport Id = %d\n", rmst_no);
00226 key.int_val_[0] = msg->pkt_num_;
00227 key.int_val_[1] = msg->rdm_id_;
00228 DiffPrint(DEBUG_LOTS_DETAILS, " Key = %llx\n", key.ll_val_);
00229 exp_msg.rmst_no_ = rmst_no;
00230 exp_msg.last_hop_ = msg->last_hop_;
00231 exp_map_.insert(Key2ExpLog::value_type(key.ll_val_, exp_msg));
00232
00233
00234
00235
00236
00237 if (local_sink_){
00238 rmst_ptr->last_hop_ = msg->last_hop_;
00239 if (rmst_ptr->reinf_){
00240 DiffPrint(DEBUG_IMPORTANT, " got a new path exploratory msg at sink.\n");
00241 rmst_ptr->wait_for_new_path_ = true;
00242 }
00243 else{
00244 rmst_ptr->reinf_ = true;
00245 DiffPrint(DEBUG_IMPORTANT, " got an initial exploratory msg at sink.\n");
00246 }
00247 DiffPrint(DEBUG_IMPORTANT, " set last_hop for rmst %d to %d\n",
00248 rmst_no, rmst_ptr->last_hop_);
00249 rmst_ptr->pkts_rec_ = 0;
00250 rmst_ptr->last_hop_pkts_sent_ = 0;
00251 }
00252 else{
00253
00254
00255 DiffPrint(DEBUG_LOTS_DETAILS,
00256 " intermediate node resets sync_base_ and reinf_.\n");
00257 frag_attr = RmstFragAttr.find(msg->msg_attr_vec_);
00258 frag_no = frag_attr->getVal();
00259 rmst_ptr->sync_base_ = frag_no;
00260 if(rmst_ptr->reinf_)
00261 rmst_ptr->reinf_ = false;
00262 rmst_ptr->last_hop_ = 0;
00263 rmst_ptr->pkts_sent_ = 0;
00264 rmst_ptr->pkts_rec_ = 0;
00265 rmst_ptr->last_hop_pkts_sent_ = 0;
00266 rmst_ptr->naks_rec_ = 0;
00267 }
00268
00269
00270
00271
00272 if ((rmst_ptr->watchdog_active_) && (!local_sink_)
00273 && (!rmst_ptr->local_source_)){
00274 rmst_ptr->cancel_watchdog_ = true;
00275 rmst_ptr->cleanHoleMap();
00276 }
00277
00278
00279 return(true);
00280 break;
00281
00282 case(DATA):
00283
00284 if (rmst_ctl_type != RMST_RESP){
00285 processCtrlMessage(msg);
00286
00287 return false;
00288 }
00289
00290
00291 rmst_ptr->pkts_rec_++;
00292
00293
00294 pkts_sent_attr = RmstPktsSentAttr.find(msg->msg_attr_vec_);
00295 if (pkts_sent_attr){
00296 rmst_ptr->last_hop_pkts_sent_ = pkts_sent_attr->getVal();
00297 DiffPrint(DEBUG_SOME_DETAILS,
00298 "processMessage:: got last_hop_pkts_sent_ = %d packets\n",
00299 rmst_ptr->last_hop_pkts_sent_);
00300 if ( (rmst_ptr->last_hop_pkts_sent_ > 20) &&
00301 (rmst_ptr->pkts_rec_ < (rmst_ptr->last_hop_pkts_sent_ * BLACKLIST_THRESHOLD)) ){
00302 Blacklist::iterator black_list_iterator;
00303 black_list_iterator = black_list_.begin();
00304 while(black_list_iterator != black_list_.end()){
00305 if(*black_list_iterator == rmst_ptr->last_hop_)
00306 break;
00307 black_list_iterator++;
00308 }
00309 if(black_list_iterator == black_list_.end()){
00310 DiffPrint(DEBUG_IMPORTANT, "Adding node %d to black_list_ !!\n",
00311 rmst_ptr->last_hop_);
00312 black_list_.push_front(rmst_ptr->last_hop_);
00313 ((DiffusionRouting *)dr_)->addToBlacklist(rmst_ptr->last_hop_);
00314
00315 sendExpReqUpstream(rmst_ptr);
00316 rmst_ptr->sent_exp_req_ = true;
00317 GetTime(&rmst_ptr->exp_req_time_);
00318
00319 Message *neg_reinf_msg;
00320 neg_reinf_msg = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
00321 0, 0, interest_attrs_->size(), pkt_count_, rdm_id_,
00322 rmst_ptr->last_hop_, LOCALHOST_ADDR);
00323 neg_reinf_msg->msg_attr_vec_ = CopyAttrs(interest_attrs_);
00324 ((DiffusionRouting *)dr_)->sendMessage(neg_reinf_msg, filter_handle_, 1);
00325 pkt_count_++;
00326 delete neg_reinf_msg;
00327 }
00328 }
00329 }
00330
00331
00332
00333 if ( msg->last_hop_ != rmst_ptr->last_hop_ ){
00334 DiffPrint(DEBUG_IMPORTANT,
00335 " We suppress new DATA message on non-backchannel path!; backchannel = %d\n",
00336 rmst_ptr->last_hop_);
00337 msg->new_message_ = 0;
00338 return true;
00339 }
00340
00341 if (rmst_ptr->wait_for_new_path_){
00342 rmst_ptr->wait_for_new_path_ = false;
00343 DiffPrint(DEBUG_SOME_DETAILS, " node resets wait_for_new_path_.\n");
00344 }
00345
00346 if (local_sink_ && rmst_ptr->sent_exp_req_){
00347 DiffPrint(DEBUG_SOME_DETAILS,
00348 " source got a new path, set sent_exp_req_ false.\n");
00349 rmst_ptr->sent_exp_req_ = false;
00350 }
00351
00352
00353
00354
00355
00356 if(rmst_ptr->local_source_ || local_sink_)
00357 return false;
00358 else{
00359 rmst_ptr->pkts_sent_++;
00360
00361 if(pkts_sent_attr)
00362 pkts_sent_attr->setVal(rmst_ptr->pkts_sent_);
00363 return true;
00364 }
00365 break;
00366
00367 case(INTEREST):
00368 data = msg->msg_attr_vec_;
00369 nr_class = NRClassAttr.find(data);
00370 if (nr_class){
00371 class_type = nr_class->getVal();
00372 if (class_type == NRAttribute::DISINTEREST_CLASS)
00373 DiffPrint(DEBUG_SOME_DETAILS, " DISINTEREST_CLASS\n");
00374 }
00375
00376 nrscope = NRScopeAttr.find(msg->msg_attr_vec_);
00377 if(nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE)
00378 DiffPrint(DEBUG_SOME_DETAILS, " rmst LOCAL_SCOPE Interest Message\n");
00379 else if (msg->last_hop_ == LOCALHOST_ADDR){
00380 DiffPrint(DEBUG_SOME_DETAILS, " rmst Interest Message from local SINK\n");
00381 local_sink_ = true;
00382 local_sink_port_ = msg->source_port_;
00383 GetTime (&last_sink_time_);
00384 if (interest_attrs_ == NULL)
00385 interest_attrs_ = CopyAttrs(msg->msg_attr_vec_);
00386 }
00387 else{
00388 DiffPrint(DEBUG_SOME_DETAILS, " rmst Interest Message from non-local node\n");
00389 if (interest_attrs_ == NULL)
00390 interest_attrs_ = CopyAttrs(msg->msg_attr_vec_);
00391 }
00392 break;
00393
00394 case(POSITIVE_REINFORCEMENT):
00395 ReinfMessage *reinf_msg;
00396 ExpLog exp_log;
00397
00398 DiffPrint(DEBUG_IMPORTANT, " Positive Reinf arrived\n");
00399 reinf_attr = ReinforcementAttr.find(msg->msg_attr_vec_);
00400 reinf_msg = (ReinfMessage*)reinf_attr->getVal();
00401 DiffPrint(DEBUG_LOTS_DETAILS, " Pos_Reinf: ptk_num = %x, rdm_id_ = %x\n",
00402 reinf_msg->pkt_num_, reinf_msg->rdm_id_);
00403
00404 key.int_val_[0] = reinf_msg->pkt_num_;
00405 key.int_val_[1] = reinf_msg->rdm_id_;
00406 exp_iterator = exp_map_.find(key.ll_val_);
00407 if(exp_iterator != exp_map_.end()){
00408 exp_log = (*exp_iterator).second;
00409 DiffPrint(DEBUG_SOME_DETAILS, " Reinforcement for rmst_no = %d, last_hop_ = %d\n",
00410 exp_log.rmst_no_, exp_log.last_hop_);
00411
00412
00413
00414 rmst_no = exp_log.rmst_no_;
00415 rmst_iterator = rmst_map_.find(rmst_no);
00416 if(rmst_iterator != rmst_map_.end()){
00417 rmst_ptr = (*rmst_iterator).second;
00418 rmst_ptr->last_hop_ = exp_log.last_hop_;
00419 rmst_ptr->fwd_hop_ = msg->last_hop_;
00420 DiffPrint(DEBUG_SOME_DETAILS, " Setting rmst_no %d last_hop_ = %d, fwd_hop_ = %d\n",
00421 rmst_no, rmst_ptr->last_hop_, rmst_ptr->fwd_hop_);
00422 if(!rmst_ptr->reinf_){
00423 rmst_ptr->reinf_ = true;
00424 if(rmst_ptr->local_source_)
00425 DiffPrint(DEBUG_LOTS_DETAILS, " Local source got a Reinf\n");
00426 }
00427 }
00428 else{
00429 DiffPrint(DEBUG_IMPORTANT, " Reinforcement cant't find rmst_no\n");
00430 break;
00431 }
00432
00433
00434
00435 if( (rmst_ptr->watchdog_active_ == false) && (caching_mode_) ){
00436 TimerCallback *rmst_timer;
00437 DiffPrint(DEBUG_IMPORTANT,
00438 " Set a WATCHDOG_TIMER at caching node for reinforced rmst_no %d\n",
00439 rmst_no);
00440 rmst_timer = new RmstTimeout(this, rmst_no, WATCHDOG_TIMER);
00441
00442 rmst_ptr->watchdog_handle_ =
00443 ((DiffusionRouting *)dr_)->addTimer(WATCHDOG_INTERVAL, rmst_timer);
00444 rmst_ptr->watchdog_active_ = true;
00445 }
00446 if (rmst_ptr->wait_for_new_path_){
00447 DiffPrint(DEBUG_SOME_DETAILS, " Resetting wait_for_new_path_ for rmst_no %d\n", rmst_no);
00448 rmst_ptr->wait_for_new_path_ = false;
00449 }
00450 if (rmst_ptr->sent_exp_req_){
00451 DiffPrint(DEBUG_SOME_DETAILS,
00452 " intermediate node got a new path, set sent_exp_req_ false.\n");
00453 rmst_ptr->sent_exp_req_ = false;
00454 }
00455 }
00456 else{
00457 if(!rmst_ptr->local_source_)
00458 DiffPrint(DEBUG_IMPORTANT, " Reinforcement matches no Exploratory msg\n");
00459 }
00460 break;
00461
00462 case(NEGATIVE_REINFORCEMENT):
00463 bool ret_val;
00464 if (tsprt_ctl_attr){
00465 DiffPrint(DEBUG_SOME_DETAILS,
00466 " NEGATIVE_REINFORCEMENT, last_hop_ = %d, rmst_ctl_type = %d\n",
00467 msg->last_hop_, rmst_ctl_type);
00468 }
00469
00470
00471
00472
00473 ret_val = true;
00474 rmst_iterator = rmst_map_.begin();
00475 while(rmst_iterator != rmst_map_.end()){
00476 rmst_ptr = (*rmst_iterator).second;
00477 DiffPrint(DEBUG_SOME_DETAILS,
00478 " searching rmsts - rmst_no_ %d: fwd_hop_ = %d, reinf_ = %d, acked = %d\n",
00479 rmst_ptr->rmst_no_, rmst_ptr->fwd_hop_, rmst_ptr->reinf_, rmst_ptr->acked_);
00480 if (rmst_ptr->local_source_ && rmst_ptr->reinf_
00481 && (rmst_ptr->fwd_hop_ == msg->last_hop_)
00482 && !rmst_ptr->acked_){
00483
00484 DiffPrint(DEBUG_SOME_DETAILS, " local source sees NEG_REINF\n");
00485 processExpReq(rmst_ptr, rmst_ptr->max_frag_sent_);
00486 }
00487 else if (!rmst_ptr->local_source_ && (rmst_ptr->fwd_hop_ == msg->last_hop_)
00488 && rmst_ptr->reinf_ && !rmst_ptr->acked_){
00489 DiffPrint(DEBUG_SOME_DETAILS, " intermediate node sees NEG_REINF from reinforced node\n");
00490 DiffPrint(DEBUG_SOME_DETAILS, " send Exp Request upstream!\n");
00491 ret_val = false;
00492 sendExpReqUpstream(rmst_ptr);
00493 }
00494 else{
00495 DiffPrint(DEBUG_SOME_DETAILS,
00496 " node sees NEG_REINF from non-reinforced node - let routing layer see it\n");
00497 ret_val = true;
00498 }
00499 rmst_iterator++;
00500 }
00501 if (!ret_val)
00502 return false;
00503 break;
00504
00505 default:
00506 break;
00507 }
00508 return true;
00509 }
00510
00511
00512
00513
00514
00515
00516 Rmst* RmstFilter::syncLocalCache (Message *msg)
00517 {
00518 NRSimpleAttribute<int> *rmst_id_attr = NULL;
00519 NRSimpleAttribute<int> *frag_attr = NULL;
00520 NRSimpleAttribute<int> *max_frag_attr = NULL;
00521 NRSimpleAttribute<void *> *data_buf_attr = NULL;
00522 NRAttrVec *data = msg->msg_attr_vec_;
00523 Int2Rmst::iterator rmst_iterator;
00524 int rmst_no;
00525 int frag_no;
00526 int max_frag_no;
00527 void *blob_ptr;
00528 int blob_len;
00529 void *tmp_frag_ptr;
00530 Rmst *rmst_ptr;
00531
00532 rmst_id_attr = RmstIdAttr.find(data);
00533 frag_attr = RmstFragAttr.find(data);
00534 max_frag_attr = RmstMaxFragAttr.find(data);
00535 data_buf_attr = RmstDataAttr.find(data);
00536
00537 if (! (rmst_id_attr && frag_attr && data_buf_attr) ){
00538 DiffPrint(DEBUG_IMPORTANT, " Filter received a BAD transport packet!\n");
00539 return NULL;
00540 }
00541
00542 rmst_no = rmst_id_attr->getVal();
00543 frag_no = frag_attr->getVal();
00544 if(max_frag_attr)
00545 max_frag_no = max_frag_attr->getVal();
00546 else
00547 max_frag_no = 0;
00548 blob_ptr = data_buf_attr->getVal();
00549 blob_len = data_buf_attr->getLen();
00550
00551
00552
00553
00554 rmst_iterator = rmst_map_.find(rmst_no);
00555 if(rmst_iterator == rmst_map_.end()){
00556 DiffPrint(DEBUG_IMPORTANT, " creating a new DB entry for Rmst %d\n", rmst_no);
00557 DiffPrint(DEBUG_SOME_DETAILS, " Max Fragment number = %d\n", max_frag_no);
00558 rmst_ptr = new Rmst(rmst_no);
00559 rmst_ptr->max_frag_ = max_frag_no;
00560 rmst_map_.insert(Int2Rmst::value_type(rmst_no, rmst_ptr));
00561
00562
00563
00564 GetTime(&rmst_ptr->last_nak_time_);
00565
00566
00567
00568 if (msg->last_hop_ == LOCALHOST_ADDR) {
00569 rmst_ptr->local_source_ = true;
00570 rmst_ptr->local_source_port_ = msg->source_port_;
00571
00572
00573
00574 rmst_ptr->last_hop_ = LOCALHOST_ADDR;
00575 }
00576
00577 if((rmst_ptr->local_source_)||(local_sink_)||(caching_mode_)){
00578 NRSimpleAttribute<char *> *rmst_tgt_attr = NULL;
00579 rmst_tgt_attr = RmstTargetAttr.find(msg->msg_attr_vec_);
00580 if (rmst_tgt_attr){
00581 char *tmp_str = rmst_tgt_attr->getVal();
00582 rmst_ptr->target_str_ = new char[strlen(tmp_str)+1];
00583 strcpy (rmst_ptr->target_str_, tmp_str);
00584 DiffPrint(DEBUG_IMPORTANT, " RmstTargetAttr = %s\n", rmst_ptr->target_str_);
00585 }
00586 else
00587 DiffPrint(DEBUG_IMPORTANT, " no RmstTargetAttr Rmst %d !\n", rmst_no);
00588 }
00589 }
00590 else
00591 rmst_ptr = (*rmst_iterator).second;
00592
00593 if(!rmst_ptr->local_source_)
00594 DiffPrint(DEBUG_IMPORTANT, " Got a blob, rmstId = %d, frag_no = %d\n", rmst_no, frag_no);
00595
00596
00597 GetTime(&rmst_ptr->last_data_time_);
00598
00599
00600
00601 if((rmst_ptr->local_source_)||(local_sink_)||(caching_mode_)){
00602 tmp_frag_ptr = rmst_ptr->getFrag(frag_no);
00603 if (tmp_frag_ptr == NULL){
00604 if(!rmst_ptr->local_source_)
00605 DiffPrint(DEBUG_SOME_DETAILS, " creating a new frag %d entry for Rmst %d\n",
00606 frag_no, rmst_no);
00607 if (frag_no == rmst_ptr->max_frag_)
00608 rmst_ptr->max_frag_len_ = blob_len;
00609 tmp_frag_ptr = new char[blob_len];
00610 memcpy(tmp_frag_ptr, blob_ptr, blob_len);
00611 rmst_ptr->putFrag(frag_no, tmp_frag_ptr);
00612
00613
00614
00615 if(!rmst_ptr->local_source_){
00616 if ( rmst_ptr->inHoleMap(frag_no) ){
00617
00618 NakData *nak_ptr = rmst_ptr->getHole(frag_no);
00619 if(nak_ptr->nak_sent_)
00620 DiffPrint(DEBUG_SOME_DETAILS, " We sent a NAK_REQ for this fragment.\n");
00621 DiffPrint(DEBUG_SOME_DETAILS, " filter removing hole %d from hole_map_\n",frag_no);
00622 rmst_ptr->delHole(frag_no);
00623 }
00624 }
00625
00626
00627
00628
00629 if((!rmst_ptr->local_source_)&&(local_sink_)&&(rmst_ptr->watchdog_active_ == false)
00630 && (frag_no>0)){
00631 TimerCallback *rmst_timer;
00632 DiffPrint(DEBUG_IMPORTANT, " Set a WATCHDOG_TIMER at sink for rmst_no %d\n", rmst_no);
00633 rmst_timer = new RmstTimeout(this, rmst_no, WATCHDOG_TIMER);
00634
00635 rmst_ptr->watchdog_handle_ = ((DiffusionRouting *)dr_)->addTimer(WATCHDOG_INTERVAL,
00636 rmst_timer);
00637 rmst_ptr->watchdog_active_ = true;
00638 }
00639 }
00640 else
00641 DiffPrint(DEBUG_SOME_DETAILS, " got a duplicate frag %d for blob %d\n",
00642 frag_no, rmst_no);
00643
00644
00645
00646 if ((!rmst_ptr->local_source_) && (rmst_ptr->holeInFragMap()))
00647 rmst_ptr->syncHoleMap();
00648
00649
00650
00651 if(rmst_ptr->rmstComplete()){
00652 if ((rmst_ptr->watchdog_active_) && (!rmst_ptr->local_source_)){
00653 DiffPrint(DEBUG_SOME_DETAILS,
00654 " Rmst #%d is complete set cancel_watchdog_ to stop WATCHDOG\n",
00655 rmst_no);
00656 rmst_ptr->cancel_watchdog_ = true;
00657 }
00658
00659
00660 if(local_sink_ && !(rmst_ptr->acked_)){
00661 sendRmstToSink(rmst_ptr);
00662
00663 rmst_ptr->acked_ = true;
00664 }
00665
00666
00667
00668
00669 if(rmst_ptr->local_source_){
00670 SendMsgData new_send_msg;
00671
00672 new_send_msg.rmst_no_ = rmst_no;
00673 new_send_msg.last_frag_sent_ = -1;
00674 new_send_msg.exp_base_ = 0;
00675 send_list_.push_back(new_send_msg);
00676 if(!send_timer_active_){
00677 TimerCallback *send_timer;
00678
00679 DiffPrint(DEBUG_SOME_DETAILS,
00680 " Rmst %d ready to send - Set a SEND_TIMER\n", rmst_no);
00681 send_timer = new RmstTimeout(this, -1, SEND_TIMER);
00682
00683 send_timer_handle_ =
00684 ((DiffusionRouting *)dr_)->addTimer(SEND_INTERVAL, send_timer);
00685 send_timer_active_ = true;
00686 }
00687 }
00688 else
00689
00690 sendAckToSource(rmst_ptr);
00691 }
00692 }
00693 else{
00694 rmst_ptr->max_frag_rec_ = frag_no;
00695 DiffPrint(DEBUG_LOTS_DETAILS, " Not caching frag %d entry for Rmst %d\n", frag_no, rmst_no);
00696 }
00697
00698 return rmst_ptr;
00699 }
00700
00701 void RmstFilter::processCtrlMessage(Message *msg)
00702 {
00703 NRSimpleAttribute<int> *tsprt_ctl_attr = NULL;
00704 NRSimpleAttribute<int> *rmst_id_attr = NULL;
00705 NRSimpleAttribute<int> *frag_attr = NULL;
00706 NRAttrVec *data;
00707 NRAttrVec attrs;
00708 Int2Rmst::iterator rmst_iterator;
00709 int rmst_no;
00710 int frag_no;
00711 int rmst_ctl_type;
00712 Rmst *rmst_ptr;
00713 void *frag_ptr;
00714 Message *nak_msg;
00715 NRAttrVec::iterator place;
00716 bool forwarding_nak = false;
00717
00718 data = msg->msg_attr_vec_;
00719
00720 tsprt_ctl_attr = RmstTsprtCtlAttr.find(data);
00721 rmst_ctl_type = tsprt_ctl_attr->getVal();
00722
00723 rmst_id_attr = RmstIdAttr.find(data);
00724 if(!rmst_id_attr) {
00725 DiffPrint(DEBUG_SOME_DETAILS, " Node got a bad Rmst control msg - no RmstIdAttr!\n");
00726 return;
00727 }
00728 rmst_no = rmst_id_attr->getVal();
00729
00730
00731 rmst_iterator = rmst_map_.begin();
00732 rmst_iterator = rmst_map_.find(rmst_no);
00733 if(rmst_iterator != rmst_map_.end())
00734 rmst_ptr = (*rmst_iterator).second;
00735 else{
00736 DiffPrint(DEBUG_IMPORTANT, " Filter can't find Rmst %d for Rmst control msg\n", rmst_no);
00737 return;
00738 }
00739
00740 switch (rmst_ctl_type){
00741
00742 case(ACK_RESP):
00743 DiffPrint(DEBUG_IMPORTANT, " Got an ACK_RESP\n");
00744
00745 rmst_ptr->acked_ = true;
00746 if(!rmst_ptr->local_source_){
00747 Message *ack_msg;
00748
00749
00750
00751 if (rmst_ptr->reinf_) {
00752 DiffPrint(DEBUG_SOME_DETAILS, " forwarding ACK to %d\n", rmst_ptr->last_hop_);
00753 attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, ACK_RESP));
00754 attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_no));
00755 ack_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(),
00756 pkt_count_, rdm_id_, rmst_ptr->last_hop_, LOCALHOST_ADDR);
00757 ack_msg->msg_attr_vec_ = CopyAttrs(&attrs);
00758 ((DiffusionRouting *)dr_)->sendMessage(ack_msg, filter_handle_, 1);
00759 pkt_count_++;
00760 delete ack_msg;
00761 ClearAttrs(&attrs);
00762 }
00763 else
00764 DiffPrint(DEBUG_IMPORTANT, " intermediate node can't forward ACK for Rmst %d\n", rmst_no);
00765 }
00766 else{
00767 DiffPrint(DEBUG_IMPORTANT, " Source got ACK for Rmst %d\n", rmst_no);
00768 sendContToSource(rmst_ptr);
00769 }
00770 break;
00771
00772 case(NAK_REQ):
00773
00774
00775 GetTime(&rmst_ptr->last_nak_time_);
00776 rmst_ptr->naks_rec_++;
00777 DiffPrint(DEBUG_IMPORTANT, " Got a NAK_REQ; number = %d\n", rmst_ptr->naks_rec_);
00778
00779 if ((rmst_ptr->naks_rec_ > 10) && (rmst_ptr->naks_rec_ > (.30 * rmst_ptr->max_frag_)) &&
00780 rmst_ptr->local_source_){
00781 DiffPrint(DEBUG_IMPORTANT, " Too many NAKs - send an EXPLORATORY msg!\n");
00782 processExpReq(rmst_ptr, rmst_ptr->max_frag_sent_);
00783 return;
00784 }
00785
00786
00787
00788 if (rmst_ptr->sent_exp_req_){
00789 int exp_time = rmst_ptr->last_nak_time_.tv_sec - rmst_ptr->exp_req_time_.tv_sec;
00790 DiffPrint(DEBUG_SOME_DETAILS,
00791 " Node that sent an EXP_REQ got a NAK: time since last exp = %d\n", exp_time);
00792 if( (rmst_ptr->last_nak_time_.tv_sec - rmst_ptr->exp_req_time_.tv_sec) > 30){
00793
00794 DiffPrint(DEBUG_IMPORTANT, " Node resends EXP_REQ up blacklisted stream!\n");
00795 sendExpReqUpstream(rmst_ptr);
00796 GetTime(&rmst_ptr->exp_req_time_);
00797
00798 Message *neg_reinf_msg;
00799 neg_reinf_msg = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
00800 0, 0, interest_attrs_->size(), pkt_count_, rdm_id_,
00801 rmst_ptr->last_hop_, LOCALHOST_ADDR);
00802 neg_reinf_msg->msg_attr_vec_ = CopyAttrs(interest_attrs_);
00803 ((DiffusionRouting *)dr_)->sendMessage(neg_reinf_msg, filter_handle_, 1);
00804 pkt_count_++;
00805 delete neg_reinf_msg;
00806 }
00807 return;
00808 }
00809
00810
00811
00812 place = data->begin();
00813 for(;;){
00814 frag_attr = RmstFragAttr.find_from(data, place, &place);
00815 if (!frag_attr)
00816 break;
00817 frag_no = frag_attr->getVal();
00818 DiffPrint(DEBUG_IMPORTANT, " Filter received a NAK_REQ for Rmst %d, frag %d\n",
00819 rmst_no, frag_no);
00820
00821
00822
00823 frag_ptr = rmst_ptr->getFrag(frag_no);
00824 if (frag_ptr == NULL){
00825 DiffPrint(DEBUG_SOME_DETAILS, " Filter can't find frag %d of Rmst %d for NAK\n",
00826 frag_no, rmst_no);
00827
00828 if ( (rmst_ptr->reinf_) && (rmst_ptr->last_hop_ != LOCALHOST_ADDR) ){
00829 forwarding_nak = true;
00830 DiffPrint(DEBUG_IMPORTANT, " forwarding NAK to %d\n", rmst_ptr->last_hop_);
00831 attrs.push_back(RmstFragAttr.make(NRAttribute::IS, frag_no));
00832
00833 if(caching_mode_){
00834
00835 rmst_ptr->putHole(frag_no);
00836 NakData *nak_ptr = rmst_ptr->getHole(frag_no);
00837
00838
00839 nak_ptr->tmv.tv_sec -= 1;
00840 }
00841 }
00842 else
00843 DiffPrint(DEBUG_IMPORTANT, " not forwarding NAK! - no place to send it!\n");
00844 }
00845 else{
00846
00847 NakMsgData nak_msg_data;
00848 NakList::iterator nak_list_iterator;
00849 nak_list_iterator = nak_list_.begin();
00850 while(nak_list_iterator != nak_list_.end()){
00851 if((nak_list_iterator->rmst_no_ == rmst_no) &&
00852 (nak_list_iterator->frag_no_ == frag_no))
00853 break;
00854 nak_list_iterator++;
00855 }
00856 if(nak_list_iterator == nak_list_.end()){
00857 DiffPrint(DEBUG_SOME_DETAILS,
00858 " adding NAK for rmst %d frag %d to nak_list_\n", rmst_no, frag_no);
00859 nak_msg_data.rmst_no_ = rmst_no;
00860 nak_msg_data.frag_no_ = frag_no;
00861 nak_list_.push_back(nak_msg_data);
00862 if(!send_timer_active_){
00863 TimerCallback *send_timer;
00864
00865 DiffPrint(DEBUG_LOTS_DETAILS,
00866 " Set a SEND_TIMER for reinforced rmst_no %d\n", rmst_no);
00867 send_timer = new RmstTimeout(this, rmst_no, SEND_TIMER);
00868
00869 send_timer_handle_ =
00870 ((DiffusionRouting *)dr_)->addTimer(SEND_INTERVAL, send_timer);
00871 send_timer_active_ = true;
00872 }
00873 }
00874 }
00875 place++;
00876 }
00877
00878 if (forwarding_nak){
00879 attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, NAK_REQ));
00880 attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_no));
00881 nak_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0,
00882 attrs.size(), pkt_count_, rdm_id_, rmst_ptr->last_hop_,
00883 LOCALHOST_ADDR);
00884 nak_msg->msg_attr_vec_ = CopyAttrs(&attrs);
00885 ((DiffusionRouting *)dr_)->sendMessage(nak_msg, filter_handle_, 1);
00886 pkt_count_++;
00887 delete nak_msg;
00888 ClearAttrs(&attrs);
00889 }
00890 break;
00891
00892 case(EXP_REQ):
00893 DiffPrint(DEBUG_IMPORTANT, " Got an EXP_REQ\n");
00894 if(!rmst_ptr->local_source_){
00895 DiffPrint(DEBUG_SOME_DETAILS, " Filter forwarding EXP_REQ for Rmst %d\n", rmst_no);
00896
00897 if (rmst_ptr->reinf_)
00898 sendExpReqUpstream(rmst_ptr);
00899 }
00900 else{
00901
00902
00903
00904
00905 frag_attr = RmstFragAttr.find(msg->msg_attr_vec_);
00906 frag_no = frag_attr->getVal();
00907 DiffPrint(DEBUG_IMPORTANT, " Source got EXP request for Rmst %d\n", rmst_no);
00908 if (rmst_ptr->reinf_)
00909 processExpReq(rmst_ptr, frag_no);
00910 else
00911 DiffPrint(DEBUG_IMPORTANT, " EXP request for non-reinforced Rmst %d\n", rmst_no);
00912 }
00913 break;
00914
00915 default:
00916 break;
00917 }
00918 return;
00919 }
00920
00921 void RmstFilter::setupNak(int rmst_id)
00922 {
00923 NRAttrVec attrs;
00924 int frag_id;
00925 NakData *nak_ptr;
00926 Rmst *rmst_ptr;
00927 int nak_count = 0;
00928
00929 Int2Rmst::iterator rmst_iterator = rmst_map_.find(rmst_id);
00930 if(rmst_iterator != rmst_map_.end())
00931 rmst_ptr = (*rmst_iterator).second;
00932 else{
00933 DiffPrint(DEBUG_IMPORTANT, "setupNak - can't find Rmst %d\n", rmst_id);
00934 return;
00935 }
00936
00937 Int2Nak::iterator hole_iter = rmst_ptr->hole_map_.begin();
00938 bool send_new_nak = false;
00939 timeval cur_time;
00940
00941
00942
00943
00944
00945
00946 GetTime (&cur_time);
00947
00948
00949 while(hole_iter != rmst_ptr->hole_map_.end()){
00950 frag_id = (*hole_iter).first;
00951 nak_ptr = (*hole_iter).second;
00952 DiffPrint(DEBUG_SOME_DETAILS,
00953 " setupNak - found hole rmst_id %d, frag %d\n", rmst_id, frag_id);
00954
00955
00956
00957 if (!nak_ptr->nak_sent_){
00958 if ( (cur_time.tv_sec - nak_ptr->tmv.tv_sec) > 3 ){
00959 nak_ptr->nak_sent_ = true;
00960 nak_ptr->send_nak_ = true;
00961 send_new_nak = true;
00962 }
00963 else
00964 DiffPrint(DEBUG_SOME_DETAILS,
00965 " setupNak - hole %d not old enough to NAK\n", frag_id);
00966 }
00967
00968
00969 else if ( (cur_time.tv_sec - nak_ptr->tmv.tv_sec) > NAK_RESPONSE_WAIT ){
00970 DiffPrint(DEBUG_SOME_DETAILS, " setupNak - hole %d has an overdue NAK\n", frag_id);
00971 nak_ptr->send_nak_ = true;
00972 send_new_nak = true;
00973 }
00974 hole_iter++;
00975 }
00976
00977 if (send_new_nak){
00978 Message *nak_msg;
00979 if ( rmst_ptr->last_hop_ == LOCALHOST_ADDR ){
00980 DiffPrint(DEBUG_IMPORTANT, " can't send NAK, no last_hop_!\n");
00981 return;
00982 }
00983
00984 hole_iter = rmst_ptr->hole_map_.begin();
00985 while( (hole_iter != rmst_ptr->hole_map_.end()) && (nak_count <= 10) ){
00986 frag_id = (*hole_iter).first;
00987 nak_ptr = (*hole_iter).second;
00988
00989 if ( nak_ptr->send_nak_ ){
00990 nak_ptr->send_nak_ = false;
00991 DiffPrint(DEBUG_SOME_DETAILS,
00992 " setupNak - adding a NAK for frag_id %d to attrs\n", frag_id);
00993 attrs.push_back(RmstFragAttr.make(NRAttribute::IS, frag_id));
00994 GetTime(&(nak_ptr->tmv));
00995 nak_count++;
00996 }
00997 hole_iter++;
00998 }
00999 attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, NAK_REQ));
01000 attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_id));
01001
01002
01003 nak_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(),
01004 pkt_count_, rdm_id_, rmst_ptr->last_hop_, LOCALHOST_ADDR);
01005 pkt_count_++;
01006 nak_msg->msg_attr_vec_ = CopyAttrs(&attrs);
01007
01008 DiffPrint(DEBUG_IMPORTANT, " Sending NAK_REQ to node %d\n", rmst_ptr->last_hop_);
01009 ((DiffusionRouting *)dr_)->sendMessage(nak_msg, filter_handle_, 1);
01010
01011 delete nak_msg;
01012 ClearAttrs(&attrs);
01013
01014
01015 GetTime(&rmst_ptr->last_nak_time_);
01016
01017 }
01018 else
01019 DiffPrint(DEBUG_SOME_DETAILS, " setupNak - no need for a new NAK for rmst_id %d\n", rmst_id);
01020 return;
01021 }
01022
01023 void RmstFilter::processExpReq(Rmst *rmst_ptr, int frag_no)
01024 {
01025 NakList::iterator nak_list_iterator;
01026 SendList::iterator send_list_iterator;
01027 int rmst_no = rmst_ptr->rmst_no_;
01028
01029 DiffPrint(DEBUG_IMPORTANT, " processExpReq called for rmstId %d, frag_no %d\n", rmst_no, frag_no);
01030
01031
01032 rmst_ptr->reinf_ = false;
01033 rmst_ptr->pkts_sent_ = 0;
01034
01035
01036 if(rmst_ptr->ack_timer_active_){
01037 ((DiffusionRouting *)dr_)->removeTimer(rmst_ptr->ack_timer_handle_);
01038 rmst_ptr->ack_timer_active_ = false;
01039 }
01040
01041
01042 nak_list_iterator = nak_list_.begin();
01043 while (nak_list_iterator != nak_list_.end()){
01044 if (nak_list_iterator->rmst_no_ == rmst_no){
01045 DiffPrint(DEBUG_SOME_DETAILS,
01046 " processExpReq erasing frag_no %d from nak_list_\n", nak_list_iterator->frag_no_);
01047 nak_list_iterator = nak_list_.erase(nak_list_iterator);
01048 }
01049 else
01050 nak_list_iterator++;
01051 }
01052
01053 DiffPrint(DEBUG_LOTS_DETAILS, " processExpReq done with nak_list_ for rmstId %d\n", rmst_no);
01054
01055
01056
01057
01058 if ( (frag_no == rmst_ptr->max_frag_) && (rmst_ptr->max_frag_ > 0) ){
01059 frag_no--;
01060 DiffPrint(DEBUG_IMPORTANT, " processExpReq decrements frag_no to %d\n", frag_no);
01061 }
01062
01063
01064 send_list_iterator = send_list_.begin();
01065 while (send_list_iterator != send_list_.end()){
01066 if (send_list_iterator->rmst_no_ == rmst_no)
01067 break;
01068 send_list_iterator++;
01069 }
01070 if (send_list_iterator != send_list_.end()){
01071 send_list_iterator->exp_base_ = frag_no;
01072 DiffPrint(DEBUG_SOME_DETAILS, " processExpReq sets send_list_ expBase to %d\n", frag_no);
01073 send_list_iterator->last_frag_sent_ = frag_no-1;
01074 }
01075 else{
01076 SendMsgData new_send_msg;
01077 DiffPrint(DEBUG_SOME_DETAILS,
01078 " processExpReq creating new send_list_ entry for rmstId %d\n", rmst_no);
01079 DiffPrint(DEBUG_SOME_DETAILS,
01080 " processExpReq sets send_list_ expBase to %d\n", frag_no);
01081 new_send_msg.rmst_no_ = rmst_no;
01082 new_send_msg.exp_base_ = frag_no;
01083 new_send_msg.last_frag_sent_ = frag_no-1;
01084 send_list_.push_front(new_send_msg);
01085 if(!send_timer_active_){
01086 TimerCallback *send_timer;
01087
01088 DiffPrint(DEBUG_SOME_DETAILS, " Set a SEND_TIMER for reinforced rmst_no %d\n", rmst_no);
01089 send_timer = new RmstTimeout(this, rmst_no, SEND_TIMER);
01090
01091 send_timer_handle_ = ((DiffusionRouting *)dr_)->addTimer(SEND_INTERVAL, send_timer);
01092 send_timer_active_ = true;
01093 }
01094 }
01095 }
01096
01097 handle RmstFilter::setupFilter()
01098 {
01099 NRAttrVec attrs;
01100 handle h;
01101
01102
01103 attrs.push_back(NRClassAttr.make(NRAttribute::IS,
01104 NRAttribute::INTEREST_CLASS));
01105 h = ((DiffusionRouting *)dr_)->addFilter(&attrs, RMST_FILTER_PRIORITY, fcb_);
01106 ClearAttrs(&attrs);
01107 return h;
01108 }
01109
01110 void RmstFilter::run()
01111 {
01112 #ifdef NS_DIFFUSION
01113 TimerCallback *stat_timer;
01114 filter_handle_ = setupFilter();
01115 DiffPrint(DEBUG_LOTS_DETAILS, "RmstFilter:: subscribed to all, received handle %d\n",
01116 (int)filter_handle_);
01117
01118 DiffPrint(DEBUG_LOTS_DETAILS, "RmstFilter constructor: start cleanup timer\n");
01119 stat_timer = new RmstTimeout(this, -1, CLEANUP_TIMER);
01120 stat_timer_handle_ = ((DiffusionRouting *)dr_)->addTimer(CLEANUP_INTERVAL, stat_timer);
01121 #else
01122
01123 while(1){
01124 sleep(1000);
01125 }
01126 #endif // NS_DIFFUSION
01127 }
01128
01129 RmstTimeout::RmstTimeout(RmstFilter *rmst_flt, int no, int type)
01130 {
01131 filter_ = rmst_flt;
01132 rmst_no_ = no;
01133 timer_type_ = type;
01134 }
01135
01136 int RmstTimeout::expire()
01137 {
01138 int retval;
01139
01140 retval = filter_->processTimer(rmst_no_, timer_type_);
01141 if(retval == -1)
01142 delete this;
01143
01144 return retval;
01145 }
01146
01147 int RmstFilter::processTimer(int rmst_no, int timer_type)
01148 {
01149 Rmst *rmst_ptr;
01150 void *frag_ptr;
01151 int frag_no;
01152 Int2Rmst::iterator rmst_iterator;
01153 timeval cur_time;
01154
01155 GetTime (&cur_time);
01156
01157 switch (timer_type){
01158
01159 case SEND_TIMER:
01160 DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer SEND_TIMER");
01161 PrintTime(&cur_time);
01162
01163
01164
01165
01166 if (!nak_list_.empty()){
01167 Message *nak_resp;
01168 NRAttrVec nak_data_attrs;
01169 NakMsgData nak_msg_data = nak_list_.front();
01170 rmst_no = nak_msg_data.rmst_no_;
01171 frag_no = nak_msg_data.frag_no_;
01172
01173 rmst_iterator = rmst_map_.find(rmst_no);
01174 if(rmst_iterator != rmst_map_.end()){
01175 rmst_ptr = (*rmst_iterator).second;
01176
01177
01178 if (rmst_ptr->reinf_){
01179 GetTime(&rmst_ptr->last_data_time_);
01180 rmst_ptr->pkts_sent_++;
01181 nak_data_attrs.push_back(RmstTargetAttr.make(NRAttribute::IS,
01182 rmst_ptr->target_str_));
01183 nak_data_attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS,
01184 RMST_RESP));
01185 nak_data_attrs.push_back(RmstFragAttr.make(NRAttribute::IS, frag_no));
01186
01187 nak_data_attrs.push_back(RmstPktsSentAttr.make(NRAttribute::IS,
01188 rmst_ptr->pkts_sent_));
01189 nak_data_attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_no));
01190
01191
01192 frag_ptr = rmst_ptr->getFrag(frag_no);
01193 if (frag_no == rmst_ptr->max_frag_)
01194 nak_data_attrs.push_back(RmstDataAttr.make(NRAttribute::IS,
01195 frag_ptr, rmst_ptr->max_frag_len_));
01196 else
01197 nak_data_attrs.push_back(RmstDataAttr.make(NRAttribute::IS,
01198 frag_ptr, MAX_FRAG_SIZE));
01199
01200 DiffPrint(DEBUG_IMPORTANT, " Filter sending Data for NAKed frag %d of Rmst %d\n",
01201 frag_no, rmst_no);
01202
01203 nak_resp = new Message(DIFFUSION_VERSION, DATA, 0, 0,
01204 nak_data_attrs.size(), pkt_count_, rdm_id_,
01205 LOCALHOST_ADDR, LOCALHOST_ADDR);
01206 nak_resp->msg_attr_vec_ = CopyAttrs(&nak_data_attrs);
01207 ((DiffusionRouting *)dr_)->sendMessage(nak_resp, filter_handle_);
01208 pkt_count_++;
01209 delete nak_resp;
01210 ClearAttrs(&nak_data_attrs);
01211 }
01212 else
01213 DiffPrint(DEBUG_IMPORTANT,
01214 "RmstFilter::processTimer sees non-reinforced path for NAK on rmst %d!\n",
01215 rmst_no);
01216 }
01217 else
01218 DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processTimer can't find Rmst %d for NAK!\n",
01219 rmst_no);
01220 nak_list_.pop_front();
01221 }
01222
01223 else if (!send_list_.empty()){
01224 int8_t msg_type;
01225 int action = DO_NOTHING;
01226
01227 NRAttrVec data_attrs;
01228
01229 SendMsgData send_data = send_list_.front();
01230 rmst_no = send_data.rmst_no_;
01231 rmst_iterator = rmst_map_.find(rmst_no);
01232 if(rmst_iterator == rmst_map_.end())
01233 action = DELETE_FROM_QUEUE;
01234 else{
01235 rmst_ptr = (*rmst_iterator).second;
01236 if ((send_data.last_frag_sent_ == rmst_ptr->max_frag_) && rmst_ptr->reinf_)
01237 action = DELETE_FROM_QUEUE;
01238 else if ( (send_data.last_frag_sent_ == send_data.exp_base_) &&
01239 (!rmst_ptr->reinf_) && (exp_gap_ < 10) ){
01240 action = DO_NOTHING;
01241 exp_gap_++;
01242 }
01243 else
01244 action = SEND_NEXT_FRAG;
01245 }
01246
01247 switch (action){
01248
01249 case(DELETE_FROM_QUEUE):
01250
01251 send_list_.pop_front();
01252 break;
01253
01254 case(SEND_NEXT_FRAG):
01255 send_list_.pop_front();
01256 if (rmst_ptr->reinf_){
01257 send_data.last_frag_sent_++;
01258 frag_no = send_data.last_frag_sent_;
01259 }
01260 else{
01261 frag_no = send_data.exp_base_;
01262 send_data.last_frag_sent_ = frag_no;
01263 }
01264 send_list_.push_front(send_data);
01265 rmst_ptr->max_frag_sent_ = frag_no;
01266
01267 DiffPrint(DEBUG_IMPORTANT, " Source Filter sending frag %d of Rmst %d\n",
01268 frag_no, rmst_no);
01269
01270
01271 data_attrs.push_back(RmstTargetAttr.make(NRAttribute::IS, rmst_ptr->target_str_));
01272 data_attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP));
01273 data_attrs.push_back(RmstFragAttr.make(NRAttribute::IS, frag_no));
01274
01275
01276
01277 if(frag_no == 0)
01278 data_attrs.push_back(RmstMaxFragAttr.make(NRAttribute::IS,
01279 rmst_ptr->max_frag_));
01280 else if (frag_no == rmst_ptr->max_frag_){
01281 rmst_ptr->pkts_sent_++;
01282 data_attrs.push_back(RmstPktsSentAttr.make(NRAttribute::IS,
01283 rmst_ptr->pkts_sent_));
01284 }
01285 else
01286 rmst_ptr->pkts_sent_++;
01287
01288 data_attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_no));
01289
01290
01291
01292 frag_ptr = rmst_ptr->getFrag(frag_no);
01293 if (rmst_ptr->max_frag_ == frag_no)
01294 data_attrs.push_back(RmstDataAttr.make(NRAttribute::IS,
01295 frag_ptr, rmst_ptr->max_frag_len_));
01296 else
01297 data_attrs.push_back(RmstDataAttr.make(NRAttribute::IS,
01298 frag_ptr, MAX_FRAG_SIZE));
01299
01300 if (frag_no == send_data.exp_base_){
01301 ExpLog exp_msg;
01302 union LlToInt key;
01303
01304
01305 key.int_val_[0] = pkt_count_;
01306 key.int_val_[1] = rdm_id_;
01307 DiffPrint(DEBUG_LOTS_DETAILS, " Key = %llx\n", key.ll_val_);
01308 exp_msg.rmst_no_ = rmst_no;
01309 exp_msg.last_hop_ = LOCALHOST_ADDR;
01310 exp_map_.insert(Key2ExpLog::value_type(key.ll_val_, exp_msg));
01311 msg_type = EXPLORATORY_DATA;
01312 rmst_ptr->reinf_ = false;
01313 rmst_ptr->pkts_sent_ = 0;
01314 rmst_ptr->naks_rec_ = 0;
01315 exp_gap_ = 0;
01316 DiffPrint(DEBUG_IMPORTANT,
01317 " Source Filter sending EXPLORATORY frag %d of Rmst %d\n",
01318 frag_no, rmst_no);
01319 }
01320 else
01321 msg_type = DATA;
01322
01323 Message *new_frag;
01324 new_frag = new Message(DIFFUSION_VERSION, msg_type, 0, 0,
01325 data_attrs.size(), pkt_count_, rdm_id_,
01326 LOCALHOST_ADDR, LOCALHOST_ADDR);
01327 new_frag->msg_attr_vec_ = CopyAttrs(&data_attrs);
01328 ((DiffusionRouting *)dr_)->sendMessage(new_frag, filter_handle_);
01329 pkt_count_++;
01330 delete new_frag;
01331 ClearAttrs(&data_attrs);
01332
01333 GetTime(&rmst_ptr->last_data_time_);
01334
01335
01336 if ( (rmst_ptr->max_frag_ == frag_no) &&
01337 (rmst_ptr->ack_timer_active_ == false) && (rmst_ptr->local_source_) ){
01338 TimerCallback *rmst_timer;
01339 DiffPrint(DEBUG_SOME_DETAILS, " Set an ACK_TIMER at source for rmst_no %d\n",
01340 rmst_no);
01341 rmst_timer = new RmstTimeout(this, rmst_no, ACK_TIMER);
01342
01343 rmst_ptr->ack_timer_handle_ =
01344 ((DiffusionRouting *)dr_)->addTimer(ACK_INTERVAL, rmst_timer);
01345 rmst_ptr->ack_timer_active_ = true;
01346 }
01347 break;
01348
01349 case(DO_NOTHING):
01350 DiffPrint(DEBUG_LOTS_DETAILS, " Nothing to do\n");
01351 break;
01352 }
01353 }
01354
01355 if (nak_list_.empty() && send_list_.empty()){
01356 DiffPrint(DEBUG_LOTS_DETAILS, " Cancelling SEND_TIMER, no NAKS or data to send\n");
01357 send_timer_active_ = false;
01358 return -1;
01359 }
01360 else
01361 return 0;
01362 break;
01363
01364 case WATCHDOG_TIMER:
01365 DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer WATCHDOG_TIMER for Rmst %d", rmst_no);
01366 PrintTime(&cur_time);
01367 rmst_iterator = rmst_map_.find(rmst_no);
01368 if(rmst_iterator != rmst_map_.end())
01369 rmst_ptr = (*rmst_iterator).second;
01370 else{
01371 DiffPrint(DEBUG_IMPORTANT,
01372 "RmstFilter::processTimer can't find Rmst %d for WATCHDOG, cancell timer!\n",
01373 rmst_no);
01374 return -1;
01375 }
01376
01377 if(rmst_ptr->cancel_watchdog_){
01378 DiffPrint(DEBUG_SOME_DETAILS,
01379 " processTimer cancelling WATCHDOG_TIMER for Rmst %d\n", rmst_no);
01380 rmst_ptr->watchdog_active_ = false;
01381 rmst_ptr->cancel_watchdog_ = false;
01382 return -1;
01383 }
01384
01385 if (rmst_ptr->wait_for_new_path_){
01386 DiffPrint (DEBUG_IMPORTANT, " WATCHDOG_TIMER sees wait_for_new_path_ - suspend NAKs\n");
01387 return 0;
01388 }
01389
01390
01391
01392 if (rmst_ptr->sent_exp_req_){
01393 int exp_time;
01394 exp_time = cur_time.tv_sec - rmst_ptr->exp_req_time_.tv_sec;
01395 DiffPrint(DEBUG_SOME_DETAILS,
01396 " Node sent an EXP_REQ: time since last exp = %d\n", exp_time);
01397 if( exp_time > 20){
01398
01399 DiffPrint(DEBUG_IMPORTANT, " Node resends EXP_REQ up blacklisted stream!\n");
01400 sendExpReqUpstream(rmst_ptr);
01401 GetTime(&rmst_ptr->exp_req_time_);
01402 }
01403 else
01404 DiffPrint(DEBUG_LOTS_DETAILS, " Node waits to send another EXP_REQ\n");
01405 return 0;
01406 }
01407
01408 if (rmst_ptr->local_source_){
01409 if(rmst_ptr->acked_){
01410 DiffPrint(DEBUG_IMPORTANT,
01411 " WATCHDOG_TIMER Local Source sees acked state - cancel timer\n");
01412 return -1;
01413 }
01414 else{
01415 DiffPrint(DEBUG_LOTS_DETAILS, " WATCHDOG_TIMER Local Source sees rmst not acked\n");
01416 return 0;
01417 }
01418 }
01419
01420
01421 if( ((cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > NEXT_FRAG_WAIT) &&
01422 (!rmst_ptr->rmstComplete()) ){
01423 int newHole = (rmst_ptr->max_frag_rec_)+1;
01424 if ( (newHole <= rmst_ptr->max_frag_) && (!rmst_ptr->inHoleMap(newHole)) ){
01425 DiffPrint(DEBUG_SOME_DETAILS, " WATCHDOG_TIMER adds new hole, frag %d\n",
01426 newHole);
01427 rmst_ptr->putHole(newHole);
01428 NakData *nak_ptr = rmst_ptr->getHole(newHole);
01429
01430 nak_ptr->tmv.tv_sec -= 4;
01431 }
01432 }
01433
01434 if(rmst_ptr->holeMapEmpty()){
01435
01436 DiffPrint(DEBUG_SOME_DETAILS, " WATCHDOG_TIMER sees No holes\n");
01437 return 0;
01438 }
01439 else{
01440
01441
01442
01443
01444 DiffPrint(DEBUG_SOME_DETAILS, " WATCHDOG_TIMER sees holes - check times.\n");
01445 if (rmst_ptr->reinf_)
01446 setupNak(rmst_no);
01447
01448 return 0;
01449 }
01450 break;
01451
01452 case ACK_TIMER:
01453 DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer ACK_TIMER for Rmst %d", rmst_no);
01454 PrintTime(&cur_time);
01455 rmst_iterator = rmst_map_.find(rmst_no);
01456 if(rmst_iterator != rmst_map_.end())
01457 rmst_ptr = (*rmst_iterator).second;
01458 else{
01459 DiffPrint(DEBUG_IMPORTANT,
01460 "RmstFilter::processTimer can't find Rmst %d for ACK_TIMER, cancell timer!\n",
01461 rmst_no);
01462 return -1;
01463 }
01464
01465 DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer ACK_TIMER, at source for rmst_no %d\n",
01466 rmst_no);
01467
01468 if (rmst_ptr->acked_){
01469 DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processTimer cancel ACK_TIMER, Rmst %d ACKed\n",
01470 rmst_no);
01471 rmst_ptr->ack_timer_active_ = false;
01472 return -1;
01473 }
01474
01475
01476 if( (cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > ACK_WAIT ){
01477 NRAttrVec attrs;
01478 int8_t msg_type;
01479 DiffPrint(DEBUG_IMPORTANT,
01480 "RmstFilter::processTimer ACK_TIMER, waited too long for Rmst %d ACK!\n", rmst_no);
01481 if(rmst_ptr->reinf_ && !rmst_ptr->resent_last_data_){
01482
01483 DiffPrint(DEBUG_SOME_DETAILS,
01484 "RmstFilter::processTimer ACK_TIMER, resend last packet as DATA\n");
01485 msg_type = DATA;
01486 rmst_ptr->resent_last_data_ = true;
01487 }
01488 else if(rmst_ptr->resent_last_data_ && !rmst_ptr->resent_last_exp_){
01489 ExpLog exp_msg;
01490 union LlToInt key;
01491
01492 DiffPrint(DEBUG_IMPORTANT,
01493 "RmstFilter::processTimer ACK_TIMER, resend last packet as EXPLORATORY_DATA\n");
01494
01495
01496 key.int_val_[0] = pkt_count_;
01497 key.int_val_[1] = rdm_id_;
01498 DiffPrint(DEBUG_LOTS_DETAILS, " Key = %llx\n", key.ll_val_);
01499 exp_msg.rmst_no_ = rmst_no;
01500 exp_msg.last_hop_ = LOCALHOST_ADDR;
01501 exp_map_.insert(Key2ExpLog::value_type(key.ll_val_, exp_msg));
01502 msg_type = EXPLORATORY_DATA;
01503 rmst_ptr->reinf_ = false;
01504 rmst_ptr->naks_rec_ = 0;
01505 rmst_ptr->pkts_sent_ = 0;
01506 rmst_ptr->resent_last_exp_ = true;
01507 }
01508 else if(rmst_ptr->resent_last_data_ && rmst_ptr->resent_last_exp_ && rmst_ptr->reinf_){
01509
01510 DiffPrint(DEBUG_IMPORTANT,
01511 "RmstFilter::processTimer ACK_TIMER, resend last packet on new reinf path as DATA\n");
01512 msg_type = DATA;
01513 rmst_ptr->resent_last_data_ = false;
01514 rmst_ptr->resent_last_exp_ = false;
01515 }
01516 else{
01517 ExpLog exp_msg;
01518 union LlToInt key;
01519 DiffPrint(DEBUG_IMPORTANT,
01520 "RmstFilter::processTimer ACK_TIMER, resent last packet as EXP and no reinforced path, Try again!\n");
01521
01522
01523 key.int_val_[0] = pkt_count_;
01524 key.int_val_[1] = rdm_id_;
01525 DiffPrint(DEBUG_LOTS_DETAILS, " Key = %llx\n", key.ll_val_);
01526 exp_msg.rmst_no_ = rmst_no;
01527 exp_msg.last_hop_ = LOCALHOST_ADDR;
01528 exp_map_.insert(Key2ExpLog::value_type(key.ll_val_, exp_msg));
01529 msg_type = EXPLORATORY_DATA;
01530 }
01531 attrs.push_back(RmstTargetAttr.make(NRAttribute::IS, rmst_ptr->target_str_));
01532 attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP));
01533 attrs.push_back(RmstFragAttr.make(NRAttribute::IS, rmst_ptr->max_frag_));
01534 attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_));
01535 frag_ptr = rmst_ptr->getFrag(rmst_ptr->max_frag_);
01536 attrs.push_back(RmstDataAttr.make(NRAttribute::IS,
01537 frag_ptr, rmst_ptr->max_frag_len_));
01538 Message *new_frag;
01539 new_frag = new Message(DIFFUSION_VERSION, msg_type, 0, 0, attrs.size(), pkt_count_,
01540 rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR);
01541 new_frag->msg_attr_vec_ = CopyAttrs(&attrs);
01542 ((DiffusionRouting *)dr_)->sendMessage(new_frag, filter_handle_);
01543 pkt_count_++;
01544 delete new_frag;
01545 ClearAttrs(&attrs);
01546
01547 GetTime(&rmst_ptr->last_data_time_);
01548 }
01549 return 0;
01550 break;
01551
01552 case CLEANUP_TIMER:
01553 DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer CLEANUP_TIMER");
01554 PrintTime(&cur_time);
01555
01556 DiffPrint(DEBUG_IMPORTANT, " CLEANUP_TIMER called\n");
01557 rmst_iterator = rmst_map_.begin();
01558 while(rmst_iterator != rmst_map_.end()){
01559 rmst_ptr = (*rmst_iterator).second;
01560
01561 DiffPrint(DEBUG_SOME_DETAILS,
01562 " CLEANUP_TIMER:: rmst_no %d : pkts_sent_ = %d, pkts_rec_ = %d, last_hop_pkts_sent_ = %d\n",
01563 rmst_ptr->rmst_no_, rmst_ptr->pkts_sent_, rmst_ptr->pkts_rec_, rmst_ptr->last_hop_pkts_sent_);
01564
01565 if((!rmst_ptr->reinf_)&&(!rmst_ptr->acked_)&&(!rmst_ptr->local_source_)&&(!local_sink_)){
01566 if( (cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > LONG_CLEANUP_WAIT )
01567 cleanUpRmst(rmst_ptr);
01568 }
01569 else if (rmst_ptr->acked_){
01570 if ( ( (cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > SHORT_CLEANUP_WAIT ) &&
01571 ( (cur_time.tv_sec - rmst_ptr->last_nak_time_.tv_sec) > SHORT_CLEANUP_WAIT ) )
01572 cleanUpRmst(rmst_ptr);
01573 }
01574
01575 rmst_iterator++;
01576 }
01577
01578
01579 if (!black_list_.empty()){
01580 if ( (cur_time.tv_sec - last_data_rec_.tv_sec) > RMST_BLACKLIST_WAIT ){
01581 DiffPrint(DEBUG_IMPORTANT, " clearing black_list_!\n");
01582 ((DiffusionRouting *)dr_)->clearBlacklist();
01583 black_list_.clear();
01584 }
01585 }
01586
01587 if (local_sink_){
01588 if ( (cur_time.tv_sec - last_sink_time_.tv_sec) > SINK_REFRESH_WAIT ){
01589 DiffPrint(DEBUG_IMPORTANT, " local sink timed out\n");
01590 local_sink_ = false;
01591 }
01592 else
01593 DiffPrint(DEBUG_IMPORTANT, " local sink still alive.\n");
01594 }
01595
01596 return 0;
01597 break;
01598
01599 default:
01600 break;
01601 }
01602 return -1;
01603 }
01604
01605 void RmstFilter::sendRmstToSink(Rmst *rmst_ptr)
01606 {
01607 NRAttrVec attrs;
01608 Message *rmst_msg;
01609 NRSimpleAttribute<void *> *rmst_data_attr;
01610 NRSimpleAttribute<int> *frag_number_attr;
01611 void *frag_ptr;
01612 int size, i;
01613
01614 DiffPrint(DEBUG_IMPORTANT, "RmstFilter::sendRmstToSink - sending rmst %d to local sink\n",
01615 rmst_ptr->rmst_no_);
01616
01617
01618 attrs.push_back(RmstTargetAttr.make(NRAttribute::IS, rmst_ptr->target_str_));
01619 attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP));
01620 attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_));
01621 attrs.push_back(RmstMaxFragAttr.make(NRAttribute::IS, rmst_ptr->max_frag_));
01622 frag_number_attr = RmstFragAttr.make(NRAttribute::IS, 0);
01623 attrs.push_back(frag_number_attr);
01624
01625 if (rmst_ptr->max_frag_ == 0)
01626 size = rmst_ptr->max_frag_len_;
01627 else
01628 size = MAX_FRAG_SIZE;
01629 frag_ptr = rmst_ptr->getFrag(0);
01630 rmst_data_attr = RmstDataAttr.make(NRAttribute::IS, frag_ptr, size);
01631 attrs.push_back(rmst_data_attr);
01632
01633
01634 rmst_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(),
01635 pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR);
01636 rmst_msg->next_hop_ = LOCALHOST_ADDR;
01637 rmst_msg->next_port_ = local_sink_port_;
01638 rmst_msg->msg_attr_vec_= CopyAttrs(&attrs);
01639 ((DiffusionRouting *)dr_)->sendMessage(rmst_msg, filter_handle_, 1);
01640 delete rmst_msg;
01641 pkt_count_++;
01642
01643
01644 for (i=1; i <= (rmst_ptr->max_frag_); i++){
01645 frag_number_attr->setVal(i);
01646 frag_ptr = rmst_ptr->getFrag(i);
01647 if(frag_ptr == NULL)
01648 DiffPrint(DEBUG_IMPORTANT, "RmstFilter::sendRmstToSink - got a null frag_ptr for frag!%d\n",
01649 i);
01650 else{
01651 if (rmst_ptr->max_frag_ == i)
01652 rmst_data_attr->setVal(frag_ptr, rmst_ptr->max_frag_len_);
01653 else
01654 rmst_data_attr->setVal(frag_ptr, MAX_FRAG_SIZE);
01655 }
01656 rmst_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(),
01657 pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR);
01658 rmst_msg->next_hop_ = LOCALHOST_ADDR;
01659 rmst_msg->next_port_ = local_sink_port_;
01660 rmst_msg->msg_attr_vec_= CopyAttrs(&attrs);
01661 ((DiffusionRouting *)dr_)->sendMessage(rmst_msg, filter_handle_, 1);
01662 delete rmst_msg;
01663 pkt_count_++;
01664 }
01665
01666 ClearAttrs(&attrs);
01667 }
01668
01669 void RmstFilter::sendAckToSource(Rmst *rmst_ptr)
01670 {
01671 NRAttrVec attrs;
01672 Message *ack_msg;
01673
01674 attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, ACK_RESP));
01675 attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_));
01676
01677
01678 ack_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0,
01679 attrs.size(), pkt_count_, rdm_id_, rmst_ptr->last_hop_,
01680 LOCALHOST_ADDR);
01681 ack_msg->msg_attr_vec_ = CopyAttrs(&attrs);
01682
01683 DiffPrint(DEBUG_IMPORTANT, " Sending ACK_RESP to node %d\n", rmst_ptr->last_hop_);
01684 ((DiffusionRouting *)dr_)->sendMessage(ack_msg, filter_handle_, 1);
01685 pkt_count_++;
01686 delete ack_msg;
01687 ClearAttrs(&attrs);
01688 }
01689
01690 void RmstFilter::sendExpReqUpstream(Rmst *rmst_ptr)
01691 {
01692 NRAttrVec attrs;
01693 Message *exp_msg;
01694
01695 attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, EXP_REQ));
01696 attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_));
01697 attrs.push_back(RmstFragAttr.make(NRAttribute::IS, rmst_ptr->max_frag_rec_));
01698
01699 exp_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0,
01700 attrs.size(), pkt_count_, rdm_id_, rmst_ptr->last_hop_,
01701 LOCALHOST_ADDR);
01702 exp_msg->msg_attr_vec_ = CopyAttrs(&attrs);
01703
01704 DiffPrint(DEBUG_IMPORTANT, " Sending EXP_REQ to node %d\n", rmst_ptr->last_hop_);
01705 ((DiffusionRouting *)dr_)->sendMessage(exp_msg, filter_handle_, 1);
01706 pkt_count_++;
01707 delete exp_msg;
01708 ClearAttrs(&attrs);
01709 }
01710
01711 void RmstFilter::sendContToSource(Rmst *rmst_ptr)
01712 {
01713 NRAttrVec attrs;
01714 Message *cont_msg;
01715 attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::EQ, RMST_CONT));
01716 attrs.push_back(NRClassAttr.make(NRAttribute::IS,
01717 NRAttribute::INTEREST_CLASS));
01718 DiffPrint(DEBUG_IMPORTANT, " Sending a RMST_CONT to source\n");
01719 cont_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(),
01720 pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR);
01721 cont_msg->msg_attr_vec_ = CopyAttrs(&attrs);
01722 cont_msg->next_hop_ = LOCALHOST_ADDR;
01723 cont_msg->next_port_ = rmst_ptr->local_source_port_;
01724 ((DiffusionRouting *)dr_)->sendMessage(cont_msg, filter_handle_, 1);
01725 pkt_count_++;
01726 delete cont_msg;
01727 ClearAttrs(&attrs);
01728 }
01729
01730 void RmstFilter::cleanUpRmst(Rmst *rmst_ptr)
01731 {
01732 int rmst_no = rmst_ptr->rmst_no_;
01733
01734 Int2Rmst::iterator rmst_iterator;
01735 Key2ExpLog::iterator exp_iterator;
01736 ExpLog exp_msg;
01737 rmst_no = rmst_ptr->rmst_no_;
01738 DiffPrint(DEBUG_IMPORTANT, " cleanUpRmst called to delete Rmst %d\n", rmst_no);
01739
01740 if(rmst_ptr->watchdog_active_)
01741 ((DiffusionRouting *)dr_)->removeTimer(rmst_ptr->watchdog_handle_);
01742 if(rmst_ptr->ack_timer_active_)
01743 ((DiffusionRouting *)dr_)->removeTimer(rmst_ptr->ack_timer_handle_);
01744
01745 rmst_iterator = rmst_map_.find(rmst_no);
01746 if(rmst_iterator != rmst_map_.end()){
01747 rmst_map_.erase(rmst_iterator);
01748 }
01749 delete rmst_ptr;
01750
01751
01752 exp_iterator = exp_map_.begin();
01753 while(exp_iterator != exp_map_.end()){
01754 exp_msg = (*exp_iterator).second;
01755 if(exp_msg.rmst_no_ == rmst_no){
01756 DiffPrint(DEBUG_LOTS_DETAILS, " cleanUpRmst deleting exp_map_ entry for Rmst %d\n", rmst_no);
01757 exp_map_.erase(exp_iterator);
01758 }
01759 exp_iterator++;
01760 }
01761 }
01762
01763 #ifdef NS_DIFFUSION
01764 RmstFilter::RmstFilter()
01765 {
01766 #else
01767 RmstFilter::RmstFilter(int argc, char **argv)
01768
01769 {
01770 TimerCallback *stat_timer;
01771
01772 parseCommandLine(argc, argv);
01773 dr_ = NR::createNR(diffusion_port_);
01774 #endif // NS_DIFFUSION
01775
01776 fcb_ = new RmstFilterCallback;
01777 fcb_->app_ = this;
01778 rdm_id_ = rand();
01779 pkt_count_ = rand();
01780 local_sink_ = false;
01781 caching_mode_ = false;
01782 send_timer_active_ = false;
01783
01784 DiffPrint(DEBUG_ALWAYS, "RmstFilter constructor: rdm_id_ = %x, pkt_count_ = %x\n",
01785 rdm_id_, pkt_count_);
01786
01787 #ifndef NS_DIFFUSION
01788 filter_handle_ = setupFilter();
01789 DiffPrint(DEBUG_LOTS_DETAILS, "RmstFilter:: subscribed to all, received handle %d\n",
01790 (int)filter_handle_);
01791
01792 DiffPrint(DEBUG_LOTS_DETAILS, "RmstFilter constructor: start cleanup timer\n");
01793 stat_timer = new RmstTimeout(this, -1, CLEANUP_TIMER);
01794 stat_timer_handle_ = ((DiffusionRouting *)dr_)->addTimer(CLEANUP_INTERVAL, stat_timer);
01795 #endif // !NS_DIFFUSION
01796 }
01797
01798 #ifndef NS_DIFFUSION
01799 int main(int argc, char **argv)
01800 {
01801 RmstFilter *app;
01802 app = new RmstFilter(argc, argv);
01803 app->run();
01804
01805 return 0;
01806 }
01807 #endif // !NS_DIFFUSION