00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043 #include "two_phase_pull.hh"
00044
00045 #ifdef NS_DIFFUSION
00046 static class GradientFilterClass : public TclClass {
00047 public:
00048 GradientFilterClass() : TclClass("Application/DiffApp/GradientFilter") {}
00049 TclObject* create(int argc, const char*const* argv) {
00050 if (argc == 5)
00051 return(new GradientFilter(argv[4]));
00052 else
00053 fprintf(stderr, "Insufficient number of args for creating GradientFilter");
00054 return (NULL);
00055 }
00056 } class_gradient_filter;
00057
00058 int GradientFilter::command(int argc, const char*const* argv) {
00059 if (argc == 3) {
00060 if (strcasecmp(argv[1], "debug") == 0) {
00061 global_debug_level = atoi(argv[2]);
00062 if (global_debug_level < 1 || global_debug_level > 10) {
00063 global_debug_level = DEBUG_DEFAULT;
00064 printf("Error: Debug level outside range(1-10) or missing !\n");
00065 }
00066 }
00067 }
00068 return DiffApp::command(argc, argv);
00069 }
00070
00071 #endif // NS_DIFFUSION
00072
00073 void GradientFilterReceive::recv(Message *msg, handle h)
00074 {
00075 app_->recv(msg, h);
00076 }
00077
00078 int TppMessageSendTimer::expire()
00079 {
00080
00081 agent_->messageTimeout(msg_);
00082
00083
00084 delete this;
00085 return -1;
00086 }
00087
00088 int TppInterestForwardTimer::expire()
00089 {
00090
00091 agent_->interestTimeout(msg_);
00092
00093
00094 delete this;
00095 return -1;
00096 }
00097
00098 int TppSubscriptionExpirationTimer::expire()
00099 {
00100 int retval;
00101
00102 retval = agent_->subscriptionTimeout(attrs_);
00103
00104
00105 if (retval == -1)
00106 delete this;
00107
00108 return retval;
00109 }
00110
00111 int TppGradientExpirationCheckTimer::expire()
00112 {
00113
00114 agent_->gradientTimeout();
00115
00116
00117 return 0;
00118 }
00119
00120 int TppReinforcementCheckTimer::expire()
00121 {
00122
00123 agent_->reinforcementTimeout();
00124
00125
00126 return 0;
00127 }
00128
00129 void GradientFilter::interestTimeout(Message *msg)
00130 {
00131 DiffPrint(DEBUG_MORE_DETAILS, "Interest Timeout !\n");
00132
00133 msg->last_hop_ = LOCALHOST_ADDR;
00134 msg->next_hop_ = BROADCAST_ADDR;
00135
00136 ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00137 }
00138
00139 void GradientFilter::messageTimeout(Message *msg)
00140 {
00141 DiffPrint(DEBUG_MORE_DETAILS, "Message Timeout !\n");
00142
00143 ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00144 }
00145
00146 void GradientFilter::gradientTimeout()
00147 {
00148 RoutingTable::iterator routing_itr;
00149 GradientList::iterator grad_itr;
00150 AgentList::iterator agent_itr;
00151 TppRoutingEntry *routing_entry;
00152 GradientEntry *gradient_entry;
00153 AgentEntry *agent_entry;
00154 struct timeval tmv;
00155
00156 DiffPrint(DEBUG_MORE_DETAILS, "Gradient Timeout !\n");
00157
00158 GetTime(&tmv);
00159
00160 routing_itr = routing_list_.begin();
00161
00162 while (routing_itr != routing_list_.end()){
00163 routing_entry = *routing_itr;
00164
00165
00166 grad_itr = routing_entry->gradients_.begin();
00167 while (grad_itr != routing_entry->gradients_.end()){
00168 gradient_entry = *grad_itr;
00169 if (tmv.tv_sec > (gradient_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){
00170
00171 DiffPrint(DEBUG_NO_DETAILS, "Deleting Gradient to node %d !\n",
00172 gradient_entry->node_addr_);
00173
00174 grad_itr = routing_entry->gradients_.erase(grad_itr);
00175 delete gradient_entry;
00176 }
00177 else{
00178 grad_itr++;
00179 }
00180 }
00181
00182
00183 agent_itr = routing_entry->agents_.begin();
00184 while (agent_itr != routing_entry->agents_.end()){
00185 agent_entry = *agent_itr;
00186 if (tmv.tv_sec > (agent_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){
00187
00188 DiffPrint(DEBUG_NO_DETAILS,
00189 "Deleting Gradient to agent %d !\n", agent_entry->port_);
00190
00191 agent_itr = routing_entry->agents_.erase(agent_itr);
00192 delete agent_entry;
00193 }
00194 else{
00195 agent_itr++;
00196 }
00197 }
00198
00199
00200 if ((routing_entry->gradients_.size() == 0) &&
00201 (routing_entry->agents_.size() == 0)){
00202
00203 DiffPrint(DEBUG_DETAILS,
00204 "Nothing left for this data type, cleaning up !\n");
00205 routing_itr = routing_list_.erase(routing_itr);
00206 delete routing_entry;
00207 }
00208 else{
00209 routing_itr++;
00210 }
00211 }
00212 }
00213
00214 void GradientFilter::reinforcementTimeout()
00215 {
00216 DataNeighborList::iterator data_neighbor_itr;
00217 DataNeighborEntry *data_neighbor_entry;
00218 RoutingTable::iterator routing_itr;
00219 TppRoutingEntry *routing_entry;
00220 Message *my_message;
00221
00222 DiffPrint(DEBUG_MORE_DETAILS, "Reinforcement Timeout !\n");
00223
00224 routing_itr = routing_list_.begin();
00225
00226 while (routing_itr != routing_list_.end()){
00227 routing_entry = *routing_itr;
00228
00229
00230 data_neighbor_itr = routing_entry->data_neighbors_.begin();
00231
00232 while (data_neighbor_itr != routing_entry->data_neighbors_.end()){
00233 data_neighbor_entry = *data_neighbor_itr;
00234
00235 if (data_neighbor_entry->data_flag_ == OLD_MESSAGE){
00236 my_message = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
00237 0, 0, routing_entry->attrs_->size(), pkt_count_,
00238 random_id_, data_neighbor_entry->neighbor_id_,
00239 LOCALHOST_ADDR);
00240 my_message->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
00241
00242 DiffPrint(DEBUG_NO_DETAILS,
00243 "Sending Negative Reinforcement to node %d !\n",
00244 data_neighbor_entry->neighbor_id_);
00245
00246 ((DiffusionRouting *)dr_)->sendMessage(my_message, filter_handle_);
00247
00248 pkt_count_++;
00249 delete my_message;
00250
00251
00252 data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr);
00253 delete data_neighbor_entry;
00254 }
00255 else{
00256 data_neighbor_itr++;
00257 }
00258 }
00259
00260
00261 data_neighbor_itr = routing_entry->data_neighbors_.begin();
00262 while (data_neighbor_itr != routing_entry->data_neighbors_.end()){
00263 data_neighbor_entry = *data_neighbor_itr;
00264 if (data_neighbor_entry->data_flag_ == NEW_MESSAGE){
00265 data_neighbor_entry->data_flag_ = 0;
00266 data_neighbor_itr++;
00267 }
00268 else{
00269
00270 data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr);
00271 delete data_neighbor_entry;
00272 }
00273 }
00274
00275
00276 routing_itr++;
00277 }
00278 }
00279
00280 int GradientFilter::subscriptionTimeout(NRAttrVec *attrs)
00281 {
00282 AttributeList::iterator attribute_itr;
00283 AttributeEntry *attribute_entry;
00284 TppRoutingEntry *routing_entry;
00285 struct timeval tmv;
00286
00287 DiffPrint(DEBUG_MORE_DETAILS, "Subscription Timeout !\n");
00288
00289 GetTime(&tmv);
00290
00291
00292 routing_entry = findRoutingEntry(attrs);
00293
00294 if (routing_entry){
00295
00296
00297 attribute_itr = routing_entry->attr_list_.begin();
00298
00299 while (attribute_itr != routing_entry->attr_list_.end()){
00300 attribute_entry = *attribute_itr;
00301 if (tmv.tv_sec > (attribute_entry->tv_.tv_sec + SUBSCRIPTION_TIMEOUT)){
00302 sendDisinterest(attribute_entry->attrs_, routing_entry);
00303 attribute_itr = routing_entry->attr_list_.erase(attribute_itr);
00304 delete attribute_entry;
00305 }
00306 else{
00307 attribute_itr++;
00308 }
00309 }
00310 }
00311 else{
00312 DiffPrint(DEBUG_DETAILS, "Warning: SubscriptionTimeout could't find RE - maybe deleted by GradientTimeout ?\n");
00313
00314
00315 return -1;
00316 }
00317
00318
00319 return 0;
00320 }
00321
00322 void GradientFilter::deleteRoutingEntry(TppRoutingEntry *routing_entry)
00323 {
00324 RoutingTable::iterator routing_itr;
00325 TppRoutingEntry *current_entry;
00326
00327 for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){
00328 current_entry = *routing_itr;
00329 if (current_entry == routing_entry){
00330 routing_itr = routing_list_.erase(routing_itr);
00331 delete routing_entry;
00332 return;
00333 }
00334 }
00335 DiffPrint(DEBUG_ALWAYS, "Error: deleteRoutingEntry could not find entry to delete !\n");
00336 }
00337
00338 TppRoutingEntry * GradientFilter::matchRoutingEntry(NRAttrVec *attrs, RoutingTable::iterator start, RoutingTable::iterator *place)
00339 {
00340 RoutingTable::iterator routing_itr;
00341 TppRoutingEntry *routing_entry;
00342
00343 for (routing_itr = start; routing_itr != routing_list_.end(); ++routing_itr){
00344 routing_entry = *routing_itr;
00345 if (MatchAttrs(routing_entry->attrs_, attrs)){
00346 *place = routing_itr;
00347 return routing_entry;
00348 }
00349 }
00350 return NULL;
00351 }
00352
00353 TppRoutingEntry * GradientFilter::findRoutingEntry(NRAttrVec *attrs)
00354 {
00355 RoutingTable::iterator routing_itr;
00356 TppRoutingEntry *routing_entry;
00357
00358 for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){
00359 routing_entry = *routing_itr;
00360 if (PerfectMatch(routing_entry->attrs_, attrs))
00361 return routing_entry;
00362 }
00363 return NULL;
00364 }
00365
00366 AttributeEntry * GradientFilter::findMatchingSubscription(TppRoutingEntry *routing_entry,
00367 NRAttrVec *attrs)
00368 {
00369 AttributeList::iterator attribute_itr;
00370 AttributeEntry *attribute_entry;
00371
00372 for (attribute_itr = routing_entry->attr_list_.begin(); attribute_itr != routing_entry->attr_list_.end(); ++attribute_itr){
00373 attribute_entry = *attribute_itr;
00374 if (PerfectMatch(attribute_entry->attrs_, attrs))
00375 return attribute_entry;
00376 }
00377 return NULL;
00378 }
00379
00380 void GradientFilter::updateGradient(TppRoutingEntry *routing_entry,
00381 int32_t last_hop, bool reinforced)
00382 {
00383 GradientList::iterator gradient_itr;
00384 GradientEntry *gradient_entry;
00385
00386 for (gradient_itr = routing_entry->gradients_.begin();
00387 gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){
00388 gradient_entry = *gradient_itr;
00389 if (gradient_entry->node_addr_ == last_hop){
00390 GetTime(&(gradient_entry->tv_));
00391 if (reinforced)
00392 gradient_entry->reinforced_ = true;
00393 return;
00394 }
00395 }
00396
00397
00398 gradient_entry = new GradientEntry(last_hop);
00399 if (reinforced)
00400 gradient_entry->reinforced_ = true;
00401
00402 routing_entry->gradients_.push_back(gradient_entry);
00403 }
00404
00405 void GradientFilter::updateAgent(TppRoutingEntry *routing_entry,
00406 u_int16_t source_port)
00407 {
00408 AgentList::iterator agent_itr;
00409 AgentEntry *agent_entry;
00410
00411 for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){
00412 agent_entry = *agent_itr;
00413 if (agent_entry->port_ == source_port){
00414
00415 GetTime(&(agent_entry->tv_));
00416 return;
00417 }
00418 }
00419
00420
00421
00422 agent_entry = new AgentEntry(source_port);
00423 routing_entry->agents_.push_back(agent_entry);
00424 }
00425
00426 void GradientFilter::forwardPushExploratoryData(Message *msg,
00427 DataForwardingHistory *forwarding_history)
00428 {
00429 RoutingTable::iterator routing_itr;
00430 TppRoutingEntry *routing_entry;
00431 AgentList::iterator agent_itr;
00432 AgentEntry *agent_entry;
00433 Message *data_msg, *sink_message;
00434 TimerCallback *data_timer;
00435 unsigned int key[2];
00436 HashEntry *hash_entry;
00437
00438
00439 routing_itr = routing_list_.begin();
00440 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00441 &routing_itr);
00442
00443 sink_message = CopyMessage(msg);
00444
00445 while (routing_entry){
00446
00447
00448 for (agent_itr = routing_entry->agents_.begin();
00449 agent_itr != routing_entry->agents_.end(); ++agent_itr){
00450 agent_entry = *agent_itr;
00451
00452 if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){
00453
00454 sink_message->next_hop_ = LOCALHOST_ADDR;
00455 sink_message->next_port_ = agent_entry->port_;
00456
00457 ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);
00458
00459
00460 forwarding_history->forwardingToLibrary(agent_entry->port_);
00461 }
00462 }
00463
00464 if ((!forwarding_history->alreadyReinforced()) &&
00465 (routing_entry->agents_.size() > 0) &&
00466 (msg->last_hop_ != LOCALHOST_ADDR)){
00467
00468 sendPositiveReinforcement(routing_entry->attrs_, msg->rdm_id_,
00469 msg->pkt_num_, msg->last_hop_);
00470
00471
00472 forwarding_history->sendingReinforcement();
00473 }
00474
00475
00476 routing_itr++;
00477 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00478 &routing_itr);
00479 }
00480
00481
00482 delete sink_message;
00483
00484
00485
00486
00487 if (msg->last_hop_ != LOCALHOST_ADDR){
00488 key[0] = msg->pkt_num_;
00489 key[1] = msg->rdm_id_;
00490
00491 hash_entry = new HashEntry(msg->last_hop_);
00492
00493 putHash(hash_entry, key[0], key[1]);
00494 }
00495
00496
00497 if (!forwarding_history->alreadyForwardedToNetwork(BROADCAST_ADDR)){
00498 data_msg = CopyMessage(msg);
00499 data_msg->next_hop_ = BROADCAST_ADDR;
00500
00501 data_timer = new TppMessageSendTimer(this, data_msg);
00502
00503
00504 ((DiffusionRouting *)dr_)->addTimer(PUSH_DATA_FORWARD_DELAY +
00505 (int) ((PUSH_DATA_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (PUSH_DATA_FORWARD_JITTER / 2))),
00506 data_timer);
00507
00508
00509 forwarding_history->forwardingToNetwork(BROADCAST_ADDR);
00510 }
00511 }
00512
00513 void GradientFilter::forwardExploratoryData(Message *msg,
00514 TppRoutingEntry *routing_entry,
00515 DataForwardingHistory *forwarding_history)
00516 {
00517 #ifdef USE_BROADCAST_TO_MULTIPLE_RECIPIENTS
00518 Message *data_msg;
00519 TimerCallback *data_timer;
00520 #else
00521 GradientList::iterator gradient_itr;
00522 GradientEntry *gradient_entry;
00523 #endif // USE_BROADCAST_TO_MULTIPLE_RECIPIENTS
00524 AgentList::iterator agent_itr;
00525 AgentEntry *agent_entry;
00526 Message *sink_message;
00527 unsigned int key[2];
00528 HashEntry *hash_entry;
00529
00530 sink_message = CopyMessage(msg);
00531
00532
00533 for (agent_itr = routing_entry->agents_.begin();
00534 agent_itr != routing_entry->agents_.end(); ++agent_itr){
00535 agent_entry = *agent_itr;
00536
00537 if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){
00538
00539 sink_message->next_hop_ = LOCALHOST_ADDR;
00540 sink_message->next_port_ = agent_entry->port_;
00541
00542
00543 forwarding_history->forwardingToLibrary(agent_entry->port_);
00544
00545 ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);
00546 }
00547 }
00548
00549 delete sink_message;
00550
00551
00552 if ((!forwarding_history->alreadyReinforced()) &&
00553 (routing_entry->agents_.size() > 0) &&
00554 (msg->last_hop_ != LOCALHOST_ADDR)){
00555
00556 sendPositiveReinforcement(routing_entry->attrs_, msg->rdm_id_,
00557 msg->pkt_num_, msg->last_hop_);
00558
00559
00560 forwarding_history->sendingReinforcement();
00561 }
00562
00563
00564
00565
00566 if (msg->last_hop_ != LOCALHOST_ADDR){
00567 setReinforcementFlags(routing_entry, msg->last_hop_, NEW_MESSAGE);
00568 }
00569
00570
00571 if (msg->last_hop_ != LOCALHOST_ADDR){
00572 key[0] = msg->pkt_num_;
00573 key[1] = msg->rdm_id_;
00574
00575 hash_entry = new HashEntry(msg->last_hop_);
00576
00577 putHash(hash_entry, key[0], key[1]);
00578 }
00579
00580
00581 #ifdef USE_BROADCAST_TO_MULTIPLE_RECIPIENTS
00582 if (!forwarding_history->alreadyForwardedToNetwork(BROADCAST_ADDR)){
00583 if (routing_entry->gradients_.size() > 0){
00584
00585 data_msg = CopyMessage(msg);
00586 data_msg->next_hop_ = BROADCAST_ADDR;
00587
00588
00589 forwarding_history->forwardingToNetwork(BROADCAST_ADDR);
00590
00591 data_timer = new TppMessageSendTimer(this, data_msg);
00592
00593
00594 ((DiffusionRouting *)dr_)->addTimer(DATA_FORWARD_DELAY +
00595 (int) ((DATA_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (DATA_FORWARD_JITTER / 2))),
00596 data_timer);
00597 }
00598 }
00599 #else
00600
00601 for (gradient_itr = routing_entry->gradients_.begin();
00602 gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){
00603
00604 gradient_entry = *gradient_itr;
00605
00606
00607 if (!forwarding_history->alreadyForwardedToNetwork(gradient_entry->node_addr_)){
00608 msg->next_hop_ = gradient_entry->node_addr_;
00609 ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00610
00611
00612 forwarding_history->forwardingToNetwork(gradient_entry->node_addr_);
00613 }
00614 }
00615 #endif // USE_BROADCAST_TO_MULTIPLE_RECIPIENTS
00616 }
00617
00618 void GradientFilter::forwardData(Message *msg, TppRoutingEntry *routing_entry,
00619 DataForwardingHistory *forwarding_history)
00620 {
00621 GradientList::iterator gradient_itr;
00622 AgentList::iterator agent_itr;
00623 GradientEntry *gradient_entry;
00624 AgentEntry *agent_entry;
00625 Message *sink_message, *negative_reinforcement_msg;
00626 bool has_sink = false;
00627
00628 sink_message = CopyMessage(msg);
00629
00630
00631 for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){
00632 agent_entry = *agent_itr;
00633
00634 has_sink = true;
00635
00636 if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){
00637
00638 sink_message->next_hop_ = LOCALHOST_ADDR;
00639 sink_message->next_port_ = agent_entry->port_;
00640
00641
00642 forwarding_history->forwardingToLibrary(agent_entry->port_);
00643
00644 ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);
00645 }
00646 }
00647
00648 delete sink_message;
00649
00650
00651
00652
00653 if (msg->last_hop_ != LOCALHOST_ADDR){
00654 setReinforcementFlags(routing_entry, msg->last_hop_, NEW_MESSAGE);
00655 }
00656
00657
00658 gradient_itr = routing_entry->gradients_.begin();
00659 gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00660 gradient_itr, &gradient_itr);
00661
00662 if (gradient_entry){
00663 while (gradient_entry){
00664
00665
00666
00667 if (gradient_entry->node_addr_ != msg->last_hop_){
00668 msg->next_hop_ = gradient_entry->node_addr_;
00669
00670
00671 if (!forwarding_history->alreadyForwardedToNetwork(msg->next_hop_)){
00672 DiffPrint(DEBUG_NO_DETAILS,
00673 "Node%d: Forwarding data using Reinforced Gradient to node %d !\n",
00674 ((DiffusionRouting *)dr_)->getNodeId(), gradient_entry->node_addr_);
00675
00676 ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00677
00678
00679 forwarding_history->forwardingToNetwork(msg->next_hop_);
00680 }
00681 }
00682
00683
00684 gradient_itr++;
00685 gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00686 gradient_itr, &gradient_itr);
00687 }
00688 }
00689 else{
00690
00691
00692 if ((!has_sink) && (msg->last_hop_ != LOCALHOST_ADDR)){
00693 negative_reinforcement_msg = new Message(DIFFUSION_VERSION,
00694 NEGATIVE_REINFORCEMENT,
00695 0, 0,
00696 routing_entry->attrs_->size(),
00697 pkt_count_,
00698 random_id_,
00699 msg->last_hop_,
00700 LOCALHOST_ADDR);
00701 negative_reinforcement_msg->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
00702
00703 DiffPrint(DEBUG_NO_DETAILS,
00704 "Sending Negative Reinforcement to node %d !\n",
00705 msg->last_hop_);
00706
00707 ((DiffusionRouting *)dr_)->sendMessage(negative_reinforcement_msg,
00708 filter_handle_);
00709
00710 pkt_count_++;
00711 delete negative_reinforcement_msg;
00712 }
00713 }
00714 }
00715
00716 void GradientFilter::sendPositiveReinforcement(NRAttrVec *reinf_attrs,
00717 int32_t data_rdm_id,
00718 int32_t data_pkt_num,
00719 int32_t destination)
00720 {
00721 ReinforcementBlob *reinforcement_blob;
00722 NRAttribute *reinforcement_attr;
00723 TimerCallback *reinforcement_timer;
00724 Message *pos_reinf_message;
00725 NRAttrVec *attrs;
00726
00727 reinforcement_blob = new ReinforcementBlob(data_rdm_id, data_pkt_num);
00728
00729 reinforcement_attr = ReinforcementAttr.make(NRAttribute::IS,
00730 (void *) reinforcement_blob,
00731 sizeof(ReinforcementBlob));
00732
00733 attrs = CopyAttrs(reinf_attrs);
00734 attrs->push_back(reinforcement_attr);
00735
00736 pos_reinf_message = new Message(DIFFUSION_VERSION, POSITIVE_REINFORCEMENT,
00737 0, 0, attrs->size(), pkt_count_,
00738 random_id_, destination, LOCALHOST_ADDR);
00739 pos_reinf_message->msg_attr_vec_ = CopyAttrs(attrs);
00740
00741 DiffPrint(DEBUG_NO_DETAILS, "Sending Positive Reinforcement to node %d !\n",
00742 destination);
00743
00744
00745 reinforcement_timer = new TppMessageSendTimer(this, pos_reinf_message);
00746
00747
00748 ((DiffusionRouting *)dr_)->addTimer(POS_REINFORCEMENT_SEND_DELAY +
00749 (int) ((POS_REINFORCEMENT_JITTER * (GetRand() * 1.0 / RAND_MAX) - (POS_REINFORCEMENT_JITTER / 2))),
00750 reinforcement_timer);
00751 pkt_count_++;
00752 ClearAttrs(attrs);
00753 delete attrs;
00754 delete reinforcement_blob;
00755 }
00756
00757 GradientEntry * GradientFilter::findReinforcedGradients(GradientList *gradients,
00758 GradientList::iterator start,
00759 GradientList::iterator *place)
00760 {
00761 GradientList::iterator gradient_itr;
00762 GradientEntry *gradient_entry;
00763
00764 for (gradient_itr = start; gradient_itr != gradients->end(); ++gradient_itr){
00765 gradient_entry = *gradient_itr;
00766 if (gradient_entry->reinforced_){
00767 *place = gradient_itr;
00768 return gradient_entry;
00769 }
00770 }
00771
00772 return NULL;
00773 }
00774
00775 GradientEntry * GradientFilter::findReinforcedGradient(int32_t node_addr,
00776 TppRoutingEntry *routing_entry)
00777 {
00778 GradientList::iterator gradient_itr;
00779 GradientEntry *gradient_entry;
00780
00781 gradient_itr = routing_entry->gradients_.begin();
00782 gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00783 gradient_itr, &gradient_itr);
00784
00785 if (gradient_entry){
00786 while(gradient_entry){
00787 if (gradient_entry->node_addr_ == node_addr)
00788 return gradient_entry;
00789
00790
00791 gradient_itr++;
00792 gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00793 gradient_itr, &gradient_itr);
00794 }
00795 }
00796
00797 return NULL;
00798 }
00799
00800 void GradientFilter::deleteGradient(TppRoutingEntry *routing_entry,
00801 GradientEntry *gradient_entry)
00802 {
00803 GradientList::iterator gradient_itr;
00804 GradientEntry *current_entry;
00805
00806 for (gradient_itr = routing_entry->gradients_.begin();
00807 gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){
00808 current_entry = *gradient_itr;
00809 if (current_entry == gradient_entry){
00810 gradient_itr = routing_entry->gradients_.erase(gradient_itr);
00811 delete gradient_entry;
00812 return;
00813 }
00814 }
00815 DiffPrint(DEBUG_ALWAYS,
00816 "Error: deleteGradient could not find gradient to delete !\n");
00817 }
00818
00819 void GradientFilter::setReinforcementFlags(TppRoutingEntry *routing_entry,
00820 int32_t last_hop, int new_message)
00821 {
00822 DataNeighborList::iterator data_neighbor_itr;
00823 DataNeighborEntry *data_neighbor_entry;
00824
00825 for (data_neighbor_itr = routing_entry->data_neighbors_.begin();
00826 data_neighbor_itr != routing_entry->data_neighbors_.end();
00827 ++data_neighbor_itr){
00828 data_neighbor_entry = *data_neighbor_itr;
00829 if (data_neighbor_entry->neighbor_id_ == last_hop){
00830 if (data_neighbor_entry->data_flag_ > 0)
00831 return;
00832 data_neighbor_entry->data_flag_ = new_message;
00833 return;
00834 }
00835 }
00836
00837
00838 data_neighbor_entry = new DataNeighborEntry(last_hop, new_message);
00839
00840 routing_entry->data_neighbors_.push_back(data_neighbor_entry);
00841 }
00842
00843 void GradientFilter::sendInterest(NRAttrVec *attrs, TppRoutingEntry *routing_entry)
00844 {
00845 AgentList::iterator agent_itr;
00846 AgentEntry *agent_entry;
00847
00848 Message *msg = new Message(DIFFUSION_VERSION, INTEREST, 0, 0,
00849 attrs->size(), 0, 0, LOCALHOST_ADDR,
00850 LOCALHOST_ADDR);
00851
00852 msg->msg_attr_vec_ = CopyAttrs(attrs);
00853
00854 for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){
00855 agent_entry = *agent_itr;
00856
00857 msg->next_port_ = agent_entry->port_;
00858
00859 ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00860 }
00861
00862 delete msg;
00863 }
00864
00865 void GradientFilter::sendDisinterest(NRAttrVec *attrs,
00866 TppRoutingEntry *routing_entry)
00867 {
00868 NRAttrVec *new_attrs;
00869 NRSimpleAttribute<int> *nrclass = NULL;
00870
00871 new_attrs = CopyAttrs(attrs);
00872
00873 nrclass = NRClassAttr.find(new_attrs);
00874 if (!nrclass){
00875 DiffPrint(DEBUG_ALWAYS,
00876 "Error: sendDisinterest couldn't find the class attribute !\n");
00877 ClearAttrs(new_attrs);
00878 delete new_attrs;
00879 return;
00880 }
00881
00882
00883 nrclass->setVal(NRAttribute::DISINTEREST_CLASS);
00884
00885 sendInterest(new_attrs, routing_entry);
00886
00887 ClearAttrs(new_attrs);
00888 delete new_attrs;
00889 }
00890
00891 void GradientFilter::recv(Message *msg, handle h)
00892 {
00893 if (h != filter_handle_){
00894 DiffPrint(DEBUG_ALWAYS,
00895 "Error: received msg for handle %d, subscribed to handle %d !\n",
00896 h, filter_handle_);
00897 return;
00898 }
00899
00900 if (msg->new_message_ == 1)
00901 processNewMessage(msg);
00902 else
00903 processOldMessage(msg);
00904 }
00905
00906 void GradientFilter::processOldMessage(Message *msg)
00907 {
00908 TppRoutingEntry *routing_entry;
00909 RoutingTable::iterator routing_itr;
00910
00911 switch (msg->msg_type_){
00912
00913 case INTEREST:
00914
00915 DiffPrint(DEBUG_NO_DETAILS, "Node%d: Received Old Interest !\n", ((DiffusionRouting *)dr_)->getNodeId());
00916
00917 if (msg->last_hop_ == LOCALHOST_ADDR){
00918
00919 DiffPrint(DEBUG_ALWAYS, "Warning: Old Interest from local agent !\n");
00920 break;
00921 }
00922
00923
00924 routing_entry = findRoutingEntry(msg->msg_attr_vec_);
00925 if (routing_entry)
00926 updateGradient(routing_entry, msg->last_hop_, false);
00927
00928 break;
00929
00930 case DATA:
00931
00932 DiffPrint(DEBUG_NO_DETAILS, "Node%d: Received an old Data message !\n", ((DiffusionRouting *)dr_)->getNodeId());
00933
00934
00935 routing_itr = routing_list_.begin();
00936 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00937 &routing_itr);
00938
00939 while (routing_entry){
00940 DiffPrint(DEBUG_NO_DETAILS,
00941 "Set flags to %d to OLD_MESSAGE !\n", msg->last_hop_);
00942
00943
00944 if (msg->last_hop_ != LOCALHOST_ADDR){
00945 setReinforcementFlags(routing_entry, msg->last_hop_, OLD_MESSAGE);
00946 }
00947
00948
00949 routing_itr++;
00950 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00951 &routing_itr);
00952 }
00953
00954 break;
00955
00956 case PUSH_EXPLORATORY_DATA:
00957
00958
00959 DiffPrint(DEBUG_NO_DETAILS,
00960 "Received an old Push Exploratory Data. Loop detected !\n");
00961
00962 break;
00963
00964 case EXPLORATORY_DATA:
00965
00966
00967 DiffPrint(DEBUG_NO_DETAILS,
00968 "Received an old Exploratory Data. Loop detected !\n");
00969
00970 break;
00971
00972 case POSITIVE_REINFORCEMENT:
00973
00974 DiffPrint(DEBUG_IMPORTANT, "Received an old Positive Reinforcement !\n");
00975
00976 break;
00977
00978 case NEGATIVE_REINFORCEMENT:
00979
00980 DiffPrint(DEBUG_IMPORTANT, "Received an old Negative Reinforcement !\n");
00981
00982 DiffPrint(DEBUG_IMPORTANT, "pkt_num = %d, rdm_id = %d !\n",
00983 msg->pkt_num_, msg->rdm_id_);
00984
00985 break;
00986
00987 default:
00988
00989 break;
00990 }
00991 }
00992
00993 void GradientFilter::processNewMessage(Message *msg)
00994 {
00995 NRSimpleAttribute<void *> *reinforcement_attr = NULL;
00996 DataForwardingHistory *forwarding_history;
00997 NRSimpleAttribute<int> *nrclass = NULL;
00998 NRSimpleAttribute<int> *nrscope = NULL;
00999 ReinforcementBlob *reinforcement_blob;
01000 RoutingTable::iterator routing_itr;
01001 TppRoutingEntry *routing_entry;
01002 GradientList::iterator gradient_itr;
01003 GradientEntry *gradient_entry;
01004 NRAttrVec::iterator place;
01005 HashEntry *hash_entry;
01006 AttributeEntry *attribute_entry;
01007 Message *my_msg;
01008 TimerCallback *interest_timer, *subscription_timer;
01009 unsigned int key[2];
01010 bool new_data_type = false;
01011
01012 switch (msg->msg_type_){
01013
01014 case INTEREST:
01015
01016 DiffPrint(DEBUG_NO_DETAILS, "Received Interest !\n");
01017
01018 nrclass = NRClassAttr.find(msg->msg_attr_vec_);
01019 nrscope = NRScopeAttr.find(msg->msg_attr_vec_);
01020
01021 if (!nrclass || !nrscope){
01022 DiffPrint(DEBUG_ALWAYS,
01023 "Warning: Can't find CLASS/SCOPE attributes in the message !\n");
01024 return;
01025 }
01026
01027
01028 routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01029
01030 if (!routing_entry){
01031
01032 routing_entry = new TppRoutingEntry;
01033 routing_entry->attrs_ = CopyAttrs(msg->msg_attr_vec_);
01034 routing_list_.push_back(routing_entry);
01035 new_data_type = true;
01036 }
01037
01038 if (msg->last_hop_ == LOCALHOST_ADDR){
01039
01040 updateAgent(routing_entry, msg->source_port_);
01041 }
01042 else{
01043
01044 updateGradient(routing_entry, msg->last_hop_, false);
01045 }
01046
01047 if ((nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
01048 (nrclass->getOp() == NRAttribute::IS)){
01049
01050
01051 if (nrscope->getVal() == NRAttribute::GLOBAL_SCOPE){
01052
01053 interest_timer = new TppInterestForwardTimer(this, CopyMessage(msg));
01054
01055 ((DiffusionRouting *)dr_)->addTimer(INTEREST_FORWARD_DELAY +
01056 (int) ((INTEREST_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (INTEREST_FORWARD_JITTER / 2))),
01057 interest_timer);
01058 }
01059 }
01060 else{
01061 if ((nrclass->getOp() != NRAttribute::IS) &&
01062 (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE) &&
01063 (new_data_type)){
01064
01065 subscription_timer = new TppSubscriptionExpirationTimer(this,
01066 CopyAttrs(msg->msg_attr_vec_));
01067
01068 ((DiffusionRouting *)dr_)->addTimer(SUBSCRIPTION_DELAY +
01069 (int) (SUBSCRIPTION_DELAY * (GetRand() * 1.0 / RAND_MAX)),
01070 subscription_timer);
01071 }
01072
01073
01074 break;
01075 }
01076
01077
01078 routing_itr = routing_list_.begin();
01079 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01080 &routing_itr);
01081
01082 while (routing_entry){
01083
01084 attribute_entry = findMatchingSubscription(routing_entry,
01085 msg->msg_attr_vec_);
01086
01087
01088 if (attribute_entry){
01089 GetTime(&(attribute_entry->tv_));
01090 }
01091 else{
01092
01093
01094 attribute_entry = new AttributeEntry(CopyAttrs(msg->msg_attr_vec_));
01095 routing_entry->attr_list_.push_back(attribute_entry);
01096 sendInterest(attribute_entry->attrs_, routing_entry);
01097 }
01098
01099 routing_itr++;
01100 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01101 &routing_itr);
01102 }
01103
01104 break;
01105
01106 case DATA:
01107
01108 DiffPrint(DEBUG_NO_DETAILS, "Node%d: Received Data !\n",((DiffusionRouting *)dr_)->getNodeId());
01109
01110
01111 forwarding_history = new DataForwardingHistory;
01112
01113
01114 routing_itr = routing_list_.begin();
01115 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01116 &routing_itr);
01117
01118 while (routing_entry){
01119 forwardData(msg, routing_entry, forwarding_history);
01120 routing_itr++;
01121 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01122 &routing_itr);
01123 }
01124
01125 delete forwarding_history;
01126
01127 break;
01128
01129 case EXPLORATORY_DATA:
01130
01131 DiffPrint(DEBUG_NO_DETAILS, "Received Exploratory Data !\n");
01132
01133
01134 forwarding_history = new DataForwardingHistory;
01135
01136
01137 routing_itr = routing_list_.begin();
01138 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01139 &routing_itr);
01140
01141 while (routing_entry){
01142 forwardExploratoryData(msg, routing_entry, forwarding_history);
01143 routing_itr++;
01144 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01145 &routing_itr);
01146 }
01147
01148
01149 delete forwarding_history;
01150
01151 break;
01152
01153 case PUSH_EXPLORATORY_DATA:
01154
01155 DiffPrint(DEBUG_NO_DETAILS, "Received Push Exploratory Data !\n");
01156
01157
01158 forwarding_history = new DataForwardingHistory;
01159
01160
01161 forwardPushExploratoryData(msg, forwarding_history);
01162
01163
01164 delete forwarding_history;
01165
01166 break;
01167
01168 case POSITIVE_REINFORCEMENT:
01169
01170 DiffPrint(DEBUG_NO_DETAILS, "Received a Positive Reinforcement !\n");
01171
01172
01173 place = msg->msg_attr_vec_->begin();
01174 reinforcement_attr = ReinforcementAttr.find_from(msg->msg_attr_vec_,
01175 place, &place);
01176 if (!reinforcement_attr){
01177 DiffPrint(DEBUG_ALWAYS,
01178 "Error: Received an invalid Positive Reinforcement message !\n");
01179 return;
01180 }
01181
01182
01183
01184 reinforcement_blob = (ReinforcementBlob *) reinforcement_attr->getVal();
01185
01186 key[0] = reinforcement_blob->pkt_num_;
01187 key[1] = reinforcement_blob->rdm_id_;
01188
01189 hash_entry = getHash(key[0], key[1]);
01190
01191
01192 msg->msg_attr_vec_->erase(place);
01193
01194
01195 routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01196
01197 if (!routing_entry){
01198
01199
01200
01201
01202 nrclass = NRClassAttr.find(msg->msg_attr_vec_);
01203 nrscope = NRScopeAttr.find(msg->msg_attr_vec_);
01204
01205 if (!nrclass || !nrscope){
01206 DiffPrint(DEBUG_ALWAYS,
01207 "Warning: Can't find CLASS/SCOPE attributes in the message !\n");
01208 return;
01209 }
01210
01211
01212 routing_entry = new TppRoutingEntry;
01213 routing_entry->attrs_ = CopyAttrs(msg->msg_attr_vec_);
01214 routing_list_.push_back(routing_entry);
01215 }
01216
01217
01218 updateGradient(routing_entry, msg->last_hop_, true);
01219
01220
01221 msg->msg_attr_vec_->push_back(reinforcement_attr);
01222
01223
01224
01225
01226
01227
01228 if (hash_entry){
01229 msg->next_hop_ = hash_entry->last_hop_;
01230
01231 DiffPrint(DEBUG_NO_DETAILS,
01232 "Forwarding Positive Reinforcement to node %d !\n",
01233 hash_entry->last_hop_);
01234
01235 ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
01236 }
01237
01238 break;
01239
01240 case NEGATIVE_REINFORCEMENT:
01241
01242 DiffPrint(DEBUG_NO_DETAILS, "Received a Negative Reinforcement !\n");
01243
01244 routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01245
01246 if (routing_entry){
01247 gradient_entry = findReinforcedGradient(msg->last_hop_, routing_entry);
01248
01249 if (gradient_entry){
01250
01251 deleteGradient(routing_entry, gradient_entry);
01252
01253 gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
01254 routing_entry->gradients_.begin(),
01255 &gradient_itr);
01256
01257
01258
01259 if (!gradient_entry){
01260 my_msg = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
01261 0, 0, routing_entry->attrs_->size(), pkt_count_,
01262 random_id_, BROADCAST_ADDR, LOCALHOST_ADDR);
01263 my_msg->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
01264
01265 DiffPrint(DEBUG_NO_DETAILS,
01266 "Forwarding Negative Reinforcement to ALL !\n");
01267
01268 ((DiffusionRouting *)dr_)->sendMessage(my_msg, filter_handle_);
01269
01270 pkt_count_++;
01271 delete my_msg;
01272 }
01273 }
01274 }
01275
01276 break;
01277
01278 default:
01279
01280 break;
01281 }
01282 }
01283
01284 HashEntry * GradientFilter::getHash(unsigned int pkt_num,
01285 unsigned int rdm_id)
01286 {
01287 unsigned int key[2];
01288
01289 key[0] = pkt_num;
01290 key[1] = rdm_id;
01291
01292 Tcl_HashEntry *entryPtr = Tcl_FindHashEntry(&htable_, (char *)key);
01293
01294 if (entryPtr == NULL)
01295 return NULL;
01296
01297 return ((HashEntry *) Tcl_GetHashValue(entryPtr));
01298 }
01299
01300 void GradientFilter::putHash(HashEntry *new_hash_entry,
01301 unsigned int pkt_num,
01302 unsigned int rdm_id)
01303 {
01304 Tcl_HashEntry *tcl_hash_entry;
01305 HashEntry *hash_entry;
01306 HashList::iterator hash_itr;
01307 unsigned int key[2];
01308 int new_hash_key;
01309
01310 if (hash_list_.size() == HASH_TABLE_DATA_MAX_SIZE){
01311
01312
01313 for (int i = 0; ((i < HASH_TABLE_DATA_REMOVE_AT_ONCE)
01314 && (hash_list_.size() > 0)); i++){
01315 hash_itr = hash_list_.begin();
01316 tcl_hash_entry = *hash_itr;
01317 hash_entry = (HashEntry *) Tcl_GetHashValue(tcl_hash_entry);
01318 delete hash_entry;
01319 hash_list_.erase(hash_itr);
01320 Tcl_DeleteHashEntry(tcl_hash_entry);
01321 }
01322 }
01323
01324 key[0] = pkt_num;
01325 key[1] = rdm_id;
01326
01327 tcl_hash_entry = Tcl_CreateHashEntry(&htable_, (char *) key, &new_hash_key);
01328
01329 if (new_hash_key == 0){
01330 DiffPrint(DEBUG_IMPORTANT, "Key already exists in hash !\n");
01331 return;
01332 }
01333
01334 Tcl_SetHashValue(tcl_hash_entry, new_hash_entry);
01335 hash_list_.push_back(tcl_hash_entry);
01336 }
01337
01338 handle GradientFilter::setupFilter()
01339 {
01340 NRAttrVec attrs;
01341 handle h;
01342
01343
01344
01345
01346 attrs.push_back(NRClassAttr.make(NRAttribute::IS,
01347 NRAttribute::INTEREST_CLASS));
01348
01349 h = ((DiffusionRouting *)dr_)->addFilter(&attrs,
01350 GRADIENT_FILTER_PRIORITY, filter_callback_);
01351
01352 ClearAttrs(&attrs);
01353 return h;
01354 }
01355
01356 #ifndef NS_DIFFUSION
01357 void GradientFilter::run()
01358 {
01359
01360 while (1){
01361 sleep(1000);
01362 }
01363 }
01364 #endif // !NS_DIFFUSION
01365
01366 #ifdef NS_DIFFUSION
01367 GradientFilter::GradientFilter(const char *diffrtg)
01368 {
01369 DiffAppAgent *agent;
01370 #else
01371 GradientFilter::GradientFilter(int argc, char **argv)
01372 {
01373 #endif // NS_DIFFUSION
01374 struct timeval tv;
01375 TimerCallback *reinforcement_timer, *gradient_timer;
01376
01377 GetTime(&tv);
01378 SetSeed(&tv);
01379 pkt_count_ = GetRand();
01380 random_id_ = GetRand();
01381
01382
01383 #ifdef NS_DIFFUSION
01384 agent = (DiffAppAgent *)TclObject::lookup(diffrtg);
01385 dr_ = agent->dr();
01386 #else
01387 parseCommandLine(argc, argv);
01388 dr_ = NR::createNR(diffusion_port_);
01389 #endif // NS_DIFFUSION
01390
01391
01392 filter_callback_ = new GradientFilterReceive(this);
01393
01394
01395 Tcl_InitHashTable(&htable_, 2);
01396
01397
01398 filter_handle_ = setupFilter();
01399
01400
01401 DiffPrint(DEBUG_IMPORTANT,
01402 "Gradient filter subscribed to *, received handle %d\n",
01403 filter_handle_);
01404
01405
01406 gradient_timer = new TppGradientExpirationCheckTimer(this);
01407 ((DiffusionRouting *)dr_)->addTimer(GRADIENT_DELAY, gradient_timer);
01408
01409 reinforcement_timer = new TppReinforcementCheckTimer(this);
01410 ((DiffusionRouting *)dr_)->addTimer(REINFORCEMENT_DELAY, reinforcement_timer);
01411
01412 GetTime(&tv);
01413
01414 DiffPrint(DEBUG_ALWAYS, "Gradient filter initialized at time %ld:%ld!\n",
01415 tv.tv_sec, tv.tv_usec);
01416 }
01417
01418 #ifndef USE_SINGLE_ADDRESS_SPACE
01419 int main(int argc, char **argv)
01420 {
01421 GradientFilter *app;
01422
01423
01424 app = new GradientFilter(argc, argv);
01425 app->run();
01426
01427 return 0;
01428 }
01429 #endif // !USE_SINGLE_ADDRESS_SPACE