00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
#ifndef lint
// Revision-control identification string embedded in the object file.
// Compiled out when the `lint` macro is defined.
static const char rcsid[] = "@(#) $Header: /nfs/jade/vint/CVSROOT/ns-2/mcast/srm.cc,v 1.27 2005/08/25 18:58:08 johnh Exp $ (USC/ISI)";
#endif
00057
00058 #include <stdlib.h>
00059 #include <assert.h>
00060
00061 #include "config.h"
00062 #include "agent.h"
00063 #include "ip.h"
00064 #include "srm.h"
00065 #include "trace.h"
00066 #include "rtp.h"
00067
00068
00069 int hdr_srm::offset_;
00070 int hdr_asrm::offset_;
00071
00072 static class SRMHeaderClass : public PacketHeaderClass {
00073 public:
00074 SRMHeaderClass() : PacketHeaderClass("PacketHeader/SRM",
00075 sizeof(hdr_srm)) {
00076 bind_offset(&hdr_srm::offset_);
00077 }
00078 } class_srmhdr;
00079
00080 static class ASRMHeaderClass : public PacketHeaderClass {
00081 public:
00082 ASRMHeaderClass() : PacketHeaderClass("PacketHeader/aSRM",
00083 sizeof(hdr_asrm)) {
00084 bind_offset(&hdr_asrm::offset_);
00085 }
00086 } class_adaptive_srmhdr;
00087
00088
00089 static class SRMAgentClass : public TclClass {
00090 public:
00091 SRMAgentClass() : TclClass("Agent/SRM") {}
00092 TclObject* create(int, const char*const*) {
00093 return (new SRMAgent());
00094 }
00095 } class_srm_agent;
00096
00097 static class ASRMAgentClass : public TclClass {
00098 public:
00099 ASRMAgentClass() : TclClass("Agent/SRM/Adaptive") {}
00100 TclObject* create(int, const char*const*) {
00101 return (new ASRMAgent());
00102 }
00103 } class_adaptive_srm_agent;
00104
00105
00106 SRMAgent::SRMAgent()
00107 : Agent(PT_SRM), dataCtr_(-1), sessCtr_(-1), siphash_(0), seqno_(-1),
00108 app_type_(PT_NTYPE)
00109 {
00110 sip_ = new SRMinfo(-1);
00111
00112 bind("packetSize_", &packetSize_);
00113 bind("groupSize_", &groupSize_);
00114 bind("app_fid_", &app_fid_);
00115 }
00116
00117 SRMAgent::~SRMAgent()
00118 {
00119 cleanup();
00120 }
00121
00122 int SRMAgent::command(int argc, const char*const* argv)
00123 {
00124 Tcl& tcl = Tcl::instance();
00125
00126 if (strcmp(argv[1], "send") == 0) {
00127 if (strcmp(argv[2], "session") == 0) {
00128 send_sess();
00129 return TCL_OK;
00130 }
00131 if (strcmp(argv[2], "request") == 0) {
00132 int round = atoi(argv[3]);
00133 int sender = atoi(argv[4]);
00134 int msgid = atoi(argv[5]);
00135 send_ctrl(SRM_RQST, round, sender, msgid, 0);
00136 return TCL_OK;
00137 }
00138 if (strcmp(argv[2], "repair") == 0) {
00139 int round = atoi(argv[3]);
00140 int sender = atoi(argv[4]);
00141 int msgid = atoi(argv[5]);
00142 send_ctrl(SRM_REPR, round, sender, msgid, packetSize_);
00143 return TCL_OK;
00144 }
00145 tcl.resultf("%s: invalid send request %s", name_, argv[2]);
00146 return TCL_ERROR;
00147 }
00148 if (argc == 2) {
00149 if (strcmp(argv[1], "distances?") == 0) {
00150 tcl.result("");
00151 if (sip_->sender_ >= 0) {
00152 for (SRMinfo* sp = sip_; sp; sp = sp->next_) {
00153 tcl.resultf("%s %d %f", tcl.result(),
00154 sp->sender_,
00155 sp->distance_);
00156 }
00157 }
00158 return TCL_OK;
00159 }
00160 if (strcmp(argv[1], "start") == 0) {
00161 start();
00162 return TCL_OK;
00163 }
00164 }
00165 if (argc == 3) {
00166 if (strcmp(argv[1], "distance?") == 0) {
00167 int sender = atoi(argv[2]);
00168 SRMinfo* sp = get_state(sender);
00169 tcl.resultf("%lf", sp->distance_);
00170 return TCL_OK;
00171 }
00172 if (strcmp(argv[1], "eventtrace") == 0) {
00173 return (TCL_OK);
00174 }
00175 }
00176 return Agent::command(argc, argv);
00177 }
00178
00179 void SRMAgent::recv(Packet* p, Handler* h)
00180 {
00181 hdr_ip* ih = hdr_ip::access(p);
00182 hdr_srm* sh = hdr_srm::access(p);
00183
00184 if (ih->daddr() == -1) {
00185
00186 sh->type() = SRM_DATA;
00187 sh->sender() = addr();
00188 sh->seqnum() = ++dataCtr_;
00189 addExtendedHeaders(p);
00190 ih->dst() = dst_;
00191 target_->recv(p, h);
00192 } else {
00193
00194 #if 0
00195 static char *foo[] = {"NONE", "DATA", "SESS", "RQST", "REPR"};
00196 fprintf(stderr, "%7.4f %s %d recvd SRM_%s <%d, %d> from %d\n",
00197 Scheduler::instance().clock(), name_, addr_,
00198 foo[sh->type()],
00199 sh->sender(), sh->seqnum(), ih->src());
00200 #endif
00201
00202 parseExtendedHeaders(p);
00203 switch (sh->type()) {
00204 case SRM_DATA:
00205 recv_data(sh->sender(), sh->seqnum(), p->accessdata());
00206 break;
00207 case SRM_RQST:
00208 recv_rqst(ih->saddr(),
00209 sh->round(), sh->sender(), sh->seqnum());
00210 break;
00211 case SRM_REPR:
00212 recv_repr(sh->round(), sh->sender(), sh->seqnum(),
00213 p->accessdata());
00214 break;
00215 case SRM_SESS:
00216
00217
00218 recv_sess(p, sh->seqnum(), (int*) p->accessdata());
00219 break;
00220 }
00221 Packet::free(p);
00222 }
00223 }
00224
00225 void SRMAgent::sendmsg(int nbytes, const char* )
00226 {
00227 if (nbytes == -1) {
00228 printf("Error: sendmsg() for SRM should not be -1\n");
00229 return;
00230 }
00231
00232
00233
00234
00235 if (type_ != PT_SRM) {
00236 app_type_ = type_;
00237 type_ = PT_SRM;
00238 }
00239 size_ = nbytes;
00240 Packet *p;
00241 p = allocpkt();
00242 hdr_ip* ih = hdr_ip::access(p);
00243 hdr_srm* sh = hdr_srm::access(p);
00244 hdr_rtp* rh = hdr_rtp::access(p);
00245 hdr_cmn* ch = hdr_cmn::access(p);
00246
00247
00248 ch->ptype() = app_type_;
00249 ch->size() = size_;
00250 ih->flowid() = app_fid_;
00251 rh->seqno() = ++seqno_;
00252
00253 sh->type() = SRM_DATA;
00254 sh->sender() = addr();
00255 sh->seqnum() = ++dataCtr_;
00256 addExtendedHeaders(p);
00257 ih->dst() = dst_;
00258 target_->recv(p);
00259 }
00260
00261
00262 void SRMAgent::send_ctrl(int type, int round, int sender, int msgid, int size)
00263 {
00264 Packet* p = Agent::allocpkt();
00265 hdr_srm* sh = hdr_srm::access(p);
00266 sh->type() = type;
00267 sh->sender() = sender;
00268 sh->seqnum() = msgid;
00269 sh->round() = round;
00270 addExtendedHeaders(p);
00271
00272 hdr_cmn* ch = hdr_cmn::access(p);
00273 ch->size() = sizeof(hdr_srm) + size;
00274 target_->recv(p);
00275 }
00276
00277 void SRMAgent::recv_data(int sender, int msgid, u_char*)
00278 {
00279 Tcl& tcl = Tcl::instance();
00280 SRMinfo* sp = get_state(sender);
00281 if (msgid > sp->ldata_) {
00282 (void) request(sp, msgid - 1);
00283 sp->setReceived(msgid);
00284 sp->ldata_ = msgid;
00285 } else {
00286 tcl.evalf("%s recv data %d %d", name_, sender, msgid);
00287 }
00288 }
00289
00290 void SRMAgent::recv_rqst(int requestor, int round, int sender, int msgid)
00291 {
00292 Tcl& tcl = Tcl::instance();
00293 SRMinfo* sp = get_state(sender);
00294 if (msgid > sp->ldata_) {
00295 (void) request(sp, msgid);
00296 sp->ldata_ = msgid;
00297 } else {
00298 tcl.evalf("%s recv request %d %d %d %d", name_,
00299 requestor, round, sender, msgid);
00300 }
00301 }
00302
00303 void SRMAgent::recv_repr(int round, int sender, int msgid, u_char*)
00304 {
00305 Tcl& tcl = Tcl::instance();
00306 SRMinfo* sp = get_state(sender);
00307 if (msgid > sp->ldata_) {
00308 (void) request(sp, msgid - 1);
00309 sp->setReceived(msgid);
00310 sp->ldata_ = msgid;
00311 } else {
00312 tcl.evalf("%s recv repair %d %d %d", name_,
00313 round, sender, msgid);
00314 }
00315
00316
00317 }
00318
00319 void SRMAgent::send_sess()
00320 {
00321 int size = (1 + groupSize_ * 4) * sizeof(int);
00322 Packet* p = Agent::allocpkt(size);
00323 hdr_srm* sh = hdr_srm::access(p);
00324 sh->type() = SRM_SESS;
00325 sh->sender() = addr();
00326 sh->seqnum() = ++sessCtr_;
00327 addExtendedHeaders(p);
00328
00329 int* data = (int*) p->accessdata();
00330 *data++ = groupSize_;
00331 for (SRMinfo* sp = sip_; sp; sp = sp->next_) {
00332 *data++ = sp->sender_;
00333 *data++ = sp->ldata_;
00334 *data++ = sp->recvTime_;
00335 *data++ = sp->sendTime_;
00336 }
00337 data = (int*) p->accessdata();
00338 data[4] = (int) (Scheduler::instance().clock()*1000);
00339
00340 hdr_cmn* ch = hdr_cmn::access(p);
00341 ch->size() = size+ sizeof(hdr_srm);
00342
00343 target_->recv(p, (Handler*)NULL);
00344 }
00345
// Reads one four-int session entry from `data` into the caller's locals
// `sender`, `dataCnt`, `rtime` and `stime` (in that order) and advances
// `data` past it. All five names must be in scope at the point of use.
//
// Wrapped in do { ... } while (0) so the macro behaves as a single statement,
// including under an unbraced if/else or loop. Use it as `GET_SESSION_INFO;`.
#define GET_SESSION_INFO		\
	do {				\
		sender = *data++;	\
		dataCnt = *data++;	\
		rtime = *data++;	\
		stime = *data++;	\
	} while (0)
