00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
#ifndef lint
/* RCS identification string for this file; left out when `lint` is defined. */
static const char rcsid[] = "@(#) $Header: /nfs/jade/vint/CVSROOT/ns-2/queue/queue.cc,v 1.29 2004/10/28 01:22:48 sfloyd Exp $ (LBL)";
#endif
00039
#include "queue.h"
#include <math.h>
#include <stdio.h>
#include <new>
00043
00044 void PacketQueue::remove(Packet* target)
00045 {
00046 for (Packet *pp= 0, *p= head_; p; pp= p, p= p->next_) {
00047 if (p == target) {
00048 if (!pp) deque();
00049 else {
00050 if (p == tail_)
00051 tail_= pp;
00052 pp->next_= p->next_;
00053 --len_;
00054 bytes_ -= hdr_cmn::access(p)->size();
00055 }
00056 return;
00057 }
00058 }
00059 fprintf(stderr, "PacketQueue:: remove() couldn't find target\n");
00060 abort();
00061 }
00062
00063
00064
00065
00066
00067 void PacketQueue::remove(Packet* pkt, Packet *prev)
00068 {
00069 if (pkt) {
00070 if (head_ == pkt)
00071 PacketQueue::deque();
00072 else {
00073 prev->next_ = pkt->next_;
00074 if (tail_ == pkt)
00075 tail_ = prev;
00076 --len_;
00077 bytes_ -= hdr_cmn::access(pkt)->size();
00078 }
00079 }
00080 return;
00081 }
00082
00083 void QueueHandler::handle(Event*)
00084 {
00085 queue_.resume();
00086 }
00087
00088 Queue::~Queue() {
00089 }
00090
00091 Queue::Queue() : Connector(), blocked_(0), unblock_on_resume_(1), qh_(*this),
00092 pq_(0),
00093 last_change_(0),
00094 old_util_(0), period_begin_(0), cur_util_(0), buf_slot_(0),
00095 util_buf_(NULL)
00096 {
00097 bind("limit_", &qlim_);
00098 bind("util_weight_", &util_weight_);
00099 bind_bool("blocked_", &blocked_);
00100 bind_bool("unblock_on_resume_", &unblock_on_resume_);
00101 bind("util_check_intv_", &util_check_intv_);
00102 bind("util_records_", &util_records_);
00103
00104 if (util_records_ > 0) {
00105 util_buf_ = new double[util_records_];
00106 if (util_buf_ == NULL) {
00107 printf("Error allocating util_bufs!");
00108 util_records_ = 0;
00109 }
00110 for (int i = 0; i < util_records_; i++) {
00111 util_buf_[i] = 0;
00112 }
00113 }
00114 }
00115
00116 void Queue::recv(Packet* p, Handler*)
00117 {
00118 double now = Scheduler::instance().clock();
00119 enque(p);
00120 if (!blocked_) {
00121
00122
00123
00124
00125
00126
00127 p = deque();
00128 if (p != 0) {
00129 utilUpdate(last_change_, now, blocked_);
00130 last_change_ = now;
00131 blocked_ = 1;
00132 target_->recv(p, &qh_);
00133 }
00134 }
00135 }
00136
00137 void Queue::utilUpdate(double int_begin, double int_end, int link_state) {
00138 double decay;
00139
00140 decay = exp(-util_weight_ * (int_end - int_begin));
00141 old_util_ = link_state + (old_util_ - link_state) * decay;
00142
00143
00144 if (util_records_ == 0)
00145 return;
00146
00147 double intv = int_end - int_begin;
00148 double tot_intv = int_begin - period_begin_;
00149 if (intv || tot_intv) {
00150 int guard = 0;
00151 cur_util_ = (link_state * intv + cur_util_ * tot_intv) /
00152 (intv + tot_intv);
00153 while (tot_intv + intv > util_check_intv_ &&
00154 guard++ < util_records_) {
00155
00156 period_begin_ = int_end;
00157 util_buf_[buf_slot_] = cur_util_;
00158 buf_slot_ = (buf_slot_ + 1) % util_records_;
00159 cur_util_ = link_state;
00160 intv -= util_check_intv_;
00161 }
00162 }
00163 }
00164
00165 double Queue::utilization(void)
00166 {
00167 double now = Scheduler::instance().clock();
00168
00169 utilUpdate(last_change_, now, blocked_);
00170 last_change_ = now;
00171
00172 return old_util_;
00173
00174 }
00175
00176 double Queue::peak_utilization(void)
00177 {
00178 double now = Scheduler::instance().clock();
00179 double peak = 0;
00180 int i;
00181
00182
00183
00184 if (util_records_ == 0)
00185 return utilization();
00186
00187 utilUpdate(last_change_, now, blocked_);
00188 last_change_ = now;
00189
00190 for (i = 0; i < util_records_; i++) {
00191 if (util_buf_[i] > peak)
00192 peak = util_buf_[i];
00193 }
00194 return peak;
00195 }
00196
00197 void Queue::updateStats(int queuesize)
00198 {
00199 double now = Scheduler::instance().clock();
00200 double newtime = now - total_time_;
00201 if (newtime > 0.0) {
00202 double oldave = true_ave_;
00203 double oldtime = total_time_;
00204 double newtime = now - total_time_;
00205 true_ave_ = (oldtime * oldave + newtime * queuesize) /now;
00206 total_time_ = now;
00207 }
00208 }
00209
00210 void Queue::resume()
00211 {
00212 double now = Scheduler::instance().clock();
00213 Packet* p = deque();
00214 if (p != 0) {
00215 target_->recv(p, &qh_);
00216 } else {
00217 if (unblock_on_resume_) {
00218 utilUpdate(last_change_, now, blocked_);
00219 last_change_ = now;
00220 blocked_ = 0;
00221 }
00222 else {
00223 utilUpdate(last_change_, now, blocked_);
00224 last_change_ = now;
00225 blocked_ = 1;
00226 }
00227 }
00228 }
00229
00230 void Queue::reset()
00231 {
00232 Packet* p;
00233 total_time_ = 0.0;
00234 true_ave_ = 0.0;
00235 while ((p = deque()) != 0)
00236 drop(p);
00237 }
00238