one_phase_pull.cc

Go to the documentation of this file.
00001 //
00002 // one_phase_pull.cc    : One-Phase Pull Filter
00003 // author               : Fabio Silva
00004 //
00005 // Copyright (C) 2000-2003 by the University of Southern California
00006 // $Id: one_phase_pull.cc,v 1.6 2005/09/13 04:53:47 tomh Exp $
00007 //
00008 // This program is free software; you can redistribute it and/or
00009 // modify it under the terms of the GNU General Public License,
00010 // version 2, as published by the Free Software Foundation.
00011 //
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License along
00018 // with this program; if not, write to the Free Software Foundation, Inc.,
00019 // 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
00020 //
00021 // Linking this file statically or dynamically with other modules is making
00022 // a combined work based on this file.  Thus, the terms and conditions of
00023 // the GNU General Public License cover the whole combination.
00024 //
00025 // In addition, as a special exception, the copyright holders of this file
00026 // give you permission to combine this file with free software programs or
00027 // libraries that are released under the GNU LGPL and with code included in
00028 // the standard release of ns-2 under the Apache 2.0 license or under
00029 // otherwise-compatible licenses with advertising requirements (or modified
00030 // versions of such code, with unchanged license).  You may copy and
00031 // distribute such a system following the terms of the GNU GPL for this
00032 // file and the licenses of the other code concerned, provided that you
00033 // include the source code of that other code when and as the GNU GPL
00034 // requires distribution of source code.
00035 //
00036 // Note that people who make modified versions of this file are not
00037 // obligated to grant this special exception for their modified versions;
00038 // it is their choice whether to do so.  The GNU General Public License
00039 // gives permission to release a modified version without this exception;
00040 // this exception also makes it possible to release a modified version
00041 // which carries forward this exception.
00042 
00043 #include "one_phase_pull.hh"
00044 
00045 #ifdef NS_DIFFUSION
00046 static class OnePhasePullFilterClass : public TclClass {
00047 public:
00048   OnePhasePullFilterClass() : TclClass("Application/DiffApp/OnePhasePullFilter") {}
00049   TclObject* create(int argc, const char*const* argv) {
00050     if (argc == 5)
00051       return(new OnePhasePullFilter(argv[4]));
00052     else
00053       fprintf(stderr,
00054           "Insufficient number of args for creating OnePhasePullFilter");
00055     return (NULL);
00056   }
00057 } class_one_phase_pull_filter;
00058 
00059 int OnePhasePullFilter::command(int argc, const char*const* argv) {
00060   if (argc == 3) {
00061     if (strcasecmp(argv[1], "debug") == 0) {
00062       global_debug_level = atoi(argv[2]);
00063       if (global_debug_level < 1 || global_debug_level > 10) {
00064     global_debug_level = DEBUG_DEFAULT;
00065     printf("Error: Debug level outside range(1-10) or missing !\n");
00066       }
00067     }
00068   }
00069   return DiffApp::command(argc, argv);
00070 }
00071 
00072 #endif // NS_DIFFUSION
00073 
00074 void OnePhasePullFilterReceive::recv(Message *msg, handle h)
00075 {
00076   filter_->recv(msg, h);
00077 }
00078 
00079 int OppMessageSendTimer::expire()
00080 {
00081   // Call timeout function
00082   agent_->messageTimeout(msg_);
00083 
00084   // Do not reschedule this timer
00085   delete this;
00086   return -1;
00087 }
00088 
00089 int OppInterestForwardTimer::expire()
00090 {
00091   // Call timeout function
00092   agent_->interestTimeout(msg_);
00093 
00094   // Do not reschedule this timer
00095   delete this;
00096   return -1;
00097 }
00098 
00099 int OppSubscriptionExpirationTimer::expire()
00100 {
00101   int retval;
00102 
00103   retval = agent_->subscriptionTimeout(attrs_);
00104 
00105   // Delete timer if we are not rescheduling it
00106   if (retval == -1)
00107     delete this;
00108 
00109   return retval;
00110 }
00111 
00112 int OppGradientExpirationCheckTimer::expire()
00113 {
00114   // Call the callback function
00115   agent_->gradientTimeout();
00116 
00117   // Reschedule this timer
00118   return 0;
00119 }
00120 
00121 int OppReinforcementCheckTimer::expire()
00122 {
00123   // Call the callback function
00124   agent_->reinforcementTimeout();
00125 
00126   // Reschedule this timer
00127   return 0;
00128 }
00129 
00130 RoundIdEntry * RoutingEntry::findRoundIdEntry(int32_t round_id)
00131 {
00132   RoundIdList::iterator round_id_itr;
00133   RoundIdEntry *round_id_entry;
00134 
00135   // Iterate through round ids for this routing entry
00136   for (round_id_itr = round_ids_.begin();
00137        round_id_itr != round_ids_.end(); round_id_itr++){
00138     round_id_entry = *round_id_itr;
00139 
00140     // Check if round ids match
00141     if (round_id_entry->round_id_ == round_id)
00142       return round_id_entry;
00143   }
00144 
00145   // Couldn't find a matching round id entry
00146   return NULL;
00147 }
00148 
00149 RoundIdEntry * RoutingEntry::addRoundIdEntry(int32_t round_id)
00150 {
00151   RoundIdEntry *round_id_entry;
00152 
00153   // Create a new round id entry
00154   round_id_entry = new RoundIdEntry(round_id);
00155 
00156   // Add it to the round id list
00157   round_ids_.push_back(round_id_entry);
00158 
00159   return round_id_entry;
00160 }
00161 
00162 void RoutingEntry::updateNeighborDataInfo(int32_t node_id,
00163                       bool new_message)
00164 {
00165   DataNeighborList::iterator data_neighbor_itr;
00166   OPPDataNeighborEntry *data_neighbor_entry;
00167 
00168   for (data_neighbor_itr = data_neighbors_.begin();
00169        data_neighbor_itr != data_neighbors_.end(); ++data_neighbor_itr){
00170     data_neighbor_entry = *data_neighbor_itr;
00171 
00172     // Find neighbor
00173     if (data_neighbor_entry->node_id_ == node_id){
00174 
00175       // Increment message count
00176       data_neighbor_entry->messages_++;
00177 
00178       // If this is a new message, just set flag and return
00179       if (new_message){
00180     data_neighbor_entry->new_messages_ = new_message;
00181     return;
00182       }
00183     }
00184   }
00185 
00186   // We need to add a new data neighbor
00187   data_neighbor_entry = new OPPDataNeighborEntry(node_id);
00188   data_neighbor_entry->new_messages_ = new_message;
00189   data_neighbors_.push_back(data_neighbor_entry);
00190 }
00191 
00192 void RoutingEntry::addGradient(int32_t last_hop,
00193                    int32_t round_id, bool new_gradient)
00194 {
00195   RoundIdEntry *round_id_entry;
00196   OPPGradientEntry *gradient_entry;
00197 
00198   // Look for an existing routing id entry
00199   round_id_entry = findRoundIdEntry(round_id);
00200 
00201   // Create new entry if not found
00202   if (!round_id_entry)
00203     round_id_entry = addRoundIdEntry(round_id);
00204 
00205   if (new_gradient){
00206     // Marks the beginning of a new round
00207     round_id_entry->gradients_.clear();
00208   }
00209   else{
00210     // Look for a gradient to our last_hop neighbor
00211     gradient_entry = round_id_entry->findGradient(last_hop);
00212 
00213     if (gradient_entry){
00214       // Gradient already in the list, we just update time
00215       GetTime(&gradient_entry->tv_);
00216 
00217       return;
00218     }
00219   }
00220 
00221   // Gradient not yet in the list, add this neighbor to the list
00222   round_id_entry->addGradient(last_hop);
00223 }
00224 
00225 void RoutingEntry::updateSink(u_int16_t sink_id, int32_t round_id)
00226 {
00227   RoundIdEntry *round_id_entry;
00228 
00229   // Lock for an existing round id entry
00230   round_id_entry = findRoundIdEntry(round_id);
00231 
00232   // Create new entry if not found
00233   if (!round_id_entry)
00234     round_id_entry = addRoundIdEntry(round_id);
00235 
00236   // Add/Update this sink
00237   round_id_entry->updateSink(sink_id);
00238 }
00239 
00240 void RoutingEntry::deleteExpiredRoundIds()
00241 {
00242   RoundIdList::iterator round_id_itr;
00243   RoundIdEntry *round_id_entry;
00244   struct timeval tmv;
00245 
00246   GetTime(&tmv);
00247 
00248   // Go through all round ids
00249   for (round_id_itr = round_ids_.begin();
00250        round_id_itr != round_ids_.end(); round_id_itr++){
00251     round_id_entry = *round_id_itr;
00252 
00253     round_id_entry->deleteExpiredSinks();
00254     round_id_entry->deleteExpiredGradients();
00255 
00256     // Delete round id if nothing left
00257     if (round_id_entry->gradients_.size() == 0 &&
00258     round_id_entry->sinks_.size() == 0){
00259 
00260       // Round Id has expired, delete it from the list
00261       DiffPrint(DEBUG_NO_DETAILS, "Delete expired Round Id: %d\n",
00262         round_id_entry->round_id_);
00263 
00264       round_id_itr = round_ids_.erase(round_id_itr);
00265       delete round_id_entry;
00266     }
00267   }
00268 }
00269 
00270 void RoutingEntry::getSinksFromList(FlowIdList *msg_list,
00271                     FlowIdList *sink_list)
00272 {
00273   RoundIdList::iterator round_id_itr;
00274   RoundIdEntry *round_id_entry;
00275   FlowIdList::iterator flow_id_itr;
00276 
00277   for (round_id_itr = round_ids_.begin();
00278        round_id_itr != round_ids_.end(); round_id_itr++){
00279     round_id_entry = *round_id_itr;
00280 
00281     flow_id_itr = find(msg_list->begin(),
00282                msg_list->end(), round_id_entry->round_id_);
00283     if (flow_id_itr != msg_list->end()){
00284       // Flow id in the list
00285       if (round_id_entry->sinks_.size() > 0){
00286     sink_list->push_back(round_id_entry->round_id_);
00287       }
00288     }
00289   }
00290 }
00291 
00292 void RoutingEntry::getFlowsFromList(FlowIdList *msg_list,
00293                     FlowIdList *flow_list)
00294 {
00295   RoundIdList::iterator round_id_itr;
00296   RoundIdEntry *round_id_entry;
00297   FlowIdList::iterator flow_id_itr;
00298 
00299   for (round_id_itr = round_ids_.begin();
00300        round_id_itr != round_ids_.end(); round_id_itr++){
00301     round_id_entry = *round_id_itr;
00302 
00303     flow_id_itr = find(msg_list->begin(),
00304                msg_list->end(), round_id_entry->round_id_);
00305     if (flow_id_itr != msg_list->end()){
00306       // Flow id in the list
00307       if (round_id_entry->sinks_.size() == 0){
00308     // This is a flow we have no local sink for
00309     flow_list->push_back(round_id_entry->round_id_);
00310       }
00311     }
00312   }
00313 }
00314 
00315 int32_t RoutingEntry::getNeighborFromFlow(int32_t flow_id)
00316 {
00317   RoundIdList::iterator round_id_itr;
00318   RoundIdEntry *round_id_entry;
00319   OPPGradientEntry *gradient_entry;
00320 
00321   for (round_id_itr = round_ids_.begin();
00322        round_id_itr != round_ids_.end(); round_id_itr++){
00323     round_id_entry = *round_id_itr;
00324 
00325     if (round_id_entry->round_id_ == flow_id){
00326       // Flow matches, get 'reinforced neighbor'
00327 
00328       if (round_id_entry->gradients_.size() > 0){
00329     // Get the first gradient
00330 
00331     gradient_entry = *round_id_entry->gradients_.begin();
00332     return gradient_entry->node_id_;
00333       }
00334 
00335       DiffPrint(DEBUG_ALWAYS, "Cannot find 'reinforced neighbor !\n");
00336       break;
00337     }
00338   }
00339 
00340   // Couldn't find neighbor for this flow
00341   return BROADCAST_ADDR;
00342 }
00343 void RoundIdEntry::deleteExpiredSinks()
00344 {
00345   SinkList::iterator sink_itr;
00346   SinkEntry *sink_entry;
00347   struct timeval tmv;
00348 
00349   GetTime(&tmv);
00350 
00351   // Go through all sinks
00352   for (sink_itr = sinks_.begin(); sink_itr != sinks_.end(); sink_itr++){
00353     sink_entry = *sink_itr;
00354 
00355     // Check if expired
00356     if (tmv.tv_sec > (sink_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){
00357 
00358       // Expired, delete it
00359       DiffPrint(DEBUG_NO_DETAILS,
00360         "Deleting Gradient to sink %d !\n", sink_entry->port_);
00361       sink_itr = sinks_.erase(sink_itr);
00362       delete sink_entry;
00363     }
00364   }
00365 }
00366 
00367 void RoundIdEntry::deleteExpiredGradients()
00368 {
00369   GradientList::iterator gradient_itr;
00370   OPPGradientEntry *gradient_entry;
00371   struct timeval tmv;
00372 
00373   GetTime(&tmv);
00374 
00375   // Go through all gradients
00376   for (gradient_itr = gradients_.begin();
00377        gradient_itr != gradients_.end(); gradient_itr++){
00378     gradient_entry = *gradient_itr;
00379 
00380     // Check if expired
00381     if (tmv.tv_sec > (gradient_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){
00382 
00383       // Expired, delete it
00384       DiffPrint(DEBUG_NO_DETAILS,
00385         "Deleting gradient to node %d !\n",
00386         gradient_entry->node_id_);
00387       gradient_itr = gradients_.erase(gradient_itr);
00388       delete gradient_entry;
00389     }
00390   }
00391 }
00392 
00393 void RoundIdEntry::updateSink(u_int16_t sink_id)
00394 {
00395   SinkList::iterator sink_itr;
00396   SinkEntry *sink_entry;
00397 
00398   // Go through all sinks
00399   for (sink_itr = sinks_.begin(); sink_itr != sinks_.end(); ++sink_itr){
00400     sink_entry = *sink_itr;
00401 
00402     if (sink_entry->port_ == sink_id){
00403       // We already have this guy
00404       GetTime(&(sink_entry->tv_));
00405       return;
00406     }
00407   }
00408 
00409   // This is a new sink, so we create a new entry on the list
00410   sink_entry = new SinkEntry(sink_id);
00411   sinks_.push_back(sink_entry);
00412 }
00413 
00414 OPPGradientEntry * RoundIdEntry::findGradient(int32_t node_id)
00415 {
00416   GradientList::iterator gradient_itr;
00417   OPPGradientEntry *gradient_entry;
00418 
00419   // Go through all gradients
00420   for (gradient_itr = gradients_.begin();
00421        gradient_itr != gradients_.end(); gradient_itr++){
00422     gradient_entry = *gradient_itr;
00423 
00424     // Is this the one we are looking for ?
00425     if (gradient_entry->node_id_ == node_id)
00426       return gradient_entry;
00427   }
00428 
00429   // Did not find a match
00430   return NULL;
00431 }
00432 
00433 void RoundIdEntry::addGradient(int32_t node_id)
00434 {
00435   OPPGradientEntry *gradient_entry;
00436 
00437   // Create new gradient
00438   gradient_entry = new OPPGradientEntry(node_id);
00439   gradients_.push_back(gradient_entry);
00440 }
00441 
00442 void RoundIdEntry::deleteGradient(int32_t node_id)
00443 {
00444   GradientList::iterator gradient_itr;
00445   OPPGradientEntry *gradient_entry;
00446 
00447   // Go through all gradients
00448   for (gradient_itr = gradients_.begin();
00449        gradient_itr != gradients_.end(); gradient_itr++){
00450     gradient_entry = *gradient_itr;
00451 
00452     // Is this the one we are looking for ?
00453     if (gradient_entry->node_id_ == node_id){
00454   
00455       DiffPrint(DEBUG_NO_DETAILS, "Deleting gradient to node %d !\n",
00456         node_id);
00457 
00458       // Found. Delete it from the list and return
00459       gradient_itr = gradients_.erase(gradient_itr);
00460       delete gradient_entry;
00461       return;
00462     }
00463   }
00464 }
00465 
00466 void OnePhasePullFilter::interestTimeout(Message *msg)
00467 {
00468   DiffPrint(DEBUG_MORE_DETAILS, "Node%d: Interest Timeout !\n", ((DiffusionRouting *)dr_)->getNodeId());
00469 
00470   msg->last_hop_ = LOCALHOST_ADDR;
00471   msg->next_hop_ = BROADCAST_ADDR;
00472  
00473   ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00474 }
00475 
00476 void OnePhasePullFilter::messageTimeout(Message *msg)
00477 {
00478   DiffPrint(DEBUG_MORE_DETAILS, "Node%d: Message Timeout !\n", ((DiffusionRouting *)dr_)->getNodeId());
00479 
00480   ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00481 }
00482 
00483 void OnePhasePullFilter::gradientTimeout()
00484 {
00485   RoutingTable::iterator routing_itr;
00486   RoutingEntry *routing_entry;
00487 
00488   DiffPrint(DEBUG_MORE_DETAILS, "Node%d: Gradient Timeout !\n",((DiffusionRouting *)dr_)->getNodeId());
00489 
00490   routing_itr = routing_list_.begin();
00491 
00492   // Iterate through the routing table
00493   for (routing_itr = routing_list_.begin();
00494        routing_itr != routing_list_.end(); routing_itr++){
00495     routing_entry = *routing_itr;
00496 
00497     // Step 1: Delete expired round ids
00498     routing_entry->deleteExpiredRoundIds();
00499 
00500     // Step 2: Remove the routing entry if no round ids left
00501     if (routing_entry->round_ids_.size() == 0){
00502 
00503       // Deleting Routing Entry
00504       DiffPrint(DEBUG_DETAILS,
00505         "Nothing left for this data type, cleaning up !\n");
00506       routing_itr = routing_list_.erase(routing_itr);
00507       delete routing_entry;
00508     }
00509   }
00510 }
00511 
00512 void OnePhasePullFilter::reinforcementTimeout()
00513 {
00514   DataNeighborList::iterator data_neighbor_itr;
00515   OPPDataNeighborEntry *data_neighbor_entry;
00516   RoutingTable::iterator routing_itr;
00517   RoutingEntry *routing_entry;
00518   Message *my_message;
00519 
00520   DiffPrint(DEBUG_MORE_DETAILS, "Reinforcement Timeout !\n");
00521 
00522   routing_itr = routing_list_.begin();
00523 
00524   while (routing_itr != routing_list_.end()){
00525     routing_entry = *routing_itr;
00526 
00527     // Step 1: Delete expired gradients
00528     data_neighbor_itr = routing_entry->data_neighbors_.begin();
00529 
00530     while (data_neighbor_itr != routing_entry->data_neighbors_.end()){
00531       data_neighbor_entry = *data_neighbor_itr;
00532 
00533       if ((!data_neighbor_entry->new_messages_) &&
00534       (data_neighbor_entry->messages_ > 0)){
00535     my_message = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
00536                  0, 0, routing_entry->attrs_->size(), pkt_count_,
00537                  random_id_, data_neighbor_entry->node_id_,
00538                  LOCALHOST_ADDR);
00539     my_message->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
00540 
00541     DiffPrint(DEBUG_NO_DETAILS,
00542           "Node%d: Sending Negative Reinforcement to node %d !\n",
00543           ((DiffusionRouting *)dr_)->getNodeId(), data_neighbor_entry->node_id_);
00544 
00545     ((DiffusionRouting *)dr_)->sendMessage(my_message, filter_handle_);
00546 
00547     pkt_count_++;
00548     delete my_message;
00549 
00550     // Done. Delete entry
00551     data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr);
00552     delete data_neighbor_entry;
00553       }
00554       else{
00555     data_neighbor_itr++;
00556       }
00557     }
00558 
00559     // Step 2: Delete data neighbors with no activity, zero flags
00560     data_neighbor_itr = routing_entry->data_neighbors_.begin();
00561     while (data_neighbor_itr != routing_entry->data_neighbors_.end()){
00562       data_neighbor_entry = *data_neighbor_itr;
00563       if (data_neighbor_entry->messages_ > 0){
00564     data_neighbor_entry->messages_ = 0;
00565     data_neighbor_entry->new_messages_ = false;
00566     data_neighbor_itr++;
00567       }
00568       else{
00569     // Delete entry
00570     data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr);
00571     delete data_neighbor_entry;
00572       }
00573     }
00574 
00575     // Advance to the next routing entry
00576     routing_itr++;
00577   }
00578 }
00579 
00580 int OnePhasePullFilter::subscriptionTimeout(NRAttrVec *attrs)
00581 {
00582   SubscriptionList::iterator subscription_itr;
00583   SubscriptionEntry *subscription_entry;
00584   RoutingEntry *routing_entry;
00585   struct timeval tmv;
00586 
00587   DiffPrint(DEBUG_MORE_DETAILS, "Subscription Timeout !\n");
00588 
00589   GetTime(&tmv);
00590 
00591   // Find the correct Routing entry
00592   routing_entry = findRoutingEntry(attrs);
00593 
00594   if (routing_entry){
00595     // Routing entry found
00596 
00597     subscription_itr = routing_entry->subscription_list_.begin();
00598 
00599     // Go through all attributes
00600     while (subscription_itr != routing_entry->subscription_list_.end()){
00601       subscription_entry = *subscription_itr;
00602 
00603       // Check timeouts
00604       if (tmv.tv_sec > (subscription_entry->tv_.tv_sec + SUBSCRIPTION_TIMEOUT)){
00605 
00606     // Time expired, send disinterest message
00607     sendDisinterest(subscription_entry->attrs_, routing_entry);
00608     subscription_itr = routing_entry->subscription_list_.erase(subscription_itr);
00609     delete subscription_entry;
00610       }
00611       else{
00612     subscription_itr++;
00613       }
00614     }
00615   }
00616   else{
00617     DiffPrint(DEBUG_DETAILS, "Warning: Could't find subscription entry - maybe deleted by GradientTimeout ?\n");
00618 
00619     // Cancel Timer
00620     return -1;
00621   }
00622 
00623   // Keep Timer
00624   return 0;
00625 }
00626 
00627 void OnePhasePullFilter::deleteRoutingEntry(RoutingEntry *routing_entry)
00628 {
00629   RoutingTable::iterator routing_itr;
00630   RoutingEntry *current_entry;
00631 
00632   // Go through the routing table
00633   for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){
00634     current_entry = *routing_itr;
00635 
00636     // Is this the entry we are looking for ?
00637     if (current_entry == routing_entry){
00638       routing_itr = routing_list_.erase(routing_itr);
00639       delete routing_entry;
00640       return;
00641     }
00642   }
00643   DiffPrint(DEBUG_ALWAYS, "Error: Could not find entry to delete !\n");
00644 }
00645 
00646 RoutingEntry * OnePhasePullFilter::matchRoutingEntry(NRAttrVec *attrs, RoutingTable::iterator start, RoutingTable::iterator *place)
00647 {
00648   RoutingTable::iterator routing_itr;
00649   RoutingEntry *routing_entry;
00650 
00651   for (routing_itr = start; routing_itr != routing_list_.end(); ++routing_itr){
00652     routing_entry = *routing_itr;
00653     if (MatchAttrs(routing_entry->attrs_, attrs)){
00654       *place = routing_itr;
00655       return routing_entry;
00656     }
00657   }
00658   return NULL;
00659 }
00660 
00661 RoutingEntry * OnePhasePullFilter::findRoutingEntry(NRAttrVec *attrs)
00662 {
00663   RoutingTable::iterator routing_itr;
00664   RoutingEntry *routing_entry;
00665 
00666   for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){
00667     routing_entry = *routing_itr;
00668     if (PerfectMatch(routing_entry->attrs_, attrs))
00669       return routing_entry;
00670   }
00671   return NULL;
00672 }
00673 
00674 SubscriptionEntry * OnePhasePullFilter::findMatchingSubscription(RoutingEntry *routing_entry,
00675                                  NRAttrVec *attrs)
00676 {
00677   SubscriptionList::iterator subscription_itr;
00678   SubscriptionEntry *subscription_entry;
00679 
00680   for (subscription_itr = routing_entry->subscription_list_.begin(); subscription_itr != routing_entry->subscription_list_.end(); ++subscription_itr){
00681     subscription_entry = *subscription_itr;
00682     if (PerfectMatch(subscription_entry->attrs_, attrs))
00683       return subscription_entry;
00684   }
00685   return NULL;
00686 }
00687 
00688 void OnePhasePullFilter::forwardData(Message *msg,
00689                      RoutingEntry *routing_entry,
00690                      DataForwardingHistory *forwarding_history)
00691 {
00692   NRSimpleAttribute<void *> *nr_data_attr = NULL;
00693   NRAttrVec::iterator attribute_iterator;
00694   FlowIdList msg_flow_list, sinks_flow_list, local_flow_list;
00695   FlowIdList out_flow_list;
00696   int32_t out_neighbor;
00697   int *packed_flows;
00698   FlowIdList::iterator flow_id_itr;
00699   RoundIdList::iterator round_id_itr;
00700   SinkList::iterator sink_itr;
00701   RoundIdEntry *round_id_entry;
00702   SinkEntry *sink_entry;
00703   Message *sink_message, *out_message;
00704 
00705   // Step 0: Read flows from message
00706 
00707   // Find NRFlowAttr and remove from the message
00708   attribute_iterator = msg->msg_attr_vec_->begin();
00709   nr_data_attr = NRFlowAttr.find_from(msg->msg_attr_vec_,
00710                       attribute_iterator,
00711                       &attribute_iterator);
00712 
00713   if (!nr_data_attr){
00714     DiffPrint(DEBUG_ALWAYS, "Cannot find NRFlowAttr !\n");
00715     return;
00716   }
00717 
00718   msg->msg_attr_vec_->erase(attribute_iterator);
00719 
00720   // Read flow ids from list
00721   readFlowsFromList(nr_data_attr->getLen() / sizeof(int),
00722             &msg_flow_list, nr_data_attr->getVal());
00723 
00724   // Fill lists of sinks and flows
00725   routing_entry->getSinksFromList(&msg_flow_list, &sinks_flow_list);
00726   routing_entry->getFlowsFromList(&msg_flow_list, &local_flow_list);
00727 
00728   // Step 1: Sink Processing
00729   if (sinks_flow_list.size() > 0){
00730 
00731     // Copy original message so we can change it
00732     sink_message = CopyMessage(msg);
00733 
00734     // Go through all rounds
00735     for (round_id_itr = routing_entry->round_ids_.begin();
00736      round_id_itr != routing_entry->round_ids_.end();
00737      round_id_itr++){
00738 
00739       round_id_entry = *round_id_itr;
00740 
00741       flow_id_itr = find(sinks_flow_list.begin(), sinks_flow_list.end(),
00742              round_id_entry->round_id_);
00743 
00744       if (flow_id_itr != sinks_flow_list.end()){
00745     // Flows match ! Send message to sink
00746 
00747     for (sink_itr = round_id_entry->sinks_.begin();
00748          sink_itr != round_id_entry->sinks_.end(); ++sink_itr){
00749       sink_entry = *sink_itr;
00750 
00751       if (!forwarding_history->alreadyForwardedToLibrary(sink_entry->port_)){
00752         // Forward DATA to local sinks
00753         sink_message->next_hop_ = LOCALHOST_ADDR;
00754         sink_message->next_port_ = sink_entry->port_;
00755 
00756         // Add sink to the forwarding list
00757         forwarding_history->forwardingToLibrary(sink_entry->port_);
00758 
00759         ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);
00760       }
00761     }
00762 
00763     // Remove sink from the flow_list
00764     if (!removeFlowFromList(&msg_flow_list, round_id_entry->round_id_)){
00765       // We should not get here
00766       DiffPrint(DEBUG_ALWAYS, "Cannot remove flow from msg_flow_list !\n");
00767     }
00768       }
00769     }
00770 
00771     // Delete sink message
00772     delete sink_message;
00773   }
00774 
00775   // Step 2: Intermediate Processing
00776   DiffPrint(DEBUG_NO_DETAILS, "Node%d: Forwarding Data\n", ((DiffusionRouting *)dr_)->getNodeId());
00777 
00778   // Set reinforcement flags
00779   if (msg->last_hop_ != LOCALHOST_ADDR)
00780     routing_entry->updateNeighborDataInfo(msg->last_hop_, true);
00781 
00782   // Work on local list until we finish processing all flows
00783   while (local_flow_list.size() > 0){
00784 
00785     // Initialize out_flow_list
00786     out_flow_list.clear();
00787 
00788     // Move first flow from the local flow list to out_flow_list
00789     out_flow_list.push_back(*(local_flow_list.begin()));
00790     local_flow_list.erase(local_flow_list.begin());
00791 
00792     // Remove flow from the flow_list
00793     if (!removeFlowFromList(&msg_flow_list, *(out_flow_list.begin()))){
00794       // We should not get here
00795       DiffPrint(DEBUG_ALWAYS, "Cannot remove flow from msg_flow_list !\n");
00796     }
00797 
00798     // Select output_neighbor
00799     out_neighbor = routing_entry->getNeighborFromFlow(*(out_flow_list.begin()));
00800 
00801     // Must have a valid neighbor
00802     if (out_neighbor == BROADCAST_ADDR)
00803       continue;
00804     
00805     // Go through all other local flows
00806     for (flow_id_itr = local_flow_list.begin();
00807      flow_id_itr != local_flow_list.end(); flow_id_itr++){
00808 
00809       // Check if output neighbor for this flow matches current
00810       if (routing_entry->getNeighborFromFlow(*flow_id_itr) == out_neighbor){
00811 
00812     // Yes it does !
00813 
00814     // Remove flow from the flow_list
00815     if (!removeFlowFromList(&msg_flow_list, *flow_id_itr)){
00816       // We should not get here
00817       DiffPrint(DEBUG_ALWAYS, "Cannot remove flow from msg_flow_list !\n");
00818     }
00819 
00820     // Aggregate both in a single message
00821     out_flow_list.push_back(*flow_id_itr);
00822     flow_id_itr = local_flow_list.erase(flow_id_itr);
00823 
00824       }
00825     }
00826 
00827     // out_flow_list should have a list of flow for out_neighbor
00828     out_message = CopyMessage(msg);
00829     out_message->next_hop_ = out_neighbor;
00830 
00831     packed_flows = writeFlowsToList(&out_flow_list);
00832 
00833     out_message->msg_attr_vec_->push_back(NRFlowAttr.make(NRAttribute::IS,
00834                               (void *) packed_flows,
00835                               sizeof(int) * out_flow_list.size()));
00836 
00837     // NRFlowAttr.make will copy this, so we must delete it
00838     delete [] packed_flows;
00839 
00840     // Send it out
00841     DiffPrint(DEBUG_NO_DETAILS, "Forwarding data to node %d !\n",
00842           out_neighbor);
00843 
00844     ((DiffusionRouting *)dr_)->sendMessage(out_message, filter_handle_);
00845 
00846     // Delete message
00847     delete out_message;
00848   }
00849 
00850   // Done processing for this data type, we replace the NRFlowAttr
00851   // with the (possibly) shorter msg_flow_list list
00852   packed_flows = writeFlowsToList(&msg_flow_list);
00853 
00854   nr_data_attr->setVal((void *) packed_flows, sizeof(int) * msg_flow_list.size());
00855   msg->msg_attr_vec_->push_back(nr_data_attr);
00856 
00857   // setVal makes a copy of this, so we must delete it
00858   delete [] packed_flows;
00859 }
00860 
00861 void OnePhasePullFilter::sendInterest(NRAttrVec *attrs,
00862                       RoutingEntry *routing_entry)
00863 {
00864   RoundIdList::iterator round_id_itr;
00865   RoundIdEntry *round_id_entry;
00866   SinkList::iterator sink_itr;
00867   SinkEntry *sink_entry;
00868 
00869   Message *msg = new Message(DIFFUSION_VERSION, INTEREST, 0, 0,
00870                  attrs->size(), 0, 0, LOCALHOST_ADDR,
00871                  LOCALHOST_ADDR);
00872 
00873   msg->msg_attr_vec_ = CopyAttrs(attrs);
00874 
00875   // Go through all round ids
00876   for (round_id_itr = routing_entry->round_ids_.begin();
00877        round_id_itr != routing_entry->round_ids_.end(); round_id_itr++){
00878     round_id_entry = *round_id_itr;
00879 
00880     // Send interest message to all local sinks
00881     for (sink_itr = round_id_entry->sinks_.begin();
00882      sink_itr != round_id_entry->sinks_.end(); ++sink_itr){
00883 
00884       sink_entry = *sink_itr;
00885       msg->next_port_ = sink_entry->port_;
00886 
00887       ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00888     }
00889   }
00890 
00891   delete msg;
00892 }
00893 
00894 void OnePhasePullFilter::sendDisinterest(NRAttrVec *attrs,
00895                      RoutingEntry *routing_entry)
00896 {
00897   NRAttrVec *new_attrs;
00898   NRSimpleAttribute<int> *nrclass = NULL;
00899 
00900   new_attrs = CopyAttrs(attrs);
00901 
00902   nrclass = NRClassAttr.find(new_attrs);
00903   if (!nrclass){
00904     DiffPrint(DEBUG_ALWAYS,
00905           "Error: sendDisinterest couldn't find the class attribute !\n");
00906     ClearAttrs(new_attrs);
00907     delete new_attrs;
00908     return;
00909   }
00910 
00911   // Change the class_key value
00912   nrclass->setVal(NRAttribute::DISINTEREST_CLASS);
00913 
00914   sendInterest(new_attrs, routing_entry);
00915    
00916   ClearAttrs(new_attrs);
00917   delete new_attrs;
00918 }
00919 
00920 void OnePhasePullFilter::readFlowsFromList(int number_of_flows,
00921                        FlowIdList *flow_list,
00922                        void *source_blob)
00923 {
00924   int *current_flow;
00925 
00926   // Point to the beginning of the list
00927   current_flow = (int *) source_blob;
00928 
00929   for (int i = 0; i < number_of_flows; i++){
00930     flow_list->push_back(*current_flow);
00931 
00932     // Advance to next flow
00933     current_flow++;
00934   }
00935 }
00936 
00937 int * OnePhasePullFilter::writeFlowsToList(FlowIdList *flow_list)
00938 {
00939   FlowIdList::iterator flow_itr;
00940   int number_of_flows;
00941   int *flows, *current;;
00942 
00943   number_of_flows = flow_list->size();
00944   flows = new int[number_of_flows];
00945   current = flows;
00946 
00947   for (flow_itr = flow_list->begin();
00948        flow_itr != flow_list->end(); flow_itr++){
00949     *current = *flow_itr;
00950     current++;
00951   }
00952 
00953   return flows;
00954 }
00955 
00956 bool OnePhasePullFilter::removeFlowFromList(FlowIdList *flow_list,
00957                         int32_t flow)
00958 {
00959   FlowIdList::iterator flow_itr;
00960 
00961   flow_itr = find(flow_list->begin(), flow_list->end(), flow);
00962 
00963   if (flow_itr != flow_list->end()){
00964     flow_itr = flow_list->erase(flow_itr);
00965     return true;
00966   }
00967 
00968   return false;
00969 }
00970 
00971 void OnePhasePullFilter::addLocalFlowsToMessage(Message *msg)
00972 {
00973   RoutingTable::iterator routing_itr;
00974   RoundIdList::iterator round_id_itr;
00975   RoutingEntry *routing_entry;
00976   RoundIdEntry *round_id_entry;
00977   FlowIdList local_flows;
00978   int *packed_flows;
00979 
00980   // First we loop through our routing entries
00981   for (routing_itr = routing_list_.begin();
00982        routing_itr != routing_list_.end(); routing_itr++){
00983     routing_entry = *routing_itr;
00984 
00985     // Now go through each round
00986     for (round_id_itr = routing_entry->round_ids_.begin();
00987      round_id_itr != routing_entry->round_ids_.end();
00988      round_id_itr++){
00989       round_id_entry = *round_id_itr;
00990 
00991       local_flows.push_back(round_id_entry->round_id_);
00992     }
00993   }
00994   
00995   packed_flows = writeFlowsToList(&local_flows);
00996 
00997   msg->msg_attr_vec_->push_back(NRFlowAttr.make(NRAttribute::IS,
00998                         (void *) packed_flows,
00999                         sizeof(int) * local_flows.size()));
01000 
01001   // NRFlowAttr.make will copy this, so we must delete it here
01002   delete [] packed_flows;
01003   local_flows.clear();
01004 }
01005 
01006 void OnePhasePullFilter::recv(Message *msg, handle h)
01007 {
01008   if (h != filter_handle_){
01009     DiffPrint(DEBUG_ALWAYS,
01010           "Error: received msg for handle %d, subscribed to handle %d !\n",
01011           h, filter_handle_);
01012     return;
01013   }
01014 
01015   if (msg->new_message_ == 1)
01016     processNewMessage(msg);
01017   else
01018     processOldMessage(msg);
01019 }
01020 
01021 void OnePhasePullFilter::processOldMessage(Message *msg)
01022 {
01023   NRSimpleAttribute<int> *nrsubscription = NULL;
01024   NRAttrVec::iterator attribute_iterator;
01025   RoutingTable::iterator routing_itr;
01026   RoutingEntry *routing_entry;
01027   int32_t round_id;
01028 
01029   switch (msg->msg_type_){
01030 
01031   case INTEREST:
01032 
01033     DiffPrint(DEBUG_NO_DETAILS, "Node%d: Received Old Interest !\n",((DiffusionRouting *)dr_)->getNodeId());
01034 
01035     if (msg->last_hop_ == LOCALHOST_ADDR){
01036       // Old interest should not come from local sink
01037       DiffPrint(DEBUG_ALWAYS, "Warning: Old Interest from local sink !\n");
01038       break;
01039     }
01040 
01041     // Step 0: Take out the subscription attribute
01042     attribute_iterator = msg->msg_attr_vec_->begin();
01043     nrsubscription = NRSubscriptionAttr.find_from(msg->msg_attr_vec_,
01044                           attribute_iterator,
01045                           &attribute_iterator);
01046 
01047     // Return if we cannot find a subscription attribute
01048     if (!nrsubscription){
01049       DiffPrint(DEBUG_ALWAYS,
01050         "Warning: Can't find SUBSCRIPTION attribute in the message !\n");
01051       return;
01052     }
01053 
01054     // Delete attribute from the message
01055     msg->msg_attr_vec_->erase(attribute_iterator);
01056 
01057     // Get the routing entry for these attrs      
01058     routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01059 
01060     if (routing_entry){
01061 
01062       // Use subscription id for identifying this flow
01063       round_id = nrsubscription->getVal();
01064 
01065       // Add gradient to the current round entry
01066       routing_entry->addGradient(msg->last_hop_, round_id, false);
01067     }
01068 
01069     // Add the subscription attribute back to the message
01070     msg->msg_attr_vec_->push_back(nrsubscription);
01071 
01072     break;
01073 
01074   case EXPLORATORY_DATA:
01075   case PUSH_EXPLORATORY_DATA:
01076 
01077     DiffPrint(DEBUG_ALWAYS, "Received and OLD EXPLORATORY message !\n");
01078 
01079     break;
01080 
01081   case DATA: 
01082 
01083     DiffPrint(DEBUG_NO_DETAILS, "Received an old Data message !\n");
01084 
01085     // Find the correct routing entry
01086     routing_itr = routing_list_.begin();
01087     routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01088                       &routing_itr);
01089 
01090     while (routing_entry){
01091       DiffPrint(DEBUG_NO_DETAILS,
01092         "Set flags to %d to OLD_MESSAGE !\n", msg->last_hop_);
01093 
01094       // Set reinforcement flags
01095       if (msg->last_hop_ != LOCALHOST_ADDR)
01096     routing_entry->updateNeighborDataInfo(msg->last_hop_, false);
01097 
01098       // Continue going through other data types
01099       routing_itr++;
01100       routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01101                     &routing_itr);
01102     }
01103 
01104     break;
01105 
01106    case NEGATIVE_REINFORCEMENT:
01107 
01108     DiffPrint(DEBUG_IMPORTANT, "Received an old Negative Reinforcement !\n");
01109 
01110     break;
01111 
01112   default:
01113 
01114     DiffPrint(DEBUG_ALWAYS,
01115           "Received an unknown message type: %d\n", msg->msg_type_);
01116 
01117     break;
01118   }
01119 }
01120 
01121 void OnePhasePullFilter::processNewMessage(Message *msg)
01122 {
01123   DataForwardingHistory *forwarding_history;
01124   NRSimpleAttribute<int> *nrclass = NULL;
01125   NRSimpleAttribute<int> *nrscope = NULL;
01126   NRSimpleAttribute<int> *nrsubscription = NULL;
01127   RoundIdList::iterator round_id_itr;
01128   RoutingTable::iterator routing_itr;
01129   NRAttrVec::iterator attribute_iterator;
01130   RoundIdEntry *round_id_entry;
01131   RoutingEntry *routing_entry;
01132   SubscriptionEntry *subscription_entry;
01133   Message *my_msg;
01134   TimerCallback *interest_timer, *subscription_timer;
01135   bool new_data_type = false;
01136   int32_t round_id;
01137 
01138   switch (msg->msg_type_){
01139 
01140   case INTEREST:
01141 
01142     DiffPrint(DEBUG_NO_DETAILS, "Received Interest !\n");
01143 
01144     nrclass = NRClassAttr.find(msg->msg_attr_vec_);
01145     nrscope = NRScopeAttr.find(msg->msg_attr_vec_);
01146 
01147     if (!nrclass || !nrscope){
01148       DiffPrint(DEBUG_ALWAYS,
01149         "Warning: Can't find CLASS/SCOPE attributes in the message !\n");
01150       return;
01151     }
01152 
01153     // Step 0: Take out the subscription attribute
01154     attribute_iterator = msg->msg_attr_vec_->begin();
01155     nrsubscription = NRSubscriptionAttr.find_from(msg->msg_attr_vec_,
01156                           attribute_iterator,
01157                           &attribute_iterator);
01158 
01159     // Return if we cannot find a subscription attribute
01160     if (!nrsubscription){
01161       DiffPrint(DEBUG_ALWAYS,
01162         "Warning: Can't find SUBSCRIPTION attribute in the message !\n");
01163       return;
01164     }
01165 
01166     // Delete attribute from the message
01167     msg->msg_attr_vec_->erase(attribute_iterator);
01168 
01169     // Step 1: Look for the same data type
01170     routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01171 
01172     if (!routing_entry){
01173       // Create a new routing entry for this data type
01174       routing_entry = new RoutingEntry;
01175       routing_entry->attrs_ = CopyAttrs(msg->msg_attr_vec_);
01176       routing_list_.push_back(routing_entry);
01177       new_data_type = true;
01178     }
01179 
01180     // Add the subscription attribute back to the message
01181     msg->msg_attr_vec_->push_back(nrsubscription);
01182 
01183     // Use subscription id for identifying this flow
01184     round_id = nrsubscription->getVal();
01185 
01186     if (msg->last_hop_ == LOCALHOST_ADDR){
01187       // From local sink
01188       routing_entry->updateSink(msg->source_port_, round_id);
01189     }
01190     else{
01191       // Interest received from the network. Add gradient to our
01192       // last_hop neighbor
01193 
01194       // Add gradient to the current round entry
01195       routing_entry->addGradient(msg->last_hop_, round_id, true);
01196     }
01197 
01198     if ((nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
01199     (nrclass->getOp() == NRAttribute::IS)){
01200 
01201       // Global interest messages should always be forwarded
01202       if (nrscope->getVal() == NRAttribute::GLOBAL_SCOPE){
01203 
01204     interest_timer = new OppInterestForwardTimer(this, CopyMessage(msg));
01205 
01206     ((DiffusionRouting *)dr_)->addTimer(INTEREST_FORWARD_DELAY +
01207                         (int) ((INTEREST_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (INTEREST_FORWARD_JITTER / 2))),
01208                         interest_timer);
01209       }
01210     }
01211     else{
01212       if ((nrclass->getOp() != NRAttribute::IS) &&
01213       (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE) &&
01214       (new_data_type)){
01215 
01216     subscription_timer = new OppSubscriptionExpirationTimer(this,
01217                                  CopyAttrs(msg->msg_attr_vec_));
01218     
01219     ((DiffusionRouting *)dr_)->addTimer(SUBSCRIPTION_DELAY +
01220                         (int) (SUBSCRIPTION_DELAY * (GetRand() * 1.0 / RAND_MAX)),
01221                         subscription_timer);
01222       }
01223 
01224       // Subscriptions don't have to match other subscriptions
01225       break;
01226     }
01227 
01228     // Step 2: Match interest against other subscriptions
01229     routing_itr = routing_list_.begin();
01230     routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01231                       &routing_itr);
01232 
01233     while (routing_entry){
01234       // Got a match
01235       subscription_entry = findMatchingSubscription(routing_entry,
01236                             msg->msg_attr_vec_);
01237 
01238       // Do we already have this subscription
01239       if (subscription_entry){
01240     GetTime(&(subscription_entry->tv_));
01241       }
01242       else{
01243     // Create a new attribute entry, add it to the attribute list
01244     // and send an interest message to the local sink
01245     subscription_entry = new SubscriptionEntry(CopyAttrs(msg->msg_attr_vec_));
01246     routing_entry->subscription_list_.push_back(subscription_entry);
01247     sendInterest(subscription_entry->attrs_, routing_entry);
01248       }
01249       // Move to the next RoutingEntry
01250       routing_itr++;
01251       routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01252                     &routing_itr);
01253     }
01254 
01255       break;
01256 
01257   case EXPLORATORY_DATA:
01258   case PUSH_EXPLORATORY_DATA:
01259 
01260     DiffPrint(DEBUG_ALWAYS, "Node%d: Received EXPLORATORY Message !\n",((DiffusionRouting *)dr_)->getNodeId());
01261 
01262     break;
01263 
01264   case DATA:
01265 
01266     DiffPrint(DEBUG_NO_DETAILS, "Node%d: Received Data !\n",((DiffusionRouting *)dr_)->getNodeId());
01267 
01268     // Create data message forwarding cache
01269     forwarding_history = new DataForwardingHistory;
01270 
01271     // If message comes from local source, we include our local flows
01272     if (msg->last_hop_ == LOCALHOST_ADDR){
01273       // From local source
01274       addLocalFlowsToMessage(msg);
01275     }
01276 
01277     // Find the correct routing entry
01278     routing_itr = routing_list_.begin();
01279     routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01280                       &routing_itr);
01281 
01282     while (routing_entry){
01283       forwardData(msg, routing_entry, forwarding_history);
01284       routing_itr++;
01285       routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01286                     &routing_itr);
01287     }
01288 
01289     delete forwarding_history;
01290 
01291     break;
01292 
01293   case NEGATIVE_REINFORCEMENT:
01294 
01295     DiffPrint(DEBUG_NO_DETAILS, "Received a Negative Reinforcement !\n");
01296 
01297     // Find matching routing entry
01298     routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01299 
01300     if (routing_entry){
01301 
01302       // Go through all round ids
01303       for (round_id_itr = routing_entry->round_ids_.begin();
01304        round_id_itr != routing_entry->round_ids_.end();
01305        round_id_itr++){
01306 
01307     round_id_entry = *round_id_itr;
01308 
01309     // Delete gradient to last hop
01310     round_id_entry->deleteGradient(msg->last_hop_);
01311 
01312     // Delete round id entry if nothing left
01313     if (round_id_entry->gradients_.size() == 0){
01314 
01315       round_id_itr = routing_entry->round_ids_.erase(round_id_itr);
01316       delete round_id_entry;
01317     }
01318 
01319       }
01320 
01321       // If there are no other gradients we need to send our own
01322       // negative reinforcement
01323       if (routing_entry->round_ids_.size() == 0){
01324     my_msg = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
01325                  0, 0, routing_entry->attrs_->size(), pkt_count_,
01326                  random_id_, BROADCAST_ADDR, LOCALHOST_ADDR);
01327     my_msg->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
01328 
01329     DiffPrint(DEBUG_NO_DETAILS,
01330           "Broadcasting Negative Reinforcement !\n");
01331 
01332     ((DiffusionRouting *)dr_)->sendMessage(my_msg, filter_handle_);
01333 
01334     pkt_count_++;
01335     delete my_msg;
01336       }
01337     }
01338 
01339     break;
01340 
01341   default:
01342 
01343     break;
01344   }
01345 }
01346 
01347 handle OnePhasePullFilter::setupFilter()
01348 {
01349   NRAttrVec attrs;
01350   handle h;
01351 
01352   // For the One-Phase Pull filter, we set up a filter to receive
01353   // messages using this protocol
01354   attrs.push_back(NRAlgorithmAttr.make(NRAttribute::EQ,
01355                        NRAttribute::ONE_PHASE_PULL_ALGORITHM));
01356 
01357   h = ((DiffusionRouting *)dr_)->addFilter(&attrs,
01358                        ONE_PHASE_PULL_FILTER_PRIORITY,
01359                        filter_callback_);
01360 
01361   ClearAttrs(&attrs);
01362   return h;
01363 }
01364 
01365 #ifndef NS_DIFFUSION
01366 void OnePhasePullFilter::run()
01367 {
01368   // Doesn't do anything
01369   while (1){
01370     sleep(1000);
01371   }
01372 }
01373 #endif // !NS_DIFFUSION
01374 
01375 #ifdef NS_DIFFUSION
01376 OnePhasePullFilter::OnePhasePullFilter(const char *diffrtg)
01377 {
01378   DiffAppAgent *agent;
01379 #else
01380 OnePhasePullFilter::OnePhasePullFilter(int argc, char **argv)
01381 {
01382 #endif // NS_DIFFUSION
01383   struct timeval tv;
01384   TimerCallback *reinforcement_timer, *gradient_timer;
01385 
01386   GetTime(&tv);
01387   SetSeed(&tv);
01388   pkt_count_ = GetRand();
01389   random_id_ = GetRand();
01390 
01391   // Create Diffusion Routing class
01392 #ifdef NS_DIFFUSION
01393   agent = (DiffAppAgent *)TclObject::lookup(diffrtg);
01394   dr_ = agent->dr();
01395 #else
01396   parseCommandLine(argc, argv);
01397   dr_ = NR::createNR(diffusion_port_);
01398 #endif // NS_DIFFUSION
01399 
01400   // Create callback classes and set up pointers
01401   filter_callback_ = new OnePhasePullFilterReceive(this);
01402 
01403   // Set up the filter
01404   filter_handle_ = setupFilter();
01405 
01406   // Print filter information
01407   DiffPrint(DEBUG_IMPORTANT, "One-Phase Pull filter received handle %d\n",
01408         filter_handle_);
01409 
01410   // Add timers for keeping state up-to-date
01411   gradient_timer = new OppGradientExpirationCheckTimer(this);
01412   ((DiffusionRouting *)dr_)->addTimer(GRADIENT_DELAY, gradient_timer);
01413 
01414   reinforcement_timer = new OppReinforcementCheckTimer(this);
01415   ((DiffusionRouting *)dr_)->addTimer(REINFORCEMENT_DELAY, reinforcement_timer);
01416 
01417   GetTime(&tv);
01418 
01419   DiffPrint(DEBUG_ALWAYS,
01420         "One-Phase Pull filter initialized at time %ld:%ld!\n",
01421         tv.tv_sec, tv.tv_usec);
01422 }
01423 
01424 #ifndef USE_SINGLE_ADDRESS_SPACE
01425 int main(int argc, char **argv)
01426 {
01427   OnePhasePullFilter *app;
01428 
01429   // Initialize and run the Gradient Filter
01430   app = new OnePhasePullFilter(argc, argv);
01431   app->run();
01432 
01433   return 0;
01434 }
01435 #endif // !USE_SINGLE_ADDRESS_SPACE

Generated on Tue Mar 6 16:47:48 2007 for ns2 Network Simulator 2.29 by  doxygen 1.4.6