00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058 #include "config.h"
00059 #ifdef HAVE_STL
00060
00061 #include <stdlib.h>
00062 #include <stdio.h>
00063
00064 #include <map>
00065
00066 #include "config.h"
00067 #include "tclcl.h"
00068 #include "agent.h"
00069 #include "packet.h"
00070 #include "ip.h"
00071 #include "random.h"
00072 #include "basetrace.h"
00073
00074 #include "rcvbuf.h"
00075
00076 #include "pgm.h"
00077
00078
00079
00080
00081 class PgmReceiver;
00082
00083
// Timer categories dispatched by PgmReceiver::timeout().
enum {
  NAK_TIMER = 0  // per-sequence-number NAK timer owned by a NakItem
};
00087
00088 class PgmReceiverTimer : public TimerHandler {
00089 public:
00090 PgmReceiverTimer(PgmReceiver *a, int type) : TimerHandler(), data_(NULL) {
00091 a_ = a;
00092 type_ = type;
00093 }
00094
00095 void * & data() { return data_; }
00096
00097 protected:
00098 virtual void expire(Event *e);
00099 PgmReceiver *a_;
00100 int type_;
00101 void *data_;
00102 };
00103
00104
00105
00106
00107
00108
// States a NakItem moves through (see PgmReceiver::timeout_nak,
// handle_nak and handle_ncf for the transitions).
enum {
  BACK_OFF_STATE  = 0,  // random back-off timer running before a NAK is sent
  WAIT_NCF_STATE  = 1,  // waiting for an NCF for this sequence number
  WAIT_DATA_STATE = 2   // NCF seen, waiting for the data itself
};
00114
00115 class NakItem {
00116 public:
00117 NakItem(PgmReceiver *a, int seqno) : nak_state_(BACK_OFF_STATE),
00118 nak_sent_(false), seqno_(seqno), ncf_retry_count_(0),
00119 data_retry_count_(0), nak_timer_(a, NAK_TIMER) { }
00120
00121 int & nak_state() { return nak_state_; }
00122 bool & nak_sent() { return nak_sent_; }
00123 int & seqno() { return seqno_; }
00124 int & ncf_retry_count() { return ncf_retry_count_; }
00125 int & data_retry_count() { return data_retry_count_; }
00126 PgmReceiverTimer & nak_timer() { return nak_timer_; }
00127
00128 protected:
00129
00130
00131 int nak_state_;
00132
00133
00134 bool nak_sent_;
00135
00136
00137 int seqno_;
00138
00139
00140 int ncf_retry_count_;
00141
00142
00143 int data_retry_count_;
00144
00145 PgmReceiverTimer nak_timer_;
00146 };
00147
00148
00149
00150
// Counter used to give each PgmReceiver a distinct "pgmRecv-<n>" name.
static int pgm_rcv_uid_ = 0;

// Counters reported by PgmReceiver::print_stats().
struct Stats {
  // Incremented every time send_nak() transmits a NAK.
  int naks_transmitted_;

