00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058 #include "config.h"
00059 #ifdef HAVE_STL
00060
00061 #include <stdlib.h>
00062 #include <stdio.h>
00063
00064 #include <map>
00065 #include <list>
00066 #include <algorithm>
00067
00068 #include "config.h"
00069 #include "tclcl.h"
00070 #include "agent.h"
00071 #include "packet.h"
00072 #include "ip.h"
00073 #include "random.h"
00074 #include "basetrace.h"
00075
00076 #include "pgm.h"
00077
00078
00079
00080
00081
00082
00083 int hdr_pgm::offset_;
00084 int hdr_pgm_spm::offset_;
00085 int hdr_pgm_nak::offset_;
00086
00087
00088 class PGMHeaderClass : public PacketHeaderClass {
00089 public:
00090 PGMHeaderClass() : PacketHeaderClass("PacketHeader/PGM", sizeof(hdr_pgm)) {
00091 bind_offset(&hdr_pgm::offset_);
00092 }
00093
00094 } class_pgmhdr;
00095
00096
00097 class PGM_SPMHeaderClass : public PacketHeaderClass {
00098 public:
00099 PGM_SPMHeaderClass() : PacketHeaderClass("PacketHeader/PGM_SPM",
00100 sizeof(hdr_pgm_spm)) {
00101 bind_offset(&hdr_pgm_spm::offset_);
00102 }
00103
00104 } class_pgm_spmhdr;
00105
00106
00107 class PGM_NAKHeaderClass : public PacketHeaderClass {
00108 public:
00109 PGM_NAKHeaderClass() : PacketHeaderClass("PacketHeader/PGM_NAK",
00110 sizeof(hdr_pgm_nak)) {
00111 bind_offset(&hdr_pgm_nak::offset_);
00112 }
00113
00114 } class_pgm_nakhdr;
00115
00116
00117
00118
00119
00120 class PgmAgent;
00121
00122
00123 enum {
00124 TIMER_NAK_RETRANS = 0,
00125 TIMER_NAK_RPT = 1,
00126 TIMER_NAK_RDATA = 2,
00127 TIMER_NAK_ELIM = 3
00128 };
00129
00130 class PgmAgentTimer : public TimerHandler {
00131 public:
00132 PgmAgentTimer(PgmAgent *a, int type) : TimerHandler(), data_(NULL) {
00133 a_ = a;
00134 type_ = type;
00135 }
00136
00137 void * &data() { return data_; }
00138
00139 protected:
00140 virtual void expire(Event *e);
00141 PgmAgent *a_;
00142 int type_;
00143 void *data_;
00144 };
00145
00146
00147
00148
00149
00150
00151 enum {
00152 NAK_PENDING = 0,
00153 NAK_CONFIRMED = 1
00154 };
00155
00156 class StateInfo;
00157
00158 class RepairState {
00159 public:
00160 RepairState(PgmAgent *a, StateInfo *sinfo, int seqno, ns_addr_t &source,
00161 ns_addr_t &group) : seqno_(seqno), source_(source),
00162 group_(group),
00163 nak_state_(NAK_PENDING), nak_elimination_(true),
00164 nak_retrans_timer_(a, TIMER_NAK_RETRANS),
00165 nak_rpt_timer_(a, TIMER_NAK_RPT),
00166 nak_rdata_timer_(a, TIMER_NAK_RDATA),
00167 nak_elim_timer_(a, TIMER_NAK_ELIM),
00168 sinfo_(sinfo)
00169 { }
00170
00171 int & seqno() { return seqno_; }
00172 ns_addr_t & source() { return source_; }
00173 ns_addr_t & group() { return group_; }
00174
00175 int & nak_state() { return nak_state_; }
00176 bool & nak_elimination() { return nak_elimination_; }
00177 PgmAgentTimer & nak_retrans_timer() { return nak_retrans_timer_; }
00178 PgmAgentTimer & nak_rpt_timer() { return nak_rpt_timer_; }
00179 PgmAgentTimer & nak_rdata_timer() { return nak_rdata_timer_; }
00180 PgmAgentTimer & nak_elim_timer() { return nak_elim_timer_; }
00181 list<int> & iface_list() { return iface_list_; }
00182 list<NsObject *> & agent_list() { return agent_list_; }
00183 StateInfo * & sinfo() { return sinfo_; }
00184
00185 protected:
00186
00187
00188 int seqno_;
00189
00190 ns_addr_t source_;
00191 ns_addr_t group_;
00192
00193 int nak_state_;
00194
00195
00196
00197
00198
00199 bool nak_elimination_;
00200
00201
00202 PgmAgentTimer nak_retrans_timer_;
00203
00204
00205
00206 PgmAgentTimer nak_rpt_timer_;
00207
00208
00209
00210
00211 PgmAgentTimer nak_rdata_timer_;
00212
00213
00214
00215
00216
00217
00218 PgmAgentTimer nak_elim_timer_;
00219
00220
00221 list<int> iface_list_;
00222
00223
00224 list<NsObject *> agent_list_;
00225
00226
00227
00228 StateInfo *sinfo_;
00229
00230 };
00231
00232
00233
00234
00235
00236 class StateInfo {
00237 public:
00238
00239 StateInfo(ns_addr_t tsi) : tsi_(tsi), spm_seqno_(-1) { }
00240
00241
00242 int operator<(const StateInfo &right) const {
00243 return ((tsi_.addr_ < right.tsi_.addr_) || ( (tsi_.addr_==right.tsi_.addr_) && (tsi_.port_ < right.tsi_.port_)));
00244 }
00245
00246 ns_addr_t & tsi() { return tsi_; }
00247 int & spm_seqno() { return spm_seqno_; }
00248 ns_addr_t & upstream_node() { return upstream_node_; }
00249 int & upstream_iface() { return upstream_iface_; }
00250 map<int, RepairState> & repair() { return repair_; }
00251
00252 protected:
00253
00254 ns_addr_t tsi_;
00255 int spm_seqno_;
00256 ns_addr_t upstream_node_;
00257 int upstream_iface_;
00258
00259
00260
00261 map<int, RepairState> repair_;
00262
00263 };
00264
00265
00266
00267
00268
00269
00270
00271 struct Stats {
00272 int num_unsolicited_ncf_;
00273 int num_unsolicited_rdata_;
00274 int num_suppressed_naks_;
00275 int num_naks_transmitted_;
00276 };
00277
00278
00279 static int pgm_agent_uid_ = 0;
00280
00281 class PgmAgent : public Agent {
00282 public:
00283 PgmAgent();
00284 virtual void recv(Packet *, Handler *);
00285 virtual void timeout(int type, void *data);
00286 virtual int command(int argc, const char*const* argv);
00287
00288 protected:
00289
00290 void handle_spm(Packet *pkt);
00291 void handle_odata(Packet *pkt);
00292 void handle_rdata(Packet *pkt);
00293 void handle_nak(Packet *pkt);
00294 void handle_ncf(Packet *pkt);
00295
00296 void send_nak(ns_addr_t &upstream_node, ns_addr_t &tsi, int seqno, ns_addr_t &source, ns_addr_t &group);
00297
00298 void timeout_nak_retrans(RepairState *rstate);
00299 void timeout_nak_rpt(RepairState *rstate);
00300 void timeout_nak_rdata(RepairState *rstate);
00301 void timeout_nak_elim(RepairState *rstate);
00302
00303 void remove_repair_state(RepairState *rstate);
00304
00305 void print_stats();
00306
00307 void PgmAgent::trace_event(char *evType, double evTime);
00308
00309 #ifdef PGM_DEBUG
00310 void display_packet(Packet *pkt);
00311 #endif
00312
00313 NsObject* iface2link(int iface);
00314 NsObject* pkt2agent (Packet *pkt);
00315
00316 StateInfo * find_TSI(ns_addr_t &tsi);
00317 StateInfo * insert_TSI(ns_addr_t &tsi);
00318
00319 EventTrace * et_;
00320
00321 int pgm_enabled_;
00322
00323 char uname_[16];
00324
00325 Stats stats_;
00326
00327
00328 list<StateInfo> state_list_;
00329
00330
00331
00332 double nak_retrans_ival_;
00333
00334
00335
00336
00337 double nak_rpt_ival_;
00338
00339
00340
00341 double nak_rdata_ival_;
00342
00343
00344
00345
00346 double nak_elim_ival_;
00347
00348 };
00349
00350 static class PgmClass : public TclClass {
00351 public:
00352 PgmClass() : TclClass("Agent/PGM") {}
00353 TclObject * create(int argc, const char * const * argv) {
00354 return (new PgmAgent());
00355 }
00356 } class_pgm_agent;
00357
00358 void PgmAgentTimer::expire(Event *e) {
00359 a_->timeout(type_, data_);
00360 }
00361
00362
00363 PgmAgent::PgmAgent() : Agent(PT_PGM), pgm_enabled_(1)
00364 {
00365
00366 sprintf (uname_, "pgmAgent-%d", pgm_agent_uid_++);
00367
00368
00369 stats_.num_unsolicited_ncf_ = 0;
00370 stats_.num_unsolicited_rdata_ = 0;
00371 stats_.num_suppressed_naks_ = 0;
00372 stats_.num_naks_transmitted_ = 0;
00373
00374 bind("pgm_enabled_", &pgm_enabled_);
00375 bind_time("nak_retrans_ival_", &nak_retrans_ival_);
00376 bind_time("nak_rpt_ival_", &nak_rpt_ival_);
00377 bind_time("nak_rdata_ival_", &nak_rdata_ival_);
00378 bind_time("nak_elim_ival_", &nak_elim_ival_);
00379
00380 et_ = (EventTrace *) NULL;
00381 }
00382
00383
00384 void PgmAgent::recv(Packet* pkt, Handler*)
00385 {
00386 hdr_pgm *hp = HDR_PGM(pkt);
00387
00388 if (!pgm_enabled_) {
00389 target_->recv(pkt);
00390 return;
00391 }
00392
00393 hdr_cmn *hc = HDR_CMN(pkt);
00394
00395 if (hc->ptype_ != PT_PGM) {
00396 printf("%s ERROR (PgmAgent::recv): received non PGM pkt type %d, discarding.\n", uname_, hc->ptype_);
00397 Packet::free(pkt);
00398 return;
00399 }
00400
00401 #ifdef PGM_DEBUG
00402 display_packet(pkt);
00403 #endif
00404
00405
00406
00407
00408 switch(hp->type_) {
00409 case PGM_SPM:
00410 handle_spm(pkt);
00411 break;
00412 case PGM_ODATA:
00413 handle_odata(pkt);
00414 break;
00415 case PGM_RDATA:
00416 handle_rdata(pkt);
00417 break;
00418 case PGM_NAK:
00419 handle_nak(pkt);
00420 break;
00421 case PGM_NCF:
00422 handle_ncf(pkt);
00423 break;
00424 default:
00425 printf("ERROR (PgmAgent::recv): Received PGM packet with unknown type %d.\n", hp->type_);
00426
00427 Packet::free(pkt);
00428
00429 break;
00430 }
00431
00432 }
00433
00434
00435 void PgmAgent::timeout(int type, void *data)
00436 {
00437
00438 switch(type) {
00439 case TIMER_NAK_RETRANS:
00440 timeout_nak_retrans((RepairState *) data);
00441 break;
00442 case TIMER_NAK_RPT:
00443 timeout_nak_rpt((RepairState *) data);
00444 break;
00445 case TIMER_NAK_RDATA:
00446 timeout_nak_rdata((RepairState *) data);
00447 break;
00448 case TIMER_NAK_ELIM:
00449 timeout_nak_elim((RepairState *) data);
00450 break;
00451 default:
00452 printf("ERROR (PgmAgent::timeout()): Invalid timeout type %d.\n", type);
00453 break;
00454 }
00455
00456 }
00457
00458
00459 int PgmAgent::command (int argc, const char*const* argv)
00460 {
00461
00462
00463 if (argc == 2) {
00464 if (strcmp(argv[1], "print-stats") == 0) {
00465 print_stats();
00466 return (TCL_OK);
00467 }
00468 }
00469 else if (argc == 3) {
00470 if (strcmp(argv[1], "eventtrace") == 0) {
00471 et_ = (EventTrace *)TclObject::lookup(argv[2]);
00472 return (TCL_OK);
00473 }
00474 }
00475
00476 return (Agent::command(argc, argv));
00477 }
00478
00479 void PgmAgent::trace_event(char *evType, double evTime) {
00480
00481 if (et_ == NULL) return;
00482 char *wrk = et_->buffer();
00483 char *nwrk = et_->nbuffer();
00484
00485 if (wrk != NULL) {
00486 sprintf(wrk, "E "TIME_FORMAT" %d %d PGM %s "TIME_FORMAT,
00487 et_->round(Scheduler::instance().clock()),
00488 addr(),
00489 0,
00490 evType,
00491 evTime);
00492 if (nwrk != 0)
00493 sprintf(nwrk,
00494 "E -t "TIME_FORMAT" -o PGM -e %s -s %d.%d -d %d.%d",
00495 et_->round(Scheduler::instance().clock()),
00496 evType,
00497 addr(),
00498 port(),
00499 0,
00500 0
00501 );
00502 et_->dump();
00503 }
00504
00505 }
00506
00507 void PgmAgent::handle_spm(Packet *pkt)
00508 {
00509
00510 hdr_pgm *hp = HDR_PGM(pkt);
00511 hdr_pgm_spm *hs = HDR_PGM_SPM(pkt);
00512 hdr_cmn *hc = HDR_CMN(pkt);
00513
00514
00515 StateInfo *state = find_TSI(hp->tsi_);
00516
00517 if (state == NULL) {
00518
00519 state = insert_TSI(hp->tsi_);
00520
00521
00522 state->spm_seqno() = hp->seqno_;
00523
00524 state->upstream_node() = hs->spm_path_;
00525 state->upstream_iface() = hc->iface();
00526 }
00527 else {
00528
00529
00530
00531 if ( state->spm_seqno() < hp->seqno_ ) {
00532
00533 state->spm_seqno() = hp->seqno_;
00534
00535 state->upstream_node() = hs->spm_path_;
00536 state->upstream_iface() = hc->iface();
00537 }
00538 else {
00539 printf("%s received an old SPM seqno, discarding.\n", uname_);
00540 Packet::free(pkt);
00541 return;
00542 }
00543 }
00544
00545
00546
00547 hs->spm_path_ = here_;
00548
00549
00550 send(pkt, 0);
00551 }
00552
00553 void PgmAgent::handle_odata(Packet *pkt)
00554 {
00555
00556
00557
00558
00559
00560 send(pkt, 0);
00561 }
00562
00563 void PgmAgent::handle_rdata(Packet *pkt)
00564 {
00565
00566
00567 hdr_pgm *hp = HDR_PGM(pkt);
00568
00569
00570 StateInfo *state = find_TSI(hp->tsi_);
00571 if (state == NULL) {
00572 printf("%s received RDATA for which no SPM state is established, discarding.\n", uname_);
00573 stats_.num_unsolicited_rdata_++;
00574 Packet::free(pkt);
00575 return;
00576 }
00577
00578
00579 map<int, RepairState>::iterator result = state->repair().find(hp->seqno_);
00580
00581 if (result == state->repair().end()) {
00582
00583 printf("%s received RDATA for which no repair state is present, discarding.\n", uname_);
00584 stats_.num_unsolicited_rdata_++;
00585 Packet::free(pkt);
00586 return;
00587 }
00588
00589 RepairState *rstate = &((*result).second);
00590
00591
00592
00593
00594
00595 if (rstate->iface_list().empty() && rstate->agent_list().empty()) {
00596 printf("%s received RDATA but repair state has no interfaces for it, discarding.\n", uname_);
00597
00598 stats_.num_unsolicited_rdata_++;
00599
00600 Packet::free(pkt);
00601 }
00602
00603 NsObject *tgt;
00604 Packet *new_pkt;
00605 int flag = 0;
00606
00607 trace_event("SEND RDATA", 0);
00608
00609
00610
00611 if (!rstate->iface_list().empty()) {
00612 list<int>::iterator iter = rstate->iface_list().begin();
00613
00614 while (iter != rstate->iface_list().end()) {
00615 if (!flag) {
00616 tgt = iface2link(*iter);
00617 if (tgt == NULL) {
00618 printf("ERROR (PgmAgent::handle_rdata): iface2link returned NULL.\n");
00619 abort();
00620 }
00621 tgt->recv(pkt);
00622 flag = 1;
00623 }
00624 else {
00625
00626
00627
00628 new_pkt = pkt->copy();
00629 tgt = iface2link(*iter);
00630 if (tgt == NULL) {
00631 printf("ERROR (PgmAgent::handle_rdata): iface2link returned NULL.\n");
00632 abort();
00633 }
00634 tgt->recv(new_pkt);
00635 }
00636
00637 iter++;
00638 }
00639 }
00640
00641 if (!rstate->agent_list().empty()) {
00642 list<NsObject *>::iterator iter = rstate->agent_list().begin();
00643
00644 while (iter != rstate->agent_list().end()) {
00645 if (!flag) {
00646 (*iter)->recv(pkt);
00647 flag = 1;
00648 }
00649 else {
00650
00651
00652
00653 new_pkt = pkt->copy();
00654 (*iter)->recv(new_pkt);
00655 }
00656
00657 iter++;
00658 }
00659 }
00660
00661
00662
00663 remove_repair_state(&((*result).second));
00664
00665 }
00666
00667 void PgmAgent::handle_nak(Packet *pkt)
00668 {
00669
00670 hdr_pgm *hp = HDR_PGM(pkt);
00671 hdr_pgm_nak *hpn = HDR_PGM_NAK(pkt);
00672 hdr_cmn *hc = HDR_CMN(pkt);
00673
00674
00675
00676 StateInfo *state = find_TSI(hp->tsi_);
00677
00678 if (state == NULL) {
00679 printf("PGM Agent received NAK for which no SPM state is established, discarding.\n");
00680 Packet::free(pkt);
00681 return;
00682 }
00683
00684
00685 Packet *ncf_pkt = allocpkt();
00686 hdr_cmn *ncf_hc = HDR_CMN(ncf_pkt);
00687 ncf_hc->size_ = sizeof(hdr_pgm);
00688 ncf_hc->ptype_ = PT_PGM;
00689 hdr_pgm *ncf_hp = HDR_PGM(ncf_pkt);
00690 ncf_hp->type_ = PGM_NCF;
00691 ncf_hp->tsi_ = hp->tsi_;
00692 ncf_hp->seqno_ = hp->seqno_;
00693
00694
00695 hdr_ip *ncf_ip = HDR_IP(ncf_pkt);
00696 ncf_ip->src() = hpn->source_;
00697
00698 ncf_ip->dst() = hpn->group_;
00699
00700
00701 ncf_ip->fid_ = 6;
00702
00703
00704
00705 NsObject *tgt;
00706
00707 if (hc->iface() < 0) {
00708 tgt = pkt2agent(pkt);
00709 if (tgt == NULL) {
00710 printf("ERROR: (PgmAgent::handle_nak) pkt2agent returned NULL.\n");
00711 abort();
00712 }
00713 tgt->recv(ncf_pkt);
00714 }
00715 else {
00716 tgt = iface2link(hc->iface());
00717 if (tgt == NULL) {
00718 printf("ERROR: (PgmAgent::handle_nak) iface2link returned NULL.\n");
00719 abort();
00720 }
00721 tgt->recv(ncf_pkt);
00722 }
00723
00724
00725
00726 pair<map<int, RepairState>::iterator, bool> result;
00727
00728 result = state->repair().insert(pair<int, RepairState>(hp->seqno_, RepairState(this, state, hp->seqno_, hpn->source_, hpn->group_)));
00729
00730 RepairState *rstate = &(result.first->second);
00731
00732 if (result.second == true) {
00733
00734
00735
00736
00737 rstate->nak_retrans_timer().data() = rstate;
00738 rstate->nak_rpt_timer().data() = rstate;
00739 rstate->nak_rdata_timer().data() = rstate;
00740 rstate->nak_elim_timer().data() = rstate;
00741
00742
00743 if (hc->iface() < 0) {
00744 rstate->agent_list().push_back(pkt2agent(pkt));
00745 }
00746 else {
00747 rstate->iface_list().push_back(hc->iface());
00748 }
00749
00750
00751 rstate->nak_retrans_timer().resched(nak_retrans_ival_);
00752
00753
00754 rstate->nak_rpt_timer().resched(nak_rpt_ival_);
00755
00756 trace_event("SEND NACK", nak_rpt_ival_);
00757
00758
00759
00760
00761
00762 }
00763 else {
00764
00765
00766 if (hc->iface() < 0) {
00767
00768
00769 list<NsObject *> *agent_list = &(rstate->agent_list());
00770 list<NsObject *>::iterator res = find(agent_list->begin(), agent_list->end(), pkt2agent(pkt));
00771
00772 if (res == agent_list->end()) {
00773 agent_list->push_back(pkt2agent(pkt));
00774 }
00775
00776 }
00777 else {
00778
00779
00780
00781 list<int> *iface_list = &(rstate->iface_list());
00782 list<int>::iterator res = find(iface_list->begin(), iface_list->end(), hc->iface());
00783
00784 if (res == iface_list->end()) {
00785
00786 iface_list->push_back(hc->iface());
00787 }
00788 }
00789
00790
00791
00792 if (rstate->nak_elimination() == false) {
00793 rstate->nak_state() = NAK_PENDING;
00794
00795 rstate->nak_retrans_timer().resched(nak_retrans_ival_);
00796 rstate->nak_rpt_timer().resched(nak_rpt_ival_);
00797
00798
00799 rstate->nak_rdata_timer().force_cancel();
00800 rstate->nak_elim_timer().force_cancel();
00801
00802 rstate->nak_elimination() = true;
00803
00804 #ifdef PGM_DEBUG
00805 printf("%s: Got NAK for seqno %d with previous NAK state, accepted.\n",
00806 uname_, hp->seqno_);
00807 #endif
00808 }
00809 else {
00810
00811 #ifdef PGM_DEBUG
00812 printf("%s: Got NAK for seqno %d but have previous NAK state, discarding.\n", uname_, hp->seqno_);
00813 #endif
00814 stats_.num_suppressed_naks_++;
00815 Packet::free(pkt);
00816 return;
00817 }
00818
00819 }
00820
00821 stats_.num_naks_transmitted_++;
00822
00823
00824 send_nak(state->upstream_node(), hp->tsi_, hp->seqno_, hpn->source_, hpn->group_);
00825
00826 Packet::free(pkt);
00827 }
00828
00829 void PgmAgent::handle_ncf(Packet *pkt)
00830 {
00831
00832 hdr_pgm *hp = HDR_PGM(pkt);
00833 hdr_cmn *hc = HDR_CMN(pkt);
00834 hdr_ip *hip = HDR_IP(pkt);
00835
00836
00837 StateInfo *state = find_TSI(hp->tsi_);
00838
00839 if (state == NULL) {
00840 printf("%s received NCF for which no SPM state is established, discarding.\n", uname_);
00841 stats_.num_unsolicited_ncf_++;
00842 Packet::free(pkt);
00843 return;
00844 }
00845
00846 if (hc->iface() != state->upstream_iface()) {
00847 printf("%s received NCF from non-upstream interface, discarding.\n", uname_);
00848 stats_.num_unsolicited_ncf_++;
00849 Packet::free(pkt);
00850 return;
00851 }
00852
00853 trace_event("SEND NCF", 0);
00854
00855
00856 map<int, RepairState>::iterator result = state->repair().find(hp->seqno_);
00857 RepairState *rstate;
00858
00859 if (result == state->repair().end()) {
00860
00861
00862
00863
00864
00865
00866 pair<map<int, RepairState>::iterator, bool> res;
00867 res = state->repair().insert(pair<int, RepairState>(hp->seqno_, RepairState(this, state, hp->seqno_, hip->src(), hip->dst())));
00868
00869 rstate = &(res.first->second);
00870
00871
00872 rstate->nak_retrans_timer().data() = rstate;
00873 rstate->nak_rpt_timer().data() = rstate;
00874 rstate->nak_rdata_timer().data() = rstate;
00875 rstate->nak_elim_timer().data() = rstate;
00876
00877 stats_.num_unsolicited_ncf_++;
00878 }
00879 else {
00880 rstate = &((*result).second);
00881
00882
00883
00884 rstate->nak_retrans_timer().force_cancel();
00885 rstate->nak_rpt_timer().force_cancel();
00886 }
00887
00888 rstate->nak_state() = NAK_CONFIRMED;
00889
00890 rstate->nak_rdata_timer().resched(nak_rdata_ival_);
00891 rstate->nak_elim_timer().resched(nak_elim_ival_);
00892
00893 Packet::free(pkt);
00894 }
00895
00896
00897 void PgmAgent::send_nak(ns_addr_t &upstream_node, ns_addr_t &tsi, int seqno, ns_addr_t &source, ns_addr_t &group)
00898 {
00899
00900 #ifdef PGM_DEBUG
00901 double now = Scheduler::instance().clock();
00902 printf("at %f %s sending NAK for seqno %d upstream.\n", now, uname_, seqno);
00903 #endif
00904
00905 Packet *nak_pkt = allocpkt();
00906
00907 hdr_cmn *nak_hc = HDR_CMN(nak_pkt);
00908 nak_hc->size_ = sizeof(hdr_pgm) + sizeof(hdr_pgm_nak);
00909 nak_hc->ptype_ = PT_PGM;
00910
00911
00912 hdr_ip *nak_hip = HDR_IP(nak_pkt);
00913 nak_hip->dst() = upstream_node;
00914
00915
00916 nak_hip->fid_ = 8;
00917
00918
00919 hdr_pgm *nak_hp = HDR_PGM(nak_pkt);
00920 nak_hp->type_ = PGM_NAK;
00921 nak_hp->tsi_ = tsi;
00922 nak_hp->seqno_ = seqno;
00923
00924
00925 hdr_pgm_nak *nak_hpn = HDR_PGM_NAK(nak_pkt);
00926 nak_hpn->source_ = source;
00927 nak_hpn->group_ = group;
00928
00929
00930 send(nak_pkt, 0);
00931 }
00932
00933
00934 void PgmAgent::timeout_nak_retrans(RepairState *rstate)
00935 {
00936
00937 stats_.num_naks_transmitted_++;
00938
00939
00940 send_nak(rstate->sinfo()->upstream_node(), rstate->sinfo()->tsi(), rstate->seqno(), rstate->source(), rstate->group());
00941
00942
00943 rstate->nak_retrans_timer().resched(nak_retrans_ival_);
00944
00945 }
00946
00947
00948 void PgmAgent::timeout_nak_rpt(RepairState *rstate)
00949 {
00950
00951 printf("%s: timeout_nak_rpt expired, removing repair state.\n", uname_);
00952
00953
00954
00955 remove_repair_state(rstate);
00956
00957 }
00958
00959
00960 void PgmAgent::timeout_nak_rdata(RepairState *rstate)
00961 {
00962
00963 printf("%s: timeout_nak_rdata expired, removing repair state.\n", uname_);
00964
00965
00966
00967 remove_repair_state(rstate);
00968
00969 }
00970
00971
00972 void PgmAgent::timeout_nak_elim(RepairState *rstate)
00973 {
00974
00975
00976 rstate->nak_elimination() = false;
00977
00978 }
00979
00980 void PgmAgent::remove_repair_state(RepairState *rstate)
00981 {
00982
00983
00984 rstate->nak_retrans_timer().force_cancel();
00985 rstate->nak_rpt_timer().force_cancel();
00986 rstate->nak_rdata_timer().force_cancel();
00987 rstate->nak_elim_timer().force_cancel();
00988
00989
00990 if (!rstate->sinfo()->repair().erase(rstate->seqno())) {
00991 printf("ERROR (PgmAgent::remove_repair_state): Did not erase seqno from map.\n");
00992 }
00993
00994 }
00995
00996 NsObject* PgmAgent::iface2link (int iface)
00997 {
00998
00999
01000 Tcl& tcl = Tcl::instance();
01001 char wrk[64];
01002
01003 if (iface == -1) {
01004 return NULL;
01005 }
01006
01007 sprintf (wrk, "[%s set node_] ifaceGetOutLink %d", name (), iface);
01008 tcl.evalc (wrk);
01009 const char* result = tcl.result ();
01010 #ifdef PGM_DEBUG
01011 printf ("[iface2link] agent %s\n", result);
01012 #endif
01013 NsObject* obj = (NsObject*)TclObject::lookup(result);
01014 return (obj);
01015 }
01016
01017 NsObject* PgmAgent::pkt2agent (Packet *pkt)
01018 {
01019 Tcl& tcl = Tcl::instance();
01020 char wrk[64];
01021 const char *result;
01022 int port;
01023 NsObject* agent;
01024 hdr_ip* ih = HDR_IP(pkt);
01025
01026
01027 port = ih->sport();
01028
01029 sprintf (wrk, "[%s set node_] agent %d", name (), port);
01030 tcl.evalc (wrk);
01031 result = tcl.result ();
01032
01033 #ifdef PGM_DEBUG
01034 printf ("[pkt2agent] port %d, agent %s\n", port, result);
01035 #endif
01036
01037 agent = (NsObject*)TclObject::lookup (result);
01038 return (agent);
01039 }
01040
01041
01042 StateInfo * PgmAgent::find_TSI(ns_addr_t &tsi)
01043 {
01044
01045
01046 list<StateInfo>::iterator iter = state_list_.begin();
01047 while(iter != state_list_.end()) {
01048 if ( (*iter).tsi().isEqual (tsi) ) {
01049 return &(*iter);
01050 }
01051 iter++;
01052 }
01053
01054 return NULL;
01055 }
01056
01057
01058
01059 StateInfo * PgmAgent::insert_TSI(ns_addr_t &tsi)
01060 {
01061 state_list_.push_back(StateInfo(tsi));
01062
01063 return &(state_list_.back());
01064 }
01065
01066 void PgmAgent::print_stats()
01067 {
01068 printf("%s:\n", uname_);
01069 printf("\tNAKs Transmitted: \t%d\n", stats_.num_naks_transmitted_);
01070 printf("\tNAKs Suppressed: \t%d\n", stats_.num_suppressed_naks_);
01071 printf("\tUnsolicited NCFs: \t%d\n", stats_.num_unsolicited_ncf_);
01072 printf("\tUnsolicited RDATA: \t%d\n", stats_.num_unsolicited_rdata_);
01073 }
01074
01075 #ifdef PGM_DEBUG
01076 void PgmAgent::display_packet(Packet *pkt)
01077 {
01078
01079 double now = Scheduler::instance().clock();
01080
01081 hdr_ip *hip = HDR_IP(pkt);
01082 hdr_cmn *hc = HDR_CMN(pkt);
01083
01084 printf("at %f %s received packet type ", now, uname_);
01085
01086 hdr_pgm *hp = HDR_PGM(pkt);
01087
01088 hdr_pgm_spm *hps;
01089 hdr_pgm_nak *hpn;
01090
01091 switch(hp->type_) {
01092 case PGM_SPM:
01093 hps = HDR_PGM_SPM(pkt);
01094
01095 printf("SPM (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d, spm_path %d:%d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_, hps->spm_path_.addr_, hps->spm_path_.port_);
01096
01097 break;
01098 case PGM_ODATA:
01099 printf("ODATA (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
01100
01101 break;
01102 case PGM_RDATA:
01103 printf("RDATA (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
01104
01105 break;
01106 case PGM_NAK:
01107 hpn = HDR_PGM_NAK(pkt);
01108
01109 printf("NAK (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d, source %d:%d, group %d:%d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_, hpn->source_.addr_, hpn->source_.port_, hpn->group_.addr_, hpn->group_.port_);
01110
01111 break;
01112 case PGM_NCF:
01113 printf("NCF (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
01114
01115 break;
01116 default:
01117 printf("UNKNOWN (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d\n", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
01118
01119 break;
01120 }
01121
01122 }
01123 #endif // PGM_DEBUG
01124
01125 #endif //HAVE_STL