pgm-sender.cc

Go to the documentation of this file.
00001 
00002 /*
00003  * pgm-sender.cc
00004  * Copyright (C) 2001 by the University of Southern California
00005  * $Id: pgm-sender.cc,v 1.11 2005/08/25 18:58:10 johnh Exp $
00006  *
00007  * This program is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU General Public License,
00009  * version 2, as published by the Free Software Foundation.
00010  *
00011  * This program is distributed in the hope that it will be useful,
00012  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  * GNU General Public License for more details.
00015  *
00016  * You should have received a copy of the GNU General Public License along
00017  * with this program; if not, write to the Free Software Foundation, Inc.,
00018  * 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
00019  *
00020  *
00021  * The copyright of this module includes the following
00022  * linking-with-specific-other-licenses addition:
00023  *
00024  * In addition, as a special exception, the copyright holders of
00025  * this module give you permission to combine (via static or
00026  * dynamic linking) this module with free software programs or
00027  * libraries that are released under the GNU LGPL and with code
00028  * included in the standard release of ns-2 under the Apache 2.0
00029  * license or under otherwise-compatible licenses with advertising
00030  * requirements (or modified versions of such code, with unchanged
00031  * license).  You may copy and distribute such a system following the
00032  * terms of the GNU GPL for this module and the licenses of the
00033  * other code concerned, provided that you include the source code of
00034  * that other code when and as the GNU GPL requires distribution of
00035  * source code.
00036  *
00037  * Note that people who make modified versions of this module
00038  * are not obligated to grant this special exception for their
00039  * modified versions; it is their choice whether to do so.  The GNU
00040  * General Public License gives permission to release a modified
00041  * version without this exception; this exception also makes it
00042  * possible to release a modified version which carries forward this
00043  * exception.
00044  *
00045  */
00046 
00047 /*
00048  * Pragmatic General Multicast (PGM), Reliable Multicast
00049  *
00050  * pgm-sender.cc
00051  *
00052  * This implements the Sending PGM agent, "Agent/PGM/Sender".
00053  *
00054  * Ryan S. Barnett, 2001
00055  * rbarnett@catarina.usc.edu
00056  */
00057 
00058 #include "config.h"
00059 #ifdef HAVE_STL
00060 
00061 #include <stdlib.h>
00062 #include <stdio.h>
00063 
00064 /* Standard Template Library */
00065 #include <map>
00066 #include <list>
00067 #include <algorithm>
00068 
00069 #include "config.h"
00070 #include "tclcl.h"
00071 #include "agent.h"
00072 #include "packet.h"
00073 #include "ip.h"
00074 #include "random.h"
00075 #include "basetrace.h"
00076 
00077 #include "pgm.h"
00078 
00079 // ************************************************************
00080 // Define the PGM Sender Timer Class
00081 // ************************************************************
00082 class PgmSender;
00083 
/* Timer type tags. A PgmSenderTimer carries one of these and hands it to
 * PgmSender::timeout(), which uses it to select the action to perform. */
enum {
  TIMER_SPM = 0,  // SPM heartbeat timer.
  TIMER_RDATA     // (== 1) timer that triggers sending of a pending RDATA.
};
00089 
00090 class PgmSenderTimer : public TimerHandler {
00091 public:
00092   PgmSenderTimer(PgmSender *a, int type) : TimerHandler(), data_(NULL) {
00093     a_ = a;
00094     type_ = type;
00095   }
00096 
00097   int & type() { return type_; }
00098   void * &data() { return data_; }
00099 
00100 protected:
00101   virtual void expire(Event *e);
00102   PgmSender *a_;
00103   int type_;
00104   void *data_;
00105 };
00106 
00107 // Bundles an RDATA packet with a timer (for sending the RDATA packet),
00108 // and a list of interfaces for which the packet should be sent to.
00109 class RdataItem {
00110 public:
00111   RdataItem(int seqno, PgmSender *a, Packet *rdata_pkt = NULL) :
00112     seqno_(seqno), rdata_pkt_(rdata_pkt), rdata_timer_(a, TIMER_RDATA) { }
00113 
00114   // Allow direct access to the private data.
00115   int & seqno() { return seqno_; }
00116   Packet * & rdata_pkt() { return rdata_pkt_; }
00117   PgmSenderTimer & rdata_timer() { return rdata_timer_; }
00118   list<int> & iface_list() { return iface_list_; }
00119   list<NsObject *> & agent_list() { return agent_list_; }
00120 
00121 protected:
00122   // The sequence number of this RDATA item.
00123   int seqno_;
00124   // The RDATA packet itself.
00125   Packet *rdata_pkt_;
00126   // The timer responsible for sending out this RDATA packet.
00127   PgmSenderTimer rdata_timer_;
00128   // The list of interfaces for which this packet must be sent to.
00129   list<int> iface_list_;
00130   // The list of agents for which this packet must be sent to.
00131   list<NsObject *> agent_list_;
00132 };
00133 
// Bookkeeping record used to detect repeated repair requests: counts how
// many times the RDATA for one sequence number has been retransmitted.
class ReplyItem {
public:
  // A new record starts with a retransmission count of zero.
  ReplyItem(int seqno)
    : seqno_(seqno),
      retransmissions_(0)
  { }

