00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051 #include <stdlib.h>
00052 #include "diff_sink.h"
00053 #include "diffusion.h"
00054 #include "diff_rate.h"
00055 #include "hash_table.h"
00056 #include "agent.h"
00057 #include "packet.h"
00058 #include "tclcl.h"
00059 #include "random.h"
00060 #include "god.h"
00061
00062 #define REPORT_PERIOD 1.0
00063
00064 extern char* MsgStr[];
00065
00066 void Report_Timer::expire(Event *) {
00067 a_->report();
00068 }
00069
00070
00071 void Sink_Timer::expire(Event *) {
00072 a_->timeout(0);
00073 }
00074
00075 void Periodic_Timer::expire(Event *) {
00076 a_->bcast_interest();
00077 }
00078
00079 static class SinkClass : public TclClass {
00080 public:
00081 SinkClass() : TclClass("Agent/Diff_Sink") {}
00082 TclObject* create(int , const char*const* ) {
00083 return(new SinkAgent());
00084 }
00085 } class_sink;
00086
00087
00088 SinkAgent::SinkAgent() : Agent(PT_DIFF), data_type_(0),
00089 running_(0), random_(0), sink_timer_(this), periodic_timer_(this),
00090 report_timer_(this)
00091 {
00092
00093
00094 APP_DUP_ = true;
00095
00096 periodic_ = true;
00097 always_max_rate_ = false;
00098
00099
00100
00101 bind("data_type_", &data_type_);
00102 bind_time("interval_", &interval_);
00103 bind("packetSize_", &size_);
00104 bind("random_", &random_);
00105 bind("maxpkts_", &maxpkts_);
00106
00107
00108
00109
00110 pk_count=0;
00111 num_recv=0;
00112 num_send=0;
00113 RecvPerSec=0;
00114
00115 cum_delay=0.0;
00116
00117 data_counter = 0;
00118 simple_report_rate = ORIGINAL;
00119
00120 last_arrival_time = -1.0;
00121 }
00122
00123 void SinkAgent::start()
00124 {
00125 running_ = 1;
00126 sendpkt();
00127 sink_timer_.resched(interval_);
00128 }
00129
00130 void SinkAgent::stop()
00131 {
00132 if (running_) {
00133 running_ = 0;
00134 }
00135
00136 if (periodic_ == true) {
00137 periodic_ = false;
00138 periodic_timer_.force_cancel();
00139 }
00140 }
00141
00142
00143 void SinkAgent::report()
00144 {
00145
00146 report_timer_.resched(REPORT_PERIOD);
00147 RecvPerSec = 0;
00148 }
00149
00150
00151 void SinkAgent::timeout(int)
00152 {
00153 if (running_) {
00154 sendpkt();
00155 double t = interval_;
00156 if (random_)
00157
00158 t += interval_ * Random::uniform(-0.5, 0.5);
00159 sink_timer_.resched(t);
00160 }
00161 }
00162
00163
00164 void SinkAgent::sendpkt()
00165 {
00166 if (pk_count >= maxpkts_) {
00167 running_ = 0;
00168 return;
00169 }
00170
00171 Packet* pkt = create_packet();
00172 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00173 hdr_ip* iph = HDR_IP(pkt);
00174 hdr_cmn* cmh = HDR_CMN(pkt);
00175
00176
00177 data_counter = (data_counter + SUB_SAMPLED)% ORIGINAL;
00178 if (data_counter == SUB_SAMPLED) {
00179 dfh->report_rate = SUB_SAMPLED;
00180 } else {
00181 dfh->report_rate = ORIGINAL;
00182 }
00183 if (simple_report_rate < dfh->report_rate) {
00184 Packet::free(pkt);
00185 return;
00186 }
00187
00188 cmh->size() = size_;
00189 dfh->mess_type = DATA;
00190 dfh->pk_num = pk_count;
00191
00192 pk_count++;
00193
00194 dfh->sender_id = here_;
00195 dfh->data_type = data_type_;
00196 dfh->forward_agent_id = here_;
00197
00198 dfh->num_next = 1;
00199 dfh->next_nodes[0] = here_.addr_;
00200
00201 iph->src_ = here_;
00202 iph->dst_.addr_ = here_.addr_;
00203 iph->dst_.port_ = ROUTING_PORT;
00204
00205
00206
00207
00208
00209
00210 num_send++;
00211 dfh->attr[0] = data_type_;
00212
00213 if (APP_DUP_ == true)
00214 dfh->attr[1] = 0;
00215 else
00216 dfh->attr[1] = here_.addr_;
00217
00218 dfh->attr[2] = num_send;
00219 God::instance()->CountNewData(dfh->attr);
00220 send(pkt, 0);
00221 }
00222
00223
00224 void SinkAgent::bcast_interest()
00225 {
00226 Packet* pkt = create_packet();
00227 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00228 hdr_ip* iph = HDR_IP(pkt);
00229
00230
00231 dfh->mess_type = INTEREST;
00232 dfh->pk_num = pk_count;
00233 pk_count++;
00234 dfh->sender_id = here_;
00235 dfh->data_type = data_type_;
00236 dfh->forward_agent_id = here_;
00237
00238 dfh->report_rate = SUB_SAMPLED;
00239 dfh->num_next = 1;
00240 dfh->next_nodes[0] = here_.addr_;
00241
00242 iph->src_ = here_;
00243 iph->dst_.addr_ = here_.addr_;
00244 iph->dst_.port_ = ROUTING_PORT;
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256 send(pkt, 0);
00257 if (periodic_ == true)
00258 periodic_timer_.resched(INTEREST_PERIODIC);
00259 }
00260
00261
00262 void SinkAgent::data_ready()
00263 {
00264
00265 Packet* pkt = create_packet();
00266
00267
00268 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00269 hdr_ip* iph = HDR_IP(pkt);
00270
00271
00272 dfh->mess_type = DATA_READY;
00273 dfh->pk_num = pk_count;
00274 pk_count++;
00275 dfh->sender_id = here_;
00276 dfh->data_type = data_type_;
00277 dfh->forward_agent_id = here_;
00278
00279 dfh->num_next = 1;
00280 dfh->next_nodes[0] = here_.addr_;
00281
00282 iph->src_ = here_;
00283 iph->dst_.addr_ = here_.addr_;
00284 iph->dst_.port_ = ROUTING_PORT;
00285
00286 send(pkt, 0);
00287 }
00288
00289
00290 void SinkAgent::Terminate()
00291 {
00292 #ifdef DEBUG_OUTPUT
00293 printf("SINK %d : TYPE %d : terminates (send %d, recv %d, cum_delay %f)\n",
00294 here_.addr_, data_type_, num_send, num_recv, cum_delay);
00295 #endif
00296 }
00297
00298
00299 int SinkAgent::command(int argc, const char*const* argv)
00300 {
00301 if (argc == 2) {
00302
00303 if (strcmp(argv[1], "enable-duplicate") == 0) {
00304 APP_DUP_ = true;
00305 return TCL_OK;
00306 }
00307
00308 if (strcmp(argv[1], "disable-duplicate") == 0) {
00309 APP_DUP_ = false;
00310 return TCL_OK;
00311 }
00312
00313 if (strcmp(argv[1], "always-max-rate") == 0) {
00314 always_max_rate_ = true;
00315 return TCL_OK;
00316 }
00317
00318 if (strcmp(argv[1], "terminate") == 0) {
00319 Terminate();
00320 return TCL_OK;
00321 }
00322
00323 if (strcmp(argv[1], "announce") == 0) {
00324 bcast_interest();
00325 report_timer_.resched(REPORT_PERIOD);
00326
00327 return (TCL_OK);
00328 }
00329
00330 if (strcmp(argv[1], "ready") == 0) {
00331 God::instance()->data_pkt_size = size_;
00332 data_ready();
00333 return (TCL_OK);
00334 }
00335
00336 if (strcmp(argv[1], "send") == 0) {
00337 sendpkt();
00338 return (TCL_OK);
00339 }
00340
00341 if (strcmp(argv[1], "cbr-start") == 0) {
00342 start();
00343 return (TCL_OK);
00344 }
00345
00346 if (strcmp(argv[1], "stop") == 0) {
00347 stop();
00348 report_timer_.force_cancel();
00349 return (TCL_OK);
00350 }
00351
00352 }
00353
00354 if (argc == 3) {
00355 if (strcmp(argv[1], "data-type") == 0) {
00356 data_type_ = atoi(argv[2]);
00357 return (TCL_OK);
00358 }
00359 }
00360
00361 return (Agent::command(argc, argv));
00362 }
00363
00364
00365 void SinkAgent::recv(Packet* pkt, Handler*)
00366 {
00367 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00368
00369
00370
00371
00372
00373
00374
00375 if (data_type_ != dfh->data_type) {
00376 printf("Hey, What are you doing? I am not a sink for %d. I'm a sink for %d. \n", dfh->data_type, data_type_);
00377 Packet::free(pkt);
00378 return;
00379 }
00380
00381
00382 switch(dfh->mess_type) {
00383 case DATA_REQUEST :
00384
00385 if (always_max_rate_ == false)
00386 simple_report_rate = dfh->report_rate;
00387
00388 if (!running_) start();
00389
00390
00391
00392
00393 break;
00394
00395 case DATA_STOP :
00396
00397 if (running_) stop();
00398 break;
00399
00400 case DATA :
00401
00402 if (APP_DUP_ == true) {
00403 if (DataTable.GetHash(dfh->attr) != NULL) {
00404 Packet::free(pkt);
00405 return;
00406 } else {
00407 DataTable.PutInHash(dfh->attr);
00408 }
00409 }
00410
00411 cum_delay = cum_delay + (NOW - dfh->ts_);
00412 num_recv++;
00413 RecvPerSec++;
00414 God::instance()->IncrRecv();
00415
00416
00417
00418
00419
00420
00421
00422 last_arrival_time = NOW;
00423
00424 break;
00425
00426 default:
00427
00428 break;
00429 }
00430
00431 Packet::free(pkt);
00432 }
00433
00434
00435 void SinkAgent::reset()
00436 {
00437 }
00438
00439
00440 void SinkAgent:: set_addr(ns_addr_t address)
00441 {
00442 here_=address;
00443 }
00444
00445
00446 int SinkAgent:: get_pk_count()
00447 {
00448 return pk_count;
00449 }
00450
00451
00452 void SinkAgent:: incr_pk_count()
00453 {
00454 pk_count++;
00455 }
00456
00457
00458 Packet * SinkAgent:: create_packet()
00459 {
00460 Packet *pkt = allocpkt();
00461
00462 if (pkt==NULL) return NULL;
00463
00464 hdr_cmn* cmh = HDR_CMN(pkt);
00465
00466 cmh->size() = 36;
00467
00468 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00469 dfh->ts_ = NOW;
00470
00471 return pkt;
00472 }
00473
00474
00475
00476
00477