00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054 #include "config.h"
00055 #include <tclcl.h>
00056 #include <iostream>
00057
00058 #include "node.h"
00059 #include "pagepool.h"
00060 #include "webtraf.h"
00061
00062
00063
00064
00065
00066
00067
00068 class WebPage : public TimerHandler {
00069 public:
00070 WebPage(int id, WebTrafSession* sess, int nObj, Node* dst) :
00071 id_(id), sess_(sess), nObj_(nObj), curObj_(0), doneObj_(0),
00072 dst_(dst) {}
00073 virtual ~WebPage() {}
00074
00075 inline void start() {
00076
00077 status_ = TIMER_PENDING;
00078 handle(&event_);
00079 }
00080 inline int id() const { return id_; }
00081 Node* dst() { return dst_; }
00082
00083 void doneObject() {
00084 if (++doneObj_ >= nObj_) {
00085
00086 sess_->donePage((void*)this);
00087 }
00088 }
00089 inline int curObj() const { return curObj_; }
00090 inline int doneObj() const { return doneObj_; }
00091
00092 private:
00093 virtual void expire(Event* = 0) {
00094
00095 if (curObj_ >= nObj_)
00096 return;
00097 sess_->launchReq(this, LASTOBJ_++,
00098 (int)ceil(sess_->objSize()->value()));
00099 if (sess_->mgr()->isdebug())
00100 printf("Session %d launched page %d obj %d\n",
00101 sess_->id(), id_, curObj_);
00102 }
00103 virtual void handle(Event *e) {
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114 if (curObj_ < nObj_) {
00115
00116
00117 TimerHandler::handle(e);
00118 curObj_++;
00119
00120
00121
00122
00123
00124
00125 if (curObj_ < nObj_) sched(sess_->interObj()->value());
00126 }
00127 }
00128 int id_;
00129 WebTrafSession* sess_;
00130 int nObj_, curObj_, doneObj_;
00131 Node* dst_;
00132 static int LASTOBJ_;
00133 };
00134
00135 int WebPage::LASTOBJ_ = 1;
00136
00137 int WebTrafSession::LASTPAGE_ = 1;
00138
00139
00140 WebTrafSession::WebTrafSession(WebTrafPool *mgr, Node *src, int np, int id, int ftcp_, int recycle_p) :
00141 rvInterPage_(NULL), rvPageSize_(NULL),
00142 rvInterObj_(NULL), rvObjSize_(NULL),
00143 mgr_(mgr), src_(src), nPage_(np), curPage_(0), donePage_(0),
00144 id_(id), interPageOption_(1), fulltcp_(0) {
00145 fulltcp_ = ftcp_;
00146 recycle_page_ = recycle_p;
00147 }
00148
00149
00150 WebTrafSession::~WebTrafSession()
00151 {
00152 if (donePage_ != curPage_) {
00153 fprintf(stderr, "done pages %d != all pages %d\n",
00154 donePage_, curPage_);
00155 abort();
00156 }
00157 if (status_ != TIMER_IDLE) {
00158 fprintf(stderr, "WebTrafSession must be idle when deleted.\n");
00159 abort();
00160 }
00161
00162
00163
00164 if (recycle_page_) {
00165 if (rvInterPage_ != NULL)
00166 Tcl::instance().evalf("delete %s", rvInterPage_->name());
00167 if (rvPageSize_ != NULL)
00168 Tcl::instance().evalf("delete %s", rvPageSize_->name());
00169 if (rvInterObj_ != NULL)
00170 Tcl::instance().evalf("delete %s", rvInterObj_->name());
00171 if (rvObjSize_ != NULL)
00172 Tcl::instance().evalf("delete %s", rvObjSize_->name());
00173 }
00174 }
00175
00176 void WebTrafSession::donePage(void* ClntData)
00177 {
00178 WebPage* pg = (WebPage*)ClntData;
00179 if (mgr_->isdebug())
00180 printf("Session %d done page %d\n", id_, pg->id());
00181 if (pg->doneObj() != pg->curObj()) {
00182 fprintf(stderr, "done objects %d != all objects %d\n",
00183 pg->doneObj(), pg->curObj());
00184 abort();
00185 }
00186 delete pg;
00187
00188
00189 if (++donePage_ >= nPage_)
00190 mgr_->doneSession(id_);
00191 else if (interPageOption_) {
00192
00193
00194
00195 sched(rvInterPage_->value());
00196
00197 }
00198 }
00199
00200
00201 void WebTrafSession::expire(Event *)
00202 {
00203
00204 Node* dst = mgr_->pickdst();
00205
00206 WebPage* pg = new WebPage(LASTPAGE_++, this,
00207 (int)ceil(rvPageSize_->value()), dst);
00208 if (mgr_->isdebug())
00209 printf("Session %d starting page %d, curpage %d\n",
00210 id_, LASTPAGE_-1, curPage_);
00211 pg->start();
00212 }
00213
00214 void WebTrafSession::handle(Event *e)
00215 {
00216
00217 TimerHandler::handle(e);
00218 ++curPage_;
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228 if (!interPageOption_) {
00229 if (curPage_ < nPage_) {
00230 sched(rvInterPage_->value());
00231
00232 }
00233 }
00234 }
00235
00236
00237 void WebTrafSession::launchReq(void* ClntData, int obj, int size) {
00238 mgr_->launchReq(src_, ClntData, obj, size);
00239 }
00240
00241 static class WebTrafPoolClass : public TclClass {
00242 public:
00243 WebTrafPoolClass() : TclClass("PagePool/WebTraf") {
00244
00245 }
00246 TclObject* create(int, const char*const*) {
00247 return (new WebTrafPool());
00248 }
00249 } class_webtrafpool;
00250
00251 WebTrafPool::~WebTrafPool()
00252 {
00253 if (session_ != NULL) {
00254 for (int i = 0; i < nSession_; i++)
00255 delete session_[i];
00256 delete []session_;
00257 }
00258 if (server_ != NULL)
00259 delete []server_;
00260 if (client_ != NULL)
00261 delete []client_;
00262
00263 }
00264
00265 void WebTrafPool::delay_bind_init_all()
00266 {
00267 delay_bind_init_one("debug_");
00268 PagePool::delay_bind_init_all();
00269 }
00270
00271 int WebTrafPool::delay_bind_dispatch(const char *varName,const char *localName,
00272 TclObject *tracer)
00273 {
00274 if (delay_bind_bool(varName, localName, "debug_", &debug_, tracer))
00275 return TCL_OK;
00276 return PagePool::delay_bind_dispatch(varName, localName, tracer);
00277 }
00278
00279
00280 WebTrafPool::WebTrafPool() :
00281 session_(NULL), nServer_(0), server_(NULL), nClient_(0), client_(NULL),
00282 nTcp_(0), nSink_(0), fulltcp_(0), recycle_page_(0)
00283 {
00284 bind("fulltcp_", &fulltcp_);
00285 bind("recycle_page_", &recycle_page_);
00286 bind("dont_recycle_", &dont_recycle_);
00287
00288 asimflag_=0;
00289 LIST_INIT(&tcpPool_);
00290 LIST_INIT(&sinkPool_);
00291 dbTcp_a = dbTcp_r = dbTcp_cr = 0;
00292 }
00293
00294 TcpAgent* WebTrafPool::picktcp()
00295 {
00296 TcpAgent* a = (TcpAgent*)detachHead(&tcpPool_);
00297 if (a == NULL) {
00298 Tcl& tcl = Tcl::instance();
00299 tcl.evalf("%s alloc-tcp", name());
00300 a = (TcpAgent*)lookup_obj(tcl.result());
00301 if (a == NULL) {
00302 fprintf(stderr, "Failed to allocate a TCP agent\n");
00303 abort();
00304 }
00305 } else
00306 nTcp_--;
00307
00308 return a;
00309 }
00310
00311 TcpSink* WebTrafPool::picksink()
00312 {
00313 TcpSink* a = (TcpSink*)detachHead(&sinkPool_);
00314 if (a == NULL) {
00315 Tcl& tcl = Tcl::instance();
00316 tcl.evalf("%s alloc-tcp-sink", name());
00317 a = (TcpSink*)lookup_obj(tcl.result());
00318 if (a == NULL) {
00319 fprintf(stderr, "Failed to allocate a TCP sink\n");
00320 abort();
00321 }
00322 } else
00323 nSink_--;
00324 return a;
00325 }
00326
00327 Node* WebTrafPool::picksrc() {
00328 int n = int(floor(Random::uniform(0, nClient_)));
00329 assert((n >= 0) && (n < nClient_));
00330 return client_[n];
00331 }
00332
00333 Node* WebTrafPool::pickdst() {
00334 int n = int(floor(Random::uniform(0, nServer_)));
00335 assert((n >= 0) && (n < nServer_));
00336 return(server_[n].get_node());
00337 }
00338
00339
00340 void WebTrafPool::pick_ep(TcpAgent** tcp, Agent** snk) {
00341
00342 *tcp = picktcp();
00343
00344
00345 if (fulltcp_) {
00346 *snk = picktcp();
00347 } else {
00348 *snk = picksink();
00349 }
00350 }
00351
00352
00353 void WebTrafPool::launchReq(Node *src_, void* ClntData, int obj, int size) {
00354 TcpAgent *ctcp;
00355 Agent *csnk;
00356
00357
00358 pick_ep(&ctcp, &csnk);
00359
00360 WebPage* pg = (WebPage*)ClntData;
00361
00362
00363 Tcl::instance().evalf("%s launch-req %d %d %s %s %s %s %d %d",
00364 name(), obj, pg->id(),
00365 src_->name(), pg->dst()->name(),
00366 ctcp->name(), csnk->name(), size, ClntData);
00367
00368
00369
00370 #if 0
00371 printf("%d \t %d \t %d \t %d \t %g %d %d\n", size, obj, pg->id(), id_,
00372 Scheduler::instance().clock(),
00373 src_->address(), pg->dst()->address());
00374 printf("** Tcp agents %d, Tcp sinks %d\n", nTcp(),nSink());
00375 #endif
00376 }
00377
00378
00379 void WebTrafPool::launchResp(int obj_id, Node *svr_, Node *clnt_, Agent *tcp, Agent* snk, int size, void *ClntData) {
00380 int pid;
00381 pid = obj_id;
00382
00383
00384 if (ClntData) {
00385 WebPage* pg = (WebPage*)ClntData;
00386 pid = pg->id();
00387 }
00388
00389
00390 Tcl::instance().evalf("%s launch-resp %d %d %s %s %s %s %d %d",
00391 name(), obj_id, pid, svr_->name(), clnt_->name(),
00392 tcp->name(), snk->name(), size, ClntData);
00393
00394
00395
00396 #if 0
00397 printf("%d \t %d \t %d \t %d \t %g %d %d\n", size, obj, pg->id(), id_,
00398 Scheduler::instance().clock(),
00399 src_->address(), pg->dst()->address());
00400 printf("** Tcp agents %d, Tcp sinks %d\n", nTcp(),nSink());
00401 #endif
00402 }
00403
00404
00405 int WebTrafPool::find_server(int sid) {
00406 int n = 0;
00407 while (server_[n].get_nid() != sid && n < nServer_) {
00408 n++;
00409 }
00410
00411 return(n);
00412 }
00413
00414 int WebTrafPool::command(int argc, const char*const* argv) {
00415
00416
00417 if (argc == 2){
00418 if (strcmp(argv[1], "use-asim") == 0) {
00419 asimflag_ = 1;
00420
00421 return (TCL_OK);
00422 }
00423 }
00424 else if (argc == 3) {
00425 if (strcmp(argv[1], "set-num-session") == 0) {
00426 if (session_ != NULL) {
00427 for (int i = 0; i < nSession_; i++)
00428 delete session_[i];
00429 delete []session_;
00430 }
00431 nSession_ = atoi(argv[2]);
00432 session_ = new WebTrafSession*[nSession_];
00433 memset(session_, 0, sizeof(WebTrafSession*)*nSession_);
00434 return (TCL_OK);
00435 } else if (strcmp(argv[1], "set-num-server") == 0) {
00436 nServer_ = atoi(argv[2]);
00437 if (server_ != NULL)
00438 delete []server_;
00439 server_ = new WebServer[nServer_];
00440 for (int i = 0; i < nServer_; i++) {
00441 server_[i] = WebServer(this);
00442 };
00443 return (TCL_OK);
00444 } else if (strcmp(argv[1], "set-num-client") == 0) {
00445 nClient_ = atoi(argv[2]);
00446 if (client_ != NULL)
00447 delete []client_;
00448 client_ = new Node*[nClient_];
00449 return (TCL_OK);
00450 } else if (strcmp(argv[1], "set-interPageOption") == 0) {
00451 int option = atoi(argv[2]);
00452 if (session_ != NULL) {
00453 for (int i = 0; i < nSession_; i++) {
00454 WebTrafSession* p = session_[i];
00455 p->set_interPageOption(option);
00456 }
00457 }
00458 return (TCL_OK);
00459 } else if (strcmp(argv[1], "doneObj") == 0) {
00460 WebPage* p = (WebPage*)atol(argv[2]);
00461
00462 p->doneObject();
00463 return (TCL_OK);
00464 } else if (strcmp(argv[1], "set-server-mode") == 0) {
00465
00466 int mode = atoi(argv[2]);
00467 for (int n = 0; n < nServer_; n++) {
00468 server_[n].set_mode(mode);
00469
00470 }
00471
00472 return (TCL_OK);
00473 } else if (strcmp(argv[1], "set-server-rate") == 0) {
00474
00475 int rate = atoi(argv[2]);
00476
00477 for (int n = 0; n < nServer_; n++) {
00478 server_[n].set_rate(rate);
00479 }
00480
00481 return (TCL_OK);
00482 } else if (strcmp(argv[1], "set-server-qlimit") == 0) {
00483
00484 int qlimit = atoi(argv[2]);
00485
00486 for (int n = 0; n < nServer_; n++) {
00487 server_[n].set_queue_limit(qlimit);
00488 }
00489
00490 return (TCL_OK);
00491 }
00492 } else if (argc == 4) {
00493 if (strcmp(argv[1], "set-server") == 0) {
00494 Node* s = (Node*)lookup_obj(argv[3]);
00495 if (s == NULL)
00496 return (TCL_ERROR);
00497 int n = atoi(argv[2]);
00498 if (n >= nServer_) {
00499 fprintf(stderr, "Wrong server index %d\n", n);
00500 return TCL_ERROR;
00501 }
00502 server_[n].set_node(s);
00503 return (TCL_OK);
00504 } else if (strcmp(argv[1], "set-client") == 0) {
00505 Node* c = (Node*)lookup_obj(argv[3]);
00506 if (c == NULL)
00507 return (TCL_ERROR);
00508 int n = atoi(argv[2]);
00509 if (n >= nClient_) {
00510 fprintf(stderr, "Wrong client index %d\n", n);
00511 return TCL_ERROR;
00512 }
00513 client_[n] = c;
00514 return (TCL_OK);
00515 } else if (strcmp(argv[1], "recycle") == 0) {
00516
00517
00518
00519 Agent* tcp = (Agent*)lookup_obj(argv[2]);
00520 Agent* snk = (Agent*)lookup_obj(argv[3]);
00521
00522 if ((tcp == NULL) || (snk == NULL))
00523 return (TCL_ERROR);
00524
00525 if (fulltcp_) {
00526 delete tcp;
00527 delete snk;
00528 } else if (!dont_recycle_) {
00529
00530
00531
00532
00533 nTcp_++;
00534
00535 insertAgent(&tcpPool_, tcp);
00536 nSink_++;
00537 insertAgent(&sinkPool_, snk);
00538
00539 }
00540
00541 return (TCL_OK);
00542 } else if (strcmp(argv[1], "set-server-rate") == 0) {
00543
00544 int sid = atoi(argv[2]);
00545 int rate = atoi(argv[3]);
00546
00547 int n = find_server(sid);
00548 if (n >= nServer_)
00549 return (TCL_ERROR);
00550
00551 server_[n].set_rate(rate);
00552 return (TCL_OK);
00553 } else if (strcmp(argv[1], "set-server-mode") == 0) {
00554
00555 int sid = atoi(argv[2]);
00556 int mode = atoi(argv[3]);
00557
00558 int n = find_server(sid);
00559 if (n >= nServer_)
00560 return (TCL_ERROR);
00561
00562 server_[n].set_mode(mode);
00563 return (TCL_OK);
00564 } else if (strcmp(argv[1], "set-server-qlimit") == 0) {
00565
00566 int sid = atoi(argv[2]);
00567 int qlimit = atoi(argv[3]);
00568
00569 int n = find_server(sid);
00570 if (n >= nServer_)
00571 return (TCL_ERROR);
00572
00573 server_[n].set_queue_limit(qlimit);
00574 return (TCL_OK);
00575 }
00576 } else if (argc == 9) {
00577 if (strcmp(argv[1], "create-session") == 0) {
00578
00579
00580
00581
00582 int n = atoi(argv[2]);
00583 if ((n < 0)||(n >= nSession_)||(session_[n] != NULL)) {
00584 fprintf(stderr,"Invalid session index %d\n",n);
00585 return (TCL_ERROR);
00586 }
00587 int npg = (int)strtod(argv[3], NULL);
00588 double lt = strtod(argv[4], NULL);
00589 WebTrafSession* p =
00590 new WebTrafSession(this, picksrc(), npg, n, fulltcp_, recycle_page_);
00591 int res = lookup_rv(p->interPage(), argv[5]);
00592 res = (res == TCL_OK) ?
00593 lookup_rv(p->pageSize(), argv[6]) : TCL_ERROR;
00594 res = (res == TCL_OK) ?
00595 lookup_rv(p->interObj(), argv[7]) : TCL_ERROR;
00596 res = (res == TCL_OK) ?
00597 lookup_rv(p->objSize(), argv[8]) : TCL_ERROR;
00598 if (res == TCL_ERROR) {
00599 delete p;
00600 fprintf(stderr, "Invalid random variable\n");
00601 return (TCL_ERROR);
00602 }
00603 p->sched(lt);
00604 session_[n] = p;
00605
00606
00607 if(asimflag_){
00608
00609
00610 double lambda = (1/(p->interPage())->avg())/(nServer_*nClient_);
00611 double mu = ((p->objSize())->value());
00612
00613 for (int i=0; i<nServer_; i++){
00614 for(int j=0; j<nClient_; j++){
00615
00616 Tcl::instance().evalf("%s add2asim %d %d %lf %lf", this->name(),server_[i].get_nid(),client_[j]->nodeid(),lambda, mu);
00617 }
00618 }
00619
00620 }
00621
00622 return (TCL_OK);
00623 } else if (strcmp(argv[1], "job_arrival") == 0) {
00624
00625 int obj_id = atoi(argv[2]);
00626 Node* clnt_ = (Node*)lookup_obj(argv[3]);
00627 Node* svr_ = (Node*)lookup_obj(argv[4]);
00628
00629 Agent* tcp = (Agent*)lookup_obj(argv[5]);
00630 Agent* snk = (Agent*)lookup_obj(argv[6]);
00631 int size = atoi(argv[7]);
00632 void* data = (void *)atol(argv[8]);
00633
00634 int sid = svr_->nodeid();
00635 int n = find_server(sid);
00636 if (n >= nServer_)
00637 return (TCL_ERROR);
00638
00639 double delay = server_[n].job_arrival(obj_id, clnt_, tcp, snk, size, data);
00640
00641 Tcl::instance().resultf("%f", delay);
00642 return (TCL_OK);
00643 }
00644 }
00645 return PagePool::command(argc, argv);
00646 }
00647