xcpq.cc

Go to the documentation of this file.
00001 // -*-  Mode:C++; c-basic-offset:8; tab-width:8; indent-tabs-mode:t -*-
00002 
00003 /*
00004  * Copyright (C) 2004 by the University of Southern California
00005  * $Id: xcpq.cc,v 1.10 2005/08/25 18:58:14 johnh Exp $
00006  *
00007  * This program is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU General Public License,
00009  * version 2, as published by the Free Software Foundation.
00010  *
00011  * This program is distributed in the hope that it will be useful,
00012  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  * GNU General Public License for more details.
00015  *
00016  * You should have received a copy of the GNU General Public License along
00017  * with this program; if not, write to the Free Software Foundation, Inc.,
00018  * 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
00019  *
00020  *
00021  * The copyright of this module includes the following
00022  * linking-with-specific-other-licenses addition:
00023  *
00024  * In addition, as a special exception, the copyright holders of
00025  * this module give you permission to combine (via static or
00026  * dynamic linking) this module with free software programs or
00027  * libraries that are released under the GNU LGPL and with code
00028  * included in the standard release of ns-2 under the Apache 2.0
00029  * license or under otherwise-compatible licenses with advertising
00030  * requirements (or modified versions of such code, with unchanged
00031  * license).  You may copy and distribute such a system following the
00032  * terms of the GNU GPL for this module and the licenses of the
00033  * other code concerned, provided that you include the source code of
00034  * that other code when and as the GNU GPL requires distribution of
00035  * source code.
00036  *
00037  * Note that people who make modified versions of this module
00038  * are not obligated to grant this special exception for their
00039  * modified versions; it is their choice whether to do so.  The GNU
00040  * General Public License gives permission to release a modified
00041  * version without this exception; this exception also makes it
00042  * possible to release a modified version which carries forward this
00043  * exception.
00044  *
00045  */
00046 
#include "xcpq.h"

#include <math.h>
#include <stdio.h>

