srm.cc

Go to the documentation of this file.
00001 /* -*-  Mode:C++; c-basic-offset:8; tab-width:8; indent-tabs-mode:t -*- */
00002 
00003 /*
00004  * srm.cc
00005  * Copyright (C) 1997 by the University of Southern California
00006  * $Id: srm.cc,v 1.27 2005/08/25 18:58:08 johnh Exp $
00007  *
00008  * This program is free software; you can redistribute it and/or
00009  * modify it under the terms of the GNU General Public License,
00010  * version 2, as published by the Free Software Foundation.
00011  *
00012  * This program is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  *
00017  * You should have received a copy of the GNU General Public License along
00018  * with this program; if not, write to the Free Software Foundation, Inc.,
00019  * 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
00020  *
00021  *
00022  * The copyright of this module includes the following
00023  * linking-with-specific-other-licenses addition:
00024  *
00025  * In addition, as a special exception, the copyright holders of
00026  * this module give you permission to combine (via static or
00027  * dynamic linking) this module with free software programs or
00028  * libraries that are released under the GNU LGPL and with code
00029  * included in the standard release of ns-2 under the Apache 2.0
00030  * license or under otherwise-compatible licenses with advertising
00031  * requirements (or modified versions of such code, with unchanged
00032  * license).  You may copy and distribute such a system following the
00033  * terms of the GNU GPL for this module and the licenses of the
00034  * other code concerned, provided that you include the source code of
00035  * that other code when and as the GNU GPL requires distribution of
00036  * source code.
00037  *
00038  * Note that people who make modified versions of this module
00039  * are not obligated to grant this special exception for their
00040  * modified versions; it is their choice whether to do so.  The GNU
00041  * General Public License gives permission to release a modified
00042  * version without this exception; this exception also makes it
00043  * possible to release a modified version which carries forward this
00044  * exception.
00045  *
00046  */
00047 
00048 //
00049 //  Maintainer: Kannan Varadhan <kannan@isi.edu>
00050 //  Version Date:   Tue Jul 22 15:41:16 PDT 1997
00051 //
00052 
// Revision-control identification string for this file; the definition is
// compiled out entirely when the `lint` macro is defined.
#ifndef lint
static const char rcsid[] =
    "@(#) $Header: /nfs/jade/vint/CVSROOT/ns-2/mcast/srm.cc,v 1.27 2005/08/25 18:58:08 johnh Exp $ (USC/ISI)";
