pgm-receiver.cc

Go to the documentation of this file.
00001 
00002 /*
00003  * pgm-receiver.cc
00004  * Copyright (C) 2001 by the University of Southern California
00005  * $Id: pgm-receiver.cc,v 1.7 2005/08/25 18:58:10 johnh Exp $
00006  *
00007  * This program is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU General Public License,
00009  * version 2, as published by the Free Software Foundation.
00010  *
00011  * This program is distributed in the hope that it will be useful,
00012  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  * GNU General Public License for more details.
00015  *
00016  * You should have received a copy of the GNU General Public License along
00017  * with this program; if not, write to the Free Software Foundation, Inc.,
00018  * 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
00019  *
00020  *
00021  * The copyright of this module includes the following
00022  * linking-with-specific-other-licenses addition:
00023  *
00024  * In addition, as a special exception, the copyright holders of
00025  * this module give you permission to combine (via static or
00026  * dynamic linking) this module with free software programs or
00027  * libraries that are released under the GNU LGPL and with code
00028  * included in the standard release of ns-2 under the Apache 2.0
00029  * license or under otherwise-compatible licenses with advertising
00030  * requirements (or modified versions of such code, with unchanged
00031  * license).  You may copy and distribute such a system following the
00032  * terms of the GNU GPL for this module and the licenses of the
00033  * other code concerned, provided that you include the source code of
00034  * that other code when and as the GNU GPL requires distribution of
00035  * source code.
00036  *
00037  * Note that people who make modified versions of this module
00038  * are not obligated to grant this special exception for their
00039  * modified versions; it is their choice whether to do so.  The GNU
00040  * General Public License gives permission to release a modified
00041  * version without this exception; this exception also makes it
00042  * possible to release a modified version which carries forward this
00043  * exception.
00044  *
00045  */
00046 
00047 /*
00048  * Pragmatic General Multicast (PGM), Reliable Multicast
00049  *
00050  * pgm-receiver.cc
00051  *
00052  * This implements the Receiving PGM agent, "Agent/PGM/Receiver".
00053  *
00054  * Ryan S. Barnett, 2001
00055  * rbarnett@catarina.usc.edu
00056  */
00057 
00058 #include "config.h"
00059 #ifdef HAVE_STL
00060 
00061 #include <stdlib.h>
00062 #include <stdio.h>
00063 
00064 #include <map>
00065 
00066 #include "config.h"
00067 #include "tclcl.h"
00068 #include "agent.h"
00069 #include "packet.h"
00070 #include "ip.h"
00071 #include "random.h"
00072 #include "basetrace.h"
00073 
00074 #include "rcvbuf.h"
00075 
00076 #include "pgm.h"
00077 
00078 // ************************************************************
00079 // Define the PGM Receive Timer Class
00080 // ************************************************************
00081 class PgmReceiver;
00082 
00083 // Types of timers.
00084 enum {
00085   NAK_TIMER = 0
00086 };
00087 
00088 class PgmReceiverTimer : public TimerHandler {
00089 public:
00090   PgmReceiverTimer(PgmReceiver *a, int type) : TimerHandler(), data_(NULL) {
00091     a_ = a;
00092     type_ = type;
00093   }
00094 
00095   void * & data() { return data_; }
00096 
00097 protected:
00098   virtual void expire(Event *e);
00099   PgmReceiver *a_;
00100   int type_;
00101   void *data_;
00102 };
00103 
00104 // ************************************************************
00105 // Define the NakItem Class
00106 // ************************************************************
00107 
00108 // The different states of a NakItem entry.
00109 enum {
00110   BACK_OFF_STATE = 0,
00111   WAIT_NCF_STATE = 1,
00112   WAIT_DATA_STATE = 2
00113 };
00114 
00115 class NakItem {
00116 public:
00117   NakItem(PgmReceiver *a, int seqno) : nak_state_(BACK_OFF_STATE),
00118        nak_sent_(false), seqno_(seqno), ncf_retry_count_(0),
00119        data_retry_count_(0), nak_timer_(a, NAK_TIMER) { }
00120 
00121   int & nak_state() { return nak_state_; }
00122   bool & nak_sent() { return nak_sent_; }
00123   int & seqno() { return seqno_; }
00124   int & ncf_retry_count() { return ncf_retry_count_; }
00125   int & data_retry_count() { return data_retry_count_; }
00126   PgmReceiverTimer & nak_timer() { return nak_timer_; }
00127 
00128 protected:
00129 
00130   // The current state of the NAK entry.
00131   int nak_state_;
00132 
00133   // Whether or not this NAK has been transmitted once or more.
00134   bool nak_sent_;
00135 
00136   // Sequence number of the missing NAK item.
00137   int seqno_;
00138 
00139   // Number of times we have sent out a NAK but timed out waiting for an NCF.
00140   int ncf_retry_count_;
00141 
00142   // Number of times we got an NCF but timed out waiting for RDATA/ODATA.
00143   int data_retry_count_;
00144 
00145   PgmReceiverTimer nak_timer_;
00146 };
00147 
00148 // ************************************************************
00149 // Define the PGM Receiver Class
00150 // ************************************************************
00151 static int pgm_rcv_uid_ = 0;
00152 
00153 struct Stats {
00154   // Number of naks that did NOT get sent because we received an NCF
00155   // before our timer went up.
00156   int naks_transmitted_;
00157 
00158   // Number of duplicate naks that were sent to the upstream node.
00159   int naks_duplicated_;
00160 };
00161 
00162 class PgmReceiver: public Agent {
00163 public:
00164   PgmReceiver();
00165 
00166   virtual void recv(Packet *, Handler *);
00167   virtual void timeout(int type, void *data);
00168   virtual int command(int argc, const char*const* argv);
00169 
00170 protected:
00171 
00172   void handle_spm(Packet *pkt);
00173   void handle_odata(Packet *pkt);
00174   void handle_rdata(Packet *pkt);
00175   void handle_nak(Packet *pkt);
00176   void handle_ncf(Packet *pkt);
00177 
00178   void generate_Nak(int seqno);
00179   void cancel_Nak(int seqno, NakItem *nitem = NULL);
00180   void timeout_nak(NakItem *data);
00181   void send_nak(int seqno);
00182 
00183   void print_stats();
00184   void display_packet(Packet *pkt); // For debugging.
00185 
00186   void PgmReceiver::trace_event(char *evType, double evTime);
00187 
00188   EventTrace * et_;  //Trace Object for Custom Event Trace
00189 
00190   char uname_[16]; // Unique PGM receiver name, for debugging.
00191 
00192   // Various statistical information.
00193   Stats stats_;
00194 
00195   // Maximum number of times we can send out a NAK and time-out waiting for
00196   // an NCF reply. Once we hit this many times, we discard the NAK state
00197   // entirely and loose data.
00198   int max_nak_ncf_retries_;
00199 
00200   // Maximum number of times we can time-out waiting for RDATA after an
00201   // NCF confirmation for a NAK request.  Once we hit this many times, we
00202   // discard the NAK state entirely and loose data.
00203   int max_nak_data_retries_;
00204 
00205   // A random amount of this time period will be selected to wait for an
00206   // NCF after detecting a gap in the data stream, before sending out a NAK.
00207   double nak_bo_ivl_;
00208 
00209   // The amount of time to wait for a NCF packet after sending out a NAK
00210   // packet to the upstream node.
00211   double nak_rpt_ivl_;
00212 
00213   // The amount of time to wait for RDATA after receiving an NCF confirmation
00214   // for a given NAK.
00215   double nak_rdata_ivl_;
00216 
00217   // Whether or not the tsi/upstream_node/upstream_face are valid, i.e. have
00218   // we received at least one SPM packet for the session.
00219   bool have_tsi_state_;
00220 
00221   int spm_seqno_; // Last largest received SPM sequence number.
00222 
00223   ns_addr_t tsi_; // Transport Session ID
00224   ns_addr_t upstream_node_; // Address of upstream PGM router.
00225   int upstream_iface_; // Interface of upstream PGM router.
00226 
00227   // Source and group of ODATA/RDATA packets. Used when sending NAK messages.
00228   ns_addr_t source_;
00229   ns_addr_t group_;
00230 
00231   // Keep track of received packets and collect various statistics.
00232   RcvBuffer rcvbuf_;
00233 
00234   // Collection of sequence numbers that we are waiting for RDATA/ODATA.
00235   map<int, NakItem> naks_;
00236 
00237 };
00238 
00239 static class PgmReceiverClass : public TclClass {
00240 public:
00241   PgmReceiverClass() : TclClass("Agent/PGM/Receiver") {}
00242   TclObject * create(int argc, const char * const * argv) {
00243     return (new PgmReceiver());
00244   }
00245 } class_pgm_receiver;
00246 
00247 void PgmReceiverTimer::expire(Event *e) {
00248   a_->timeout(type_, data_);
00249 }
00250 
00251 // Constructor.
00252 PgmReceiver::PgmReceiver() : Agent(PT_PGM), have_tsi_state_(false),
00253                  spm_seqno_(-1)
00254 {
00255   stats_.naks_transmitted_ = 0;
00256   stats_.naks_duplicated_ = 0;
00257 
00258   sprintf (uname_, "pgmRecv-%d", pgm_rcv_uid_++);
00259   bind("max_nak_ncf_retries_", &max_nak_ncf_retries_);
00260   bind("max_nak_data_retries_", &max_nak_data_retries_);
00261   bind_time("nak_bo_ivl_", &nak_bo_ivl_);
00262   bind_time("nak_rpt_ivl_", &nak_rpt_ivl_);
00263   bind_time("nak_rdata_ivl_", &nak_rdata_ivl_);
00264 
00265   et_ = (EventTrace *) NULL;
00266 
00267 }
00268 
00269 // Code to execute when a packet is received.
00270 void PgmReceiver::recv(Packet *pkt, Handler *)
00271 {
00272   hdr_pgm *hp = HDR_PGM(pkt);
00273 
00274   hdr_cmn *hc = HDR_CMN(pkt);
00275 
00276   if (hc->ptype_ != PT_PGM) {
00277     printf("%s ERROR (PgmReceiver::recv): received non PGM pkt type %d, discarding.\n", uname_, hc->ptype_);
00278     Packet::free(pkt);
00279     return;
00280   }
00281 
00282 #ifdef PGM_DEBUG
00283   display_packet(pkt);
00284 #endif
00285 
00286   switch(hp->type_) {
00287   case PGM_SPM:
00288     handle_spm(pkt);
00289     break;
00290   case PGM_ODATA:
00291     handle_odata(pkt);
00292     break;
00293   case PGM_RDATA:
00294     handle_rdata(pkt);
00295     break;
00296   case PGM_NAK:
00297     // We only receive a NAK if it is multicast from another receiver
00298     // who is not directly connected to a PGM router.
00299     handle_nak(pkt);
00300     break;
00301   case PGM_NCF:
00302     handle_ncf(pkt);
00303     break;
00304   default:
00305     printf("ERROR (PgmReceiver::recv): Received invalid PGM type %d.\n",
00306        hp->type_);
00307     break;
00308   }
00309 
00310   Packet::free(pkt);
00311 }
00312 
00313 // Code to execute when a timeout occurs.
00314 void PgmReceiver::timeout(int type, void *data)
00315 {
00316 
00317   switch(type) {
00318   case NAK_TIMER:
00319     timeout_nak((NakItem *) data);
00320     break;
00321   default:
00322     printf("ERROR (PgmReceiver::timeout): Unknown timeout type %d.\n", type);
00323     break;
00324   }
00325 
00326 }
00327 
00328 // Called when a TCL command is issued to the PGM Receiver object.
00329 int PgmReceiver::command(int argc, const char*const* argv)
00330 {
00331   //  Tcl& tcl = Tcl::instance();
00332 
00333   if (argc == 2) {
00334     if (strcmp(argv[1], "print-stats") == 0) {
00335       print_stats();
00336       return (TCL_OK);
00337     }
00338   }
00339   else if (argc == 3) {
00340     if (strcmp(argv[1], "eventtrace") == 0) {
00341       et_ = (EventTrace *)TclObject::lookup(argv[2]);
00342       return (TCL_OK);
00343     }
00344   }
00345 
00346   return (Agent::command(argc, argv));
00347 }
00348 
00349 void PgmReceiver::trace_event(char *evType, double evTime) {
00350 
00351   if (et_ == NULL) return;
00352   char *wrk = et_->buffer();
00353   char *nwrk = et_->nbuffer();
00354 
00355   if (wrk != NULL) {
00356     sprintf(wrk, "E "TIME_FORMAT" %d %d PGM %s "TIME_FORMAT, 
00357             et_->round(Scheduler::instance().clock()),   
00358             addr(),                    
00359             0,                   
00360             evType,                  
00361             evTime);    
00362   if (nwrk != 0)
00363     sprintf(nwrk,
00364             "E -t "TIME_FORMAT" -o PGM -e %s -s %d.%d -d %d.%d",
00365             et_->round(Scheduler::instance().clock()),   // time
00366             evType,                    // event type
00367             addr(),                       // owner (src) node id
00368             port(),                       // owner (src) port id
00369             0,                      // dst node id
00370             0                       // dst port id
00371             );
00372     et_->dump();
00373   }
00374 
00375 }
00376 
00377 void PgmReceiver::handle_spm(Packet *pkt)
00378 {
00379   hdr_cmn *hc = HDR_CMN(pkt);
00380   hdr_ip *hip = HDR_IP(pkt);
00381   hdr_pgm *hp = HDR_PGM(pkt);
00382   hdr_pgm_spm *hps = HDR_PGM_SPM(pkt);
00383 
00384   if (have_tsi_state_ == false) {
00385     // First SPM message.
00386     have_tsi_state_ = true;
00387 
00388     // Set the TSI.
00389     tsi_ = hp->tsi_;
00390 
00391     // Set the source and group addresses for this TSI.
00392     source_ = hip->src();
00393     group_ = hip->dst();
00394   }
00395   else {
00396 
00397     // Check that the TSI is correct.
00398     if (!(hp->tsi_.isEqual (tsi_))) {
00399       printf("%s Received SPM with incorrect TSI, discarding.\n", uname_);
00400       return;
00401     }
00402 
00403     // Check that the sequence number is newer than a previous SPM message.
00404     if (hp->seqno_ <= spm_seqno_) {
00405       printf("%s received an old SPM seqno, discarding.\n", uname_);
00406       return;
00407     }
00408   }
00409 
00410   // Set the initial sequence number.
00411   spm_seqno_ = hp->seqno_;
00412 
00413   // Set the upstream node.
00414   upstream_node_ = hps->spm_path_;
00415 
00416   // Set the upstream interface.
00417   upstream_iface_ = hc->iface();
00418 }
00419 
00420 void PgmReceiver::handle_odata(Packet *pkt)
00421 {
00422   hdr_pgm *hp = HDR_PGM(pkt);
00423 
00424   // Check that the TSI is correct.
00425   if ( (have_tsi_state_ == true) && !(hp->tsi_.isEqual (tsi_)) ) {
00426     printf("PGM Receiver received ODATA with incorrect TSI, discarding.\n");
00427     return;
00428   }
00429 
00430   double clock = Scheduler::instance().clock();
00431 
00432   if (rcvbuf_.nextpkt_ < hp->seqno_) {
00433     int lo = rcvbuf_.nextpkt_;
00434     int hi = hp->seqno_ - 1;
00435 
00436     for (int i = lo; i <= hi; i++) {
00437       printf("%s detected loss of seq %d\n", uname_, i);
00438 
00439       if (have_tsi_state_ == false) {
00440     printf("%s has no TSI/SPM state when lost packet was detected. This results in unrecoverable data loss.\n", uname_);
00441       }
00442       else {
00443     generate_Nak(i);
00444       }
00445     }
00446   }
00447 
00448   rcvbuf_.add_pkt(hp->seqno_, clock);
00449 
00450   // Recept of ODATA that came in late could cancel a previously
00451   // generated NAK.
00452   cancel_Nak(hp->seqno_, NULL);
00453 }
00454 
00455 void PgmReceiver::handle_rdata(Packet *pkt)
00456 {
00457 
00458   hdr_pgm *hp = HDR_PGM(pkt);
00459 
00460   // Check that the TSI is correct.
00461   if ( (have_tsi_state_ == true) && !(hp->tsi_.isEqual (tsi_)) ) {
00462     printf("%s received RDATA with incorrect TSI, discarding.\n", uname_);
00463     return;
00464   }
00465 
00466   if ( (rcvbuf_.nextpkt_ > hp->seqno_) && !rcvbuf_.exists_pkt(hp->seqno_) ) {
00467     // The receive state may or may not exist, depending on how late the
00468     // RDATA packet came back to this node.
00469     cancel_Nak(hp->seqno_, NULL);
00470   }
00471 
00472   double clock = Scheduler::instance().clock();
00473   rcvbuf_.add_pkt (hp->seqno_, clock);
00474 
00475 }
00476 
00477 // The receiver will receive a NAK only if it is sent as a multicast from
00478 // another receiver in the event that the other receiver is not directly
00479 // connected to a PGM router.
00480 void PgmReceiver::handle_nak(Packet *pkt)
00481 {
00482   hdr_pgm *hp = HDR_PGM(pkt);
00483 
00484   // Check that the TSI is correct.
00485   if ( (have_tsi_state_ == true) && !(hp->tsi_.isEqual (tsi_)) ) {
00486     printf("%s received NAK with incorrect TSI, discarding.\n", uname_);
00487     return;
00488   }
00489 
00490   // Locate the nak state for the given multicast NAK.
00491   map<int, NakItem>::iterator result = naks_.find(hp->seqno_);
00492 
00493   if (result == naks_.end()) {
00494     // No state was found. Discard the NCF.
00495     printf("%s received multicast NAK but no NAK state found, discarding.\n", uname_);
00496     return;
00497   }
00498 
00499   NakItem *nitem = &((*result).second);
00500 
00501   switch( nitem->nak_state() ) {
00502   case BACK_OFF_STATE:
00503     // Move to WAIT_NCF_STATE.
00504     nitem->nak_state() = WAIT_NCF_STATE;
00505 
00506     // Reset the timer.
00507     nitem->nak_timer().resched(nak_rpt_ivl_);
00508 
00509     break;
00510   case WAIT_NCF_STATE:
00511     // Stay in the same state.
00512 
00513     // Reset the timer.
00514     nitem->nak_timer().resched(nak_rpt_ivl_);
00515 
00516     break;
00517   case WAIT_DATA_STATE:
00518     // Stay in the same state.
00519 
00520     // Reset the timer.
00521     nitem->nak_timer().resched(nak_rdata_ivl_);
00522 
00523     break;
00524   default:
00525     printf("ERROR (PgmReceiver::handle_nak): Unknown nak state %d.\n", nitem->nak_state());
00526     break;
00527   }
00528 
00529 }
00530 
00531 void PgmReceiver::handle_ncf(Packet *pkt)
00532 {
00533   hdr_pgm *hp = HDR_PGM(pkt);
00534 
00535   // Check that the TSI is correct.
00536   if ( (have_tsi_state_ == true) && !(hp->tsi_.isEqual (tsi_)) ) {
00537     printf("%s received NCF with incorrect TSI, discarding.\n", uname_);
00538     return;
00539   }
00540 
00541   // Check that the NCF came from our uplink interface.
00542   // Causes a problem because iface doesn't get relabeled.  But this isn't
00543   // needed if every receiver also has a PGM/Agent running on the node.
00544   /*
00545   if (hc->iface() != upstream_iface_) {
00546     printf("%s received NCF from non-upstream interface, discarding. Upstream_iface = %d\n", uname_, upstream_iface_);
00547     return;
00548   }
00549   */
00550 
00551   // Locate the nak state for the given NCF.
00552   map<int, NakItem>::iterator result = naks_.find(hp->seqno_);
00553 
00554   if (result == naks_.end()) {
00555     // No state was found. Discard the NCF.
00556     printf("%s received NCF but no NAK state found, discarding.\n", uname_);
00557     return;
00558   }  
00559 
00560   NakItem *nitem = &((*result).second);
00561 
00562   switch( nitem->nak_state() ) {
00563   case BACK_OFF_STATE:
00564     // Move to WAIT_DATA_STATE.
00565     nitem->nak_state() = WAIT_DATA_STATE;
00566 
00567     // Reset the timer.
00568     nitem->nak_timer().resched(nak_rdata_ivl_);
00569 
00570     break;
00571   case WAIT_NCF_STATE:
00572     // Move to WAIT_DATA_STATE.
00573     nitem->nak_state() = WAIT_DATA_STATE;
00574 
00575     // Reset the timer.
00576     nitem->nak_timer().resched(nak_rdata_ivl_);
00577     break;
00578   case WAIT_DATA_STATE:
00579     // Stay in the same state.
00580 
00581     // Reset the timer.
00582     nitem->nak_timer().resched(nak_rdata_ivl_);
00583     
00584     break;
00585   default:
00586     printf("ERROR (PgmReceiver::handle_ncf): Unknown nak state %d.\n", nitem->nak_state());
00587     return;
00588   }
00589 
00590 }
00591 
00592 void PgmReceiver::generate_Nak(int seqno)
00593 {
00594 #ifdef PGM_DEBUG
00595   double now = Scheduler::instance().clock();
00596   printf("at %f %s generating NAK state for seqno %d.\n", now, uname_, seqno);
00597 #endif
00598 
00599   // Insert the given sequence number into the nak map.
00600   pair<map<int, NakItem>::iterator, bool> result;
00601 
00602   result = naks_.insert(pair<int, NakItem>(seqno, NakItem(this, seqno)));
00603 
00604   NakItem *nitem = &(result.first->second);
00605 
00606   if (result.second == true) {
00607     // New NAK entry was added. Select a backoff time period over nak_bo_ivl_.
00608     double backoff = Random::uniform(nak_bo_ivl_);
00609 
00610     // Set the data field of the nak timer.
00611     nitem->nak_timer().data() = nitem;
00612 
00613     // Set the NAK timer to expire in BACK_OFF_STATE with the selected time.
00614     nitem->nak_timer().resched(backoff);
00615 
00616     printf("backoff: %f\n", backoff);
00617     trace_event("DETECT", backoff);     //Detected Loss, will send NACK after backoff
00618   }
00619   else {
00620     printf("%s generate_Nak was called with NAK state already established, ignoring.\n", uname_);
00621   }
00622 
00623 }
00624 
00625 void PgmReceiver::cancel_Nak(int seqno, NakItem *nitem)
00626 {
00627 
00628   if (nitem == NULL) {
00629     // Look up the sequence number in the nak map.
00630     map<int, NakItem>::iterator result = naks_.find(seqno);
00631 
00632     if (result == naks_.end()) {
00633       // The NAK state was not found. This is fine since the handle_odata()
00634       // function calls cancel_Nak on all packets in case 
00635       return;
00636     }
00637 
00638     nitem = &((*result).second);
00639   }
00640 
00641   // Cancel the NAK timer.
00642   nitem->nak_timer().force_cancel();
00643 
00644   // Erase the item from the Nak map.
00645   if (!naks_.erase(seqno)) {
00646     printf("ERROR (PgmReceiver::cancel_Nak): Failed erasing seqno from nak map.\n");
00647   }
00648 
00649 }
00650 
00651 void PgmReceiver::timeout_nak(NakItem *nitem)
00652 {
00653 
00654   double backoff;
00655 
00656   switch(nitem->nak_state()) {
00657   case BACK_OFF_STATE:
00658 
00659     if (nitem->nak_sent() == false) {
00660       nitem->nak_sent() = true;
00661     }
00662     else {
00663       stats_.naks_duplicated_++;
00664     }
00665 
00666     // Move into WAIT_NCF_STATE.
00667     nitem->nak_state() = WAIT_NCF_STATE;
00668 
00669     // Set new timer to go off.
00670     nitem->nak_timer().resched(nak_rpt_ivl_);
00671 
00672     send_nak(nitem->seqno());
00673 
00674     break;
00675   case WAIT_NCF_STATE:
00676 
00677     // If we have exceeded the number of times we can retry this NAK,
00678     // then cancel.
00679     if (nitem->ncf_retry_count() > max_nak_ncf_retries_) {
00680       // Cancel this NAK generation. Remove all state associated with the
00681       // NAK, we have unrecoverable data loss.
00682       printf("%s reached max_nak_ncf_retries, stopping NAK generation.\n", uname_);
00683       cancel_Nak(nitem->seqno(), nitem);
00684       return;
00685     }
00686 
00687     nitem->ncf_retry_count() += 1;
00688 
00689     // Move into BACK_OFF_STATE
00690     nitem->nak_state() = BACK_OFF_STATE;
00691 
00692     // Set timer to go off.
00693     backoff = Random::uniform(nak_bo_ivl_);
00694 
00695     // Set the NAK timer to expire in BACK_OFF_STATE with the selected time.
00696     nitem->nak_timer().resched(backoff);
00697 
00698     break;
00699   case WAIT_DATA_STATE:
00700 
00701     // Exceeded the number of times we wait for RDATA for this confirmed NAK?
00702     if (nitem->data_retry_count() > max_nak_data_retries_) {
00703       printf("%s reached max_nak_data_retries, stopping NAK generation.\n", uname_);
00704       cancel_Nak(nitem->seqno(), nitem);
00705       return;
00706     }
00707 
00708     nitem->data_retry_count() += 1;
00709 
00710     // Move into BACK_OFF_STATE
00711     nitem->nak_state() = BACK_OFF_STATE;
00712 
00713     // Set timer to go off.
00714     backoff = Random::uniform(nak_bo_ivl_);
00715 
00716     // Set the NAK timer to expire in BACK_OFF_STATE with the selected time.
00717     nitem->nak_timer().resched(backoff);
00718 
00719     break;
00720   default:
00721     printf("ERROR (PgmReceiver::timeout_nak): Unknown NAK state %d.\n", nitem->nak_state());
00722     break;
00723   }
00724 
00725 }
00726 
00727 void PgmReceiver::send_nak(int seqno)
00728 {
00729   printf("%s send_nak is called.\n", uname_);
00730 
00731   Packet *nak_pkt = allocpkt();
00732   // Set the simulated size of the NAK packet.
00733   hdr_cmn *nak_hc = HDR_CMN(nak_pkt);
00734   nak_hc->size_ = sizeof(hdr_pgm) + sizeof(hdr_pgm_nak);
00735   nak_hc->ptype_ = PT_PGM;
00736 
00737   // Set the destination address to be our upstream node.
00738   hdr_ip *nak_hip = HDR_IP(nak_pkt);
00739   nak_hip->dst() = upstream_node_;
00740 
00741   // Set the color for NAK packets in nam.
00742   nak_hip->fid_ = 8;
00743 
00744   // Fill in the PGM header for the NAK packet.
00745   hdr_pgm *nak_hp = HDR_PGM(nak_pkt);
00746   nak_hp->type_ = PGM_NAK;
00747   nak_hp->tsi_ = tsi_;
00748   nak_hp->seqno_ = seqno;
00749 
00750   // Fill in the PGM NAK header for the NAK packet.
00751   hdr_pgm_nak *nak_hpn = HDR_PGM_NAK(nak_pkt);
00752   nak_hpn->source_ = source_;
00753   nak_hpn->group_ = group_;
00754 
00755   // Increment the statistical counter that keeps track of the number
00756   // of naks transmitted.
00757   stats_.naks_transmitted_++;
00758 
00759   // Send out the packet.
00760   send(nak_pkt, 0);
00761 
00762   // TBA: Send out the NAK packet to multicast with TTL 1 if the uplink
00763   //      PGM router is not directly connected to this node.
00764 }
00765 
00766 void PgmReceiver::print_stats()
00767 {
00768   printf("%s:\n", uname_);
00769   printf("\tLast packet:\t\t%d\n", rcvbuf_.nextpkt_-1);
00770   printf("\tMax packet:\t\t%d\n", rcvbuf_.maxpkt_);
00771   if (rcvbuf_.pkts_recovered_) {
00772     printf("\tPackets recovered:\t%d\n", rcvbuf_.pkts_recovered_);
00773     printf("\tLatency (min, max, avg):\t%lf, %lf, %lf\n",
00774        rcvbuf_.min_delay_, rcvbuf_.max_delay_,
00775        rcvbuf_.delay_sum_ / rcvbuf_.pkts_recovered_);
00776   }
00777   if (rcvbuf_.duplicates_) {
00778     printf("\tDuplicate RDATA:\t%d\n", rcvbuf_.duplicates_);
00779   }
00780   printf("\tTotal NAKs sent:\t%d\n", stats_.naks_transmitted_);
00781   printf("\tRetransmitted NAKs:\t%d\n", stats_.naks_duplicated_);
00782 }
00783 
00784 #ifdef PGM_DEBUG
00785 void PgmReceiver::display_packet(Packet *pkt)
00786 {
00787 
00788   double now = Scheduler::instance().clock();
00789 
00790   hdr_ip *hip = HDR_IP(pkt);
00791   hdr_cmn *hc = HDR_CMN(pkt);
00792 
00793   printf("at %f %s received packet type ", now, uname_);
00794 
00795   hdr_pgm *hp = HDR_PGM(pkt);
00796   
00797   hdr_pgm_spm *hps;
00798   hdr_pgm_nak *hpn;
00799 
00800   switch(hp->type_) {
00801   case PGM_SPM:
00802     hps = HDR_PGM_SPM(pkt);
00803 
00804     printf("SPM (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d, spm_path %d:%d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_, hps->spm_path_.addr_, hps->spm_path_.port_);
00805 
00806     break;
00807   case PGM_ODATA:
00808     printf("ODATA (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
00809 
00810     break;
00811   case PGM_RDATA:
00812     printf("RDATA (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
00813 
00814     break;
00815   case PGM_NAK:
00816     hpn = HDR_PGM_NAK(pkt);
00817 
00818     printf("NAK (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d, source %d:%d, group %d:%d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_, hpn->source_.addr_, hpn->source_.port_, hpn->group_.addr_, hpn->group_.port_);
00819 
00820     break;
00821   case PGM_NCF:
00822     printf("NCF (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
00823 
00824     break;
00825   default:
00826     printf("UNKNOWN (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
00827 
00828     break;
00829   }
00830 
00831 }
00832 #endif // PGM_DEBUG
00833 
00834 #endif //HAVE_STL

Generated on Tue Mar 6 16:47:49 2007 for ns2 Network Simulator 2.29 by  doxygen 1.4.6