00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
#ifndef lint
/* RCS/CVS "$Header$" identification string; only compiled when `lint` is not defined. */
static const char rcsid[] = "@(#) $Header: /nfs/jade/vint/CVSROOT/ns-2/xcp/xcp-end-sys.cc,v 1.11 2005/09/21 20:52:48 haldar Exp $";
#endif
00053
00054 #include <stdio.h>
00055 #include <stdlib.h>
00056 #include <sys/types.h>
00057
00058 #include "ip.h"
00059 #include "tcp.h"
00060 #include "flags.h"
00061
00062 #include "agent.h"
00063 #include "packet.h"
00064
00065 #include "flags.h"
00066 #include "tcp-sink.h"
00067 #include "xcp-end-sys.h"
00068
00069
/* Debug switch: when 0, we don't print any debugging info. */
#define TRACE 0
00071
00072 void cwndShrinkingTimer::expire(Event*)
00073 {
00074 a_->timeout(TCP_TIMER_DELSND);
00075 }
00076
00077 int hdr_xcp::offset_;
00078 static unsigned int next_xcp = 0;
00079
00080 static class XCPHeaderClass : public PacketHeaderClass {
00081 public:
00082 XCPHeaderClass() : PacketHeaderClass("PacketHeader/XCP",
00083 sizeof(hdr_xcp)) {
00084 bind_offset(&hdr_xcp::offset_);
00085 }
00086 } class_xcphdr;
00087
00088 static class XcpRenoTcpClass : public TclClass {
00089 public:
00090 XcpRenoTcpClass() : TclClass("Agent/TCP/Reno/XCP") {}
00091 TclObject* create(int, const char*const*) {
00092 return (new XcpAgent());
00093 }
00094 } class_xcp;
00095
00096 static class XcpSinkClass : public TclClass {
00097 public:
00098 XcpSinkClass() : TclClass("Agent/XCPSink") {}
00099 TclObject* create(int, const char*const*) {
00100 return (new XcpSink(new Acker));
00101 }
00102 } class_xcpsink;
00103
00104 XcpAgent::XcpAgent(): RenoTcpAgent(), shrink_cwnd_timer_(this)
00105 {
00106 tcpId_ = next_xcp;
00107 next_xcp++;
00108 init_rtt_vars();
00109 current_positive_feedback_ = 0.0;
00110 xcp_srtt_ = 0;
00111 type_ = PT_XCP;
00112
00113 xcp_sparse_seqno_ = -1;
00114 }
00115
00116 void
00117 XcpAgent::delay_bind_init_all()
00118 {
00119 delay_bind_init_one("xcp_sparse_");
00120
00121 RenoTcpAgent::delay_bind_init_all();
00122 }
00123
00124 int
00125 XcpAgent::delay_bind_dispatch(const char *varName,
00126 const char *localName,
00127 TclObject *tracer)
00128 {
00129 if (delay_bind_bool(varName, localName,
00130 "xcp_sparse_", &xcp_sparse_,
00131 tracer))
00132 return TCL_OK;
00133
00134 return RenoTcpAgent::delay_bind_dispatch(varName, localName, tracer);
00135 }
00136
00137
00138
00139 void XcpAgent::output(int seqno, int reason)
00140 {
00141 int force_set_rtx_timer = 0;
00142 Packet* p = allocpkt();
00143 hdr_tcp *tcph = hdr_tcp::access(p);
00144 hdr_flags* hf = hdr_flags::access(p);
00145 tcph->seqno() = seqno;
00146 tcph->ts() = Scheduler::instance().clock();
00147 tcph->ts_echo() = ts_peer_;
00148 tcph->reason() = reason;
00149 if (ecn_) {
00150 hf->ect() = 1;
00151 }
00152 if (cong_action_) {
00153 hf->cong_action() = TRUE;
00154 cong_action_ = FALSE;
00155 }
00156
00157
00158 hdr_xcp *xh = hdr_xcp::access(p);
00159
00160 if ( (xh->xcp_sparse_ = xcp_sparse_) ) {
00161 if (xcp_sparse_seqno_ < 0)
00162 xcp_sparse_seqno_ = tcph->seqno();
00163 else {
00164 xh->xcp_enabled_ = hdr_xcp::XCP_ACK;
00165 xh->delta_throughput_ = 0;
00166
00167
00168
00169
00170
00171 goto xcp_sparse_skip;
00172 }
00173 }
00174
00175 xh->xcp_enabled_ = hdr_xcp::XCP_ENABLED;
00176 xh->cwnd_ = double(cwnd_);
00177 xh->rtt_ = srtt_estimate_;
00178 xh->xcpId_ = tcpId_;
00179
00180 #define MAX_THROUGHPUT 1e24
00181 if (srtt_estimate_ != 0) {
00182 xh->throughput_ = window() * size_ / srtt_estimate_;
00183 xh->delta_throughput_ = (MAX_THROUGHPUT
00184 - xh->throughput_);
00185 } else {
00186
00187 xh->throughput_ = .1;
00188 xh->delta_throughput_ = 0;
00189 }
00190
00191 if(channel_) {
00192 trace_var("throughput", xh->throughput_);
00193 }
00194 xcp_sparse_skip:
00195
00196
00197
00198 if (seqno == 0) {
00199 if (syn_) {
00200 hdr_cmn::access(p)->size() = tcpip_base_hdr_size_ + XCP_HDR_LEN;
00201 }
00202 if (ecn_) {
00203 hf->ecnecho() = 1;
00204
00205 hf->ect() = 0;
00206 }
00207 }
00208 int bytes = hdr_cmn::access(p)->size();
00209
00210
00211 if (highest_ack_ == maxseq_)
00212 force_set_rtx_timer = 1;
00213
00214 output_helper(p);
00215
00216 ++ndatapack_;
00217 ndatabytes_ += bytes;
00218 send(p, 0);
00219
00220 if (seqno == curseq_ && seqno > maxseq_)
00221 idle();
00222 if (seqno > maxseq_) {
00223 maxseq_ = seqno;
00224 if (!rtt_active_) {
00225 rtt_active_ = 1;
00226 if (seqno > rtt_seq_) {
00227 rtt_seq_ = seqno;
00228 rtt_ts_ = Scheduler::instance().clock();
00229 }
00230
00231 }
00232 } else {
00233 ++nrexmitpack_;
00234 nrexmitbytes_ += bytes;
00235 }
00236 if (!(rtx_timer_.status() == TIMER_PENDING) || force_set_rtx_timer)
00237
00238 set_rtx_timer();
00239 }
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249 void XcpAgent::opencwnd()
00250 {
00251 if (maxcwnd_ && (cwnd_ > double(maxcwnd_)))
00252 cwnd_ = double(maxcwnd_);
00253
00254 return;
00255 }
00256
00257 void XcpAgent::recv_newack_helper(Packet *pkt) {
00258 newack(pkt);
00259
00260
00261 if (channel_)
00262 trace_var("xcp_sparse_seqno_", xcp_sparse_seqno_);
00263 if (xcp_sparse_) {
00264 hdr_tcp *tcph = hdr_tcp::access(pkt);
00265 if (xcp_sparse_seqno_ == tcph->seqno()) {
00266 xcp_sparse_seqno_ = -1;
00267 }
00268 }
00269
00270 hdr_xcp *xh = hdr_xcp::access(pkt);
00271 if (xh->xcp_enabled_ != hdr_xcp::XCP_DISABLED) {
00272 if(channel_) {
00273 trace_var("reverse_feedback_", xh->reverse_feedback_);
00274 trace_var("controlling_hop_", xh->controlling_hop_);
00275 }
00276
00277 double delta_cwnd = 0;
00278
00279 delta_cwnd = (xh->reverse_feedback_
00280 * srtt_estimate_
00281 / size_);
00282
00283
00284 double newcwnd = (cwnd_ + delta_cwnd);
00285
00286 if (newcwnd < 1.0)
00287 newcwnd = 1.0;
00288 if (maxcwnd_ && (newcwnd > double(maxcwnd_)))
00289 newcwnd = double(maxcwnd_);
00290 cwnd_ = newcwnd;
00291 if (channel_)
00292 trace_var("newcwnd", newcwnd);
00293
00294 }
00295
00296
00297
00298
00299
00300
00301
00302
00303
00304
00305
00306
00307
00308
00309
00310
00311
00312
00314
00315
00316
00317
00318
00319
00320
00321
00322
00323 if (!ect_ || !hdr_flags::access(pkt)->ecnecho() ||
00324 (old_ecn_ && ecn_burst_)) {
00325
00326
00327
00328
00329
00330
00331 if (!control_increase_ ||
00332 (control_increase_ && (network_limited() == 1)))
00333 opencwnd();
00334 }
00335 if (ect_) {
00336 if (!hdr_flags::access(pkt)->ecnecho())
00337 ecn_backoff_ = 0;
00338 if (!ecn_burst_ && hdr_flags::access(pkt)->ecnecho())
00339 ecn_burst_ = TRUE;
00340 else if (ecn_burst_ && ! hdr_flags::access(pkt)->ecnecho())
00341 ecn_burst_ = FALSE;
00342 }
00343 if (!ect_ && hdr_flags::access(pkt)->ecnecho() &&
00344 !hdr_flags::access(pkt)->cong_action())
00345 ect_ = 1;
00346
00347 if ((highest_ack_ >= curseq_-1) && !closed_) {
00348 closed_ = 1;
00349 finish();
00350 }
00351 if (QOption_ && curseq_ == highest_ack_ +1) {
00352 cancel_rtx_timer();
00353 }
00354 }
00355
00356 void XcpAgent::rtt_update(double tao)
00357 {
00358 #define FIX1 1
00359 double now = Scheduler::instance().clock();
00360 double sendtime = now - tao;
00361 if (ts_option_) {
00362 #if FIX1
00363 int send_tick = int(sendtime/tcp_tick_);
00364 int recv_tick = int(now/tcp_tick_);
00365 t_rtt_ = recv_tick - send_tick;
00366 #else
00367 t_rtt_ = int(tao /tcp_tick_ + 0.5);
00368 #endif
00369 } else {
00370
00371
00372
00373
00374
00375
00376 sendtime += boot_time_;
00377 double tickoff = fmod(sendtime, tcp_tick_);
00378 t_rtt_ = int((tao + tickoff) / tcp_tick_);
00379 }
00380
00381 assert(t_rtt_ >= 0);
00382 if (xcp_srtt_ != 0)
00383 xcp_srtt_ = XCP_UPDATE_SRTT(xcp_srtt_, t_rtt_);
00384 else
00385 xcp_srtt_ = XCP_INIT_SRTT(t_rtt_);
00386 if (xcp_srtt_ == 0)
00387 xcp_srtt_ = 0;
00388 srtt_estimate_ = double(xcp_srtt_) * tcp_tick_ / double(1 << XCP_RTT_SHIFT);
00389
00390 if (TRACE) {
00391 printf("%d: %g SRTT %g, RTT %g \n", tcpId_, now, srtt_estimate_, tao);
00392 }
00393
00394
00395
00396 if (t_rtt_ < 1)
00397 t_rtt_ = 1;
00398
00399
00400
00401
00402
00403 if (t_srtt_ != 0) {
00404 register short delta;
00405
00406 delta = t_rtt_ - (t_srtt_ >> T_SRTT_BITS);
00407
00408 if ((t_srtt_ += delta) <= 0)
00409 t_srtt_ = 1;
00410 if (delta < 0)
00411 delta = -delta;
00412 delta -= (t_rttvar_ >> T_RTTVAR_BITS);
00413 if ((t_rttvar_ += delta) <= 0)
00414 t_rttvar_ = 1;
00415 } else {
00416 t_srtt_ = t_rtt_ << T_SRTT_BITS;
00417 t_rttvar_ = t_rtt_ << (T_RTTVAR_BITS-1);
00418 }
00419
00420 t_rtxcur_ = (((t_rttvar_ << (rttvar_exp_ + (T_SRTT_BITS - T_RTTVAR_BITS))) +
00421 t_srtt_) >> T_SRTT_BITS ) * tcp_tick_;
00422
00423 return;
00424 }
00425
00426 void XcpAgent::rtt_init()
00427 {
00428 TcpAgent::rtt_init();
00429
00430 init_rtt_vars();
00431 rtt_active_ = 0;
00432 rtt_seq_ = -1;
00433
00434 }
00435
00436 void XcpAgent::trace_var(char * var_name, double var)
00437 {
00438 char wrk[500];
00439 if (channel_) {
00440 int n;
00441 sprintf(wrk, "%g x x x x %s %g",time_now(),var_name, var);
00442 n = strlen(wrk);
00443 wrk[n] = '\n';
00444 wrk[n+1] = 0;
00445 (void)Tcl_Write(channel_, wrk, n+1);
00446 }
00447 }
00448
00449
00450 XcpSink::XcpSink(Acker* acker) : Agent(PT_ACK), acker_(acker), save_(NULL),
00451 lastreset_(0.0)
00452 {
00453 }
00454
00455 void
00456 XcpSink::delay_bind_init_all()
00457 {
00458 delay_bind_init_one("packetSize_");
00459 delay_bind_init_one("ts_echo_bugfix_");
00460 delay_bind_init_one("ts_echo_rfc1323_");
00461 delay_bind_init_one("bytes_");
00462 delay_bind_init_one("RFC2581_immediate_ack_");
00463
00464 Agent::delay_bind_init_all();
00465 }
00466
00467 int
00468 XcpSink::delay_bind_dispatch(const char *varName,
00469 const char *localName,
00470 TclObject *tracer)
00471 {
00472 if (delay_bind(varName, localName,
00473 "packetSize_", &size_,
00474 tracer))
00475 return TCL_OK;
00476 if (delay_bind_bool(varName, localName,
00477 "ts_echo_bugfix_", &ts_echo_bugfix_,
00478 tracer))
00479 return TCL_OK;
00480 if (delay_bind_bool(varName, localName,
00481 "ts_echo_rfc1323_", &ts_echo_rfc1323_,
00482 tracer))
00483 return TCL_OK;
00484 if (delay_bind_bool(varName, localName,
00485 "RFC2581_immediate_ack_", &RFC2581_immediate_ack_,
00486 tracer))
00487 return TCL_OK;
00488
00489 return Agent::delay_bind_dispatch(varName, localName, tracer);
00490 }
00491
00492 int XcpSink::command(int argc, const char*const* argv)
00493 {
00494 if (argc == 2) {
00495 if (strcmp(argv[1], "reset") == 0) {
00496 reset();
00497 return (TCL_OK);
00498 }
00499 }
00500 return (Agent::command(argc, argv));
00501 }
00502
00503 void XcpSink::reset()
00504 {
00505 acker_->reset();
00506 save_ = NULL;
00507 lastreset_ = Scheduler::instance().clock();
00508
00509
00510 }
00511
00512 void XcpSink::ack(Packet* opkt)
00513 {
00514 Packet* npkt = allocpkt();
00515 double now = Scheduler::instance().clock();
00516
00517 hdr_tcp *otcp = hdr_tcp::access(opkt);
00518 hdr_tcp *ntcp = hdr_tcp::access(npkt);
00519 ntcp->seqno() = acker_->Seqno();
00520 ntcp->ts() = now;
00521
00522 if (ts_echo_bugfix_)
00523 ntcp->ts_echo() = acker_->ts_to_echo();
00524 else
00525 ntcp->ts_echo() = otcp->ts();
00526
00527 hdr_ip* oip = HDR_IP(opkt);
00528 hdr_ip* nip = HDR_IP(npkt);
00529 nip->flowid() = oip->flowid();
00530
00531 hdr_flags* of = hdr_flags::access(opkt);
00532 hdr_flags* nf = hdr_flags::access(npkt);
00533 hdr_flags *sf;
00534 if (save_ != NULL)
00535 sf = hdr_flags::access(save_);
00536 else
00537 sf = 0;
00538
00539 if ( (save_ != NULL && sf->cong_action()) || of->cong_action() )
00540
00541 acker_->update_ecn_unacked(0);
00542 if ( (sf != 0 && sf->ect() && sf->ce()) ||
00543 (of->ect() && of->ce()) )
00544
00545 acker_->update_ecn_unacked(1);
00546 if ( (sf != 0 && sf->ect()) || of->ect() )
00547
00548 nf->ecnecho() = acker_->ecn_unacked();
00549 if (!of->ect() && of->ecnecho() ||
00550 (sf != 0 && !sf->ect() && sf->ecnecho()) )
00551
00552
00553
00554
00555 nf->ecnecho() = 1;
00556
00557
00558
00559 hdr_xcp *oxcp = hdr_xcp::access(opkt);
00560 hdr_xcp *nxcp = hdr_xcp::access(npkt);
00561 if (oxcp->xcp_enabled_ != hdr_xcp::XCP_DISABLED) {
00562 nxcp->xcp_enabled_ = hdr_xcp::XCP_ACK;
00563 nxcp->reverse_feedback_ = oxcp->delta_throughput_;
00564 nxcp->rtt_ = oxcp->rtt_;
00565 } else
00566 nxcp->xcp_enabled_ = hdr_xcp::XCP_DISABLED;
00567
00568
00569
00570 acker_->append_ack(hdr_cmn::access(npkt),
00571 ntcp, otcp->seqno());
00572 add_to_ack(npkt);
00573
00574 acker_->last_ack_sent_ = ntcp->seqno();
00575
00576 send(npkt, 0);
00577 }
00578
00579 void XcpSink::add_to_ack(Packet*)
00580 {
00581 return;
00582 }
00583
00584 void XcpSink::recv(Packet* pkt, Handler*)
00585 {
00586 int numToDeliver;
00587
00588 int numBytes = hdr_cmn::access(pkt)->size();
00589 hdr_tcp *th = hdr_tcp::access(pkt);
00590
00591
00592 if (th->ts() < lastreset_) {
00593
00594 Packet::free(pkt);
00595 return;
00596 }
00597
00598 acker_->update_ts(th->seqno(),th->ts(),ts_echo_rfc1323_);
00599
00600
00601
00602
00603 numToDeliver = acker_->update(th->seqno(), numBytes);
00604
00605
00606 if (numToDeliver)
00607 recvBytes(numToDeliver);
00608
00609
00610 ack(pkt);
00611
00612
00613 Packet::free(pkt);
00614 }