00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053 #include <assert.h>
00054 #include <math.h>
00055 #include <stdio.h>
00056 #include <signal.h>
00057 #include <float.h>
00058
00059 #include <tcl.h>
00060 #include <stdlib.h>
00061
00062 #include "diff_header.h"
00063 #include "agent.h"
00064 #include "tclcl.h"
00065 #include "ip.h"
00066 #include "config.h"
00067 #include "packet.h"
00068 #include "trace.h"
00069 #include "random.h"
00070 #include "classifier.h"
00071 #include "node.h"
00072 #include "diffusion.h"
00073 #include "iflist.h"
00074 #include "hash_table.h"
00075 #include "arp.h"
00076 #include "mac.h"
00077 #include "ll.h"
00078 #include "dsr/path.h"
00079 #include "god.h"
00080 #include "routing_table.h"
00081 #include "diff_rate.h"
00082
00083 extern char *MsgStr[];
00084
00085 static class DiffusionRateClass : public TclClass {
00086 public:
00087 DiffusionRateClass() : TclClass("Agent/Diffusion/RateGradient") {}
00088 TclObject* create(int , const char*const* ) {
00089 return(new DiffusionRate());
00090 }
00091 } class_diffusion_rate;
00092
00093
00094 void GradientTimer::expire(Event *)
00095 {
00096 a_->GradientTimeOut();
00097 }
00098
00099
00100 void NegativeReinforceTimer::expire(Event *)
00101 {
00102 a_->NegReinfTimeOut();
00103 }
00104
00105
00106 DiffusionRate::DiffusionRate() : DiffusionAgent()
00107 {
00108 DUP_SUP_ = true;
00109
00110 sub_type_ = BCAST_SUB;
00111 org_type_ = UNICAST_ORG;
00112 pos_type_ = POS_ALL;
00113 pos_node_type_ = INTM_POS;
00114 neg_win_type_ = NEG_TIMER;
00115 neg_thr_type_ = NEG_ABSOLUTE;
00116 neg_max_type_ = NEG_FIXED_MAX;
00117
00118 num_not_send_bcast_data = 0;
00119 num_data_bcast_send = 0;
00120 num_data_bcast_rcv = 0;
00121 num_neg_bcast_send = 0;
00122 num_neg_bcast_rcv = 0;
00123 }
00124
00125
00126 void DiffusionRate::recv(Packet* packet, Handler*)
00127 {
00128 hdr_cdiff* dfh = HDR_CDIFF(packet);
00129
00130
00131
00132 Pkt_Hash_Entry *hashPtr= PktTable.GetHash(dfh->sender_id, dfh->pk_num);
00133
00134
00135 #ifdef DEBUG_RATE
00136 printf("DF node %x recv %s (%x, %x, %d)\n",
00137 THIS_NODE, MsgStr[dfh->mess_type], (dfh->sender_id).addr_,
00138 (dfh->sender_id).port_, dfh->pk_num);
00139 #endif
00140
00141
00142
00143
00144 if (hashPtr != NULL) {
00145 consider_old(packet);
00146 return;
00147 }
00148
00149
00150
00151 PktTable.put_in_hash(dfh);
00152
00153
00154
00155
00156 if (DUP_SUP_ == true) {
00157
00158 if (dfh->mess_type == DATA) {
00159 if (DataTable.GetHash(dfh->attr) != NULL) {
00160 consider_old(packet);
00161 return;
00162 } else {
00163 DataTable.PutInHash(dfh->attr);
00164 }
00165
00166 }
00167 }
00168
00169 consider_new(packet);
00170 }
00171
00172
00173 void DiffusionRate::consider_old(Packet *pkt)
00174 {
00175 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00176 hdr_cmn* cmh = HDR_CMN(pkt);
00177 unsigned char msg_type = dfh->mess_type;
00178 unsigned int dtype = dfh->data_type;
00179
00180 switch (msg_type) {
00181 case INTEREST :
00182 InterestHandle(pkt);
00183 return;
00184
00185 case DATA:
00186
00187 if (cmh->next_hop_ == (nsaddr_t)MAC_BROADCAST) {
00188 num_data_bcast_rcv++;
00189 }
00190
00191 if (dfh->report_rate == ORIGINAL) {
00192 routing_table[dtype].CntOldOrg(dfh->forward_agent_id);
00193 }
00194 Packet::free(pkt);
00195 return;
00196
00197 case NEG_REINFORCE:
00198 if (cmh->next_hop_ == (nsaddr_t)MAC_BROADCAST) {
00199 num_neg_bcast_rcv++;
00200 }
00201 break;
00202
00203 default :
00204 Packet::free(pkt);
00205 break;
00206 }
00207 }
00208
00209
00210 void DiffusionRate::consider_new(Packet *pkt)
00211 {
00212 hdr_cdiff* dfh = HDR_CDIFF(pkt);
00213 hdr_cmn * cmh = HDR_CMN(pkt);
00214 unsigned char msg_type = dfh->mess_type;
00215 unsigned int dtype = dfh->data_type;
00216
00217 Agent_List *agentPtr;
00218 Packet *gen_pkt;
00219 hdr_cdiff *gen_dfh;
00220
00221 switch (msg_type) {
00222 case INTEREST :
00223 InterestHandle(pkt);
00224 return;
00225
00226 case POS_REINFORCE :
00227 if ( POS_REINF_ == false ) {
00228 printf("Hey, we are not in pos_reinf mode.\n");
00229 Packet::free(pkt);
00230 exit(-1);
00231 }
00232
00233 ProcessPosReinf(pkt);
00234 return;
00235
00236 case NEG_REINFORCE :
00237 if (cmh->next_hop_ == (nsaddr_t)MAC_BROADCAST) {
00238 num_neg_bcast_rcv++;
00239 } else {
00240 routing_table[dtype].CntNeg(dfh->forward_agent_id);
00241 }
00242
00243 if (NEG_REINF_ == false) {
00244 printf("Hey, we are not in neg_reinf mode.\n");
00245 Packet::free(pkt);
00246 exit(-1);
00247 }
00248
00249 ProcessNegReinf(pkt);
00250 return;
00251
00252 case DATA_READY :
00253
00254
00255
00256 agentPtr = new Agent_List;
00257 AGT_ADDR(agentPtr) = dfh->sender_id;
00258 agentPtr->next = routing_table[dtype].source;
00259 routing_table[dtype].source = agentPtr;
00260
00261 God::instance()->AddSource(dtype, (dfh->sender_id).addr_);
00262
00263
00264
00265
00266
00267
00268 if (routing_table[dtype].active != NULL ||
00269 routing_table[dtype].sink != NULL) {
00270 gen_pkt = prepare_message(dtype, dfh->sender_id, DATA_REQUEST);
00271 gen_dfh = HDR_CDIFF(gen_pkt);
00272 gen_dfh->report_rate = SUB_SAMPLED;
00273 send_to_dmux(gen_pkt, 0);
00274
00275
00276
00277
00278
00279
00280 }
00281
00282 Packet::free(pkt);
00283 return;
00284
00285 case DATA :
00286
00287 if (cmh->next_hop_ == (nsaddr_t)MAC_BROADCAST) {
00288 num_data_bcast_rcv++;
00289 }
00290
00291 DataForSink(pkt);
00292
00293 if (dfh->report_rate == SUB_SAMPLED) {
00294 routing_table[dtype].CntNewSub(dfh->forward_agent_id);
00295 FwdData(pkt);
00296 return;
00297 }
00298
00299 if (dfh->report_rate == ORIGINAL) {
00300 routing_table[dtype].new_org_counter++;
00301 routing_table[dtype].CntNewOrg(dfh->forward_agent_id);
00302 FwdData(pkt);
00303
00304 if (neg_win_type_ == NEG_COUNTER) {
00305 CheckNegCounter(dtype);
00306 return;
00307 }
00308 }
00309 return;
00310
00311 default :
00312 Packet::free(pkt);
00313 break;
00314 }
00315 }
00316
00317
00318 void DiffusionRate::CheckNegCounter(int dtype)
00319 {
00320 if (neg_max_type_ == NEG_FIXED_MAX) {
00321 if (routing_table[dtype].new_org_counter >= MAX_NEG_COUNTER
00322 && NEG_REINF_ == true) {
00323 GenNeg(dtype);
00324 routing_table[dtype].new_org_counter = 0;
00325 routing_table[dtype].ClrAllNewOrg();
00326 routing_table[dtype].ClrAllOldOrg();
00327 }
00328 return;
00329 }
00330
00331 if (neg_max_type_ == NEG_SCALE_MAX) {
00332 if (routing_table[dtype].new_org_counter >=
00333 PER_IIF * routing_table[dtype].num_iif
00334 && NEG_REINF_ == true) {
00335 GenNeg(dtype);
00336 routing_table[dtype].new_org_counter = 0;
00337 routing_table[dtype].ClrAllNewOrg();
00338 routing_table[dtype].ClrAllOldOrg();
00339 }
00340 return;
00341 }
00342 }
00343
00344
00345 void DiffusionRate::InterestHandle(Packet *pkt)
00346 {
00347 hdr_cdiff *dfh = HDR_CDIFF(pkt);
00348 unsigned int dtype = dfh->data_type;
00349 Agent_List *agentPtr;
00350
00351 nsaddr_t from_nodeID;
00352 PrvCurPtr RetVal;
00353 Out_List *OutPtr;
00354
00355
00356 if (dfh->ts_ + INTEREST_TIMEOUT < NOW) {
00357 Packet::free(pkt);
00358 return;
00359 }
00360
00361
00362
00363
00364 from_nodeID = (dfh->sender_id).addr_;
00365
00366 if (THIS_NODE == from_nodeID) {
00367
00368
00369
00370
00371 RetVal = INTF_FIND(routing_table[dtype].sink, dfh->sender_id);
00372
00373 if (RetVal.cur == NULL) {
00374
00375 agentPtr = new Agent_List;
00376 AGT_ADDR(agentPtr) = dfh->sender_id;
00377 INTF_INSERT(routing_table[dtype].sink, agentPtr);
00378
00379 God::instance()->AddSink(dtype, THIS_NODE);
00380 }
00381 } else {
00382
00383
00384
00385 RetVal = INTF_FIND(routing_table[dtype].active, dfh->forward_agent_id);
00386 if (RetVal.cur == NULL) {
00387 OutPtr = new Out_List;
00388 AGT_ADDR(OutPtr) = dfh->forward_agent_id;
00389 GRADIENT(OutPtr) = dfh->report_rate;
00390 GRAD_TMOUT(OutPtr) = dfh->ts_ + INTEREST_TIMEOUT;
00391 INTF_INSERT(routing_table[dtype].active, OutPtr);
00392 routing_table[dtype].num_active ++;
00393 } else {
00394 GRAD_TMOUT(RetVal.cur) = max(GRAD_TMOUT(RetVal.cur),
00395 dfh->ts_ + INTEREST_TIMEOUT);
00396 }
00397
00398 }
00399
00400 if (NOW > routing_table[dtype].last_fwd_time + INTEREST_PERIODIC) {
00401 if (routing_table[dtype].ExistOriginalGradient() == true) {
00402 DataReqAll(dtype, ORIGINAL);
00403 } else {
00404 DataReqAll(dtype, SUB_SAMPLED);
00405 }
00406 routing_table[dtype].last_fwd_time = NOW;
00407 MACprepare(pkt, MAC_BROADCAST, NS_AF_ILINK, 0);
00408 MACsend(pkt, JITTER*Random::uniform(1.0));
00409 overhead++;
00410
00411 #ifdef DEBUG_RATE
00412 hdr_cmn *cmh = HDR_CMN(pkt);
00413 printf("DF node %x will send %s (%x, %x, %d) to %x\n",
00414 THIS_NODE, MsgStr[dfh->mess_type], (dfh->sender_id).addr_,
00415 (dfh->sender_id).port_, dfh->pk_num, cmh->next_hop());
00416 #endif
00417
00418
00419 return;
00420 }
00421
00422 Packet::free(pkt);
00423 return;
00424 }
00425
00426
00427 void DiffusionRate::NegReinfTimeOut()
00428 {
00429 for (int i=0; i<MAX_DATA_TYPE; i++) {
00430 GenNeg(i);
00431 routing_table[i].new_org_counter = 0;
00432 routing_table[i].ClrAllNewOrg();
00433 routing_table[i].ClrAllOldOrg();
00434 }
00435
00436 neg_reinf_timer->resched(NEG_CHECK);
00437 }
00438
00439
00440 void DiffusionRate::GradientTimeOut()
00441 {
00442 int i;
00443 Agent_List *cur_out, **prv_out;
00444
00445 for (i=0; i<MAX_DATA_TYPE; i++) {
00446 for (cur_out = routing_table[i].active,
00447 prv_out = (Agent_List **)&routing_table[i].active;
00448 cur_out != NULL; ) {
00449
00450 if (NOW > GRAD_TMOUT(cur_out)) {
00451 INTF_REMOVE(prv_out, cur_out);
00452 routing_table[i].num_active -- ;
00453 cur_out = *prv_out;
00454 }
00455 else {
00456 prv_out = &(cur_out->next);
00457 cur_out = cur_out->next;
00458 }
00459
00460 }
00461 }
00462
00463 gradient_timer->resched(INTEREST_TIMEOUT);
00464 }
00465
00466
00467 bool DiffusionRate::FwdSubsample(Packet *pkt)
00468 {
00469 hdr_cdiff *dfh = HDR_CDIFF(pkt);
00470 Out_List *cur_out;
00471 Packet *cur_pkt;
00472 hdr_cdiff *cur_dfh;
00473 hdr_ip *cur_iph;
00474 unsigned int dtype = dfh->data_type;
00475
00476 if (routing_table[dtype].num_active <= 0) {
00477 num_not_send_bcast_data++;
00478 return false;
00479 }
00480
00481
00482
00483 num_data_bcast_send++;
00484
00485 if (sub_type_ == BCAST_SUB) {
00486 MACprepare(pkt, MAC_BROADCAST, NS_AF_ILINK, 0);
00487 MACsend(pkt, JITTER*Random::uniform(1.0));
00488
00489 #ifdef DEBUG_RATE
00490 hdr_cmn *cmh = HDR_CMN(pkt);
00491 printf("DF node %x will send %s (%x, %x, %d) to %x\n",
00492 THIS_NODE, MsgStr[dfh->mess_type], (dfh->sender_id).addr_,
00493 (dfh->sender_id).port_, dfh->pk_num, cmh->next_hop());
00494 #endif // DEBUG_RATE
00495
00496 return true;
00497 }
00498
00499 if (sub_type_ == UNICAST_SUB) {
00500 for (cur_out = routing_table[dtype].active; cur_out!= NULL;
00501 cur_out = OUT_NEXT(cur_out)) {
00502
00503 cur_pkt = pkt->copy();
00504 cur_iph = HDR_IP(cur_pkt);
00505 cur_iph->dst_ = AGT_ADDR(cur_out);
00506
00507 cur_dfh = HDR_CDIFF(cur_pkt);
00508 cur_dfh->forward_agent_id = here_;
00509 cur_dfh->num_next = 1;
00510 cur_dfh->next_nodes[0] = NODE_ADDR(cur_out);
00511
00512 MACprepare(cur_pkt, NODE_ADDR(cur_out), NS_AF_INET,
00513 MAC_RETRY_);
00514 MACsend(cur_pkt, 0);
00515
00516 #ifdef DEBUG_RATE
00517 cur_cmh = HDR_CMN(cur_pkt);
00518 printf("DF node %x will send %s (%x, %x, %d) to %x\n",
00519 THIS_NODE, MsgStr[cur_dfh->mess_type],
00520 (cur_dfh->sender_id).addr_, (cur_dfh->sender_id).port_,
00521 cur_dfh->pk_num, cur_cmh->next_hop());
00522 #endif // DEBUG_RATE
00523
00524 }
00525
00526 return true;
00527 }
00528
00529 return false;
00530 }
00531
00532
00533 void DiffusionRate::TriggerPosReinf(Packet *pkt, ns_addr_t forward_agent)
00534 {
00535 hdr_cdiff *dfh = HDR_CDIFF(pkt);
00536 unsigned int dtype = dfh->data_type;
00537 nsaddr_t forwarder_node = forward_agent.addr_;
00538
00539 if (pos_node_type_ == INTM_POS) {
00540 if (routing_table[dtype].sink != NULL ||
00541 routing_table[dtype].ExistOriginalGradient() == true) {
00542 DataReqAll(dtype, ORIGINAL);
00543 if (THIS_NODE != forwarder_node) {
00544 PosReinf(dtype, forwarder_node, dfh->sender_id,
00545 dfh->pk_num);
00546 routing_table[dtype].CntPosSend(forward_agent);
00547 routing_table[dtype].ClrNewSub(forward_agent);
00548 }
00549 }
00550 return;
00551 }
00552
00553
00554 if (pos_node_type_ == END_POS) {
00555 if (routing_table[dtype].sink != NULL) {
00556 DataReqAll(dtype, ORIGINAL);
00557 if (THIS_NODE != forwarder_node) {
00558 PosReinf(dtype, forwarder_node, dfh->sender_id,
00559 dfh->pk_num);
00560 routing_table[dtype].CntPosSend(forward_agent);
00561 routing_table[dtype].ClrNewSub(forward_agent);
00562 }
00563 }
00564
00565 return;
00566 }
00567 }
00568
00569
00570 void DiffusionRate::FwdOriginal(Packet *pkt)
00571 {
00572 hdr_cdiff *dfh = HDR_CDIFF(pkt);
00573 unsigned int dtype = dfh->data_type;
00574 Out_List *cur_out;
00575 Packet *cur_pkt;
00576 hdr_cdiff *cur_dfh;
00577 hdr_ip *cur_iph;
00578
00579 if (org_type_ == BCAST_ORG) {
00580 MACprepare(pkt, MAC_BROADCAST, NS_AF_ILINK, 0);
00581 MACsend(pkt, JITTER*Random::uniform(1.0));
00582
00583 #ifdef DEBUG_RATE
00584 hdr_cmn *cmh = HDR_CMN(pkt);
00585 printf("DF node %x will send %s (%x, %x, %d) to %x\n",
00586 THIS_NODE, MsgStr[dfh->mess_type], (dfh->sender_id).addr_,
00587 (dfh->sender_id).port_, dfh->pk_num, cmh->next_hop());
00588 #endif // DEBUG_RATE
00589
00590 return;
00591 }
00592
00593 if (org_type_ == UNICAST_ORG) {
00594 for (cur_out = routing_table[dtype].active; cur_out!= NULL;
00595 cur_out = OUT_NEXT(cur_out)) {
00596 if (GRADIENT(cur_out) == ORIGINAL) {
00597
00598 cur_pkt = pkt->copy();
00599 cur_iph = HDR_IP(cur_pkt);
00600 cur_iph->dst_ = AGT_ADDR(cur_out);
00601
00602 cur_dfh = HDR_CDIFF(cur_pkt);
00603 cur_dfh->forward_agent_id = here_;
00604 cur_dfh->num_next = 1;
00605 cur_dfh->next_nodes[0] = NODE_ADDR(cur_out);
00606
00607 cur_out->num_data_send++;
00608
00609 MACprepare(cur_pkt, NODE_ADDR(cur_out), NS_AF_INET,
00610 MAC_RETRY_);
00611 MACsend(cur_pkt, 0);
00612
00613 #ifdef DEBUG_RATE
00614 cur_cmh = HDR_CMN(cur_pkt);
00615 printf("DF node %x will send %s (%x, %x, %d) to %x\n",
00616 THIS_NODE, MsgStr[cur_dfh->mess_type],
00617 (cur_dfh->sender_id).addr_, (cur_dfh->sender_id).port_,
00618 cur_dfh->pk_num, cur_cmh->next_hop());
00619 #endif // DEBUG_RATE
00620
00621 }
00622 }
00623
00624 Packet::free(pkt);
00625 return;
00626 }
00627 }
00628
00629
00630 void DiffusionRate::FwdData(Packet *pkt)
00631 {
00632 hdr_cdiff *dfh = HDR_CDIFF(pkt);
00633 unsigned int dtype = dfh->data_type;
00634 nsaddr_t forwarder_node;
00635 ns_addr_t forward_agent;
00636 bool forward_flag;
00637
00638 forwarder_node = (dfh->forward_agent_id).addr_;
00639 forward_agent = dfh->forward_agent_id;
00640
00641 if (dfh->report_rate == SUB_SAMPLED) {
00642 forward_flag = FwdSubsample(pkt);
00643 TriggerPosReinf(pkt, forward_agent);
00644
00645 if (forward_flag == false) {
00646 Packet::free(pkt);
00647 }
00648 return;
00649 }
00650
00651
00652
00653 if (routing_table[dtype].ExistOriginalGradient() == false
00654 && routing_table[dtype].sink == NULL) {
00655
00656 if (THIS_NODE != forwarder_node && NEG_REINF_ == true) {
00657 BcastNeg(dtype);
00658 routing_table[dtype].new_org_counter = 0;
00659 routing_table[dtype].ClrAllNewOrg();
00660 routing_table[dtype].ClrAllOldOrg();
00661 }
00662 Packet::free(pkt);
00663 return;
00664 }
00665
00666 if (routing_table[dtype].ExistOriginalGradient() == false) {
00667 Packet::free(pkt);
00668 return;
00669 }
00670
00671 FwdOriginal(pkt);
00672 }
00673
00674
00675 void DiffusionRate::DataReqAll(unsigned int dtype, int report_rate)
00676 {
00677 Agent_List *cur_agent;
00678 Packet *pkt;
00679 hdr_cdiff *dfh;
00680
00681 for (cur_agent=routing_table[dtype].source; cur_agent != NULL;
00682 cur_agent = AGENT_NEXT(cur_agent) ) {
00683 pkt = prepare_message(dtype, AGT_ADDR(cur_agent), DATA_REQUEST);
00684 dfh = HDR_CDIFF(pkt);
00685 dfh->report_rate = report_rate;
00686 send_to_dmux(pkt, 0);
00687 }
00688 }
00689
00690
00691 void DiffusionRate::GenNeg(int dtype)
00692 {
00693 In_List *cur;
00694
00695 if (neg_thr_type_ == NEG_ABSOLUTE) {
00696 for (cur= routing_table[dtype].iif; cur != NULL; cur= IN_NEXT(cur)) {
00697 if (NEW_ORG_RECV(cur) <= 0 && OLD_ORG_RECV(cur) > MAX_DUP_DATA) {
00698 UcastNeg(dtype, AGT_ADDR(cur));
00699 cur->num_neg_send++;
00700 }
00701 }
00702 return;
00703 }
00704
00705 if (neg_thr_type_ == NEG_RELATIVE) {
00706 int most= routing_table[dtype].MostRecvOrg();
00707
00708 for (cur= routing_table[dtype].iif; cur != NULL; cur= IN_NEXT(cur)) {
00709 if (OLD_ORG_RECV(cur) > MAX_DUP_DATA &&
00710 NEW_ORG_RECV(cur) <= NEG_MIN_RATIO*most) {
00711 UcastNeg(dtype, AGT_ADDR(cur));
00712 cur->num_neg_send++;
00713 }
00714 }
00715 return;
00716 }
00717 }
00718
00719
00720 void DiffusionRate::BcastNeg(int dtype)
00721 {
00722 ns_addr_t bcast_addr;
00723 bcast_addr.addr_ = MAC_BROADCAST;
00724 bcast_addr.port_ = ROUTING_PORT;
00725
00726 Packet *pkt=prepare_message(dtype, bcast_addr, NEG_REINFORCE);
00727
00728 MACprepare(pkt, MAC_BROADCAST, NS_AF_ILINK, 0);
00729 MACsend(pkt, 0);
00730 overhead++;
00731 num_neg_bcast_send++;
00732
00733 #ifdef DEBUG_RATE
00734 hdr_cdiff *dfh = HDR_CDIFF(pkt);
00735 hdr_cmn *cmh = HDR_CMN(pkt);
00736 printf("DF node %d will send %s (%x, %x, %d) to %x\n",
00737 THIS_NODE, MsgStr[dfh->mess_type], (dfh->sender_id).addr_,
00738 (dfh->sender_id).port_, dfh->pk_num, cmh->next_hop());
00739 #endif // DEBUG_RATE
00740
00741 }
00742
00743
00744 void DiffusionRate::UcastNeg(int dtype, ns_addr_t to)
00745 {
00746 Packet *pkt=prepare_message(dtype, to, NEG_REINFORCE);
00747 MACprepare(pkt, to.addr_, NS_AF_INET, 0);
00748 MACsend(pkt, 0);
00749 overhead++;
00750
00751 #ifdef DEBUG_RATE
00752 hdr_cdiff *dfh = HDR_CDIFF(pkt);
00753 hdr_cmn *cmh = HDR_CMN(pkt);
00754 printf("DF node %d will send %s (%x, %x, %d) to %x\n",
00755 THIS_NODE, MsgStr[dfh->mess_type], (dfh->sender_id).addr_,
00756 (dfh->sender_id).port_, dfh->pk_num, cmh->next_hop());
00757 #endif
00758
00759 }
00760
00761
00762 void DiffusionRate::ProcessNegReinf(Packet *pkt)
00763 {
00764 hdr_cdiff *dfh = HDR_CDIFF(pkt);
00765 unsigned int dtype = dfh->data_type;
00766 Out_List *cur_out;
00767 PrvCurPtr RetVal;
00768
00769 RetVal=INTF_FIND(routing_table[dtype].active, dfh->forward_agent_id);
00770
00771 if (RetVal.cur == NULL) {
00772 Packet::free(pkt);
00773 return;
00774 }
00775
00776 cur_out = (Out_List *)(RetVal.cur);
00777 if (GRADIENT(cur_out) == SUB_SAMPLED) {
00778 Packet::free(pkt);
00779 return;
00780 }
00781
00782 GRADIENT(cur_out) = SUB_SAMPLED;
00783
00784 if (routing_table[dtype].ExistOriginalGradient() == false &&
00785 routing_table[dtype].sink == NULL) {
00786
00787 DataReqAll(dtype, SUB_SAMPLED);
00788
00789 if (NEG_REINF_ == true) {
00790 BcastNeg(dtype);
00791 routing_table[dtype].new_org_counter = 0;
00792 routing_table[dtype].ClrAllNewOrg();
00793 routing_table[dtype].ClrAllOldOrg();
00794 }
00795 }
00796
00797 Packet::free(pkt);
00798 }
00799
00800
00801 void DiffusionRate::ProcessPosReinf(Packet *pkt)
00802 {
00803 hdr_cdiff *dfh= HDR_CDIFF(pkt);
00804 unsigned int dtype = dfh->data_type;
00805 Out_List *cur_out, *OutPtr;
00806 PrvCurPtr RetVal;
00807
00808 RetVal=INTF_FIND(routing_table[dtype].active, dfh->forward_agent_id);
00809
00810 if (RetVal.cur != NULL) {
00811 cur_out = (Out_List *)(RetVal.cur);
00812 GRADIENT(cur_out) = ORIGINAL;
00813 GRAD_TMOUT(RetVal.cur) = max(GRAD_TMOUT(RetVal.cur),
00814 dfh->ts_ + INTEREST_TIMEOUT);
00815 NUM_POS_RECV(cur_out)++;
00816 } else {
00817
00818 OutPtr = new Out_List;
00819 AGT_ADDR(OutPtr) = dfh->forward_agent_id;
00820 GRADIENT(OutPtr) = dfh->report_rate;
00821 GRAD_TMOUT(OutPtr) = dfh->ts_ + INTEREST_TIMEOUT;
00822 INTF_INSERT(routing_table[dtype].active, OutPtr);
00823 routing_table[dtype].num_active ++;
00824 NUM_POS_RECV(OutPtr)++;
00825 }
00826
00827 DataReqAll(dtype, ORIGINAL);
00828
00829
00830 Pkt_Hash_Entry *hashPtr;
00831 nsaddr_t next_node;
00832 In_List *recent_in;
00833 In_List *cur;
00834
00835 switch(pos_type_) {
00836
00837 case POS_HASH:
00838 hashPtr=PktTable.GetHash(dfh->info.sender, dfh->info.seq);
00839 if (hashPtr == NULL) {
00840 perror("Hey! I've never seen that packet before.\n");
00841 Packet::free(pkt);
00842 exit(-1);
00843 }
00844
00845 next_node = (hashPtr->forwarder_id).addr_;
00846 if (next_node == THIS_NODE) {
00847 Packet::free(pkt);
00848 return;
00849 }
00850
00851 PosReinf(dtype, hashPtr->forwarder_id.addr_, dfh->info.sender,
00852 dfh->info.seq);
00853 routing_table[dtype].CntPosSend(hashPtr->forwarder_id);
00854 routing_table[dtype].ClrNewSub(hashPtr->forwarder_id);
00855
00856 #ifdef DEBUG_RATE
00857 printf("DF node %d will send %s to %x\n",
00858 THIS_NODE, MsgStr[dfh->mess_type], hashPtr->forwarder_id.addr_);
00859 #endif // DEBUG_RATE
00860
00861 Packet::free(pkt);
00862 return;
00863
00864
00865 case POS_LAST:
00866 recent_in = routing_table[dtype].MostRecentIn();
00867 if (recent_in == NULL) {
00868 Packet::free(pkt);
00869 return;
00870 }
00871
00872 next_node = NODE_ADDR(recent_in);
00873 if (next_node == THIS_NODE) {
00874 Packet::free(pkt);
00875 return;
00876 }
00877
00878 PosReinf(dtype, NODE_ADDR(recent_in), dfh->info.sender, dfh->info.seq);
00879 routing_table[dtype].CntPosSend(AGT_ADDR(recent_in));
00880 routing_table[dtype].ClrNewSub(AGT_ADDR(recent_in));
00881
00882 #ifdef DEBUG_RATE
00883 printf("DF node %d will send %s to %x\n",
00884 THIS_NODE, MsgStr[dfh->mess_type], NODE_ADDR(recent_in));
00885 #endif // DEBUG_RATE
00886
00887 Packet::free(pkt);
00888 return;
00889
00890
00891 case POS_ALL:
00892 for (cur = routing_table[dtype].iif; cur!=NULL; cur = IN_NEXT(cur)) {
00893
00894 if (NEW_SUB_RECV(cur) <= 0) {
00895 continue;
00896 }
00897
00898 next_node = NODE_ADDR(cur);
00899
00900 if (next_node == THIS_NODE) {
00901 continue;
00902 }
00903
00904 PosReinf(dtype, NODE_ADDR(cur), dfh->info.sender, dfh->info.seq);
00905 routing_table[dtype].CntPosSend(AGT_ADDR(cur));
00906 routing_table[dtype].ClrNewSub(AGT_ADDR(cur));
00907
00908 #ifdef DEBUG_RATE
00909 printf("DF node %d will send %s to %x\n",
00910 THIS_NODE, MsgStr[dfh->mess_type], NODE_ADDR(cur));
00911 #endif // DEBUG_RATE
00912
00913 }
00914 Packet::free(pkt);
00915 return;
00916
00917 default:
00918 Packet::free(pkt);
00919 return;
00920 }
00921 }
00922
00923
00924 void DiffusionRate::PosReinf(int dtype, nsaddr_t to_node,
00925 ns_addr_t info_sender, unsigned int info_seq)
00926 {
00927 ns_addr_t to_agent_addr;
00928 to_agent_addr.addr_ = to_node;
00929 to_agent_addr.port_ = ROUTING_PORT;
00930
00931 Packet *pkt=prepare_message(dtype, to_agent_addr, POS_REINFORCE);
00932 hdr_cdiff *dfh = HDR_CDIFF(pkt);
00933
00934 dfh->report_rate = ORIGINAL;
00935 dfh->info.sender = info_sender;
00936 dfh->info.seq = info_seq;
00937
00938 MACprepare(pkt, to_node, NS_AF_INET, 1);
00939 MACsend(pkt, 0);
00940 overhead++;
00941
00942 #ifdef DEBUG_RATE
00943 hdr_cmn *cmh = HDR_CMN(pkt);
00944 printf("DF node %d will send %s (%x, %x, %d) to %x\n",
00945 THIS_NODE, MsgStr[dfh->mess_type], (dfh->sender_id).addr_,
00946 (dfh->sender_id).port_, dfh->pk_num, cmh->next_hop());
00947 #endif
00948
00949 }
00950
00951
00952 void DiffusionRate::Start()
00953 {
00954 DiffusionAgent::Start();
00955
00956 gradient_timer = new GradientTimer(this);
00957 gradient_timer->resched(INTEREST_TIMEOUT);
00958
00959 if ( neg_win_type_ == NEG_TIMER && NEG_REINF_ == true) {
00960 neg_reinf_timer = new NegativeReinforceTimer(this);
00961 neg_reinf_timer->resched(NEG_CHECK);
00962 }
00963 }
00964
00965
00966 void DiffusionRate::reset()
00967 {
00968 DiffusionAgent::reset();
00969 DataTable.reset();
00970 }
00971
00972
00973 void DiffusionRate::Print_IOlist()
00974 {
00975 Out_List *cur_out;
00976 In_List *cur_in;
00977 int i;
00978
00979 for (i=0; i<1; i++) {
00980 printf("Node %d DATA TYPE %d: send bcast data %d, not send %d, rcv %d\n",
00981 THIS_NODE, i, num_data_bcast_send, num_not_send_bcast_data,
00982 num_data_bcast_rcv);
00983 printf("Node %d neg bcast send %d, neg bcast rcv %d\n",
00984 THIS_NODE, num_neg_bcast_send, num_neg_bcast_rcv);
00985 for (cur_out = routing_table[i].active; cur_out != NULL;
00986 cur_out = OUT_NEXT(cur_out) ) {
00987 printf("DF node %d has oif %d (%f,%d) send data %d recv neg %d pos %d\n",
00988 THIS_NODE, NODE_ADDR(cur_out), GRADIENT(cur_out),
00989 routing_table[i].num_active, NUM_DATA_SEND(cur_out),
00990 NUM_NEG_RECV(cur_out), NUM_POS_RECV(cur_out));
00991 }
00992
00993 for (cur_in = routing_table[i].iif; cur_in != NULL;
00994 cur_in = IN_NEXT(cur_in) ) {
00995 printf("Diffusion node %d has iif for %d\n",
00996 THIS_NODE, NODE_ADDR(cur_in));
00997 printf("Node %d recv new sub %d,new org %d,old org %d:send neg %d pos %d\n",
00998 THIS_NODE, TOTAL_NEW_SUB_RECV(cur_in), TOTAL_NEW_ORG_RECV(cur_in),
00999 TOTAL_OLD_ORG_RECV(cur_in), NUM_NEG_SEND(cur_in),
01000 NUM_POS_SEND(cur_in));
01001 }
01002
01003 }
01004 }
01005
01006
01007 int DiffusionRate::command(int argc, const char*const*argv)
01008 {
01009 if (argc == 2) {
01010 if (strcasecmp(argv[1], "enable-suppression") == 0) {
01011 DUP_SUP_ = true;
01012 return TCL_OK;
01013 }
01014
01015 if (strcasecmp(argv[1], "disable-suppression") == 0) {
01016 DUP_SUP_ = false;
01017 return TCL_OK;
01018 }
01019
01020 }
01021
01022 else if (argc == 3) {
01023
01024 if (strcasecmp(argv[1], "set-sub-tx-type") == 0 ) {
01025 sub_type_ = ParseSubType(argv[2]);
01026 return TCL_OK;
01027 }
01028
01029 if (strcasecmp(argv[1], "set-org-tx-type") == 0 ) {
01030 org_type_ = ParseOrgType(argv[2]);
01031 return TCL_OK;
01032 }
01033
01034 if (strcasecmp(argv[1], "set-pos-type") == 0 ) {
01035 pos_type_ = ParsePosType(argv[2]);
01036 return TCL_OK;
01037 }
01038
01039 if (strcasecmp(argv[1], "set-pos-node-type") == 0 ) {
01040 pos_node_type_ = ParsePosNodeType(argv[2]);
01041 return TCL_OK;
01042 }
01043
01044 if (strcasecmp(argv[1], "set-neg-win-type") == 0 ) {
01045 neg_win_type_ = ParseNegWinType(argv[2]);
01046 return TCL_OK;
01047 }
01048
01049 if (strcasecmp(argv[1], "set-neg-thr-type") == 0 ) {
01050 neg_thr_type_ = ParseNegThrType(argv[2]);
01051 return TCL_OK;
01052 }
01053
01054 if (strcasecmp(argv[1], "set-neg-max-type") == 0 ) {
01055 neg_max_type_ = ParseNegMaxType(argv[2]);
01056 return TCL_OK;
01057 }
01058 }
01059
01060
01061 return DiffusionAgent::command(argc, argv);
01062 }
01063
01064
01065
01066 sub_t ParseSubType(const char* str)
01067 {
01068 if (strcasecmp(str, "BROADCAST") == 0) {
01069 return BCAST_SUB;
01070 }
01071
01072 if (strcasecmp(str, "UNICAST") == 0) {
01073 return UNICAST_SUB;
01074 }
01075
01076 fprintf(stderr,"ParseSubType Error -- Only BROADCAST or UNICAST\n");
01077 exit(-1);
01078 }
01079
01080
01081
01082 org_t ParseOrgType(const char* str)
01083 {
01084 if (strcasecmp(str, "BROADCAST") == 0) {
01085 return BCAST_ORG;
01086 }
01087
01088 if (strcasecmp(str, "UNICAST") == 0) {
01089 return UNICAST_ORG;
01090 }
01091
01092 fprintf(stderr,"ParseOrgType Error -- Only BROADCAST or UNICAST\n");
01093 exit(-1);
01094 }
01095
01096
01097 pos_t ParsePosType(const char* str)
01098 {
01099 if (strcasecmp(str, "HASH") == 0) {
01100 return POS_HASH;
01101 }
01102
01103 if (strcasecmp(str, "LAST") == 0) {
01104 return POS_LAST;
01105 }
01106
01107 if (strcasecmp(str, "ALL") == 0) {
01108 return POS_ALL;
01109 }
01110
01111 fprintf(stderr,"ParsePosType Error -- Only HASH, LAST, or ALL\n");
01112 exit(-1);
01113 }
01114
01115
01116 pos_ndt ParsePosNodeType(const char* str)
01117 {
01118 if (strcasecmp(str, "END") == 0) {
01119 return END_POS;
01120 }
01121
01122 if (strcasecmp(str, "INTM") == 0) {
01123 return INTM_POS;
01124 }
01125
01126 fprintf(stderr,"ParsePosNodeType Error -- Only END or INTM\n");
01127 exit(-1);
01128 }
01129
01130
01131 neg_wint ParseNegWinType(const char* str)
01132 {
01133 if (strcasecmp(str, "COUNTER") == 0) {
01134 return NEG_COUNTER;
01135 }
01136
01137 if (strcasecmp(str, "TIMER") == 0) {
01138 return NEG_TIMER;
01139 }
01140
01141 fprintf(stderr,"ParseNegWinType Error -- Only COUNTER or TIMER\n");
01142 exit(-1);
01143 }
01144
01145
01146 neg_tht ParseNegThrType(const char* str)
01147 {
01148 if (strcasecmp(str, "ABSOLUTE") == 0) {
01149 return NEG_ABSOLUTE;
01150 }
01151
01152 if (strcasecmp(str, "RELATIVE") == 0) {
01153 return NEG_RELATIVE;
01154 }
01155
01156 fprintf(stderr,"ParseNegThrType Error -- Only ABSOLUTE or RELATIVE\n");
01157 exit(-1);
01158 }
01159
01160
01161 neg_mxt ParseNegMaxType(const char* str)
01162 {
01163 if (strcasecmp(str, "FIXED") == 0) {
01164 return NEG_FIXED_MAX;
01165 }
01166
01167 if (strcasecmp(str, "SCALE") == 0) {
01168 return NEG_SCALE_MAX;
01169 }
01170
01171 fprintf(stderr,"ParseNegMaxType Error -- Only FIXED or SCALE\n");
01172 exit(-1);
01173 }
01174
01175
01176
01177
01178