  // Both accessors return references so the fields can be updated in place.
  int & seqno() { return seqno_; }
  int & retransmissions() { return retransmissions_; }

protected:
  int seqno_;            // Sequence number this record tracks.
  int retransmissions_;  // Retransmission counter for that sequence number.
};
00146 
// Upper bound on the number of entries kept in the sender's reply list;
// once it is reached the oldest entry is dropped before a new one is added.
static const int MAX_REPLY_LIST_SIZE = 100;
00149 
// Counters collected while the simulation runs; reported by
// PgmSender::print_stats().
struct Stats {
  int num_naks_received_;       // NAKs accepted by the sender.
  int num_rdata_sent_;          // RDATA transmissions triggered.
  int max_num_repeated_rdata_;  // Highest retransmission count seen for one RDATA.
};
00156 
00157 // ************************************************************
00158 // Define the PGM Sender Class
00159 // ************************************************************
00160 static int pgm_snd_uid_ = 0;
00161 
00162 class PgmSender: public Agent {
00163 public:
00164   PgmSender();
00165 
00166   virtual void recv(Packet *, Handler *);
00167   virtual void timeout(int type, void *data);
00168   virtual int command(int argc, const char*const* argv);
00169   virtual void sendmsg(int nbytes, const char *flags = 0);
00170 
00171 protected:
00172 
00173   virtual void start(); // Starts the SPM heartbeats.
00174   virtual void stop(); // Stops the SPM heartbeats.
00175 
00176   virtual void handle_nak(Packet *pkt); // Process a NAK packet.
00177 
00178   virtual void send_spm(); // Sends an SPM packet to the multicast group.
00179 
00180   virtual void send_rdata(RdataItem *pkt); // Sends the given RDATA packet.
00181 
00182   NsObject* iface2link(int iface);
00183   NsObject* pkt2agent(Packet *pkt);
00184 
00185   void print_stats();
00186 
00187   void display_packet(Packet *pkt); // For debugging.
00188 
00189   void PgmSender::trace_event(char *evType, nsaddr_t daddr, double evTime); 
00190 
00191   EventTrace * et_;     //Trace Object for custom Event Traces
00192 
00193   Stats stats_; // Keep track of various statistics.
00194 
00195   char uname_[16]; // Unique PGM sender name.
00196 
00197   // Map the sequence number of a NAK (requested RDATA) with an item
00198   // that represents the RDATA packet including which interfaces the
00199   // RDATA should be sent to, along with a timer that is used to trigger
00200   // sending of the RDATA packet.
00201   map<int, RdataItem> pending_rdata_;
00202 
00203   // A list to keep track of the number of retransmissions for a given
00204   // RDATA reply.  The max size this will grow is MAX_REPLY_LIST_SIZE.
00205   list<ReplyItem> reply_;
00206 
00207   // The 'typicial' size of a data packet including header. This should
00208   // get set automatically from the application calling sendmsg().  We
00209   // make a simplifying assumption that all packet sizes in a session are
00210   // of equal size.
00211   int pktSize_;
00212 
00213   PgmSenderTimer spm_heartbeat_; // Timer for sending out SPM packets.
00214   int spm_running_; // Whether the heartbeats are running or not.
00215   double spm_interval_; // Time between SPM packets (in seconds).
00216   // Time to delay sending out an RDATA in response to a NAK packet, this
00217   // is to allow slow NAKs to get processed at one time, so we don't send
00218   // out duplicate RDATA.
00219   double rdata_delay_;
00220 
00221   int odata_seqno_; // Current ODATA sequence number.
00222   int spm_seqno_; // Current SPM sequence number.
00223 
00224   //  nsaddr_t group_; // The multicast group we send to.
00225 
00226 };
00227 
00228 void PgmSenderTimer::expire(Event *e) {
00229   a_->timeout(type_, data_);
00230 }
00231 
00232 static class PgmSenderClass : public TclClass {
00233 public:
00234   PgmSenderClass() : TclClass("Agent/PGM/Sender") {}
00235   TclObject * create(int argc, const char * const * argv) {
00236     return (new PgmSender());
00237   }
00238 } class_pgm_sender;
00239 
00240 // Constructor.
00241 PgmSender::PgmSender() : Agent(PT_PGM), pktSize_(0),
00242              spm_heartbeat_(this, TIMER_SPM),
00243              spm_running_(0), odata_seqno_(-1), spm_seqno_(-1)
00244 {
00245   stats_.num_naks_received_ = 0;
00246   stats_.num_rdata_sent_ = 0;
00247   stats_.max_num_repeated_rdata_ = 0;
00248 
00249   sprintf(uname_, "pgmSender-%d", pgm_snd_uid_++);
00250 
00251   bind_time("spm_interval_", &spm_interval_);
00252   bind_time("rdata_delay_", &rdata_delay_);
00253 
00254   et_ = (EventTrace *) NULL;
00255 
00256 }
00257 
00258 // Code that is called when a packet is received.
00259 void PgmSender::recv(Packet *pkt, Handler *)
00260 {
00261   hdr_cmn* hc = HDR_CMN(pkt);
00262 
00263   if (hc->ptype_ == PT_PGM) {
00264 
00265 #ifdef PGM_DEBUG
00266     display_packet(pkt);
00267 #endif
00268 
00269     // Identify the type of PGM packet, if it is a NAK process it, otherwise
00270     // throw an error.
00271     hdr_pgm *hp = HDR_PGM(pkt);
00272 
00273     if (hp->type_ == PGM_NAK) {
00274       handle_nak(pkt);
00275     }
00276     else {
00277       printf("ERROR (PgmSender::handle_pgm_pkt): received unexpected PGM packet type %d, discarding.\n", hp->type_);
00278     }
00279   }
00280   else {
00281     printf ("%s ERROR (PgmSender::recv): received non PGM pkt type %d, discarding.\n", uname_, hc->ptype_);
00282   }
00283 
00284   // Free all packets that this agent receives.
00285   Packet::free(pkt);
00286 }
00287 
00288 // Code that is called when a timer expires.
00289 void PgmSender::timeout(int type, void *data)
00290 {
00291   switch(type) {
00292   case TIMER_SPM:
00293     if (spm_running_) {
00294       send_spm();
00295       spm_heartbeat_.resched(spm_interval_);
00296     }
00297     break;
00298   case TIMER_RDATA:
00299     send_rdata((RdataItem *)data);
00300     break;
00301   default:
00302     printf("ERROR (PgmSender::timeout): invalid timeout type.\n");
00303     break;
00304   }
00305 
00306 }
00307 
00308 // Code that is called when a TCL command is issued to the PGM Sender object.
00309 int PgmSender::command(int argc, const char*const* argv)
00310 {
00311   if (argc == 2) {
00312     if (strcmp(argv[1], "start-SPM") == 0) {
00313       start();
00314       return (TCL_OK);
00315     }
00316     if (strcmp(argv[1], "stop-SPM") == 0) {
00317       stop();
00318       return (TCL_OK);
00319     }
00320     if (strcmp(argv[1], "print-stats") == 0) {
00321       print_stats();
00322       return (TCL_OK);
00323     }
00324   }
00325   else if (argc == 3) {     //If Event Trace is on, set the Event trace handle
00326     if (strcmp(argv[1], "eventtrace") == 0) {
00327       et_ = (EventTrace *)TclObject::lookup(argv[2]);
00328       return (TCL_OK);
00329     }
00330   }
00331 
00332   return (Agent::command(argc, argv));
00333 }
00334 
00335 void PgmSender::trace_event(char *evType, nsaddr_t daddr, double evTime) {
00336 
00337   if (et_ == NULL) return;
00338   char *wrk = et_->buffer();
00339   char *nwrk = et_->nbuffer();
00340 
00341   if (wrk != NULL) {
00342     sprintf(wrk, "E "TIME_FORMAT" %d %d PGM %s "TIME_FORMAT, 
00343             et_->round(Scheduler::instance().clock()),   
00344             addr(),                    
00345             daddr,                   
00346             evType,                  
00347             evTime);    
00348   if (nwrk != 0)
00349     sprintf(nwrk,
00350             "E -t "TIME_FORMAT" -o PGM -e %s -s %d.%d -d %d.%d",
00351             et_->round(Scheduler::instance().clock()),   // time
00352             evType,                    // event type
00353             addr(),                       // owner (src) node id
00354             port(),                       // owner (src) port id
00355             daddr,                      // dst node id
00356             0                       // dst port id
00357             );
00358     et_->dump();
00359   }
00360 
00361 }
00362 
00363 // The application calls this function to send out new ODATA (original DATA).
00364 void PgmSender::sendmsg(int nbytes, const char *flags /* = 0 */)
00365 {
00366   odata_seqno_++;
00367 
00368 #ifdef PGM_DEBUG
00369   double now = Scheduler::instance().clock();
00370 
00371   printf("at %f %s sending ODATA seqno %d\n", now, uname_, odata_seqno_);
00372 #endif
00373 
00374   // Create a packet with the given ODATA.
00375   Packet *pkt = allocpkt();
00376   // Set the simulated size of the packet to the indicated nbytes.
00377   hdr_cmn *hc = HDR_CMN(pkt);
00378   pktSize_ = nbytes + sizeof(hdr_pgm);
00379   hc->size_ = pktSize_;
00380 
00381   hc->ptype_ = PT_PGM;
00382 
00383   // Fill in the PGM header.
00384   hdr_pgm *hp = HDR_PGM(pkt);
00385   hp->type_ = PGM_ODATA;
00386   hp->tsi_ = here_; // Set transport session ID to addr/port of this agent.
00387   hp->seqno_ = odata_seqno_;
00388 
00389   hdr_ip *hip = HDR_IP(pkt);
00390   // Set the color for ODATA packets.
00391   hip->fid_ = 1;
00392 
00393   // Send out the packet.
00394   send(pkt, 0);
00395 }
00396 
00397 void PgmSender::start()
00398 {
00399   spm_running_ = 1;
00400   send_spm();
00401   spm_heartbeat_.resched(spm_interval_);
00402 }
00403 
00404 void PgmSender::stop()
00405 {
00406   spm_heartbeat_.cancel();
00407   spm_running_ = 0;
00408   Tcl::instance().evalf("%s done", this->name());
00409 }
00410 
00411 // Process a NAK packet.
00412 void PgmSender::handle_nak(Packet *pkt)
00413 {
00414 
00415   hdr_cmn *hc = HDR_CMN(pkt);
00416   hdr_pgm *hp = HDR_PGM(pkt);
00417   //hdr_pgm_nak *pnak = HDR_PGM_NAK(pkt);
00418 
00419   if (!(hp->tsi_.isEqual (here_))) {
00420     printf("%s received NAK with wrong TSI, discarding.\n", uname_);
00421     return;
00422   }
00423 
00424   stats_.num_naks_received_++;
00425 
00426   // Create the NCF packet.
00427   Packet *ncf_pkt = allocpkt();
00428   // Set the simulated size of the NCF packet.
00429   hdr_cmn *ncf_hc = HDR_CMN(ncf_pkt);
00430   ncf_hc->size_ = sizeof(hdr_pgm);
00431   ncf_hc->ptype_ = PT_PGM;
00432   // Fill in the PGM header for the NCF packet.
00433   hdr_pgm *ncf_hp = HDR_PGM(ncf_pkt);
00434   ncf_hp->type_ = PGM_NCF;
00435   ncf_hp->tsi_ = here_;
00436   ncf_hp->seqno_ = hp->seqno_;
00437 
00438   hdr_ip *ncf_hip = HDR_IP(ncf_pkt);
00439   // Set the color for NCF packets in nam.
00440   ncf_hip->fid_ = 6;
00441 
00442   // Immediately send the NCF packet to the interface where the NAK
00443   // packet was received. If the packet came from another agent attached
00444   // to this node, then send the packet to that agent.
00445   NsObject *tgt;
00446 
00447   if (hc->iface() < 0) {
00448     tgt = pkt2agent(pkt);
00449   }
00450   else {
00451     tgt = iface2link(hc->iface());
00452   }
00453 
00454   if (tgt == NULL) {
00455     printf("ERROR (PgmSender::handle_nak): iface2link returned NULL.\n");
00456     abort();
00457   }
00458   tgt->recv(ncf_pkt);
00459 
00460   // Queue up an RDATA packet to be transferred to the requestor on the
00461   // appropriate interface.
00462 
00463   // Attempt to locate this NAK sequence number on the pending RDATA map.
00464   pair<map<int, RdataItem>::iterator, bool> result;
00465 
00466   result = pending_rdata_.insert(pair<int, RdataItem>(hp->seqno_, RdataItem(hp->seqno_, this)));
00467 
00468   RdataItem *ritem = &(result.first->second);
00469 
00470   if (result.second == true) {
00471     // The entry was added to the map.
00472 
00473     // Set the data field of the timer.
00474     ritem->rdata_timer().data() = ritem;
00475 
00476     // There is NO pending RDATA for the indicated sequence number.
00477 
00478     // Create the RDATA packet.
00479     Packet *rdata_pkt = allocpkt();
00480     // Set the simulated size of the RDATA packet to the typicial data size.
00481     hdr_cmn *rdata_hc = HDR_CMN(rdata_pkt);
00482     rdata_hc->size_ = pktSize_;
00483     rdata_hc->ptype_ = PT_PGM;
00484     // Fill in the PGM header for RDATA packet.
00485     hdr_pgm *rdata_hp = HDR_PGM(rdata_pkt);
00486     rdata_hp->type_ = PGM_RDATA;
00487     rdata_hp->tsi_ = here_; // Set transport session ID to addr/port of this agent.
00488     rdata_hp->seqno_ = hp->seqno_;
00489 
00490     hdr_ip *rdata_hip = HDR_IP(rdata_pkt);
00491     // Set the color for RDATA packets in nam.
00492     rdata_hip->fid_ = 3;
00493 
00494     // Place the new packet into the RdataItem in the map.
00495     ritem->rdata_pkt() = rdata_pkt;
00496 
00497     // Set the timer to go off at rdata_delay_ seconds from now.
00498     ritem->rdata_timer().resched(rdata_delay_);
00499 
00500     //Output Event Trace, Repair will be sent after rdata_delay_
00501     trace_event("REPAIR BACKOFF", rdata_hip->daddr(), rdata_delay_);
00502 
00503     if (hc->iface() < 0) {
00504       // The NAK was sent from a local agent attached to this node. Keep
00505       // track of which agent this is.
00506       ritem->agent_list().push_back(pkt2agent(pkt));
00507     }
00508     else {
00509       // Set the interface number for this RdataItem.
00510       ritem->iface_list().push_back(hc->iface());
00511     }
00512   }
00513   else {
00514     // Seqno entry already exists in the map.
00515 
00516     // The RDATA is already pending.
00517 
00518     if (hc->iface() < 0) {
00519       // Scan the agent list to see if the agent is already registered
00520       // for this RDATA.
00521 
00522       list<NsObject *> *agent_list = &(ritem->agent_list());
00523 
00524       list<NsObject *>::iterator res = find(agent_list->begin(), agent_list->end(), pkt2agent(pkt));
00525 
00526       if (res == agent_list->end()) {
00527     // Agent not found in agent list for this RDATA, add it.
00528     agent_list->push_back(pkt2agent(pkt));
00529       }
00530       else {
00531     printf("%s: NAK received and already had NAK state for that same agent.\n", uname_);
00532       }
00533 
00534     }
00535     else {
00536       // Scan the interface list to see if the interface is already registered
00537       // for this RDATA.
00538       list<int> *iface_list = &(ritem->iface_list());
00539       list<int>::iterator res = find(iface_list->begin(), iface_list->end(), hc->iface());
00540 
00541       if (res == iface_list->end()) {
00542     // Interface not found in iface list for this RDATA, add it.
00543     iface_list->push_back(hc->iface());
00544       }
00545       else {
00546     // Interface already present in the iface list for this RDATA,
00547     // therefore this NAK is a duplicate.
00548     printf("%s: NAK received and already had NAK state for that same interface.\n", uname_);
00549       }
00550     }
00551 
00552   }
00553 
00554 }
00555 
00556 // Send out a new SPM packet to the multicast group.
00557 void PgmSender::send_spm()
00558 {
00559   spm_seqno_++;
00560 
00561   // Create a packet with the given ODATA.
00562   Packet *pkt = allocpkt();
00563   // Set the simulated size of the packet to the indicated nbytes.
00564   hdr_cmn *hc = HDR_CMN(pkt);
00565   hc->size_ = sizeof(hdr_pgm) + sizeof(hdr_pgm_spm);
00566 
00567   hc->ptype_ = PT_PGM;
00568 
00569   hdr_ip *hip = HDR_IP(pkt);
00570   // Set the color for SPM packets in nam.
00571   hip->fid_ = 7;
00572   //  hip->daddr() = group_;
00573 
00574   // Fill in the PGM header.
00575   hdr_pgm *hp = HDR_PGM(pkt);
00576   hp->type_ = PGM_SPM;
00577   hp->tsi_ = here_; // Set transport session ID to addr/port of this agent.
00578   hp->seqno_ = spm_seqno_;
00579 
00580   // Fill in SPM header.
00581   hdr_pgm_spm *hs = HDR_PGM_SPM(pkt);
00582   hs->spm_path_ = here_; // Set current path to the source agent.
00583 
00584 #ifdef PGM_DEBUG
00585   double now = Scheduler::instance().clock();
00586   printf("at %f %s sending SPM, from %d:%d (here = %d:%d) to %d:%d, TSI %d:%d, type %d\n", now, uname_, hip->saddr(), hip->sport(), addr(), port(), hip->daddr(), hip->dport(), hp->tsi_.addr_, hp->tsi_.port_, hp->type_);
00587 #endif
00588 
00589   // Send out the packet.
00590   send(pkt, 0);
00591 }
00592 
00593 // Send out the given RDATA packet. The packet should be already created
00594 // and headers filled in.  This is triggered when the timer expires.
00595 void PgmSender::send_rdata(RdataItem *item)
00596 {
00597 
00598   // Locate the sequence number of this rdata in the list of sent
00599   // retransmissions.
00600   int count = 0;
00601   list<ReplyItem>::iterator iter = reply_.begin();
00602   while (iter != reply_.end()) {
00603     if ((*iter).seqno() == item->seqno()) {
00604       (*iter).retransmissions() += 1;
00605 
00606       if ((*iter).retransmissions() > stats_.max_num_repeated_rdata_) {
00607     stats_.max_num_repeated_rdata_ = (*iter).retransmissions();
00608       }
00609 
00610       break;
00611     }
00612 
00613     count++;
00614     iter++;
00615   }
00616 
00617   if (iter == reply_.end()) {
00618     // This is the first time we're sending out this RDATA. Append it to
00619     // the back of the reply list.
00620     if (count >= MAX_REPLY_LIST_SIZE) {
00621       // Pop off the front-most element if we've reached the max size of
00622       // the reply list.
00623       reply_.pop_front();
00624     }
00625 
00626     reply_.push_back(ReplyItem(item->seqno()));
00627   }
00628 
00629   stats_.num_rdata_sent_++;
00630 
00631   // Send the packet to each of the interfaces.
00632   if (!item->iface_list().empty()) {
00633     list<int>::iterator iter = item->iface_list().begin();
00634     int flag = 0;   // Used to determine when we need to make additional copies of the packet.
00635     while (iter != item->iface_list().end()) {
00636       NsObject *tgt;
00637       tgt = iface2link(*iter);
00638       Packet *pkt = item->rdata_pkt();
00639       if (flag) {
00640     // Make a copy of each packet before sending it, so we don't free()
00641     // the same packet at the different receivers causing a deallocation
00642     // problem.
00643     pkt = pkt->copy();
00644       } else {
00645     trace_event("SEND RDATA", HDR_IP(pkt)->daddr(), 0);
00646     flag = 1;
00647       }
00648       tgt->recv(pkt);
00649 
00650       iter++;
00651     }
00652 
00653   }
00654 
00655   if (!item->agent_list().empty()) {
00656     list<NsObject *>::iterator iter = item->agent_list().begin();
00657     int flag = 0;
00658     while (iter != item->agent_list().end()) {
00659       Packet *pkt = item->rdata_pkt();
00660       if (flag) {
00661     pkt = pkt->copy ();
00662       } else {
00663     flag = 1;
00664       }
00665       (*iter)->recv(pkt);
00666 
00667       iter++;
00668     }
00669   }
00670 
00671   hdr_pgm *hp = HDR_PGM(item->rdata_pkt());
00672 
00673   // Remove this sequence number from the pending RDATA list, since
00674   // we have now sent that RDATA.
00675   if (!pending_rdata_.erase(hp->seqno_)) {
00676     printf("ERROR (PgmSender::send_rdata): Did not erase RdataItem from map.\n");
00677   }
00678 
00679 }
00680 
00681 NsObject* PgmSender::iface2link (int iface)
00682 {
00683 //      Tcl::instance().evalf("%s get-outlink %d", name(), iface);
00684 //      char* ni = Tcl::instance().result();
00685         Tcl&    tcl = Tcl::instance();
00686         char    wrk[64];
00687 
00688     if (iface == -1) {
00689       return NULL;
00690     }
00691 
00692         sprintf (wrk, "[%s set node_] ifaceGetOutLink %d", name (), iface);
00693         tcl.evalc (wrk);
00694         const char* result = tcl.result ();
00695 #ifdef PGM_DEBUG
00696 printf ("[iface2link] agent %s\n", result);
00697 #endif
00698         NsObject* obj = (NsObject*)TclObject::lookup(result);
00699         return (obj);
00700 }
00701 
00702 NsObject* PgmSender::pkt2agent (Packet *pkt)
00703 {
00704         Tcl&            tcl = Tcl::instance();
00705         char            wrk[64];
00706         const char            *result;
00707         int             port;
00708         NsObject*       agent;
00709         hdr_ip*         ih = HDR_IP(pkt);
00710         //nsaddr_t        src = ih->saddr();
00711 
00712         port = ih->sport();
00713 
00714         sprintf (wrk, "[%s set node_] agent %d", name (), port);
00715         tcl.evalc (wrk);
00716         result = tcl.result ();
00717 
00718 #ifdef PGM_DEBUG
00719 printf ("[pkt2agent] port %d, agent %s\n", port, result);
00720 #endif
00721 
00722         agent = (NsObject*)TclObject::lookup (result);
00723         return (agent);
00724 }
00725 
00726 void PgmSender::print_stats()
00727 {
00728   printf("%s\n", uname_);
00729   printf("\tLast ODATA seqno: %d\n", odata_seqno_);
00730   printf("\tLast SPM seqno: %d\n", spm_seqno_);
00731   printf("\tNumber of NAKs received: %d\n", stats_.num_naks_received_);
00732   printf("\tNumber of RDATA transmitted: %d\n", stats_.num_rdata_sent_);
00733   printf("\tMax retransmission count for a single RDATA: %d\n", stats_.max_num_repeated_rdata_);
00734 }
00735 
#ifdef PGM_DEBUG
// Debugging aid: prints a one-line description of a received PGM packet
// (time, sender name, packet type, TSI, addresses, interface, size and
// sequence number, plus the type-specific fields of SPM and NAK packets).
void PgmSender::display_packet(Packet *pkt)
{
  hdr_cmn *hc = HDR_CMN(pkt);
  hdr_ip *hip = HDR_IP(pkt);
  hdr_pgm *hp = HDR_PGM(pkt);

  // Common prefix; the packet type name and details follow on the same line.
  printf("at %f %s received packet type ", Scheduler::instance().clock(), uname_);

  switch(hp->type_) {
  case PGM_SPM: {
    // SPM packets additionally carry the current path.
    hdr_pgm_spm *hps = HDR_PGM_SPM(pkt);
    printf("SPM (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d, spm_path %d:%d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_, hps->spm_path_.addr_, hps->spm_path_.port_);
    break;
  }
  case PGM_ODATA:
    printf("ODATA (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
    break;
  case PGM_RDATA:
    printf("RDATA (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
    break;
  case PGM_NAK: {
    // NAK packets additionally carry the source and group addresses.
    hdr_pgm_nak *hpn = HDR_PGM_NAK(pkt);
    printf("NAK (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d, source %d:%d, group %d:%d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_, hpn->source_.addr_, hpn->source_.port_, hpn->group_.addr_, hpn->group_.port_);
    break;
  }
  case PGM_NCF:
    printf("NCF (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
    break;
  default:
    printf("UNKNOWN (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
    break;
  }
}
#endif // PGM_DEBUG
00785 
00786 #endif //HAVE_STL
00787 

Generated on Tue Mar 6 16:47:49 2007 for ns2 Network Simulator 2.29 by  doxygen 1.4.6