pgm-agent.cc

Go to the documentation of this file.
00001 
00002 /*
00003  * pgm-agent.cc
00004  * Copyright (C) 2001 by the University of Southern California
00005  * $Id: pgm-agent.cc,v 1.9 2005/08/25 18:58:10 johnh Exp $
00006  *
00007  * This program is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU General Public License,
00009  * version 2, as published by the Free Software Foundation.
00010  *
00011  * This program is distributed in the hope that it will be useful,
00012  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  * GNU General Public License for more details.
00015  *
00016  * You should have received a copy of the GNU General Public License along
00017  * with this program; if not, write to the Free Software Foundation, Inc.,
00018  * 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
00019  *
00020  *
00021  * The copyright of this module includes the following
00022  * linking-with-specific-other-licenses addition:
00023  *
00024  * In addition, as a special exception, the copyright holders of
00025  * this module give you permission to combine (via static or
00026  * dynamic linking) this module with free software programs or
00027  * libraries that are released under the GNU LGPL and with code
00028  * included in the standard release of ns-2 under the Apache 2.0
00029  * license or under otherwise-compatible licenses with advertising
00030  * requirements (or modified versions of such code, with unchanged
00031  * license).  You may copy and distribute such a system following the
00032  * terms of the GNU GPL for this module and the licenses of the
00033  * other code concerned, provided that you include the source code of
00034  * that other code when and as the GNU GPL requires distribution of
00035  * source code.
00036  *
00037  * Note that people who make modified versions of this module
00038  * are not obligated to grant this special exception for their
00039  * modified versions; it is their choice whether to do so.  The GNU
00040  * General Public License gives permission to release a modified
00041  * version without this exception; this exception also makes it
00042  * possible to release a modified version which carries forward this
00043  * exception.
00044  *
00045  */
00046 
00047 /*
00048  * Pragmatic General Multicast (PGM), Reliable Multicast
00049  *
00050  * pgm-agent.cc
00051  *
00052  * This implements the network element PGM agent, "Agent/PGM".
00053  *
00054  * Ryan S. Barnett, 2001
00055  * rbarnett@catarina.usc.edu
00056  */
00057 
00058 #include "config.h"
00059 #ifdef HAVE_STL
00060 
00061 #include <stdlib.h>
00062 #include <stdio.h>
00063 
00064 #include <map>
00065 #include <list>
00066 #include <algorithm>
00067 
00068 #include "config.h"
00069 #include "tclcl.h"
00070 #include "agent.h"
00071 #include "packet.h"
00072 #include "ip.h"
00073 #include "random.h"
00074 #include "basetrace.h"
00075 
00076 #include "pgm.h"
00077 
00078 // ************************************************************
00079 // Register the PGM packet headers.
00080 // ************************************************************
00081 
00082 // Declare the static header offsets.
00083 int hdr_pgm::offset_;
00084 int hdr_pgm_spm::offset_;
00085 int hdr_pgm_nak::offset_;
00086 
00087 // Register the hdr_pgm with the packet header manager.
00088 class PGMHeaderClass : public PacketHeaderClass {
00089 public:
00090   PGMHeaderClass() : PacketHeaderClass("PacketHeader/PGM", sizeof(hdr_pgm)) {
00091     bind_offset(&hdr_pgm::offset_);
00092   }
00093 
00094 } class_pgmhdr;
00095 
00096 // Register the hdr_pgm_spm with the packet header manager.
00097 class PGM_SPMHeaderClass : public PacketHeaderClass {
00098 public:
00099   PGM_SPMHeaderClass() : PacketHeaderClass("PacketHeader/PGM_SPM",
00100                        sizeof(hdr_pgm_spm)) {
00101     bind_offset(&hdr_pgm_spm::offset_);
00102   }
00103 
00104 } class_pgm_spmhdr;
00105 
00106 // Register the hdr_pgm_nak with the packet header manager.
00107 class PGM_NAKHeaderClass : public PacketHeaderClass {
00108 public:
00109   PGM_NAKHeaderClass() : PacketHeaderClass("PacketHeader/PGM_NAK",
00110                        sizeof(hdr_pgm_nak)) {
00111     bind_offset(&hdr_pgm_nak::offset_);
00112   }
00113 
00114 } class_pgm_nakhdr;
00115 
00116 
00117 // ************************************************************
00118 // Define the PGM Agent Timer Class
00119 // ************************************************************
00120 class PgmAgent;
00121 
00122 // Different timer types.
00123 enum {
00124   TIMER_NAK_RETRANS = 0,
00125   TIMER_NAK_RPT = 1,
00126   TIMER_NAK_RDATA = 2,
00127   TIMER_NAK_ELIM = 3
00128 };
00129 
00130 class PgmAgentTimer : public TimerHandler {
00131 public:
00132   PgmAgentTimer(PgmAgent *a, int type) : TimerHandler(), data_(NULL) {
00133     a_ = a;
00134     type_ = type;
00135   }
00136 
00137   void * &data() { return data_; }
00138 
00139 protected:
00140   virtual void expire(Event *e);
00141   PgmAgent *a_;
00142   int type_;
00143   void *data_;
00144 };
00145 
00146 // ************************************************************
00147 // Define the Repair State control block.
00148 // ************************************************************
00149 
00150 // Different repair states.
00151 enum {
00152   NAK_PENDING = 0,
00153   NAK_CONFIRMED = 1
00154 };
00155 
00156 class StateInfo;
00157 
00158 class RepairState {
00159 public:
00160   RepairState(PgmAgent *a, StateInfo *sinfo, int seqno, ns_addr_t &source,
00161           ns_addr_t &group) : seqno_(seqno), source_(source),
00162                   group_(group),
00163         nak_state_(NAK_PENDING), nak_elimination_(true),
00164     nak_retrans_timer_(a, TIMER_NAK_RETRANS),
00165     nak_rpt_timer_(a, TIMER_NAK_RPT),
00166     nak_rdata_timer_(a, TIMER_NAK_RDATA),
00167     nak_elim_timer_(a, TIMER_NAK_ELIM),
00168         sinfo_(sinfo)
00169   { }
00170 
00171   int & seqno() { return seqno_; }
00172   ns_addr_t & source() { return source_; }
00173   ns_addr_t & group() { return group_; }
00174 
00175   int & nak_state() { return nak_state_; }
00176   bool & nak_elimination() { return nak_elimination_; }
00177   PgmAgentTimer & nak_retrans_timer() { return nak_retrans_timer_; }
00178   PgmAgentTimer & nak_rpt_timer() { return nak_rpt_timer_; }
00179   PgmAgentTimer & nak_rdata_timer() { return nak_rdata_timer_; }
00180   PgmAgentTimer & nak_elim_timer() { return nak_elim_timer_; }
00181   list<int> & iface_list() { return iface_list_; }
00182   list<NsObject *> & agent_list() { return agent_list_; }
00183   StateInfo * & sinfo() { return sinfo_; }
00184 
00185 protected:
00186 
00187   // Which sequence number is being requested for repair.
00188   int seqno_;
00189 
00190   ns_addr_t source_; // Original source of ODATA for the repair.
00191   ns_addr_t group_; // The multicast group.
00192 
00193   int nak_state_; // Present repair block state.
00194 
00195   // Indicates whether or not we are to discard incoming NAK packets
00196   // once a previous NAK is outstanding (got NCF, waiting for RDATA). 
00197   // (See 7.4 of PGM specification) By default we do. When nak_elim_timer_
00198   // expires, then we do not.
00199   bool nak_elimination_;
00200 
00201   // This timer controls sending retransmissions of NAK packets.
00202   PgmAgentTimer nak_retrans_timer_;
00203 
00204   // Timer that measures how long we can repeat NAK packets while waiting
00205   // for NCF confirmation. Once expires, the repair state is removed.
00206   PgmAgentTimer nak_rpt_timer_;
00207 
00208   // Timer that is triggered waiting for RDATA for a given NAK seqno,
00209   // provided that NAK has been confirmed by an NCF.  Only gets set
00210   // once NCF is received.
00211   PgmAgentTimer nak_rdata_timer_;
00212 
00213   // Timer that is triggered when we disable nak_elimination_, allowing
00214   // a duplicate NAK to be processed. This occurs after a previous
00215   // NAK has been confirmed with an NCF, but before the RDATA has been
00216   // received.  This timer gets set when we receive an NCF for a pending
00217   // NAK.
00218   PgmAgentTimer nak_elim_timer_;
00219 
00220   // List of interfaces upon which the RDATA will be sent to.
00221   list<int> iface_list_;
00222 
00223   // List of agents upon which the RDATA will be sent to.
00224   list<NsObject *> agent_list_;
00225 
00226   // Back-pointer to the state information block that is holding this
00227   // repair data.  We use this so we can get the upstream_path and the TSI.
00228   StateInfo *sinfo_;
00229 
00230 };
00231 
00232 // ************************************************************
00233 // Define the TSI State control block.
00234 // ************************************************************
00235 
00236 class StateInfo {
00237 public:
00238 
00239   StateInfo(ns_addr_t tsi) : tsi_(tsi), spm_seqno_(-1) { }
00240 
00241   // Only used if the container holding StateInfo's will be in sorted order.
00242   int operator<(const StateInfo &right) const {
00243     return ((tsi_.addr_ < right.tsi_.addr_) || ( (tsi_.addr_==right.tsi_.addr_) && (tsi_.port_ < right.tsi_.port_)));
00244   }
00245 
00246   ns_addr_t & tsi() { return tsi_; }
00247   int & spm_seqno() { return spm_seqno_; }
00248   ns_addr_t & upstream_node() { return upstream_node_; }
00249   int & upstream_iface() { return upstream_iface_; }
00250   map<int, RepairState> & repair() { return repair_; }
00251 
00252 protected:
00253 
00254   ns_addr_t tsi_; // Transport Session ID
00255   int spm_seqno_; // Most recent SPM sequence number.
00256   ns_addr_t upstream_node_; // Upstream node address.
00257   int upstream_iface_; // Upstream interface number.
00258 
00259   // Map between a NAK sequence number and the corresponding repair state
00260   // for that sequence number.
00261   map<int, RepairState> repair_;
00262 
00263 };
00264 
00265 
00266 // ************************************************************
00267 // Define the PGM Agent Class
00268 // ************************************************************
00269 
00270 // Structure to hold statistical information for PGM Agent.
00271 struct Stats {
00272   int num_unsolicited_ncf_;
00273   int num_unsolicited_rdata_;
00274   int num_suppressed_naks_;
00275   int num_naks_transmitted_;
00276 };
00277 
00278 // Used to count number of unique pgm agents.
00279 static int pgm_agent_uid_ = 0;
00280 
00281 class PgmAgent : public Agent {
00282 public:
00283   PgmAgent();
00284   virtual void recv(Packet *, Handler *);
00285   virtual void timeout(int type, void *data);
00286   virtual int command(int argc, const char*const* argv);
00287 
00288 protected:
00289 
00290   void handle_spm(Packet *pkt);
00291   void handle_odata(Packet *pkt);
00292   void handle_rdata(Packet *pkt);
00293   void handle_nak(Packet *pkt);
00294   void handle_ncf(Packet *pkt);
00295 
00296   void send_nak(ns_addr_t &upstream_node, ns_addr_t &tsi, int seqno, ns_addr_t &source, ns_addr_t &group);
00297 
00298   void timeout_nak_retrans(RepairState *rstate);
00299   void timeout_nak_rpt(RepairState *rstate);
00300   void timeout_nak_rdata(RepairState *rstate);
00301   void timeout_nak_elim(RepairState *rstate);
00302 
00303   void remove_repair_state(RepairState *rstate);
00304 
00305   void print_stats();
00306 
00307   void PgmAgent::trace_event(char *evType, double evTime);
00308 
00309 #ifdef PGM_DEBUG
00310   void display_packet(Packet *pkt);
00311 #endif
00312 
00313   NsObject* iface2link(int iface);
00314   NsObject* pkt2agent (Packet *pkt);
00315 
00316   StateInfo * find_TSI(ns_addr_t &tsi);
00317   StateInfo * insert_TSI(ns_addr_t &tsi);
00318 
00319   EventTrace * et_; //Trace Object for custom event trace
00320 
00321   int pgm_enabled_; // Is this agent enabled? Default is YES.
00322 
00323   char uname_[16]; // Agent's unique name.
00324 
00325   Stats stats_; // Statistical information.
00326 
00327   // TSI-indexed state control block list.
00328   list<StateInfo> state_list_;
00329 
00330   // Number of seconds to wait between retransmitting a NAK that is waiting
00331   // for a NCF packet.
00332   double nak_retrans_ival_;
00333 
00334   // The length of time for which a network element will continue to repeat
00335   // NAKs while waiting for a corresponding NCF.  Once this time expires and
00336   // no NCF is received, then we remove the entire repair state.
00337   double nak_rpt_ival_;
00338 
00339   // The length of time for which a network element will wait for the
00340   // corresponding RDATA before removing the entire repair state.
00341   double nak_rdata_ival_;
00342 
00343   // Once a NAK has been confirmed, network elements must discard all
00344   // further NAKs for up to this length of time.  Should be a fraction
00345   // of nak_rdata_ival_.
00346   double nak_elim_ival_;
00347 
00348 };
00349 
00350 static class PgmClass : public TclClass {
00351 public:
00352   PgmClass() : TclClass("Agent/PGM") {}
00353   TclObject * create(int argc, const char * const * argv) {
00354     return (new PgmAgent());
00355   }
00356 } class_pgm_agent;
00357 
00358 void PgmAgentTimer::expire(Event *e) {
00359   a_->timeout(type_, data_);
00360 }
00361 
00362 // Constructor.
00363 PgmAgent::PgmAgent() : Agent(PT_PGM), pgm_enabled_(1)
00364 {
00365   // Set the unique identifier.
00366   sprintf (uname_, "pgmAgent-%d", pgm_agent_uid_++);
00367 
00368   // Initialize statistics.
00369   stats_.num_unsolicited_ncf_ = 0;
00370   stats_.num_unsolicited_rdata_ = 0;
00371   stats_.num_suppressed_naks_ = 0;
00372   stats_.num_naks_transmitted_ = 0;
00373 
00374   bind("pgm_enabled_", &pgm_enabled_);
00375   bind_time("nak_retrans_ival_", &nak_retrans_ival_);
00376   bind_time("nak_rpt_ival_", &nak_rpt_ival_);
00377   bind_time("nak_rdata_ival_", &nak_rdata_ival_);
00378   bind_time("nak_elim_ival_", &nak_elim_ival_);
00379 
00380   et_ = (EventTrace *) NULL;
00381 }
00382 
00383 // Code to execute when a packet is received.
00384 void PgmAgent::recv(Packet* pkt, Handler*)
00385 {
00386   hdr_pgm *hp = HDR_PGM(pkt);
00387 
00388   if (!pgm_enabled_) {
00389     target_->recv(pkt);
00390     return;
00391   }
00392 
00393   hdr_cmn *hc = HDR_CMN(pkt);
00394 
00395   if (hc->ptype_ != PT_PGM) {
00396     printf("%s ERROR (PgmAgent::recv): received non PGM pkt type %d, discarding.\n", uname_, hc->ptype_);
00397     Packet::free(pkt);
00398     return;
00399   }
00400 
00401 #ifdef PGM_DEBUG
00402   display_packet(pkt);
00403 #endif
00404 
00405   // Note, each handle function will free the packet itself or modify the
00406   // headers and pass it on to the next NS object.
00407 
00408   switch(hp->type_) {
00409   case PGM_SPM:
00410     handle_spm(pkt);
00411     break;
00412   case PGM_ODATA:
00413     handle_odata(pkt);
00414     break;
00415   case PGM_RDATA:
00416     handle_rdata(pkt);
00417     break;
00418   case PGM_NAK:
00419     handle_nak(pkt);
00420     break;
00421   case PGM_NCF:
00422     handle_ncf(pkt);
00423     break;
00424   default:
00425     printf("ERROR (PgmAgent::recv): Received PGM packet with unknown type %d.\n", hp->type_);
00426 
00427     Packet::free(pkt);
00428 
00429     break;
00430   }
00431 
00432 }
00433 
00434 // Code to execute when a timeout occurs.
00435 void PgmAgent::timeout(int type, void *data)
00436 {
00437 
00438   switch(type) {
00439   case TIMER_NAK_RETRANS:
00440     timeout_nak_retrans((RepairState *) data);
00441     break;
00442   case TIMER_NAK_RPT:
00443     timeout_nak_rpt((RepairState *) data);
00444     break;
00445   case TIMER_NAK_RDATA:
00446     timeout_nak_rdata((RepairState *) data);
00447     break;
00448   case TIMER_NAK_ELIM:
00449     timeout_nak_elim((RepairState *) data);
00450     break;
00451   default:
00452     printf("ERROR (PgmAgent::timeout()): Invalid timeout type %d.\n", type);
00453     break;
00454   }
00455 
00456 }
00457 
00458 // Code to execute when a TCL command is issued to the PGM Agent object.
00459 int PgmAgent::command (int argc, const char*const* argv)
00460 {
00461   //  Tcl& tcl = Tcl::instance();
00462 
00463   if (argc == 2) {
00464     if (strcmp(argv[1], "print-stats") == 0) {
00465       print_stats();
00466       return (TCL_OK);
00467     }
00468   }
00469   else if (argc == 3) { //Set the Event Trace handle if Event Tracing is on
00470     if (strcmp(argv[1], "eventtrace") == 0) {
00471       et_ = (EventTrace *)TclObject::lookup(argv[2]);
00472       return (TCL_OK);
00473     }
00474   }
00475 
00476   return (Agent::command(argc, argv));
00477 }      
00478 
00479 void PgmAgent::trace_event(char *evType, double evTime) {
00480 
00481   if (et_ == NULL) return;
00482   char *wrk = et_->buffer();
00483   char *nwrk = et_->nbuffer();
00484 
00485   if (wrk != NULL) {
00486     sprintf(wrk, "E "TIME_FORMAT" %d %d PGM %s "TIME_FORMAT, 
00487             et_->round(Scheduler::instance().clock()),   
00488             addr(),                    
00489             0,                   
00490             evType,                  
00491             evTime);    
00492   if (nwrk != 0)
00493     sprintf(nwrk,
00494             "E -t "TIME_FORMAT" -o PGM -e %s -s %d.%d -d %d.%d",
00495             et_->round(Scheduler::instance().clock()),   // time
00496             evType,                    // event type
00497             addr(),                       // owner (src) node id
00498             port(),                       // owner (src) port id
00499             0,                      // dst node id
00500             0                       // dst port id
00501             );
00502     et_->dump();
00503   }
00504 
00505 }
00506 
00507 void PgmAgent::handle_spm(Packet *pkt)
00508 {
00509 
00510   hdr_pgm *hp = HDR_PGM(pkt);
00511   hdr_pgm_spm *hs = HDR_PGM_SPM(pkt);
00512   hdr_cmn *hc = HDR_CMN(pkt);
00513 
00514   // Use the TSI from the SPM packet and locate the proper state block.
00515   StateInfo *state = find_TSI(hp->tsi_);
00516 
00517   if (state == NULL) {
00518     // There is no state block for this TSI. Create new state.
00519     state = insert_TSI(hp->tsi_);
00520 
00521     // Set the sequence number.
00522     state->spm_seqno() = hp->seqno_;
00523     // Set the upstream path.
00524     state->upstream_node() = hs->spm_path_;
00525     state->upstream_iface() = hc->iface();
00526   }
00527   else {
00528     // State already exists for this TSI. Check if the sequence number is
00529     // newer than the last recorded sequence number.
00530 
00531     if ( state->spm_seqno() < hp->seqno_ ) {
00532       // Update the SPM sequence number.
00533       state->spm_seqno() = hp->seqno_;
00534       // Set the upstream path.
00535       state->upstream_node() = hs->spm_path_;
00536       state->upstream_iface() = hc->iface();
00537     }
00538     else {
00539       printf("%s received an old SPM seqno, discarding.\n", uname_);
00540       Packet::free(pkt);
00541       return;
00542     }
00543   }
00544 
00545   // Modify the SPM packet and set the upstream path to be equal to
00546   // the address of this agent.
00547   hs->spm_path_ = here_;
00548 
00549   // Send the modified packet off to the rest of the multicast group.
00550   send(pkt, 0);
00551 }
00552 
00553 void PgmAgent::handle_odata(Packet *pkt)
00554 {
00555   // Pass the ODATA along to the rest of the multicast group.  ODATA
00556   // does not cancel NAK forwarding.
00557 
00558   //hdr_cmn *hc = HDR_CMN(pkt);
00559 
00560   send(pkt, 0);
00561 }
00562 
00563 void PgmAgent::handle_rdata(Packet *pkt)
00564 {
00565 
00566   // Look for the TSI for this RDATA packet.
00567   hdr_pgm *hp = HDR_PGM(pkt);
00568   //  hdr_ip *hip = HDR_IP(pkt);
00569 
00570   StateInfo *state = find_TSI(hp->tsi_);
00571   if (state == NULL) {
00572     printf("%s received RDATA for which no SPM state is established, discarding.\n", uname_);
00573     stats_.num_unsolicited_rdata_++;
00574     Packet::free(pkt);
00575     return;
00576   }
00577 
00578   // Look for the repair state for this RDATA packet.
00579   map<int, RepairState>::iterator result = state->repair().find(hp->seqno_);
00580 
00581   if (result == state->repair().end()) {
00582     // No repair state present for this RDATA packet.
00583     printf("%s received RDATA for which no repair state is present, discarding.\n", uname_);
00584     stats_.num_unsolicited_rdata_++;
00585     Packet::free(pkt);
00586     return;
00587   }
00588 
00589   RepairState *rstate = &((*result).second);
00590 
00591   // Get the interface list for the repair state. For each interface, send
00592   // out the RDATA packet.  Similarly for each agent that is also receiving
00593   // RDATA attached to this node.
00594 
00595   if (rstate->iface_list().empty() && rstate->agent_list().empty()) {
00596     printf("%s received RDATA but repair state has no interfaces for it, discarding.\n", uname_);
00597 
00598     stats_.num_unsolicited_rdata_++;
00599 
00600     Packet::free(pkt);
00601   }
00602 
00603   NsObject *tgt;
00604   Packet *new_pkt;
00605   int flag = 0;
00606 
00607   trace_event("SEND RDATA", 0); //Repair is being forwarded
00608 
00609   //  hdr_cmn *hc = HDR_CMN(pkt);
00610 
00611   if (!rstate->iface_list().empty()) {
00612     list<int>::iterator iter = rstate->iface_list().begin();
00613 
00614     while (iter != rstate->iface_list().end()) {
00615       if (!flag) {
00616     tgt = iface2link(*iter);
00617     if (tgt == NULL) {
00618       printf("ERROR (PgmAgent::handle_rdata): iface2link returned NULL.\n");
00619       abort();
00620     }
00621     tgt->recv(pkt);
00622     flag = 1;
00623       }
00624       else {
00625     // Make a copy of each packet before sending it, so we don't free()
00626     // the same packet at the different receivers causing a deallocation
00627     // problem.
00628     new_pkt = pkt->copy();
00629     tgt = iface2link(*iter);
00630     if (tgt == NULL) {
00631       printf("ERROR (PgmAgent::handle_rdata): iface2link returned NULL.\n");
00632       abort();
00633     }
00634     tgt->recv(new_pkt);
00635       }
00636 
00637       iter++;
00638     }
00639   }
00640 
00641   if (!rstate->agent_list().empty()) {
00642     list<NsObject *>::iterator iter = rstate->agent_list().begin();
00643 
00644     while (iter != rstate->agent_list().end()) {
00645       if (!flag) {
00646     (*iter)->recv(pkt);
00647     flag = 1;
00648       }
00649       else {
00650     // Make a copy of each packet before sending it, so we don't free()
00651     // the same packet at the different receivers causing a deallocation
00652     // problem.
00653     new_pkt = pkt->copy();
00654     (*iter)->recv(new_pkt);
00655       }
00656 
00657       iter++;
00658     }
00659   }
00660 
00661   // Remove the repair state for this RDATA sequence number, since we sent
00662   // out the repairs.
00663   remove_repair_state(&((*result).second));
00664 
00665 }
00666 
00667 void PgmAgent::handle_nak(Packet *pkt)
00668 {
00669 
00670   hdr_pgm *hp = HDR_PGM(pkt);
00671   hdr_pgm_nak *hpn = HDR_PGM_NAK(pkt);
00672   hdr_cmn *hc = HDR_CMN(pkt);
00673   //  hdr_ip *hip = HDR_IP(pkt);
00674 
00675   // Check to see if there is a state control block for the given TSI.
00676   StateInfo *state = find_TSI(hp->tsi_);
00677 
00678   if (state == NULL) {
00679     printf("PGM Agent received NAK for which no SPM state is established, discarding.\n");
00680     Packet::free(pkt);
00681     return;
00682   }
00683 
00684   // Create an NCF packet in response to the NAK packet.
00685   Packet *ncf_pkt = allocpkt();
00686   hdr_cmn *ncf_hc = HDR_CMN(ncf_pkt);
00687   ncf_hc->size_ = sizeof(hdr_pgm); // Size of NCF packet.
00688   ncf_hc->ptype_ = PT_PGM;
00689   hdr_pgm *ncf_hp = HDR_PGM(ncf_pkt);
00690   ncf_hp->type_ = PGM_NCF;
00691   ncf_hp->tsi_ = hp->tsi_;
00692   ncf_hp->seqno_ = hp->seqno_;
00693 
00694   // Change the source of the NCF packet to be the original ODATA source.
00695   hdr_ip *ncf_ip = HDR_IP(ncf_pkt);
00696   ncf_ip->src() = hpn->source_;
00697   // Set the destination to be the multicast group.
00698   ncf_ip->dst() = hpn->group_;
00699 
00700   // Set the color of NCF packets in nam to be green.
00701   ncf_ip->fid_ = 6;
00702 
00703   // Send out the NCF to the interface (or agent) for which the NAK was
00704   // received.
00705   NsObject *tgt;
00706 
00707   if (hc->iface() < 0) {
00708     tgt = pkt2agent(pkt);
00709     if (tgt == NULL) {
00710       printf("ERROR: (PgmAgent::handle_nak) pkt2agent returned NULL.\n");
00711       abort();
00712     }
00713     tgt->recv(ncf_pkt);
00714   }
00715   else {
00716     tgt = iface2link(hc->iface());
00717     if (tgt == NULL) {
00718       printf("ERROR: (PgmAgent::handle_nak) iface2link returned NULL.\n");
00719       abort();
00720     }
00721     tgt->recv(ncf_pkt);
00722   }
00723 
00724   // Create repair state for the NAK query. Associate the sequence number
00725   // of the NAK packet with the interface where the packet was received.
00726   pair<map<int, RepairState>::iterator, bool> result;
00727 
00728   result = state->repair().insert(pair<int, RepairState>(hp->seqno_, RepairState(this, state, hp->seqno_, hpn->source_, hpn->group_)));
00729 
00730   RepairState *rstate = &(result.first->second);
00731 
00732   if (result.second == true) {
00733     // There was no previous repair state for the given NAK seqno.
00734     // This must be a new NAK.
00735 
00736     // Set the data fields of the timer.
00737     rstate->nak_retrans_timer().data() = rstate;
00738     rstate->nak_rpt_timer().data() = rstate;
00739     rstate->nak_rdata_timer().data() = rstate;
00740     rstate->nak_elim_timer().data() = rstate;
00741 
00742     // Add the interface (or agent) to the interface list.
00743     if (hc->iface() < 0) {
00744       rstate->agent_list().push_back(pkt2agent(pkt));
00745     }
00746     else {
00747       rstate->iface_list().push_back(hc->iface());
00748     }
00749 
00750     // Start the nak retransmission cycle time.
00751     rstate->nak_retrans_timer().resched(nak_retrans_ival_);
00752 
00753     // Set the nak repeat interval.
00754     rstate->nak_rpt_timer().resched(nak_rpt_ival_);
00755 
00756     trace_event("SEND NACK", nak_rpt_ival_); //Nack being Sent, Nack will refire after ival
00757 
00758     // Don't set the RDATA timer until the NCF is received.
00759     // Don't set the elimintation timer until the NCF is received.
00760 
00761     // We're now in the NAK_PENDING state.
00762   }
00763   else {
00764     // There was previous repair state for the given NAK seqno.
00765 
00766     if (hc->iface() < 0) {
00767       // Scan the agent list to see if the agent is in the list already
00768       // for this repair state.
00769       list<NsObject *> *agent_list = &(rstate->agent_list());
00770       list<NsObject *>::iterator res = find(agent_list->begin(), agent_list->end(), pkt2agent(pkt));
00771 
00772       if (res == agent_list->end()) {
00773     agent_list->push_back(pkt2agent(pkt));
00774       }
00775 
00776     }
00777     else {
00778 
00779       // Scan the interface list to see if the interface is in the list
00780       // already for this repair state.
00781       list<int> *iface_list = &(rstate->iface_list());
00782       list<int>::iterator res = find(iface_list->begin(), iface_list->end(), hc->iface());
00783 
00784       if (res == iface_list->end()) {
00785     // Interface not found in iface list for this NAK, add it.
00786     iface_list->push_back(hc->iface());
00787       }
00788     }
00789 
00790     // If the NAK elimination timer has expired, then we are allowed to
00791     // send another NAK to our upstream.
00792     if (rstate->nak_elimination() == false) {
00793       rstate->nak_state() = NAK_PENDING;
00794       // Start the nak retransmission cycle time.
00795       rstate->nak_retrans_timer().resched(nak_retrans_ival_);
00796       rstate->nak_rpt_timer().resched(nak_rpt_ival_);
00797 
00798       // Disable the rdata and elim timer if they were previously running.
00799       rstate->nak_rdata_timer().force_cancel();
00800       rstate->nak_elim_timer().force_cancel();
00801 
00802       rstate->nak_elimination() = true;
00803 
00804 #ifdef PGM_DEBUG
00805       printf("%s: Got NAK for seqno %d with previous NAK state, accepted.\n",
00806          uname_, hp->seqno_);
00807 #endif
00808     }
00809     else {
00810       // NAK elimination requires us to not act on this duplicate NAK packet.
00811 #ifdef PGM_DEBUG
00812       printf("%s: Got NAK for seqno %d but have previous NAK state, discarding.\n", uname_, hp->seqno_);
00813 #endif
00814       stats_.num_suppressed_naks_++;
00815       Packet::free(pkt);
00816       return;
00817     }
00818 
00819   }
00820 
00821   stats_.num_naks_transmitted_++;
00822 
00823   // Send the NAK packet to our upstream
00824   send_nak(state->upstream_node(), hp->tsi_, hp->seqno_, hpn->source_, hpn->group_);
00825 
00826   Packet::free(pkt);
00827 }
00828 
00829 void PgmAgent::handle_ncf(Packet *pkt)
00830 {
00831 
00832   hdr_pgm *hp = HDR_PGM(pkt);
00833   hdr_cmn *hc = HDR_CMN(pkt);
00834   hdr_ip *hip = HDR_IP(pkt);
00835 
00836   // Locate the state control block for this TSI.
00837   StateInfo *state = find_TSI(hp->tsi_);
00838 
00839   if (state == NULL) {
00840     printf("%s received NCF for which no SPM state is established, discarding.\n", uname_);
00841     stats_.num_unsolicited_ncf_++;
00842     Packet::free(pkt);
00843     return;
00844   }
00845 
00846   if (hc->iface() != state->upstream_iface()) {
00847     printf("%s received NCF from non-upstream interface, discarding.\n", uname_);
00848     stats_.num_unsolicited_ncf_++;
00849     Packet::free(pkt);
00850     return;
00851   }
00852 
00853   trace_event("SEND NCF", 0);
00854 
00855   // Look for the repair state for this NCF packet.
00856   map<int, RepairState>::iterator result = state->repair().find(hp->seqno_);
00857   RepairState *rstate;
00858 
00859   if (result == state->repair().end()) {
00860     // No repair state present for this NCF packet.
00861 
00862     // Since the interface for this NCF packet comes from the same interface
00863     // used to reach our upstream node, we can create empty repair state.
00864     // This is NAK Anticipation (see 7.5 in PGM specification).
00865 
00866     pair<map<int, RepairState>::iterator, bool> res;
00867     res = state->repair().insert(pair<int, RepairState>(hp->seqno_, RepairState(this, state, hp->seqno_, hip->src(), hip->dst())));
00868 
00869     rstate = &(res.first->second);
00870 
00871     // Set the data field of the timers.
00872     rstate->nak_retrans_timer().data() = rstate;
00873     rstate->nak_rpt_timer().data() = rstate;
00874     rstate->nak_rdata_timer().data() = rstate;
00875     rstate->nak_elim_timer().data() = rstate;
00876 
00877     stats_.num_unsolicited_ncf_++;
00878   }
00879   else {
00880     rstate = &((*result).second);
00881 
00882     // Disable either of the retransmission or repeat interval timers since
00883     // the NAK is confirmed.
00884     rstate->nak_retrans_timer().force_cancel();
00885     rstate->nak_rpt_timer().force_cancel();
00886   }
00887 
00888   rstate->nak_state() = NAK_CONFIRMED;
00889   // Set/reset the rdata and elim timer to expire at the appropriate time.
00890   rstate->nak_rdata_timer().resched(nak_rdata_ival_);
00891   rstate->nak_elim_timer().resched(nak_elim_ival_);
00892 
00893   Packet::free(pkt);
00894 }
00895 
00896 // Create and send a nak packet to the upstream path.
00897 void PgmAgent::send_nak(ns_addr_t &upstream_node, ns_addr_t &tsi, int seqno, ns_addr_t &source, ns_addr_t &group)
00898 {
00899 
00900 #ifdef PGM_DEBUG
00901   double now = Scheduler::instance().clock();
00902   printf("at %f %s sending NAK for seqno %d upstream.\n", now, uname_, seqno);
00903 #endif
00904 
00905   Packet *nak_pkt = allocpkt();
00906   // Set the simulated size of the NAK packet.
00907   hdr_cmn *nak_hc = HDR_CMN(nak_pkt);
00908   nak_hc->size_ = sizeof(hdr_pgm) + sizeof(hdr_pgm_nak);
00909   nak_hc->ptype_ = PT_PGM;
00910 
00911   // Set the destination address to be our upstream node.
00912   hdr_ip *nak_hip = HDR_IP(nak_pkt);
00913   nak_hip->dst() = upstream_node;
00914 
00915   // Set the color of NAK packet to be black in nam.
00916   nak_hip->fid_ = 8;
00917 
00918   // Fill in the PGM header for the NAK packet.
00919   hdr_pgm *nak_hp = HDR_PGM(nak_pkt);
00920   nak_hp->type_ = PGM_NAK;
00921   nak_hp->tsi_ = tsi;
00922   nak_hp->seqno_ = seqno;
00923 
00924   // Fill in the PGM NAK header for the NAK packet.
00925   hdr_pgm_nak *nak_hpn = HDR_PGM_NAK(nak_pkt);
00926   nak_hpn->source_ = source;
00927   nak_hpn->group_ = group;
00928 
00929   // Send out the packet.
00930   send(nak_pkt, 0);
00931 }
00932 
00933 // Code that is executed when the nak retransmission timer expires.
00934 void PgmAgent::timeout_nak_retrans(RepairState *rstate)
00935 {
00936 
00937   stats_.num_naks_transmitted_++;
00938 
00939   // Send out a new NAK packet.
00940   send_nak(rstate->sinfo()->upstream_node(), rstate->sinfo()->tsi(), rstate->seqno(), rstate->source(), rstate->group());
00941 
00942   // Reset the retransmission timer.
00943   rstate->nak_retrans_timer().resched(nak_retrans_ival_);
00944 
00945 }
00946 
00947 // Code that is executed when a repair state NAK RPT timer expires.
00948 void PgmAgent::timeout_nak_rpt(RepairState *rstate)
00949 {
00950 
00951   printf("%s: timeout_nak_rpt expired, removing repair state.\n", uname_);
00952 
00953   // We never got a confirmation for our NAK packet. We must now
00954   // remove the repair state entirely.
00955   remove_repair_state(rstate);
00956 
00957 }
00958 
00959 // Code that is executed when a repair state NAK RDATA timer expires.
00960 void PgmAgent::timeout_nak_rdata(RepairState *rstate)
00961 {
00962 
00963   printf("%s: timeout_nak_rdata expired, removing repair state.\n", uname_);
00964 
00965   // We never got the RDATA for our NAK. We must now remove the repair
00966   // state entirely.
00967   remove_repair_state(rstate);
00968 
00969 }
00970 
00971 // Code that is executed when a repair state NAK elimination timer expires.
00972 void PgmAgent::timeout_nak_elim(RepairState *rstate)
00973 {
00974 
00975   // Allow one duplicate NAK to come in to be processed and forwarded.
00976   rstate->nak_elimination() = false;
00977 
00978 }
00979 
00980 void PgmAgent::remove_repair_state(RepairState *rstate)
00981 {
00982 
00983   // Cancel all timers.
00984   rstate->nak_retrans_timer().force_cancel();
00985   rstate->nak_rpt_timer().force_cancel();
00986   rstate->nak_rdata_timer().force_cancel();
00987   rstate->nak_elim_timer().force_cancel();
00988 
00989   // Erase the repair state from the TSI repair map.
00990   if (!rstate->sinfo()->repair().erase(rstate->seqno())) {
00991     printf("ERROR (PgmAgent::remove_repair_state): Did not erase seqno from map.\n");
00992   }
00993 
00994 }
00995 
00996 NsObject* PgmAgent::iface2link (int iface)
00997 {
00998 //      Tcl::instance().evalf("%s get-outlink %d", name(), iface);
00999 //      char* ni = Tcl::instance().result();
01000         Tcl&    tcl = Tcl::instance();
01001         char    wrk[64];
01002 
01003     if (iface == -1) {
01004       return NULL;
01005     }
01006 
01007         sprintf (wrk, "[%s set node_] ifaceGetOutLink %d", name (), iface);
01008         tcl.evalc (wrk);
01009         const char* result = tcl.result ();
01010 #ifdef PGM_DEBUG
01011 printf ("[iface2link] agent %s\n", result);
01012 #endif
01013         NsObject* obj = (NsObject*)TclObject::lookup(result);
01014         return (obj);
01015 }
01016 
01017 NsObject* PgmAgent::pkt2agent (Packet *pkt)
01018 {
01019         Tcl&            tcl = Tcl::instance();
01020         char            wrk[64];
01021         const char     *result;
01022         int             port;
01023         NsObject*       agent;
01024         hdr_ip*         ih = HDR_IP(pkt);
01025         //nsaddr_t        src = ih->saddr();
01026 
01027         port = ih->sport();
01028 
01029         sprintf (wrk, "[%s set node_] agent %d", name (), port);
01030         tcl.evalc (wrk);
01031         result = tcl.result ();
01032 
01033 #ifdef PGM_DEBUG
01034 printf ("[pkt2agent] port %d, agent %s\n", port, result);
01035 #endif
01036 
01037         agent = (NsObject*)TclObject::lookup (result);
01038         return (agent);
01039 }
01040 
01041 // Find the state control block given a TSI. Returns NULL if not found.
01042 StateInfo * PgmAgent::find_TSI(ns_addr_t &tsi)
01043 {
01044 
01045   // Use the TSI from the SPM packet and locate the proper state block.
01046   list<StateInfo>::iterator iter = state_list_.begin();
01047   while(iter != state_list_.end()) {
01048     if ( (*iter).tsi().isEqual (tsi) ) {
01049       return &(*iter);
01050     }
01051     iter++;
01052   }
01053 
01054   return NULL;
01055 }
01056 
01057 // Insert a new state control block for the given TSI, and return a pointer
01058 // to the control block.
01059 StateInfo * PgmAgent::insert_TSI(ns_addr_t &tsi)
01060 {
01061   state_list_.push_back(StateInfo(tsi));
01062 
01063   return &(state_list_.back());
01064 }
01065 
01066 void PgmAgent::print_stats()
01067 {
01068   printf("%s:\n", uname_);
01069   printf("\tNAKs Transmitted: \t%d\n", stats_.num_naks_transmitted_);
01070   printf("\tNAKs Suppressed: \t%d\n", stats_.num_suppressed_naks_);
01071   printf("\tUnsolicited NCFs: \t%d\n", stats_.num_unsolicited_ncf_);
01072   printf("\tUnsolicited RDATA: \t%d\n", stats_.num_unsolicited_rdata_);
01073 }
01074 
01075 #ifdef PGM_DEBUG
01076 void PgmAgent::display_packet(Packet *pkt)
01077 {
01078 
01079   double now = Scheduler::instance().clock();
01080 
01081   hdr_ip *hip = HDR_IP(pkt);
01082   hdr_cmn *hc = HDR_CMN(pkt);
01083 
01084   printf("at %f %s received packet type ", now, uname_);
01085 
01086   hdr_pgm *hp = HDR_PGM(pkt);
01087   
01088   hdr_pgm_spm *hps;
01089   hdr_pgm_nak *hpn;
01090 
01091   switch(hp->type_) {
01092   case PGM_SPM:
01093     hps = HDR_PGM_SPM(pkt);
01094 
01095     printf("SPM (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d, spm_path %d:%d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_, hps->spm_path_.addr_, hps->spm_path_.port_);
01096 
01097     break;
01098   case PGM_ODATA:
01099     printf("ODATA (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
01100 
01101     break;
01102   case PGM_RDATA:
01103     printf("RDATA (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
01104 
01105     break;
01106   case PGM_NAK:
01107     hpn = HDR_PGM_NAK(pkt);
01108 
01109     printf("NAK (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d, source %d:%d, group %d:%d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_, hpn->source_.addr_, hpn->source_.port_, hpn->group_.addr_, hpn->group_.port_);
01110 
01111     break;
01112   case PGM_NCF:
01113     printf("NCF (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
01114 
01115     break;
01116   default:
01117     printf("UNKNOWN (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
01118 
01119     break;
01120   }
01121 
01122 }
01123 #endif // PGM_DEBUG
01124 
01125 #endif //HAVE_STL

Generated on Tue Mar 6 16:47:49 2007 for ns2 Network Simulator 2.29 by  doxygen 1.4.6