00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056 #include <assert.h>
00057 #include <math.h>
00058 #include <stdio.h>
00059 #include <signal.h>
00060 #include <float.h>
00061 #include <tcl.h>
00062 #include <stdlib.h>
00063
00064 #include "diff_header.h"
00065 #include "agent.h"
00066 #include "tclcl.h"
00067 #include "ip.h"
00068 #include "config.h"
00069 #include "packet.h"
00070 #include "trace.h"
00071 #include "random.h"
00072 #include "classifier.h"
00073 #include "node.h"
00074 #include "diffusion.h"
00075 #include "iflist.h"
00076 #include "hash_table.h"
00077 #include "arp.h"
00078 #include "mac.h"
00079 #include "ll.h"
00080 #include "dsr/path.h"
00081 #include "god.h"
00082 #include "routing_table.h"
00083
00084 char *MsgStr[]= {"", "INTEREST", "DATA", "DATA_READY", "DATA_REQUEST",
00085 "POS_REINFORCE", "NEG_REINFORCE", "INHIBIT", "TX_FAILED",
00086 "DATA_STOP" };
00087
00088 int hdr_cdiff::offset_;
00089 static class DiffHeaderClass : public PacketHeaderClass {
00090 public:
00091 DiffHeaderClass() : PacketHeaderClass("PacketHeader/Diffusion",
00092 sizeof(hdr_cdiff)) {
00093 bind_offset(&hdr_cdiff::offset_);
00094 }
00095 } class_diffhdr;
00096
00097
00098 static class DiffusionClass : public TclClass {
00099 public:
00100 DiffusionClass() : TclClass("Agent/Diffusion") {}
00101 TclObject* create(int , const char*const* ) {
00102 return(new DiffusionAgent());
00103 }
00104 } class_diffusion;
00105
00106
00107 void ArpBufferTimer::expire(Event *e)
00108 {
00109 a_->ArpBufferCheck();
00110 resched(ARP_BUFFER_CHECK + ARP_BUFFER_CHECK *
00111 (double) ((long) e>>5 & 0xff) /256.0);
00112 }
00113
00114 void SendBufTimer::expire(Event *e)
00115 {
00116 a_->SendBufferCheck();
00117 resched(SEND_BUFFER_CHECK + SEND_BUFFER_CHECK * (double) ((long) e>>5 & 0xff)/256.0);
00118 }
00119
00120
00121 void DiffusionAgent::DataForSink(Packet *pkt)
00122 {
00123 hdr_cdiff *dfh = HDR_CDIFF(pkt);
00124 unsigned int dtype = dfh->data_type;
00125 Agent_List *cur_agent;
00126 Packet *cur_pkt;
00127 hdr_cdiff *cur_dfh;
00128 hdr_ip *cur_iph;
00129
00130
00131
00132
00133 for (cur_agent= (routing_table[dtype]).sink; cur_agent != NULL;
00134 cur_agent= AGENT_NEXT(cur_agent) ) {
00135
00136 cur_pkt = pkt->copy();
00137 cur_iph = HDR_IP(cur_pkt);
00138 cur_iph->dst_ = AGT_ADDR(cur_agent);
00139
00140 cur_dfh = HDR_CDIFF(cur_pkt);
00141 cur_dfh->forward_agent_id = here_;
00142 cur_dfh->num_next = 1;
00143 cur_dfh->next_nodes[0] = NODE_ADDR(cur_agent);
00144
00145 send_to_dmux(cur_pkt, 0);
00146 }
00147 }
00148
00149
00150 Packet *DiffusionAgent::prepare_message(unsigned int dtype, ns_addr_t to_addr,
00151 int msg_type)
00152 {
00153 Packet *pkt;
00154 hdr_cdiff *dfh;
00155 hdr_ip *iph;
00156
00157 pkt = create_packet();
00158 dfh = HDR_CDIFF(pkt);
00159 iph = HDR_IP(pkt);
00160
00161 dfh->mess_type = msg_type;
00162 dfh->pk_num = pk_count;
00163 pk_count++;
00164 dfh->sender_id = here_;
00165 dfh->data_type = dtype;
00166 dfh->forward_agent_id = here_;
00167
00168 dfh->ts_ = NOW;
00169 dfh->num_next = 1;
00170 dfh->next_nodes[0] = to_addr.addr_;
00171
00172 iph->src_ = here_;
00173 iph->dst_ = to_addr;
00174
00175 return pkt;
00176 }
00177
00178
00179 DiffusionAgent::DiffusionAgent() : Agent(PT_DIFF), arp_buf_timer(this),
00180 send_buf_timer(this)
00181 {
00182 POS_REINF_ = true;
00183 NEG_REINF_ = true;
00184 overhead = 0;
00185 pk_count = 0;
00186 target_ = 0;
00187
00188 node = NULL;
00189 tracetarget = NULL;
00190 }
00191
00192
00193 void DiffusionAgent::recv(Packet* packet, Handler*)
00194 {
00195 hdr_cdiff* dfh = HDR_CDIFF(packet);
00196
00197
00198
00199 Pkt_Hash_Entry *hashPtr= PktTable.GetHash(dfh->sender_id, dfh->pk_num);
00200
00201
00202
00203
00204 if (hashPtr != NULL) {
00205 consider_old(packet);
00206 return;
00207 }
00208
00209
00210
00211 PktTable.put_in_hash(dfh);
00212
00213
00214
00215 consider_new(packet);
00216 }
00217
00218
00219 void DiffusionAgent::consider_old(Packet *pkt)
00220 {
00221 Packet::free(pkt);
00222 }
00223
00224
00225 void DiffusionAgent::consider_new(Packet *pkt)
00226 {
00227 Packet::free(pkt);
00228 }
00229
00230
00231 void DiffusionAgent::Terminate()
00232 {
00233 #ifdef DEBUG_OUTPUT
00234 printf("Diffusion node %d : terminates (overhead %d)\n",
00235 THIS_NODE, overhead);
00236 printf("node %d: remaining energy %f, initial energy %f\n", THIS_NODE,
00237 node->energy_model()->energy(),
00238 node->energy_model()->initialenergy() );
00239 Print_IOlist();
00240 #endif
00241 }
00242
00243
00244 void DiffusionAgent::Print_IOlist()
00245 {
00246 }
00247
00248
00249 void DiffusionAgent::Start()
00250 {
00251 arp_buf_timer.sched(ARP_BUFFER_CHECK + ARP_BUFFER_CHECK *
00252 Random::uniform(1.0));
00253 send_buf_timer.sched(SEND_BUFFER_CHECK + SEND_BUFFER_CHECK *
00254 Random::uniform(1.0));
00255 }
00256
00257
00258 int DiffusionAgent::command(int argc, const char*const* argv)
00259 {
00260 Tcl& tcl = Tcl::instance();
00261
00262 if (argc == 2) {
00263
00264 if (strcasecmp(argv[1], "reset-state")==0) {
00265
00266 reset();
00267 return TCL_OK;
00268 }
00269
00270 if (strcasecmp(argv[1], "reset")==0) {
00271
00272 return Agent::command(argc, argv);
00273 }
00274
00275 if (strcasecmp(argv[1], "start")==0) {
00276 Start();
00277 return TCL_OK;
00278 }
00279
00280 if (strcasecmp(argv[1], "stop")==0) {
00281 return TCL_OK;
00282 }
00283
00284 if (strcasecmp(argv[1], "terminate")==0) {
00285 Terminate();
00286 return TCL_OK;
00287 }
00288
00289 if (strcasecmp(argv[1], "stop-source")==0) {
00290 StopSource();
00291 return TCL_OK;
00292 }
00293
00294 if (strcasecmp(argv[1], "enable-pos") == 0) {
00295 POS_REINF_ = true;
00296 return TCL_OK;
00297 }
00298
00299 if (strcasecmp(argv[1], "enable-neg") == 0) {
00300 NEG_REINF_ = true;
00301 return TCL_OK;
00302 }
00303
00304
00305 if (strcasecmp(argv[1], "disable-pos") == 0) {
00306 POS_REINF_ = false;
00307 return TCL_OK;
00308 }
00309
00310 if (strcasecmp(argv[1], "disable-neg") == 0) {
00311 NEG_REINF_ = false;
00312 return TCL_OK;
00313 }
00314
00315 } else if (argc == 3) {
00316
00317 if (strcasecmp(argv[1], "on-node")==0) {
00318 node = (Node *)tcl.lookup(argv[2]);
00319 return TCL_OK;
00320 }
00321
00322
00323 if (strcasecmp(argv[1], "add-ll") == 0) {
00324
00325 TclObject *obj;
00326
00327 if ( (obj = TclObject::lookup(argv[2])) == 0) {
00328 fprintf(stderr, "Diffusion Node: %d lookup of %s failed\n", THIS_NODE,
00329 argv[2]);
00330 return TCL_ERROR;
00331 }
00332 ll = (NsObject *) obj;
00333
00334
00335 arp_table = ((LL *)ll)->arp_table();
00336 if (arp_table == NULL)
00337 return TCL_ERROR;
00338
00339 return TCL_OK;
00340 }
00341
00342 if (strcasecmp (argv[1], "tracetarget") == 0) {
00343 TclObject *obj;
00344 if ((obj = TclObject::lookup (argv[2])) == 0) {
00345 fprintf (stderr, "%s: %s lookup of %s failed\n", __FILE__, argv[1],
00346 argv[2]);
00347 return TCL_ERROR;
00348 }
00349
00350 tracetarget = (Trace *) obj;
00351 return TCL_OK;
00352 }
00353
00354 if (strcasecmp(argv[1], "port-dmux") == 0) {
00355
00356 TclObject *obj;
00357
00358 if ( (obj = TclObject::lookup(argv[2])) == 0) {
00359 fprintf(stderr, "Diffusion Node: %d lookup of %s failed\n", THIS_NODE,
00360 argv[2]);
00361 return TCL_ERROR;
00362 }
00363 port_dmux = (NsObject *) obj;
00364 return TCL_OK;
00365 }
00366
00367
00368 }
00369
00370 return Agent::command(argc, argv);
00371 }
00372
00373
00374 void DiffusionAgent::StopSource()
00375 {
00376 Agent_List *cur;
00377
00378 for (int i=0; i<MAX_DATA_TYPE; i++) {
00379 for (cur=routing_table[i].source; cur!=NULL; cur=AGENT_NEXT(cur) ) {
00380 SEND_MESSAGE(i, AGT_ADDR(cur), DATA_STOP);
00381 }
00382 }
00383 }
00384
00385
00386 Packet * DiffusionAgent:: create_packet()
00387 {
00388 Packet *pkt = allocpkt();
00389
00390 if (pkt==NULL) return NULL;
00391
00392 hdr_cmn* cmh = HDR_CMN(pkt);
00393 cmh->size() = 36;
00394
00395 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00396 dfh->ts_ = NOW;
00397 return pkt;
00398 }
00399
00400
00401 void DiffusionAgent::MACprepare(Packet *pkt, nsaddr_t next_hop,
00402 int type, bool lk_dtct)
00403 {
00404 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00405 hdr_cmn* cmh = HDR_CMN(pkt);
00406 hdr_ip* iph = HDR_IP(pkt);
00407
00408 dfh->forward_agent_id = here_;
00409 if (type == (int) NS_AF_ILINK && next_hop == (nsaddr_t) MAC_BROADCAST) {
00410 cmh->xmit_failure_ = 0;
00411 cmh->next_hop() = MAC_BROADCAST;
00412 cmh->addr_type() = NS_AF_ILINK;
00413 cmh->direction() = hdr_cmn::DOWN;
00414
00415
00416 iph->src_ = here_;
00417 iph->dst_.addr_ = next_hop;
00418 iph->dst_.port_ = ROUTING_PORT;
00419
00420 dfh->num_next = 1;
00421 dfh->next_nodes[0] = next_hop;
00422
00423 return;
00424 }
00425
00426 if (lk_dtct != 0) {
00427 cmh->xmit_failure_ = XmitFailedCallback;
00428 cmh->xmit_failure_data_ = (void *) this;
00429 }
00430 else {
00431 cmh->xmit_failure_ = 0;
00432 }
00433
00434 cmh->direction() = hdr_cmn::DOWN;
00435
00436 cmh->next_hop() = next_hop;
00437 cmh->addr_type() = type;
00438
00439 iph->src_ = here_;
00440 iph->dst_.addr_ = next_hop;
00441 iph->dst_.port_ = ROUTING_PORT;
00442
00443 dfh->num_next = 1;
00444 dfh->next_nodes[0] = next_hop;
00445 }
00446
00447
00448 void DiffusionAgent::MACsend(Packet *pkt, Time delay)
00449 {
00450 hdr_cmn* cmh = HDR_CMN(pkt);
00451 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00452
00453 if (dfh->mess_type == DATA)
00454 cmh->size() = (God::instance()->data_pkt_size) + 4*(dfh->num_next - 1);
00455 else
00456 cmh->size() = 36 + 4*(dfh->num_next -1);
00457
00458 Scheduler::instance().schedule(ll, pkt, delay);
00459 }
00460
00461
00462 void XmitFailedCallback(Packet *pkt, void *data)
00463 {
00464 DiffusionAgent *agent = (DiffusionAgent *)data;
00465 agent->xmitFailed(pkt);
00466 }
00467
00468
00469 void DiffusionAgent::xmitFailed(Packet *)
00470 {
00471
00472 }
00473
00474
00475 void DiffusionAgent::StickPacketInArpBuffer(Packet *pkt)
00476 {
00477 Time min = DBL_MAX;
00478 int min_index = 0;
00479 int c;
00480
00481 for (c=0; c < ARP_BUF_SIZE; c++) {
00482 if (arp_buf[c].p == NULL) {
00483 arp_buf[c].t = NOW;
00484 arp_buf[c].attempt = 1;
00485 arp_buf[c].p = pkt;
00486 return;
00487 }
00488 else if (arp_buf[c].t < min) {
00489 min = arp_buf[c].t;
00490 min_index = c;
00491 }
00492 }
00493
00494
00495
00496 ARPEntry *llinfo;
00497 hdr_cmn* cmh = HDR_CMN(arp_buf[min_index].p);
00498
00499 llinfo= arp_table->arplookup(cmh->next_hop());
00500
00501 if (llinfo == 0) {
00502
00503 xmitFailed(arp_buf[min_index].p);
00504 }
00505 else
00506 MACsend(arp_buf[min_index].p, 0);
00507
00508
00509
00510
00511 arp_buf[min_index].t = NOW;
00512 arp_buf[min_index].attempt = 1;
00513 arp_buf[min_index].p = pkt;
00514 }
00515
00516
00517 void DiffusionAgent::ArpBufferCheck()
00518 {
00519 int c;
00520 ARPEntry *llinfo;
00521 hdr_cmn* cmh;
00522
00523 for (c = 0; c < ARP_BUF_SIZE; c++) {
00524 if (arp_buf[c].p == NULL)
00525 continue;
00526
00527 cmh = HDR_CMN(arp_buf[c].p);
00528 llinfo= arp_table->arplookup(cmh->next_hop());
00529 if (llinfo != 0) {
00530 MACsend(arp_buf[c].p, 0);
00531 arp_buf[c].p = NULL;
00532 continue;}
00533
00534 if (arp_buf[c].attempt > ARP_MAX_ATTEMPT) {
00535
00536 xmitFailed(arp_buf[c].p);
00537 arp_buf[c].p = NULL;
00538 continue;
00539 }
00540
00541 arp_table->arprequest(THIS_NODE, cmh->next_hop(), (LL *)ll);
00542 arp_buf[c].attempt ++;
00543 }
00544 }
00545
00546
00547 void DiffusionAgent::StickPacketInSendBuffer(Packet *p)
00548 {
00549 Time min = DBL_MAX;
00550 int min_index = 0;
00551 int c;
00552
00553 for (c = 0 ; c < SEND_BUF_SIZE ; c ++)
00554 if (send_buf[c].p == NULL)
00555 {
00556 send_buf[c].t = NOW;
00557 send_buf[c].p = p;
00558 return;
00559 }
00560 else if (send_buf[c].t < min)
00561 {
00562 min = send_buf[c].t;
00563 min_index = c;
00564 }
00565
00566
00567
00568 hdr_cdiff *dfh = HDR_CDIFF(send_buf[min_index].p);
00569 hdr_ip *iph = HDR_IP(send_buf[min_index].p);
00570 int dtype = dfh->data_type;
00571
00572 PrvCurPtr RetVal = INTF_FIND(routing_table[dtype].active, iph->dst_);
00573
00574 if (RetVal.cur != NULL)
00575 MACsend(send_buf[min_index].p, 0);
00576 else
00577 Packet::free(send_buf[min_index].p);
00578
00579
00580
00581 send_buf[min_index].t = Scheduler::instance().clock();
00582 send_buf[min_index].p = p;
00583 }
00584
00585
00586 void DiffusionAgent::SendBufferCheck()
00587 {
00588 int c;
00589 hdr_cdiff *dfh;
00590 hdr_cmn *cmh;
00591 hdr_ip *iph;
00592 int dtype;
00593 PrvCurPtr RetVal;
00594
00595 for (c = 0; c < SEND_BUF_SIZE; c++) {
00596 if (send_buf[c].p == NULL)
00597 continue;
00598
00599 dfh = HDR_CDIFF(send_buf[c].p);
00600 cmh = HDR_CMN(send_buf[c].p);
00601 iph = HDR_IP(send_buf[c].p);
00602 dtype = dfh->data_type;
00603
00604 RetVal = INTF_FIND(routing_table[dtype].active, iph->dst_);
00605
00606 if (RetVal.cur != NULL) {
00607 MACsend(send_buf[c].p, 0);
00608 send_buf[c].p = NULL;
00609 continue;
00610 }
00611
00612 if (Scheduler::instance().clock() - send_buf[c].t > SEND_TIMEOUT) {
00613 Packet::free(send_buf[c].p);
00614 send_buf[c].p = NULL;
00615 continue;
00616 }
00617
00618 }
00619 }
00620
00621
00622 void DiffusionAgent::trace (char *fmt,...)
00623 {
00624 va_list ap;
00625
00626 if (!tracetarget)
00627 return;
00628
00629 va_start (ap, fmt);
00630 vsprintf (tracetarget->pt_->buffer (), fmt, ap);
00631 tracetarget->pt_->dump ();
00632 va_end (ap);
00633 }
00634
00635
00636 void DiffusionAgent::reset()
00637 {
00638 PktTable.reset();
00639
00640 for (int i=0; i<MAX_DATA_TYPE; i++) {
00641 routing_table[i].reset();
00642 }
00643 clear_arp_buf();
00644 clear_send_buf();
00645 }
00646
00647
00648 void DiffusionAgent::clear_arp_buf()
00649 {
00650 for (int i=0; i<ARP_BUF_SIZE; i++) {
00651 arp_buf[i].t = 0;
00652 arp_buf[i].attempt = 0;
00653 if (arp_buf[i].p != NULL)
00654 Packet::free(arp_buf[i].p);
00655 arp_buf[i].p = NULL;
00656 }
00657 }
00658
00659 void DiffusionAgent::clear_send_buf()
00660 {
00661 for (int i=0; i<SEND_BUF_SIZE; i++) {
00662 send_buf[i].t = 0;
00663 if (send_buf[i].p != NULL)
00664 Packet::free(send_buf[i].p);
00665 send_buf[i].p = NULL;
00666 }
00667 }
00668
00669
00670
00671
00672
00673
00674
00675