00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058 #include "config.h"
00059 #ifdef HAVE_STL
00060
00061 #include <stdlib.h>
00062 #include <stdio.h>
00063
00064
00065 #include <map>
00066 #include <list>
00067 #include <algorithm>
00068
00069 #include "config.h"
00070 #include "tclcl.h"
00071 #include "agent.h"
00072 #include "packet.h"
00073 #include "ip.h"
00074 #include "random.h"
00075 #include "basetrace.h"
00076
00077 #include "pgm.h"
00078
00079
00080
00081
// Forward declaration: PgmSenderTimer and RdataItem below hold PgmSender
// pointers before the PgmSender class itself is defined.
class PgmSender;
00083
00084
// Timer kinds understood by PgmSender::timeout(). The enumerators rely on
// the implicit numbering (first enumerator is 0, each next one is +1).
enum {
    TIMER_SPM,     // == 0: periodic SPM heartbeat
    TIMER_RDATA    // == 1: delayed RDATA (repair) transmission
};
00089
00090 class PgmSenderTimer : public TimerHandler {
00091 public:
00092 PgmSenderTimer(PgmSender *a, int type) : TimerHandler(), data_(NULL) {
00093 a_ = a;
00094 type_ = type;
00095 }
00096
00097 int & type() { return type_; }
00098 void * &data() { return data_; }
00099
00100 protected:
00101 virtual void expire(Event *e);
00102 PgmSender *a_;
00103 int type_;
00104 void *data_;
00105 };
00106
00107
00108
00109 class RdataItem {
00110 public:
00111 RdataItem(int seqno, PgmSender *a, Packet *rdata_pkt = NULL) :
00112 seqno_(seqno), rdata_pkt_(rdata_pkt), rdata_timer_(a, TIMER_RDATA) { }
00113
00114
00115 int & seqno() { return seqno_; }
00116 Packet * & rdata_pkt() { return rdata_pkt_; }
00117 PgmSenderTimer & rdata_timer() { return rdata_timer_; }
00118 list<int> & iface_list() { return iface_list_; }
00119 list<NsObject *> & agent_list() { return agent_list_; }
00120
00121 protected:
00122
00123 int seqno_;
00124
00125 Packet *rdata_pkt_;
00126
00127 PgmSenderTimer rdata_timer_;
00128
00129 list<int> iface_list_;
00130
00131 list<NsObject *> agent_list_;
00132 };
00133
00134
// History record for one repaired sequence number: how many additional
// times RDATA for that seqno has been sent after the first one.
class ReplyItem {
public:
    // A new record starts with a retransmission count of zero.
    ReplyItem(int seqno)
        : seqno_(seqno),
          retransmissions_(0)
    { }

    // Mutable access to the sequence number.
    int &seqno() { return seqno_; }

