http.cc

Go to the documentation of this file.
00001 /* 
00002  * Copyright (c) Xerox Corporation 1998. All rights reserved.
00003  *
00004  * This program is free software; you can redistribute it and/or modify it
00005  * under the terms of the GNU General Public License as published by the
00006  * Free Software Foundation; either version 2 of the License, or (at your
00007  * option) any later version.
00008  *
00009  * This program is distributed in the hope that it will be useful, but
00010  * WITHOUT ANY WARRANTY; without even the implied warranty of
00011  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00012  * General Public License for more details.
00013  *
00014  * You should have received a copy of the GNU General Public License along
00015  * with this program; if not, write to the Free Software Foundation, Inc.,
00016  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00017  *
00018  * Linking this file statically or dynamically with other modules is making
00019  * a combined work based on this file.  Thus, the terms and conditions of
00020  * the GNU General Public License cover the whole combination.
00021  *
00022  * In addition, as a special exception, the copyright holders of this file
00023  * give you permission to combine this file with free software programs or
00024  * libraries that are released under the GNU LGPL and with code included in
00025  * the standard release of ns-2 under the Apache 2.0 license or under
00026  * otherwise-compatible licenses with advertising requirements (or modified
00027  * versions of such code, with unchanged license).  You may copy and
00028  * distribute such a system following the terms of the GNU GPL for this
00029  * file and the licenses of the other code concerned, provided that you
00030  * include the source code of that other code when and as the GNU GPL
00031  * requires distribution of source code.
00032  *
00033  * Note that people who make modified versions of this file are not
00034  * obligated to grant this special exception for their modified versions;
00035  * it is their choice whether to do so.  The GNU General Public License
00036  * gives permission to release a modified version without this exception;
00037  * this exception also makes it possible to release a modified version
00038  * which carries forward this exception.
00039  *
00040  * $Header: /nfs/jade/vint/CVSROOT/ns-2/webcache/http.cc,v 1.21 2005/09/18 23:33:35 tomh Exp $
00041  *
00042  */
00043 //
00044 // Implementation of the HTTP agent. We want a separate agent for HTTP because
00045 // we are interested in (detailed) HTTP headers, instead of just request and 
00046 // response patterns.
00047 //
00048 
00049 #include <stdlib.h>
00050 #include <assert.h>
00051 #include <string.h>
00052 #include <stdarg.h>
00053 
00054 #include "tclcl.h"
00055 #include "agent.h"
00056 #include "app.h"
00057 #include "tcp-simple.h"
00058 #include "http.h"
00059 #include "http-aux.h"
00060 #include "trace.h"
00061 #include "tcpapp.h"
00062 #include "mcache.h"
00063 
00064 //----------------------------------------------------------------------
00065 // Http Application
00066 //
00067 // Allows multiple concurrent HTTP connections
00068 //----------------------------------------------------------------------
00069 static class HttpAppClass : public TclClass {
00070 public:
00071         HttpAppClass() : TclClass("Http") {}
00072         TclObject* create(int, const char*const*) {
00073         return (new HttpApp());
00074     }
00075 } class_http_app;
00076 
00077 // What states should be in a http agent?
00078 HttpApp::HttpApp() : log_(0)
00079 {
00080     bind("id_", &id_);
00081     // Map a client address to a particular TCP agent
00082     tpa_ = new Tcl_HashTable;
00083     Tcl_InitHashTable(tpa_, TCL_ONE_WORD_KEYS);
00084 }
00085 
00086 HttpApp::~HttpApp()
00087 {
00088     if (tpa_ != NULL) {
00089         Tcl_DeleteHashTable(tpa_);
00090         delete tpa_;
00091     }
00092 }
00093 
00094 int HttpApp::add_cnc(HttpApp* client, TcpApp *agt)
00095 {
00096     int newEntry = 1;
00097     long key = client->id();
00098     Tcl_HashEntry *he = Tcl_CreateHashEntry(tpa_, 
00099                         (const char *) key,
00100                         &newEntry);
00101     if (he == NULL) 
00102         return -1;
00103     if (newEntry)
00104         Tcl_SetHashValue(he, (ClientData)agt);
00105     return 0;
00106 }
00107 
00108 void HttpApp::delete_cnc(HttpApp* client)
00109 {
00110         long key = client->id();
00111     Tcl_HashEntry *he = Tcl_FindHashEntry(tpa_,(const char *)key);
00112     if (he != NULL) {
00113         TcpApp *cnc = (TcpApp *)Tcl_GetHashValue(he);
00114         Tcl_DeleteHashEntry(he);
00115         delete cnc;
00116     }
00117 }
00118 
00119 TcpApp* HttpApp::lookup_cnc(HttpApp* client)
00120 {
00121         long key = client->id();
00122     Tcl_HashEntry *he = 
00123         Tcl_FindHashEntry(tpa_, (const char *)key);
00124     if (he == NULL)
00125         return NULL;
00126     return (TcpApp *)Tcl_GetHashValue(he);
00127 }
00128 
00129 // Basic functionalities: 
00130 int HttpApp::command(int argc, const char*const* argv)
00131 {
00132     Tcl& tcl = Tcl::instance();
00133 
00134     if (argc == 2) {
00135         if (strcmp(argv[1], "id") == 0) {
00136             if (argc == 3) {
00137                 id_ = atoi(argv[2]);
00138                 tcl.resultf("%d", id_);
00139             } else
00140                 tcl.resultf("%d", id_);
00141             return TCL_OK;
00142         } else if (strcmp(argv[1], "log") == 0) {
00143             // Return the name of the log channel
00144             if (log_ != NULL)
00145                 tcl.resultf("%s", Tcl_GetChannelName(log_));
00146             else
00147                 tcl.result("");
00148             return TCL_OK;
00149         }
00150     } else if (argc == 3) {
00151         if (strcmp(argv[1], "get-modtime") == 0) {
00152             double mt;
00153             if (pool_->get_mtime(argv[2], mt) != -1) {
00154                 tcl.resultf("%.17g", mt);
00155                 return TCL_OK;
00156             } else 
00157                 return TCL_ERROR;
00158         } else if (strcmp(argv[1], "exist-page") == 0) { 
00159             tcl.resultf("%d", pool_->exist_page(argv[2]));
00160             return TCL_OK;
00161         } else if (strcmp(argv[1], "get-size") == 0) {
00162             int size;
00163             if (pool_->get_size(argv[2], size) != -1) {
00164                 tcl.resultf("%d", size);
00165                 return TCL_OK;
00166             } else 
00167                 return TCL_ERROR;
00168         } else if (strcmp(argv[1], "get-age") == 0) {
00169             double age;
00170             if (pool_->get_age(argv[2], age) != -1) {
00171                 tcl.resultf("%.17g", age);
00172                 return TCL_OK;
00173             } else 
00174                 return TCL_ERROR;
00175         } else if (strcmp(argv[1], "get-cachetime") == 0) {
00176             double et;
00177             if (pool_->get_etime(argv[2], et) != -1) {
00178                 tcl.resultf("%.17g", et);
00179                 return TCL_OK;
00180             } else 
00181                 return TCL_ERROR;
00182         } else if (strcmp(argv[1], "get-page") == 0) {
00183             char buf[4096];
00184             if (pool_->get_pageinfo(argv[2], buf) != -1) {
00185                 tcl.resultf("%s", buf);
00186                 return TCL_OK;
00187             } else 
00188                 return TCL_ERROR;
00189         } else if (strcmp(argv[1], "get-cnc") == 0) {
00190             /*
00191              * <http> get-cnc <client>
00192              *
00193              * Given the communication party, get the tcp agent 
00194              * connected to it.
00195              */
00196             HttpApp *client = 
00197                 (HttpApp *)TclObject::lookup(argv[2]);
00198             TcpApp *cnc = (TcpApp *)lookup_cnc(client);
00199             if (cnc == NULL)
00200                 tcl.result("");
00201             else 
00202                 tcl.resultf("%s", cnc->name());
00203             return TCL_OK;
00204 
00205         } else if (strcmp(argv[1], "set-pagepool") == 0) {
00206             pool_ = (ClientPagePool*)TclObject::lookup(argv[2]);
00207             if (pool_ != NULL) 
00208                 return TCL_OK;
00209             else 
00210                 return TCL_ERROR;
00211         } else if (strcmp(argv[1], "is-connected") == 0) {
00212             /*
00213              * <http> is-connected <server>
00214              */
00215             HttpApp *a = (HttpApp*)TclObject::lookup(argv[2]);
00216             TcpApp *cnc = (TcpApp*)lookup_cnc(a);
00217             if (cnc == NULL) 
00218                 tcl.result("0");
00219             else 
00220                 tcl.result("1");
00221             return TCL_OK;
00222         } else if (strcmp(argv[1], "is-valid") == 0) {
00223             ClientPage *pg = 
00224                 (ClientPage *)pool_->get_page(argv[2]);
00225             if (pg == NULL) {
00226                 tcl.resultf("%d is-valid: No page %s", 
00227                         id_, argv[2]);
00228                 return TCL_ERROR;
00229             }
00230             tcl.resultf("%d", pg->is_valid());
00231             return TCL_OK;
00232         } else if (strcmp(argv[1], "log") == 0) {
00233             int mode;
00234             log_ = Tcl_GetChannel(tcl.interp(), 
00235                           (char*)argv[2], &mode);
00236             if (log_ == 0) {
00237                 tcl.resultf("%d: invalid log file handle %s\n",
00238                         id_, argv[2]);
00239                 return TCL_ERROR;
00240             }
00241             return TCL_OK;
00242         } else if (strcmp(argv[1], "disconnect") == 0) {
00243             /*
00244              * <http> disconnect <client> 
00245              * Delete the association of source and sink TCP.
00246              */
00247             HttpApp *client = 
00248                 (HttpApp *)TclObject::lookup(argv[2]);
00249             delete_cnc(client);
00250             return TCL_OK;
00251         } else if (strcmp(argv[1], "get-pagetype") == 0) {
00252             /*
00253              * <http> get-pagetype <pageid>
00254              * return the page type
00255              */
00256             ClientPage *pg = 
00257                 (ClientPage*)pool_->get_page(argv[2]);
00258             if (pg == NULL) {
00259                 tcl.resultf("%d get-pagetype: No page %s", 
00260                         id_, argv[2]);
00261                 return TCL_ERROR;
00262             }
00263             switch (pg->type()) {
00264             case HTML:
00265                 tcl.result("HTML");
00266                 break;
00267             case MEDIA:
00268                 tcl.result("MEDIA");
00269                 break;
00270             default:
00271                 fprintf(stderr, "Unknown page type %d", 
00272                     pg->type());
00273                 return TCL_ERROR;
00274             }
00275             return TCL_OK;
00276         } else if (strcmp(argv[1], "get-layer") == 0) {
00277             // Assume the page is a MediaPage
00278             MediaPage *pg = (MediaPage *)pool_->get_page(argv[2]);
00279             if (pg == NULL) {
00280                 tcl.resultf("%d get-layer: No page %s", 
00281                         id_, argv[2]);
00282                 return TCL_ERROR;
00283             }
00284             if (pg->type() != MEDIA) {
00285                 tcl.resultf("%d get-layer %s not a media page",
00286                         id_, argv[2]);
00287                 return TCL_ERROR;
00288             }
00289             tcl.resultf("%d", pg->num_layer());
00290             return TCL_OK;
00291         }
00292     } else if (argc == 4) {
00293         if (strcmp(argv[1], "connect") == 0) {
00294             /*
00295              * <http> connect <client> <ts>
00296              *
00297              * Associate a TCP agent with the given client. 
00298              * <ts> is the agent used to send packets out.
00299              * We assume two-way TCP connection, therefore we 
00300              * only need one agent.
00301              */
00302             HttpApp *client = 
00303                 (HttpApp *)TclObject::lookup(argv[2]);
00304             TcpApp *cnc = (TcpApp *)TclObject::lookup(argv[3]);
00305             if (add_cnc(client, cnc)) {
00306                 tcl.resultf("%s: failed to connect to %s", 
00307                         name_, argv[2]);
00308                 return TCL_ERROR;
00309             }
00310             // Set data delivery target
00311             cnc->target() = (Process*)this;
00312             return TCL_OK;
00313         } else if (strcmp(argv[1], "set-modtime") == 0) {
00314             double mt = strtod(argv[3], NULL);
00315             if (pool_->set_mtime(argv[2], mt) != -1)
00316                 return TCL_OK;
00317             else 
00318                 return TCL_ERROR;
00319         } else if (strcmp(argv[1], "set-cachetime") == 0) {
00320             double et = Scheduler::instance().clock();
00321             if (pool_->set_etime(argv[2], et) != -1)
00322                 return TCL_OK;
00323             else 
00324                 return TCL_ERROR;
00325         }
00326     } else {
00327         if (strcmp(argv[1], "send") == 0) {
00328             /*
00329              * <http> send <client> <bytes> <callback> 
00330              */
00331             HttpApp *client = 
00332                 (HttpApp *)TclObject::lookup(argv[2]);
00333             if (client == NULL) {
00334                 tcl.add_errorf("%s: bad client name %s",
00335                            name_, argv[2]);
00336                 return TCL_ERROR;
00337             }
00338             int bytes = atoi(argv[3]);
00339             TcpApp *cnc = (TcpApp *)lookup_cnc(client);
00340             if (cnc == NULL) {
00341                 //tcl.resultf("%s: no connection to client %s",
00342                 //      name_, argv[2]);
00343                 // Tolerate it
00344                 return TCL_OK;
00345             }
00346             char *buf = strdup(argv[4]);
00347             HttpNormalData *d = 
00348                 new HttpNormalData(id_, bytes, buf);
00349             cnc->send(bytes, d);
00350             // delete d;
00351             free(buf);
00352             return TCL_OK;
00353         
00354         } else if (strcmp(argv[1], "enter-page") == 0) {
00355             ClientPage* pg = pool_->enter_page(argc, argv);
00356             if (pg == NULL)
00357                 return TCL_ERROR;
00358             else 
00359                 return TCL_OK;
00360 
00361         } else if (strcmp(argv[1], "evTrace") == 0) { 
00362             char buf[1024], *p;
00363             if (log_ != 0) {
00364                 sprintf(buf, TIME_FORMAT" i %d ", 
00365                   BaseTrace::round(Scheduler::instance().clock()), 
00366                     id_);
00367                 p = &(buf[strlen(buf)]);
00368                 for (int i = 2; i < argc; i++) {
00369                     strcpy(p, argv[i]);
00370                     p += strlen(argv[i]);
00371                     *(p++) = ' ';
00372                 }
00373                 // Stick in a newline.
00374                 *(p++) = '\n', *p = 0;
00375                 Tcl_Write(log_, buf, p-buf);
00376             }
00377             return TCL_OK;
00378         }
00379     }
00380 
00381     return TclObject::command(argc, argv);
00382 }
00383 
00384 void HttpApp::log(const char* fmt, ...)
00385 {
00386     // Don't do anything if we don't have a log file.
00387     if (log_ == 0) 
00388         return;
00389 
00390     char buf[10240], *p;
00391     sprintf(buf, TIME_FORMAT" i %d ", 
00392         BaseTrace::round(Scheduler::instance().clock()), id_);
00393     p = &(buf[strlen(buf)]);
00394     va_list ap;
00395     va_start(ap, fmt);
00396     vsprintf(p, fmt, ap);
00397     Tcl_Write(log_, buf, strlen(buf));
00398 }
00399 
00400 void HttpApp::process_data(int, AppData* data)
00401 {
00402     if (data == NULL) 
00403         return;
00404 
00405     switch (data->type()) {
00406     case HTTP_NORMAL: {
00407         HttpNormalData *tmp = (HttpNormalData*)data;
00408         Tcl::instance().eval(tmp->str());
00409         break;
00410     }
00411     default:
00412         fprintf(stderr, "Bad http invalidation data type %d\n", 
00413             data->type());
00414         abort();
00415         break;
00416     }
00417 }
00418 
00419 
00420 
00421 //----------------------------------------------------------------------
00422 // Clients
00423 //----------------------------------------------------------------------
00424 static class HttpClientClass : public TclClass {
00425 public:
00426     HttpClientClass() : TclClass("Http/Client") {}
00427         TclObject* create(int, const char*const*) {
00428         return (new HttpClient());
00429     }
00430 } class_httpclient_app;
00431 
00432 
00433 
00434 //----------------------------------------------------------------------
00435 // Servers
00436 //----------------------------------------------------------------------
00437 static class HttpServerClass : public TclClass {
00438 public:
00439         HttpServerClass() : TclClass("Http/Server") {}
00440         TclObject* create(int, const char*const*) {
00441         return (new HttpServer());
00442     }
00443 } class_httpserver_app;
00444 
00445 static class HttpInvalServerClass : public TclClass {
00446 public:
00447         HttpInvalServerClass() : TclClass("Http/Server/Inval") {}
00448         TclObject* create(int, const char*const*) {
00449         return (new HttpInvalServer());
00450     }
00451 } class_httpinvalserver_app;
00452 
00453 static class HttpYucInvalServerClass : public TclClass {
00454 public:
00455         HttpYucInvalServerClass() : TclClass("Http/Server/Inval/Yuc") {}
00456         TclObject* create(int, const char*const*) {
00457         return (new HttpYucInvalServer());
00458     }
00459 } class_httpyucinvalserver_app;
00460 
00461 HttpYucInvalServer::HttpYucInvalServer() :
00462     inv_sender_(0), invlist_(0), num_inv_(0)
00463 {
00464     bind("hb_interval_", &hb_interval_);
00465     bind("enable_upd_", &enable_upd_);
00466     bind("Ca_", &Ca_);
00467     bind("Cb_", &Cb_);
00468     bind("push_thresh_", &push_thresh_);
00469     bind("push_high_bound_", &push_high_bound_);
00470     bind("push_low_bound_", &push_low_bound_);
00471 }
00472 
00473 int HttpYucInvalServer::command(int argc, const char*const* argv)
00474 {
00475     Tcl& tcl = Tcl::instance();
00476 
00477     switch (argv[1][0]) {
00478     case 'a': 
00479         if (strcmp(argv[1], "add-inval-sender") == 0) {
00480             HttpUInvalAgent *tmp = 
00481                 (HttpUInvalAgent *)TclObject::lookup(argv[2]);
00482             if (tmp == NULL) {
00483                 tcl.resultf("Non-existent agent %s", argv[2]);
00484                 return TCL_ERROR;
00485             }
00486             inv_sender_ = tmp;
00487             return TCL_OK;
00488         } if (strcmp(argv[1], "add-inv") == 0) {
00489             /*
00490              * <server> add-inv <pageid> <modtime>
00491              */
00492             double mtime = strtod(argv[3], NULL);
00493             add_inv(argv[2], mtime);
00494             return TCL_OK;
00495         }
00496         break;
00497     case 'c': 
00498         if (strcmp(argv[1], "count-request") == 0) {
00499             ClientPage *pg = 
00500                 (ClientPage *)pool_->get_page(argv[2]);
00501             if (pg == NULL) {
00502                 tcl.resultf("%d count-request: No page %s", 
00503                         id_, argv[2]);
00504                 return TCL_ERROR;
00505             }
00506             pg->count_request(Cb_, push_high_bound_);
00507             log("S NTF p %s v %d\n", argv[2], pg->counter());
00508             return TCL_OK;
00509         } else if (strcmp(argv[1], "count-inval") == 0) {
00510             ClientPage *pg = 
00511                 (ClientPage *)pool_->get_page(argv[2]);
00512             if (pg == NULL) {
00513                 tcl.resultf("%d count-inval: No page %s", 
00514                         id_, argv[2]);
00515                 return TCL_ERROR;
00516             }
00517             pg->count_inval(Ca_, push_low_bound_);
00518             log("S NTF p %s v %d\n", argv[2], pg->counter());
00519             return TCL_OK;
00520         } 
00521         break;
00522     case 'i': 
00523         if (strcmp(argv[1], "is-pushable") == 0) {
00524             ClientPage *pg = 
00525                 (ClientPage *)pool_->get_page(argv[2]);
00526             if (pg == NULL) {
00527                 tcl.resultf("%d is-pushable: No page %s", 
00528                         id_, argv[2]);
00529                 return TCL_ERROR;
00530             }
00531             if (pg->is_mpush() && 
00532                 (Scheduler::instance().clock() - pg->mpush_time() >
00533                  HTTP_HBEXPIRE_COUNT*hb_interval_)) {
00534                 // If mandatory push timer expires, stop push
00535                 pg->clear_mpush();
00536                 fprintf(stderr, 
00537                     "server %d timeout mpush\n", id_);
00538             }
00539             tcl.resultf("%d", (enable_upd_ && 
00540                        (pg->counter() >= push_thresh_) ||
00541                        pg->is_mpush()));
00542             return TCL_OK;
00543         }
00544         break;
00545     case 'r': 
00546         if ((strcmp(argv[1], "request-mpush") == 0) ||
00547             (strcmp(argv[1], "refresh-mpush") == 0)) {
00548             ClientPage *pg = 
00549                 (ClientPage *)pool_->get_page(argv[2]);
00550             if (pg == NULL) {
00551                 tcl.resultf("%d is-valid: No page %s", 
00552                         id_, argv[2]);
00553             return TCL_ERROR;
00554             }
00555             pg->set_mpush(Scheduler::instance().clock());
00556             return TCL_OK;
00557         } 
00558         break;
00559     case 's': 
00560         if (strcmp(argv[1], "send-hb") == 0) {
00561             send_heartbeat();
00562             return TCL_OK;
00563         } else if (strcmp(argv[1], "stop-mpush") == 0) {
00564             ClientPage *pg = 
00565                 (ClientPage *)pool_->get_page(argv[2]);
00566             if (pg == NULL) {
00567                 tcl.resultf("%d is-valid: No page %s", 
00568                         id_, argv[2]);
00569                 return TCL_ERROR;
00570             }
00571         pg->clear_mpush();
00572         fprintf(stderr, "server %d stopped mpush\n", id_);
00573         return TCL_OK;
00574         }
00575         break;
00576     }
00577 
00578     return HttpApp::command(argc, argv);
00579 }
00580 
00581 void HttpYucInvalServer::add_inv(const char *name, double mtime)
00582 {
00583     InvalidationRec *p = get_invrec(name);
00584     if ((p != NULL) && (p->mtime() < mtime)) {
00585         p->detach();
00586         delete p;
00587         p = NULL;
00588         num_inv_--;
00589     } 
00590     if (p == NULL) {
00591         p = new InvalidationRec(name, mtime);
00592         p->insert(&invlist_);
00593         num_inv_++;
00594     }
00595 }
00596 
00597 InvalidationRec* HttpYucInvalServer::get_invrec(const char *name)
00598 {
00599     // XXX What should we do if we already have an
00600     // invalidation record of this page in our 
00601     // invlist_? --> We should replace it with the new one
00602     InvalidationRec *r = invlist_;
00603     for (r = invlist_; r != NULL; r = r->next())
00604         if (strcmp(name, r->pg()) == 0)
00605             return r;
00606     return NULL;
00607 }
00608 
00609 HttpHbData* HttpYucInvalServer::pack_heartbeat()
00610 {
00611     HttpHbData *data = new HttpHbData(id_, num_inv_);
00612     InvalidationRec *p = invlist_, *q;
00613     int i = 0;
00614     while (p != NULL) {
00615         data->add(i++, p);
00616         // Clearing up invalidation sending list
00617         if (!p->dec_scount()) {
00618             // Each invalidation is sent to its children
00619             // for at most HTTP_HBEXPIRE times. After that 
00620             // the invalidation record is removed from 
00621             // the list
00622             q = p;
00623             p = p->next();
00624             q->detach();
00625             delete q;
00626             num_inv_--;
00627         } else 
00628             p = p->next();
00629     }
00630     return data;
00631 }
00632 
00633 void HttpYucInvalServer::send_hb_helper(int size, AppData *data)
00634 {
00635     inv_sender_->send(size, data);
00636 }
00637 
00638 void HttpYucInvalServer::send_heartbeat()
00639 {
00640     if (inv_sender_ == NULL)
00641         return;
00642 
00643     HttpHbData* d = pack_heartbeat();
00644     send_hb_helper(d->cost(), d);
00645 }
00646 
00647 
00648 
00649 
00650 //----------------------------------------------------------------------
00651 // Http cache with invalidation protocols. Http/Cache and Http/Cache/Inval
00652 // are used as base classes and provide common TCL methods. Http/Cache 
00653 // derives Http/Cache/TTL and Http/Cache/TTL/Old. Http/Cache/Inval derives
00654 // unicast invalidation and multicast invalidation.
00655 //----------------------------------------------------------------------
00656 
00657 static class HttpCacheClass : public TclClass {
00658 public:
00659         HttpCacheClass() : TclClass("Http/Cache") {}
00660         TclObject* create(int, const char*const*) {
00661         return (new HttpCache());
00662     }
00663 } class_httpcache_app;
00664 
00665 static class HttpInvalCacheClass : public TclClass {
00666 public:
00667         HttpInvalCacheClass() : TclClass("Http/Cache/Inval") {}
00668         TclObject* create(int, const char*const*) {
00669         return (new HttpInvalCache());
00670     }
00671 } class_httpinvalcache_app;
00672 
00673 static class HttpMInvalCacheClass : public TclClass {
00674 public:
00675         HttpMInvalCacheClass() : TclClass("Http/Cache/Inval/Mcast") {}
00676         TclObject* create(int, const char*const*) {
00677         return (new HttpMInvalCache());
00678     }
00679 } class_HttpMInvalCache_app;
00680 
00681 // Static members and functions
00682 HttpMInvalCache** HttpMInvalCache::CacheRepository_ = NULL;
00683 int HttpMInvalCache::NumCache_ = 0;
00684 
00685 void HttpMInvalCache::add_cache(HttpMInvalCache *c)
00686 {
00687     if (CacheRepository_ == NULL) {
00688         CacheRepository_ = new HttpMInvalCache* [c->id() + 1];
00689         CacheRepository_[c->id()] = c;
00690         NumCache_ = c->id();
00691     } else if (NumCache_ < c->id()) {
00692         HttpMInvalCache** p = new HttpMInvalCache* [c->id()+1];
00693         memcpy(p, CacheRepository_, 
00694                (c->id()+1)*sizeof(HttpMInvalCache*));
00695         delete[]CacheRepository_;
00696         CacheRepository_ = p;
00697         NumCache_ = c->id();
00698         p[c->id()] = c;
00699     } else
00700         CacheRepository_[c->id()] = c;
00701 }
00702 
00703 HttpMInvalCache::HttpMInvalCache() : 
00704     hb_timer_(this, HTTP_HBINTERVAL),
00705     inv_sender_(0), num_sender_(0), size_sender_(0), 
00706     invlist_(0), num_inv_(0), inv_parent_(NULL),
00707     upd_sender_(NULL), num_updater_(0), size_updater_(0)
00708 {
00709     bind("hb_interval_", &hb_interval_);
00710     bind("enable_upd_", &enable_upd_);  // If we allow push
00711     bind("Ca_", &Ca_);
00712     bind("Cb_", &Cb_);
00713     bind("push_thresh_", &push_thresh_);
00714     bind("push_high_bound_", &push_high_bound_);
00715     bind("push_low_bound_", &push_low_bound_);
00716 
00717     hb_timer_.set_interval(hb_interval_);
00718     Tcl_InitHashTable(&sstate_, TCL_ONE_WORD_KEYS);
00719     Tcl_InitHashTable(&nbr_, TCL_ONE_WORD_KEYS);
00720 }
00721 
00722 HttpMInvalCache::~HttpMInvalCache() 
00723 {
00724     if (num_sender_ > 0) 
00725         delete []inv_sender_;
00726     Tcl_DeleteHashTable(&sstate_);
00727     Tcl_DeleteHashTable(&nbr_);
00728 }
00729 
00730 int HttpMInvalCache::command(int argc, const char*const* argv)
00731 {
00732     Tcl& tcl = Tcl::instance();
00733     if (argc < 2) 
00734         return HttpInvalCache::command(argc, argv);
00735 
00736     switch (argv[1][0]) {
00737     case 'a':
00738         if ((strcmp(argv[1], "add-inval-listener") == 0) ||
00739             (strcmp(argv[1], "add-upd-listener") == 0)) {
00740             HttpInvalAgent *tmp = 
00741                 (HttpInvalAgent *)TclObject::lookup(argv[2]);
00742             tmp->attachApp((Application *)this);
00743             return TCL_OK;
00744         } else if (strcmp(argv[1], "add-inval-sender") == 0) {
00745             HttpInvalAgent *tmp = 
00746                 (HttpInvalAgent *)TclObject::lookup(argv[2]);
00747             if (tmp == NULL) {
00748                 tcl.resultf("Non-existent agent %s", argv[2]);
00749                 return TCL_ERROR;
00750             }
00751             if (num_sender_ == size_sender_) {
00752                 HttpInvalAgent **tt = 
00753                     new HttpInvalAgent*[size_sender_+5];
00754                 memcpy(tt, inv_sender_, 
00755                        sizeof(HttpInvalAgent*)*size_sender_);
00756                 delete []inv_sender_;
00757                 size_sender_ += 5;
00758                 inv_sender_ = tt;
00759             }
00760             inv_sender_[num_sender_++] = tmp;
00761             return TCL_OK;
00762         } else if (strcmp(argv[1], "add-to-map") == 0) {
00763             add_cache(this);
00764             return TCL_OK;
00765         } else if (strcmp(argv[1], "add-upd-sender") == 0) {
00766             HttpInvalAgent *tmp = 
00767                 (HttpInvalAgent *)TclObject::lookup(argv[2]);
00768             if (tmp == NULL) {
00769                 tcl.resultf("Non-existent agent %s", argv[2]);
00770                 return TCL_ERROR;
00771             }
00772             if (num_updater_ == size_updater_) {
00773                 HttpInvalAgent **tt = 
00774                     new HttpInvalAgent*[size_updater_+5];
00775                 memcpy(tt, upd_sender_, 
00776                        sizeof(HttpInvalAgent*)*size_updater_);
00777                 delete []upd_sender_;
00778                 size_updater_ += 5;
00779                 upd_sender_ = tt;
00780             }
00781             upd_sender_[num_updater_++] = tmp;
00782             return TCL_OK;
00783         }
00784         break;
00785 
00786     case 'c':
00787         if (strcmp(argv[1], "count-request") == 0) {
00788             ClientPage *pg = 
00789                 (ClientPage *)pool_->get_page(argv[2]);
00790             if (pg == NULL) {
00791                 tcl.resultf("%d count-request: No page %s", 
00792                         id_, argv[2]);
00793                 return TCL_ERROR;
00794             }
00795             pg->count_request(Cb_, push_high_bound_);
00796             log("E NTF p %s v %d\n", argv[2], pg->counter());
00797             return TCL_OK;
00798         } else if (strcmp(argv[1], "check-sstate") == 0) {
00799             /*
00800              * <cache> check-sstate <sid> <cid>
00801              * If server is re-connected, reinstate it
00802              */
00803             int sid = atoi(argv[2]);
00804             int cid = atoi(argv[3]);
00805             check_sstate(sid, cid);
00806             return TCL_OK;
00807         }
00808         break;
00809 
00810     case 'i':
00811         // XXX We don't need a "is-pushable" for cache!
00812         if (strcmp(argv[1], "is-unread") == 0) {
00813             ClientPage *pg = 
00814                 (ClientPage *)pool_->get_page(argv[2]);
00815             if (pg == NULL) {
00816                 tcl.resultf("%d is-unread: No page %s", 
00817                         id_, argv[2]);
00818                 return TCL_ERROR;
00819             }
00820             tcl.resultf("%d", pg->is_unread());
00821             return TCL_OK;
00822         }
00823         break;
00824 
00825     case 'j':
00826         if (strcmp(argv[1], "join") == 0) {
00827             /*
00828              * <cache> join <server_id> <cache>
00829              *
00830              * <server> join via <cache>. If they are the same,
00831              * it means we are the primary cache for <server>.
00832              */
00833             int sid = atoi(argv[2]);
00834             HttpMInvalCache *cache = 
00835                 (HttpMInvalCache*)TclObject::lookup(argv[3]);
00836             if (cache == NULL) {
00837                 tcl.add_errorf("Non-existent cache %s", argv[3]);
00838                 return TCL_ERROR;
00839             }
00840             // Add neighbor cache if necessary
00841             NeighborCache *c = lookup_nbr(cache->id());
00842             if (c == NULL)
00843                 add_nbr(cache);
00844             // Establish server invalidation contract
00845             check_sstate(sid, cache->id());
00846             return TCL_OK;
00847         }
00848         break;
00849 
00850     case 'p':
00851         if (strcmp(argv[1], "parent-cache") == 0) {
00852             /*
00853              * <cache> parent-cache <web_server_id>
00854              * Return the parent cache of <web_server_id> in the 
00855              * virtual distribution tree. 
00856              */
00857             int sid = atoi(argv[2]);
00858             SState *sst = lookup_sstate(sid);
00859             if (sst == NULL)
00860                 tcl.result("");
00861             else {
00862                 // Bad hack... :(
00863         NeighborCache *c = lookup_nbr(sst->cache()->cache()->id());
00864                 tcl.resultf("%s", c->cache()->name());
00865             }
00866             return TCL_OK;
00867         } else if (strcmp(argv[1], "push-children") == 0) {
00868             // Multicast the pushed page to all children
00869             ClientPage *pg = 
00870                 (ClientPage *)pool_->get_page(argv[2]);
00871             if (pg == NULL) {
00872                 tcl.resultf("%d is-valid: No page %s", 
00873                         id_, argv[2]);
00874                 return TCL_ERROR;
00875             }
00876             send_upd(pg);
00877             return TCL_OK;
00878         }
00879         break;
00880 
00881     case 'r':
00882         if (strcmp(argv[1], "recv-inv") == 0) {
00883             /*
00884              * <cache> recv-inv <pageid> <modtime>
00885              * This should be called only by a web server, 
00886              * therefore we do not check the validity of the 
00887              * invalidation
00888              */
00889             // Pack it into a HttpHbData, and process it
00890             HttpHbData *d = new HttpHbData(id_, 1);
00891             strcpy(d->rec_pg(0), argv[2]);
00892             d->rec_mtime(0) = strtod(argv[3], NULL);
00893             //int old_inv = num_inv_;
00894             tcl.resultf("%d", recv_inv(d));
00895             delete d;
00896             return TCL_OK;
00897         } else if (strcmp(argv[1], "recv-push") == 0) {
00898             /* 
00899              * <cache> recv-push <pageid> args
00900              */
00901             HttpUpdateData *d = new HttpUpdateData(id_, 1);
00902             strcpy(d->rec_page(0), argv[2]);
00903             for (int i = 3; i < argc; i+=2) {
00904                 if (strcmp(argv[i], "modtime") == 0)
00905                   d->rec_mtime(0) = strtod(argv[i+1], NULL);
00906                 else if (strcmp(argv[i], "size") == 0) {
00907                   d->rec_size(0) = atoi(argv[i+1]);
00908                   // XXX need to set total update page size
00909                   d->set_pgsize(d->rec_size(0));
00910                 } else if (strcmp(argv[i], "age") == 0)
00911                   d->rec_age(0) = strtod(argv[i+1], NULL);
00912             }
00913             tcl.resultf("%d", recv_upd(d));
00914             delete d;
00915             return TCL_OK;
00916         } else if (strcmp(argv[1], "register-server") == 0) {
00917             /*
00918              * <self> register-server <cache_id> <server_id>
00919              * We get a GET response about a page from <server>, 
00920              * which we hear from <cache> 
00921              */
00922             int cid = atoi(argv[2]);
00923             int sid = atoi(argv[3]);
00924             // Assuming we've already known the cache
00925             check_sstate(sid, cid);
00926             return TCL_OK;
00927         }
00928         break;
00929 
00930     case 's':
00931         if (strcmp(argv[1], "start-hbtimer") == 0) {
00932             if (hb_timer_.status() == TIMER_IDLE)
00933                 hb_timer_.sched();
00934             return TCL_OK;
00935         } else if (strcmp(argv[1], "server-hb") == 0) {
00936             int id = atoi(argv[2]);
00937             recv_heartbeat(id);
00938             return TCL_OK;
00939         } else if (strcmp(argv[1], "set-pinv-agent") == 0) {
00940             inv_parent_ = 
00941                 (HttpUInvalAgent*)TclObject::lookup(argv[2]);
00942             return TCL_OK;
00943         } else if (strcmp(argv[1], "set-parent") == 0) {
00944             HttpMInvalCache *c = 
00945                 (HttpMInvalCache*)TclObject::lookup(argv[2]);
00946             if (c == NULL) {
00947                 tcl.add_errorf("Non-existent cache %s", argv[2]);
00948                 return TCL_ERROR;
00949             }
00950             // Add parent cache into known cache list
00951             add_nbr(c);
00952             return TCL_OK;
00953         } else if (strcmp(argv[1], "set-unread") == 0) {
00954             ClientPage *pg = 
00955                 (ClientPage *)pool_->get_page(argv[2]);
00956             if (pg == NULL) {
00957                 tcl.resultf("%d is-valid: No page %s", 
00958                         id_, argv[2]);
00959                 return TCL_ERROR;
00960             }
00961             pg->set_unread();
00962             return TCL_OK;
00963         } else if (strcmp(argv[1], "set-read") == 0) {
00964             ClientPage *pg = 
00965                 (ClientPage *)pool_->get_page(argv[2]);
00966             if (pg == NULL) {
00967                 tcl.resultf("%d is-valid: No page %s", 
00968                         id_, argv[2]);
00969                 return TCL_ERROR;
00970             }
00971             pg->set_read();
00972             return TCL_OK;
00973         } else if (strcmp(argv[1], "set-mandatory-push") == 0) { 
00974             ClientPage *pg = 
00975                 (ClientPage *)pool_->get_page(argv[2]);
00976             if (pg == NULL) {
00977                 tcl.resultf("%d is-valid: No page %s", 
00978                         id_, argv[2]);
00979                 return TCL_ERROR;
00980             }
00981             pg->set_mpush(Scheduler::instance().clock());
00982             return TCL_OK;
00983         } else if (strcmp(argv[1], "stop-mpush") == 0) {
00984             ClientPage *pg = 
00985                 (ClientPage *)pool_->get_page(argv[2]);
00986             if (pg == NULL) {
00987                 tcl.resultf("%d is-valid: No page %s", 
00988                         id_, argv[2]);
00989                 return TCL_ERROR;
00990             }
00991             pg->clear_mpush();
00992             return TCL_OK;
00993         }
00994         break;
00995 
00996     default:
00997         break;
00998     }
00999     return HttpInvalCache::command(argc, argv);
01000 }
01001 
01002 void HttpMInvalCache::check_sstate(int sid, int cid)
01003 {
01004     if ((sid == cid) && (cid == id_))
01005         // How come?
01006         return;
01007     SState *sst = lookup_sstate(sid);
01008     NeighborCache *c = lookup_nbr(cid);
01009     if (sst == NULL) {
01010         if (c == NULL) {
01011             fprintf(stderr, 
01012 "%g: cache %d: No neighbor cache for received invalidation from %d via %d\n", 
01013                 Scheduler::instance().clock(), id_, sid, cid);
01014             abort();
01015         }
01016 #ifdef WEBCACHE_DEBUG
01017         fprintf(stderr,
01018             "%g: cache %d: registered server %d via cache %d\n",
01019             Scheduler::instance().clock(), id_, sid, cid);
01020 #endif
01021         sst = new SState(c);
01022         add_sstate(sid, sst);
01023         c->add_server(sid);
01024     } else if (sst->is_down()) {
01025         sst->up();
01026         if (cid != id_) {
01027             if (c == NULL) {
01028                 fprintf(stderr, 
01029  "[%g]: Cache %d has an invalid neighbor cache %d\n",
01030  Scheduler::instance().clock(), id_, cid);
01031                 abort();
01032             }
01033             c->server_up(sid);
01034         }
01035 #ifdef WEBCACHE_DEBUG
01036         fprintf(stderr, 
01037         "[%g] Cache %d reconnected to server %d via cache %d\n", 
01038             Scheduler::instance().clock(), id_, 
01039             sid, cid);
01040 #endif
01041         Tcl::instance().evalf("%s mark-rejoin", name_);
01042     }
01043 }
01044 
01045 void HttpMInvalCache::add_sstate(int sid, SState *sst)
01046 {
01047     int newEntry = 1;
01048     long key = sid;
01049     Tcl_HashEntry *he = 
01050         Tcl_CreateHashEntry(&sstate_, (const char *)key, &newEntry);
01051     if (he == NULL) 
01052         return;
01053     if (newEntry)
01054         Tcl_SetHashValue(he, (ClientData)sst);
01055 }
01056 
01057 HttpMInvalCache::SState* HttpMInvalCache::lookup_sstate(int sid)
01058 {
01059         long key = sid;
01060     Tcl_HashEntry *he = Tcl_FindHashEntry(&sstate_, (const char *)key);
01061     if (he == NULL)
01062         return NULL;
01063     return (SState *)Tcl_GetHashValue(he);
01064 }
01065 
01066 NeighborCache* HttpMInvalCache::lookup_nbr(int id)
01067 {
01068         long key = id;
01069     Tcl_HashEntry *he = Tcl_FindHashEntry(&nbr_, (const char *)key);
01070     if (he == NULL)
01071         return NULL;
01072     return (NeighborCache *)Tcl_GetHashValue(he);
01073 }
01074 
01075 // Add a new neighbor cache
01076 void HttpMInvalCache::add_nbr(HttpMInvalCache *cache)
01077 {
01078     int newEntry = 1;
01079     long key = cache->id ();
01080     Tcl_HashEntry *he = 
01081         Tcl_CreateHashEntry(&nbr_, (const char *)key, 
01082                     &newEntry);
01083     if (he == NULL) 
01084         return;
01085     // If this cache already exists, don't do anything
01086     if (!newEntry)
01087         return;
01088 
01089     // Start a timer for the neighbor
01090     LivenessTimer *timer = 
01091         new LivenessTimer(this,HTTP_HBEXPIRE_COUNT*hb_interval_,
01092                   cache->id());
01093 
01094     double time = Scheduler::instance().clock();
01095     NeighborCache *c = new NeighborCache(cache, time, timer);
01096     Tcl_SetHashValue(he, (ClientData)c);
01097 }
01098 
01099 // Two ways to receive a heartbeat: (1) via HttpInvalAgent; (2) via TCP 
01100 // connection between a server and a primary cache. (See "server-hb" handling
01101 // in command().
01102 void HttpMInvalCache::recv_heartbeat(int id)
01103 {
01104     // Receive time of the heartbeat
01105     double time = Scheduler::instance().clock();
01106 
01107     NeighborCache *c = lookup_nbr(id);
01108     if (c == NULL) {
01109         // XXX
01110         // The only possible place for this to happen is in the TLC
01111         // group, where no JOIN could ever reach. Moreover, 
01112         // we don't even have an entry for that cache yet, so here
01113         // we add that cache into our entry, and later on we'll add
01114         // corresponding servers there.
01115         if (id == id_) 
01116             return;
01117         add_nbr(map_cache(id));
01118 #ifdef WEBCACHE_DEBUG
01119         fprintf(stderr, "TLC %d discovered TLC %d\n", id_, id);
01120 #endif
01121         return;
01122     } else if (c->is_down()) {
01123         // Neighbor cache recovers. Don't do anything special and
01124         // let invalid entries recover themselves
01125         c->up();
01126 #ifdef WEBCACHE_DEBUG
01127         fprintf(stderr, "[%g] Cache %d reconnected to cache %d\n", 
01128             Scheduler::instance().clock(), id_, id);
01129 #endif
01130         Tcl::instance().evalf("%s mark-rejoin", name_);
01131     } else
01132         // Update heartbeat time
01133         c->reset_timer(time);
01134 }
01135 
01136 void HttpMInvalCache::invalidate_server(int sid)
01137 {
01138     SState *sst = lookup_sstate(sid);
01139     if (sst->is_down())
01140         // If this server is already marked down, return
01141         return;
01142     sst->down();
01143     pool_->invalidate_server(sid);
01144 }
01145 
01146 void HttpMInvalCache::handle_node_failure(int cid)
01147 {
01148 #ifdef WEBCACHE_DEBUG
01149     fprintf(stderr, "[%g] Cache %d disconnected from cache %d\n", 
01150         Scheduler::instance().clock(), id_, cid);
01151 #endif
01152     Tcl::instance().evalf("%s mark-leave", name_);
01153 
01154     NeighborCache *c = lookup_nbr(cid);
01155     if (c == NULL) {
01156         fprintf(stderr, "%s: An unknown neighbor cache %d failed.\n",
01157             name_, cid);
01158     }
01159     // Mark the cache down
01160     c->down();
01161     // Invalidate entries of all servers related to that cache
01162     // XXX We don't have an iterator for all servers in NeighborCache!
01163     c->invalidate(this);
01164 
01165     // Send leave message to all children
01166     HttpLeaveData* data = new HttpLeaveData(id_, c->num());
01167     c->pack_leave(*data);
01168     send_leave(data);
01169 }
01170 
01171 void HttpMInvalCache::recv_leave(HttpLeaveData *d)
01172 {
01173 #ifdef WEBCACHE_DEBUG
01174     fprintf(stderr, "[%g] Cache %d gets a LEAVE from cache %d\n", 
01175         Scheduler::instance().clock(), id_, d->id());
01176 #endif
01177 
01178     if (d->num() == 0) {
01179         fprintf(stderr, 
01180             "%s (%g) gets a leave from cache without server!\n", 
01181             name_, Scheduler::instance().clock());
01182         return;
01183     }
01184 
01185     SState *sst;
01186     HttpLeaveData* data = new HttpLeaveData(id_, d->num());
01187     NeighborCache *c = lookup_nbr(d->id());
01188     int i, j;
01189     for (i = 0, j = 0; i < d->num(); i++) {
01190         sst = lookup_sstate(d->rec_id(i));
01191 
01192         // If we haven't heard of that server, which means we don't 
01193         // have any page of that server, ignore the leave message.
01194         if (sst == NULL) 
01195             continue;
01196         // If it's already marked down, don't bother again.
01197         if (sst->is_down()) 
01198             continue;
01199         // If we hear a LEAVE about a server from one of 
01200         // our child in the virtual distribution tree 
01201         // of the server, ignore it.
01202         if (c != sst->cache()) 
01203             continue;
01204 
01205         // We have the page, and we hold inval contract. Invalidate 
01206         // the page and inform our children of it.
01207         sst->down();
01208         data->add(j++, d->rec_id(i));
01209         pool_->invalidate_server(d->rec_id(i));
01210         Tcl::instance().evalf("%s mark-leave", name_);
01211     }
01212     // Delete it if it's not sent out 
01213     if (j > 0)
01214         send_leave(data);
01215     delete data;
01216 }
01217 
01218 void HttpMInvalCache::send_leave(HttpLeaveData *d)
01219 {
01220     send_hb_helper(d->cost(), d);
01221 }
01222 
01223 void HttpMInvalCache::timeout(int reason)
01224 {
01225     switch (reason) {
01226     case HTTP_INVALIDATION:
01227         // Send an invalidation message
01228         send_heartbeat();
01229         break;
01230     case HTTP_UPDATE:
01231         // XXX do nothing. May put client selective joining update
01232         // group here.
01233         break;
01234     default:
01235         fprintf(stderr, "%s: Unknown reason %d", name_, reason);
01236         break;
01237     }
01238 }
01239 
01240 void HttpMInvalCache::process_data(int size, AppData* data)
01241 {
01242     if (data == NULL)
01243         return;
01244 
01245     switch (data->type()) {
01246     case HTTP_INVALIDATION: {
01247         // Update timer for the source of the heartbeat
01248         HttpHbData *inv = (HttpHbData*)data;
01249         recv_heartbeat(inv->id());
01250         recv_inv(inv);
01251         break;
01252     }
01253     case HTTP_UPDATE: {
01254         // Replace all updated pages
01255         HttpUpdateData *pg = (HttpUpdateData*)data;
01256         recv_upd(pg);
01257         break;
01258     }
01259     // JOIN messages are sent via TCP and direct TCL callback.
01260     case HTTP_LEAVE: {
01261         HttpLeaveData *l = (HttpLeaveData*)data;
01262         recv_leave(l);
01263         break;
01264     }
01265     default:
01266         HttpApp::process_data(size, data);
01267         return;
01268     }
01269 }
01270 
01271 void HttpMInvalCache::add_inv(const char *name, double mtime)
01272 {
01273     InvalidationRec *p = get_invrec(name);
01274     if ((p != NULL) && (p->mtime() < mtime)) {
01275         p->detach();
01276         delete p;
01277         p = NULL;
01278         num_inv_--;
01279     } 
01280     if (p == NULL) {
01281         p = new InvalidationRec(name, mtime);
01282         p->insert(&invlist_);
01283         num_inv_++;
01284     }
01285 }
01286 
01287 InvalidationRec* HttpMInvalCache::get_invrec(const char *name)
01288 {
01289     // XXX What should we do if we already have an
01290     // invalidation record of this page in our 
01291     // invlist_? --> We should replace it with the new one
01292     InvalidationRec *r = invlist_;
01293     for (r = invlist_; r != NULL; r = r->next())
01294         if (strcmp(name, r->pg()) == 0)
01295             return r;
01296     return NULL;
01297 }
01298 
01299 HttpHbData* HttpMInvalCache::pack_heartbeat()
01300 {
01301     HttpHbData *data = new HttpHbData(id_, num_inv_);
01302     InvalidationRec *p = invlist_, *q;
01303     int i = 0;
01304     while (p != NULL) {
01305         data->add(i++, p);
01306         // Clearing up invalidation sending list
01307         if (!p->dec_scount()) {
01308             // Each invalidation is sent to its children
01309             // for at most HTTP_HBEXPIRE times. After that 
01310             // the invalidation record is removed from 
01311             // the list
01312             q = p;
01313             p = p->next();
01314             q->detach();
01315             delete q;
01316             num_inv_--;
01317         } else 
01318             p = p->next();
01319     }
01320     return data;
01321 }
01322 
01323 int HttpMInvalCache::recv_inv(HttpHbData *data)
01324 {
01325     if (data->num_inv() == 0)
01326         return 0;
01327 
01328     InvalidationRec *head;
01329     data->extract(head);
01330     int old_inv = num_inv_;
01331     process_inv(data->num_inv(), head, data->id());
01332     //log("E GINV z %d\n", data->size());
01333     if (old_inv < num_inv_) 
01334         // This invalidation is valid
01335         return 1;
01336     else 
01337         return 0;
01338 }
01339 
01340 // Get an invalidation, check invalidation modtimes, then setup 
01341 // invalidation forwarding entries
01342 // The input invalidation record list is destroyed.
01343 void HttpMInvalCache::process_inv(int, InvalidationRec *ivlist, int cache)
01344 {
01345     InvalidationRec *p = ivlist, *q, *r;
01346     //int upd = 0;
01347     while (p != NULL) {
01348         ClientPage* pg = (ClientPage *)pool_->get_page(p->pg());
01349 
01350         // XXX Establish server states. Server states only gets 
01351         // established when we have a page (no matter if we have its
01352         // content), and we have got an invalidation for the page. 
01353         // Then we know we've got an invalidation contract for the 
01354         // page.
01355         if (pg != NULL) {
01356             check_sstate(pg->server()->id(), cache);
01357             // Count this invalidation no matter whether we're
01358             // going to drop it. But if we doesn't get it 
01359             // from our virtual parent, don't count it
01360             SState *sst = lookup_sstate(pg->server()->id());
01361             if (sst == NULL) {
01362                 // How come we doesn't know the server???
01363                 fprintf(stderr, 
01364                     "%s %d: couldn't find the server.\n", 
01365                     __FILE__, __LINE__);
01366                 abort();
01367             }
01368             if ((sst->cache()->cache()->id() == cache) && 
01369                 (pg->mtime() > p->mtime())) {
01370                 // Don't count repeated invalidations.
01371                 pg->count_inval(Ca_, push_low_bound_);
01372                 log("E NTF p %s v %d\n",p->pg(),pg->counter());
01373             }
01374         }
01375 
01376         // Hook for filters of derived classes
01377         if (recv_inv_filter(pg, p) == HTTP_INVALCACHE_FILTERED) {
01378             // If we do not have the page, or we have (or know 
01379             // about) a newer page, ignore this invalidation 
01380             // record and keep going.
01381             //
01382             // If we have this version of the page, and it's 
01383             // already invalid, ignore this extra invalidation
01384             q = p;
01385             p = p->next();
01386             q->detach();
01387             delete q;
01388         } else {
01389             // Otherwise we invalidate our page and setup a 
01390             // invalidation sending record for the page
01391             pg->invalidate(p->mtime());
01392             // Delete existing record for that page if any
01393             q = get_invrec(p->pg());
01394             if ((q != NULL) && (q->mtime() < p->mtime())) {
01395                 q->detach();
01396                 delete q;
01397                 q = NULL;
01398                 num_inv_--;
01399             }
01400             r = p; 
01401             p = p->next();
01402             r->detach();
01403             // Insert it if necessary
01404             if (q == NULL) {
01405                 r->insert(&invlist_);
01406                 num_inv_++;
01407                 // XXX
01408                 Tcl::instance().evalf("%s mark-invalid",name_);
01409                 log("E GINV p %s m %.17g\n", r->pg(), r->mtime());
01410             } else
01411                 delete r;
01412         }
01413     }
01414 }
01415 
01416 void HttpMInvalCache::send_hb_helper(int size, AppData *data)
01417 {
01418     if (inv_parent_ != NULL) 
01419         inv_parent_->send(size, data->copy());
01420     for (int i = 0; i < num_sender_; i++)
01421         inv_sender_[i]->send(size, data->copy());
01422 }
01423 
01424 void HttpMInvalCache::send_heartbeat()
01425 {
01426     if ((num_sender_ == 0) && (inv_parent_ == NULL))
01427         return;
01428 
01429     HttpHbData* d = pack_heartbeat();
01430     send_hb_helper(d->cost(), d);
01431     delete d;
01432 }
01433 
01434 int HttpMInvalCache::recv_upd(HttpUpdateData *d)
01435 {
01436     if (d->num() != 1) {
01437         fprintf(stderr, 
01438             "%d gets an update which contain !=1 pages.\n", id_);
01439         abort();
01440     }
01441 
01442     ClientPage *pg = pool_->get_page(d->rec_page(0));
01443     if (pg != NULL) 
01444         if (pg->mtime() >= d->rec_mtime(0)) {
01445             // If we've already had this version, or a newer 
01446             // version, ignore this old push
01447 //          fprintf(stderr, "[%g] %d gets an old push\n", 
01448 //              Scheduler::instance().clock(), id_);
01449 //          log("E OLD m %g p %g\n", d->rec_mtime(0), pg->mtime());
01450             return 0;
01451         } else {
01452             // Our old page is invalidated by this new push,
01453             // set up invalidation records for our children
01454             add_inv(d->rec_page(0), d->rec_mtime(0));
01455             pg->count_inval(Ca_, push_low_bound_);
01456             log("E NTF p %s v %d\n", d->rec_page(0),pg->counter());
01457         }
01458 
01459     // Add the new page into our pool
01460     ClientPage *q = pool_->enter_page(d->rec_page(0), d->rec_size(0), 
01461                       d->rec_mtime(0),
01462                       Scheduler::instance().clock(),
01463                       d->rec_age(0));
01464     // By default the page is valid and read. Set it as unread
01465     q->set_unread();
01466 
01467     log("E GUPD m %.17g z %d\n", d->rec_mtime(0), d->pgsize());
01468     Tcl::instance().evalf("%s mark-valid", name_);
01469 
01470     // XXX If the page was previously marked as MandatoryPush, then
01471     // we need to check if it's timed out
01472     if (q->is_mpush() && (Scheduler::instance().clock() - q->mpush_time()
01473                   > HTTP_HBEXPIRE_COUNT*hb_interval_)) {
01474         // If mandatory push timer expires, stop push
01475         q->clear_mpush();
01476         Tcl::instance().evalf("%s cancel-mpush-refresh %s", 
01477                       name_, d->rec_page(0));
01478     }
01479 
01480     if (enable_upd_ && (q->counter() >= push_thresh_) || q->is_mpush())
01481         // XXX Continue pushing if we either select to push, or 
01482         // were instructed to do so.
01483         return 1;
01484     else 
01485         return 0;
01486 }
01487 
01488 HttpUpdateData* HttpMInvalCache::pack_upd(ClientPage* page)
01489 {
01490     HttpUpdateData *data = new HttpUpdateData(id_, 1);
01491     data->add(0, page);
01492     return data;
01493 }
01494 
01495 void HttpMInvalCache::send_upd_helper(int pgsize, AppData* data)
01496 {
01497     for (int i = 0; i < num_updater_; i++)
01498         upd_sender_[i]->send(pgsize, data->copy());
01499 }
01500 
01501 void HttpMInvalCache::send_upd(ClientPage *page)
01502 {
01503     if ((num_updater_ == 0) || !enable_upd_) 
01504         return;
01505 
01506     HttpUpdateData* d = pack_upd(page);
01507     send_upd_helper(d->pgsize(), d);
01508     delete d;
01509 }
01510 
01511 
01512 //----------------------------------------------------------------------
01513 // Multicast invalidation + two way liveness messages + 
01514 // invalidation filtering. 
01515 //----------------------------------------------------------------------
01516 static class HttpPercInvalCacheClass : public TclClass {
01517 public:
01518         HttpPercInvalCacheClass() : TclClass("Http/Cache/Inval/Mcast/Perc") {}
01519         TclObject* create(int, const char*const*) {
01520         return (new HttpPercInvalCache());
01521     }
01522 } class_HttpPercInvalCache_app;
01523 
01524 HttpPercInvalCache::HttpPercInvalCache() 
01525 {
01526     bind("direct_request_", &direct_request_);
01527 }
01528 
01529 int HttpPercInvalCache::command(int argc, const char*const* argv)
01530 {
01531     Tcl& tcl = Tcl::instance();
01532 
01533     if (strcmp(argv[1], "is-header-valid") == 0) {
01534         ClientPage *pg = 
01535             (ClientPage *)pool_->get_page(argv[2]);
01536         if (pg == NULL) {
01537             tcl.resultf("%d is-valid: No page %s", 
01538                     id_, argv[2]);
01539             return TCL_ERROR;
01540         }
01541         tcl.resultf("%d", pg->is_header_valid());
01542         return TCL_OK;
01543     } else if (strcmp(argv[1], "enter-metadata") == 0) {
01544         /* 
01545          * <cache> enter-metadata <args...>
01546          * The same arguments as enter-page, but set the page status
01547          * as HTTP_VALID_HEADER, i.e., if we get a request, we need 
01548          * to fetch the actual valid page content
01549          */
01550         ClientPage *pg = pool_->enter_metadata(argc, argv);
01551         if (pg == NULL)
01552             return TCL_ERROR;
01553         else
01554             return TCL_OK;
01555     }
01556 
01557     return HttpMInvalCache::command(argc, argv);
01558 }

Generated on Tue Mar 6 16:47:45 2007 for ns2 Network Simulator 2.29 by  doxygen 1.4.6