#endif
00057 
00058 #include <stdlib.h>
00059 #include <assert.h>
00060 
00061 #include "config.h"
00062 #include "agent.h"
00063 #include "ip.h"
00064 #include "srm.h"
00065 #include "trace.h"
00066 #include "rtp.h"
00067 
00068 
// Storage for the static offset_ members of the SRM and adaptive-SRM header
// types.  Their addresses are handed to bind_offset() by the header classes
// registered below.
int hdr_srm::offset_;
int hdr_asrm::offset_;
00071 
00072 static class SRMHeaderClass : public PacketHeaderClass {
00073 public:
00074     SRMHeaderClass() : PacketHeaderClass("PacketHeader/SRM",
00075                          sizeof(hdr_srm)) {
00076         bind_offset(&hdr_srm::offset_);
00077     }
00078 } class_srmhdr;
00079 
00080 static class ASRMHeaderClass : public PacketHeaderClass {
00081 public:
00082     ASRMHeaderClass() : PacketHeaderClass("PacketHeader/aSRM",
00083                           sizeof(hdr_asrm)) {
00084         bind_offset(&hdr_asrm::offset_);
00085     }
00086 } class_adaptive_srmhdr;
00087 
00088 
00089 static class SRMAgentClass : public TclClass {
00090 public:
00091     SRMAgentClass() : TclClass("Agent/SRM") {}
00092     TclObject* create(int, const char*const*) {
00093         return (new SRMAgent());
00094     }
00095 } class_srm_agent;
00096 
00097 static class ASRMAgentClass : public TclClass {
00098 public:
00099     ASRMAgentClass() : TclClass("Agent/SRM/Adaptive") {}
00100     TclObject* create(int, const char*const*) {
00101         return (new ASRMAgent());
00102     }
00103 } class_adaptive_srm_agent;
00104 
00105 
00106 SRMAgent::SRMAgent() 
00107     : Agent(PT_SRM), dataCtr_(-1), sessCtr_(-1), siphash_(0), seqno_(-1),
00108     app_type_(PT_NTYPE)
00109 {
00110     sip_ = new SRMinfo(-1);
00111 
00112     bind("packetSize_", &packetSize_);
00113     bind("groupSize_", &groupSize_);
00114     bind("app_fid_", &app_fid_);
00115 }
00116 
// Destructor: all teardown is delegated to cleanup(), which is defined
// outside this file section.
SRMAgent::~SRMAgent()
{
    cleanup();
}
00121 
00122 int SRMAgent::command(int argc, const char*const* argv)
00123 {
00124     Tcl& tcl = Tcl::instance();
00125 
00126     if (strcmp(argv[1], "send") == 0) {
00127         if (strcmp(argv[2], "session") == 0) {
00128             send_sess();
00129             return TCL_OK;
00130         }
00131         if (strcmp(argv[2], "request") == 0) {
00132             int round = atoi(argv[3]);
00133             int sender = atoi(argv[4]);
00134             int msgid  = atoi(argv[5]);
00135             send_ctrl(SRM_RQST, round, sender, msgid, 0);
00136             return TCL_OK;
00137         }
00138         if (strcmp(argv[2], "repair") == 0) {
00139             int round = atoi(argv[3]);
00140             int sender = atoi(argv[4]);
00141             int msgid  = atoi(argv[5]);
00142             send_ctrl(SRM_REPR, round, sender, msgid, packetSize_);
00143             return TCL_OK;
00144         }
00145         tcl.resultf("%s: invalid send request %s", name_, argv[2]);
00146         return TCL_ERROR;
00147     }
00148     if (argc == 2) {
00149         if (strcmp(argv[1], "distances?") == 0) {
00150             tcl.result("");
00151             if (sip_->sender_ >= 0) {  // i.e. this agent is active
00152                 for (SRMinfo* sp = sip_; sp; sp = sp->next_) {
00153                     tcl.resultf("%s %d %f", tcl.result(),
00154                             sp->sender_,
00155                             sp->distance_);
00156                 }
00157             }
00158             return TCL_OK;
00159         }
00160         if (strcmp(argv[1], "start") == 0) {
00161             start();
00162             return TCL_OK;
00163         }
00164     }
00165     if (argc == 3) {
00166         if (strcmp(argv[1], "distance?") == 0) {
00167             int sender = atoi(argv[2]);
00168             SRMinfo* sp = get_state(sender);
00169             tcl.resultf("%lf", sp->distance_);
00170             return TCL_OK;
00171         }
00172         if (strcmp(argv[1], "eventtrace") == 0) {
00173             return (TCL_OK);
00174         }
00175     }
00176     return Agent::command(argc, argv);
00177 }
00178 
00179 void SRMAgent::recv(Packet* p, Handler* h)
00180 {
00181     hdr_ip*  ih = hdr_ip::access(p);
00182     hdr_srm* sh = hdr_srm::access(p);
00183     
00184     if (ih->daddr() == -1) {
00185         // Packet from local agent.  Add srm headers, set dst, and fwd
00186         sh->type() = SRM_DATA;
00187         sh->sender() = addr();
00188         sh->seqnum() = ++dataCtr_;
00189         addExtendedHeaders(p);
00190         ih->dst() = dst_;
00191         target_->recv(p, h);
00192     } else {
00193 
00194 #if 0
00195         static char *foo[] = {"NONE", "DATA", "SESS", "RQST", "REPR"};
00196         fprintf(stderr, "%7.4f %s %d recvd SRM_%s <%d, %d> from %d\n",
00197             Scheduler::instance().clock(), name_, addr_,
00198             foo[sh->type()],
00199             sh->sender(), sh->seqnum(), ih->src());
00200 #endif
00201         
00202         parseExtendedHeaders(p);
00203         switch (sh->type()) {
00204         case SRM_DATA:
00205             recv_data(sh->sender(), sh->seqnum(), p->accessdata());
00206             break;
00207         case SRM_RQST:
00208             recv_rqst(ih->saddr(),
00209                   sh->round(), sh->sender(), sh->seqnum());
00210             break;
00211         case SRM_REPR:
00212             recv_repr(sh->round(), sh->sender(), sh->seqnum(),
00213                   p->accessdata());
00214             break;
00215         case SRM_SESS:
00216             // This seqnum() is the session sequence number,
00217             // not the data packet sequence numbers seen before.
00218             recv_sess(p, sh->seqnum(), (int*) p->accessdata());
00219             break;
00220         }
00221         Packet::free(p);
00222     }
00223 }
00224 
00225 void SRMAgent::sendmsg(int nbytes, const char* /*flags*/)
00226 {
00227     if (nbytes == -1) {
00228         printf("Error:  sendmsg() for SRM should not be -1\n");
00229         return;
00230     }
00231     // The traffic generator may have reset our payload type when it
00232     // initialized.  If so, save the current payload type as app_type_,
00233     // and set type_ to PT_SRM.  Use app_type_ for all app. packets
00234     // 
00235     if (type_ != PT_SRM) {
00236         app_type_ = type_;
00237         type_ = PT_SRM;
00238     }
00239     size_ = nbytes;
00240     Packet *p;
00241     p = allocpkt();
00242         hdr_ip*  ih = hdr_ip::access(p);
00243         hdr_srm* sh = hdr_srm::access(p);
00244     hdr_rtp* rh = hdr_rtp::access(p);
00245     hdr_cmn* ch = hdr_cmn::access(p);
00246     //hdr_cmn* ch = hdr_cmn::access(p);
00247     
00248     ch->ptype() = app_type_;
00249     ch->size() =  size_;
00250     ih->flowid() = app_fid_;
00251     rh->seqno() = ++seqno_;
00252     // Add srm headers, set dst, and fwd
00253     sh->type() = SRM_DATA;
00254     sh->sender() = addr();
00255     sh->seqnum() = ++dataCtr_;
00256     addExtendedHeaders(p);
00257     ih->dst() = dst_;
00258     target_->recv(p);
00259 }
00260 
00261 
00262 void SRMAgent::send_ctrl(int type, int round, int sender, int msgid, int size)
00263 {
00264     Packet* p = Agent::allocpkt();
00265     hdr_srm* sh = hdr_srm::access(p);
00266     sh->type() = type;
00267     sh->sender() = sender;
00268     sh->seqnum() = msgid;
00269     sh->round() = round;
00270     addExtendedHeaders(p);
00271 
00272     hdr_cmn* ch = hdr_cmn::access(p);
00273     ch->size() = sizeof(hdr_srm) + size;
00274     target_->recv(p);
00275 }
00276 
00277 void SRMAgent::recv_data(int sender, int msgid, u_char*)
00278 {
00279     Tcl& tcl = Tcl::instance();
00280     SRMinfo* sp = get_state(sender);
00281     if (msgid > sp->ldata_) {
00282         (void) request(sp, msgid - 1);
00283         sp->setReceived(msgid);
00284         sp->ldata_ = msgid;
00285     } else {
00286         tcl.evalf("%s recv data %d %d", name_, sender, msgid);
00287     }
00288 }
00289 
00290 void SRMAgent::recv_rqst(int requestor, int round, int sender, int msgid)
00291 {
00292     Tcl& tcl = Tcl::instance();
00293     SRMinfo* sp = get_state(sender);
00294     if (msgid > sp->ldata_) {
00295         (void) request(sp, msgid);  // request upto msgid
00296         sp->ldata_ = msgid;
00297     } else {
00298         tcl.evalf("%s recv request %d %d %d %d", name_,
00299               requestor, round, sender, msgid);
00300     }
00301 }
00302 
00303 void SRMAgent::recv_repr(int round, int sender, int msgid, u_char*)
00304 {
00305     Tcl& tcl = Tcl::instance();
00306     SRMinfo* sp = get_state(sender);
00307     if (msgid > sp->ldata_) {
00308         (void) request(sp, msgid - 1);  // request upto msgid - 1
00309         sp->setReceived(msgid);
00310         sp->ldata_ = msgid;
00311     } else {
00312         tcl.evalf("%s recv repair %d %d %d", name_,
00313               round, sender, msgid);
00314     }
00315     // Notice that we currently make no provisions for a listener
00316     // agent to receive the data.
00317 }
00318 
00319 void SRMAgent::send_sess()
00320 {
00321     int size = (1 + groupSize_ * 4) * sizeof(int);
00322     Packet* p = Agent::allocpkt(size);
00323     hdr_srm* sh = hdr_srm::access(p);
00324     sh->type() = SRM_SESS;
00325     sh->sender() = addr();
00326     sh->seqnum() = ++sessCtr_;
00327     addExtendedHeaders(p);
00328 
00329     int* data = (int*) p->accessdata();
00330     *data++ = groupSize_;
00331     for (SRMinfo* sp = sip_; sp; sp = sp->next_) {
00332         *data++ = sp->sender_;
00333         *data++ = sp->ldata_;
00334         *data++ = sp->recvTime_;
00335         *data++ = sp->sendTime_;
00336     }
00337     data = (int*) p->accessdata();
00338     data[4] = (int) (Scheduler::instance().clock()*1000);
00339 
00340     hdr_cmn* ch = hdr_cmn::access(p);
00341     ch->size() = size+ sizeof(hdr_srm);
00342 
00343     target_->recv(p, (Handler*)NULL);
00344 }
00345 
// Reads one four-int session block through the pointer `data` into the
// locals sender, dataCnt, rtime and stime (in that order), advancing `data`
// past the block.  All five names must be declared in the expanding scope.
// The expansion is several statements, so it must not be used as the
// unbraced body of an if/for/while.
#define GET_SESSION_INFO            \
    sender = *data++;           \
    dataCnt = *data++;          \
    rtime = *data++;            \
    stime = *data++