00351
00352 void SRMAgent::recv_sess(Packet*, int sessCtr, int* data)
00353 {
00354 SRMinfo* sp;
00355
00356 int sender, dataCnt, rtime, stime;
00357 int now, sentAt, sentBy;
00358 int cnt = *data++;
00359 int i;
00360
00361
00362 GET_SESSION_INFO;
00363 if (sender == addr())
00364 return;
00365
00366 sp = get_state(sender);
00367 if (sp->lsess_ > sessCtr)
00368 return;
00369
00370 now = (int) (Scheduler::instance().clock() * 1000);
00371 sentBy = sender;
00372 sentAt = stime;
00373
00374 sp->lsess_ = sessCtr;
00375 sp->recvTime_ = now;
00376 sp->sendTime_ = stime;
00377 (void) request(sp, dataCnt);
00378 if (sp->ldata_ < dataCnt)
00379 sp->ldata_ = dataCnt;
00380
00381 for (i = 1; i < cnt; i++) {
00382 GET_SESSION_INFO;
00383 if (sender == addr() && now) {
00384
00385
00386
00387
00388
00389
00390
00391
00392
00393 int rtt = (now - sentAt) + (rtime - stime);
00394 sp = get_state(sentBy);
00395 sp->distance_ = (double) rtt / 2 / 1000;
00396 #if 0
00397 fprintf(stderr,
00398 "%7.4f %s compute distance to %d: %f\n",
00399 Scheduler::instance().clock(), name_,
00400 sentBy, sp->distance_);
00401 #endif
00402 continue;
00403 }
00404 sp = get_state(sender);
00405 (void) request(sp, dataCnt);
00406 if (sp->ldata_ < dataCnt)
00407 sp->ldata_ = dataCnt;
00408 }
00409 }