00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054 #include <assert.h>
00055 #include <math.h>
00056 #include <stdio.h>
00057 #include <signal.h>
00058 #include <float.h>
00059 #include <stdlib.h>
00060
00061 #include <tcl.h>
00062
00063
00064 #include "diff_header.h"
00065 #include "agent.h"
00066 #include "tclcl.h"
00067 #include "ip.h"
00068 #include "config.h"
00069 #include "packet.h"
00070 #include "trace.h"
00071 #include "random.h"
00072 #include "classifier.h"
00073 #include "node.h"
00074 #include "omni_mcast.h"
00075 #include "iflist.h"
00076 #include "hash_table.h"
00077 #include "arp.h"
00078 #include "mac.h"
00079 #include "ll.h"
00080 #include "dsr/path.h"
00081 #include "god.h"
00082
00083 static class OmniMcastClass : public TclClass {
00084 public:
00085 OmniMcastClass() : TclClass("Agent/OmniMcast") {}
00086 TclObject* create(int argc, const char*const* argv) {
00087 return(new OmniMcastAgent());
00088 }
00089 } class_omni_mcast;
00090
00091
00092 void OmniMcastArpBufferTimer::expire(Event *e)
00093 {
00094 a_->ArpBufferCheck();
00095 resched(ARP_BUFFER_CHECK + ARP_BUFFER_CHECK *
00096 (double) ((long) e>>5 & 0xff) /256.0);
00097 }
00098
00099 void OmniMcastSendBufTimer::expire(Event *e)
00100 {
00101 a_->SendBufferCheck();
00102 resched(SEND_BUFFER_CHECK + SEND_BUFFER_CHECK * (double) ((long) e>>5 & 0xff)/256.0);
00103 }
00104
00105
00106 void OmniMcastAgent::DataForSink(Packet *pkt)
00107 {
00108 hdr_cdiff *dfh = HDR_CDIFF(pkt);
00109 unsigned int dtype = dfh->data_type;
00110 Agent_List *cur_agent;
00111 Packet *cur_pkt;
00112 hdr_cdiff *cur_dfh;
00113 hdr_ip *cur_iph;
00114
00115
00116 for (cur_agent= (routing_table[dtype]).sink; cur_agent != NULL;
00117 cur_agent= AGENT_NEXT(cur_agent) ) {
00118
00119 cur_pkt = pkt->copy();
00120 cur_iph = HDR_IP(cur_pkt);
00121 cur_iph->dst_ = AGT_ADDR(cur_agent);
00122
00123 cur_dfh = HDR_CDIFF(cur_pkt);
00124 cur_dfh->forward_agent_id = here_;
00125 cur_dfh->num_next = 1;
00126 cur_dfh->next_nodes[0] = NODE_ADDR(cur_agent);
00127
00128 send_to_dmux(cur_pkt, 0);
00129 }
00130 }
00131
00132
00133 void OmniMcastAgent::GodForwardData(Packet *pkt)
00134 {
00135 hdr_cdiff *dfh = HDR_CDIFF(pkt);
00136 unsigned int dtype = dfh->data_type;
00137 Packet *cur_pkt;
00138 hdr_cdiff *cur_dfh;
00139 hdr_ip *cur_iph;
00140 nsaddr_t src_node = (dfh->sender_id).addr_;
00141 int ret_num_oif;
00142 int *next_oif= God::instance()->NextOIFs(dtype, src_node, THIS_NODE,
00143 &ret_num_oif);
00144
00145 if (ret_num_oif == 0) {
00146 Packet::free(pkt);
00147 return;
00148 }
00149
00150 assert(next_oif != NULL);
00151
00152 for (int i=0; i<ret_num_oif; i++) {
00153 cur_pkt = pkt->copy();
00154 cur_iph = HDR_IP(cur_pkt);
00155
00156 (cur_iph->dst_).addr_ = next_oif[i];
00157 (cur_iph->dst_).port_ = ROUTING_PORT;
00158
00159 cur_dfh = HDR_CDIFF(cur_pkt);
00160 cur_dfh->forward_agent_id = here_;
00161 cur_dfh->num_next = 1;
00162 cur_dfh->next_nodes[0] = next_oif[i];
00163
00164 MACprepare(cur_pkt, next_oif[i], NS_AF_INET, MAC_RETRY_);
00165 MACsend(cur_pkt, 0);
00166 }
00167
00168 delete []next_oif;
00169 Packet::free(pkt);
00170 }
00171
00172
00173 Packet *OmniMcastAgent::prepare_message(unsigned int dtype, ns_addr_t to_addr,
00174 int msg_type)
00175 {
00176 Packet *pkt;
00177 hdr_cdiff *dfh;
00178 hdr_ip *iph;
00179
00180 pkt = create_packet();
00181 dfh = HDR_CDIFF(pkt);
00182 iph = HDR_IP(pkt);
00183
00184 dfh->mess_type = msg_type;
00185 dfh->pk_num = pk_count;
00186 pk_count++;
00187 dfh->sender_id = here_;
00188 dfh->data_type = dtype;
00189 dfh->forward_agent_id = here_;
00190
00191 dfh->ts_ = NOW;
00192 dfh->num_next = 1;
00193 dfh->next_nodes[0] = to_addr.addr_;
00194
00195 iph->src_ = here_;
00196 iph->dst_ = to_addr;
00197
00198 return pkt;
00199 }
00200
00201
00202 OmniMcastAgent::OmniMcastAgent() : Agent(PT_DIFF), arp_buf_timer(this),
00203 send_buf_timer(this)
00204 {
00205 pk_count = 0;
00206 target_ = 0;
00207 node = NULL;
00208 tracetarget = NULL;
00209 }
00210
00211
00212 void OmniMcastAgent::recv(Packet* packet, Handler*)
00213 {
00214 hdr_cdiff* dfh = HDR_CDIFF(packet);
00215
00216
00217
00218 Pkt_Hash_Entry *hashPtr= PktTable.GetHash(dfh->sender_id, dfh->pk_num);
00219
00220
00221
00222 if (hashPtr != NULL) {
00223 Packet::free(packet);
00224 return;
00225 }
00226
00227
00228
00229 PktTable.put_in_hash(dfh);
00230
00231
00232
00233 ConsiderNew(packet);
00234 }
00235
00236
00237 void OmniMcastAgent::ConsiderNew(Packet *pkt)
00238 {
00239 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00240 unsigned char msg_type = dfh->mess_type;
00241 unsigned int dtype = dfh->data_type;
00242
00243 Pkt_Hash_Entry *hashPtr;
00244 Agent_List *agentPtr;
00245 PrvCurPtr RetVal;
00246 nsaddr_t from_nodeID, forward_nodeID;
00247
00248 Packet *gen_pkt;
00249 hdr_cdiff *gen_dfh;
00250
00251 switch (msg_type) {
00252 case INTEREST :
00253
00254 hashPtr = PktTable.GetHash(dfh->sender_id, dfh->pk_num);
00255
00256
00257
00258
00259 from_nodeID = (dfh->sender_id).addr_;
00260 forward_nodeID = (dfh->forward_agent_id).addr_;
00261
00262
00263 if (THIS_NODE == from_nodeID) {
00264
00265
00266
00267
00268 RetVal = INTF_FIND(routing_table[dtype].sink, dfh->sender_id);
00269
00270 if (RetVal.cur == NULL) {
00271
00272 agentPtr = new Agent_List;
00273 AGT_ADDR(agentPtr) = dfh->sender_id;
00274 INTF_INSERT(routing_table[dtype].sink, agentPtr);
00275
00276 God::instance()->AddSink(dtype, THIS_NODE);
00277 }
00278
00279 }
00280
00281 Packet::free(pkt);
00282 return;
00283
00284
00285 case DATA_READY :
00286
00287
00288
00289 agentPtr = new Agent_List;
00290 AGT_ADDR(agentPtr) = dfh->sender_id;
00291 agentPtr->next = routing_table[dtype].source;
00292 routing_table[dtype].source = agentPtr;
00293
00294 God::instance()->AddSource(dtype, (dfh->sender_id).addr_);
00295 gen_pkt = prepare_message(dtype, dfh->sender_id, DATA_REQUEST);
00296 gen_dfh = HDR_CDIFF(gen_pkt);
00297 gen_dfh->report_rate = ORIGINAL;
00298 send_to_dmux(gen_pkt, 0);
00299 Packet::free(pkt);
00300 return;
00301
00302
00303 case DATA :
00304
00305 DataForSink(pkt);
00306 GodForwardData(pkt);
00307 return;
00308
00309
00310 default :
00311
00312 Packet::free(pkt);
00313 break;
00314 }
00315 }
00316
00317 void OmniMcastAgent::Terminate()
00318 {
00319 #ifdef DEBUG_OUTPUT
00320 printf("node %d: remaining energy %f, initial energy %f\n", THIS_NODE,
00321 node->energy_model()->energy(),
00322 node->energy_model()->initialenergy() );
00323 #endif
00324 }
00325
00326 void OmniMcastAgent::Start()
00327 {
00328 arp_buf_timer.sched(ARP_BUFFER_CHECK + ARP_BUFFER_CHECK *
00329 Random::uniform(1.0));
00330 send_buf_timer.sched(SEND_BUFFER_CHECK + SEND_BUFFER_CHECK *
00331 Random::uniform(1.0));
00332 }
00333
00334
00335 void OmniMcastAgent::StopSource()
00336 {
00337 Agent_List *cur;
00338
00339 for (int i=0; i<MAX_DATA_TYPE; i++) {
00340 for (cur=routing_table[i].source; cur!=NULL; cur=AGENT_NEXT(cur) ) {
00341 SEND_MESSAGE(i, AGT_ADDR(cur), DATA_STOP);
00342 }
00343 }
00344 }
00345
00346
00347 Packet * OmniMcastAgent:: create_packet()
00348 {
00349 Packet *pkt = allocpkt();
00350
00351 if (pkt==NULL) return NULL;
00352
00353 hdr_cmn* cmh = HDR_CMN(pkt);
00354 cmh->size() = 36;
00355
00356 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00357 dfh->ts_ = NOW;
00358 return pkt;
00359 }
00360
00361
00362 void OmniMcastAgent::MACprepare(Packet *pkt, nsaddr_t next_hop,
00363 unsigned int type, bool lk_dtct)
00364 {
00365 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00366 hdr_cmn* cmh = HDR_CMN(pkt);
00367 hdr_ip* iph = HDR_IP(pkt);
00368
00369 dfh->forward_agent_id = here_;
00370 if (type == NS_AF_ILINK && next_hop == (nsaddr_t)MAC_BROADCAST) {
00371 cmh->xmit_failure_ = 0;
00372 cmh->next_hop() = MAC_BROADCAST;
00373 cmh->addr_type() = NS_AF_ILINK;
00374 cmh->direction() = hdr_cmn::DOWN;
00375
00376
00377 iph->src_ = here_;
00378 iph->dst_.addr_ = next_hop;
00379 iph->dst_.port_ = ROUTING_PORT;
00380
00381 dfh->num_next = 1;
00382 dfh->next_nodes[0] = next_hop;
00383
00384 return;
00385 }
00386
00387 if (lk_dtct != 0) {
00388 cmh->xmit_failure_ = OmniMcastXmitFailedCallback;
00389 cmh->xmit_failure_data_ = (void *) this;
00390 }
00391 else {
00392 cmh->xmit_failure_ = 0;
00393 }
00394
00395 cmh->direction() = hdr_cmn::DOWN;
00396 cmh->next_hop() = next_hop;
00397 cmh->addr_type() = type;
00398
00399 iph->src_ = here_;
00400 iph->dst_.addr_ = next_hop;
00401 iph->dst_.port_ = ROUTING_PORT;
00402
00403 dfh->num_next = 1;
00404 dfh->next_nodes[0] = next_hop;
00405 }
00406
00407
00408 void OmniMcastAgent::MACsend(Packet *pkt, Time delay)
00409 {
00410 hdr_cmn* cmh = HDR_CMN(pkt);
00411 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00412
00413 if (dfh->mess_type == DATA)
00414 cmh->size() = (God::instance()->data_pkt_size) + 4*(dfh->num_next - 1);
00415 else
00416 cmh->size() = 36 + 4*(dfh->num_next -1);
00417
00418 Scheduler::instance().schedule(ll, pkt, delay);
00419 }
00420
00421
00422 void OmniMcastXmitFailedCallback(Packet *pkt, void *data)
00423 {
00424 OmniMcastAgent *agent = (OmniMcastAgent *)data;
00425 agent->xmitFailed(pkt);
00426 }
00427
00428
00429 void OmniMcastAgent::xmitFailed(Packet *pkt)
00430 {
00431
00432 }
00433
00434
00435 void OmniMcastAgent::StickPacketInArpBuffer(Packet *pkt)
00436 {
00437 Time min = DBL_MAX;
00438 int min_index = 0;
00439 int c;
00440
00441 for (c=0; c < ARP_BUF_SIZE; c++) {
00442 if (arp_buf[c].p == NULL) {
00443 arp_buf[c].t = NOW;
00444 arp_buf[c].attempt = 1;
00445 arp_buf[c].p = pkt;
00446 return;
00447 }
00448 else if (arp_buf[c].t < min) {
00449 min = arp_buf[c].t;
00450 min_index = c;
00451 }
00452 }
00453
00454
00455
00456 ARPEntry *llinfo;
00457 hdr_cmn* cmh = HDR_CMN(arp_buf[min_index].p);
00458
00459 llinfo= arp_table->arplookup(cmh->next_hop());
00460
00461 if (llinfo == 0) {
00462
00463 xmitFailed(arp_buf[min_index].p);
00464 }
00465 else
00466 MACsend(arp_buf[min_index].p, 0);
00467
00468
00469
00470
00471 arp_buf[min_index].t = NOW;
00472 arp_buf[min_index].attempt = 1;
00473 arp_buf[min_index].p = pkt;
00474 }
00475
00476
00477 void OmniMcastAgent::ArpBufferCheck()
00478 {
00479 int c;
00480 ARPEntry *llinfo;
00481 hdr_cmn* cmh;
00482
00483 for (c = 0; c < ARP_BUF_SIZE; c++) {
00484 if (arp_buf[c].p == NULL)
00485 continue;
00486
00487 cmh = HDR_CMN(arp_buf[c].p);
00488 llinfo= arp_table->arplookup(cmh->next_hop());
00489 if (llinfo != 0) {
00490 MACsend(arp_buf[c].p, 0);
00491 arp_buf[c].p = NULL;
00492 continue;}
00493
00494 if (arp_buf[c].attempt > ARP_MAX_ATTEMPT) {
00495
00496 xmitFailed(arp_buf[c].p);
00497 arp_buf[c].p = NULL;
00498 continue;
00499 }
00500
00501 arp_table->arprequest(THIS_NODE, cmh->next_hop(), (LL *)ll);
00502 arp_buf[c].attempt ++;
00503 }
00504 }
00505
00506
00507 void OmniMcastAgent::StickPacketInSendBuffer(Packet *p)
00508 {
00509 Time min = DBL_MAX;
00510 int min_index = 0;
00511 int c;
00512
00513 for (c = 0 ; c < SEND_BUF_SIZE ; c ++) {
00514 if (send_buf[c].p == NULL)
00515 {
00516 send_buf[c].t = NOW;
00517 send_buf[c].p = p;
00518 return;
00519 }
00520 else if (send_buf[c].t < min)
00521 {
00522 min = send_buf[c].t;
00523 min_index = c;
00524 }
00525 }
00526
00527
00528
00529 if (send_buf[min_index].p != NULL) {
00530 MACsend(send_buf[min_index].p, 0);
00531 }
00532
00533
00534
00535 send_buf[min_index].t = Scheduler::instance().clock();
00536 send_buf[min_index].p = p;
00537 }
00538
00539
00540 void OmniMcastAgent::SendBufferCheck()
00541 {
00542 for (int c = 0; c < SEND_BUF_SIZE; c++) {
00543 if (send_buf[c].p != NULL) {
00544 MACsend(send_buf[c].p, 0);
00545 send_buf[c].p = NULL;
00546 }
00547 }
00548 }
00549
00550
00551 void OmniMcastAgent::trace (char *fmt,...)
00552 {
00553 va_list ap;
00554
00555 if (!tracetarget)
00556 return;
00557
00558 va_start (ap, fmt);
00559 vsprintf (tracetarget->pt_->buffer (), fmt, ap);
00560 tracetarget->pt_->dump ();
00561 va_end (ap);
00562 }
00563
00564
00565
00566 int OmniMcastAgent::command(int argc, const char*const* argv)
00567 {
00568 Tcl& tcl = Tcl::instance();
00569
00570 if (argc == 2) {
00571
00572 if (strcasecmp(argv[1], "reset-state")==0) {
00573 reset();
00574 return TCL_OK;
00575 }
00576
00577 if (strcasecmp(argv[1], "reset")==0) {
00578 return Agent::command(argc, argv);
00579 }
00580
00581 if (strcasecmp(argv[1], "start")==0) {
00582 Start();
00583 return TCL_OK;
00584 }
00585
00586 if (strcasecmp(argv[1], "stop")==0) {
00587 return TCL_OK;
00588 }
00589
00590 if (strcasecmp(argv[1], "terminate")==0) {
00591 Terminate();
00592 return TCL_OK;
00593 }
00594
00595 if (strcasecmp(argv[1], "stop-source")==0) {
00596 StopSource();
00597 return TCL_OK;
00598 }
00599
00600 } else if (argc == 3) {
00601
00602 if (strcasecmp(argv[1], "on-node")==0) {
00603 node = (Node *)tcl.lookup(argv[2]);
00604 return TCL_OK;
00605 }
00606
00607 if (strcasecmp(argv[1], "add-ll") == 0) {
00608
00609 TclObject *obj;
00610
00611 if ( (obj = TclObject::lookup(argv[2])) == 0) {
00612 fprintf(stderr, "OmniMcast Node: %d lookup of %s failed\n", THIS_NODE,
00613 argv[2]);
00614 return TCL_ERROR;
00615 }
00616 ll = (NsObject *) obj;
00617
00618
00619 arp_table = ((LL *)ll)->arp_table();
00620 if (arp_table == NULL)
00621 return TCL_ERROR;
00622
00623 return TCL_OK;
00624 }
00625
00626 if (strcasecmp (argv[1], "tracetarget") == 0) {
00627 TclObject *obj;
00628 if ((obj = TclObject::lookup (argv[2])) == 0) {
00629 fprintf (stderr, "%s: %s lookup of %s failed\n", __FILE__, argv[1],
00630 argv[2]);
00631 return TCL_ERROR;
00632 }
00633
00634 tracetarget = (Trace *) obj;
00635 return TCL_OK;
00636 }
00637
00638 if (strcasecmp(argv[1], "port-dmux") == 0) {
00639
00640 TclObject *obj;
00641
00642 if ( (obj = TclObject::lookup(argv[2])) == 0) {
00643 fprintf(stderr, "OmniMcast Node: %d lookup of %s failed\n", THIS_NODE,
00644 argv[2]);
00645 return TCL_ERROR;
00646 }
00647 port_dmux = (NsObject *) obj;
00648 return TCL_OK;
00649 }
00650
00651 }
00652
00653 return Agent::command(argc, argv);
00654 }
00655
00656
00657 void OmniMcastAgent::reset()
00658 {
00659 PktTable.reset();
00660
00661 for (int i=0; i<MAX_DATA_TYPE; i++) {
00662 routing_table[i].reset();
00663 }
00664 clear_arp_buf();
00665 clear_send_buf();
00666 }
00667
00668
00669 void OmniMcastAgent::clear_arp_buf()
00670 {
00671 for (int i=0; i<ARP_BUF_SIZE; i++) {
00672 arp_buf[i].t = 0;
00673 arp_buf[i].attempt = 0;
00674 if (arp_buf[i].p != NULL)
00675 Packet::free(arp_buf[i].p);
00676 arp_buf[i].p = NULL;
00677 }
00678 }
00679
00680
00681 void OmniMcastAgent::clear_send_buf()
00682 {
00683 for (int i=0; i<SEND_BUF_SIZE; i++) {
00684 send_buf[i].t = 0;
00685 if (send_buf[i].p != NULL)
00686 Packet::free(send_buf[i].p);
00687 send_buf[i].p = NULL;
00688 }
00689 }
00690
00691
00692 void OmniMcast_Entry::reset()
00693 {
00694 clear_agentlist(source);
00695 clear_agentlist(sink);
00696 source = NULL;
00697 sink = NULL;
00698 }
00699
00700
00701 void OmniMcast_Entry::clear_agentlist(Agent_List *list)
00702 {
00703 Agent_List *cur=list;
00704 Agent_List *temp = NULL;
00705
00706 while (cur != NULL) {
00707 temp = AGENT_NEXT(cur);
00708 delete cur;
00709 cur = temp;
00710 }
00711 }
00712
00713
00714