00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 #include "rate-limit.h"
00038
00039 #include "random.h"
00040 #include "ident-tree.h"
00041 #include "pushback.h"
00042
00043
00044
00045 int
00046 RateLimitSessionList::filter(Packet * pkt, int lowDemand) {
00047
00048 RateLimitSession * next = first_;
00049 double dropP = -1;
00050
00051 while (next != NULL) {
00052 double p = next->log(pkt, lowDemand);
00053 if (p >= dropP) dropP = p;
00054 next = next->next_;
00055 }
00056
00057 #ifdef DEBUG_RLSL
00058 if (dropP == -1) {
00059 printf("RLSList: Found a non-member packet at %g\n", Scheduler::instance().clock());
00060 fflush(stdout);
00061 }
00062 #endif
00063
00064 double u = Random::uniform();
00065 if (u <= dropP) {
00066 #ifdef DEBUG_RLSL
00067 printf("RLSList:%d Dropping Packet in filter. \n", first_->logData_->myID_);
00068 fflush(stdout);
00069 #endif
00070 return 1;
00071 }
00072
00073
00074 return 0;
00075 }
00076
00077 int
00078 RateLimitSessionList::insert(RateLimitSession * session) {
00079 RateLimitSession * listItem = first_;
00080
00081 while (listItem != NULL) {
00082 if ( listItem->aggSpec_->equals(session->aggSpec_) ) {
00083 return 0;
00084 }
00085 listItem = listItem->next_;
00086 }
00087
00088 session->setSucc(first_);
00089 first_ = session;
00090 session->localID_ = IDCounter_++;
00091 noSessions_++;
00092 return 1;
00093 }
00094
00095 RateLimitSession *
00096 RateLimitSessionList::containsLocalAggSpec(AggSpec * spec, int myID) {
00097 RateLimitSession * listItem = first_;
00098 while (listItem != NULL) {
00099 if ( listItem->origin_== myID ) {
00100 if ( listItem->aggSpec_->contains(spec) ) {
00101 return listItem;
00102 }
00103
00104
00105
00106 }
00107 listItem = listItem->next_;
00108 }
00109 return NULL;
00110 }
00111
00112 int
00113 RateLimitSessionList::containsAggSpec(AggSpec * spec) {
00114 RateLimitSession * listItem = first_;
00115 while (listItem != NULL) {
00116 if ( listItem->aggSpec_->contains(spec) ) {
00117 return 1;
00118 }
00119 listItem = listItem->next_;
00120 }
00121 return 0;
00122 }
00123
00124
00125 void
00126 RateLimitSessionList::mergeSessions(int myID) {
00127 RateLimitSession * session1 = first_;
00128 while (session1 != NULL) {
00129 AggSpec * agg1 = session1->aggSpec_;
00130 RateLimitSession * session2 = session1->next_;
00131 while (session2 != NULL) {
00132 AggSpec * agg2 = session2->aggSpec_;
00133 if (session1->origin_== myID && session2->origin_ == myID &&
00134 agg1->dstON_ && agg2->dstON_) {
00135 int bits = AggSpec::prefixBitsForMerger(agg1, agg2);
00136 if (bits==0) {
00137
00138
00139 int bitsDiff = ((agg1->dstBits_<agg2->dstBits_)? agg1->dstBits_: agg2->dstBits_) - bits;
00140 int prefix = PrefixTree::getPrefixBits(agg1->dstPrefix_, bits);
00141 int count = getMySubsetCount(prefix, bits, myID);
00142 if (count <2) {
00143 printf("Error: Anomaly \n");
00144 exit(-1);
00145 }
00146 if (PushbackAgent::mergerAccept(count, bits, bitsDiff)) {
00147 merge(prefix, bits, myID);
00148 }
00149 }
00150 }
00151 session2 = session2->next_;
00152 }
00153 session1 = session1->next_;
00154 }
00155 }
00156
00157 void
00158 RateLimitSessionList::merge(int prefix, int bits, int myID) {
00159 RateLimitSession * listItem = first_;
00160 RateLimitSession * firstItem = NULL;
00161 while (listItem != NULL) {
00162 if ( listItem->origin_== myID &&
00163 listItem->aggSpec_->subsetOfDst(prefix, bits) &&
00164 !listItem->merged_) {
00165 if (firstItem == NULL) {
00166 firstItem = listItem;
00167 } else {
00168
00169 firstItem = RateLimitSession::merge(firstItem, listItem, bits);
00170 }
00171 }
00172 listItem = listItem->next_;
00173 }
00174 if (firstItem == NULL) {
00175 printf("Error: Anomaly no 2\n");
00176 exit(-1);
00177 }
00178
00179 firstItem->aggSpec_->expand(prefix, bits);
00180 }
00181
00182 int
00183 RateLimitSessionList::getMySubsetCount(int prefix, int bits, int myID) {
00184 RateLimitSession * listItem = first_;
00185 int count=0;
00186 while (listItem != NULL) {
00187 if ( listItem->origin_== myID &&
00188 listItem->aggSpec_->subsetOfDst(prefix, bits))
00189 count++;
00190
00191 listItem = listItem->next_;
00192 }
00193 return count;
00194 }
00195
00196 int
00197 RateLimitSessionList::noMySessions(int myID) {
00198 RateLimitSession * listItem = first_;
00199 int count=0;
00200 while (listItem != NULL) {
00201 if ( listItem->origin_==myID &&
00202 !listItem->merged_ ) {
00203 count++;
00204 }
00205 listItem = listItem->next_;
00206 }
00207 return count;
00208 }
00209
00210 RateLimitSession *
00211 RateLimitSessionList::getSessionByLocalID(int localID) {
00212
00213 RateLimitSession * listItem = first_;
00214 while (listItem != NULL) {
00215 if (listItem->localID_ == localID) {
00216 return listItem;
00217 }
00218 listItem = listItem->next_;
00219 }
00220
00221 return NULL;
00222 }
00223
00224 RateLimitSession *
00225 RateLimitSessionList::getSessionByRemoteID(int remoteID) {
00226
00227 RateLimitSession * listItem = first_;
00228 while (listItem != NULL) {
00229 if (listItem->remoteID_ == remoteID) {
00230 return listItem;
00231 }
00232 listItem = listItem->next_;
00233 }
00234
00235 return NULL;
00236 }
00237
00238 void
00239 RateLimitSessionList::endSession(RateLimitSession * rls) {
00240
00241 if (first_==NULL) {
00242 printf("RLSL: Error. No session in progress\n");
00243 exit(-1);
00244 }
00245
00246 if (first_==rls) {
00247 first_=rls->next_;
00248 noSessions_--;
00249 delete(rls);
00250 return;
00251 }
00252
00253 RateLimitSession * previous = first_;
00254 RateLimitSession * current = first_->next_;
00255 while (current!=NULL) {
00256 if (current == rls) {
00257 previous->next_=current->next_;
00258 noSessions_--;
00259 delete(rls);
00260 return;
00261 }
00262 previous = current;
00263 current=current->next_;
00264 }
00265
00266 printf("RLSL: Error. The correct RLS not found\n");
00267 exit(-1);
00268 }
00269
00270
00271 int
00272 RateLimitSessionList::rankRate(int myID, double rate) {
00273 int rank=0;
00274 RateLimitSession * listItem = first_;
00275 while (listItem != NULL) {
00276 if (listItem->origin_ == myID && listItem->getArrivalRateForStatus() > rate) {
00277 rank++;
00278 }
00279 listItem = listItem->next_;
00280 }
00281
00282 return rank;
00283 }
00284
00285
00286 int
00287 RateLimitSessionList::rankSession(int myID, RateLimitSession * session) {
00288 int rank=0;
00289 RateLimitSession * listItem = first_;
00290 while (listItem != NULL) {
00291 if (listItem->origin_ == myID) {
00292 if (listItem->getArrivalRateForStatus() < session->getArrivalRateForStatus()) {
00293 rank++;
00294 }
00295
00296 else if (listItem->getArrivalRateForStatus() == session->getArrivalRateForStatus() &&
00297 listItem < session) {
00298 rank++;
00299 }
00300 }
00301 listItem = listItem->next_;
00302 }
00303
00304 return rank;
00305 }
00306
00307
00308
00309
00310
00311 RateLimitSession::RateLimitSession(AggSpec * aggSpec, double rateEstimate, int initial,
00312 double limit, int origin, int locQID,
00313 double delay, double lowerBound, Node * node, RouteLogic * rtLogic):
00314 pushbackON_(0), merged_(0), next_(NULL) {
00315 aggSpec_ = aggSpec;
00316 origin_ = origin;
00317 remoteID_ = -1;
00318 localQID_ = locQID;
00319 remoteQID_ = -1;
00320 heightInPTree_ = 0;
00321 depthInPTree_ = 0;
00322 startTime_ = Scheduler::instance().clock();
00323 expiryTime_ = startTime_ + delay;
00324 refreshTime_ = startTime_;
00325 lowerBound_ = lowerBound;
00326 initialPhase_=initial;
00327 rlStrategy_ = new RateLimitStrategy(limit, aggSpec_->ptype_, aggSpec->ptypeShare_, rateEstimate);
00328 logData_ = new LoggingDataStruct(node, rtLogic, aggSpec->getSampleAddress(), rateEstimate);
00329 }
00330
00331
00332 RateLimitSession::RateLimitSession(AggSpec * aggSpec, double limit, int origin, int locQID,
00333 int remoteQID, int remoteID, int depth, double delay,
00334 double lowerBound, Node * node, RouteLogic * rtLogic):
00335 pushbackON_(0), merged_(0), initialPhase_(0), next_(NULL) {
00336 aggSpec_ = aggSpec;
00337 origin_ = origin;
00338 remoteID_ = remoteID;
00339 localQID_ = locQID;
00340 remoteQID_ = remoteQID;
00341 heightInPTree_ = 0;
00342 depthInPTree_ = depth;
00343 startTime_ = Scheduler::instance().clock();
00344 expiryTime_ = startTime_ + delay;
00345 lowerBound_ = lowerBound;
00346 rlStrategy_ = new RateLimitStrategy(limit, aggSpec_->ptype_, aggSpec->ptypeShare_, 0);
00347 logData_ = new LoggingDataStruct(node, rtLogic, aggSpec->getSampleAddress(), 0);
00348 }
00349
00350 double
00351 RateLimitSession::log(Packet *pkt, int lowDemand) {
00352
00353 int member = aggSpec_->member(pkt);
00354
00355 if (member == 0) {
00356
00357 return 0;
00358 }
00359
00360
00361
00362
00363
00364
00365
00366
00367 logData_->log(pkt);
00368 int mine = (origin_ == logData_->myID_);
00369 double prob = rlStrategy_->process(pkt, mine, lowDemand);
00370
00371 return prob;
00372 }
00373
00374 double
00375 RateLimitSession::getDropRate() {
00376 return rlStrategy_->getDropRate();
00377 }
00378
00379 void
00380 RateLimitSession::pushbackOn() {
00381 pushbackON_ = 1;
00382 rlStrategy_->reset();
00383 }
00384
00385 void
00386 RateLimitSession::refreshed() {
00387 refreshTime_ = Scheduler::instance().clock();
00388 }
00389
00390 void
00391 RateLimitSession::setAggSpec(AggSpec * aggSpec) {
00392 aggSpec_->dstON_ = aggSpec->dstON_;
00393 aggSpec_->dstPrefix_ = aggSpec->dstPrefix_;
00394 aggSpec_->dstBits_ = aggSpec->dstBits_;
00395 }
00396
00397 void
00398 RateLimitSession::setLimit(double limit) {
00399 rlStrategy_->target_rate_=limit;
00400 }
00401
00402 double
00403 RateLimitSession::getArrivalRateForStatus() {
00404
00405
00406
00407
00408
00409 double rate;
00410
00411 if (pushbackON_) {
00412 logData_->consolidateStatus();
00413 rate = logData_->statusArrivalRateAll_;
00414 }
00415 else {
00416 rate = rlStrategy_->getArrivalRate();
00417 }
00418
00419 return rate;
00420 }
00421
00422 RateLimitSession *
00423 RateLimitSession::merge(RateLimitSession * session1, RateLimitSession * session2, int bits) {
00424
00425 RateLimitSession *winner, *loser;
00426
00427 if (session1->pushbackON_) {
00428 winner = session1;
00429 loser = session2;
00430 } else {
00431 winner = session2;
00432 loser = session1;
00433 }
00434 loser->merged_=1;
00435
00436 int envelope;
00437 if (session1->aggSpec_->dstBits_==bits)
00438 envelope = 1;
00439 else if (session2->aggSpec_->dstBits_==bits)
00440 envelope=2;
00441 else
00442 envelope=0;
00443
00444 double lowerBound = pick4merge(session1->lowerBound_, session2->lowerBound_, envelope);
00445 winner->lowerBound_=lowerBound;
00446
00447 double target_rate = pick4merge(session1->rlStrategy_->target_rate_,
00448 session2->rlStrategy_->target_rate_,
00449 envelope);
00450 winner->setLimit(target_rate);
00451
00452 double estRate = pick4merge(session1->rlStrategy_->rateEstimator_->estRate_,
00453 session2->rlStrategy_->rateEstimator_->estRate_,
00454 envelope);
00455 winner->rlStrategy_->rateEstimator_->estRate_=estRate;
00456
00457 LoggingDataStruct * log1 = session1->logData_;
00458 LoggingDataStruct * log2 = session2->logData_;
00459 if (log1->count_ != log2->count_ || log1->myID_ != log2->myID_) {
00460 printf("RLS: Error: logdata count or ID anomaly\n");
00461 exit(-1);
00462 }
00463
00464 estRate = pick4merge(log1->rateEstimator_->estRate_,
00465 log2->rateEstimator_->estRate_,
00466 envelope);
00467 winner->logData_->rateEstimator_->estRate_ = estRate;
00468
00469 LoggingDataStructNode * node1 = log1->first_;
00470 LoggingDataStructNode * node2 = log2->first_;
00471 LoggingDataStructNode * nodew = winner->logData_->first_;
00472
00473 while (node1 != NULL && node2!= NULL && nodew != NULL) {
00474 if (node1->nid_ != node2->nid_) {
00475 printf("RLS: Error: Out of order log nodes. Or something more sinister\n");
00476 exit(-1);
00477 }
00478
00479 estRate = pick4merge(node1->rateEstimator_->estRate_,
00480 node2->rateEstimator_->estRate_,
00481 envelope);
00482 nodew->rateEstimator_->estRate_ = estRate;
00483
00484 double statusArrivalRate = pick4merge(node1->statusArrivalRate_,
00485 node2->statusArrivalRate_,
00486 envelope);
00487 nodew->statusArrivalRate_ = statusArrivalRate;
00488
00489 node1=node1->next_;
00490 node2=node2->next_;
00491 nodew=nodew->next_;
00492 }
00493
00494 if (node1 != NULL || node2 !=NULL || nodew != NULL) {
00495 printf("RLS: Error: Different chains\n");
00496 exit(-1);
00497 }
00498
00499 return winner;
00500 }
00501
00502 double
00503 RateLimitSession::pick4merge(double q1, double q2, int envelope) {
00504
00505 if (envelope == 1) {
00506 return q1;
00507 } else if (envelope == 2) {
00508 return q2;
00509 }
00510 return q1+q2;
00511 }
00512
00513 RateLimitSession::~RateLimitSession() {
00514 delete(aggSpec_);
00515 delete(rlStrategy_);
00516 delete(logData_);
00517 }