rmst_filter.cc

Go to the documentation of this file.
00001 //
00002 // rmst_filter.cc  : RmstFilter Class Methods
00003 // authors         : Fred Stann
00004 //
00005 // Copyright (C) 2003 by the University of Southern California
00006 // $Id: rmst_filter.cc,v 1.3 2005/09/13 04:53:48 tomh Exp $
00007 //
00008 // This program is free software; you can redistribute it and/or
00009 // modify it under the terms of the GNU General Public License,
00010 // version 2, as published by the Free Software Foundation.
00011 //
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License along
00018 // with this program; if not, write to the Free Software Foundation, Inc.,
00019 // 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
00020 //
00021 // Linking this file statically or dynamically with other modules is making
00022 // a combined work based on this file.  Thus, the terms and conditions of
00023 // the GNU General Public License cover the whole combination.
00024 //
00025 // In addition, as a special exception, the copyright holders of this file
00026 // give you permission to combine this file with free software programs or
00027 // libraries that are released under the GNU LGPL and with code included in
00028 // the standard release of ns-2 under the Apache 2.0 license or under
00029 // otherwise-compatible licenses with advertising requirements (or modified
00030 // versions of such code, with unchanged license).  You may copy and
00031 // distribute such a system following the terms of the GNU GPL for this
00032 // file and the licenses of the other code concerned, provided that you
00033 // include the source code of that other code when and as the GNU GPL
00034 // requires distribution of source code.
00035 //
00036 // Note that people who make modified versions of this file are not
00037 // obligated to grant this special exception for their modified versions;
00038 // it is their choice whether to do so.  The GNU General Public License
00039 // gives permission to release a modified version without this exception;
00040 // this exception also makes it possible to release a modified version
00041 // which carries forward this exception.
00042 
00043 #include "rmst_filter.hh"
00044 
00045 char *rmstmsg_types[] = {"INTEREST", "POSITIVE REINFORCEMENT",
00046                      "NEGATIVE REINFORCEMENT", "DATA",
00047                      "EXPLORATORY DATA", "PUSH EXPLORATORY DATA",
00048                      "CONTROL", "REDIRECT"};
00049 
00050 #ifdef NS_DIFFUSION
00051 class DiffAppAgent;
00052 #endif // NS_DIFFUSION
00053 
00054 #ifdef NS_DIFFUSION
00055 static class RmstFilterClass : public TclClass {
00056 public:
00057   RmstFilterClass() : TclClass("Application/DiffApp/RmstFilter") {}
00058   TclObject* create(int argc, const char*const* argv) {
00059     return(new RmstFilter());
00060   }
00061 } class_rmst_filter;
00062 
00063 int RmstFilter::command(int argc, const char*const* argv) {
00064   //Tcl& tcl =  Tcl::instance();
00065   if (argc == 2) {
00066     if (strcmp(argv[1], "start") == 0) {
00067       run();
00068       return (TCL_OK);
00069     }
00070   }
00071   return (DiffApp::command(argc, argv));
00072 }
00073 #endif // NS_DIFFUSION
00074 
00075 class ReinfMessage {
00076 public:
00077   int32_t rdm_id_;
00078   int32_t pkt_num_;
00079 };
00080 
00081 
00082 // RmstFilterCallback::recv
00083 // Called by diffusion core when a message is available for this filter.
00084 // RmstFilterCallback is derived from the abstract class FilterCallback.
00085 // A pointer to the FilterCallback class is required in the API method "addFilter."
00086 
00087 void RmstFilterCallback::recv(Message *msg, handle h)
00088 {
00089   app_->recv(msg, h);
00090 }
00091 
00092 
00093 // RmstFilter::recv
00094 //
00095 // Called by the Callback::recv method.
00096 
00097 void RmstFilter::recv(Message *msg, handle h)
00098 {
00099   // Process the message handed to us by the core.
00100   // If true is returned we forward the message. Otherwise it dies here.
00101   if(processMessage(msg))
00102     ((DiffusionRouting *)dr_)->sendMessage(msg, h);
00103 }
00104 
00105 // RmstFilter::processMessage
00106 //
00107 // Called by the RmstFilter::recv method when this filter gets a message.
00108 
00109 bool RmstFilter::processMessage(Message *msg)
00110 {
00111   NRSimpleAttribute<int> *rmst_id_attr = NULL;
00112   NRSimpleAttribute<int> *frag_attr = NULL;
00113   NRSimpleAttribute<int> *pkts_sent_attr = NULL;
00114   NRSimpleAttribute<int> *tsprt_ctl_attr = NULL;
00115   NRSimpleAttribute<void *> *reinf_attr = NULL;
00116   NRSimpleAttribute<int> *nrscope = NULL;
00117   NRSimpleAttribute<int> *nr_class = NULL;
00118   NRAttrVec *data;
00119 
00120   Key2ExpLog::iterator exp_iterator;
00121   Int2Rmst::iterator rmst_iterator;
00122   int rmst_no;
00123   int frag_no;
00124   int class_type;
00125   int rmst_ctl_type;
00126   union LlToInt key;
00127   Rmst *rmst_ptr;
00128 
00129   // If this is a message that uses the transport layer, we process it.
00130   // Otherwise we send it back to the core (by returning true).
00131   tsprt_ctl_attr = RmstTsprtCtlAttr.find(msg->msg_attr_vec_);
00132   if (!tsprt_ctl_attr){
00133     DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter got non-transport message\n");
00134     return true;
00135   }
00136   rmst_ctl_type = tsprt_ctl_attr->getVal();
00137 
00138   DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processMessage got a");
00139 
00140   if (msg->new_message_)
00141     DiffPrint(DEBUG_IMPORTANT, " new (%d) ", msg->msg_type_);
00142   else
00143     DiffPrint(DEBUG_IMPORTANT, "n old (%d) ", msg->msg_type_);
00144 
00145   if (msg->last_hop_ != LOCALHOST_ADDR)
00146     DiffPrint(DEBUG_IMPORTANT, "%s message from %d to %d\n",
00147       rmstmsg_types[msg->msg_type_],
00148       msg->last_hop_, msg->next_hop_);
00149   else
00150     DiffPrint(DEBUG_IMPORTANT, "%s message from local agent\n",
00151       rmstmsg_types[msg->msg_type_]);
00152 
00153   // We only care about messages we haven't seen before,
00154   // but we generally let other filters get them (because they may need them).
00155   // However, if this is an old DATA message arriving at a sink, the sink may
00156   // negatively reinforce a reinforced path.  This is because we withold the 
00157   // new messages until we get the entire blob.  The old message is the result
00158   // of a lost ACK when using SMAC with ARQ.
00159   if (!msg->new_message_ && msg->msg_type_ == DATA
00160       && rmst_ctl_type == RMST_RESP){
00161     DiffPrint(DEBUG_SOME_DETAILS, 
00162       "  Sink got an old DATA message from node %d\n", msg->last_hop_);
00163     data = msg->msg_attr_vec_;
00164     rmst_id_attr = RmstIdAttr.find(data);
00165     if (!rmst_id_attr){
00166       DiffPrint(DEBUG_SOME_DETAILS,
00167         "  Filter received a bad transport packet!\n");
00168       return false;
00169     }
00170     rmst_no = rmst_id_attr->getVal();
00171     // Find the rmst.
00172     rmst_iterator = rmst_map_.find(rmst_no);
00173     if(rmst_iterator == rmst_map_.end()){
00174       DiffPrint(DEBUG_IMPORTANT,
00175         "  couldn't find DB entry for Rmst %d\n", rmst_no);
00176       return false;
00177     }
00178     else{
00179       rmst_ptr = (*rmst_iterator).second;
00180       if ( (local_sink_) && (msg->last_hop_ == rmst_ptr->last_hop_) ){
00181         // This is the case where SMAC sent the same DATA message twice to a sink.
00182         // We suppress this message so we don't kill our reinforced path.
00183         DiffPrint(DEBUG_IMPORTANT,
00184           "  We suppress old DATA message from smac retransmission!\n");
00185         return false;
00186       }
00187       else
00188         return true;
00189     }
00190   }
00191   else if (!msg->new_message_)
00192     return true;
00193 
00194   // When we get Rmst Fragments we must sync the local cache!
00195   if ( (rmst_ctl_type == RMST_RESP) && 
00196        ((msg->msg_type_ == DATA) || (msg->msg_type_ == EXPLORATORY_DATA)) ){
00197     rmst_ptr = syncLocalCache(msg);
00198     // syncLocalCache will return NULL if the 
00199     // attribute set doesn't make sense.
00200     if (rmst_ptr == NULL)
00201       return false;
00202     rmst_no = rmst_ptr->rmst_no_;
00203     // Mark the time we got some kind of data.
00204     GetTime (&last_data_rec_);
00205   }
00206 
00207   //  New exploratory messages are entered into the exp_map_,
00208   //  so that we can find the last hop if it gets reinforced.
00209   //  Positive reinforcement messages are used to find the 
00210   //  corresponding message in the exp_map_, so we know the
00211   //  current reinforced path to the source of an rmst.
00212   switch (msg->msg_type_){
00213 
00214     case(EXPLORATORY_DATA):
00215       ExpLog exp_msg;
00216 
00217       DiffPrint(DEBUG_LOTS_DETAILS,
00218         "  Exploratory_Msg: ptk_num = %x, rdm_id_ = %x, last_hop = %d\n",
00219         msg->pkt_num_, msg->rdm_id_, msg->last_hop_);
00220 
00221       // Put the ID for this Exploratory message, along with its last hop,
00222       // into the exp_map_.  If this message gets reinforced, we will be
00223       // able to identify the next hop in the back channel.
00224       DiffPrint(DEBUG_SOME_DETAILS,
00225         "  Exploratory message for Reliable transport Id = %d\n", rmst_no);
00226       key.int_val_[0] = msg->pkt_num_;
00227       key.int_val_[1] = msg->rdm_id_;
00228       DiffPrint(DEBUG_LOTS_DETAILS, "  Key = %llx\n", key.ll_val_);
00229       exp_msg.rmst_no_ = rmst_no;
00230       exp_msg.last_hop_ = msg->last_hop_;
00231       exp_map_.insert(Key2ExpLog::value_type(key.ll_val_, exp_msg));
00232 
00233       // If this is a new exploratory message arriving at a sink,
00234       // we assume that this path will get reinforced by the 
00235       // gradient filter. Sinks don't get positive reinforcement
00236       // messages, so we must record last_hop_ now.
00237       if (local_sink_){
00238         rmst_ptr->last_hop_ = msg->last_hop_; 
00239         if (rmst_ptr->reinf_){
00240           DiffPrint(DEBUG_IMPORTANT, "  got a new path exploratory msg at sink.\n");
00241           rmst_ptr->wait_for_new_path_ = true;
00242         }
00243         else{
00244           rmst_ptr->reinf_ = true;
00245           DiffPrint(DEBUG_IMPORTANT, "  got an initial exploratory msg at sink.\n");
00246         }
00247         DiffPrint(DEBUG_IMPORTANT, "  set last_hop for rmst %d to %d\n",
00248           rmst_no, rmst_ptr->last_hop_);
00249         rmst_ptr->pkts_rec_ = 0; 
00250         rmst_ptr->last_hop_pkts_sent_ = 0; 
00251       }
00252       else{
00253         // If this is not a sink we reset the base fragment that
00254         // we look for holes from.
00255         DiffPrint(DEBUG_LOTS_DETAILS,
00256           "  intermediate node resets sync_base_ and reinf_.\n");
00257         frag_attr = RmstFragAttr.find(msg->msg_attr_vec_);
00258         frag_no = frag_attr->getVal();
00259         rmst_ptr->sync_base_ = frag_no;
00260         if(rmst_ptr->reinf_)
00261           rmst_ptr->reinf_ = false;
00262         rmst_ptr->last_hop_ = 0;
00263         rmst_ptr->pkts_sent_ = 0;
00264         rmst_ptr->pkts_rec_ = 0;
00265         rmst_ptr->last_hop_pkts_sent_ = 0; 
00266         rmst_ptr->naks_rec_ = 0; 
00267       }
00268 
00269       // If this is not a sink and a watchdog timer is active, we cancel
00270       // it because we may not end up on the new reinforced path. We
00271       // don't want to look for fragments that will never arrive.
00272       if ((rmst_ptr->watchdog_active_) && (!local_sink_) 
00273         && (!rmst_ptr->local_source_)){
00274         rmst_ptr->cancel_watchdog_ = true;
00275         rmst_ptr->cleanHoleMap();
00276       }
00277 
00278       // We always forward exploratory data.
00279       return(true);
00280       break;
00281 
00282     case(DATA):
00283 
00284       if (rmst_ctl_type != RMST_RESP){
00285         processCtrlMessage(msg);
00286         // We don't let Rmst control messages go to the gradient or other filters.
00287         return false;
00288       }
00289 
00290       // We have a normal DATA packet.
00291       rmst_ptr->pkts_rec_++;
00292 
00293       // If we got the upstream send count - update it in Rmst.
00294       pkts_sent_attr = RmstPktsSentAttr.find(msg->msg_attr_vec_);
00295       if (pkts_sent_attr){
00296         rmst_ptr->last_hop_pkts_sent_ = pkts_sent_attr->getVal();
00297         DiffPrint(DEBUG_SOME_DETAILS,
00298           "processMessage:: got last_hop_pkts_sent_ = %d packets\n",
00299           rmst_ptr->last_hop_pkts_sent_);
00300         if ( (rmst_ptr->last_hop_pkts_sent_ > 20) && 
00301              (rmst_ptr->pkts_rec_ < (rmst_ptr->last_hop_pkts_sent_ * BLACKLIST_THRESHOLD)) ){
00302           Blacklist::iterator black_list_iterator;
00303           black_list_iterator = black_list_.begin();
00304           while(black_list_iterator != black_list_.end()){
00305             if(*black_list_iterator == rmst_ptr->last_hop_)
00306             break;
00307             black_list_iterator++;
00308           }
00309           if(black_list_iterator == black_list_.end()){
00310             DiffPrint(DEBUG_IMPORTANT, "Adding node %d to black_list_ !!\n",
00311               rmst_ptr->last_hop_);
00312             black_list_.push_front(rmst_ptr->last_hop_);
00313             ((DiffusionRouting *)dr_)->addToBlacklist(rmst_ptr->last_hop_);
00314             // Now send an EXP_REQ!
00315             sendExpReqUpstream(rmst_ptr);
00316             rmst_ptr->sent_exp_req_ = true;
00317             GetTime(&rmst_ptr->exp_req_time_);
00318             // We need to send a negative reinforcement on blacklisted link!
00319             Message *neg_reinf_msg;
00320             neg_reinf_msg = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
00321               0, 0, interest_attrs_->size(), pkt_count_, rdm_id_, 
00322               rmst_ptr->last_hop_, LOCALHOST_ADDR);
00323             neg_reinf_msg->msg_attr_vec_ = CopyAttrs(interest_attrs_);
00324             ((DiffusionRouting *)dr_)->sendMessage(neg_reinf_msg, filter_handle_, 1);
00325             pkt_count_++;
00326             delete neg_reinf_msg;
00327           }
00328         }
00329       }
00330 
00331       // We suppress new DATA messages that don't arrive on the 
00332       // reinforced path. 
00333       if ( msg->last_hop_ != rmst_ptr->last_hop_ ){
00334         DiffPrint(DEBUG_IMPORTANT,
00335           "  We suppress new DATA message on non-backchannel path!; backchannel = %d\n",
00336           rmst_ptr->last_hop_);
00337         msg->new_message_ = 0;
00338         return true;
00339       }
00340 
00341       if (rmst_ptr->wait_for_new_path_){
00342         rmst_ptr->wait_for_new_path_ = false;
00343         DiffPrint(DEBUG_SOME_DETAILS, "  node resets wait_for_new_path_.\n");
00344       }
00345 
00346       if (local_sink_ && rmst_ptr->sent_exp_req_){
00347         DiffPrint(DEBUG_SOME_DETAILS,
00348           "  source got a new path, set sent_exp_req_ false.\n");
00349         rmst_ptr->sent_exp_req_ = false;
00350       }
00351 
00352       // We forward DATA if we aren't a source or a sink.
00353       // Sources collect all fragments and send them from a timer.
00354       // Sinks collect all fragments and send them to the app when they
00355       // have all arrived.
00356       if(rmst_ptr->local_source_ || local_sink_)
00357         return false;
00358       else{
00359         rmst_ptr->pkts_sent_++;
00360         // We need to alter the RmstPktsSentAttr to reflect this node!
00361         if(pkts_sent_attr)
00362           pkts_sent_attr->setVal(rmst_ptr->pkts_sent_);
00363         return true;
00364       }
00365       break;
00366 
00367     case(INTEREST):
00368       data = msg->msg_attr_vec_;
00369       nr_class = NRClassAttr.find(data);
00370       if (nr_class){
00371         class_type = nr_class->getVal();
00372         if (class_type == NRAttribute::DISINTEREST_CLASS)
00373           DiffPrint(DEBUG_SOME_DETAILS, "  DISINTEREST_CLASS\n");
00374       }
00375 
00376       nrscope = NRScopeAttr.find(msg->msg_attr_vec_);
00377       if(nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE)
00378         DiffPrint(DEBUG_SOME_DETAILS, "  rmst LOCAL_SCOPE Interest Message\n");
00379       else if (msg->last_hop_ == LOCALHOST_ADDR){
00380         DiffPrint(DEBUG_SOME_DETAILS, "  rmst Interest Message from local SINK\n");
00381         local_sink_ = true;
00382         local_sink_port_ = msg->source_port_;
00383         GetTime (&last_sink_time_);
00384         if (interest_attrs_ == NULL)
00385           interest_attrs_ = CopyAttrs(msg->msg_attr_vec_);
00386       }
00387       else{
00388         DiffPrint(DEBUG_SOME_DETAILS, "  rmst Interest Message from non-local node\n");
00389           if (interest_attrs_ == NULL)
00390             interest_attrs_ = CopyAttrs(msg->msg_attr_vec_);
00391       }
00392       break;
00393 
00394     case(POSITIVE_REINFORCEMENT):
00395       ReinfMessage *reinf_msg;
00396       ExpLog exp_log;
00397 
00398       DiffPrint(DEBUG_IMPORTANT, "  Positive Reinf arrived\n");
00399       reinf_attr = ReinforcementAttr.find(msg->msg_attr_vec_);
00400       reinf_msg = (ReinfMessage*)reinf_attr->getVal();
00401       DiffPrint(DEBUG_LOTS_DETAILS, "  Pos_Reinf: ptk_num = %x, rdm_id_ = %x\n",
00402         reinf_msg->pkt_num_, reinf_msg->rdm_id_);
00403 
00404       key.int_val_[0] = reinf_msg->pkt_num_;
00405       key.int_val_[1] = reinf_msg->rdm_id_;
00406       exp_iterator = exp_map_.find(key.ll_val_);
00407       if(exp_iterator != exp_map_.end()){
00408         exp_log = (*exp_iterator).second;
00409         DiffPrint(DEBUG_SOME_DETAILS, "  Reinforcement for rmst_no = %d, last_hop_ = %d\n",
00410           exp_log.rmst_no_, exp_log.last_hop_);
00411 
00412         // Here is where we must update the rmst with back-channel
00413         // last hop.
00414         rmst_no = exp_log.rmst_no_;
00415         rmst_iterator = rmst_map_.find(rmst_no);
00416         if(rmst_iterator != rmst_map_.end()){
00417           rmst_ptr = (*rmst_iterator).second;
00418           rmst_ptr->last_hop_ = exp_log.last_hop_;
00419           rmst_ptr->fwd_hop_ = msg->last_hop_;
00420           DiffPrint(DEBUG_SOME_DETAILS, "  Setting rmst_no %d last_hop_ = %d, fwd_hop_ = %d\n",
00421             rmst_no, rmst_ptr->last_hop_, rmst_ptr->fwd_hop_);
00422           if(!rmst_ptr->reinf_){
00423             rmst_ptr->reinf_ = true;
00424             if(rmst_ptr->local_source_)
00425               DiffPrint(DEBUG_LOTS_DETAILS, "  Local source got a Reinf\n");
00426           }
00427         }
00428         else{
00429           DiffPrint(DEBUG_IMPORTANT, "  Reinforcement cant't find rmst_no\n");
00430           break;
00431         }
00432         // We are on the reinforced path, so we must start a timer
00433         // if one hasn't already been started. Sinks don't get
00434         // reinforced, so they start a WATCHDOG in syncLocalCache.
00435         if( (rmst_ptr->watchdog_active_ == false) && (caching_mode_) ){
00436           TimerCallback *rmst_timer;
00437           DiffPrint(DEBUG_IMPORTANT,
00438             "  Set a WATCHDOG_TIMER at caching node for reinforced rmst_no %d\n",
00439             rmst_no);
00440           rmst_timer = new RmstTimeout(this, rmst_no, WATCHDOG_TIMER);
00441           // We check on things every 10 seconds.
00442           rmst_ptr->watchdog_handle_ = 
00443             ((DiffusionRouting *)dr_)->addTimer(WATCHDOG_INTERVAL, rmst_timer);
00444           rmst_ptr->watchdog_active_ = true;
00445         }
00446         if (rmst_ptr->wait_for_new_path_){
00447           DiffPrint(DEBUG_SOME_DETAILS, "  Resetting wait_for_new_path_ for rmst_no %d\n", rmst_no);
00448           rmst_ptr->wait_for_new_path_ = false;
00449         }
00450         if (rmst_ptr->sent_exp_req_){
00451           DiffPrint(DEBUG_SOME_DETAILS,
00452             "  intermediate node got a new path, set sent_exp_req_ false.\n");
00453           rmst_ptr->sent_exp_req_ = false;
00454         }
00455       }
00456       else{
00457         if(!rmst_ptr->local_source_)
00458           DiffPrint(DEBUG_IMPORTANT, "  Reinforcement matches no Exploratory msg\n");
00459       }
00460       break;
00461 
00462     case(NEGATIVE_REINFORCEMENT):
00463       bool ret_val;
00464       if (tsprt_ctl_attr){
00465         DiffPrint(DEBUG_SOME_DETAILS,
00466             "  NEGATIVE_REINFORCEMENT, last_hop_ = %d, rmst_ctl_type = %d\n", 
00467             msg->last_hop_, rmst_ctl_type);
00468       }
00469       // We need to check if we got a NEGATIVE REINFORCEMENT from a node that is the
00470       // next node in the forward direction (downstream).  If so, and we are the source
00471       // we must send a new EXPLORATORY message;  else if we are not the source,
00472       // we must send and exp request upstream.
00473       ret_val = true;
00474       rmst_iterator = rmst_map_.begin();
00475       while(rmst_iterator != rmst_map_.end()){
00476         rmst_ptr = (*rmst_iterator).second;
00477         DiffPrint(DEBUG_SOME_DETAILS,
00478             "  searching rmsts - rmst_no_ %d: fwd_hop_ = %d, reinf_ = %d, acked = %d\n",
00479             rmst_ptr->rmst_no_, rmst_ptr->fwd_hop_, rmst_ptr->reinf_, rmst_ptr->acked_);
00480         if (rmst_ptr->local_source_ && rmst_ptr->reinf_
00481             && (rmst_ptr->fwd_hop_ == msg->last_hop_)
00482             && !rmst_ptr->acked_){
00483           // If we are reinforced then we never got and EXP_REQ!!
00484           DiffPrint(DEBUG_SOME_DETAILS, "  local source sees NEG_REINF\n");
00485           processExpReq(rmst_ptr, rmst_ptr->max_frag_sent_);
00486         }
00487         else if (!rmst_ptr->local_source_ && (rmst_ptr->fwd_hop_ == msg->last_hop_)
00488                  && rmst_ptr->reinf_ && !rmst_ptr->acked_){
00489           DiffPrint(DEBUG_SOME_DETAILS, "  intermediate node sees NEG_REINF from reinforced node\n");
00490           DiffPrint(DEBUG_SOME_DETAILS, "  send Exp Request upstream!\n");
00491           ret_val = false;
00492           sendExpReqUpstream(rmst_ptr);
00493         }
00494         else{
00495           DiffPrint(DEBUG_SOME_DETAILS,
00496             "  node sees NEG_REINF from non-reinforced node - let routing layer see it\n");
00497       ret_val = true;
00498     }
00499         rmst_iterator++;
00500       }
00501       if (!ret_val)
00502         return false;
00503       break;
00504 
00505     default:
00506       break;
00507   } // switch msg->type
00508   return true;
00509 }
00510 
00511 
00512 // RmstFilter::syncLocalCache
00513 //
00514 // This routine adds new transport data messages to the local data base.
00515 
00516 Rmst* RmstFilter::syncLocalCache (Message *msg)
00517 {
00518   NRSimpleAttribute<int> *rmst_id_attr = NULL;
00519   NRSimpleAttribute<int> *frag_attr = NULL;
00520   NRSimpleAttribute<int> *max_frag_attr = NULL;
00521   NRSimpleAttribute<void *> *data_buf_attr = NULL;
00522   NRAttrVec *data = msg->msg_attr_vec_;
00523   Int2Rmst::iterator rmst_iterator;
00524   int rmst_no;
00525   int frag_no;
00526   int max_frag_no;
00527   void *blob_ptr;
00528   int blob_len;
00529   void *tmp_frag_ptr;
00530   Rmst *rmst_ptr;
00531 
00532   rmst_id_attr = RmstIdAttr.find(data);
00533   frag_attr = RmstFragAttr.find(data);
00534   max_frag_attr = RmstMaxFragAttr.find(data);
00535   data_buf_attr = RmstDataAttr.find(data);
00536 
00537   if (! (rmst_id_attr && frag_attr && data_buf_attr) ){
00538     DiffPrint(DEBUG_IMPORTANT, "  Filter received a BAD transport packet!\n");
00539     return NULL;
00540   }
00541 
00542   rmst_no = rmst_id_attr->getVal();
00543   frag_no = frag_attr->getVal();
00544   if(max_frag_attr)
00545     max_frag_no = max_frag_attr->getVal();
00546   else
00547     max_frag_no = 0;
00548   blob_ptr = data_buf_attr->getVal();
00549   blob_len = data_buf_attr->getLen();
00550 
00551   // Here is where I consuslt the Data Base and possibly add a new map,
00552   // or add to an existing map.
00553 
00554   rmst_iterator = rmst_map_.find(rmst_no);
00555   if(rmst_iterator == rmst_map_.end()){
00556     DiffPrint(DEBUG_IMPORTANT, "  creating a new DB entry for Rmst %d\n", rmst_no);
00557     DiffPrint(DEBUG_SOME_DETAILS, "  Max Fragment number = %d\n", max_frag_no);
00558     rmst_ptr = new Rmst(rmst_no);
00559     rmst_ptr->max_frag_ = max_frag_no;
00560     rmst_map_.insert(Int2Rmst::value_type(rmst_no, rmst_ptr));
00561 
00562     // Artificially initialize last_nak_time_ so it's older
00563     // than any Naks we may get.
00564     GetTime(&rmst_ptr->last_nak_time_);
00565 
00566     // Several decisions in this routine relate to messages that emanated
00567     // from a local source.
00568     if (msg->last_hop_ == LOCALHOST_ADDR) {
00569       rmst_ptr->local_source_ = true;
00570       rmst_ptr->local_source_port_ = msg->source_port_;
00571       // This is the first fragment of an rmst from a local source.
00572       // The message will be marked exploratory by this filter.
00573       // We need to mark the last hop as LOCALHOST_ADDR.
00574       rmst_ptr->last_hop_ = LOCALHOST_ADDR;
00575     }
00576     // We need to capture the RmstTargetAttr for concantenation on sendMessage.
00577     if((rmst_ptr->local_source_)||(local_sink_)||(caching_mode_)){
00578       NRSimpleAttribute<char *> *rmst_tgt_attr = NULL;
00579       rmst_tgt_attr = RmstTargetAttr.find(msg->msg_attr_vec_);
00580       if (rmst_tgt_attr){
00581         char *tmp_str = rmst_tgt_attr->getVal();
00582         rmst_ptr->target_str_ = new char[strlen(tmp_str)+1];
00583         strcpy (rmst_ptr->target_str_, tmp_str);
00584         DiffPrint(DEBUG_IMPORTANT, "  RmstTargetAttr = %s\n", rmst_ptr->target_str_);
00585       }
00586       else
00587         DiffPrint(DEBUG_IMPORTANT, "  no RmstTargetAttr Rmst %d !\n", rmst_no);
00588     }
00589   }
00590   else
00591     rmst_ptr = (*rmst_iterator).second;
00592 
00593   if(!rmst_ptr->local_source_)
00594     DiffPrint(DEBUG_IMPORTANT, "  Got a blob, rmstId = %d, frag_no = %d\n", rmst_no, frag_no);
00595 
00596   // Update the time we last saw data for this Rmst.
00597   GetTime(&rmst_ptr->last_data_time_);
00598 
00599   // We cache the fragment at the sink and source,
00600   // or in caching mode at each node that receives it.
00601   if((rmst_ptr->local_source_)||(local_sink_)||(caching_mode_)){
00602     tmp_frag_ptr = rmst_ptr->getFrag(frag_no);
00603     if (tmp_frag_ptr == NULL){
00604       if(!rmst_ptr->local_source_)
00605         DiffPrint(DEBUG_SOME_DETAILS, "  creating a new frag %d entry for Rmst %d\n",
00606           frag_no, rmst_no);
00607       if (frag_no == rmst_ptr->max_frag_)
00608       rmst_ptr->max_frag_len_ = blob_len;
00609       tmp_frag_ptr = new char[blob_len];
00610       memcpy(tmp_frag_ptr, blob_ptr, blob_len);
00611       rmst_ptr->putFrag(frag_no, tmp_frag_ptr);
00612 
00613       // Check to see if this fragment was NAKed.
00614       // If so, delete from the hole map.
00615       if(!rmst_ptr->local_source_){
00616         if ( rmst_ptr->inHoleMap(frag_no) ){
00617           // We need to see if we sent a NAK for this frag.
00618           NakData *nak_ptr = rmst_ptr->getHole(frag_no);
00619           if(nak_ptr->nak_sent_)
00620             DiffPrint(DEBUG_SOME_DETAILS, "  We sent a NAK_REQ for this fragment.\n");
00621           DiffPrint(DEBUG_SOME_DETAILS, "  filter removing hole %d from hole_map_\n",frag_no);
00622           rmst_ptr->delHole(frag_no);
00623         }
00624       }
00625       // We start a WATCHDOG for an rmst here if: this is a local sink,
00626       // we haven't started a timer, and this is not the initial fragment.
00627       // Intermediate nodes (in caching mode) start a timer if they are on the
00628       // reinforced path, which is checked in processMessage.
00629       if((!rmst_ptr->local_source_)&&(local_sink_)&&(rmst_ptr->watchdog_active_ == false)
00630         && (frag_no>0)){
00631         TimerCallback *rmst_timer;
00632         DiffPrint(DEBUG_IMPORTANT, "  Set a WATCHDOG_TIMER at sink for rmst_no %d\n", rmst_no);
00633         rmst_timer = new RmstTimeout(this, rmst_no, WATCHDOG_TIMER);
00634         // We check on things every 10 seconds.
00635         rmst_ptr->watchdog_handle_ = ((DiffusionRouting *)dr_)->addTimer(WATCHDOG_INTERVAL,
00636           rmst_timer);
00637         rmst_ptr->watchdog_active_ = true;
00638       }
00639     }
00640     else
00641       DiffPrint(DEBUG_SOME_DETAILS, "  got a duplicate frag %d for blob %d\n",
00642         frag_no, rmst_no);
00643 
00644     // If we have still have a hole in the fragment map, update the hole map.
00645 
00646     if ((!rmst_ptr->local_source_) && (rmst_ptr->holeInFragMap()))
00647       rmst_ptr->syncHoleMap();
00648 
00649     // If the Rmst is complete, cancell timer, stop timer,
00650     // send Ack, give to local sinks.
00651     if(rmst_ptr->rmstComplete()){
00652       if ((rmst_ptr->watchdog_active_) && (!rmst_ptr->local_source_)){
00653         DiffPrint(DEBUG_SOME_DETAILS, 
00654           "  Rmst #%d is complete set cancel_watchdog_ to stop WATCHDOG\n",
00655           rmst_no);
00656         rmst_ptr->cancel_watchdog_ = true;
00657       }
00658 
00659       // Send this Rmst to any local sink
00660       if(local_sink_ && !(rmst_ptr->acked_)){
00661         sendRmstToSink(rmst_ptr);
00662         // We mark the rmst acked at the sink so it will clean up.
00663         rmst_ptr->acked_ = true;
00664       }
00665 
00666       // If this is a source, we only send out the fragments when we've got
00667       // them all from the application. If the Rmst is complete we add the
00668       // Rmst to the send_list_, and if there is no send timer we start one.
00669       if(rmst_ptr->local_source_){
00670         SendMsgData new_send_msg;
00671         // The Rmst is complete and this is a source - put in send list.
00672         new_send_msg.rmst_no_ = rmst_no;
00673         new_send_msg.last_frag_sent_ = -1;
00674         new_send_msg.exp_base_ = 0;
00675         send_list_.push_back(new_send_msg);
00676         if(!send_timer_active_){
00677           TimerCallback *send_timer;
00678           // Now add a timer to send this and any NAKS.
00679           DiffPrint(DEBUG_SOME_DETAILS,
00680             "  Rmst %d ready to send - Set a SEND_TIMER\n", rmst_no);
00681           send_timer = new RmstTimeout(this, -1, SEND_TIMER);
00682           // We check on things every second.
00683           send_timer_handle_ = 
00684             ((DiffusionRouting *)dr_)->addTimer(SEND_INTERVAL, send_timer);
00685           send_timer_active_ = true;
00686         }
00687       }
00688       else
00689         // We must let upstream nodes know that we got the whole blob.
00690         sendAckToSource(rmst_ptr);
00691     }
00692   }
00693   else{
00694     rmst_ptr->max_frag_rec_ = frag_no;
00695     DiffPrint(DEBUG_LOTS_DETAILS, "  Not caching frag %d entry for Rmst %d\n", frag_no, rmst_no);
00696   }
00697 
00698   return rmst_ptr;
00699 }
00700 
00701 void RmstFilter::processCtrlMessage(Message *msg)
00702 {
00703   NRSimpleAttribute<int> *tsprt_ctl_attr = NULL;
00704   NRSimpleAttribute<int> *rmst_id_attr = NULL;
00705   NRSimpleAttribute<int> *frag_attr = NULL;
00706   NRAttrVec *data;
00707   NRAttrVec attrs;
00708   Int2Rmst::iterator rmst_iterator;
00709   int rmst_no; 
00710   int frag_no;
00711   int rmst_ctl_type;
00712   Rmst *rmst_ptr;
00713   void *frag_ptr;
00714   Message *nak_msg;
00715   NRAttrVec::iterator place;
00716   bool forwarding_nak = false;
00717 
00718   data = msg->msg_attr_vec_;
00719 
00720   tsprt_ctl_attr = RmstTsprtCtlAttr.find(data);
00721   rmst_ctl_type = tsprt_ctl_attr->getVal();
00722 
00723   rmst_id_attr = RmstIdAttr.find(data);
00724   if(!rmst_id_attr) {
00725     DiffPrint(DEBUG_SOME_DETAILS, "  Node got a bad Rmst control msg - no RmstIdAttr!\n");
00726     return;
00727   }
00728   rmst_no = rmst_id_attr->getVal();
00729 
00730   // Let's make sure we have this rmst
00731   rmst_iterator = rmst_map_.begin();
00732   rmst_iterator = rmst_map_.find(rmst_no);
00733   if(rmst_iterator != rmst_map_.end())
00734     rmst_ptr = (*rmst_iterator).second;
00735   else{
00736     DiffPrint(DEBUG_IMPORTANT, "  Filter can't find Rmst %d for Rmst control msg\n", rmst_no);
00737     return;
00738   }
00739 
00740   switch (rmst_ctl_type){
00741 
00742   case(ACK_RESP):
00743     DiffPrint(DEBUG_IMPORTANT, "  Got an ACK_RESP\n");
00744     // For now we automatically forward ACKs if we're not the source.
00745     rmst_ptr->acked_ = true;
00746     if(!rmst_ptr->local_source_){
00747       Message *ack_msg;
00748       // If we got an ACK and we aren't the source, we must be an
00749       // intermediate node (Sinks don't get ACKs, they send them).
00750       // We need to forward ACK toward source if possible.
00751       if (rmst_ptr->reinf_) {
00752         DiffPrint(DEBUG_SOME_DETAILS, "  forwarding ACK to %d\n", rmst_ptr->last_hop_);
00753         attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, ACK_RESP));
00754         attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_no));
00755         ack_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(),
00756           pkt_count_, rdm_id_, rmst_ptr->last_hop_, LOCALHOST_ADDR);
00757         ack_msg->msg_attr_vec_ = CopyAttrs(&attrs);
00758         ((DiffusionRouting *)dr_)->sendMessage(ack_msg, filter_handle_, 1);
00759         pkt_count_++;
00760         delete ack_msg;
00761         ClearAttrs(&attrs);
00762       }
00763       else
00764         DiffPrint(DEBUG_IMPORTANT, "  intermediate node can't forward ACK for Rmst %d\n", rmst_no);
00765     }
00766     else{
00767       DiffPrint(DEBUG_IMPORTANT, "  Source got ACK for Rmst %d\n", rmst_no);
00768       sendContToSource(rmst_ptr);
00769     }
00770     break;
00771 
00772   case(NAK_REQ):
00773 
00774     // Mark the time we got this NAK for cleanup timer.
00775     GetTime(&rmst_ptr->last_nak_time_);
00776     rmst_ptr->naks_rec_++;
00777     DiffPrint(DEBUG_IMPORTANT, "  Got a NAK_REQ; number = %d\n", rmst_ptr->naks_rec_);
00778 
00779     if ((rmst_ptr->naks_rec_ > 10) && (rmst_ptr->naks_rec_ > (.30 * rmst_ptr->max_frag_)) &&
00780             rmst_ptr->local_source_){
00781       DiffPrint(DEBUG_IMPORTANT, "  Too many NAKs - send an EXPLORATORY msg!\n");
00782       processExpReq(rmst_ptr, rmst_ptr->max_frag_sent_);
00783       return;
00784     }
00785 
00786     // If we sent an exp request more than 30 seconds ago,
00787     // we send it again.
00788     if (rmst_ptr->sent_exp_req_){
00789       int exp_time = rmst_ptr->last_nak_time_.tv_sec - rmst_ptr->exp_req_time_.tv_sec;
00790       DiffPrint(DEBUG_SOME_DETAILS, 
00791         "  Node that sent an EXP_REQ got a NAK: time since last exp = %d\n", exp_time);
00792       if( (rmst_ptr->last_nak_time_.tv_sec - rmst_ptr->exp_req_time_.tv_sec) > 30){
00793         // Resend an EXP_REQ!!!
00794         DiffPrint(DEBUG_IMPORTANT, "  Node resends EXP_REQ up blacklisted stream!\n");
00795         sendExpReqUpstream(rmst_ptr);
00796         GetTime(&rmst_ptr->exp_req_time_);
00797         // Send another negative reinforcement on blacklisted link!
00798         Message *neg_reinf_msg;
00799         neg_reinf_msg = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
00800           0, 0, interest_attrs_->size(), pkt_count_, rdm_id_, 
00801           rmst_ptr->last_hop_, LOCALHOST_ADDR);
00802         neg_reinf_msg->msg_attr_vec_ = CopyAttrs(interest_attrs_);
00803         ((DiffusionRouting *)dr_)->sendMessage(neg_reinf_msg, filter_handle_, 1);
00804         pkt_count_++;
00805         delete neg_reinf_msg;
00806       }
00807       return;
00808     }
00809 
00810     // We need to send the naked fragments if we are the source,
00811     // or a caching node.
00812     place = data->begin();
00813     for(;;){
00814       frag_attr = RmstFragAttr.find_from(data, place, &place);
00815       if (!frag_attr)
00816         break;
00817       frag_no = frag_attr->getVal();
00818       DiffPrint(DEBUG_IMPORTANT, "  Filter received a NAK_REQ for Rmst %d, frag %d\n",
00819         rmst_no, frag_no);
00820 
00821       // Check if we have this fragment.
00822       // If not forward NAK toward source if possible.
00823       frag_ptr =  rmst_ptr->getFrag(frag_no);
00824       if (frag_ptr == NULL){
00825         DiffPrint(DEBUG_SOME_DETAILS, "  Filter can't find frag %d of Rmst %d for NAK\n",
00826         frag_no, rmst_no);
00827         // We need to forward NAK toward source if possible.
00828         if ( (rmst_ptr->reinf_) && (rmst_ptr->last_hop_ != LOCALHOST_ADDR) ){
00829           forwarding_nak = true;
00830           DiffPrint(DEBUG_IMPORTANT, "  forwarding NAK to %d\n", rmst_ptr->last_hop_);
00831           attrs.push_back(RmstFragAttr.make(NRAttribute::IS, frag_no));
00832 
00833           if(caching_mode_){
00834             // We need to add this fragment to our hole map!
00835             rmst_ptr->putHole(frag_no);
00836             NakData *nak_ptr = rmst_ptr->getHole(frag_no);
00837             // Artificially age this hole so it gets
00838             // naked immediately.
00839             nak_ptr->tmv.tv_sec -= 1;
00840           }
00841         }
00842         else
00843           DiffPrint(DEBUG_IMPORTANT, "  not forwarding NAK! - no place to send it!\n");
00844       }
00845       else{
00846         // We have this fragment so add it to the NakList for sending.
00847         NakMsgData nak_msg_data;
00848         NakList::iterator nak_list_iterator;
00849         nak_list_iterator = nak_list_.begin();
00850         while(nak_list_iterator != nak_list_.end()){
00851           if((nak_list_iterator->rmst_no_ == rmst_no) &&
00852             (nak_list_iterator->frag_no_ == frag_no))
00853             break;
00854           nak_list_iterator++;
00855         }
00856         if(nak_list_iterator == nak_list_.end()){
00857           DiffPrint(DEBUG_SOME_DETAILS,
00858             "  adding NAK for rmst %d frag %d to nak_list_\n", rmst_no, frag_no);
00859           nak_msg_data.rmst_no_ = rmst_no;
00860           nak_msg_data.frag_no_ = frag_no;
00861           nak_list_.push_back(nak_msg_data);
00862           if(!send_timer_active_){
00863             TimerCallback *send_timer;
00864             // Now add a timer to send this and any NAKS.
00865             DiffPrint(DEBUG_LOTS_DETAILS, 
00866               "  Set a SEND_TIMER for reinforced rmst_no %d\n", rmst_no);
00867             send_timer = new RmstTimeout(this, rmst_no, SEND_TIMER);
00868             // We check on things every second.
00869             send_timer_handle_ = 
00870               ((DiffusionRouting *)dr_)->addTimer(SEND_INTERVAL, send_timer);
00871             send_timer_active_ = true;
00872           }
00873         }
00874       }
00875       place++;
00876     }
00877 
00878     if (forwarding_nak){
00879       attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, NAK_REQ));
00880       attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_no));
00881       nak_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0,
00882       attrs.size(), pkt_count_, rdm_id_, rmst_ptr->last_hop_,
00883         LOCALHOST_ADDR);
00884       nak_msg->msg_attr_vec_ = CopyAttrs(&attrs);
00885       ((DiffusionRouting *)dr_)->sendMessage(nak_msg, filter_handle_, 1);
00886       pkt_count_++;
00887       delete nak_msg;
00888       ClearAttrs(&attrs);
00889     }
00890     break;
00891 
00892   case(EXP_REQ):
00893     DiffPrint(DEBUG_IMPORTANT, "  Got an EXP_REQ\n");
00894     if(!rmst_ptr->local_source_){
00895       DiffPrint(DEBUG_SOME_DETAILS, "  Filter forwarding EXP_REQ for Rmst %d\n", rmst_no);
00896       // We need to forward EXP_REQ toward source if possible.
00897       if (rmst_ptr->reinf_)
00898         sendExpReqUpstream(rmst_ptr);
00899     }
00900     else{
00901       // We need to call a routine that will clean the NAK list of 
00902       // outstanding NAK responses for this Rmst, put a new expBase
00903       // in the send list (lowest of nak or send Lists), and set this
00904       // rmst as not reinforced.
00905       frag_attr = RmstFragAttr.find(msg->msg_attr_vec_);
00906       frag_no = frag_attr->getVal();
00907       DiffPrint(DEBUG_IMPORTANT, "  Source got EXP request for Rmst %d\n", rmst_no);
00908       if (rmst_ptr->reinf_)
00909           processExpReq(rmst_ptr, frag_no);
00910       else
00911           DiffPrint(DEBUG_IMPORTANT, "  EXP request for non-reinforced Rmst %d\n", rmst_no);
00912     }
00913     break;
00914 
00915   default:
00916     break;
00917   }  // switch (rmst_ctl_type)
00918   return;
00919 }
00920 
00921 void RmstFilter::setupNak(int rmst_id)
00922 {
00923   NRAttrVec attrs;
00924   int frag_id;
00925   NakData *nak_ptr;
00926   Rmst *rmst_ptr;
00927   int nak_count = 0;
00928 
00929   Int2Rmst::iterator rmst_iterator = rmst_map_.find(rmst_id);
00930   if(rmst_iterator != rmst_map_.end())
00931     rmst_ptr = (*rmst_iterator).second;
00932   else{
00933     DiffPrint(DEBUG_IMPORTANT, "setupNak - can't find Rmst %d\n", rmst_id);
00934     return;
00935   }
00936 
00937   Int2Nak::iterator hole_iter = rmst_ptr->hole_map_.begin();
00938   bool send_new_nak = false;
00939   timeval cur_time;
00940 
00941   // We now have an iterator to look at each hole (hole_iter),
00942   // a Rmst Id (rmst_id), a fragment Id ((*hole_iter).first),
00943   // a NakData pointer ((*hole_iter).second), and an Rmst
00944   // pointer (rmst_ptr).
00945 
00946   GetTime (&cur_time);
00947 
00948   // The first pass finds holes that haven't been NAKed and should be.
00949   while(hole_iter != rmst_ptr->hole_map_.end()){
00950     frag_id = (*hole_iter).first;
00951     nak_ptr = (*hole_iter).second;
00952     DiffPrint(DEBUG_SOME_DETAILS,
00953       "  setupNak - found hole rmst_id %d, frag %d\n", rmst_id, frag_id);
00954 
00955     // If we never NAKed this fragment and it's past due,
00956     // mark it so it gets NAKed.
00957     if (!nak_ptr->nak_sent_){
00958       if ( (cur_time.tv_sec - nak_ptr->tmv.tv_sec) > 3 ){
00959         nak_ptr->nak_sent_ = true;
00960         nak_ptr->send_nak_ = true;
00961         send_new_nak = true;
00962       }
00963       else
00964         DiffPrint(DEBUG_SOME_DETAILS,
00965           "  setupNak - hole %d not old enough to NAK\n", frag_id);
00966     }
00967     // If we NAKed this fragment and the NAK response is past due,
00968     // NAK it again.
00969     else if ( (cur_time.tv_sec - nak_ptr->tmv.tv_sec) > NAK_RESPONSE_WAIT ){
00970       DiffPrint(DEBUG_SOME_DETAILS, "  setupNak - hole %d has an overdue NAK\n", frag_id);
00971       nak_ptr->send_nak_ = true;
00972       send_new_nak = true;
00973     }
00974     hole_iter++;
00975   }
00976 
00977   if (send_new_nak){
00978     Message *nak_msg;
00979     if ( rmst_ptr->last_hop_ == LOCALHOST_ADDR ){
00980       DiffPrint(DEBUG_IMPORTANT, "  can't send NAK, no last_hop_!\n");
00981       return;
00982     }
00983     // The second pass adds all holes that should be NAKed to vector.
00984     hole_iter = rmst_ptr->hole_map_.begin();
00985     while( (hole_iter != rmst_ptr->hole_map_.end()) && (nak_count <= 10) ){
00986       frag_id = (*hole_iter).first;
00987       nak_ptr = (*hole_iter).second;
00988 
00989       if ( nak_ptr->send_nak_ ){
00990         nak_ptr->send_nak_ = false;
00991         DiffPrint(DEBUG_SOME_DETAILS,
00992           "  setupNak - adding a NAK for frag_id %d to attrs\n", frag_id);
00993         attrs.push_back(RmstFragAttr.make(NRAttribute::IS, frag_id));
00994         GetTime(&(nak_ptr->tmv));
00995         nak_count++;
00996       }
00997       hole_iter++;
00998     }
00999     attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, NAK_REQ));
01000     attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_id));
01001 
01002     // Code to send a message to last_hop_
01003     nak_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(),
01004       pkt_count_, rdm_id_, rmst_ptr->last_hop_, LOCALHOST_ADDR);
01005     pkt_count_++;
01006     nak_msg->msg_attr_vec_ = CopyAttrs(&attrs);
01007 
01008     DiffPrint(DEBUG_IMPORTANT, "  Sending NAK_REQ to node %d\n", rmst_ptr->last_hop_);
01009     ((DiffusionRouting *)dr_)->sendMessage(nak_msg, filter_handle_, 1);
01010 
01011     delete nak_msg;
01012     ClearAttrs(&attrs);
01013 
01014     // Mark the time we sent this NAK for cleanup timer.
01015     GetTime(&rmst_ptr->last_nak_time_);
01016 
01017   }
01018   else
01019     DiffPrint(DEBUG_SOME_DETAILS, "  setupNak - no need for a new NAK for rmst_id %d\n", rmst_id);
01020   return;
01021 }
01022 
01023 void RmstFilter::processExpReq(Rmst *rmst_ptr, int frag_no)
01024 {
01025   NakList::iterator nak_list_iterator;
01026   SendList::iterator send_list_iterator;
01027   int rmst_no = rmst_ptr->rmst_no_;
01028 
01029   DiffPrint(DEBUG_IMPORTANT, "  processExpReq called for rmstId %d, frag_no %d\n", rmst_no, frag_no);
01030 
01031   // Indicate that Rmst is not reinforced.
01032   rmst_ptr->reinf_ = false;
01033   rmst_ptr->pkts_sent_ = 0;
01034 
01035   // If we have an ACK_TIMER active cancel it. We want to resend some packets.
01036   if(rmst_ptr->ack_timer_active_){
01037     ((DiffusionRouting *)dr_)->removeTimer(rmst_ptr->ack_timer_handle_);
01038     rmst_ptr->ack_timer_active_ = false;
01039   }
01040 
01041   // Erase any NAKs. We are about to establish a new path.
01042   nak_list_iterator = nak_list_.begin();
01043   while (nak_list_iterator != nak_list_.end()){
01044     if (nak_list_iterator->rmst_no_ == rmst_no){
01045       DiffPrint(DEBUG_SOME_DETAILS,
01046         "  processExpReq erasing frag_no %d from nak_list_\n", nak_list_iterator->frag_no_);
01047       nak_list_iterator = nak_list_.erase(nak_list_iterator);
01048     }
01049     else
01050       nak_list_iterator++;
01051   }
01052 
01053   DiffPrint(DEBUG_LOTS_DETAILS, "  processExpReq done with nak_list_ for rmstId %d\n", rmst_no);
01054 
01055   // If we are being told to start by resending the last packet, back up by one.
01056   // When a sink gets an exploratory message, they don't start re-NAKing until they
01057   // know they are reinforced. Sinks only know they are reinforced when they get DATA.
01058   if ( (frag_no == rmst_ptr->max_frag_) && (rmst_ptr->max_frag_ > 0) ){
01059     frag_no--;
01060     DiffPrint(DEBUG_IMPORTANT, "  processExpReq decrements frag_no to %d\n", frag_no);
01061   }
01062 
01063   // Update send_list_ entry or add one.
01064   send_list_iterator = send_list_.begin();
01065   while (send_list_iterator != send_list_.end()){
01066     if (send_list_iterator->rmst_no_ == rmst_no)
01067       break;
01068     send_list_iterator++;
01069   }
01070   if (send_list_iterator != send_list_.end()){
01071     send_list_iterator->exp_base_ = frag_no;
01072     DiffPrint(DEBUG_SOME_DETAILS, "  processExpReq sets send_list_ expBase to %d\n", frag_no);
01073     send_list_iterator->last_frag_sent_ = frag_no-1;
01074   }
01075   else{
01076     SendMsgData new_send_msg;
01077     DiffPrint(DEBUG_SOME_DETAILS,
01078       "  processExpReq creating new send_list_ entry for rmstId %d\n", rmst_no);
01079     DiffPrint(DEBUG_SOME_DETAILS,
01080       "  processExpReq sets send_list_ expBase to %d\n", frag_no);
01081     new_send_msg.rmst_no_ = rmst_no;
01082     new_send_msg.exp_base_ = frag_no;
01083     new_send_msg.last_frag_sent_ = frag_no-1;
01084     send_list_.push_front(new_send_msg);
01085     if(!send_timer_active_){
01086       TimerCallback *send_timer;
01087       // Now add a timer to send this and any NAKS.
01088       DiffPrint(DEBUG_SOME_DETAILS, "  Set a SEND_TIMER for reinforced rmst_no %d\n", rmst_no);
01089       send_timer = new RmstTimeout(this, rmst_no, SEND_TIMER);
01090       // We check on things every second.
01091       send_timer_handle_ = ((DiffusionRouting *)dr_)->addTimer(SEND_INTERVAL, send_timer);
01092       send_timer_active_ = true;
01093     }
01094   }
01095 }
01096 
01097 handle RmstFilter::setupFilter()
01098 {
01099   NRAttrVec attrs;
01100   handle h;
01101 
01102   // This is a dummy attribute for filtering that matches everything
01103   attrs.push_back(NRClassAttr.make(NRAttribute::IS,
01104     NRAttribute::INTEREST_CLASS));
01105   h = ((DiffusionRouting *)dr_)->addFilter(&attrs, RMST_FILTER_PRIORITY, fcb_);
01106   ClearAttrs(&attrs);
01107   return h;
01108 }
01109 
01110 void RmstFilter::run()
01111 {
01112 #ifdef NS_DIFFUSION
01113   TimerCallback *stat_timer;
01114   filter_handle_ = setupFilter();
01115   DiffPrint(DEBUG_LOTS_DETAILS, "RmstFilter:: subscribed to all, received handle %d\n",
01116         (int)filter_handle_);
01117   
01118   DiffPrint(DEBUG_LOTS_DETAILS, "RmstFilter constructor: start cleanup timer\n");
01119   stat_timer = new RmstTimeout(this, -1, CLEANUP_TIMER);
01120   stat_timer_handle_ = ((DiffusionRouting *)dr_)->addTimer(CLEANUP_INTERVAL, stat_timer);
01121 #else
01122   // Doesn't do anything
01123   while(1){
01124       sleep(1000);
01125   }
01126 #endif // NS_DIFFUSION
01127 }
01128 
01129 RmstTimeout::RmstTimeout(RmstFilter *rmst_flt, int no, int type)
01130 {
01131   filter_ = rmst_flt;
01132   rmst_no_ = no;
01133   timer_type_ = type;
01134 }
01135 
01136 int RmstTimeout::expire()
01137 {
01138   int retval;
01139 
01140   retval = filter_->processTimer(rmst_no_, timer_type_);
01141   if(retval == -1)
01142     delete this;
01143 
01144   return retval;
01145 }
01146 
01147 int RmstFilter::processTimer(int rmst_no, int timer_type)
01148 {
01149   Rmst *rmst_ptr;
01150   void *frag_ptr;
01151   int frag_no;
01152   Int2Rmst::iterator rmst_iterator;
01153   timeval cur_time;
01154 
01155   GetTime (&cur_time);
01156 
01157   switch (timer_type){
01158 
01159   case SEND_TIMER:
01160     DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer SEND_TIMER");
01161     PrintTime(&cur_time);
01162     // If we haven't got any NAKs pending, send the next fragment of the Rmst
01163     // in progress. If we've sent all fragments of the current Rmst, we cancel
01164     // ourself. When we get a NAK, if this timer is active we send NAK responses
01165     // here. Otherwise we can send them directly from the NAK response routine.
01166     if (!nak_list_.empty()){
01167       Message *nak_resp;
01168       NRAttrVec nak_data_attrs;
01169       NakMsgData nak_msg_data = nak_list_.front();
01170       rmst_no = nak_msg_data.rmst_no_;
01171       frag_no = nak_msg_data.frag_no_;
01172 
01173       rmst_iterator = rmst_map_.find(rmst_no);
01174       if(rmst_iterator != rmst_map_.end()){
01175         rmst_ptr = (*rmst_iterator).second;
01176         // We have the fragment, set the last_data_time_ so that we defer
01177         // the cleanup of this Rmst, then we send the fragment to last hop.
01178         if (rmst_ptr->reinf_){
01179           GetTime(&rmst_ptr->last_data_time_);
01180           rmst_ptr->pkts_sent_++;
01181           nak_data_attrs.push_back(RmstTargetAttr.make(NRAttribute::IS,
01182             rmst_ptr->target_str_));
01183           nak_data_attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS,
01184             RMST_RESP));
01185           nak_data_attrs.push_back(RmstFragAttr.make(NRAttribute::IS, frag_no));
01186           // We routinely send the packet sent count on NAKs.
01187           nak_data_attrs.push_back(RmstPktsSentAttr.make(NRAttribute::IS,
01188             rmst_ptr->pkts_sent_));
01189           nak_data_attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_no));
01190           // Add the actual data; the length depends on if it's the last
01191           // fragment or not.
01192           frag_ptr =  rmst_ptr->getFrag(frag_no);
01193           if (frag_no == rmst_ptr->max_frag_)
01194             nak_data_attrs.push_back(RmstDataAttr.make(NRAttribute::IS,
01195               frag_ptr, rmst_ptr->max_frag_len_));
01196           else
01197             nak_data_attrs.push_back(RmstDataAttr.make(NRAttribute::IS,
01198               frag_ptr, MAX_FRAG_SIZE));
01199 
01200           DiffPrint(DEBUG_IMPORTANT, "  Filter sending Data for NAKed frag %d of Rmst %d\n",
01201             frag_no, rmst_no);
01202 
01203           nak_resp = new Message(DIFFUSION_VERSION, DATA, 0, 0,
01204             nak_data_attrs.size(), pkt_count_, rdm_id_,
01205             LOCALHOST_ADDR, LOCALHOST_ADDR);
01206           nak_resp->msg_attr_vec_ = CopyAttrs(&nak_data_attrs);
01207           ((DiffusionRouting *)dr_)->sendMessage(nak_resp, filter_handle_);
01208           pkt_count_++;
01209           delete nak_resp;
01210           ClearAttrs(&nak_data_attrs);
01211         }
01212         else
01213           DiffPrint(DEBUG_IMPORTANT, 
01214             "RmstFilter::processTimer sees non-reinforced path for NAK on rmst %d!\n",
01215             rmst_no);
01216       }
01217       else
01218         DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processTimer can't find Rmst %d for NAK!\n",
01219           rmst_no);
01220       nak_list_.pop_front();
01221     }
01222 
01223     else if (!send_list_.empty()){
01224       int8_t msg_type;
01225       int action = DO_NOTHING;
01226       // Get the rmst and frag_no that is in progress.
01227       NRAttrVec data_attrs;
01228 
01229       SendMsgData send_data = send_list_.front();
01230       rmst_no = send_data.rmst_no_;
01231       rmst_iterator = rmst_map_.find(rmst_no);
01232       if(rmst_iterator == rmst_map_.end())
01233         action = DELETE_FROM_QUEUE;
01234       else{
01235         rmst_ptr = (*rmst_iterator).second;
01236         if ((send_data.last_frag_sent_ == rmst_ptr->max_frag_) && rmst_ptr->reinf_)
01237           action = DELETE_FROM_QUEUE;
01238         else if ( (send_data.last_frag_sent_ == send_data.exp_base_) &&
01239           (!rmst_ptr->reinf_) && (exp_gap_ < 10) ){
01240           action = DO_NOTHING;
01241           exp_gap_++;
01242         }
01243         else
01244           action = SEND_NEXT_FRAG;
01245       }
01246 
01247       switch (action){
01248 
01249       case(DELETE_FROM_QUEUE):
01250         // Delete message data from front. 
01251         send_list_.pop_front();
01252         break;
01253 
01254       case(SEND_NEXT_FRAG):
01255         send_list_.pop_front();
01256         if (rmst_ptr->reinf_){
01257           send_data.last_frag_sent_++;
01258           frag_no = send_data.last_frag_sent_;
01259         }
01260         else{
01261           frag_no = send_data.exp_base_;
01262           send_data.last_frag_sent_ = frag_no;
01263         }
01264         send_list_.push_front(send_data);
01265         rmst_ptr->max_frag_sent_ = frag_no;
01266 
01267         DiffPrint(DEBUG_IMPORTANT, "  Source Filter sending frag %d of Rmst %d\n",
01268           frag_no, rmst_no);
01269         
01270         // Now make a message.
01271         data_attrs.push_back(RmstTargetAttr.make(NRAttribute::IS, rmst_ptr->target_str_));
01272         data_attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP));
01273         data_attrs.push_back(RmstFragAttr.make(NRAttribute::IS, frag_no));
01274 
01275         // We send the MaxFragAttr on the first Exploratory packet,
01276         // and the PktsSentAttr on the last packet.
01277         if(frag_no == 0)
01278           data_attrs.push_back(RmstMaxFragAttr.make(NRAttribute::IS,
01279             rmst_ptr->max_frag_));
01280         else if (frag_no == rmst_ptr->max_frag_){
01281           rmst_ptr->pkts_sent_++;
01282           data_attrs.push_back(RmstPktsSentAttr.make(NRAttribute::IS,
01283             rmst_ptr->pkts_sent_));
01284         }
01285         else
01286           rmst_ptr->pkts_sent_++;
01287 
01288         data_attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_no));
01289 
01290         // Add the actual data; the length depends on if it's the last
01291         // fragment or not.
01292         frag_ptr =  rmst_ptr->getFrag(frag_no);
01293         if (rmst_ptr->max_frag_ == frag_no)
01294           data_attrs.push_back(RmstDataAttr.make(NRAttribute::IS, 
01295             frag_ptr, rmst_ptr->max_frag_len_));
01296         else
01297           data_attrs.push_back(RmstDataAttr.make(NRAttribute::IS,
01298             frag_ptr, MAX_FRAG_SIZE));
01299 
01300         if (frag_no == send_data.exp_base_){
01301           ExpLog exp_msg;
01302           union LlToInt key;
01303           // Insert this Exploratory message in exp_map_.
01304           // When we get a reinforcement we'll know what rmst it's for.
01305           key.int_val_[0] = pkt_count_;
01306           key.int_val_[1] = rdm_id_;
01307           DiffPrint(DEBUG_LOTS_DETAILS, "  Key = %llx\n", key.ll_val_);
01308           exp_msg.rmst_no_ = rmst_no;
01309           exp_msg.last_hop_ = LOCALHOST_ADDR;
01310           exp_map_.insert(Key2ExpLog::value_type(key.ll_val_, exp_msg));
01311           msg_type = EXPLORATORY_DATA;
01312           rmst_ptr->reinf_ = false;
01313           rmst_ptr->pkts_sent_ = 0;
01314           rmst_ptr->naks_rec_ = 0;
01315           exp_gap_ = 0;
01316           DiffPrint(DEBUG_IMPORTANT,
01317             "  Source Filter sending EXPLORATORY frag %d of Rmst %d\n",
01318             frag_no, rmst_no);
01319         }
01320         else
01321           msg_type = DATA;
01322 
01323         Message *new_frag;
01324         new_frag = new Message(DIFFUSION_VERSION, msg_type, 0, 0,
01325           data_attrs.size(), pkt_count_, rdm_id_,
01326           LOCALHOST_ADDR, LOCALHOST_ADDR);
01327         new_frag->msg_attr_vec_ = CopyAttrs(&data_attrs);
01328         ((DiffusionRouting *)dr_)->sendMessage(new_frag, filter_handle_);
01329         pkt_count_++;
01330         delete new_frag;
01331         ClearAttrs(&data_attrs);
01332         // We sent a fragment, set the last_data_time_ for the cleanup timer.
01333         GetTime(&rmst_ptr->last_data_time_);
01334 
01335         // If this is the last frag, we start an ACK_TIMER.
01336         if ( (rmst_ptr->max_frag_ == frag_no) &&
01337           (rmst_ptr->ack_timer_active_ == false) && (rmst_ptr->local_source_) ){
01338           TimerCallback *rmst_timer;
01339           DiffPrint(DEBUG_SOME_DETAILS, "  Set an ACK_TIMER at source for rmst_no %d\n",
01340             rmst_no);
01341           rmst_timer = new RmstTimeout(this, rmst_no, ACK_TIMER);
01342           // We check on things every 20 seconds.
01343           rmst_ptr->ack_timer_handle_ = 
01344             ((DiffusionRouting *)dr_)->addTimer(ACK_INTERVAL, rmst_timer);
01345           rmst_ptr->ack_timer_active_ = true;
01346         }
01347         break;
01348 
01349       case(DO_NOTHING):
01350         DiffPrint(DEBUG_LOTS_DETAILS, "  Nothing to do\n");
01351         break;
01352       } // Switch on Action
01353     }
01354 
01355     if (nak_list_.empty() && send_list_.empty()){
01356       DiffPrint(DEBUG_LOTS_DETAILS, "   Cancelling SEND_TIMER, no NAKS or data to send\n");
01357       send_timer_active_ = false;
01358       return -1;
01359     }
01360     else
01361       return 0;
01362     break;
01363 
01364   case WATCHDOG_TIMER:
01365     DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer WATCHDOG_TIMER for Rmst %d", rmst_no);
01366     PrintTime(&cur_time);
01367     rmst_iterator = rmst_map_.find(rmst_no);
01368     if(rmst_iterator != rmst_map_.end())
01369       rmst_ptr = (*rmst_iterator).second;
01370     else{
01371       DiffPrint(DEBUG_IMPORTANT,
01372         "RmstFilter::processTimer can't find Rmst %d for WATCHDOG, cancell timer!\n",
01373         rmst_no);
01374       return -1;
01375     }
01376 
01377     if(rmst_ptr->cancel_watchdog_){
01378       DiffPrint(DEBUG_SOME_DETAILS,
01379         "  processTimer cancelling WATCHDOG_TIMER for Rmst %d\n", rmst_no);
01380       rmst_ptr->watchdog_active_ = false;
01381       rmst_ptr->cancel_watchdog_ = false;
01382       return -1;
01383     }
01384 
01385     if (rmst_ptr->wait_for_new_path_){
01386       DiffPrint (DEBUG_IMPORTANT, "  WATCHDOG_TIMER sees wait_for_new_path_ - suspend NAKs\n");
01387       return 0;
01388     }
01389 
01390     // If we sent an exp request more than 20 seconds ago,
01391     // we send it again.
01392     if (rmst_ptr->sent_exp_req_){
01393       int exp_time;
01394       exp_time = cur_time.tv_sec - rmst_ptr->exp_req_time_.tv_sec;
01395       DiffPrint(DEBUG_SOME_DETAILS,
01396         "  Node sent an EXP_REQ: time since last exp = %d\n", exp_time);
01397       if( exp_time > 20){
01398         // Resend an EXP_REQ!!!
01399         DiffPrint(DEBUG_IMPORTANT, "  Node resends EXP_REQ up blacklisted stream!\n");
01400         sendExpReqUpstream(rmst_ptr);
01401         GetTime(&rmst_ptr->exp_req_time_);
01402       }
01403       else
01404         DiffPrint(DEBUG_LOTS_DETAILS, "  Node waits to send another EXP_REQ\n");
01405       return 0;
01406     }
01407 
01408     if (rmst_ptr->local_source_){
01409       if(rmst_ptr->acked_){
01410         DiffPrint(DEBUG_IMPORTANT, 
01411           "  WATCHDOG_TIMER Local Source sees acked state - cancel timer\n");
01412         return -1;
01413       }
01414       else{
01415         DiffPrint(DEBUG_LOTS_DETAILS, "  WATCHDOG_TIMER Local Source sees rmst not acked\n");
01416         return 0;
01417       }
01418     }
01419 
01420     // Check if we have waited too long for next fragment.
01421     if( ((cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > NEXT_FRAG_WAIT) &&
01422       (!rmst_ptr->rmstComplete()) ){
01423       int newHole = (rmst_ptr->max_frag_rec_)+1;
01424       if ( (newHole <= rmst_ptr->max_frag_) && (!rmst_ptr->inHoleMap(newHole)) ){
01425         DiffPrint(DEBUG_SOME_DETAILS, "  WATCHDOG_TIMER adds new hole, frag %d\n",
01426           newHole);
01427         rmst_ptr->putHole(newHole);
01428         NakData *nak_ptr = rmst_ptr->getHole(newHole);
01429         // Artificially age this hole so it gets naked immediately
01430         nak_ptr->tmv.tv_sec -= 4;
01431       }
01432     }
01433 
01434     if(rmst_ptr->holeMapEmpty()){
01435       // There aren't any holes!
01436       DiffPrint(DEBUG_SOME_DETAILS, "  WATCHDOG_TIMER sees No holes\n");
01437       return 0;
01438     }
01439     else{
01440       // The WATCHDOG_TIMER expired and we have a hole,
01441       //   so we may need to construct a NAK from the hole map
01442       //   If we've fallen off the reinforced path, we must
01443       //   stop adding NAKs to the NakList.
01444       DiffPrint(DEBUG_SOME_DETAILS, "  WATCHDOG_TIMER sees holes - check times.\n");
01445       if (rmst_ptr->reinf_)
01446         setupNak(rmst_no);
01447       // Reschedule timer with same value.
01448       return 0;
01449     }
01450     break;
01451 
01452   case ACK_TIMER:
01453     DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer ACK_TIMER for Rmst %d", rmst_no);
01454     PrintTime(&cur_time);
01455     rmst_iterator = rmst_map_.find(rmst_no);
01456     if(rmst_iterator != rmst_map_.end())
01457       rmst_ptr = (*rmst_iterator).second;
01458     else{
01459       DiffPrint(DEBUG_IMPORTANT,
01460         "RmstFilter::processTimer can't find Rmst %d for ACK_TIMER, cancell timer!\n",
01461            rmst_no);
01462       return -1;
01463     }
01464 
01465     DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer ACK_TIMER, at source for rmst_no %d\n",
01466       rmst_no);
01467 
01468     if (rmst_ptr->acked_){
01469       DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processTimer cancel ACK_TIMER, Rmst %d ACKed\n",
01470         rmst_no);
01471       rmst_ptr->ack_timer_active_ = false;
01472       return -1;
01473     }
01474 
01475     // If there has been no data sent for 30 seconds like a NAK response, we need to resend a packet.
01476     if( (cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > ACK_WAIT ){
01477       NRAttrVec attrs;
01478       int8_t msg_type;
01479       DiffPrint(DEBUG_IMPORTANT, 
01480         "RmstFilter::processTimer ACK_TIMER, waited too long for Rmst %d ACK!\n", rmst_no);
01481       if(rmst_ptr->reinf_  && !rmst_ptr->resent_last_data_){
01482         // We should send the last frag again as an DATA packet.
01483         DiffPrint(DEBUG_SOME_DETAILS, 
01484           "RmstFilter::processTimer ACK_TIMER, resend last packet as DATA\n");
01485         msg_type = DATA;
01486         rmst_ptr->resent_last_data_ = true;
01487       }
01488       else if(rmst_ptr->resent_last_data_ && !rmst_ptr->resent_last_exp_){
01489         ExpLog exp_msg;
01490         union LlToInt key;
01491         // We tried resending last frag as data and it didn't work, try as EXP
01492         DiffPrint(DEBUG_IMPORTANT,
01493           "RmstFilter::processTimer ACK_TIMER, resend last packet as EXPLORATORY_DATA\n");
01494         // Insert this Exploratory message in exp_map_.
01495         // When we get a reinforcement we'll know what rmst it's for.
01496         key.int_val_[0] = pkt_count_;
01497         key.int_val_[1] = rdm_id_;
01498         DiffPrint(DEBUG_LOTS_DETAILS, "  Key = %llx\n", key.ll_val_);
01499         exp_msg.rmst_no_ = rmst_no;
01500         exp_msg.last_hop_ = LOCALHOST_ADDR;
01501         exp_map_.insert(Key2ExpLog::value_type(key.ll_val_, exp_msg));
01502         msg_type = EXPLORATORY_DATA;
01503         rmst_ptr->reinf_ = false;
01504         rmst_ptr->naks_rec_ = 0;
01505         rmst_ptr->pkts_sent_ = 0;
01506         rmst_ptr->resent_last_exp_ = true;
01507       }
01508       else if(rmst_ptr->resent_last_data_ && rmst_ptr->resent_last_exp_ && rmst_ptr->reinf_){
01509         // We should send the last frag again as an DATA packet.
01510         DiffPrint(DEBUG_IMPORTANT,
01511           "RmstFilter::processTimer ACK_TIMER, resend last packet on new reinf path as DATA\n");
01512         msg_type = DATA;
01513         rmst_ptr->resent_last_data_ = false;
01514         rmst_ptr->resent_last_exp_ = false;
01515       }
01516       else{
01517         ExpLog exp_msg;
01518         union LlToInt key;
01519         DiffPrint(DEBUG_IMPORTANT, 
01520           "RmstFilter::processTimer ACK_TIMER, resent last packet as EXP and no reinforced path, Try again!\n");
01521         // Insert this Exploratory message in exp_map_.
01522         // When we get a reinforcement we'll know what rmst it's for.
01523         key.int_val_[0] = pkt_count_;
01524         key.int_val_[1] = rdm_id_;
01525         DiffPrint(DEBUG_LOTS_DETAILS, "  Key = %llx\n", key.ll_val_);
01526         exp_msg.rmst_no_ = rmst_no;
01527         exp_msg.last_hop_ = LOCALHOST_ADDR;
01528         exp_map_.insert(Key2ExpLog::value_type(key.ll_val_, exp_msg));
01529         msg_type = EXPLORATORY_DATA;
01530       }
01531       attrs.push_back(RmstTargetAttr.make(NRAttribute::IS, rmst_ptr->target_str_));
01532       attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP));
01533       attrs.push_back(RmstFragAttr.make(NRAttribute::IS, rmst_ptr->max_frag_));
01534       attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_));
01535       frag_ptr =  rmst_ptr->getFrag(rmst_ptr->max_frag_);
01536       attrs.push_back(RmstDataAttr.make(NRAttribute::IS, 
01537         frag_ptr, rmst_ptr->max_frag_len_));
01538       Message *new_frag;
01539       new_frag = new Message(DIFFUSION_VERSION, msg_type, 0, 0, attrs.size(), pkt_count_,
01540         rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR);
01541       new_frag->msg_attr_vec_ = CopyAttrs(&attrs);
01542       ((DiffusionRouting *)dr_)->sendMessage(new_frag, filter_handle_);
01543       pkt_count_++;
01544       delete new_frag;
01545       ClearAttrs(&attrs);
01546       // We sent a fragment, set the last_data_time_ for the cleanup timer.
01547       GetTime(&rmst_ptr->last_data_time_);
01548     }
01549     return 0;
01550     break;
01551 
01552   case CLEANUP_TIMER:
01553     DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer CLEANUP_TIMER");
01554     PrintTime(&cur_time);
01555 
01556     DiffPrint(DEBUG_IMPORTANT, "  CLEANUP_TIMER called\n");
01557     rmst_iterator = rmst_map_.begin();
01558     while(rmst_iterator != rmst_map_.end()){
01559       rmst_ptr = (*rmst_iterator).second;
01560 
01561       DiffPrint(DEBUG_SOME_DETAILS, 
01562         "  CLEANUP_TIMER:: rmst_no %d : pkts_sent_ = %d, pkts_rec_ = %d, last_hop_pkts_sent_ = %d\n",
01563         rmst_ptr->rmst_no_, rmst_ptr->pkts_sent_, rmst_ptr->pkts_rec_, rmst_ptr->last_hop_pkts_sent_);
01564 
01565       if((!rmst_ptr->reinf_)&&(!rmst_ptr->acked_)&&(!rmst_ptr->local_source_)&&(!local_sink_)){
01566         if( (cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > LONG_CLEANUP_WAIT )
01567           cleanUpRmst(rmst_ptr);
01568       }
01569       else if (rmst_ptr->acked_){
01570         if ( ( (cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > SHORT_CLEANUP_WAIT ) &&
01571            ( (cur_time.tv_sec - rmst_ptr->last_nak_time_.tv_sec) > SHORT_CLEANUP_WAIT ) )
01572            cleanUpRmst(rmst_ptr);
01573       }
01574             
01575       rmst_iterator++;
01576     }
01577 
01578     // Check on the BlackList, in case network is partitioned.
01579     if (!black_list_.empty()){
01580       if ( (cur_time.tv_sec - last_data_rec_.tv_sec) > RMST_BLACKLIST_WAIT ){
01581         DiffPrint(DEBUG_IMPORTANT, "  clearing black_list_!\n");
01582         ((DiffusionRouting *)dr_)->clearBlacklist();
01583         black_list_.clear();
01584       }
01585     }
01586 
01587     if (local_sink_){
01588       if ( (cur_time.tv_sec - last_sink_time_.tv_sec) > SINK_REFRESH_WAIT ){
01589         DiffPrint(DEBUG_IMPORTANT, "  local sink timed out\n");
01590         local_sink_ = false;
01591       }
01592       else
01593         DiffPrint(DEBUG_IMPORTANT, "  local sink still alive.\n");
01594     }
01595 
01596     return 0;
01597     break;
01598 
01599   default:
01600     break;
01601   }
01602   return -1;
01603 }
01604 
01605 void RmstFilter::sendRmstToSink(Rmst *rmst_ptr)
01606 {
01607   NRAttrVec attrs;
01608   Message *rmst_msg;
01609   NRSimpleAttribute<void *> *rmst_data_attr;
01610   NRSimpleAttribute<int> *frag_number_attr;
01611   void *frag_ptr;
01612   int size, i;
01613 
01614   DiffPrint(DEBUG_IMPORTANT, "RmstFilter::sendRmstToSink - sending rmst %d to local sink\n",
01615     rmst_ptr->rmst_no_);
01616 
01617   // Prepare attribute vector
01618   attrs.push_back(RmstTargetAttr.make(NRAttribute::IS, rmst_ptr->target_str_));
01619   attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP));
01620   attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_));
01621   attrs.push_back(RmstMaxFragAttr.make(NRAttribute::IS, rmst_ptr->max_frag_));
01622   frag_number_attr = RmstFragAttr.make(NRAttribute::IS, 0);
01623   attrs.push_back(frag_number_attr);
01624   // Add the blob fragment
01625   if (rmst_ptr->max_frag_ == 0)
01626     size = rmst_ptr->max_frag_len_;
01627   else
01628     size = MAX_FRAG_SIZE;
01629   frag_ptr =  rmst_ptr->getFrag(0);
01630   rmst_data_attr = RmstDataAttr.make(NRAttribute::IS, frag_ptr, size);
01631   attrs.push_back(rmst_data_attr);
01632 
01633   // Prepare the message
01634   rmst_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(),
01635     pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR);
01636   rmst_msg->next_hop_ = LOCALHOST_ADDR;
01637   rmst_msg->next_port_ = local_sink_port_;
01638   rmst_msg->msg_attr_vec_= CopyAttrs(&attrs);
01639   ((DiffusionRouting *)dr_)->sendMessage(rmst_msg, filter_handle_, 1);
01640   delete rmst_msg;
01641   pkt_count_++;
01642 
01643   // Send all the fragments
01644   for (i=1; i <= (rmst_ptr->max_frag_); i++){
01645     frag_number_attr->setVal(i);
01646     frag_ptr =  rmst_ptr->getFrag(i);
01647     if(frag_ptr == NULL)
01648       DiffPrint(DEBUG_IMPORTANT, "RmstFilter::sendRmstToSink - got a null frag_ptr for frag!%d\n",
01649       i);
01650     else{
01651       if (rmst_ptr->max_frag_ == i)
01652         rmst_data_attr->setVal(frag_ptr, rmst_ptr->max_frag_len_);
01653       else
01654         rmst_data_attr->setVal(frag_ptr, MAX_FRAG_SIZE);
01655     }
01656     rmst_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(),
01657       pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR);
01658     rmst_msg->next_hop_ = LOCALHOST_ADDR;
01659     rmst_msg->next_port_ = local_sink_port_;
01660     rmst_msg->msg_attr_vec_= CopyAttrs(&attrs);
01661     ((DiffusionRouting *)dr_)->sendMessage(rmst_msg, filter_handle_, 1);
01662     delete rmst_msg;
01663     pkt_count_++;
01664   }
01665 
01666   ClearAttrs(&attrs);
01667 }
01668 
01669 void RmstFilter::sendAckToSource(Rmst *rmst_ptr)
01670 {
01671   NRAttrVec attrs;
01672   Message *ack_msg;
01673 
01674   attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, ACK_RESP));
01675   attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_));
01676 
01677   // New code to send a message to last_hop_
01678   ack_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0,
01679             attrs.size(), pkt_count_, rdm_id_, rmst_ptr->last_hop_,
01680             LOCALHOST_ADDR);
01681   ack_msg->msg_attr_vec_ = CopyAttrs(&attrs);
01682 
01683   DiffPrint(DEBUG_IMPORTANT, "  Sending ACK_RESP to node %d\n", rmst_ptr->last_hop_);
01684   ((DiffusionRouting *)dr_)->sendMessage(ack_msg, filter_handle_, 1);
01685   pkt_count_++;
01686   delete ack_msg;
01687   ClearAttrs(&attrs);
01688 }
01689 
01690 void RmstFilter::sendExpReqUpstream(Rmst *rmst_ptr)
01691 {
01692   NRAttrVec attrs;
01693   Message *exp_msg;
01694 
01695   attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, EXP_REQ));
01696   attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_));
01697   attrs.push_back(RmstFragAttr.make(NRAttribute::IS, rmst_ptr->max_frag_rec_));
01698 
01699   exp_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0,
01700             attrs.size(), pkt_count_, rdm_id_, rmst_ptr->last_hop_,
01701             LOCALHOST_ADDR);
01702   exp_msg->msg_attr_vec_ = CopyAttrs(&attrs);
01703 
01704   DiffPrint(DEBUG_IMPORTANT, "  Sending EXP_REQ to node %d\n", rmst_ptr->last_hop_);
01705   ((DiffusionRouting *)dr_)->sendMessage(exp_msg, filter_handle_, 1);
01706   pkt_count_++;
01707   delete exp_msg;
01708   ClearAttrs(&attrs);
01709 }
01710 
01711 void RmstFilter::sendContToSource(Rmst *rmst_ptr)
01712 {
01713   NRAttrVec attrs;
01714   Message *cont_msg;
01715   attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::EQ, RMST_CONT));
01716   attrs.push_back(NRClassAttr.make(NRAttribute::IS,
01717     NRAttribute::INTEREST_CLASS));
01718   DiffPrint(DEBUG_IMPORTANT, "  Sending a RMST_CONT to source\n");
01719   cont_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(),
01720     pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR);
01721   cont_msg->msg_attr_vec_ = CopyAttrs(&attrs);
01722   cont_msg->next_hop_ = LOCALHOST_ADDR;
01723   cont_msg->next_port_ = rmst_ptr->local_source_port_;
01724   ((DiffusionRouting *)dr_)->sendMessage(cont_msg, filter_handle_, 1);
01725   pkt_count_++;
01726   delete cont_msg;
01727   ClearAttrs(&attrs);
01728 }
01729 
01730 void RmstFilter::cleanUpRmst(Rmst *rmst_ptr)
01731 {
01732   int rmst_no = rmst_ptr->rmst_no_;
01733 
01734   Int2Rmst::iterator rmst_iterator;
01735   Key2ExpLog::iterator exp_iterator;
01736   ExpLog exp_msg;
01737   rmst_no = rmst_ptr->rmst_no_;
01738   DiffPrint(DEBUG_IMPORTANT, "  cleanUpRmst called to delete Rmst %d\n", rmst_no);
01739 
01740   if(rmst_ptr->watchdog_active_)
01741     ((DiffusionRouting *)dr_)->removeTimer(rmst_ptr->watchdog_handle_);
01742   if(rmst_ptr->ack_timer_active_)
01743     ((DiffusionRouting *)dr_)->removeTimer(rmst_ptr->ack_timer_handle_);
01744 
01745   rmst_iterator = rmst_map_.find(rmst_no);
01746   if(rmst_iterator != rmst_map_.end()){
01747     rmst_map_.erase(rmst_iterator);
01748   }
01749   delete rmst_ptr;
01750 
01751   // clean up the exp_map_ of any entries base on this rmst
01752   exp_iterator = exp_map_.begin();
01753   while(exp_iterator != exp_map_.end()){
01754     exp_msg = (*exp_iterator).second;
01755     if(exp_msg.rmst_no_ == rmst_no){
01756       DiffPrint(DEBUG_LOTS_DETAILS, "  cleanUpRmst deleting exp_map_ entry for Rmst %d\n", rmst_no);
01757       exp_map_.erase(exp_iterator);
01758     }
01759     exp_iterator++;
01760   }
01761 }
01762 
01763 #ifdef NS_DIFFUSION
01764 RmstFilter::RmstFilter()
01765 {
01766 #else
01767 RmstFilter::RmstFilter(int argc, char **argv)
01768 
01769 {
01770   TimerCallback *stat_timer;
01771 
01772   parseCommandLine(argc, argv);
01773   dr_ = NR::createNR(diffusion_port_);
01774 #endif // NS_DIFFUSION
01775 
01776   fcb_ = new RmstFilterCallback;
01777   fcb_->app_ = this;
01778   rdm_id_ = rand();
01779   pkt_count_ = rand();
01780   local_sink_ = false;
01781   caching_mode_ = false;
01782   send_timer_active_ = false;
01783 
01784   DiffPrint(DEBUG_ALWAYS, "RmstFilter constructor: rdm_id_ = %x, pkt_count_ = %x\n",
01785         rdm_id_, pkt_count_);
01786 
01787 #ifndef NS_DIFFUSION
01788   filter_handle_ = setupFilter();
01789   DiffPrint(DEBUG_LOTS_DETAILS, "RmstFilter:: subscribed to all, received handle %d\n",
01790         (int)filter_handle_);
01791   
01792   DiffPrint(DEBUG_LOTS_DETAILS, "RmstFilter constructor: start cleanup timer\n");
01793   stat_timer = new RmstTimeout(this, -1, CLEANUP_TIMER);
01794   stat_timer_handle_ = ((DiffusionRouting *)dr_)->addTimer(CLEANUP_INTERVAL, stat_timer);
01795 #endif // !NS_DIFFUSION
01796 }
01797 
01798 #ifndef NS_DIFFUSION
01799 int main(int argc, char **argv)
01800 {
01801   RmstFilter *app;
01802   app = new RmstFilter(argc, argv);
01803   app->run();
01804 
01805   return 0;
01806 }
01807 #endif // !NS_DIFFUSION

Generated on Tue Mar 6 16:47:50 2007 for ns2 Network Simulator 2.29 by  doxygen 1.4.6