00351 
00352 void SRMAgent::recv_sess(Packet*, int sessCtr, int* data)
00353 {
00354     SRMinfo* sp;
00355     
00356     int sender, dataCnt, rtime, stime;
00357     int now, sentAt, sentBy;
00358     int cnt = *data++;
00359     int i;
00360 
00361     /* The first block contains the sender's own state */
00362     GET_SESSION_INFO;
00363     if (sender == addr())           // sender's own session message
00364         return;
00365 
00366     sp = get_state(sender);
00367     if (sp->lsess_ > sessCtr)       // older session message recd.
00368         return;
00369     
00370     now = (int) (Scheduler::instance().clock() * 1000);
00371     sentBy = sender;            // to later compute rtt
00372     sentAt = stime;
00373     
00374     sp->lsess_ = sessCtr;
00375     sp->recvTime_ = now;
00376     sp->sendTime_ = stime;
00377     (void) request(sp, dataCnt);
00378     if (sp->ldata_ < dataCnt)
00379         sp->ldata_ = dataCnt;
00380     
00381     for (i = 1; i < cnt; i++) {
00382         GET_SESSION_INFO;
00383         if (sender == addr() && now) {
00384             //
00385             //    This session message from sender sentBy:
00386             //        vvvvv
00387             //      now <=======+ sentAt
00388             //       |     |
00389             //    stime +=======> rtime
00390             //        ^^^^^
00391             //   Earlier session message sent by ``this'' agent
00392             //
00393                         int rtt = (now - sentAt) + (rtime - stime);
00394             sp = get_state(sentBy);
00395             sp->distance_ = (double) rtt / 2 / 1000;
00396 #if 0
00397             fprintf(stderr,
00398                 "%7.4f %s compute distance to %d: %f\n",
00399                 Scheduler::instance().clock(), name_,
00400                 sentBy, sp->distance_);
00401 #endif
00402             continue;
00403         }
00404         sp = get_state(sender);
00405         (void) request(sp, dataCnt);
00406         if (sp->ldata_ < dataCnt)
00407             sp->ldata_ = dataCnt;
00408     }
00409 }

Generated on Tue Mar 6 16:47:51 2007 for ns2 Network Simulator 2.29 by  doxygen 1.4.6