#include "xcp.h"
#include "random.h"
00050 
00051 static class XCPQClass : public TclClass {
00052 public:
00053     XCPQClass() : TclClass("Queue/DropTail/XCPQ") {}
00054     TclObject* create(int, const char*const*) {
00055         return (new XCPQueue);
00056     }
00057 } class_droptail_xcpq;
00058 
00059 
00060 const double XCPQueue::BWIDTH;
00061 const double XCPQueue::ALPHA_;
00062 const double XCPQueue::BETA_;
00063 const double XCPQueue::GAMMA_;
00064 const double XCPQueue::XCP_MAX_INTERVAL;
00065 const double XCPQueue::XCP_MIN_INTERVAL;
00066 const int    XCPQueue::BSIZE;
00067 
00068 
00069 
00070 XCPQueue::XCPQueue(): queue_timer_(NULL), 
00071               estimation_control_timer_(NULL),
00072               rtt_timer_(NULL), effective_rtt_(0.0),
00073               spread_bytes_(false)
00074 {
00075     init_vars();
00076 }
00077 
00078 void XCPQueue::setupTimers()
00079 {
00080     queue_timer_ = new XCPTimer(this, &XCPQueue::Tq_timeout);
00081     estimation_control_timer_ = new XCPTimer(this, &XCPQueue::Te_timeout);
00082     rtt_timer_ = new XCPTimer(this, &XCPQueue::everyRTT);
00083 
00084     // Scheduling timers randomly so routers are not synchronized
00085     double T;
00086   
00087     T = max(0.004, Random::normal(Tq_, 0.2 * Tq_));
00088     queue_timer_->sched(T);
00089     
00090     T = max(0.004, Random::normal(Te_, 0.2 * Te_));
00091     estimation_control_timer_->sched(T);
00092 
00093     T = max(0.004, Random::normal(Tr_, 0.2 * Tr_));
00094     rtt_timer_->sched(T);
00095 }
00096 
00097 
00098 void XCPQueue::routerId(XCPWrapQ* q, int id)
00099 {
00100     if (id < 0 && q == 0)
00101         fprintf(stderr, "XCP:invalid routerId and queue\n");
00102     routerId_ = id;
00103     myQueue_ = q;
00104 }
00105 
00106 int XCPQueue::routerId(int id)
00107 {
00108     if (id > -1) 
00109         routerId_ = id;
00110     return ((int)routerId_); 
00111 }
00112 
00113 int XCPQueue::limit(int qlim)
00114 {
00115     if (qlim > 0) 
00116         qlim_ = qlim;
00117     return (qlim_);
00118 }
00119 
00120 void XCPQueue::setBW(double bw)
00121 {
00122     if (bw > 0) 
00123         link_capacity_bps_ = bw;
00124 }
00125 
00126 void XCPQueue::setChannel(Tcl_Channel queue_trace_file)
00127 {
00128     queue_trace_file_ = queue_trace_file;
00129 }
00130 
00131 Packet* XCPQueue::deque()
00132 {
00133     double inst_queue = byteLength();
00134 /* L 32 */
00135     if (inst_queue < running_min_queue_bytes_) 
00136         running_min_queue_bytes_= inst_queue;
00137   
00138     Packet* p = DropTail::deque();
00139     do_before_packet_departure(p);
00140   
00141     max_queue_ci_ = max(length(), max_queue_ci_);
00142     min_queue_ci_ = min(length(), min_queue_ci_);
00143   
00144     return (p);
00145 }
00146 
00147 void XCPQueue::enque(Packet* pkt)
00148 {
00149     max_queue_ci_ = max(length(), max_queue_ci_);
00150     min_queue_ci_ = min(length(), min_queue_ci_);
00151 
00152     do_on_packet_arrival(pkt);
00153     DropTail::enque(pkt);
00154 }
00155 
00156 void XCPQueue::do_on_packet_arrival(Packet* pkt){
00157   
00158     double pkt_size = double(hdr_cmn::access(pkt)->size());
00159 
00160     /* L 1 */
00161     input_traffic_bytes_ += pkt_size;
00162 
00163     hdr_xcp *xh = hdr_xcp::access(pkt); 
00164     if (spread_bytes_) {
00165         int i = int(xh->rtt_/BWIDTH + .5);
00166         if (i > maxb_)
00167             maxb_ = i;
00168         b_[i] += pkt_size;
00169     
00170         if (xh->rtt_ != 0.0 && xh->throughput_ != 0.0)
00171             t_[i] += pkt_size/xh->throughput_;
00172     }
00173     if (xh->xcp_enabled_ != hdr_xcp::XCP_ENABLED)
00174         return; // Estimates depend only on Forward XCP Traffic
00175       
00176     ++num_cc_packets_in_Te_;
00177   
00178     if (xh->rtt_ != 0.0 && xh->throughput_ != 0.0) {
00179         /* L 2 */
00180         double x = pkt_size / xh->throughput_;
00181 
00182         if (xh->xcp_sparse_)
00183             x = Te_;
00184 
00185         sum_inv_throughput_ += x;
00186         /* L 3 */
00187         if (xh->rtt_ < XCP_MAX_INTERVAL) {
00188             /* L 4 */
00189             double y = xh->rtt_ * pkt_size / xh->throughput_;
00190             if (xh->xcp_sparse_)
00191                 y = Te_ * xh->rtt_;
00192             sum_rtt_by_throughput_ += y;
00193             /* L 5 */
00194         } else {
00195             /* L 6 */
00196             double y = XCP_MAX_INTERVAL * pkt_size / xh->throughput_;
00197             if (xh->xcp_sparse_)
00198                 y = Te_ * XCP_MAX_INTERVAL;
00199             sum_rtt_by_throughput_ += y;
00200         }
00201     }
00202 }
00203 
00204 
00205 void XCPQueue::do_before_packet_departure(Packet* p)
00206 {
00207     if (!p) return;
00208   
00209     hdr_xcp *xh = hdr_xcp::access(p);
00210     
00211     if (xh->xcp_enabled_ != hdr_xcp::XCP_ENABLED)
00212         return;
00213     if (xh->rtt_ == 0.0) {
00214         xh->delta_throughput_ = 0;
00215         return;
00216     }
00217     if (xh->throughput_ == 0.0)
00218         xh->throughput_ = .1;    // XXX 1bps is small enough
00219 
00220     double inv = 1.0/xh->throughput_;
00221     double pkt_size = double(hdr_cmn::access(p)->size());
00222     double frac = 1.0;
00223     if (spread_bytes_) {
00224         // rtt-scaling
00225         frac = (xh->rtt_ > Te_) ? Te_ / xh->rtt_ : xh->rtt_ / Te_;
00226     }
00227     /* L 20, 21 */
00228     double pos_fbk = Cp_ * inv * pkt_size;
00229     double neg_fbk = Cn_ * pkt_size;
00230 
00231     if (xh->xcp_sparse_) {
00232         pos_fbk = Cp_ * xh->rtt_;
00233         neg_fbk = Cn_ * xh->rtt_ * xh->throughput_;
00234     }
00235 
00236     pos_fbk = min(residue_pos_fbk_, pos_fbk);
00237 
00238     neg_fbk = min(residue_neg_fbk_, neg_fbk);
00239 
00240     /* L 22 */
00241     double feedback = pos_fbk - neg_fbk;
00242      
00243     /* L 23 */  
00244     if (xh->delta_throughput_ >= feedback) {
00245         /* L 24 */
00246         xh->delta_throughput_ = feedback;
00247         xh->controlling_hop_ = routerId_;
00248         /* L 25 */
00249     } else {
00250         /* L 26 */
00251         neg_fbk = min(residue_neg_fbk_, neg_fbk + (feedback - xh->delta_throughput_));
00252         /* L 27 */
00253         pos_fbk = xh->delta_throughput_ + neg_fbk;
00254     }
00255     /* L 28, L 29 */
00256     residue_pos_fbk_ = max(0.0, residue_pos_fbk_ - pos_fbk);
00257     residue_neg_fbk_ = max(0.0, residue_neg_fbk_ - neg_fbk);
00258     /* L 30 */
00259     if (residue_pos_fbk_ == 0.0)
00260         Cp_ = 0.0;
00261     /* L 31 */
00262     if (residue_neg_fbk_ == 0.0)
00263         Cn_ = 0.0;
00264   
00265     if (TRACE && (queue_trace_file_ != 0 )) {
00266         trace_var("pos_fbk", pos_fbk);
00267         trace_var("neg_fbk", neg_fbk);
00268         trace_var("delta_throughput", xh->delta_throughput_);
00269         int id = hdr_ip::access(p)->flowid();
00270         char buf[25];
00271         sprintf(buf, "Thruput%d",id);
00272         trace_var(buf, xh->throughput_);
00273         
00274         // tracing measured thruput info
00275         if (xh->rtt_ > high_rtt_)
00276             high_rtt_ = xh->rtt_;
00277         total_thruput_ += pkt_size;
00278         
00279         if (num_mice_ != 0) {
00280             if (id >= num_mice_) 
00281                 thruput_elep_ += pkt_size;
00282             else 
00283                 thruput_mice_ += pkt_size;
00284         }
00285     }
00286 }
00287 
00288 
00289 void XCPQueue::Tq_timeout()
00290 {
00291     double inst_queue = byteLength();
00292     /* L 33 */
00293     queue_bytes_ = running_min_queue_bytes_;
00294     /* L 34 */
00295     running_min_queue_bytes_ = inst_queue;
00296     /* L 35 */
00297     Tq_ = max(0.002, (avg_rtt_ - inst_queue/link_capacity_bps_)/2.0); 
00298     /* L 36 */
00299     queue_timer_->resched(Tq_);
00300     
00301     if (TRACE && (queue_trace_file_ != 0)) {
00302         trace_var("Tq_", Tq_);
00303         trace_var("queue_bytes_", queue_bytes_);
00304         trace_var("routerId_", routerId_);
00305     }
00306 }
00307 
00308 void XCPQueue::Te_timeout()
00309 {
00310 
00311     if (TRACE && (queue_trace_file_ != 0)) {
00312         trace_var("residue_pos_fbk_not_allocated", residue_pos_fbk_);
00313         trace_var("residue_neg_fbk_not_allocated", residue_neg_fbk_);
00314     }
00315     if (spread_bytes_) {
00316         double spreaded_bytes = b_[0];
00317         double tp = t_[0];
00318         for (int i = 1; i <= maxb_; ++i) {
00319             double spill = b_[i]/(i+1);
00320             spreaded_bytes += spill;
00321             b_[i-1] = b_[i] - spill;
00322             
00323             spill = t_[i]/(i+1);
00324                 tp += spill;
00325             t_[i-1] = t_[i] - spill;
00326         }
00327         
00328         b_[maxb_] = t_[maxb_] = 0;
00329         if (maxb_ > 0)
00330             --maxb_;
00331         input_traffic_bytes_ = spreaded_bytes;
00332         sum_inv_throughput_ = tp;
00333     }
00334     /* L 8 */
00335     double input_bw = input_traffic_bytes_ / Te_;
00336     double phi_bps = 0.0;
00337     double shuffled_traffic_bps = 0.0;
00338 
00339     if (spread_bytes_) {
00340         avg_rtt_ = (maxb_ + 1)* BWIDTH/2; // XXX fix me
00341     } else {
00342         if (sum_inv_throughput_ != 0.0) {
00343 /* L 7 */
00344             avg_rtt_ = sum_rtt_by_throughput_ / sum_inv_throughput_;
00345         } else
00346             avg_rtt_ = INITIAL_Te_VALUE;
00347     }
00348     
00349     if (input_traffic_bytes_ > 0) {
00350 /* L 9 */
00351         phi_bps = ALPHA_ * (link_capacity_bps_- input_bw) 
00352             - BETA_ * queue_bytes_ / avg_rtt_;
00353 /* L 10 */
00354         shuffled_traffic_bps = GAMMA_ * input_bw;
00355 
00356         if (shuffled_traffic_bps > abs(phi_bps))
00357             shuffled_traffic_bps -= abs(phi_bps);
00358         else
00359             shuffled_traffic_bps = 0.0;
00360 /* L 10 ends here */
00361     }
00362 /* L 11, 12 */  
00363     residue_pos_fbk_ = max(0.0,  phi_bps) + shuffled_traffic_bps;
00364     residue_neg_fbk_ = max(0.0, -phi_bps) + shuffled_traffic_bps;
00365 
00366     if (sum_inv_throughput_ == 0.0)
00367         sum_inv_throughput_ = 1.0;
00368     if (input_traffic_bytes_ > 0) {
00369 /* L 13 */
00370         Cp_ =  residue_pos_fbk_ / sum_inv_throughput_;
00371 /* L 14 */
00372         Cn_ =  residue_neg_fbk_ / input_traffic_bytes_;
00373     } else 
00374         Cp_ = Cn_ = 0.0;
00375         
00376     if (TRACE && (queue_trace_file_ != 0)) {
00377         trace_var("input_traffic_bytes_", input_traffic_bytes_);
00378         trace_var("avg_rtt_", avg_rtt_);
00379         trace_var("residue_pos_fbk_", residue_pos_fbk_);
00380         trace_var("residue_neg_fbk_", residue_neg_fbk_);
00381         //trace_var("Qavg", edv_.v_ave);
00382         trace_var("Qsize", length());
00383         trace_var("min_queue_ci_", double(min_queue_ci_));
00384         trace_var("max_queue_ci_", double(max_queue_ci_));
00385 
00386         trace_var("routerId", routerId_);
00387     } 
00388     num_cc_packets_in_Te_ = 0;
00389 
00390 /* L 15 */  
00391     input_traffic_bytes_ = 0.0;
00392 /* L 16 */
00393     sum_inv_throughput_ = 0.0;
00394 /* L 17 */
00395     sum_rtt_by_throughput_ = 0.0;
00396 /* L 18 */
00397     if (spread_bytes_)
00398         Te_ = BWIDTH;
00399     else
00400         Te_ = max(avg_rtt_, XCP_MIN_INTERVAL);
00401 
00402 /* L 19 */
00403     estimation_control_timer_->resched(Te_);
00404 
00405     min_queue_ci_ = max_queue_ci_ = length();
00406 }
00407 
00408 
00409 void XCPQueue::everyRTT ()
00410 {
00411     if (effective_rtt_ != 0.0)
00412         Tr_ = effective_rtt_;
00413     else 
00414         if (high_rtt_ != 0.0)
00415             Tr_ = high_rtt_;
00416 
00417     // measure drops, if any
00418     trace_var("d", drops_);
00419     drops_=0;
00420 
00421     // sample the current queue size
00422     trace_var("q",length());
00423   
00424     // Update utilization 
00425     trace_var("u", total_thruput_/(Tr_*link_capacity_bps_));
00426     trace_var("u_elep", thruput_elep_/(Tr_*link_capacity_bps_));
00427     trace_var("u_mice", thruput_mice_/(Tr_*link_capacity_bps_));
00428     total_thruput_ = 0;
00429     thruput_elep_ = 0;
00430     thruput_mice_ = 0;
00431   
00432     rtt_timer_->resched(Tr_);
00433 }
00434 
00435 
00436 void  XCPQueue::drop(Packet* p)
00437 {
00438     drops_++;
00439     total_drops_ = total_drops_++;
00440   
00441     Connector::drop(p);
00442 }
00443 
00444 void  XCPQueue::setEffectiveRtt(double rtt)
00445 {
00446     effective_rtt_ = rtt;
00447     rtt_timer_->resched(effective_rtt_);
00448 }
00449 
00450 // Estimation & Control Helpers
00451 
00452 void XCPQueue::init_vars() 
00453 {
00454     link_capacity_bps_  = 0.0;
00455     avg_rtt_        = INITIAL_Te_VALUE;
00456     if (spread_bytes_)
00457         Te_     = BWIDTH;
00458     else
00459         Te_     = INITIAL_Te_VALUE;
00460   
00461     Tq_         = INITIAL_Te_VALUE; 
00462     Tr_                     = 0.1;
00463     high_rtt_               = 0.0;
00464     Cp_         = 0.0;
00465     Cn_         = 0.0;     
00466     residue_pos_fbk_    = 0.0;
00467     residue_neg_fbk_    = 0.0;     
00468     queue_bytes_        = 0.0; // our estimate of the fluid model queue
00469       
00470     input_traffic_bytes_    = 0.0; 
00471     sum_rtt_by_throughput_  = 0.0;
00472     sum_inv_throughput_ = 0.0;
00473     running_min_queue_bytes_= 0;
00474     num_cc_packets_in_Te_   = 0;
00475   
00476     queue_trace_file_ = 0;
00477     myQueue_ = 0;
00478   
00479     for (int i = 0; i<BSIZE; ++i)
00480         b_[i] = t_[i] = 0;
00481     maxb_ = 0;
00482   
00483     min_queue_ci_ = max_queue_ci_ = length();
00484   
00485     // measuring drops
00486     drops_ = 0;
00487     total_drops_ = 0;
00488 
00489     // utilisation
00490     num_mice_ = 0;
00491     thruput_elep_ = 0.0;
00492     thruput_mice_ = 0.0;
00493     total_thruput_ = 0.0;
00494 }
00495 
00496 
00497 void XCPTimer::expire(Event *) { 
00498     (*a_.*call_back_)();
00499 }
00500 
00501 
00502 void XCPQueue::trace_var(char * var_name, double var)
00503 {
00504     char wrk[500];
00505     double now = Scheduler::instance().clock();
00506 
00507     if (queue_trace_file_) {
00508         int n;
00509         sprintf(wrk, "%s %g %g",var_name, now, var);
00510         n = strlen(wrk);
00511         wrk[n] = '\n'; 
00512         wrk[n+1] = 0;
00513         (void)Tcl_Write(queue_trace_file_, wrk, n+1);
00514     }
00515     return; 
00516 }
00517 
00518 

Generated on Tue Mar 6 16:47:53 2007 for ns2 Network Simulator 2.29 by  doxygen 1.4.6