    // Mutable access to the retransmission counter.
    int &retransmissions() { return retransmissions_; }

protected:
    int seqno_;            // sequence number this record describes
    int retransmissions_;  // repeat count, bumped by the owner
};
00146
00147
// Cap on the number of ReplyItem records kept in PgmSender::reply_; once a
// full scan of the list counts this many entries, send_rdata() drops the
// front entry before appending a new one.
const int MAX_REPLY_LIST_SIZE = 100;
00149
00150
// Counters reported by PgmSender::print_stats(). The struct has no
// initializers of its own; PgmSender's constructor zeroes every field.
struct Stats {
    // NAKs accepted by handle_nak() (NAKs with a foreign TSI are not counted).
    int num_naks_received_;
    // Number of send_rdata() invocations.
    int num_rdata_sent_;
    // Largest ReplyItem::retransmissions() value observed for any seqno.
    int max_num_repeated_rdata_;
};
00156
00157
00158
00159
// File-local counter; each PgmSender constructor consumes one value to build
// its "pgmSender-<n>" name used in log output.
static int pgm_snd_uid_ = 0;
00161
00162 class PgmSender: public Agent {
00163 public:
00164 PgmSender();
00165
00166 virtual void recv(Packet *, Handler *);
00167 virtual void timeout(int type, void *data);
00168 virtual int command(int argc, const char*const* argv);
00169 virtual void sendmsg(int nbytes, const char *flags = 0);
00170
00171 protected:
00172
00173 virtual void start();
00174 virtual void stop();
00175
00176 virtual void handle_nak(Packet *pkt);
00177
00178 virtual void send_spm();
00179
00180 virtual void send_rdata(RdataItem *pkt);
00181
00182 NsObject* iface2link(int iface);
00183 NsObject* pkt2agent(Packet *pkt);
00184
00185 void print_stats();
00186
00187 void display_packet(Packet *pkt);
00188
00189 void PgmSender::trace_event(char *evType, nsaddr_t daddr, double evTime);
00190
00191 EventTrace * et_;
00192
00193 Stats stats_;
00194
00195 char uname_[16];
00196
00197
00198
00199
00200
00201 map<int, RdataItem> pending_rdata_;
00202
00203
00204
00205 list<ReplyItem> reply_;
00206
00207
00208
00209
00210
00211 int pktSize_;
00212
00213 PgmSenderTimer spm_heartbeat_;
00214 int spm_running_;
00215 double spm_interval_;
00216
00217
00218
00219 double rdata_delay_;
00220
00221 int odata_seqno_;
00222 int spm_seqno_;
00223
00224
00225
00226 };
00227
00228 void PgmSenderTimer::expire(Event *e) {
00229 a_->timeout(type_, data_);
00230 }
00231
00232 static class PgmSenderClass : public TclClass {
00233 public:
00234 PgmSenderClass() : TclClass("Agent/PGM/Sender") {}
00235 TclObject * create(int argc, const char * const * argv) {
00236 return (new PgmSender());
00237 }
00238 } class_pgm_sender;
00239
00240
00241 PgmSender::PgmSender() : Agent(PT_PGM), pktSize_(0),
00242 spm_heartbeat_(this, TIMER_SPM),
00243 spm_running_(0), odata_seqno_(-1), spm_seqno_(-1)
00244 {
00245 stats_.num_naks_received_ = 0;
00246 stats_.num_rdata_sent_ = 0;
00247 stats_.max_num_repeated_rdata_ = 0;
00248
00249 sprintf(uname_, "pgmSender-%d", pgm_snd_uid_++);
00250
00251 bind_time("spm_interval_", &spm_interval_);
00252 bind_time("rdata_delay_", &rdata_delay_);
00253
00254 et_ = (EventTrace *) NULL;
00255
00256 }
00257
00258
00259 void PgmSender::recv(Packet *pkt, Handler *)
00260 {
00261 hdr_cmn* hc = HDR_CMN(pkt);
00262
00263 if (hc->ptype_ == PT_PGM) {
00264
00265 #ifdef PGM_DEBUG
00266 display_packet(pkt);
00267 #endif
00268
00269
00270
00271 hdr_pgm *hp = HDR_PGM(pkt);
00272
00273 if (hp->type_ == PGM_NAK) {
00274 handle_nak(pkt);
00275 }
00276 else {
00277 printf("ERROR (PgmSender::handle_pgm_pkt): received unexpected PGM packet type %d, discarding.\n", hp->type_);
00278 }
00279 }
00280 else {
00281 printf ("%s ERROR (PgmSender::recv): received non PGM pkt type %d, discarding.\n", uname_, hc->ptype_);
00282 }
00283
00284
00285 Packet::free(pkt);
00286 }
00287
00288
00289 void PgmSender::timeout(int type, void *data)
00290 {
00291 switch(type) {
00292 case TIMER_SPM:
00293 if (spm_running_) {
00294 send_spm();
00295 spm_heartbeat_.resched(spm_interval_);
00296 }
00297 break;
00298 case TIMER_RDATA:
00299 send_rdata((RdataItem *)data);
00300 break;
00301 default:
00302 printf("ERROR (PgmSender::timeout): invalid timeout type.\n");
00303 break;
00304 }
00305
00306 }
00307
00308
00309 int PgmSender::command(int argc, const char*const* argv)
00310 {
00311 if (argc == 2) {
00312 if (strcmp(argv[1], "start-SPM") == 0) {
00313 start();
00314 return (TCL_OK);
00315 }
00316 if (strcmp(argv[1], "stop-SPM") == 0) {
00317 stop();
00318 return (TCL_OK);
00319 }
00320 if (strcmp(argv[1], "print-stats") == 0) {
00321 print_stats();
00322 return (TCL_OK);
00323 }
00324 }
00325 else if (argc == 3) {
00326 if (strcmp(argv[1], "eventtrace") == 0) {
00327 et_ = (EventTrace *)TclObject::lookup(argv[2]);
00328 return (TCL_OK);
00329 }
00330 }
00331
00332 return (Agent::command(argc, argv));
00333 }
00334
00335 void PgmSender::trace_event(char *evType, nsaddr_t daddr, double evTime) {
00336
00337 if (et_ == NULL) return;
00338 char *wrk = et_->buffer();
00339 char *nwrk = et_->nbuffer();
00340
00341 if (wrk != NULL) {
00342 sprintf(wrk, "E "TIME_FORMAT" %d %d PGM %s "TIME_FORMAT,
00343 et_->round(Scheduler::instance().clock()),
00344 addr(),
00345 daddr,
00346 evType,
00347 evTime);
00348 if (nwrk != 0)
00349 sprintf(nwrk,
00350 "E -t "TIME_FORMAT" -o PGM -e %s -s %d.%d -d %d.%d",
00351 et_->round(Scheduler::instance().clock()),
00352 evType,
00353 addr(),
00354 port(),
00355 daddr,
00356 0
00357 );
00358 et_->dump();
00359 }
00360
00361 }
00362
00363
00364 void PgmSender::sendmsg(int nbytes, const char *flags )
00365 {
00366 odata_seqno_++;
00367
00368 #ifdef PGM_DEBUG
00369 double now = Scheduler::instance().clock();
00370
00371 printf("at %f %s sending ODATA seqno %d\n", now, uname_, odata_seqno_);
00372 #endif
00373
00374
00375 Packet *pkt = allocpkt();
00376
00377 hdr_cmn *hc = HDR_CMN(pkt);
00378 pktSize_ = nbytes + sizeof(hdr_pgm);
00379 hc->size_ = pktSize_;
00380
00381 hc->ptype_ = PT_PGM;
00382
00383
00384 hdr_pgm *hp = HDR_PGM(pkt);
00385 hp->type_ = PGM_ODATA;
00386 hp->tsi_ = here_;
00387 hp->seqno_ = odata_seqno_;
00388
00389 hdr_ip *hip = HDR_IP(pkt);
00390
00391 hip->fid_ = 1;
00392
00393
00394 send(pkt, 0);
00395 }
00396
00397 void PgmSender::start()
00398 {
00399 spm_running_ = 1;
00400 send_spm();
00401 spm_heartbeat_.resched(spm_interval_);
00402 }
00403
00404 void PgmSender::stop()
00405 {
00406 spm_heartbeat_.cancel();
00407 spm_running_ = 0;
00408 Tcl::instance().evalf("%s done", this->name());
00409 }
00410
00411
00412 void PgmSender::handle_nak(Packet *pkt)
00413 {
00414
00415 hdr_cmn *hc = HDR_CMN(pkt);
00416 hdr_pgm *hp = HDR_PGM(pkt);
00417
00418
00419 if (!(hp->tsi_.isEqual (here_))) {
00420 printf("%s received NAK with wrong TSI, discarding.\n", uname_);
00421 return;
00422 }
00423
00424 stats_.num_naks_received_++;
00425
00426
00427 Packet *ncf_pkt = allocpkt();
00428
00429 hdr_cmn *ncf_hc = HDR_CMN(ncf_pkt);
00430 ncf_hc->size_ = sizeof(hdr_pgm);
00431 ncf_hc->ptype_ = PT_PGM;
00432
00433 hdr_pgm *ncf_hp = HDR_PGM(ncf_pkt);
00434 ncf_hp->type_ = PGM_NCF;
00435 ncf_hp->tsi_ = here_;
00436 ncf_hp->seqno_ = hp->seqno_;
00437
00438 hdr_ip *ncf_hip = HDR_IP(ncf_pkt);
00439
00440 ncf_hip->fid_ = 6;
00441
00442
00443
00444
00445 NsObject *tgt;
00446
00447 if (hc->iface() < 0) {
00448 tgt = pkt2agent(pkt);
00449 }
00450 else {
00451 tgt = iface2link(hc->iface());
00452 }
00453
00454 if (tgt == NULL) {
00455 printf("ERROR (PgmSender::handle_nak): iface2link returned NULL.\n");
00456 abort();
00457 }
00458 tgt->recv(ncf_pkt);
00459
00460
00461
00462
00463
00464 pair<map<int, RdataItem>::iterator, bool> result;
00465
00466 result = pending_rdata_.insert(pair<int, RdataItem>(hp->seqno_, RdataItem(hp->seqno_, this)));
00467
00468 RdataItem *ritem = &(result.first->second);
00469
00470 if (result.second == true) {
00471
00472
00473
00474 ritem->rdata_timer().data() = ritem;
00475
00476
00477
00478
00479 Packet *rdata_pkt = allocpkt();
00480
00481 hdr_cmn *rdata_hc = HDR_CMN(rdata_pkt);
00482 rdata_hc->size_ = pktSize_;
00483 rdata_hc->ptype_ = PT_PGM;
00484
00485 hdr_pgm *rdata_hp = HDR_PGM(rdata_pkt);
00486 rdata_hp->type_ = PGM_RDATA;
00487 rdata_hp->tsi_ = here_;
00488 rdata_hp->seqno_ = hp->seqno_;
00489
00490 hdr_ip *rdata_hip = HDR_IP(rdata_pkt);
00491
00492 rdata_hip->fid_ = 3;
00493
00494
00495 ritem->rdata_pkt() = rdata_pkt;
00496
00497
00498 ritem->rdata_timer().resched(rdata_delay_);
00499
00500
00501 trace_event("REPAIR BACKOFF", rdata_hip->daddr(), rdata_delay_);
00502
00503 if (hc->iface() < 0) {
00504
00505
00506 ritem->agent_list().push_back(pkt2agent(pkt));
00507 }
00508 else {
00509
00510 ritem->iface_list().push_back(hc->iface());
00511 }
00512 }
00513 else {
00514
00515
00516
00517
00518 if (hc->iface() < 0) {
00519
00520
00521
00522 list<NsObject *> *agent_list = &(ritem->agent_list());
00523
00524 list<NsObject *>::iterator res = find(agent_list->begin(), agent_list->end(), pkt2agent(pkt));
00525
00526 if (res == agent_list->end()) {
00527
00528 agent_list->push_back(pkt2agent(pkt));
00529 }
00530 else {
00531 printf("%s: NAK received and already had NAK state for that same agent.\n", uname_);
00532 }
00533
00534 }
00535 else {
00536
00537
00538 list<int> *iface_list = &(ritem->iface_list());
00539 list<int>::iterator res = find(iface_list->begin(), iface_list->end(), hc->iface());
00540
00541 if (res == iface_list->end()) {
00542
00543 iface_list->push_back(hc->iface());
00544 }
00545 else {
00546
00547
00548 printf("%s: NAK received and already had NAK state for that same interface.\n", uname_);
00549 }
00550 }
00551
00552 }
00553
00554 }
00555
00556
00557 void PgmSender::send_spm()
00558 {
00559 spm_seqno_++;
00560
00561
00562 Packet *pkt = allocpkt();
00563
00564 hdr_cmn *hc = HDR_CMN(pkt);
00565 hc->size_ = sizeof(hdr_pgm) + sizeof(hdr_pgm_spm);
00566
00567 hc->ptype_ = PT_PGM;
00568
00569 hdr_ip *hip = HDR_IP(pkt);
00570
00571 hip->fid_ = 7;
00572
00573
00574
00575 hdr_pgm *hp = HDR_PGM(pkt);
00576 hp->type_ = PGM_SPM;
00577 hp->tsi_ = here_;
00578 hp->seqno_ = spm_seqno_;
00579
00580
00581 hdr_pgm_spm *hs = HDR_PGM_SPM(pkt);
00582 hs->spm_path_ = here_;
00583
00584 #ifdef PGM_DEBUG
00585 double now = Scheduler::instance().clock();
00586 printf("at %f %s sending SPM, from %d:%d (here = %d:%d) to %d:%d, TSI %d:%d, type %d\n", now, uname_, hip->saddr(), hip->sport(), addr(), port(), hip->daddr(), hip->dport(), hp->tsi_.addr_, hp->tsi_.port_, hp->type_);
00587 #endif
00588
00589
00590 send(pkt, 0);
00591 }
00592
00593
00594
00595 void PgmSender::send_rdata(RdataItem *item)
00596 {
00597
00598
00599
00600 int count = 0;
00601 list<ReplyItem>::iterator iter = reply_.begin();
00602 while (iter != reply_.end()) {
00603 if ((*iter).seqno() == item->seqno()) {
00604 (*iter).retransmissions() += 1;
00605
00606 if ((*iter).retransmissions() > stats_.max_num_repeated_rdata_) {
00607 stats_.max_num_repeated_rdata_ = (*iter).retransmissions();
00608 }
00609
00610 break;
00611 }
00612
00613 count++;
00614 iter++;
00615 }
00616
00617 if (iter == reply_.end()) {
00618
00619
00620 if (count >= MAX_REPLY_LIST_SIZE) {
00621
00622
00623 reply_.pop_front();
00624 }
00625
00626 reply_.push_back(ReplyItem(item->seqno()));
00627 }
00628
00629 stats_.num_rdata_sent_++;
00630
00631
00632 if (!item->iface_list().empty()) {
00633 list<int>::iterator iter = item->iface_list().begin();
00634 int flag = 0;
00635 while (iter != item->iface_list().end()) {
00636 NsObject *tgt;
00637 tgt = iface2link(*iter);
00638 Packet *pkt = item->rdata_pkt();
00639 if (flag) {
00640
00641
00642
00643 pkt = pkt->copy();
00644 } else {
00645 trace_event("SEND RDATA", HDR_IP(pkt)->daddr(), 0);
00646 flag = 1;
00647 }
00648 tgt->recv(pkt);
00649
00650 iter++;
00651 }
00652
00653 }
00654
00655 if (!item->agent_list().empty()) {
00656 list<NsObject *>::iterator iter = item->agent_list().begin();
00657 int flag = 0;
00658 while (iter != item->agent_list().end()) {
00659 Packet *pkt = item->rdata_pkt();
00660 if (flag) {
00661 pkt = pkt->copy ();
00662 } else {
00663 flag = 1;
00664 }
00665 (*iter)->recv(pkt);
00666
00667 iter++;
00668 }
00669 }
00670
00671 hdr_pgm *hp = HDR_PGM(item->rdata_pkt());
00672
00673
00674
00675 if (!pending_rdata_.erase(hp->seqno_)) {
00676 printf("ERROR (PgmSender::send_rdata): Did not erase RdataItem from map.\n");
00677 }
00678
00679 }
00680
00681 NsObject* PgmSender::iface2link (int iface)
00682 {
00683
00684
00685 Tcl& tcl = Tcl::instance();
00686 char wrk[64];
00687
00688 if (iface == -1) {
00689 return NULL;
00690 }
00691
00692 sprintf (wrk, "[%s set node_] ifaceGetOutLink %d", name (), iface);
00693 tcl.evalc (wrk);
00694 const char* result = tcl.result ();
00695 #ifdef PGM_DEBUG
00696 printf ("[iface2link] agent %s\n", result);
00697 #endif
00698 NsObject* obj = (NsObject*)TclObject::lookup(result);
00699 return (obj);
00700 }
00701
00702 NsObject* PgmSender::pkt2agent (Packet *pkt)
00703 {
00704 Tcl& tcl = Tcl::instance();
00705 char wrk[64];
00706 const char *result;
00707 int port;
00708 NsObject* agent;
00709 hdr_ip* ih = HDR_IP(pkt);
00710
00711
00712 port = ih->sport();
00713
00714 sprintf (wrk, "[%s set node_] agent %d", name (), port);
00715 tcl.evalc (wrk);
00716 result = tcl.result ();
00717
00718 #ifdef PGM_DEBUG
00719 printf ("[pkt2agent] port %d, agent %s\n", port, result);
00720 #endif
00721
00722 agent = (NsObject*)TclObject::lookup (result);
00723 return (agent);
00724 }
00725
00726 void PgmSender::print_stats()
00727 {
00728 printf("%s\n", uname_);
00729 printf("\tLast ODATA seqno: %d\n", odata_seqno_);
00730 printf("\tLast SPM seqno: %d\n", spm_seqno_);
00731 printf("\tNumber of NAKs received: %d\n", stats_.num_naks_received_);
00732 printf("\tNumber of RDATA transmitted: %d\n", stats_.num_rdata_sent_);
00733 printf("\tMax retransmission count for a single RDATA: %d\n", stats_.max_num_repeated_rdata_);
00734 }
00735
00736 #ifdef PGM_DEBUG
00737 void PgmSender::display_packet(Packet *pkt)
00738 {
00739
00740 double now = Scheduler::instance().clock();
00741
00742 hdr_ip *hip = HDR_IP(pkt);
00743 hdr_cmn *hc = HDR_CMN(pkt);
00744
00745 printf("at %f %s received packet type ", now, uname_);
00746
00747 hdr_pgm *hp = HDR_PGM(pkt);
00748
00749 hdr_pgm_spm *hps;
00750 hdr_pgm_nak *hpn;
00751
00752 switch(hp->type_) {
00753 case PGM_SPM:
00754 hps = HDR_PGM_SPM(pkt);
00755
00756 printf("SPM (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d, spm_path %d:%d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_, hps->spm_path_.addr_, hps->spm_path_.port_);
00757
00758 break;
00759 case PGM_ODATA:
00760 printf("ODATA (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
00761
00762 break;
00763 case PGM_RDATA:
00764 printf("RDATA (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
00765
00766 break;
00767 case PGM_NAK:
00768 hpn = HDR_PGM_NAK(pkt);
00769
00770 printf("NAK (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d, source %d:%d, group %d:%d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_, hpn->source_.addr_, hpn->source_.port_, hpn->group_.addr_, hpn->group_.port_);
00771
00772 break;
00773 case PGM_NCF:
00774 printf("NCF (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
00775
00776 break;
00777 default:
00778 printf("UNKNOWN (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
00779
00780 break;
00781 }
00782
00783 }
00784 #endif // PGM_DEBUG
00785
00786 #endif //HAVE_STL
00787