rate-limit.cc

Go to the documentation of this file.
00001 /* -*-  Mode:C++; c-basic-offset:4; tab-width:4; indent-tabs-mode:t -*- */
00002 /*
00003  * Copyright (c) 2000  International Computer Science Institute
00004  * All rights reserved.
00005  *
00006  * Redistribution and use in source and binary forms, with or without
00007  * modification, are permitted provided that the following conditions
00008  * are met:
00009  * 1. Redistributions of source code must retain the above copyright
00010  *    notice, this list of conditions and the following disclaimer.
00011  * 2. Redistributions in binary form must reproduce the above copyright
00012  *    notice, this list of conditions and the following disclaimer in the
00013  *    documentation and/or other materials provided with the distribution.
00014  * 3. All advertising materials mentioning features or use of this software
00015  *    must display the following acknowledgement:
00016  *  This product includes software developed by ACIRI, the AT&T 
00017  *      Center for Internet Research at ICSI (the International Computer
00018  *      Science Institute).
00019  * 4. Neither the name of ACIRI nor of ICSI may be used
00020  *    to endorse or promote products derived from this software without
00021  *    specific prior written permission.
00022  *
00023  * THIS SOFTWARE IS PROVIDED BY ICSI AND CONTRIBUTORS ``AS IS'' AND
00024  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00025  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00026  * ARE DISCLAIMED.  IN NO EVENT SHALL ICSI OR CONTRIBUTORS BE LIABLE
00027  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
00028  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
00029  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
00030  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00031  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
00032  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
00033  * SUCH DAMAGE.
00034  *
00035  */
00036 
00037 #include "rate-limit.h"
00038 
00039 #include "random.h"
00040 #include "ident-tree.h"
00041 #include "pushback.h"
00042 
00043 // ######################### RateLimitSessionList Methods #####################
00044 
00045 int
00046 RateLimitSessionList::filter(Packet * pkt, int lowDemand) {
00047 
00048   RateLimitSession * next = first_;
00049   double dropP = -1;
00050   
00051   while (next != NULL) {
00052     double p = next->log(pkt, lowDemand);
00053     if (p >= dropP) dropP = p;
00054     next = next->next_;
00055   }
00056 
00057 #ifdef DEBUG_RLSL
00058   if (dropP == -1) {
00059     printf("RLSList: Found a non-member packet at %g\n", Scheduler::instance().clock());
00060     fflush(stdout);
00061   }
00062 #endif
00063 
00064   double u = Random::uniform();
00065   if (u <= dropP) {
00066 #ifdef DEBUG_RLSL
00067     printf("RLSList:%d Dropping Packet in filter. \n", first_->logData_->myID_);
00068     fflush(stdout);
00069 #endif
00070    return 1;
00071   }
00072   
00073   //not dropped 
00074   return 0;
00075 }
00076 
00077 int 
00078 RateLimitSessionList::insert(RateLimitSession * session) {
00079   RateLimitSession * listItem = first_;
00080   
00081   while (listItem != NULL) {
00082     if ( listItem->aggSpec_->equals(session->aggSpec_) ) {
00083       return 0;
00084     }
00085     listItem = listItem->next_;
00086   }
00087   
00088   session->setSucc(first_);
00089   first_ = session;
00090   session->localID_ = IDCounter_++;
00091   noSessions_++;
00092   return 1;
00093 }
00094   
00095 RateLimitSession *
00096 RateLimitSessionList::containsLocalAggSpec(AggSpec * spec, int myID) {
00097   RateLimitSession * listItem = first_;
00098   while (listItem != NULL) {
00099     if ( listItem->origin_== myID ) {
00100       if ( listItem->aggSpec_->contains(spec) ) {
00101     return listItem;
00102       } 
00103       //  else if ( spec->contains(listItem->aggSpec_) ) {
00104       //    return 2;
00105       // }
00106     }
00107     listItem = listItem->next_;
00108   }
00109   return NULL;
00110 }
00111 
00112 int 
00113 RateLimitSessionList::containsAggSpec(AggSpec * spec) {
00114   RateLimitSession * listItem = first_;
00115   while (listItem != NULL) {
00116     if ( listItem->aggSpec_->contains(spec) ) {
00117       return 1;
00118     }
00119     listItem = listItem->next_;
00120   }
00121   return 0;
00122 }
00123 
00124 //merger will only take place for aggregates started at this node.
00125 void 
00126 RateLimitSessionList::mergeSessions(int myID) {
00127   RateLimitSession * session1 = first_;
00128   while (session1 != NULL) {
00129     AggSpec * agg1 = session1->aggSpec_;
00130     RateLimitSession * session2 = session1->next_;
00131     while (session2 != NULL) {
00132       AggSpec * agg2 = session2->aggSpec_;
00133       if (session1->origin_== myID && session2->origin_ == myID &&
00134       agg1->dstON_  && agg2->dstON_) {
00135     int bits = AggSpec::prefixBitsForMerger(agg1, agg2);
00136     if (bits==0) {
00137         
00138         //a measure of how much are we broadening.
00139         int bitsDiff = ((agg1->dstBits_<agg2->dstBits_)? agg1->dstBits_: agg2->dstBits_) - bits;
00140         int prefix = PrefixTree::getPrefixBits(agg1->dstPrefix_, bits);
00141         int count = getMySubsetCount(prefix, bits, myID);
00142         if (count <2) {
00143             printf("Error: Anomaly \n");
00144             exit(-1);
00145         }
00146         if (PushbackAgent::mergerAccept(count, bits, bitsDiff)) {
00147             merge(prefix, bits, myID);
00148         }
00149     }
00150       }
00151       session2 = session2->next_;
00152     }
00153     session1 = session1->next_;
00154   }
00155 }
00156 
00157 void
00158 RateLimitSessionList::merge(int prefix, int bits, int myID) {
00159   RateLimitSession * listItem = first_;
00160   RateLimitSession * firstItem = NULL;
00161   while (listItem != NULL) {
00162     if ( listItem->origin_== myID && 
00163      listItem->aggSpec_->subsetOfDst(prefix, bits) &&
00164      !listItem->merged_) {
00165       if (firstItem == NULL) {
00166     firstItem = listItem;
00167       } else {
00168     //merge here with firstItem
00169     firstItem = RateLimitSession::merge(firstItem, listItem, bits);
00170       }
00171     }
00172     listItem = listItem->next_;
00173   }
00174   if (firstItem == NULL) {
00175     printf("Error: Anomaly no 2\n");
00176     exit(-1);
00177   }
00178   
00179   firstItem->aggSpec_->expand(prefix, bits);
00180 }
00181   
00182 int 
00183 RateLimitSessionList::getMySubsetCount(int prefix, int bits, int myID) {
00184  RateLimitSession * listItem = first_;
00185  int count=0;
00186  while (listItem != NULL) {
00187     if ( listItem->origin_== myID &&
00188      listItem->aggSpec_->subsetOfDst(prefix, bits))
00189       count++;
00190     
00191     listItem = listItem->next_;
00192   }
00193  return count;
00194 }
00195 
00196 int 
00197 RateLimitSessionList::noMySessions(int myID) {
00198   RateLimitSession * listItem = first_;
00199   int count=0;
00200   while (listItem != NULL) {
00201     if ( listItem->origin_==myID &&
00202      !listItem->merged_ ) {
00203       count++;
00204     }
00205     listItem = listItem->next_;
00206   }
00207   return count;
00208 }
00209   
00210 RateLimitSession *
00211 RateLimitSessionList::getSessionByLocalID(int localID) {
00212   
00213   RateLimitSession * listItem = first_;
00214   while (listItem != NULL) {
00215     if (listItem->localID_ == localID) {
00216       return listItem;
00217     }
00218     listItem = listItem->next_;
00219   }
00220   
00221   return NULL;
00222 }
00223 
00224 RateLimitSession *
00225 RateLimitSessionList::getSessionByRemoteID(int remoteID) {
00226   
00227   RateLimitSession * listItem = first_;
00228   while (listItem != NULL) {
00229     if (listItem->remoteID_ == remoteID) {
00230       return listItem;
00231     }
00232     listItem = listItem->next_;
00233   }
00234   
00235   return NULL;
00236 }
00237 
00238 void 
00239 RateLimitSessionList::endSession(RateLimitSession * rls) {
00240   
00241   if (first_==NULL) {
00242     printf("RLSL: Error. No session in progress\n");
00243     exit(-1);
00244   }
00245 
00246   if (first_==rls) {
00247     first_=rls->next_;
00248     noSessions_--;
00249     delete(rls);
00250     return;
00251   } 
00252 
00253   RateLimitSession * previous = first_;
00254   RateLimitSession * current = first_->next_;
00255   while (current!=NULL) {
00256     if (current == rls) {
00257       previous->next_=current->next_;
00258       noSessions_--;
00259       delete(rls);
00260       return;
00261     }
00262     previous = current;
00263     current=current->next_;
00264   }
00265   
00266   printf("RLSL: Error. The correct RLS not found\n");
00267   exit(-1);
00268 }
00269 
00270 //descending order
00271 int 
00272 RateLimitSessionList::rankRate(int myID, double rate) {
00273     int rank=0;
00274     RateLimitSession * listItem = first_;
00275     while (listItem != NULL) {
00276         if (listItem->origin_ == myID && listItem->getArrivalRateForStatus() > rate) {
00277             rank++;
00278         }
00279         listItem = listItem->next_;
00280     }
00281     
00282     return rank;
00283 }
00284 
00285 //ascending order
00286 int 
00287 RateLimitSessionList::rankSession(int myID, RateLimitSession * session) {
00288     int rank=0;
00289     RateLimitSession * listItem = first_;
00290     while (listItem != NULL) {
00291         if (listItem->origin_ == myID) {
00292             if (listItem->getArrivalRateForStatus() < session->getArrivalRateForStatus()) {
00293                 rank++;
00294             }
00295             //to enforce deterministic ordering between sessions with same rate
00296             else if (listItem->getArrivalRateForStatus() == session->getArrivalRateForStatus() &&
00297                      listItem < session) {
00298                 rank++;
00299             }
00300         }
00301         listItem = listItem->next_;
00302     }
00303     
00304     return rank;
00305 }
00306 
00307 
00308 // ############################# RateLimitSession Methods #####################
00309 
00310 //local constructor
00311 RateLimitSession::RateLimitSession(AggSpec * aggSpec, double rateEstimate, int initial, 
00312                    double limit, int origin, int locQID, 
00313                    double delay, double lowerBound, Node * node, RouteLogic * rtLogic):
00314   pushbackON_(0), merged_(0), next_(NULL) {
00315   aggSpec_ = aggSpec;
00316   origin_ = origin;
00317   remoteID_ = -1;
00318   localQID_ = locQID;
00319   remoteQID_ = -1;
00320   heightInPTree_ = 0; //always begin as a leaf.
00321   depthInPTree_ = 0;   
00322   startTime_ = Scheduler::instance().clock();
00323   expiryTime_ = startTime_ + delay;
00324   refreshTime_ = startTime_;
00325   lowerBound_ = lowerBound;
00326   initialPhase_=initial;
00327   rlStrategy_ = new RateLimitStrategy(limit, aggSpec_->ptype_, aggSpec->ptypeShare_, rateEstimate);
00328   logData_ = new LoggingDataStruct(node, rtLogic, aggSpec->getSampleAddress(), rateEstimate);
00329 }
00330 
00331 //remote constructor
00332 RateLimitSession::RateLimitSession(AggSpec * aggSpec, double limit, int origin, int locQID, 
00333                    int remoteQID, int remoteID, int depth, double delay, 
00334                    double lowerBound, Node * node, RouteLogic * rtLogic):
00335   pushbackON_(0), merged_(0), initialPhase_(0), next_(NULL) {
00336   aggSpec_ = aggSpec;
00337   origin_ = origin;
00338   remoteID_ = remoteID;
00339   localQID_ = locQID;
00340   remoteQID_ = remoteQID;
00341   heightInPTree_ = 0; //always begin as a leaf.
00342   depthInPTree_ = depth;   
00343   startTime_ = Scheduler::instance().clock();
00344   expiryTime_ = startTime_ + delay;
00345   lowerBound_ = lowerBound;
00346   rlStrategy_ = new RateLimitStrategy(limit, aggSpec_->ptype_, aggSpec->ptypeShare_, 0);
00347   logData_ = new LoggingDataStruct(node, rtLogic, aggSpec->getSampleAddress(), 0);
00348 }
00349 
00350 double 
00351 RateLimitSession::log(Packet *pkt, int lowDemand) {
00352   
00353   int member = aggSpec_->member(pkt); 
00354   
00355   if (member == 0) {
00356     //printf("RLS: Found a non-member packet at %g\n", Scheduler::instance().clock());
00357     return 0;
00358   }
00359 
00360   //expired session
00361   //   if (expiryTime_ < Scheduler::instance().clock()) {
00362   //     printf("RLS: Session expired at %g. expiryTime = %g\n", 
00363   //       Scheduler::instance().clock(), expiryTime_);
00364   //     return 0;
00365   //   }
00366 
00367   logData_->log(pkt);    //log the packet
00368   int mine = (origin_ == logData_->myID_); 
00369   double prob = rlStrategy_->process(pkt, mine, lowDemand);  //rate limit it.
00370 
00371   return prob;
00372 }
00373 
00374 double
00375 RateLimitSession::getDropRate() {
00376   return rlStrategy_->getDropRate();
00377 }
00378 
00379 void
00380 RateLimitSession::pushbackOn() {
00381   pushbackON_ = 1;
00382   rlStrategy_->reset();
00383 }
00384 
00385 void
00386 RateLimitSession::refreshed() {
00387   refreshTime_ = Scheduler::instance().clock();
00388 }
00389 
00390 void 
00391 RateLimitSession::setAggSpec(AggSpec * aggSpec) {
00392   aggSpec_->dstON_ = aggSpec->dstON_;
00393   aggSpec_->dstPrefix_ = aggSpec->dstPrefix_;
00394   aggSpec_->dstBits_ = aggSpec->dstBits_;
00395 }
00396 
00397 void
00398 RateLimitSession::setLimit(double limit) {
00399   rlStrategy_->target_rate_=limit;
00400 }
00401  
00402 double
00403 RateLimitSession::getArrivalRateForStatus()  {
00404 
00405   // for a leaf PBA, this is the rate seen at the rlStrategy_;
00406   // for non-leaf PBAs it is the sum of the rates reported by upstream PBAs 
00407   // in their status messages.
00408   
00409   double rate;
00410   
00411   if (pushbackON_) {
00412     logData_->consolidateStatus();
00413     rate = logData_->statusArrivalRateAll_;
00414   } 
00415   else {
00416     rate = rlStrategy_->getArrivalRate();
00417   }
00418 
00419   return rate;
00420 }
00421 
00422 RateLimitSession *
00423 RateLimitSession::merge(RateLimitSession * session1, RateLimitSession * session2, int bits) {
00424 
00425   RateLimitSession *winner, *loser;
00426   
00427   if (session1->pushbackON_) {
00428     winner = session1;
00429     loser = session2;
00430   } else {
00431     winner = session2;
00432     loser = session1;
00433   }
00434   loser->merged_=1;
00435   
00436   int envelope;
00437   if (session1->aggSpec_->dstBits_==bits) 
00438     envelope = 1;
00439   else if (session2->aggSpec_->dstBits_==bits) 
00440     envelope=2;
00441   else 
00442     envelope=0;
00443   
00444   double lowerBound = pick4merge(session1->lowerBound_, session2->lowerBound_, envelope);
00445   winner->lowerBound_=lowerBound;
00446 
00447   double target_rate = pick4merge(session1->rlStrategy_->target_rate_,
00448                    session2->rlStrategy_->target_rate_, 
00449                    envelope);
00450   winner->setLimit(target_rate);
00451 
00452   double estRate = pick4merge(session1->rlStrategy_->rateEstimator_->estRate_,
00453                   session2->rlStrategy_->rateEstimator_->estRate_,
00454                   envelope);
00455   winner->rlStrategy_->rateEstimator_->estRate_=estRate;
00456 
00457   LoggingDataStruct * log1 = session1->logData_;
00458   LoggingDataStruct * log2 = session2->logData_;
00459   if (log1->count_ != log2->count_ || log1->myID_ != log2->myID_) {
00460     printf("RLS: Error: logdata count or ID anomaly\n");
00461     exit(-1);
00462   }
00463   
00464   estRate = pick4merge(log1->rateEstimator_->estRate_,
00465                log2->rateEstimator_->estRate_,
00466                envelope);
00467   winner->logData_->rateEstimator_->estRate_ = estRate;
00468   
00469   LoggingDataStructNode * node1 = log1->first_;
00470   LoggingDataStructNode * node2 = log2->first_;
00471   LoggingDataStructNode * nodew = winner->logData_->first_;
00472   
00473   while (node1 != NULL && node2!= NULL && nodew != NULL) {
00474     if (node1->nid_ != node2->nid_) {
00475       printf("RLS: Error: Out of order log nodes. Or something more sinister\n");
00476       exit(-1);
00477     }
00478 
00479     estRate =  pick4merge(node1->rateEstimator_->estRate_,
00480               node2->rateEstimator_->estRate_,
00481               envelope);
00482     nodew->rateEstimator_->estRate_ = estRate;
00483     
00484     double statusArrivalRate = pick4merge(node1->statusArrivalRate_,
00485                       node2->statusArrivalRate_,
00486                       envelope);
00487     nodew->statusArrivalRate_ = statusArrivalRate;
00488 
00489     node1=node1->next_;
00490     node2=node2->next_;
00491     nodew=nodew->next_;
00492   }
00493   
00494   if (node1 != NULL || node2 !=NULL || nodew != NULL) {
00495     printf("RLS: Error: Different chains\n");
00496     exit(-1);
00497   }
00498   
00499   return winner;
00500 }
00501 
00502 double
00503 RateLimitSession::pick4merge(double q1, double q2, int envelope) {
00504   
00505   if (envelope == 1) {
00506     return q1;
00507   } else if (envelope == 2) {
00508     return q2;
00509   }
00510   return q1+q2;
00511 }
00512 
00513 RateLimitSession::~RateLimitSession() {
00514   delete(aggSpec_);
00515   delete(rlStrategy_);
00516   delete(logData_);
00517 }

Generated on Tue Mar 6 16:47:49 2007 for ns2 Network Simulator 2.29 by  doxygen 1.4.6