00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055 #include <tclcl.h>
00056
00057 #include "empweb.h"
00058
00059
00060
00061
00062
00063
00064
00065
00066 class EmpWebPage : public TimerHandler {
00067 public:
00068 EmpWebPage(int id, EmpWebTrafSession* sess, int nObj, Node* dst, int svrId) :
00069 persistOption_(0), id_(id), sess_(sess), nObj_(nObj), curObj_(0), doneObj_(0), dst_(dst), svrId_(svrId) {}
00070 virtual ~EmpWebPage() {}
00071
00072 inline void start() {
00073
00074 status_ = TIMER_PENDING;
00075 handle(&event_);
00076 }
00077 inline int id() const { return id_; }
00078 inline int svrId() const { return svrId_; }
00079 Node* dst() { return dst_; }
00080
00081 void doneObject() {
00082 if (sess_->mgr()->isdebug())
00083 printf("doneObject: %g done=%d total=%d \n", Scheduler::instance().clock(), doneObj_, nObj_);
00084
00085 if (++doneObj_ >= nObj_) {
00086 printf("doneObject: %g %d %d \n", Scheduler::instance().clock(), doneObj_, nObj_);
00087 sess_->donePage((void*)this);
00088
00089
00090 } else if (persistOption_) {
00091 sched(sess_->interObj()->value());
00092 }
00093
00094 }
00095 inline int curObj() const { return curObj_; }
00096 inline int doneObj() const { return doneObj_; }
00097
00098 inline void set_persistOption(int opt) { persistOption_ = opt; }
00099 int persistOption_ ;
00100
00101 private:
00102 virtual void expire(Event* = 0) {
00103
00104 if (curObj_ >= nObj_)
00105 return;
00106 sess_->launchReq(this, LASTOBJ_++,
00107 (int)ceil(sess_->objSize()->value()),
00108 (int)ceil(sess_->reqSize()->value()), sess_->id(), persistOption_);
00109 if (sess_->mgr()->isdebug())
00110 printf("expire: Session %d launched page %d obj %d nObj %d \n",
00111 sess_->id(), id_, curObj_, nObj_);
00112 }
00113 virtual void handle(Event *e) {
00114 if (sess_->mgr()->isdebug())
00115 printf("handle: Session %d launched page %d obj %d\n",
00116 sess_->id(), id_, curObj_);
00117
00118 TimerHandler::handle(e);
00119 curObj_++;
00120 if (!persistOption_) {
00121 if (curObj_ < nObj_) sched(sess_->interObj()->value());
00122 }
00123 }
00124 int id_;
00125 EmpWebTrafSession* sess_;
00126 int nObj_, curObj_;
00127 int doneObj_;
00128 Node* dst_;
00129 int svrId_ ;
00130 static int LASTOBJ_;
00131 };
00132
00133 int EmpWebPage::LASTOBJ_ = 1;
00134
00135 int EmpWebTrafSession::LASTPAGE_ = 1;
00136
00137 int EmpWebTrafPool::LASTFLOW_ = 1;
00138
00139
00140
00141 EmpWebTrafSession::~EmpWebTrafSession()
00142 {
00143 if (donePage_ != curPage_) {
00144 fprintf(stderr, "done pages %d != all pages %d\n",
00145 donePage_, curPage_);
00146 abort();
00147 }
00148 if (status_ != TIMER_IDLE) {
00149 fprintf(stderr, "EmpWebTrafSession must be idle when deleted.\n");
00150 abort();
00151 }
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170 }
00171
00172 void EmpWebTrafSession::donePage(void* ClntData)
00173 {
00174 EmpWebPage* pg = (EmpWebPage*)ClntData;
00175 if (mgr_->isdebug())
00176 printf("Session %d done page %d\n", id_, pg->id());
00177
00178 if (pg->doneObj() != pg->curObj()) {
00179 fprintf(stderr, "done objects %d != all objects %d\n",
00180 pg->doneObj(), pg->curObj());
00181 abort();
00182 }
00183
00184
00185
00186 if (pg->persistOption_) {
00187 if (!fulltcp_) {
00188
00189 mgr_->recycleTcp(ctcp_);
00190 mgr_->recycleTcp(stcp_);
00191 mgr_->recycleSink(csnk_);
00192 mgr_->recycleSink(ssnk_);
00193 } else {
00194 Tcl::instance().evalf("%s disconnect-full %s %s %s %s",
00195 mgr_->name(),
00196 src_->name(), pg->dst()->name(),
00197 ctcp_->name(), stcp_->name());
00198 }
00199 }
00200
00201 delete pg;
00202
00203
00204 if (++donePage_ >= nPage_) {
00205 mgr_->doneSession(id_);
00206 } else if (interPageOption_) {
00207 sched(rvInterPage_->value());
00208
00209 }
00210 }
00211
00212
00213 void EmpWebTrafSession::expire(Event *)
00214 {
00215
00216
00217 int n;
00218
00219 if (clientIdx_ < mgr()->nClientL_) n = 0 ;
00220 else
00221 n = int(ceil(serverSel()->value()));
00222
00223
00224 assert((n >= 0) && (n < mgr()->nSrc_));
00225 Node* dst = mgr()->server_[n];
00226
00227
00228
00229 EmpWebPage* pg = new EmpWebPage(LASTPAGE_++, this,
00230 (int)ceil(rvPageSize_->value()), dst, n);
00231
00232
00233 int opt = (int)ceil(this->persistSel()->value());
00234 pg->set_persistOption(opt);
00235
00236 if (mgr_->isdebug())
00237 printf("Session %d starting page %d, curpage %d \n",
00238 id_, LASTPAGE_-1, curPage_);
00239
00240 if (pg->persistOption_) {
00241
00242 mgr_->LASTFLOW_++;
00243
00244 int wins = int(ceil(serverWin()->value()));
00245 int winc = int(ceil(clientWin()->value()));
00246 int window = (wins >= winc) ? wins : winc;
00247
00248 int m = int(ceil(mtu()->value()));
00249
00250
00251 if (fulltcp_) {
00252 ctcp_ = mgr_->picktcp(window,m);
00253 stcp_ = mgr_->picktcp(window,m);
00254
00255 Tcl::instance().evalf("%s connect-full %s %s %s %s",
00256 mgr_->name(),
00257 src_->name(), pg->dst()->name(),
00258 ctcp_->name(), stcp_->name());
00259
00260 } else {
00261 ctcp_ = mgr_->picktcp(window,m);
00262 stcp_ = mgr_->picktcp(window,m);
00263 csnk_ = mgr_->picksink();
00264 ssnk_ = mgr_->picksink();
00265 }
00266
00267
00268 Tcl::instance().evalf("%s set-fid %d %s %s", mgr_->name(), mgr_->color_, ctcp_->name(), stcp_->name());
00269
00270 }
00271
00272 pg->start();
00273 }
00274
00275 void EmpWebTrafSession::handle(Event *e)
00276 {
00277
00278 TimerHandler::handle(e);
00279 ++curPage_;
00280
00281
00282
00283
00284 if (!interPageOption_) {
00285 if (curPage_ < nPage_) {
00286 sched(rvInterPage_->value());
00287
00288 }
00289 }
00290 }
00291
00292
00293 void EmpWebTrafSession::launchReq(void* ClntData, int obj, int size, int reqSize, int sid, int persist)
00294 {
00295
00296 TcpAgent* ctcp;
00297 TcpAgent* stcp;
00298 TcpSink* csnk;
00299 TcpSink* ssnk;
00300
00301 EmpWebPage* pg = (EmpWebPage*)ClntData;
00302
00303 if (persist) {
00304 if (mgr_->isdebug()) {
00305 printf("HTTP1.1\n");
00306 }
00307
00308
00309 ctcp = ctcp_;
00310 stcp = stcp_;
00311 if (fulltcp_) {
00312 csnk = 0;
00313 ssnk = 0;
00314 } else {
00315 csnk = csnk_;
00316 ssnk = ssnk_;
00317 }
00318
00319 } else {
00320 if (mgr_->isdebug()) {
00321 printf("HTTP1.0\n");
00322 }
00323
00324 mgr_->LASTFLOW_++;
00325
00326 int wins = int(ceil(serverWin()->value()));
00327 int winc = int(ceil(clientWin()->value()));
00328 int window = (wins >= winc) ? wins : winc;
00329
00330 int m = int(ceil(mtu()->value()));
00331
00332
00333
00334 ctcp = mgr_->picktcp(window,m);
00335 stcp = mgr_->picktcp(window,m);
00336 Tcl::instance().evalf("%s set-fid %d %s %s",
00337 mgr_->name(), mgr_->color_, ctcp->name(), stcp->name());
00338 if (fulltcp_) {
00339 csnk = 0;
00340 ssnk = 0;
00341 } else {
00342 csnk = mgr_->picksink();
00343 ssnk = mgr_->picksink();
00344 }
00345 }
00346
00347
00348
00349
00350 if (fulltcp_) {
00351 Tcl::instance().evalf("%s launch-req-full %d %d %s %s %s %s %d %d %d %d",
00352 mgr_->name(), obj, pg->id(),
00353 src_->name(), pg->dst()->name(),
00354 ctcp->name(),
00355 stcp->name(),
00356 size, reqSize, ClntData,persist);
00357 } else {
00358 assert (csnk != 0 && ssnk != 0);
00359 Tcl::instance().evalf("%s launch-req %d %d %s %s %s %s %s %s %d %d %d %d",
00360 mgr_->name(), obj, pg->id(),
00361 src_->name(), pg->dst()->name(),
00362 ctcp->name(), csnk->name(),
00363 stcp->name(), ssnk->name(),
00364 size, reqSize, ClntData,
00365 persist);
00366 }
00367
00368 if (mgr_->isdebug()) {
00369 printf("size=%d obj=%d page=%d sess=%d %g src=%d dst=%d\n", size, obj, pg->id(), id_, Scheduler::instance().clock(), src_->address(), pg->dst()->address());
00370 }
00371 }
00372
00373
00374 static class EmpWebTrafPoolClass : public TclClass {
00375 public:
00376 EmpWebTrafPoolClass() : TclClass("PagePool/EmpWebTraf") {}
00377 TclObject* create(int, const char*const*) {
00378 return (new EmpWebTrafPool());
00379 }
00380 } class_empwebtrafpool;
00381
00382 EmpWebTrafPool::~EmpWebTrafPool()
00383 {
00384 if (session_ != NULL) {
00385 for (int i = 0; i < nSession_; i++)
00386 delete session_[i];
00387 delete []session_;
00388 }
00389 if (server_ != NULL)
00390 delete []server_;
00391 if (client_ != NULL)
00392 delete []client_;
00393
00394 }
00395
00396 void EmpWebTrafPool::delay_bind_init_all()
00397 {
00398 delay_bind_init_one("debug_");
00399 PagePool::delay_bind_init_all();
00400 }
00401
00402 int EmpWebTrafPool::delay_bind_dispatch(const char *varName,const char *localName,
00403 TclObject *tracer)
00404 {
00405 if (delay_bind_bool(varName, localName, "debug_", &debug_, tracer))
00406 return TCL_OK;
00407 return PagePool::delay_bind_dispatch(varName, localName, tracer);
00408 }
00409
00410 EmpWebTrafPool::EmpWebTrafPool() :
00411 concurrentSess_(0), nSrc_(0), server_(NULL), session_(NULL), nClient_(0), client_(NULL), nTcp_(0), nSink_(0), fulltcp_(0)
00412 {
00413 bind("fulltcp_", &fulltcp_);
00414
00415 LIST_INIT(&tcpPool_);
00416 LIST_INIT(&sinkPool_);
00417 }
00418
00419 TcpAgent* EmpWebTrafPool::picktcp(int win, int mtu)
00420 {
00421
00422
00423 TcpAgent* a = (TcpAgent*)detachHead(&tcpPool_);
00424 if (a == NULL) {
00425 Tcl& tcl = Tcl::instance();
00426 tcl.evalf("%s alloc-tcp %d %d", name(), win, mtu);
00427 a = (TcpAgent*)lookup_obj(tcl.result());
00428 if (a == NULL) {
00429 fprintf(stderr, "Failed to allocate a TCP agent\n");
00430 abort();
00431 }
00432 } else
00433 nTcp_--;
00434 return a;
00435 }
00436
00437 TcpSink* EmpWebTrafPool::picksink()
00438 {
00439 TcpSink* a = (TcpSink*)detachHead(&sinkPool_);
00440 if (a == NULL) {
00441 Tcl& tcl = Tcl::instance();
00442 tcl.evalf("%s alloc-tcp-sink", name());
00443 a = (TcpSink*)lookup_obj(tcl.result());
00444 if (a == NULL) {
00445 fprintf(stderr, "Failed to allocate a TCP sink\n");
00446 abort();
00447 }
00448 } else
00449 nSink_--;
00450 return a;
00451 }
00452
00453 void EmpWebTrafPool::recycleTcp(Agent* a)
00454 {
00455 if (fulltcp_) {
00456 delete a;
00457 } else {
00458
00459 if (a == NULL) {
00460 fprintf(stderr, "Failed to recycle TCP agent\n");
00461 abort();
00462 }
00463 nTcp_++;
00464 insertAgent(&tcpPool_, a);
00465 }
00466 }
00467
00468 void EmpWebTrafPool::recycleSink(Agent* a)
00469 {
00470 if (fulltcp_) {
00471 delete a;
00472 } else {
00473
00474 if (a == NULL) {
00475 fprintf(stderr, "Failed to recycle Sink agent\n");
00476 abort();
00477 }
00478 nSink_++;
00479 insertAgent(&sinkPool_, a);
00480 }
00481 }
00482
00483 int EmpWebTrafPool::command(int argc, const char*const* argv)
00484 {
00485 if (argc == 3) {
00486 if (strcmp(argv[1], "set-num-session") == 0) {
00487 if (session_ != NULL) {
00488 for (int i = 0; i < nSession_; i++)
00489 delete session_[i];
00490 delete []session_;
00491 }
00492 nSession_ = atoi(argv[2]);
00493 session_ = new EmpWebTrafSession*[nSession_];
00494 memset(session_, 0, sizeof(EmpWebTrafSession*)*nSession_);
00495 return (TCL_OK);
00496 } else if (strcmp(argv[1], "set-num-server-lan") == 0) {
00497 nSrcL_ = atoi(argv[2]);
00498 if (nSrcL_ > nSrc_) {
00499 fprintf(stderr, "Wrong server index %d\n", nSrcL_);
00500 return TCL_ERROR;
00501 }
00502 return (TCL_OK);
00503 } else if (strcmp(argv[1], "set-num-remote-client") == 0) {
00504 nClientL_ = atoi(argv[2]);
00505 if (nClientL_ > nClient_) {
00506 fprintf(stderr, "Wrong client index %d\n", nClientL_);
00507 return TCL_ERROR;
00508 }
00509 return (TCL_OK);
00510 } else if (strcmp(argv[1], "set-num-server") == 0) {
00511 nSrc_ = atoi(argv[2]);
00512 if (server_ != NULL)
00513 delete []server_;
00514 server_ = new Node*[nSrc_];
00515 return (TCL_OK);
00516 } else if (strcmp(argv[1], "set-num-client") == 0) {
00517 nClient_ = atoi(argv[2]);
00518 if (client_ != NULL)
00519 delete []client_;
00520 client_ = new Node*[nClient_];
00521 return (TCL_OK);
00522 } else if (strcmp(argv[1], "set-interPageOption") == 0) {
00523 int option = atoi(argv[2]);
00524 if (session_ != NULL) {
00525 for (int i = 0; i < nSession_; i++) {
00526 EmpWebTrafSession* p = session_[i];
00527 p->set_interPageOption(option);
00528 }
00529 }
00530 return (TCL_OK);
00531 } else if (strcmp(argv[1], "doneObj") == 0) {
00532 EmpWebPage* p = (EmpWebPage*)atol(argv[2]);
00533
00534 p->doneObject();
00535
00536 return (TCL_OK);
00537 }
00538 } else if (argc == 4) {
00539 if (strcmp(argv[1], "set-server") == 0) {
00540 Node* cli = (Node*)lookup_obj(argv[3]);
00541 if (cli == NULL)
00542 return (TCL_ERROR);
00543 int nc = atoi(argv[2]);
00544 if (nc >= nSrc_) {
00545 fprintf(stderr, "Wrong server index %d\n", nc);
00546 return TCL_ERROR;
00547 }
00548 server_[nc] = cli;
00549 return (TCL_OK);
00550 } else if (strcmp(argv[1], "set-client") == 0) {
00551 Node* s = (Node*)lookup_obj(argv[3]);
00552 if (s == NULL)
00553 return (TCL_ERROR);
00554 int n = atoi(argv[2]);
00555 if (n >= nClient_) {
00556 fprintf(stderr, "Wrong client index %d\n", n);
00557 return TCL_ERROR;
00558 }
00559 client_[n] = s;
00560 return (TCL_OK);
00561 } else if (strcmp(argv[1], "recycle") == 0) {
00562
00563
00564
00565 Agent* tcp = (Agent*)lookup_obj(argv[2]);
00566 Agent* snk = (Agent*)lookup_obj(argv[3]);
00567 if ((tcp == NULL) || (snk == NULL))
00568 return (TCL_ERROR);
00569
00570 if (fulltcp_) {
00571 delete tcp;
00572 delete snk;
00573 } else {
00574 nTcp_++, nSink_++;
00575 insertAgent(&tcpPool_, tcp);
00576 insertAgent(&sinkPool_, snk);
00577 }
00578 return (TCL_OK);
00579 }
00580 } else if (argc == 17) {
00581 if (strcmp(argv[1], "create-session") == 0) {
00582
00583
00584
00585
00586
00587
00588
00589 int n = atoi(argv[2]);
00590 if ((n < 0)||(n >= nSession_)||(session_[n] != NULL)) {
00591 fprintf(stderr,"Invalid session index %d\n",n);
00592 return (TCL_ERROR);
00593 }
00594 int npg = (int)strtod(argv[3], NULL);
00595 double lt = strtod(argv[4], NULL);
00596
00597 int flip = atoi(argv[15]);
00598 if ((flip < 0)||(flip > 1)) {
00599 fprintf(stderr,"Invalid I/O flag %d\n",flip);
00600 return (TCL_ERROR);
00601 }
00602
00603
00604 color_ = atoi(argv[16]);
00605
00606 int cl;
00607 if (flip == 1)
00608 cl = int(floor(Random::uniform(0, nClientL_)));
00609 else
00610 cl = int(floor(Random::uniform(nClientL_, nClient_)));
00611
00612 assert((cl >= 0) && (cl < nClient_));
00613 Node* c=client_[cl];
00614
00615 EmpWebTrafSession* p =
00616 new EmpWebTrafSession(this, c, npg, n, nSrc_, cl,fulltcp_);
00617
00618 int res = lookup_rv(p->interPage(), argv[5]);
00619 res = (res == TCL_OK) ?
00620 lookup_rv(p->pageSize(), argv[6]) : TCL_ERROR;
00621 res = (res == TCL_OK) ?
00622 lookup_rv(p->interObj(), argv[7]) : TCL_ERROR;
00623 res = (res == TCL_OK) ?
00624 lookup_rv(p->objSize(), argv[8]) : TCL_ERROR;
00625 res = (res == TCL_OK) ?
00626 lookup_rv(p->reqSize(), argv[9]) : TCL_ERROR;
00627 res = (res == TCL_OK) ?
00628 lookup_rv(p->persistSel(), argv[10]) : TCL_ERROR;
00629 res = (res == TCL_OK) ?
00630 lookup_rv(p->serverSel(), argv[11]) : TCL_ERROR;
00631 res = (res == TCL_OK) ?
00632 lookup_rv(p->serverWin(), argv[12]) : TCL_ERROR;
00633 res = (res == TCL_OK) ?
00634 lookup_rv(p->clientWin(), argv[13]) : TCL_ERROR;
00635 res = (res == TCL_OK) ?
00636 lookup_rv(p->mtu(), argv[14]) : TCL_ERROR;
00637 if (res == TCL_ERROR) {
00638 delete p;
00639 fprintf(stderr, "Invalid random variable\n");
00640 return (TCL_ERROR);
00641 }
00642 p->sched(lt);
00643 session_[n] = p;
00644
00645
00646 return (TCL_OK);
00647 }
00648 }
00649 return PagePool::command(argc, argv);
00650 }
00651
00652