ew.cc

Go to the documentation of this file.
00001 
00002 /*
00003  * ew.cc
00004  * Copyright (C) 1999 by the University of Southern California
00005  * $Id: ew.cc,v 1.6 2005/08/25 18:58:03 johnh Exp $
00006  *
00007  * This program is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU General Public License,
00009  * version 2, as published by the Free Software Foundation.
00010  *
00011  * This program is distributed in the hope that it will be useful,
00012  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  * GNU General Public License for more details.
00015  *
00016  * You should have received a copy of the GNU General Public License along
00017  * with this program; if not, write to the Free Software Foundation, Inc.,
00018  * 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
00019  *
00020  *
00021  * The copyright of this module includes the following
00022  * linking-with-specific-other-licenses addition:
00023  *
00024  * In addition, as a special exception, the copyright holders of
00025  * this module give you permission to combine (via static or
00026  * dynamic linking) this module with free software programs or
00027  * libraries that are released under the GNU LGPL and with code
00028  * included in the standard release of ns-2 under the Apache 2.0
00029  * license or under otherwise-compatible licenses with advertising
00030  * requirements (or modified versions of such code, with unchanged
00031  * license).  You may copy and distribute such a system following the
00032  * terms of the GNU GPL for this module and the licenses of the
00033  * other code concerned, provided that you include the source code of
00034  * that other code when and as the GNU GPL requires distribution of
00035  * source code.
00036  *
00037  * Note that people who make modified versions of this module
00038  * are not obligated to grant this special exception for their
00039  * modified versions; it is their choice whether to do so.  The GNU
00040  * General Public License gives permission to release a modified
00041  * version without this exception; this exception also makes it
00042  * possible to release a modified version which carries forward this
00043  * exception.
00044  *
00045  */
00046 
00047 //
00048 // ew.cc (Early warning system)
00049 //   by Xuan Chen (xuanc@isi.edu), USC/ISI
00050 
00051 #include "ip.h"
00052 #include "tcp.h"
00053 #include "tcp-full.h"
00054 #include "random.h"
00055 
00056 #include "ew.h"
00057 
00058 // Definition of High-Low Filter
00059 HLF::HLF() {
00060   alpha = 0;
00061   high = low = 0;
00062 }
00063 
00064 void HLF::reset(double value) {
00065   high = low = value;
00066 }
00067 
00068 void HLF::reset() {
00069   reset(0);
00070 }
00071 
00072 // Set Alpha
00073 void HLF::setAlpha(double value) {
00074   if (value > 1 || value < 0)
00075     return;
00076 
00077   if (value >= 0.5)
00078     alpha = value;
00079   else
00080     alpha = 1 - value;
00081 }
00082 
00083 // Get outputs from HLF
00084 double HLF::getHigh() {
00085   return(high);
00086 }
00087 
00088 double HLF::getLow() {
00089   return(low);
00090 }
00091 
00092 // update high-low filter
00093 // high(t) = alpha * high(t-1) + (1 - alpha) * o(t)
00094 // low(t) = (1 - alpha) * low(t-1) + alpha * o(t)
00095 void HLF::update(double input) {
00096   // Set values to the current observation for the first time.
00097   if (high)
00098     high = alpha * high + (1 - alpha) * input;
00099   else
00100     high = input;
00101   if (low)
00102     low = (1 - alpha) * low + alpha * input;
00103   else
00104     low = input;
00105   //  printf("HLF %d %.2f, %.2f\n", (int)now, high, low);
00106 }
00107 
00108 // Definition for a token-bucket rate limitor
00109 TBrateLimitor::TBrateLimitor() {
00110   TBrateLimitor(DEFAULT_TB_RATE_P);
00111 };
00112 
00113 TBrateLimitor::TBrateLimitor(double rate) {
00114   double now = Scheduler::instance().clock();
00115   pkt_mode = 1;
00116   bucket_size = DEFAULT_TB_SIZE;
00117   token_rate = 0;
00118   token_num = bucket_size;
00119   last_time = now;
00120   ini_token_rate = rate;
00121 
00122   resetScore();
00123   setRate(rate);
00124   //printf("TB pkt_mode:%d, bucket_size:%g, token_num:%g, last_time:%g\n",
00125   //     pkt_mode, bucket_size, token_num, last_time);
00126 
00127   // High-low filter
00128   hlf.setAlpha(ALPHA);
00129   hlf.reset(rate);
00130 };
00131 
00132 // adjust the rate limit to the default value
00133 void TBrateLimitor::setRate(double rate) {
00134   last_token_rate = token_rate;
00135   
00136   if (! token_rate) {
00137     token_rate = rate;
00138   } else if (rate != token_rate) {
00139     // use HLF to change token rate
00140     hlf.update(rate);
00141     // Use low-gain filter for fast response
00142     //token_rate = hlf.getLow();
00143     token_rate = hlf.getHigh();
00144     token_rate = rate;
00145   }
00146   
00147   printf("TR %d %.2f %.2f %d %d\n", (int)(Scheduler::instance().clock()), 
00148      token_rate, rate, p_score, n_score);
00149 }
00150 
00151 // adjust the rate limit to approaching an optimal rate limit
00152 void TBrateLimitor::adjustRate() {
00153   // pay the penalty
00154   adjustScore(-1);
00155 
00156   double rate = token_rate;
00157   if (p_score >= n_score)
00158     rate = token_rate * (1 + 0.2);
00159   else 
00160     rate = token_rate * (1 - 0.2);
00161 
00162   setRate(rate);
00163 }
00164 
00165 // Reset negative / positive score
00166 void TBrateLimitor::resetScore() {
00167   n_score = p_score = 0;
00168 }
00169 
00170 // adjust the score for increasing or decreasing scores
00171 void TBrateLimitor::adjustScore(int score) {
00172   // pay the penalty
00173   if (last_token_rate > token_rate)
00174     n_score += score;
00175   else
00176     p_score += score;
00177 }
00178 
00179 int TBrateLimitor::run(double incoming, double t_rate) {
00180   double now = Scheduler::instance().clock();
00181   double interval = now - last_time;
00182   
00183   //printf("TB: now:%g last_time:%g interval:%g; ", now, last_time, interval);
00184   token_num += interval * t_rate;
00185   last_time = now;
00186 
00187   // more tokens are overflowed
00188   if (token_num > bucket_size)
00189     token_num = bucket_size;
00190 
00191   //printf("token #:%g; ", token_num);
00192 
00193   // through (0 dropping probability)
00194   if (token_num >= incoming) {
00195     token_num -= incoming;
00196     //printf("...through\n");
00197     return 0;
00198   }
00199 
00200   // dropped
00201   //printf("...dropped\n");
00202   return 1;
00203 }
00204 
00205 int TBrateLimitor::run(double incoming) {
00206   return (run(incoming, token_rate));
00207 }
00208 
00209 // EW detector
00210 // Constructor
00211 EWdetector::EWdetector() {
00212   ew_src = ew_dst = -1;
00213 
00214   // reset measurement
00215   cur_rate = avg_rate = 0;
00216 
00217   // reset timers
00218   db_timer = dt_timer = 0;
00219 
00220   // reset alarm
00221   resetAlarm();
00222   resetChange();
00223 
00224   // High-low filter
00225   hlf.setAlpha(ALPHA);
00226 }
00227 
00228 //EWdetector::~EWdetector() {
00229 //};
00230 
00231 // Enable detecting and debugging
00232 void EWdetector::setDt(int inv) {
00233   dt_inv = inv;
00234   //printf("DT: %d\n", dt_inv);
00235 };
00236 void EWdetector::setDb(int inv) {
00237   db_inv = inv;
00238   //printf("DB: %d\n", db_inv);
00239 };
00240 
00241 void EWdetector::setLink(int src, int dst) {
00242   ew_src = src;
00243   ew_dst = dst;
00244   //printf("EW: (%d:%d)\n", ew_src, ew_dst);
00245 };
00246 
00247 void EWdetector::setAlarm() {
00248   alarm = 1;
00249 
00250   // Reset low and high gain filters' input values to the long-term avg
00251   // Actually, there is no change to high gain filter
00252   hlf.reset(avg_rate);
00253 };
00254 
00255 void EWdetector::resetAlarm() {
00256   alarm = 0;
00257 
00258   // Reset low and high gain filters' input values to the long-term avg
00259   // Actually, there is no change to low gain filter
00260   hlf.reset(avg_rate);
00261 };
00262 
00263 // Set and reset change flag
00264 void EWdetector::setChange() {
00265   change = 1;
00266 }
00267 
00268 void EWdetector::resetChange() {
00269   change = 0;
00270 }
00271 
00272 // Test if the alarm has been triggered
00273 int EWdetector::testAlarm() {
00274   if (!change)
00275     return(EW_UNCHANGE);
00276   else 
00277     return(alarm);
00278 }
00279 
00280 // Update long term average
00281 void EWdetector::updateAvg() {
00282   // update the request rate
00283   // update the aggregated response rate
00284   // Update flip-flop filter
00285   hlf.update(cur_rate);
00286   
00287   // Update SWIN, not used any more.
00288   //updateSWin(cur_rate);
00289   //ravgSWin();
00290   //printSWin();
00291   
00292   // Update the long term average value with the output from different filters
00293   if (!alarm) {
00294     // Use low-gain filter for fast response
00295     //avg_rate = hlf.getLow();
00296     avg_rate = hlf.getHigh();
00297   } else {
00298     // Use high-gain filter to keep the long term average stable
00299     avg_rate = hlf.getHigh();
00300   }
00301 }
00302 
00303 // the detector's engine
00304 void EWdetector::run(Packet *pkt) {
00305   // get the time
00306   now = Scheduler::instance().clock();
00307   
00308   //printf("EW[%d:%d] run ", ew_src, ew_dst);
00309   // update the measurement 
00310   measure(pkt);
00311 
00312   // There is a timeout!
00313   if (now >= dt_timer) {
00314     // Start detection
00315     //printf("Detection timeout(%d)\n", (int)now);
00316     
00317     // 1. Update the current rate from measurement
00318     updateCur();
00319 
00320     // 2. Detect change and Trigger alarm if necessary
00321     // Compare the current rate with the long term average
00322     detect();
00323     
00324     // 3. Update the long term averages
00325     updateAvg();
00326 
00327     // setup the sleeping timer
00328     dt_timer = (int)now + dt_inv;
00329     //printf("%d\n", dt_inv);
00330 
00331     change = 1;
00332   }
00333   
00334   // Schedule debug
00335   if (db_inv && now >= db_timer) {
00336     //printf("debugB ");
00337     trace();
00338     db_timer = (int)now + db_inv;
00339   }
00340 }
00341 
00342 // end of EW detector
00343 
00344 // EW bit rate detector
00345 //Constructor.  
00346 EWdetectorB::EWdetectorB() : EWdetector() {
00347   drop_p = 0;
00348   arr_count = 0;
00349 
00350   adjustor = 1.0;
00351 
00352   // Initialize ALIST
00353   alist.head = alist.tail = NULL;
00354   alist.count = 0;
00355 
00356   swin.head = swin.tail = NULL;
00357   swin.count = swin.ravg = 0;
00358 }
00359 
00360 //Deconstructor.
00361 EWdetectorB::~EWdetectorB(){
00362   resetAList();
00363   resetSWin();
00364 }
00365 
00366 // Initialize the EW parameters
00367 void EWdetectorB::init(int ew_adj) {
00368   // EW adjustor (g = resp rate / request rate)
00369   adjustor = ew_adj;
00370 }
00371 
00372 // Update current measurement 
00373 void EWdetectorB::measure(Packet *pkt) {
00374   //printf(" before UA");
00375   // Conduct detection continously
00376   updateAList(pkt);
00377   //printf(" after UA");
00378 }
00379 
00380 // Update current measurement 
00381 void EWdetectorB::updateCur() {
00382   //printAList();
00383   // Record current aggregated response rate
00384   cur_rate = computeARR();
00385 }
00386 
00387 // Check if the packet belongs to existing flow
00388 int EWdetectorB::exFlow(Packet *pkt) {
00389   // Should check SYN packets to protect existing connections
00390   //   need to use FullTCP
00391   return(0);
00392 }
00393 
00394 // Conduct the measurement
00395 void EWdetectorB::updateAList(Packet *pkt) {
00396   hdr_cmn* hdr = hdr_cmn::access(pkt);
00397   hdr_ip* iph = hdr_ip::access(pkt);
00398   int dst_id = iph->daddr();
00399   int src_id = iph->saddr();
00400   int f_id = iph->flowid(); 
00401 
00402   // Get the corresponding id.
00403   //printf("EW[%d:%d] in detector\n", ew_src, ew_dst);
00404 
00405   AListEntry *p;
00406   p = searchAList(src_id, dst_id, f_id);
00407 
00408   // Add new entry to AList
00409   // keep the bytes sent by each aggregate in AList
00410   if (!p) {
00411     p = newAListEntry(src_id, dst_id, f_id);
00412   }
00413 
00414   // update the existing (or just created) entry in AList
00415   assert(p && p->f_id == f_id && p->src_id == src_id && p->dst_id == dst_id);
00416 
00417   // update the flow's arrival rate using TSW
00418   double bytesInTSW, newBytes;
00419   bytesInTSW = p->avg_rate * p->win_length;
00420   newBytes = bytesInTSW + (double) hdr->size();
00421   p->avg_rate = newBytes / (now - p->t_front + p->win_length);
00422   p->t_front = now;
00423 
00424   //printAListEntry(p, 0);
00425 }
00426 
00427 // Get the median for a part of AList 
00428 //   starting from index with count entries
00429 int EWdetectorB::getMedianAList(int index, int count) {
00430   int m;
00431   
00432   if (!count)
00433     return 0;
00434 
00435   sortAList();
00436   //printAList();
00437 
00438   // Pick the entry with median avg_rate
00439   m = (int) (count / 2);
00440   if (2 * m == count) {
00441     return((getRateAList(index + m - 1) + getRateAList(index + m)) / 2);
00442   } else {
00443     return(getRateAList(index + m));
00444   }
00445 }
00446 
00447 // Get the rate given the index in the list
00448 int EWdetectorB::getRateAList(int index) {
00449   struct AListEntry *p;
00450 
00451   //printf("%d\n", index);
00452   p = alist.head;
00453   for (int i = 0; i < index; i++) {
00454     if (p)
00455       p = p->next;
00456   }
00457   
00458   if (p)
00459     return ((int)p->avg_rate);
00460 
00461   printf("Error in AList!\n");
00462   return(0);
00463 }
00464 
00465 // Calculate the aggragated response rate for high-bandwidth flows
00466 int EWdetectorB::computeARR() {
00467   int i, agg_rate;
00468 
00469   // Explicit garbage collection first 
00470   //  before both choosing HBFs and searching AList
00471   //printf("before timeout ");
00472   timeoutAList();
00473   //printf("after timeout ");
00474 
00475   // do nothing if no entry exists
00476   if (!alist.count) 
00477     return 0;
00478 
00479   // Pick the 10% highest bandwidth flows
00480   arr_count = (int) (alist.count * 0.1 + 1);
00481 
00482   // Sort AList first
00483   sortAList();
00484 
00485   // Calculate the ARR for HBFs
00486   // Use mean
00487   agg_rate = 0;
00488   for (i = 0; i < arr_count; i++) {
00489     agg_rate += getRateAList(i);
00490   }
00491   
00492   if (i)
00493     agg_rate = (int) (agg_rate / i);
00494   else {
00495     printf("No MAX returned from ALIST!!!\n");
00496   }
00497   
00498   // Use median (the median for the list or median for HBFs?)
00499   //agg_rate = getMedianAList(0, k);
00500   //printf("%f %d %d %d\n", now, k, agg_rate, getMedianAList(0, k));
00501   
00502   return(agg_rate);
00503 }
00504 
00505 // Find the matched AList entry
00506 struct AListEntry * EWdetectorB::searchAList(int src_id, int dst_id, int f_id){
00507   AListEntry *p;
00508 
00509   // Explicit garbage collection first.
00510   //printf("before timeout ");
00511   timeoutAList();
00512   //printf("after timeout ");
00513   // Use src and dest pair and flow id:
00514   //   aggregate flows within the same request-response exchange
00515   // Timeout need to be set to a very small value in order to
00516   //   seperate different exchanges.
00517   p = alist.head;
00518   while (p && 
00519      (p->f_id != f_id || p->src_id != src_id || p->dst_id != dst_id)) {
00520     p = p->next;
00521   }
00522   
00523   return(p);
00524 }
00525 
00526 // Add new entry to AList
00527 struct AListEntry * EWdetectorB::newAListEntry(int src_id, int dst_id, int f_id) {
00528   AListEntry *p;
00529 
00530   p = new AListEntry;
00531   p->src_id = src_id;
00532   p->dst_id = dst_id;
00533   p->f_id = f_id;
00534   p->last_update = now;
00535   p->avg_rate = 0;
00536   // Since we are doing random sampling, 
00537   // the t_front should set to the beginning of this period instead of 0.
00538   p->t_front = now;
00539   p->win_length = 1;
00540   p->next = NULL;
00541   
00542   // Add new entry to AList
00543   if (alist.tail)
00544     alist.tail->next = p;
00545   alist.tail = p;
00546   
00547   if (!alist.head)
00548     alist.head = p;
00549   
00550   alist.count++;
00551 
00552   return(p);
00553 }
00554 
00555 // Find the entry with max avg_rate in AList
00556 struct AListEntry * EWdetectorB::getMaxAList() {
00557   struct AListEntry *p, *pp, *max, *pm;
00558 
00559   //printAList();
00560   // find the max entry and remove
00561   p = pp = alist.head;
00562   max = pm = p;
00563   
00564   while (p) {
00565     if (p->avg_rate > max->avg_rate) {
00566       pm = pp;
00567       max = p;
00568     }
00569     
00570     pp = p;
00571     p = p->next;
00572   }
00573   
00574   // remove max from AList
00575   if (alist.head == max)
00576     alist.head = max->next;
00577   
00578   if (pm != max)
00579     pm->next = max->next;
00580   
00581   max->next = NULL;
00582   //printAList();
00583 
00584   return(max);
00585 }
00586 
00587 // Sort AList based on the avg_rate
00588 void EWdetectorB::sortAList() {
00589   struct AListEntry *max, *head, *tail;
00590   
00591   if (!alist.head)
00592     return;
00593 
00594   //printAList();
00595   head = tail = NULL;
00596 
00597   while (alist.head) {
00598     // Get the entry with the max avg_rate
00599     max = getMaxAList();
00600     //printAListEntry(max, i);
00601     
00602     if (max) {
00603       // Add max to the tail of the new list
00604       if (tail)
00605     tail->next = max;
00606       tail = max;
00607       
00608       if (!head)
00609     head = max;
00610     }
00611   }
00612 
00613   alist.head = head;
00614   alist.tail = tail;
00615 
00616   //printAList();
00617 }
00618 
00619 // Timeout AList entries
00620 void EWdetectorB::timeoutAList() {
00621   AListEntry *p, *q;
00622   float to;
00623 
00624   to = EW_FLOW_TIME_OUT;
00625   if (dt_inv)
00626     to = dt_inv;
00627 
00628   // Expire the old entries in AList
00629   p = q = alist.head;
00630   while (p) {
00631     // Garbage collection
00632     if (p->last_update + to < now){
00633       // The coresponding flow is expired.      
00634       if (p == alist.head){
00635     if (p == alist.tail) {
00636       alist.head = alist.tail = NULL;
00637       free(p);
00638       p = q = NULL;
00639     } else {
00640       alist.head = p->next;
00641       free(p);
00642       p = q = alist.head;
00643     }
00644       } else {
00645     q->next = p->next;
00646     if (p == alist.tail)
00647       alist.tail = q;
00648     free(p);
00649     p = q->next;
00650       }
00651       alist.count--;
00652     } else {
00653       q = p;
00654       p = q->next;
00655     }
00656   }
00657 }
00658 
00659 // Reset AList
00660 void EWdetectorB::resetAList() {
00661   struct AListEntry *ap, *aq;
00662 
00663   ap = aq = alist.head;
00664   while (ap) {
00665     aq = ap;
00666     ap = ap->next;
00667     free(aq);
00668   }
00669   
00670   ap = aq = NULL;
00671   alist.head = alist.tail = NULL;  
00672   alist.count = 0;
00673 }
00674 
00675 
00676 
00677 // Reset SWin
00678 void EWdetectorB::resetSWin() {
00679   struct SWinEntry *p, *q;
00680 
00681   p = q = swin.head;
00682   while (p) {
00683     q = p;
00684     p = p->next;
00685     free(q);
00686   }
00687   
00688   p = q = NULL;
00689   swin.head = swin.tail = NULL;  
00690   swin.count = swin.ravg = 0;
00691 }
00692 
00693 // update swin with the latest measurement of aggregated response rate
00694 void EWdetectorB::updateSWin(int rate) {
00695   struct SWinEntry *p, *new_entry;
00696 
00697   new_entry = new SWinEntry;
00698   new_entry->rate = rate;
00699   new_entry->weight = 1;
00700   new_entry->next = NULL;
00701   
00702   if (swin.tail)
00703     swin.tail->next = new_entry;
00704   swin.tail = new_entry;
00705   
00706   if (!swin.head)
00707     swin.head = new_entry;
00708 
00709   // Reset current rate.
00710   if (swin.count < EW_SWIN_SIZE) {
00711     swin.count++;
00712   } else {
00713     p = swin.head;
00714     swin.head = p->next;
00715     free(p);
00716   }
00717 }
00718 
00719 // Calculate the running average over the sliding window
00720 void EWdetectorB::ravgSWin() {
00721   struct SWinEntry *p;
00722   float sum = 0;
00723   float t_weight = 0;
00724 
00725   //printf("Calculate running average over the sliding window:\n");
00726   p = swin.head;
00727   //printf("after p\n");
00728 
00729   while (p) {
00730     //printSWinEntry(p, i++);
00731     sum += p->rate * p->weight;
00732     t_weight += p->weight;
00733     p = p->next;
00734   }
00735   p = NULL;
00736   //printf("\n");  
00737 
00738   swin.ravg = (int)(sum / t_weight);
00739 
00740   //  printf("Ravg: %d\n", swin.ravg);
00741 }
00742 
00743 // detect the traffic change by 
00744 // comparing the current measurement with the long-term average
00745 //   trigger alarm if necessary.
00746 void EWdetectorB::detect() {
00747   // When ALARM:
00748   //  detect if it is the time to release the alarm
00749   // When NO ALARM:
00750   //  detect if it is the time to trigger the alarm
00751   if (alarm) {
00752     // Determine if an alarm should be released
00753     if (cur_rate > avg_rate * (1 + EW_RELEASE_RANGE)) {
00754       // reset alarm
00755       resetAlarm();
00756     } 
00757   } else {
00758     // Determine if an alarm should be triggered
00759     //   need to be conservative!
00760     if (cur_rate < avg_rate * (1 - EW_DETECT_RANGE)) {
00761       setAlarm();
00762       
00763       // Initial drop_p to the MAX value whenever alarm triggered
00764       if (drop_p < EW_MAX_DROP_P)
00765     drop_p = EW_MAX_DROP_P;
00766     } else {
00767     }
00768   }
00769   
00770   // Determine the dropping probability
00771   //computeDropP();
00772 }
00773 
00774 // Determine the dropping probability based on current measurement
00775 void EWdetectorB::computeDropP() {
00776   double p = 0;
00777 
00778   if (alarm) {
00779     // Compute the dropping probability as a linear function of current rate
00780     //  p is computed towards the current measurement.
00781     p = 1;
00782     if (cur_rate)
00783       p = (avg_rate - cur_rate) * adjustor / cur_rate;
00784     
00785     // p could be greater than 1
00786     if (p > 1)
00787       p = 1;
00788     // p could also be negative
00789     if (p < 0)
00790       p = 0;
00791     
00792     // Compute the actual drop probability
00793     drop_p = ALPHA * drop_p + (1 - ALPHA) * p;    
00794     // adjust drop_p
00795     if (drop_p < EW_MIN_DROP_P)
00796       drop_p = EW_MIN_DROP_P;
00797     if (drop_p > EW_MAX_DROP_P)
00798       drop_p = EW_MAX_DROP_P;
00799   } else {
00800     // Fade out the drop_p when no alarm
00801     if (drop_p > 0) {
00802       if (drop_p <= EW_MIN_DROP_P)
00803         drop_p = 0;
00804       else {
00805         drop_p = ALPHA * drop_p;
00806       }
00807     }
00808   }
00809 }
00810 
00811 // Decreas the sample interval
00812 void EWdetectorB::decSInv() {
00813   // Need some investigation for the min allowed detection interval
00814   //  if (s_inv / 2 > EW_MIN_SAMPLE_INTERVAL) {
00815   // s_inv = s_inv / 2;
00816     
00817     //printf("SINV decreased by 2.\n");
00818   //}
00819 }
00820 
00821 // Increase the sample interval
00822 void EWdetectorB::incSInv() {
00823   //if(s_inv * 2 <= init_s_inv) {
00824   //  s_inv = s_inv * 2;
00825   
00826   //printf("SINV increased by 2.\n");
00827   // }
00828 }
00829 
00830 // Prints one entry in SWin
00831 void EWdetectorB::printSWin() {
00832   struct SWinEntry *p;
00833   printf("%f SWIN[%d, %d]", now, swin.ravg, swin.count);
00834   p = swin.head;
00835   int i = 0;
00836   while (p) {
00837     printSWinEntry(p, i++);
00838     p = p->next;
00839   }
00840   p = NULL;
00841   printf("\n");
00842 }
00843 
00844 // Print the contents in SWin
00845 void EWdetectorB::printSWinEntry(struct SWinEntry *p, int i) {
00846   if (p)
00847     printf("[%d: %d %.2f] ", i, p->rate, p->weight);
00848 }
00849 
00850 // Print one entry in AList
00851 void EWdetectorB::printAListEntry(struct AListEntry *p, int i) {
00852   if (!p)
00853     return;
00854 
00855   printf("[%d] %d (%d %d) %.2f %.2f\n", i, p->f_id, p->src_id, p->dst_id, 
00856      p->avg_rate, p->last_update);
00857 }
00858 
00859 
00860 // Print the entries in AList
00861 void EWdetectorB::printAList() {
00862   struct AListEntry *p;
00863   printf("%f AList(%d):\n", now, alist.count);
00864 
00865   p = alist.head;
00866   int i = 0;
00867   while (p) {
00868     printAListEntry(p, i);
00869     i++;
00870     p = p->next;
00871   }
00872   p = NULL;
00873   printf("\n");
00874 }
00875 
00876 // Trace bit rate (resp rate)
00877 void EWdetectorB::trace() {
00878   double db_rate = 0;
00879   double m_rate = 0;
00880 
00881   timeoutAList();
00882   m_rate = getMedianAList(0, alist.count);
00883   //printf("B ");
00884   db_rate = computeARR();
00885 
00886   if (!m_rate || !db_rate);
00887     //printAList();
00888 
00889   printf("B %d %.2f %.2f %d %d %.2f %.2f\n", 
00890      (int)now, cur_rate, avg_rate, arr_count, alarm, db_rate, m_rate);
00891 }
00892 
00893 // EW packet detector
00894 EWdetectorP::EWdetectorP() : EWdetector() {
00895   // Packet stats
00896   cur_p.arrival = cur_p.dept = cur_p.drop = 0;
00897   last_p.arrival = last_p.dept = last_p.drop = 0;
00898   last_p_db.arrival = last_p_db.dept = last_p_db.drop = 0;
00899 }
00900 
00901 EWdetectorP::~EWdetectorP(){
00902   // Packet stats
00903   cur_p.arrival = cur_p.dept = cur_p.drop = 0;
00904   last_p.arrival = last_p.dept = last_p.drop = 0;
00905 }
00906 
00907 // get the current request rate
00908 double EWdetectorP::getRate() {
00909   return(cur_rate);
00910 }
00911 
00912 // update packet stats
00913 void EWdetectorP::updateStats(int flag) {
00914   // Packet arrival
00915   if (flag == PKT_ARRIVAL) {
00916     cur_p.arrival++;
00917     return;
00918   }
00919 
00920   // Packet departure
00921   if (flag == PKT_DEPT) {
00922     cur_p.dept++;
00923     return;
00924   }
00925 
00926   // Packet dropped
00927   if (flag == PKT_DROP) {
00928     cur_p.drop++;
00929     return;
00930   }
00931 }
00932 
00933 // Detect changes in packet rate
00934 void EWdetectorP::detect() {
00935   if (cur_rate > avg_rate * (1 + EW_DETECT_RANGE)) {
00936     if (!alarm) {
00937       setAlarm();
00938     }
00939   } else if (cur_rate < avg_rate * (1 - EW_RELEASE_RANGE)) {
00940     if (alarm)
00941       resetAlarm();
00942   }
00943 }
00944 
00945 // Update current measurement
00946 void EWdetectorP::updateCur() {
00947   // measure the accepted packet rate (rather than arrival rate)
00948   cur_rate = (cur_p.dept - last_p.dept) / dt_inv;
00949 
00950   // keep the current value
00951   last_p.arrival = cur_p.arrival;
00952   last_p.dept = cur_p.dept;
00953   last_p.drop = cur_p.drop;
00954 }
00955 
00956 // Update long term average
00957 /*void EWdetectorP::updateAvg() {
00958   avg_rate = (int)(hlf.alpha * avg_rate + (1 - hlf.alpha) * cur_rate);
00959 }
00960 */
00961 // Update stats
00962 void EWdetectorP::measure(Packet *pkt) {
00963   
00964 
00965   // stats on packet departure and drop are collect in policer 
00966 }
00967 
00968 // Trace packet incoming rate (req rate)
00969 void EWdetectorP::trace() {
00970   printf("P %d %.2f %.2f %d %d %d %d %d %d %d\n", 
00971      (int)now, cur_rate, avg_rate, alarm,
00972      cur_p.arrival - last_p_db.arrival,
00973      cur_p.dept - last_p_db.dept,
00974      cur_p.drop - last_p_db.drop,  
00975      cur_p.arrival, cur_p.dept, cur_p.drop);
00976 
00977   last_p_db.arrival = cur_p.arrival;
00978   last_p_db.dept = cur_p.dept;
00979   last_p_db.drop = cur_p.drop;
00980 }
00981 
00982 // EW Policy: deal with queueing stuffs.
00983 //Constructor.  
00984 EWPolicy::EWPolicy() : Policy() {
00985   // Initialize detectors
00986   ewB = cewB = NULL;
00987   ewP = cewP = NULL;
00988   
00989   // Initialize rate limitor
00990   rlP = rlB = NULL;
00991 
00992   max_p = max_b = 0;
00993   alarm = pre_alarm = 0;
00994   change = 0;
00995 }
00996 
00997 //Deconstructor.
00998 EWPolicy::~EWPolicy(){
00999   if (ewB)
01000     free(ewB);
01001 
01002   if (ewP)
01003     free(ewP);
01004 
01005   if (cewB)
01006     free(cewB);
01007 
01008   if (cewP)
01009     free(cewP);
01010 }
01011 
01012 // Initialize the EW parameters
01013 void EWPolicy::init(int adj, int src, int dst) {
01014   ew_adj = adj;
01015   qsrc = src;
01016   qdst = dst;
01017 }
01018 
01019 // EW meter: do nothing.
01020 //  measurement is done in policer: we need to know whether the packet is
01021 //    dropped or not.
01022 void EWPolicy::applyMeter(policyTableEntry *policy, Packet *pkt) {
01023   return;
01024 }
01025 
01026 // EW Policer
01027 //  1. do measurement: P: both arrival and departure; B: only departure
01028 //  2. make packet drop decisions
01029 int EWPolicy::applyPolicer(policyTableEntry *policy, policerTableEntry *policer, Packet *pkt) {
01030   //printf("enter applyPolicer ");
01031 
01032   // can't count/penalize ACKs:
01033   //   with resp: may cause inaccurate calculation with TSW(??)
01034   //   with req:  may cause resp retransmission.
01035   // just pass them through
01036   hdr_cmn *th = hdr_cmn::access(pkt);
01037   hdr_ip* iph = hdr_ip::access(pkt);
01038   int dst_id = iph->daddr();
01039   int src_id = iph->saddr();
01040 
01041   if (th->ptype() == PT_ACK)
01042     return(policer->initialCodePt);
01043 
01044   // for other packets...
01045 
01046   // Get time
01047   now = Scheduler::instance().clock();
01048 
01049   // keep arrival packet stats
01050   if (ewP) {
01051     printf("TRR %d %d %d %d\n", (int)now, src_id, dst_id, th->size());
01052     ewP->updateStats(PKT_ARRIVAL);
01053   }
01054 
01055   // For other packets:
01056   if (dropPacket(pkt)) {
01057     // keep packet stats
01058     if (ewP)
01059       ewP->updateStats(PKT_DROP);
01060     
01061     //printf("downgrade!\n");   
01062     return(policer->downgrade1);
01063   } else {
01064     // keep packet stats
01065     if (ewP)
01066       ewP->updateStats(PKT_DEPT);
01067 
01068     // conduct EW detection
01069     if (ewP)
01070       ewP->run(pkt);
01071     
01072     if (ewB)
01073       ewB->run(pkt);    
01074 
01075     //printf("initial!\n"); 
01076     return(policer->initialCodePt);
01077   }
01078 }
01079 
01080 // detect if there is alarm triggered
01081 void EWPolicy::detect(Packet *pkt) {
01082   int alarm_b, alarm_p;
01083 
01084   alarm_b = alarm_p = 0;
01085 
01086   if (!ewP || ! cewB)
01087     return;
01088   
01089   alarm_b = cewB->testAlarm();
01090   alarm_p = ewP->testAlarm();
01091   
01092 
01093   if (alarm_p == EW_UNCHANGE || alarm_b == EW_UNCHANGE)
01094     return;
01095 
01096   // Need to get info from both parts to make a decision
01097   // Reset change flags
01098   ewP->resetChange();
01099   cewB->resetChange();
01100 
01101   change = 1;
01102   // keep the old value of alarm
01103   pre_alarm = alarm;
01104 
01105   // As long as alarm_b is 0, reset the alarm
01106   if (alarm_b == 0)
01107     alarm = 0;
01108   else if (alarm_p == 0)
01109     alarm = 0;
01110   else 
01111     alarm = 1;
01112 
01113   printf("ALARM %d %d\n", pre_alarm, alarm);
01114 }
01115 
01116 //  make packet drop decisions
01117 int EWPolicy::dropPacket(Packet *pkt) {
01118   // 1. arrival stats is measured in meter (departure and drops here)
01119   // 2. No penalty to response traffic!!
01120   // 3. need to protect existing connections
01121 
01122   // pass EW if there is any
01123   if (cewB && ewP) {
01124     // protecting existing connections
01125     //  drop requests for new connection (SYN packet)
01126     //    if (cewB->exFlow(pkt))
01127     hdr_tcp *tcph = hdr_tcp::access(pkt);
01128     // Protecting non-SYN packets: existing connections
01129     if ((tcph->flags() & TH_SYN) == 0) {
01130       //return(0);
01131     }
01132 
01133     // Check alarm
01134     detect(pkt);
01135 
01136     if (change) {
01137       // for new incoming requests:
01138       //   use EW measurement to adjust the rate limit (to current Rq)
01139       // see if the alarm should be reset
01140       
01141       if (pre_alarm) {
01142     if (alarm) {
01143       // The rate is not right:
01144       //   too low: too few connection admitted;
01145       //   too high: congestion in response
01146       // Adjustment is needed.
01147       if (rlP)
01148         rlP->adjustRate();
01149     } else {
01150       // the current rate is ok, award the current choice
01151       if (rlP)
01152         rlP->adjustScore(1);
01153     }
01154       } else {
01155     if (alarm) {
01156       if (rlP) {
01157         // Start a new round
01158         rlP->resetScore();
01159         // Use current request rate as the rate limit
01160         rlP->setRate(ewP->getRate());
01161       }
01162     } else {
01163       // the current rate is ok
01164     }
01165       }    
01166       
01167       change = 0;
01168     }  
01169   }
01170 
01171   // Passing rate limitor if there is any
01172   if (rlP) {
01173     // rate limiting
01174     return(rlP->run(1));
01175   };
01176   
01177   // through by default
01178   return(0);
01179 }
01180 
01181 // Enable detecting on packet incoming rate (req rate)
01182 void EWPolicy::detectPr(int dt_inv, int db_inv) {
01183   ewP = new EWdetectorP;
01184   ewP->setLink(qsrc, qdst);
01185   ewP->setDt(dt_inv);
01186   ewP->setDb(db_inv);
01187 }
01188 
01189 void EWPolicy::detectPr(int dt_inv) {
01190   detectPr(dt_inv, dt_inv);
01191 }
01192 
01193 void EWPolicy::detectPr() {
01194   detectPr(EW_DT_INV, EW_DB_INV);
01195 }
01196 
01197 // Enable detecting and debugging bit rate (eg: resp rate)
01198 void EWPolicy::detectBr(int dt_inv, int db_inv) {
01199   ewB = new EWdetectorB;
01200   ewB->init(ew_adj);
01201   ewB->setLink(qsrc, qdst);
01202   ewB->setDt(dt_inv);
01203   ewB->setDb(db_inv);
01204 }
01205 
01206 void EWPolicy::detectBr(int dt_inv) {
01207   detectBr(dt_inv, dt_inv);
01208 }
01209 
01210 void EWPolicy::detectBr() {
01211   detectBr(EW_DT_INV, EW_DB_INV);
01212 }
01213 
01214 // Rate limitor: packet rate
01215 void EWPolicy::limitPr(double rate) {
01216   //printf("PR %d\n", rate);
01217   rlP = new TBrateLimitor(rate);
01218 };
01219 
01220 // Rate limitor: bit rate
01221 void EWPolicy::limitBr(double rate) {
01222   //printf("BR %d\n", rate);
01223   rlB = new TBrateLimitor(rate);
01224 };
01225 
01226 // Rate limitor: packet rate
01227 void EWPolicy::limitPr() {
01228   limitPr(DEFAULT_TB_RATE_P);
01229 };
01230 
01231 // Rate limitor: bit rate
01232 void EWPolicy::limitBr() {
01233   limitBr(DEFAULT_TB_RATE_B);
01234 };
01235 
01236 // couple EW detector
01237 void EWPolicy::coupleEW(EWPolicy *ewpc) {
01238   coupleEW(ewpc, 0);
01239 }
01240 
01241 // couple EW detector
01242 void EWPolicy::coupleEW(EWPolicy *ewpc, double rate) {
01243   // couple the EW detector 
01244   cewB = ewpc->ewB;
01245   
01246   // Setup rate limitor with the default limit
01247   if (rate)
01248     limitPr(rate);
01249   else
01250     limitPr();
01251 }
01252 // End of EWP

Generated on Tue Mar 6 16:47:45 2007 for ns2 Network Simulator 2.29 by  doxygen 1.4.6