00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043 #include "one_phase_pull.hh"
00044
00045 #ifdef NS_DIFFUSION
00046 static class OnePhasePullFilterClass : public TclClass {
00047 public:
00048 OnePhasePullFilterClass() : TclClass("Application/DiffApp/OnePhasePullFilter") {}
00049 TclObject* create(int argc, const char*const* argv) {
00050 if (argc == 5)
00051 return(new OnePhasePullFilter(argv[4]));
00052 else
00053 fprintf(stderr,
00054 "Insufficient number of args for creating OnePhasePullFilter");
00055 return (NULL);
00056 }
00057 } class_one_phase_pull_filter;
00058
00059 int OnePhasePullFilter::command(int argc, const char*const* argv) {
00060 if (argc == 3) {
00061 if (strcasecmp(argv[1], "debug") == 0) {
00062 global_debug_level = atoi(argv[2]);
00063 if (global_debug_level < 1 || global_debug_level > 10) {
00064 global_debug_level = DEBUG_DEFAULT;
00065 printf("Error: Debug level outside range(1-10) or missing !\n");
00066 }
00067 }
00068 }
00069 return DiffApp::command(argc, argv);
00070 }
00071
00072 #endif // NS_DIFFUSION
00073
00074 void OnePhasePullFilterReceive::recv(Message *msg, handle h)
00075 {
00076 filter_->recv(msg, h);
00077 }
00078
00079 int OppMessageSendTimer::expire()
00080 {
00081
00082 agent_->messageTimeout(msg_);
00083
00084
00085 delete this;
00086 return -1;
00087 }
00088
00089 int OppInterestForwardTimer::expire()
00090 {
00091
00092 agent_->interestTimeout(msg_);
00093
00094
00095 delete this;
00096 return -1;
00097 }
00098
00099 int OppSubscriptionExpirationTimer::expire()
00100 {
00101 int retval;
00102
00103 retval = agent_->subscriptionTimeout(attrs_);
00104
00105
00106 if (retval == -1)
00107 delete this;
00108
00109 return retval;
00110 }
00111
00112 int OppGradientExpirationCheckTimer::expire()
00113 {
00114
00115 agent_->gradientTimeout();
00116
00117
00118 return 0;
00119 }
00120
00121 int OppReinforcementCheckTimer::expire()
00122 {
00123
00124 agent_->reinforcementTimeout();
00125
00126
00127 return 0;
00128 }
00129
00130 RoundIdEntry * RoutingEntry::findRoundIdEntry(int32_t round_id)
00131 {
00132 RoundIdList::iterator round_id_itr;
00133 RoundIdEntry *round_id_entry;
00134
00135
00136 for (round_id_itr = round_ids_.begin();
00137 round_id_itr != round_ids_.end(); round_id_itr++){
00138 round_id_entry = *round_id_itr;
00139
00140
00141 if (round_id_entry->round_id_ == round_id)
00142 return round_id_entry;
00143 }
00144
00145
00146 return NULL;
00147 }
00148
00149 RoundIdEntry * RoutingEntry::addRoundIdEntry(int32_t round_id)
00150 {
00151 RoundIdEntry *round_id_entry;
00152
00153
00154 round_id_entry = new RoundIdEntry(round_id);
00155
00156
00157 round_ids_.push_back(round_id_entry);
00158
00159 return round_id_entry;
00160 }
00161
00162 void RoutingEntry::updateNeighborDataInfo(int32_t node_id,
00163 bool new_message)
00164 {
00165 DataNeighborList::iterator data_neighbor_itr;
00166 OPPDataNeighborEntry *data_neighbor_entry;
00167
00168 for (data_neighbor_itr = data_neighbors_.begin();
00169 data_neighbor_itr != data_neighbors_.end(); ++data_neighbor_itr){
00170 data_neighbor_entry = *data_neighbor_itr;
00171
00172
00173 if (data_neighbor_entry->node_id_ == node_id){
00174
00175
00176 data_neighbor_entry->messages_++;
00177
00178
00179 if (new_message){
00180 data_neighbor_entry->new_messages_ = new_message;
00181 return;
00182 }
00183 }
00184 }
00185
00186
00187 data_neighbor_entry = new OPPDataNeighborEntry(node_id);
00188 data_neighbor_entry->new_messages_ = new_message;
00189 data_neighbors_.push_back(data_neighbor_entry);
00190 }
00191
00192 void RoutingEntry::addGradient(int32_t last_hop,
00193 int32_t round_id, bool new_gradient)
00194 {
00195 RoundIdEntry *round_id_entry;
00196 OPPGradientEntry *gradient_entry;
00197
00198
00199 round_id_entry = findRoundIdEntry(round_id);
00200
00201
00202 if (!round_id_entry)
00203 round_id_entry = addRoundIdEntry(round_id);
00204
00205 if (new_gradient){
00206
00207 round_id_entry->gradients_.clear();
00208 }
00209 else{
00210
00211 gradient_entry = round_id_entry->findGradient(last_hop);
00212
00213 if (gradient_entry){
00214
00215 GetTime(&gradient_entry->tv_);
00216
00217 return;
00218 }
00219 }
00220
00221
00222 round_id_entry->addGradient(last_hop);
00223 }
00224
00225 void RoutingEntry::updateSink(u_int16_t sink_id, int32_t round_id)
00226 {
00227 RoundIdEntry *round_id_entry;
00228
00229
00230 round_id_entry = findRoundIdEntry(round_id);
00231
00232
00233 if (!round_id_entry)
00234 round_id_entry = addRoundIdEntry(round_id);
00235
00236
00237 round_id_entry->updateSink(sink_id);
00238 }
00239
00240 void RoutingEntry::deleteExpiredRoundIds()
00241 {
00242 RoundIdList::iterator round_id_itr;
00243 RoundIdEntry *round_id_entry;
00244 struct timeval tmv;
00245
00246 GetTime(&tmv);
00247
00248
00249 for (round_id_itr = round_ids_.begin();
00250 round_id_itr != round_ids_.end(); round_id_itr++){
00251 round_id_entry = *round_id_itr;
00252
00253 round_id_entry->deleteExpiredSinks();
00254 round_id_entry->deleteExpiredGradients();
00255
00256
00257 if (round_id_entry->gradients_.size() == 0 &&
00258 round_id_entry->sinks_.size() == 0){
00259
00260
00261 DiffPrint(DEBUG_NO_DETAILS, "Delete expired Round Id: %d\n",
00262 round_id_entry->round_id_);
00263
00264 round_id_itr = round_ids_.erase(round_id_itr);
00265 delete round_id_entry;
00266 }
00267 }
00268 }
00269
00270 void RoutingEntry::getSinksFromList(FlowIdList *msg_list,
00271 FlowIdList *sink_list)
00272 {
00273 RoundIdList::iterator round_id_itr;
00274 RoundIdEntry *round_id_entry;
00275 FlowIdList::iterator flow_id_itr;
00276
00277 for (round_id_itr = round_ids_.begin();
00278 round_id_itr != round_ids_.end(); round_id_itr++){
00279 round_id_entry = *round_id_itr;
00280
00281 flow_id_itr = find(msg_list->begin(),
00282 msg_list->end(), round_id_entry->round_id_);
00283 if (flow_id_itr != msg_list->end()){
00284
00285 if (round_id_entry->sinks_.size() > 0){
00286 sink_list->push_back(round_id_entry->round_id_);
00287 }
00288 }
00289 }
00290 }
00291
00292 void RoutingEntry::getFlowsFromList(FlowIdList *msg_list,
00293 FlowIdList *flow_list)
00294 {
00295 RoundIdList::iterator round_id_itr;
00296 RoundIdEntry *round_id_entry;
00297 FlowIdList::iterator flow_id_itr;
00298
00299 for (round_id_itr = round_ids_.begin();
00300 round_id_itr != round_ids_.end(); round_id_itr++){
00301 round_id_entry = *round_id_itr;
00302
00303 flow_id_itr = find(msg_list->begin(),
00304 msg_list->end(), round_id_entry->round_id_);
00305 if (flow_id_itr != msg_list->end()){
00306
00307 if (round_id_entry->sinks_.size() == 0){
00308
00309 flow_list->push_back(round_id_entry->round_id_);
00310 }
00311 }
00312 }
00313 }
00314
00315 int32_t RoutingEntry::getNeighborFromFlow(int32_t flow_id)
00316 {
00317 RoundIdList::iterator round_id_itr;
00318 RoundIdEntry *round_id_entry;
00319 OPPGradientEntry *gradient_entry;
00320
00321 for (round_id_itr = round_ids_.begin();
00322 round_id_itr != round_ids_.end(); round_id_itr++){
00323 round_id_entry = *round_id_itr;
00324
00325 if (round_id_entry->round_id_ == flow_id){
00326
00327
00328 if (round_id_entry->gradients_.size() > 0){
00329
00330
00331 gradient_entry = *round_id_entry->gradients_.begin();
00332 return gradient_entry->node_id_;
00333 }
00334
00335 DiffPrint(DEBUG_ALWAYS, "Cannot find 'reinforced neighbor !\n");
00336 break;
00337 }
00338 }
00339
00340
00341 return BROADCAST_ADDR;
00342 }
00343 void RoundIdEntry::deleteExpiredSinks()
00344 {
00345 SinkList::iterator sink_itr;
00346 SinkEntry *sink_entry;
00347 struct timeval tmv;
00348
00349 GetTime(&tmv);
00350
00351
00352 for (sink_itr = sinks_.begin(); sink_itr != sinks_.end(); sink_itr++){
00353 sink_entry = *sink_itr;
00354
00355
00356 if (tmv.tv_sec > (sink_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){
00357
00358
00359 DiffPrint(DEBUG_NO_DETAILS,
00360 "Deleting Gradient to sink %d !\n", sink_entry->port_);
00361 sink_itr = sinks_.erase(sink_itr);
00362 delete sink_entry;
00363 }
00364 }
00365 }
00366
00367 void RoundIdEntry::deleteExpiredGradients()
00368 {
00369 GradientList::iterator gradient_itr;
00370 OPPGradientEntry *gradient_entry;
00371 struct timeval tmv;
00372
00373 GetTime(&tmv);
00374
00375
00376 for (gradient_itr = gradients_.begin();
00377 gradient_itr != gradients_.end(); gradient_itr++){
00378 gradient_entry = *gradient_itr;
00379
00380
00381 if (tmv.tv_sec > (gradient_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){
00382
00383
00384 DiffPrint(DEBUG_NO_DETAILS,
00385 "Deleting gradient to node %d !\n",
00386 gradient_entry->node_id_);
00387 gradient_itr = gradients_.erase(gradient_itr);
00388 delete gradient_entry;
00389 }
00390 }
00391 }
00392
00393 void RoundIdEntry::updateSink(u_int16_t sink_id)
00394 {
00395 SinkList::iterator sink_itr;
00396 SinkEntry *sink_entry;
00397
00398
00399 for (sink_itr = sinks_.begin(); sink_itr != sinks_.end(); ++sink_itr){
00400 sink_entry = *sink_itr;
00401
00402 if (sink_entry->port_ == sink_id){
00403
00404 GetTime(&(sink_entry->tv_));
00405 return;
00406 }
00407 }
00408
00409
00410 sink_entry = new SinkEntry(sink_id);
00411 sinks_.push_back(sink_entry);
00412 }
00413
00414 OPPGradientEntry * RoundIdEntry::findGradient(int32_t node_id)
00415 {
00416 GradientList::iterator gradient_itr;
00417 OPPGradientEntry *gradient_entry;
00418
00419
00420 for (gradient_itr = gradients_.begin();
00421 gradient_itr != gradients_.end(); gradient_itr++){
00422 gradient_entry = *gradient_itr;
00423
00424
00425 if (gradient_entry->node_id_ == node_id)
00426 return gradient_entry;
00427 }
00428
00429
00430 return NULL;
00431 }
00432
00433 void RoundIdEntry::addGradient(int32_t node_id)
00434 {
00435 OPPGradientEntry *gradient_entry;
00436
00437
00438 gradient_entry = new OPPGradientEntry(node_id);
00439 gradients_.push_back(gradient_entry);
00440 }
00441
00442 void RoundIdEntry::deleteGradient(int32_t node_id)
00443 {
00444 GradientList::iterator gradient_itr;
00445 OPPGradientEntry *gradient_entry;
00446
00447
00448 for (gradient_itr = gradients_.begin();
00449 gradient_itr != gradients_.end(); gradient_itr++){
00450 gradient_entry = *gradient_itr;
00451
00452
00453 if (gradient_entry->node_id_ == node_id){
00454
00455 DiffPrint(DEBUG_NO_DETAILS, "Deleting gradient to node %d !\n",
00456 node_id);
00457
00458
00459 gradient_itr = gradients_.erase(gradient_itr);
00460 delete gradient_entry;
00461 return;
00462 }
00463 }
00464 }
00465
00466 void OnePhasePullFilter::interestTimeout(Message *msg)
00467 {
00468 DiffPrint(DEBUG_MORE_DETAILS, "Node%d: Interest Timeout !\n", ((DiffusionRouting *)dr_)->getNodeId());
00469
00470 msg->last_hop_ = LOCALHOST_ADDR;
00471 msg->next_hop_ = BROADCAST_ADDR;
00472
00473 ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00474 }
00475
00476 void OnePhasePullFilter::messageTimeout(Message *msg)
00477 {
00478 DiffPrint(DEBUG_MORE_DETAILS, "Node%d: Message Timeout !\n", ((DiffusionRouting *)dr_)->getNodeId());
00479
00480 ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00481 }
00482
00483 void OnePhasePullFilter::gradientTimeout()
00484 {
00485 RoutingTable::iterator routing_itr;
00486 RoutingEntry *routing_entry;
00487
00488 DiffPrint(DEBUG_MORE_DETAILS, "Node%d: Gradient Timeout !\n",((DiffusionRouting *)dr_)->getNodeId());
00489
00490 routing_itr = routing_list_.begin();
00491
00492
00493 for (routing_itr = routing_list_.begin();
00494 routing_itr != routing_list_.end(); routing_itr++){
00495 routing_entry = *routing_itr;
00496
00497
00498 routing_entry->deleteExpiredRoundIds();
00499
00500
00501 if (routing_entry->round_ids_.size() == 0){
00502
00503
00504 DiffPrint(DEBUG_DETAILS,
00505 "Nothing left for this data type, cleaning up !\n");
00506 routing_itr = routing_list_.erase(routing_itr);
00507 delete routing_entry;
00508 }
00509 }
00510 }
00511
00512 void OnePhasePullFilter::reinforcementTimeout()
00513 {
00514 DataNeighborList::iterator data_neighbor_itr;
00515 OPPDataNeighborEntry *data_neighbor_entry;
00516 RoutingTable::iterator routing_itr;
00517 RoutingEntry *routing_entry;
00518 Message *my_message;
00519
00520 DiffPrint(DEBUG_MORE_DETAILS, "Reinforcement Timeout !\n");
00521
00522 routing_itr = routing_list_.begin();
00523
00524 while (routing_itr != routing_list_.end()){
00525 routing_entry = *routing_itr;
00526
00527
00528 data_neighbor_itr = routing_entry->data_neighbors_.begin();
00529
00530 while (data_neighbor_itr != routing_entry->data_neighbors_.end()){
00531 data_neighbor_entry = *data_neighbor_itr;
00532
00533 if ((!data_neighbor_entry->new_messages_) &&
00534 (data_neighbor_entry->messages_ > 0)){
00535 my_message = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
00536 0, 0, routing_entry->attrs_->size(), pkt_count_,
00537 random_id_, data_neighbor_entry->node_id_,
00538 LOCALHOST_ADDR);
00539 my_message->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
00540
00541 DiffPrint(DEBUG_NO_DETAILS,
00542 "Node%d: Sending Negative Reinforcement to node %d !\n",
00543 ((DiffusionRouting *)dr_)->getNodeId(), data_neighbor_entry->node_id_);
00544
00545 ((DiffusionRouting *)dr_)->sendMessage(my_message, filter_handle_);
00546
00547 pkt_count_++;
00548 delete my_message;
00549
00550
00551 data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr);
00552 delete data_neighbor_entry;
00553 }
00554 else{
00555 data_neighbor_itr++;
00556 }
00557 }
00558
00559
00560 data_neighbor_itr = routing_entry->data_neighbors_.begin();
00561 while (data_neighbor_itr != routing_entry->data_neighbors_.end()){
00562 data_neighbor_entry = *data_neighbor_itr;
00563 if (data_neighbor_entry->messages_ > 0){
00564 data_neighbor_entry->messages_ = 0;
00565 data_neighbor_entry->new_messages_ = false;
00566 data_neighbor_itr++;
00567 }
00568 else{
00569
00570 data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr);
00571 delete data_neighbor_entry;
00572 }
00573 }
00574
00575
00576 routing_itr++;
00577 }
00578 }
00579
00580 int OnePhasePullFilter::subscriptionTimeout(NRAttrVec *attrs)
00581 {
00582 SubscriptionList::iterator subscription_itr;
00583 SubscriptionEntry *subscription_entry;
00584 RoutingEntry *routing_entry;
00585 struct timeval tmv;
00586
00587 DiffPrint(DEBUG_MORE_DETAILS, "Subscription Timeout !\n");
00588
00589 GetTime(&tmv);
00590
00591
00592 routing_entry = findRoutingEntry(attrs);
00593
00594 if (routing_entry){
00595
00596
00597 subscription_itr = routing_entry->subscription_list_.begin();
00598
00599
00600 while (subscription_itr != routing_entry->subscription_list_.end()){
00601 subscription_entry = *subscription_itr;
00602
00603
00604 if (tmv.tv_sec > (subscription_entry->tv_.tv_sec + SUBSCRIPTION_TIMEOUT)){
00605
00606
00607 sendDisinterest(subscription_entry->attrs_, routing_entry);
00608 subscription_itr = routing_entry->subscription_list_.erase(subscription_itr);
00609 delete subscription_entry;
00610 }
00611 else{
00612 subscription_itr++;
00613 }
00614 }
00615 }
00616 else{
00617 DiffPrint(DEBUG_DETAILS, "Warning: Could't find subscription entry - maybe deleted by GradientTimeout ?\n");
00618
00619
00620 return -1;
00621 }
00622
00623
00624 return 0;
00625 }
00626
00627 void OnePhasePullFilter::deleteRoutingEntry(RoutingEntry *routing_entry)
00628 {
00629 RoutingTable::iterator routing_itr;
00630 RoutingEntry *current_entry;
00631
00632
00633 for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){
00634 current_entry = *routing_itr;
00635
00636
00637 if (current_entry == routing_entry){
00638 routing_itr = routing_list_.erase(routing_itr);
00639 delete routing_entry;
00640 return;
00641 }
00642 }
00643 DiffPrint(DEBUG_ALWAYS, "Error: Could not find entry to delete !\n");
00644 }
00645
00646 RoutingEntry * OnePhasePullFilter::matchRoutingEntry(NRAttrVec *attrs, RoutingTable::iterator start, RoutingTable::iterator *place)
00647 {
00648 RoutingTable::iterator routing_itr;
00649 RoutingEntry *routing_entry;
00650
00651 for (routing_itr = start; routing_itr != routing_list_.end(); ++routing_itr){
00652 routing_entry = *routing_itr;
00653 if (MatchAttrs(routing_entry->attrs_, attrs)){
00654 *place = routing_itr;
00655 return routing_entry;
00656 }
00657 }
00658 return NULL;
00659 }
00660
00661 RoutingEntry * OnePhasePullFilter::findRoutingEntry(NRAttrVec *attrs)
00662 {
00663 RoutingTable::iterator routing_itr;
00664 RoutingEntry *routing_entry;
00665
00666 for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){
00667 routing_entry = *routing_itr;
00668 if (PerfectMatch(routing_entry->attrs_, attrs))
00669 return routing_entry;
00670 }
00671 return NULL;
00672 }
00673
00674 SubscriptionEntry * OnePhasePullFilter::findMatchingSubscription(RoutingEntry *routing_entry,
00675 NRAttrVec *attrs)
00676 {
00677 SubscriptionList::iterator subscription_itr;
00678 SubscriptionEntry *subscription_entry;
00679
00680 for (subscription_itr = routing_entry->subscription_list_.begin(); subscription_itr != routing_entry->subscription_list_.end(); ++subscription_itr){
00681 subscription_entry = *subscription_itr;
00682 if (PerfectMatch(subscription_entry->attrs_, attrs))
00683 return subscription_entry;
00684 }
00685 return NULL;
00686 }
00687
00688 void OnePhasePullFilter::forwardData(Message *msg,
00689 RoutingEntry *routing_entry,
00690 DataForwardingHistory *forwarding_history)
00691 {
00692 NRSimpleAttribute<void *> *nr_data_attr = NULL;
00693 NRAttrVec::iterator attribute_iterator;
00694 FlowIdList msg_flow_list, sinks_flow_list, local_flow_list;
00695 FlowIdList out_flow_list;
00696 int32_t out_neighbor;
00697 int *packed_flows;
00698 FlowIdList::iterator flow_id_itr;
00699 RoundIdList::iterator round_id_itr;
00700 SinkList::iterator sink_itr;
00701 RoundIdEntry *round_id_entry;
00702 SinkEntry *sink_entry;
00703 Message *sink_message, *out_message;
00704
00705
00706
00707
00708 attribute_iterator = msg->msg_attr_vec_->begin();
00709 nr_data_attr = NRFlowAttr.find_from(msg->msg_attr_vec_,
00710 attribute_iterator,
00711 &attribute_iterator);
00712
00713 if (!nr_data_attr){
00714 DiffPrint(DEBUG_ALWAYS, "Cannot find NRFlowAttr !\n");
00715 return;
00716 }
00717
00718 msg->msg_attr_vec_->erase(attribute_iterator);
00719
00720
00721 readFlowsFromList(nr_data_attr->getLen() / sizeof(int),
00722 &msg_flow_list, nr_data_attr->getVal());
00723
00724
00725 routing_entry->getSinksFromList(&msg_flow_list, &sinks_flow_list);
00726 routing_entry->getFlowsFromList(&msg_flow_list, &local_flow_list);
00727
00728
00729 if (sinks_flow_list.size() > 0){
00730
00731
00732 sink_message = CopyMessage(msg);
00733
00734
00735 for (round_id_itr = routing_entry->round_ids_.begin();
00736 round_id_itr != routing_entry->round_ids_.end();
00737 round_id_itr++){
00738
00739 round_id_entry = *round_id_itr;
00740
00741 flow_id_itr = find(sinks_flow_list.begin(), sinks_flow_list.end(),
00742 round_id_entry->round_id_);
00743
00744 if (flow_id_itr != sinks_flow_list.end()){
00745
00746
00747 for (sink_itr = round_id_entry->sinks_.begin();
00748 sink_itr != round_id_entry->sinks_.end(); ++sink_itr){
00749 sink_entry = *sink_itr;
00750
00751 if (!forwarding_history->alreadyForwardedToLibrary(sink_entry->port_)){
00752
00753 sink_message->next_hop_ = LOCALHOST_ADDR;
00754 sink_message->next_port_ = sink_entry->port_;
00755
00756
00757 forwarding_history->forwardingToLibrary(sink_entry->port_);
00758
00759 ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);
00760 }
00761 }
00762
00763
00764 if (!removeFlowFromList(&msg_flow_list, round_id_entry->round_id_)){
00765
00766 DiffPrint(DEBUG_ALWAYS, "Cannot remove flow from msg_flow_list !\n");
00767 }
00768 }
00769 }
00770
00771
00772 delete sink_message;
00773 }
00774
00775
00776 DiffPrint(DEBUG_NO_DETAILS, "Node%d: Forwarding Data\n", ((DiffusionRouting *)dr_)->getNodeId());
00777
00778
00779 if (msg->last_hop_ != LOCALHOST_ADDR)
00780 routing_entry->updateNeighborDataInfo(msg->last_hop_, true);
00781
00782
00783 while (local_flow_list.size() > 0){
00784
00785
00786 out_flow_list.clear();
00787
00788
00789 out_flow_list.push_back(*(local_flow_list.begin()));
00790 local_flow_list.erase(local_flow_list.begin());
00791
00792
00793 if (!removeFlowFromList(&msg_flow_list, *(out_flow_list.begin()))){
00794
00795 DiffPrint(DEBUG_ALWAYS, "Cannot remove flow from msg_flow_list !\n");
00796 }
00797
00798
00799 out_neighbor = routing_entry->getNeighborFromFlow(*(out_flow_list.begin()));
00800
00801
00802 if (out_neighbor == BROADCAST_ADDR)
00803 continue;
00804
00805
00806 for (flow_id_itr = local_flow_list.begin();
00807 flow_id_itr != local_flow_list.end(); flow_id_itr++){
00808
00809
00810 if (routing_entry->getNeighborFromFlow(*flow_id_itr) == out_neighbor){
00811
00812
00813
00814
00815 if (!removeFlowFromList(&msg_flow_list, *flow_id_itr)){
00816
00817 DiffPrint(DEBUG_ALWAYS, "Cannot remove flow from msg_flow_list !\n");
00818 }
00819
00820
00821 out_flow_list.push_back(*flow_id_itr);
00822 flow_id_itr = local_flow_list.erase(flow_id_itr);
00823
00824 }
00825 }
00826
00827
00828 out_message = CopyMessage(msg);
00829 out_message->next_hop_ = out_neighbor;
00830
00831 packed_flows = writeFlowsToList(&out_flow_list);
00832
00833 out_message->msg_attr_vec_->push_back(NRFlowAttr.make(NRAttribute::IS,
00834 (void *) packed_flows,
00835 sizeof(int) * out_flow_list.size()));
00836
00837
00838 delete [] packed_flows;
00839
00840
00841 DiffPrint(DEBUG_NO_DETAILS, "Forwarding data to node %d !\n",
00842 out_neighbor);
00843
00844 ((DiffusionRouting *)dr_)->sendMessage(out_message, filter_handle_);
00845
00846
00847 delete out_message;
00848 }
00849
00850
00851
00852 packed_flows = writeFlowsToList(&msg_flow_list);
00853
00854 nr_data_attr->setVal((void *) packed_flows, sizeof(int) * msg_flow_list.size());
00855 msg->msg_attr_vec_->push_back(nr_data_attr);
00856
00857
00858 delete [] packed_flows;
00859 }
00860
00861 void OnePhasePullFilter::sendInterest(NRAttrVec *attrs,
00862 RoutingEntry *routing_entry)
00863 {
00864 RoundIdList::iterator round_id_itr;
00865 RoundIdEntry *round_id_entry;
00866 SinkList::iterator sink_itr;
00867 SinkEntry *sink_entry;
00868
00869 Message *msg = new Message(DIFFUSION_VERSION, INTEREST, 0, 0,
00870 attrs->size(), 0, 0, LOCALHOST_ADDR,
00871 LOCALHOST_ADDR);
00872
00873 msg->msg_attr_vec_ = CopyAttrs(attrs);
00874
00875
00876 for (round_id_itr = routing_entry->round_ids_.begin();
00877 round_id_itr != routing_entry->round_ids_.end(); round_id_itr++){
00878 round_id_entry = *round_id_itr;
00879
00880
00881 for (sink_itr = round_id_entry->sinks_.begin();
00882 sink_itr != round_id_entry->sinks_.end(); ++sink_itr){
00883
00884 sink_entry = *sink_itr;
00885 msg->next_port_ = sink_entry->port_;
00886
00887 ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00888 }
00889 }
00890
00891 delete msg;
00892 }
00893
00894 void OnePhasePullFilter::sendDisinterest(NRAttrVec *attrs,
00895 RoutingEntry *routing_entry)
00896 {
00897 NRAttrVec *new_attrs;
00898 NRSimpleAttribute<int> *nrclass = NULL;
00899
00900 new_attrs = CopyAttrs(attrs);
00901
00902 nrclass = NRClassAttr.find(new_attrs);
00903 if (!nrclass){
00904 DiffPrint(DEBUG_ALWAYS,
00905 "Error: sendDisinterest couldn't find the class attribute !\n");
00906 ClearAttrs(new_attrs);
00907 delete new_attrs;
00908 return;
00909 }
00910
00911
00912 nrclass->setVal(NRAttribute::DISINTEREST_CLASS);
00913
00914 sendInterest(new_attrs, routing_entry);
00915
00916 ClearAttrs(new_attrs);
00917 delete new_attrs;
00918 }
00919
00920 void OnePhasePullFilter::readFlowsFromList(int number_of_flows,
00921 FlowIdList *flow_list,
00922 void *source_blob)
00923 {
00924 int *current_flow;
00925
00926
00927 current_flow = (int *) source_blob;
00928
00929 for (int i = 0; i < number_of_flows; i++){
00930 flow_list->push_back(*current_flow);
00931
00932
00933 current_flow++;
00934 }
00935 }
00936
00937 int * OnePhasePullFilter::writeFlowsToList(FlowIdList *flow_list)
00938 {
00939 FlowIdList::iterator flow_itr;
00940 int number_of_flows;
00941 int *flows, *current;;
00942
00943 number_of_flows = flow_list->size();
00944 flows = new int[number_of_flows];
00945 current = flows;
00946
00947 for (flow_itr = flow_list->begin();
00948 flow_itr != flow_list->end(); flow_itr++){
00949 *current = *flow_itr;
00950 current++;
00951 }
00952
00953 return flows;
00954 }
00955
00956 bool OnePhasePullFilter::removeFlowFromList(FlowIdList *flow_list,
00957 int32_t flow)
00958 {
00959 FlowIdList::iterator flow_itr;
00960
00961 flow_itr = find(flow_list->begin(), flow_list->end(), flow);
00962
00963 if (flow_itr != flow_list->end()){
00964 flow_itr = flow_list->erase(flow_itr);
00965 return true;
00966 }
00967
00968 return false;
00969 }
00970
00971 void OnePhasePullFilter::addLocalFlowsToMessage(Message *msg)
00972 {
00973 RoutingTable::iterator routing_itr;
00974 RoundIdList::iterator round_id_itr;
00975 RoutingEntry *routing_entry;
00976 RoundIdEntry *round_id_entry;
00977 FlowIdList local_flows;
00978 int *packed_flows;
00979
00980
00981 for (routing_itr = routing_list_.begin();
00982 routing_itr != routing_list_.end(); routing_itr++){
00983 routing_entry = *routing_itr;
00984
00985
00986 for (round_id_itr = routing_entry->round_ids_.begin();
00987 round_id_itr != routing_entry->round_ids_.end();
00988 round_id_itr++){
00989 round_id_entry = *round_id_itr;
00990
00991 local_flows.push_back(round_id_entry->round_id_);
00992 }
00993 }
00994
00995 packed_flows = writeFlowsToList(&local_flows);
00996
00997 msg->msg_attr_vec_->push_back(NRFlowAttr.make(NRAttribute::IS,
00998 (void *) packed_flows,
00999 sizeof(int) * local_flows.size()));
01000
01001
01002 delete [] packed_flows;
01003 local_flows.clear();
01004 }
01005
01006 void OnePhasePullFilter::recv(Message *msg, handle h)
01007 {
01008 if (h != filter_handle_){
01009 DiffPrint(DEBUG_ALWAYS,
01010 "Error: received msg for handle %d, subscribed to handle %d !\n",
01011 h, filter_handle_);
01012 return;
01013 }
01014
01015 if (msg->new_message_ == 1)
01016 processNewMessage(msg);
01017 else
01018 processOldMessage(msg);
01019 }
01020
01021 void OnePhasePullFilter::processOldMessage(Message *msg)
01022 {
01023 NRSimpleAttribute<int> *nrsubscription = NULL;
01024 NRAttrVec::iterator attribute_iterator;
01025 RoutingTable::iterator routing_itr;
01026 RoutingEntry *routing_entry;
01027 int32_t round_id;
01028
01029 switch (msg->msg_type_){
01030
01031 case INTEREST:
01032
01033 DiffPrint(DEBUG_NO_DETAILS, "Node%d: Received Old Interest !\n",((DiffusionRouting *)dr_)->getNodeId());
01034
01035 if (msg->last_hop_ == LOCALHOST_ADDR){
01036
01037 DiffPrint(DEBUG_ALWAYS, "Warning: Old Interest from local sink !\n");
01038 break;
01039 }
01040
01041
01042 attribute_iterator = msg->msg_attr_vec_->begin();
01043 nrsubscription = NRSubscriptionAttr.find_from(msg->msg_attr_vec_,
01044 attribute_iterator,
01045 &attribute_iterator);
01046
01047
01048 if (!nrsubscription){
01049 DiffPrint(DEBUG_ALWAYS,
01050 "Warning: Can't find SUBSCRIPTION attribute in the message !\n");
01051 return;
01052 }
01053
01054
01055 msg->msg_attr_vec_->erase(attribute_iterator);
01056
01057
01058 routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01059
01060 if (routing_entry){
01061
01062
01063 round_id = nrsubscription->getVal();
01064
01065
01066 routing_entry->addGradient(msg->last_hop_, round_id, false);
01067 }
01068
01069
01070 msg->msg_attr_vec_->push_back(nrsubscription);
01071
01072 break;
01073
01074 case EXPLORATORY_DATA:
01075 case PUSH_EXPLORATORY_DATA:
01076
01077 DiffPrint(DEBUG_ALWAYS, "Received and OLD EXPLORATORY message !\n");
01078
01079 break;
01080
01081 case DATA:
01082
01083 DiffPrint(DEBUG_NO_DETAILS, "Received an old Data message !\n");
01084
01085
01086 routing_itr = routing_list_.begin();
01087 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01088 &routing_itr);
01089
01090 while (routing_entry){
01091 DiffPrint(DEBUG_NO_DETAILS,
01092 "Set flags to %d to OLD_MESSAGE !\n", msg->last_hop_);
01093
01094
01095 if (msg->last_hop_ != LOCALHOST_ADDR)
01096 routing_entry->updateNeighborDataInfo(msg->last_hop_, false);
01097
01098
01099 routing_itr++;
01100 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01101 &routing_itr);
01102 }
01103
01104 break;
01105
01106 case NEGATIVE_REINFORCEMENT:
01107
01108 DiffPrint(DEBUG_IMPORTANT, "Received an old Negative Reinforcement !\n");
01109
01110 break;
01111
01112 default:
01113
01114 DiffPrint(DEBUG_ALWAYS,
01115 "Received an unknown message type: %d\n", msg->msg_type_);
01116
01117 break;
01118 }
01119 }
01120
01121 void OnePhasePullFilter::processNewMessage(Message *msg)
01122 {
01123 DataForwardingHistory *forwarding_history;
01124 NRSimpleAttribute<int> *nrclass = NULL;
01125 NRSimpleAttribute<int> *nrscope = NULL;
01126 NRSimpleAttribute<int> *nrsubscription = NULL;
01127 RoundIdList::iterator round_id_itr;
01128 RoutingTable::iterator routing_itr;
01129 NRAttrVec::iterator attribute_iterator;
01130 RoundIdEntry *round_id_entry;
01131 RoutingEntry *routing_entry;
01132 SubscriptionEntry *subscription_entry;
01133 Message *my_msg;
01134 TimerCallback *interest_timer, *subscription_timer;
01135 bool new_data_type = false;
01136 int32_t round_id;
01137
01138 switch (msg->msg_type_){
01139
01140 case INTEREST:
01141
01142 DiffPrint(DEBUG_NO_DETAILS, "Received Interest !\n");
01143
01144 nrclass = NRClassAttr.find(msg->msg_attr_vec_);
01145 nrscope = NRScopeAttr.find(msg->msg_attr_vec_);
01146
01147 if (!nrclass || !nrscope){
01148 DiffPrint(DEBUG_ALWAYS,
01149 "Warning: Can't find CLASS/SCOPE attributes in the message !\n");
01150 return;
01151 }
01152
01153
01154 attribute_iterator = msg->msg_attr_vec_->begin();
01155 nrsubscription = NRSubscriptionAttr.find_from(msg->msg_attr_vec_,
01156 attribute_iterator,
01157 &attribute_iterator);
01158
01159
01160 if (!nrsubscription){
01161 DiffPrint(DEBUG_ALWAYS,
01162 "Warning: Can't find SUBSCRIPTION attribute in the message !\n");
01163 return;
01164 }
01165
01166
01167 msg->msg_attr_vec_->erase(attribute_iterator);
01168
01169
01170 routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01171
01172 if (!routing_entry){
01173
01174 routing_entry = new RoutingEntry;
01175 routing_entry->attrs_ = CopyAttrs(msg->msg_attr_vec_);
01176 routing_list_.push_back(routing_entry);
01177 new_data_type = true;
01178 }
01179
01180
01181 msg->msg_attr_vec_->push_back(nrsubscription);
01182
01183
01184 round_id = nrsubscription->getVal();
01185
01186 if (msg->last_hop_ == LOCALHOST_ADDR){
01187
01188 routing_entry->updateSink(msg->source_port_, round_id);
01189 }
01190 else{
01191
01192
01193
01194
01195 routing_entry->addGradient(msg->last_hop_, round_id, true);
01196 }
01197
01198 if ((nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
01199 (nrclass->getOp() == NRAttribute::IS)){
01200
01201
01202 if (nrscope->getVal() == NRAttribute::GLOBAL_SCOPE){
01203
01204 interest_timer = new OppInterestForwardTimer(this, CopyMessage(msg));
01205
01206 ((DiffusionRouting *)dr_)->addTimer(INTEREST_FORWARD_DELAY +
01207 (int) ((INTEREST_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (INTEREST_FORWARD_JITTER / 2))),
01208 interest_timer);
01209 }
01210 }
01211 else{
01212 if ((nrclass->getOp() != NRAttribute::IS) &&
01213 (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE) &&
01214 (new_data_type)){
01215
01216 subscription_timer = new OppSubscriptionExpirationTimer(this,
01217 CopyAttrs(msg->msg_attr_vec_));
01218
01219 ((DiffusionRouting *)dr_)->addTimer(SUBSCRIPTION_DELAY +
01220 (int) (SUBSCRIPTION_DELAY * (GetRand() * 1.0 / RAND_MAX)),
01221 subscription_timer);
01222 }
01223
01224
01225 break;
01226 }
01227
01228
01229 routing_itr = routing_list_.begin();
01230 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01231 &routing_itr);
01232
01233 while (routing_entry){
01234
01235 subscription_entry = findMatchingSubscription(routing_entry,
01236 msg->msg_attr_vec_);
01237
01238
01239 if (subscription_entry){
01240 GetTime(&(subscription_entry->tv_));
01241 }
01242 else{
01243
01244
01245 subscription_entry = new SubscriptionEntry(CopyAttrs(msg->msg_attr_vec_));
01246 routing_entry->subscription_list_.push_back(subscription_entry);
01247 sendInterest(subscription_entry->attrs_, routing_entry);
01248 }
01249
01250 routing_itr++;
01251 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01252 &routing_itr);
01253 }
01254
01255 break;
01256
01257 case EXPLORATORY_DATA:
01258 case PUSH_EXPLORATORY_DATA:
01259
01260 DiffPrint(DEBUG_ALWAYS, "Node%d: Received EXPLORATORY Message !\n",((DiffusionRouting *)dr_)->getNodeId());
01261
01262 break;
01263
01264 case DATA:
01265
01266 DiffPrint(DEBUG_NO_DETAILS, "Node%d: Received Data !\n",((DiffusionRouting *)dr_)->getNodeId());
01267
01268
01269 forwarding_history = new DataForwardingHistory;
01270
01271
01272 if (msg->last_hop_ == LOCALHOST_ADDR){
01273
01274 addLocalFlowsToMessage(msg);
01275 }
01276
01277
01278 routing_itr = routing_list_.begin();
01279 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01280 &routing_itr);
01281
01282 while (routing_entry){
01283 forwardData(msg, routing_entry, forwarding_history);
01284 routing_itr++;
01285 routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01286 &routing_itr);
01287 }
01288
01289 delete forwarding_history;
01290
01291 break;
01292
01293 case NEGATIVE_REINFORCEMENT:
01294
01295 DiffPrint(DEBUG_NO_DETAILS, "Received a Negative Reinforcement !\n");
01296
01297
01298 routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01299
01300 if (routing_entry){
01301
01302
01303 for (round_id_itr = routing_entry->round_ids_.begin();
01304 round_id_itr != routing_entry->round_ids_.end();
01305 round_id_itr++){
01306
01307 round_id_entry = *round_id_itr;
01308
01309
01310 round_id_entry->deleteGradient(msg->last_hop_);
01311
01312
01313 if (round_id_entry->gradients_.size() == 0){
01314
01315 round_id_itr = routing_entry->round_ids_.erase(round_id_itr);
01316 delete round_id_entry;
01317 }
01318
01319 }
01320
01321
01322
01323 if (routing_entry->round_ids_.size() == 0){
01324 my_msg = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
01325 0, 0, routing_entry->attrs_->size(), pkt_count_,
01326 random_id_, BROADCAST_ADDR, LOCALHOST_ADDR);
01327 my_msg->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
01328
01329 DiffPrint(DEBUG_NO_DETAILS,
01330 "Broadcasting Negative Reinforcement !\n");
01331
01332 ((DiffusionRouting *)dr_)->sendMessage(my_msg, filter_handle_);
01333
01334 pkt_count_++;
01335 delete my_msg;
01336 }
01337 }
01338
01339 break;
01340
01341 default:
01342
01343 break;
01344 }
01345 }
01346
01347 handle OnePhasePullFilter::setupFilter()
01348 {
01349 NRAttrVec attrs;
01350 handle h;
01351
01352
01353
01354 attrs.push_back(NRAlgorithmAttr.make(NRAttribute::EQ,
01355 NRAttribute::ONE_PHASE_PULL_ALGORITHM));
01356
01357 h = ((DiffusionRouting *)dr_)->addFilter(&attrs,
01358 ONE_PHASE_PULL_FILTER_PRIORITY,
01359 filter_callback_);
01360
01361 ClearAttrs(&attrs);
01362 return h;
01363 }
01364
01365 #ifndef NS_DIFFUSION
01366 void OnePhasePullFilter::run()
01367 {
01368
01369 while (1){
01370 sleep(1000);
01371 }
01372 }
01373 #endif // !NS_DIFFUSION
01374
01375 #ifdef NS_DIFFUSION
01376 OnePhasePullFilter::OnePhasePullFilter(const char *diffrtg)
01377 {
01378 DiffAppAgent *agent;
01379 #else
01380 OnePhasePullFilter::OnePhasePullFilter(int argc, char **argv)
01381 {
01382 #endif // NS_DIFFUSION
01383 struct timeval tv;
01384 TimerCallback *reinforcement_timer, *gradient_timer;
01385
01386 GetTime(&tv);
01387 SetSeed(&tv);
01388 pkt_count_ = GetRand();
01389 random_id_ = GetRand();
01390
01391
01392 #ifdef NS_DIFFUSION
01393 agent = (DiffAppAgent *)TclObject::lookup(diffrtg);
01394 dr_ = agent->dr();
01395 #else
01396 parseCommandLine(argc, argv);
01397 dr_ = NR::createNR(diffusion_port_);
01398 #endif // NS_DIFFUSION
01399
01400
01401 filter_callback_ = new OnePhasePullFilterReceive(this);
01402
01403
01404 filter_handle_ = setupFilter();
01405
01406
01407 DiffPrint(DEBUG_IMPORTANT, "One-Phase Pull filter received handle %d\n",
01408 filter_handle_);
01409
01410
01411 gradient_timer = new OppGradientExpirationCheckTimer(this);
01412 ((DiffusionRouting *)dr_)->addTimer(GRADIENT_DELAY, gradient_timer);
01413
01414 reinforcement_timer = new OppReinforcementCheckTimer(this);
01415 ((DiffusionRouting *)dr_)->addTimer(REINFORCEMENT_DELAY, reinforcement_timer);
01416
01417 GetTime(&tv);
01418
01419 DiffPrint(DEBUG_ALWAYS,
01420 "One-Phase Pull filter initialized at time %ld:%ld!\n",
01421 tv.tv_sec, tv.tv_usec);
01422 }
01423
01424 #ifndef USE_SINGLE_ADDRESS_SPACE
01425 int main(int argc, char **argv)
01426 {
01427 OnePhasePullFilter *app;
01428
01429
01430 app = new OnePhasePullFilter(argc, argv);
01431 app->run();
01432
01433 return 0;
01434 }
01435 #endif // !USE_SINGLE_ADDRESS_SPACE