  // Incremented when a back-off timer expires for an item that has
  // already sent a NAK before.
  int naks_duplicated_;
};
00161
00162 class PgmReceiver: public Agent {
00163 public:
00164 PgmReceiver();
00165
00166 virtual void recv(Packet *, Handler *);
00167 virtual void timeout(int type, void *data);
00168 virtual int command(int argc, const char*const* argv);
00169
00170 protected:
00171
00172 void handle_spm(Packet *pkt);
00173 void handle_odata(Packet *pkt);
00174 void handle_rdata(Packet *pkt);
00175 void handle_nak(Packet *pkt);
00176 void handle_ncf(Packet *pkt);
00177
00178 void generate_Nak(int seqno);
00179 void cancel_Nak(int seqno, NakItem *nitem = NULL);
00180 void timeout_nak(NakItem *data);
00181 void send_nak(int seqno);
00182
00183 void print_stats();
00184 void display_packet(Packet *pkt);
00185
00186 void PgmReceiver::trace_event(char *evType, double evTime);
00187
00188 EventTrace * et_;
00189
00190 char uname_[16];
00191
00192
00193 Stats stats_;
00194
00195
00196
00197
00198 int max_nak_ncf_retries_;
00199
00200
00201
00202
00203 int max_nak_data_retries_;
00204
00205
00206
00207 double nak_bo_ivl_;
00208
00209
00210
00211 double nak_rpt_ivl_;
00212
00213
00214
00215 double nak_rdata_ivl_;
00216
00217
00218
00219 bool have_tsi_state_;
00220
00221 int spm_seqno_;
00222
00223 ns_addr_t tsi_;
00224 ns_addr_t upstream_node_;
00225 int upstream_iface_;
00226
00227
00228 ns_addr_t source_;
00229 ns_addr_t group_;
00230
00231
00232 RcvBuffer rcvbuf_;
00233
00234
00235 map<int, NakItem> naks_;
00236
00237 };
00238
00239 static class PgmReceiverClass : public TclClass {
00240 public:
00241 PgmReceiverClass() : TclClass("Agent/PGM/Receiver") {}
00242 TclObject * create(int argc, const char * const * argv) {
00243 return (new PgmReceiver());
00244 }
00245 } class_pgm_receiver;
00246
00247 void PgmReceiverTimer::expire(Event *e) {
00248 a_->timeout(type_, data_);
00249 }
00250
00251
00252 PgmReceiver::PgmReceiver() : Agent(PT_PGM), have_tsi_state_(false),
00253 spm_seqno_(-1)
00254 {
00255 stats_.naks_transmitted_ = 0;
00256 stats_.naks_duplicated_ = 0;
00257
00258 sprintf (uname_, "pgmRecv-%d", pgm_rcv_uid_++);
00259 bind("max_nak_ncf_retries_", &max_nak_ncf_retries_);
00260 bind("max_nak_data_retries_", &max_nak_data_retries_);
00261 bind_time("nak_bo_ivl_", &nak_bo_ivl_);
00262 bind_time("nak_rpt_ivl_", &nak_rpt_ivl_);
00263 bind_time("nak_rdata_ivl_", &nak_rdata_ivl_);
00264
00265 et_ = (EventTrace *) NULL;
00266
00267 }
00268
00269
00270 void PgmReceiver::recv(Packet *pkt, Handler *)
00271 {
00272 hdr_pgm *hp = HDR_PGM(pkt);
00273
00274 hdr_cmn *hc = HDR_CMN(pkt);
00275
00276 if (hc->ptype_ != PT_PGM) {
00277 printf("%s ERROR (PgmReceiver::recv): received non PGM pkt type %d, discarding.\n", uname_, hc->ptype_);
00278 Packet::free(pkt);
00279 return;
00280 }
00281
00282 #ifdef PGM_DEBUG
00283 display_packet(pkt);
00284 #endif
00285
00286 switch(hp->type_) {
00287 case PGM_SPM:
00288 handle_spm(pkt);
00289 break;
00290 case PGM_ODATA:
00291 handle_odata(pkt);
00292 break;
00293 case PGM_RDATA:
00294 handle_rdata(pkt);
00295 break;
00296 case PGM_NAK:
00297
00298
00299 handle_nak(pkt);
00300 break;
00301 case PGM_NCF:
00302 handle_ncf(pkt);
00303 break;
00304 default:
00305 printf("ERROR (PgmReceiver::recv): Received invalid PGM type %d.\n",
00306 hp->type_);
00307 break;
00308 }
00309
00310 Packet::free(pkt);
00311 }
00312
00313
00314 void PgmReceiver::timeout(int type, void *data)
00315 {
00316
00317 switch(type) {
00318 case NAK_TIMER:
00319 timeout_nak((NakItem *) data);
00320 break;
00321 default:
00322 printf("ERROR (PgmReceiver::timeout): Unknown timeout type %d.\n", type);
00323 break;
00324 }
00325
00326 }
00327
00328
00329 int PgmReceiver::command(int argc, const char*const* argv)
00330 {
00331
00332
00333 if (argc == 2) {
00334 if (strcmp(argv[1], "print-stats") == 0) {
00335 print_stats();
00336 return (TCL_OK);
00337 }
00338 }
00339 else if (argc == 3) {
00340 if (strcmp(argv[1], "eventtrace") == 0) {
00341 et_ = (EventTrace *)TclObject::lookup(argv[2]);
00342 return (TCL_OK);
00343 }
00344 }
00345
00346 return (Agent::command(argc, argv));
00347 }
00348
00349 void PgmReceiver::trace_event(char *evType, double evTime) {
00350
00351 if (et_ == NULL) return;
00352 char *wrk = et_->buffer();
00353 char *nwrk = et_->nbuffer();
00354
00355 if (wrk != NULL) {
00356 sprintf(wrk, "E "TIME_FORMAT" %d %d PGM %s "TIME_FORMAT,
00357 et_->round(Scheduler::instance().clock()),
00358 addr(),
00359 0,
00360 evType,
00361 evTime);
00362 if (nwrk != 0)
00363 sprintf(nwrk,
00364 "E -t "TIME_FORMAT" -o PGM -e %s -s %d.%d -d %d.%d",
00365 et_->round(Scheduler::instance().clock()),
00366 evType,
00367 addr(),
00368 port(),
00369 0,
00370 0
00371 );
00372 et_->dump();
00373 }
00374
00375 }
00376
00377 void PgmReceiver::handle_spm(Packet *pkt)
00378 {
00379 hdr_cmn *hc = HDR_CMN(pkt);
00380 hdr_ip *hip = HDR_IP(pkt);
00381 hdr_pgm *hp = HDR_PGM(pkt);
00382 hdr_pgm_spm *hps = HDR_PGM_SPM(pkt);
00383
00384 if (have_tsi_state_ == false) {
00385
00386 have_tsi_state_ = true;
00387
00388
00389 tsi_ = hp->tsi_;
00390
00391
00392 source_ = hip->src();
00393 group_ = hip->dst();
00394 }
00395 else {
00396
00397
00398 if (!(hp->tsi_.isEqual (tsi_))) {
00399 printf("%s Received SPM with incorrect TSI, discarding.\n", uname_);
00400 return;
00401 }
00402
00403
00404 if (hp->seqno_ <= spm_seqno_) {
00405 printf("%s received an old SPM seqno, discarding.\n", uname_);
00406 return;
00407 }
00408 }
00409
00410
00411 spm_seqno_ = hp->seqno_;
00412
00413
00414 upstream_node_ = hps->spm_path_;
00415
00416
00417 upstream_iface_ = hc->iface();
00418 }
00419
00420 void PgmReceiver::handle_odata(Packet *pkt)
00421 {
00422 hdr_pgm *hp = HDR_PGM(pkt);
00423
00424
00425 if ( (have_tsi_state_ == true) && !(hp->tsi_.isEqual (tsi_)) ) {
00426 printf("PGM Receiver received ODATA with incorrect TSI, discarding.\n");
00427 return;
00428 }
00429
00430 double clock = Scheduler::instance().clock();
00431
00432 if (rcvbuf_.nextpkt_ < hp->seqno_) {
00433 int lo = rcvbuf_.nextpkt_;
00434 int hi = hp->seqno_ - 1;
00435
00436 for (int i = lo; i <= hi; i++) {
00437 printf("%s detected loss of seq %d\n", uname_, i);
00438
00439 if (have_tsi_state_ == false) {
00440 printf("%s has no TSI/SPM state when lost packet was detected. This results in unrecoverable data loss.\n", uname_);
00441 }
00442 else {
00443 generate_Nak(i);
00444 }
00445 }
00446 }
00447
00448 rcvbuf_.add_pkt(hp->seqno_, clock);
00449
00450
00451
00452 cancel_Nak(hp->seqno_, NULL);
00453 }
00454
00455 void PgmReceiver::handle_rdata(Packet *pkt)
00456 {
00457
00458 hdr_pgm *hp = HDR_PGM(pkt);
00459
00460
00461 if ( (have_tsi_state_ == true) && !(hp->tsi_.isEqual (tsi_)) ) {
00462 printf("%s received RDATA with incorrect TSI, discarding.\n", uname_);
00463 return;
00464 }
00465
00466 if ( (rcvbuf_.nextpkt_ > hp->seqno_) && !rcvbuf_.exists_pkt(hp->seqno_) ) {
00467
00468
00469 cancel_Nak(hp->seqno_, NULL);
00470 }
00471
00472 double clock = Scheduler::instance().clock();
00473 rcvbuf_.add_pkt (hp->seqno_, clock);
00474
00475 }
00476
00477
00478
00479
00480 void PgmReceiver::handle_nak(Packet *pkt)
00481 {
00482 hdr_pgm *hp = HDR_PGM(pkt);
00483
00484
00485 if ( (have_tsi_state_ == true) && !(hp->tsi_.isEqual (tsi_)) ) {
00486 printf("%s received NAK with incorrect TSI, discarding.\n", uname_);
00487 return;
00488 }
00489
00490
00491 map<int, NakItem>::iterator result = naks_.find(hp->seqno_);
00492
00493 if (result == naks_.end()) {
00494
00495 printf("%s received multicast NAK but no NAK state found, discarding.\n", uname_);
00496 return;
00497 }
00498
00499 NakItem *nitem = &((*result).second);
00500
00501 switch( nitem->nak_state() ) {
00502 case BACK_OFF_STATE:
00503
00504 nitem->nak_state() = WAIT_NCF_STATE;
00505
00506
00507 nitem->nak_timer().resched(nak_rpt_ivl_);
00508
00509 break;
00510 case WAIT_NCF_STATE:
00511
00512
00513
00514 nitem->nak_timer().resched(nak_rpt_ivl_);
00515
00516 break;
00517 case WAIT_DATA_STATE:
00518
00519
00520
00521 nitem->nak_timer().resched(nak_rdata_ivl_);
00522
00523 break;
00524 default:
00525 printf("ERROR (PgmReceiver::handle_nak): Unknown nak state %d.\n", nitem->nak_state());
00526 break;
00527 }
00528
00529 }
00530
00531 void PgmReceiver::handle_ncf(Packet *pkt)
00532 {
00533 hdr_pgm *hp = HDR_PGM(pkt);
00534
00535
00536 if ( (have_tsi_state_ == true) && !(hp->tsi_.isEqual (tsi_)) ) {
00537 printf("%s received NCF with incorrect TSI, discarding.\n", uname_);
00538 return;
00539 }
00540
00541
00542
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552 map<int, NakItem>::iterator result = naks_.find(hp->seqno_);
00553
00554 if (result == naks_.end()) {
00555
00556 printf("%s received NCF but no NAK state found, discarding.\n", uname_);
00557 return;
00558 }
00559
00560 NakItem *nitem = &((*result).second);
00561
00562 switch( nitem->nak_state() ) {
00563 case BACK_OFF_STATE:
00564
00565 nitem->nak_state() = WAIT_DATA_STATE;
00566
00567
00568 nitem->nak_timer().resched(nak_rdata_ivl_);
00569
00570 break;
00571 case WAIT_NCF_STATE:
00572
00573 nitem->nak_state() = WAIT_DATA_STATE;
00574
00575
00576 nitem->nak_timer().resched(nak_rdata_ivl_);
00577 break;
00578 case WAIT_DATA_STATE:
00579
00580
00581
00582 nitem->nak_timer().resched(nak_rdata_ivl_);
00583
00584 break;
00585 default:
00586 printf("ERROR (PgmReceiver::handle_ncf): Unknown nak state %d.\n", nitem->nak_state());
00587 return;
00588 }
00589
00590 }
00591
00592 void PgmReceiver::generate_Nak(int seqno)
00593 {
00594 #ifdef PGM_DEBUG
00595 double now = Scheduler::instance().clock();
00596 printf("at %f %s generating NAK state for seqno %d.\n", now, uname_, seqno);
00597 #endif
00598
00599
00600 pair<map<int, NakItem>::iterator, bool> result;
00601
00602 result = naks_.insert(pair<int, NakItem>(seqno, NakItem(this, seqno)));
00603
00604 NakItem *nitem = &(result.first->second);
00605
00606 if (result.second == true) {
00607
00608 double backoff = Random::uniform(nak_bo_ivl_);
00609
00610
00611 nitem->nak_timer().data() = nitem;
00612
00613
00614 nitem->nak_timer().resched(backoff);
00615
00616 printf("backoff: %f\n", backoff);
00617 trace_event("DETECT", backoff);
00618 }
00619 else {
00620 printf("%s generate_Nak was called with NAK state already established, ignoring.\n", uname_);
00621 }
00622
00623 }
00624
00625 void PgmReceiver::cancel_Nak(int seqno, NakItem *nitem)
00626 {
00627
00628 if (nitem == NULL) {
00629
00630 map<int, NakItem>::iterator result = naks_.find(seqno);
00631
00632 if (result == naks_.end()) {
00633
00634
00635 return;
00636 }
00637
00638 nitem = &((*result).second);
00639 }
00640
00641
00642 nitem->nak_timer().force_cancel();
00643
00644
00645 if (!naks_.erase(seqno)) {
00646 printf("ERROR (PgmReceiver::cancel_Nak): Failed erasing seqno from nak map.\n");
00647 }
00648
00649 }
00650
00651 void PgmReceiver::timeout_nak(NakItem *nitem)
00652 {
00653
00654 double backoff;
00655
00656 switch(nitem->nak_state()) {
00657 case BACK_OFF_STATE:
00658
00659 if (nitem->nak_sent() == false) {
00660 nitem->nak_sent() = true;
00661 }
00662 else {
00663 stats_.naks_duplicated_++;
00664 }
00665
00666
00667 nitem->nak_state() = WAIT_NCF_STATE;
00668
00669
00670 nitem->nak_timer().resched(nak_rpt_ivl_);
00671
00672 send_nak(nitem->seqno());
00673
00674 break;
00675 case WAIT_NCF_STATE:
00676
00677
00678
00679 if (nitem->ncf_retry_count() > max_nak_ncf_retries_) {
00680
00681
00682 printf("%s reached max_nak_ncf_retries, stopping NAK generation.\n", uname_);
00683 cancel_Nak(nitem->seqno(), nitem);
00684 return;
00685 }
00686
00687 nitem->ncf_retry_count() += 1;
00688
00689
00690 nitem->nak_state() = BACK_OFF_STATE;
00691
00692
00693 backoff = Random::uniform(nak_bo_ivl_);
00694
00695
00696 nitem->nak_timer().resched(backoff);
00697
00698 break;
00699 case WAIT_DATA_STATE:
00700
00701
00702 if (nitem->data_retry_count() > max_nak_data_retries_) {
00703 printf("%s reached max_nak_data_retries, stopping NAK generation.\n", uname_);
00704 cancel_Nak(nitem->seqno(), nitem);
00705 return;
00706 }
00707
00708 nitem->data_retry_count() += 1;
00709
00710
00711 nitem->nak_state() = BACK_OFF_STATE;
00712
00713
00714 backoff = Random::uniform(nak_bo_ivl_);
00715
00716
00717 nitem->nak_timer().resched(backoff);
00718
00719 break;
00720 default:
00721 printf("ERROR (PgmReceiver::timeout_nak): Unknown NAK state %d.\n", nitem->nak_state());
00722 break;
00723 }
00724
00725 }
00726
00727 void PgmReceiver::send_nak(int seqno)
00728 {
00729 printf("%s send_nak is called.\n", uname_);
00730
00731 Packet *nak_pkt = allocpkt();
00732
00733 hdr_cmn *nak_hc = HDR_CMN(nak_pkt);
00734 nak_hc->size_ = sizeof(hdr_pgm) + sizeof(hdr_pgm_nak);
00735 nak_hc->ptype_ = PT_PGM;
00736
00737
00738 hdr_ip *nak_hip = HDR_IP(nak_pkt);
00739 nak_hip->dst() = upstream_node_;
00740
00741
00742 nak_hip->fid_ = 8;
00743
00744
00745 hdr_pgm *nak_hp = HDR_PGM(nak_pkt);
00746 nak_hp->type_ = PGM_NAK;
00747 nak_hp->tsi_ = tsi_;
00748 nak_hp->seqno_ = seqno;
00749
00750
00751 hdr_pgm_nak *nak_hpn = HDR_PGM_NAK(nak_pkt);
00752 nak_hpn->source_ = source_;
00753 nak_hpn->group_ = group_;
00754
00755
00756
00757 stats_.naks_transmitted_++;
00758
00759
00760 send(nak_pkt, 0);
00761
00762
00763
00764 }
00765
00766 void PgmReceiver::print_stats()
00767 {
00768 printf("%s:\n", uname_);
00769 printf("\tLast packet:\t\t%d\n", rcvbuf_.nextpkt_-1);
00770 printf("\tMax packet:\t\t%d\n", rcvbuf_.maxpkt_);
00771 if (rcvbuf_.pkts_recovered_) {
00772 printf("\tPackets recovered:\t%d\n", rcvbuf_.pkts_recovered_);
00773 printf("\tLatency (min, max, avg):\t%lf, %lf, %lf\n",
00774 rcvbuf_.min_delay_, rcvbuf_.max_delay_,
00775 rcvbuf_.delay_sum_ / rcvbuf_.pkts_recovered_);
00776 }
00777 if (rcvbuf_.duplicates_) {
00778 printf("\tDuplicate RDATA:\t%d\n", rcvbuf_.duplicates_);
00779 }
00780 printf("\tTotal NAKs sent:\t%d\n", stats_.naks_transmitted_);
00781 printf("\tRetransmitted NAKs:\t%d\n", stats_.naks_duplicated_);
00782 }
00783
00784 #ifdef PGM_DEBUG
00785 void PgmReceiver::display_packet(Packet *pkt)
00786 {
00787
00788 double now = Scheduler::instance().clock();
00789
00790 hdr_ip *hip = HDR_IP(pkt);
00791 hdr_cmn *hc = HDR_CMN(pkt);
00792
00793 printf("at %f %s received packet type ", now, uname_);
00794
00795 hdr_pgm *hp = HDR_PGM(pkt);
00796
00797 hdr_pgm_spm *hps;
00798 hdr_pgm_nak *hpn;
00799
00800 switch(hp->type_) {
00801 case PGM_SPM:
00802 hps = HDR_PGM_SPM(pkt);
00803
00804 printf("SPM (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d, spm_path %d:%d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_, hps->spm_path_.addr_, hps->spm_path_.port_);
00805
00806 break;
00807 case PGM_ODATA:
00808 printf("ODATA (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
00809
00810 break;
00811 case PGM_RDATA:
00812 printf("RDATA (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
00813
00814 break;
00815 case PGM_NAK:
00816 hpn = HDR_PGM_NAK(pkt);
00817
00818 printf("NAK (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d, source %d:%d, group %d:%d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_, hpn->source_.addr_, hpn->source_.port_, hpn->group_.addr_, hpn->group_.port_);
00819
00820 break;
00821 case PGM_NCF:
00822 printf("NCF (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
00823
00824 break;
00825 default:
00826 printf("UNKNOWN (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
00827
00828 break;
00829 }
00830
00831 }
00832 #endif // PGM_DEBUG
00833
00834 #endif //HAVE_STL