two_phase_pull.cc

Go to the documentation of this file.
00001 //
00002 // two_phase_pull.cc  : Two-Phase Pull/One-Phase Push Filter
00003 // author             : Fabio Silva and Chalermek Intanagonwiwat
00004 //
00005 // Copyright (C) 2000-2003 by the University of Southern California
00006 // $Id: two_phase_pull.cc,v 1.6 2005/09/13 04:53:47 tomh Exp $
00007 //
00008 // This program is free software; you can redistribute it and/or
00009 // modify it under the terms of the GNU General Public License,
00010 // version 2, as published by the Free Software Foundation.
00011 //
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License along
00018 // with this program; if not, write to the Free Software Foundation, Inc.,
00019 // 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
00020 //
00021 // Linking this file statically or dynamically with other modules is making
00022 // a combined work based on this file.  Thus, the terms and conditions of
00023 // the GNU General Public License cover the whole combination.
00024 //
00025 // In addition, as a special exception, the copyright holders of this file
00026 // give you permission to combine this file with free software programs or
00027 // libraries that are released under the GNU LGPL and with code included in
00028 // the standard release of ns-2 under the Apache 2.0 license or under
00029 // otherwise-compatible licenses with advertising requirements (or modified
00030 // versions of such code, with unchanged license).  You may copy and
00031 // distribute such a system following the terms of the GNU GPL for this
00032 // file and the licenses of the other code concerned, provided that you
00033 // include the source code of that other code when and as the GNU GPL
00034 // requires distribution of source code.
00035 //
00036 // Note that people who make modified versions of this file are not
00037 // obligated to grant this special exception for their modified versions;
00038 // it is their choice whether to do so.  The GNU General Public License
00039 // gives permission to release a modified version without this exception;
00040 // this exception also makes it possible to release a modified version
00041 // which carries forward this exception.
00042 
00043 #include "two_phase_pull.hh"
00044 
00045 #ifdef NS_DIFFUSION
00046 static class GradientFilterClass : public TclClass {
00047 public:
00048   GradientFilterClass() : TclClass("Application/DiffApp/GradientFilter") {}
00049   TclObject* create(int argc, const char*const* argv) {
00050     if (argc == 5)
00051       return(new GradientFilter(argv[4]));
00052     else
00053       fprintf(stderr, "Insufficient number of args for creating GradientFilter");
00054     return (NULL);
00055   }
00056 } class_gradient_filter;
00057 
00058 int GradientFilter::command(int argc, const char*const* argv) {
00059   if (argc == 3) {
00060     if (strcasecmp(argv[1], "debug") == 0) {
00061       global_debug_level = atoi(argv[2]);
00062       if (global_debug_level < 1 || global_debug_level > 10) {
00063     global_debug_level = DEBUG_DEFAULT;
00064     printf("Error: Debug level outside range(1-10) or missing !\n");
00065       }
00066     }
00067   }
00068   return DiffApp::command(argc, argv);
00069 }
00070 
00071 #endif // NS_DIFFUSION
00072 
00073 void GradientFilterReceive::recv(Message *msg, handle h)
00074 {
00075   app_->recv(msg, h);
00076 }
00077 
00078 int TppMessageSendTimer::expire()
00079 {
00080   // Call timeout function
00081   agent_->messageTimeout(msg_);
00082 
00083   // Do not reschedule this timer
00084   delete this;
00085   return -1;
00086 }
00087 
00088 int TppInterestForwardTimer::expire()
00089 {
00090   // Call timeout function
00091   agent_->interestTimeout(msg_);
00092 
00093   // Do not reschedule this timer
00094   delete this;
00095   return -1;
00096 }
00097 
00098 int TppSubscriptionExpirationTimer::expire()
00099 {
00100   int retval;
00101 
00102   retval = agent_->subscriptionTimeout(attrs_);
00103 
00104   // Delete timer if we are not rescheduling it
00105   if (retval == -1)
00106     delete this;
00107 
00108   return retval;
00109 }
00110 
00111 int TppGradientExpirationCheckTimer::expire()
00112 {
00113   // Call the callback function
00114   agent_->gradientTimeout();
00115 
00116   // Reschedule this timer
00117   return 0;
00118 }
00119 
00120 int TppReinforcementCheckTimer::expire()
00121 {
00122   // Call the callback function
00123   agent_->reinforcementTimeout();
00124 
00125   // Reschedule this timer
00126   return 0;
00127 }
00128 
00129 void GradientFilter::interestTimeout(Message *msg)
00130 {
00131   DiffPrint(DEBUG_MORE_DETAILS, "Interest Timeout !\n");
00132 
00133   msg->last_hop_ = LOCALHOST_ADDR;
00134   msg->next_hop_ = BROADCAST_ADDR;
00135  
00136   ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00137 }
00138 
00139 void GradientFilter::messageTimeout(Message *msg)
00140 {
00141   DiffPrint(DEBUG_MORE_DETAILS, "Message Timeout !\n");
00142 
00143   ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00144 }
00145 
00146 void GradientFilter::gradientTimeout()
00147 {
00148   RoutingTable::iterator routing_itr;
00149   GradientList::iterator grad_itr;
00150   AgentList::iterator agent_itr;
00151   TppRoutingEntry *routing_entry;
00152   GradientEntry *gradient_entry;
00153   AgentEntry *agent_entry;
00154   struct timeval tmv;
00155 
00156   DiffPrint(DEBUG_MORE_DETAILS, "Gradient Timeout !\n");
00157 
00158   GetTime(&tmv);
00159 
00160   routing_itr = routing_list_.begin();
00161 
00162   while (routing_itr != routing_list_.end()){
00163     routing_entry = *routing_itr;
00164 
00165     // Step 1: Delete expired gradients
00166     grad_itr = routing_entry->gradients_.begin();
00167     while (grad_itr != routing_entry->gradients_.end()){
00168       gradient_entry = *grad_itr;
00169       if (tmv.tv_sec > (gradient_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){
00170 
00171     DiffPrint(DEBUG_NO_DETAILS, "Deleting Gradient to node %d !\n",
00172           gradient_entry->node_addr_);
00173 
00174     grad_itr = routing_entry->gradients_.erase(grad_itr);
00175     delete gradient_entry;
00176       }
00177       else{
00178     grad_itr++;
00179       }
00180     }
00181 
00182     // Step 2: Remove non-active agents
00183     agent_itr = routing_entry->agents_.begin();
00184     while (agent_itr != routing_entry->agents_.end()){
00185       agent_entry = *agent_itr;
00186       if (tmv.tv_sec > (agent_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){
00187 
00188     DiffPrint(DEBUG_NO_DETAILS,
00189           "Deleting Gradient to agent %d !\n", agent_entry->port_);
00190 
00191     agent_itr = routing_entry->agents_.erase(agent_itr);
00192     delete agent_entry;
00193       }
00194       else{
00195     agent_itr++;
00196       }
00197     }
00198 
00199     // Remove the Routing Entry if no gradients and no agents
00200     if ((routing_entry->gradients_.size() == 0) &&
00201     (routing_entry->agents_.size() == 0)){
00202       // Deleting Routing Entry
00203       DiffPrint(DEBUG_DETAILS,
00204         "Nothing left for this data type, cleaning up !\n");
00205       routing_itr = routing_list_.erase(routing_itr);
00206       delete routing_entry;
00207     }
00208     else{
00209       routing_itr++;
00210     }
00211   }
00212 }
00213 
00214 void GradientFilter::reinforcementTimeout()
00215 {
00216   DataNeighborList::iterator data_neighbor_itr;
00217   DataNeighborEntry *data_neighbor_entry;
00218   RoutingTable::iterator routing_itr;
00219   TppRoutingEntry *routing_entry;
00220   Message *my_message;
00221 
00222   DiffPrint(DEBUG_MORE_DETAILS, "Reinforcement Timeout !\n");
00223 
00224   routing_itr = routing_list_.begin();
00225 
00226   while (routing_itr != routing_list_.end()){
00227     routing_entry = *routing_itr;
00228 
00229     // Step 1: Delete expired gradients
00230     data_neighbor_itr = routing_entry->data_neighbors_.begin();
00231 
00232     while (data_neighbor_itr != routing_entry->data_neighbors_.end()){
00233       data_neighbor_entry = *data_neighbor_itr;
00234 
00235       if (data_neighbor_entry->data_flag_ == OLD_MESSAGE){
00236     my_message = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
00237                  0, 0, routing_entry->attrs_->size(), pkt_count_,
00238                  random_id_, data_neighbor_entry->neighbor_id_,
00239                  LOCALHOST_ADDR);
00240     my_message->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
00241 
00242     DiffPrint(DEBUG_NO_DETAILS,
00243           "Sending Negative Reinforcement to node %d !\n",
00244           data_neighbor_entry->neighbor_id_);
00245 
00246     ((DiffusionRouting *)dr_)->sendMessage(my_message, filter_handle_);
00247 
00248     pkt_count_++;
00249     delete my_message;
00250 
00251     // Done. Delete entry
00252     data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr);
00253     delete data_neighbor_entry;
00254       }
00255       else{
00256     data_neighbor_itr++;
00257       }
00258     }
00259 
00260     // Step 2: Delete data neighbors with no activity, zero flags
00261     data_neighbor_itr = routing_entry->data_neighbors_.begin();
00262     while (data_neighbor_itr != routing_entry->data_neighbors_.end()){
00263       data_neighbor_entry = *data_neighbor_itr;
00264       if (data_neighbor_entry->data_flag_ == NEW_MESSAGE){
00265     data_neighbor_entry->data_flag_ = 0;
00266     data_neighbor_itr++;
00267       }
00268       else{
00269     // Delete entry
00270     data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr);
00271     delete data_neighbor_entry;
00272       }
00273     }
00274 
00275     // Advance to the next routing entry
00276     routing_itr++;
00277   }
00278 }
00279 
00280 int GradientFilter::subscriptionTimeout(NRAttrVec *attrs)
00281 {
00282   AttributeList::iterator attribute_itr;
00283   AttributeEntry *attribute_entry;
00284   TppRoutingEntry *routing_entry;
00285   struct timeval tmv;
00286 
00287   DiffPrint(DEBUG_MORE_DETAILS, "Subscription Timeout !\n");
00288 
00289   GetTime(&tmv);
00290 
00291   // Find the correct Routing Entry
00292   routing_entry = findRoutingEntry(attrs);
00293 
00294   if (routing_entry){
00295     // Step 1: Check Timeouts
00296 
00297     attribute_itr = routing_entry->attr_list_.begin();
00298 
00299     while (attribute_itr != routing_entry->attr_list_.end()){
00300       attribute_entry = *attribute_itr;
00301       if (tmv.tv_sec > (attribute_entry->tv_.tv_sec + SUBSCRIPTION_TIMEOUT)){
00302     sendDisinterest(attribute_entry->attrs_, routing_entry);
00303     attribute_itr = routing_entry->attr_list_.erase(attribute_itr);
00304     delete attribute_entry;
00305       }
00306       else{
00307     attribute_itr++;
00308       }
00309     }
00310   }
00311   else{
00312     DiffPrint(DEBUG_DETAILS, "Warning: SubscriptionTimeout could't find RE - maybe deleted by GradientTimeout ?\n");
00313 
00314     // Cancel Timer
00315     return -1;
00316   }
00317 
00318   // Keep Timer
00319   return 0;
00320 }
00321 
00322 void GradientFilter::deleteRoutingEntry(TppRoutingEntry *routing_entry)
00323 {
00324   RoutingTable::iterator routing_itr;
00325   TppRoutingEntry *current_entry;
00326 
00327   for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){
00328     current_entry = *routing_itr;
00329     if (current_entry == routing_entry){
00330       routing_itr = routing_list_.erase(routing_itr);
00331       delete routing_entry;
00332       return;
00333     }
00334   }
00335   DiffPrint(DEBUG_ALWAYS, "Error: deleteRoutingEntry could not find entry to delete !\n");
00336 }
00337 
00338 TppRoutingEntry * GradientFilter::matchRoutingEntry(NRAttrVec *attrs, RoutingTable::iterator start, RoutingTable::iterator *place)
00339 {
00340   RoutingTable::iterator routing_itr;
00341   TppRoutingEntry *routing_entry;
00342 
00343   for (routing_itr = start; routing_itr != routing_list_.end(); ++routing_itr){
00344     routing_entry = *routing_itr;
00345     if (MatchAttrs(routing_entry->attrs_, attrs)){
00346       *place = routing_itr;
00347       return routing_entry;
00348     }
00349   }
00350   return NULL;
00351 }
00352 
00353 TppRoutingEntry * GradientFilter::findRoutingEntry(NRAttrVec *attrs)
00354 {
00355   RoutingTable::iterator routing_itr;
00356   TppRoutingEntry *routing_entry;
00357 
00358   for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){
00359     routing_entry = *routing_itr;
00360     if (PerfectMatch(routing_entry->attrs_, attrs))
00361       return routing_entry;
00362   }
00363   return NULL;
00364 }
00365 
00366 AttributeEntry * GradientFilter::findMatchingSubscription(TppRoutingEntry *routing_entry,
00367                               NRAttrVec *attrs)
00368 {
00369   AttributeList::iterator attribute_itr;
00370   AttributeEntry *attribute_entry;
00371 
00372   for (attribute_itr = routing_entry->attr_list_.begin(); attribute_itr != routing_entry->attr_list_.end(); ++attribute_itr){
00373     attribute_entry = *attribute_itr;
00374     if (PerfectMatch(attribute_entry->attrs_, attrs))
00375       return attribute_entry;
00376   }
00377   return NULL;
00378 }
00379 
00380 void GradientFilter::updateGradient(TppRoutingEntry *routing_entry,
00381                     int32_t last_hop, bool reinforced)
00382 {
00383   GradientList::iterator gradient_itr;
00384   GradientEntry *gradient_entry;
00385 
00386   for (gradient_itr = routing_entry->gradients_.begin();
00387        gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){
00388     gradient_entry = *gradient_itr;
00389     if (gradient_entry->node_addr_ == last_hop){
00390       GetTime(&(gradient_entry->tv_));
00391       if (reinforced)
00392     gradient_entry->reinforced_ = true;
00393       return;
00394     }
00395   }
00396 
00397   // We need to add a new gradient
00398   gradient_entry = new GradientEntry(last_hop);
00399   if (reinforced)
00400     gradient_entry->reinforced_ = true;
00401 
00402   routing_entry->gradients_.push_back(gradient_entry);
00403 }
00404 
00405 void GradientFilter::updateAgent(TppRoutingEntry *routing_entry,
00406                  u_int16_t source_port)
00407 {
00408   AgentList::iterator agent_itr;
00409   AgentEntry *agent_entry;
00410 
00411   for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){
00412     agent_entry = *agent_itr;
00413     if (agent_entry->port_ == source_port){
00414       // We already have this guy
00415       GetTime(&(agent_entry->tv_));
00416       return;
00417     }
00418   }
00419 
00420   // This is a new agent, so we create a new entry and add it to the
00421   // list of known agents
00422   agent_entry = new AgentEntry(source_port);
00423   routing_entry->agents_.push_back(agent_entry);
00424 }
00425 
00426 void GradientFilter::forwardPushExploratoryData(Message *msg,
00427                         DataForwardingHistory *forwarding_history)
00428 {
00429   RoutingTable::iterator routing_itr;
00430   TppRoutingEntry *routing_entry;
00431   AgentList::iterator agent_itr;
00432   AgentEntry *agent_entry;
00433   Message *data_msg, *sink_message;
00434   TimerCallback *data_timer;
00435   unsigned int key[2];
00436   HashEntry *hash_entry;
00437 
00438   // Sink processing
00439   routing_itr = routing_list_.begin();
00440   routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00441                     &routing_itr);
00442 
00443   sink_message = CopyMessage(msg);
00444 
00445   while (routing_entry){
00446 
00447     // Forward message to all local sinks
00448     for (agent_itr = routing_entry->agents_.begin();
00449      agent_itr != routing_entry->agents_.end(); ++agent_itr){
00450       agent_entry = *agent_itr;
00451 
00452       if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){
00453     // Send DATA message to local sinks
00454     sink_message->next_hop_ = LOCALHOST_ADDR;
00455     sink_message->next_port_ = agent_entry->port_;
00456 
00457     ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);
00458 
00459     // Add agent to the forwarding history
00460     forwarding_history->forwardingToLibrary(agent_entry->port_);
00461       }
00462     }
00463 
00464     if ((!forwarding_history->alreadyReinforced()) &&
00465     (routing_entry->agents_.size() > 0) &&
00466     (msg->last_hop_ != LOCALHOST_ADDR)){
00467       // Send a positive reinforcement if we have sinks
00468       sendPositiveReinforcement(routing_entry->attrs_, msg->rdm_id_,
00469                 msg->pkt_num_, msg->last_hop_);
00470       // Record reinforcement in the forwarding history so we do it
00471       // only once per received data message
00472       forwarding_history->sendingReinforcement();
00473     }
00474 
00475     // Look for other matching data types
00476     routing_itr++;
00477     routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00478                       &routing_itr);
00479   }
00480 
00481   // Delete sink_message after sink processing
00482   delete sink_message;
00483 
00484   // Intermediate node processing
00485 
00486   // Add message information to the hash table
00487   if (msg->last_hop_ != LOCALHOST_ADDR){
00488     key[0] = msg->pkt_num_;
00489     key[1] = msg->rdm_id_;
00490 
00491     hash_entry = new HashEntry(msg->last_hop_);
00492 
00493     putHash(hash_entry, key[0], key[1]);
00494   }
00495 
00496   // Rebroadcast the exploratory push data message
00497   if (!forwarding_history->alreadyForwardedToNetwork(BROADCAST_ADDR)){
00498     data_msg = CopyMessage(msg);
00499     data_msg->next_hop_ = BROADCAST_ADDR;
00500 
00501     data_timer = new TppMessageSendTimer(this, data_msg);
00502 
00503     // Add data timer to the queue
00504     ((DiffusionRouting *)dr_)->addTimer(PUSH_DATA_FORWARD_DELAY +
00505                     (int) ((PUSH_DATA_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (PUSH_DATA_FORWARD_JITTER / 2))),
00506                     data_timer);
00507 
00508     // Add broadcast information to forwarding history
00509     forwarding_history->forwardingToNetwork(BROADCAST_ADDR);
00510   }
00511 }
00512 
00513 void GradientFilter::forwardExploratoryData(Message *msg,
00514                         TppRoutingEntry *routing_entry,
00515                         DataForwardingHistory *forwarding_history)
00516 {
00517 #ifdef USE_BROADCAST_TO_MULTIPLE_RECIPIENTS
00518   Message *data_msg;
00519   TimerCallback *data_timer;
00520 #else
00521   GradientList::iterator gradient_itr;
00522   GradientEntry *gradient_entry;
00523 #endif // USE_BROADCAST_TO_MULTIPLE_RECIPIENTS
00524   AgentList::iterator agent_itr;
00525   AgentEntry *agent_entry;
00526   Message *sink_message;
00527   unsigned int key[2];
00528   HashEntry *hash_entry;
00529 
00530   sink_message = CopyMessage(msg);
00531 
00532   // Step 1: Sink Processing
00533   for (agent_itr = routing_entry->agents_.begin();
00534        agent_itr != routing_entry->agents_.end(); ++agent_itr){
00535     agent_entry = *agent_itr;
00536 
00537     if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){
00538       // Forward the data message to local sinks
00539       sink_message->next_hop_ = LOCALHOST_ADDR;
00540       sink_message->next_port_ = agent_entry->port_;
00541 
00542       // Add agent to the forwarding list
00543       forwarding_history->forwardingToLibrary(agent_entry->port_);
00544 
00545       ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);
00546     }
00547   }
00548 
00549   delete sink_message;
00550 
00551   // Step 1A: Reinforcement Processing
00552   if ((!forwarding_history->alreadyReinforced()) &&
00553       (routing_entry->agents_.size() > 0) &&
00554       (msg->last_hop_ != LOCALHOST_ADDR)){
00555     // Send reinforcement to 'last_hop'
00556     sendPositiveReinforcement(routing_entry->attrs_, msg->rdm_id_,
00557                   msg->pkt_num_, msg->last_hop_);
00558     // Record reinforcement in the forwarding history so we do it only
00559     // once per received data message
00560     forwarding_history->sendingReinforcement();
00561   }
00562 
00563   // Step 2: Intermediate Processing
00564 
00565   // Set reinforcement flags
00566   if (msg->last_hop_ != LOCALHOST_ADDR){
00567     setReinforcementFlags(routing_entry, msg->last_hop_, NEW_MESSAGE);
00568   }
00569 
00570   // Add message information to the hash table
00571   if (msg->last_hop_ != LOCALHOST_ADDR){
00572     key[0] = msg->pkt_num_;
00573     key[1] = msg->rdm_id_;
00574 
00575     hash_entry = new HashEntry(msg->last_hop_);
00576 
00577     putHash(hash_entry, key[0], key[1]);
00578   }
00579 
00580   // Forward the EXPLORATORY message
00581 #ifdef USE_BROADCAST_TO_MULTIPLE_RECIPIENTS
00582   if (!forwarding_history->alreadyForwardedToNetwork(BROADCAST_ADDR)){
00583     if (routing_entry->gradients_.size() > 0){
00584       // Broadcast DATA message
00585       data_msg = CopyMessage(msg);
00586       data_msg->next_hop_ = BROADCAST_ADDR;
00587 
00588       // Add to the forwarding history
00589       forwarding_history->forwardingToNetwork(BROADCAST_ADDR);
00590 
00591       data_timer = new TppMessageSendTimer(this, data_msg);
00592 
00593       // Add timer for forwarding the data packet
00594       ((DiffusionRouting *)dr_)->addTimer(DATA_FORWARD_DELAY +
00595                       (int) ((DATA_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (DATA_FORWARD_JITTER / 2))),
00596                       data_timer);
00597     }
00598   }
00599 #else
00600   // Forward DATA to all output gradients
00601   for (gradient_itr = routing_entry->gradients_.begin();
00602        gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){
00603 
00604     gradient_entry = *gradient_itr;
00605 
00606     // Check forwarding history
00607     if (!forwarding_history->alreadyForwardedToNetwork(gradient_entry->node_addr_)){
00608       msg->next_hop_ = gradient_entry->node_addr_;
00609       ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00610 
00611       // Add to the forwarding history
00612       forwarding_history->forwardingToNetwork(gradient_entry->node_addr_);
00613     }
00614   }
00615 #endif // USE_BROADCAST_TO_MULTIPLE_RECIPIENTS
00616 }
00617 
00618 void GradientFilter::forwardData(Message *msg, TppRoutingEntry *routing_entry,
00619                  DataForwardingHistory *forwarding_history)
00620 {
00621   GradientList::iterator gradient_itr;
00622   AgentList::iterator agent_itr;
00623   GradientEntry *gradient_entry;
00624   AgentEntry *agent_entry;
00625   Message *sink_message, *negative_reinforcement_msg;
00626   bool has_sink = false;
00627 
00628   sink_message = CopyMessage(msg);
00629 
00630   // Step 1: Sink Processing
00631   for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){
00632     agent_entry = *agent_itr;
00633 
00634     has_sink = true;
00635 
00636     if (!forwarding_history->alreadyForwardedToLibrary(agent_entry->port_)){
00637       // Forward DATA to local sinks
00638       sink_message->next_hop_ = LOCALHOST_ADDR;
00639       sink_message->next_port_ = agent_entry->port_;
00640 
00641       // Add agent to the forwarding list
00642       forwarding_history->forwardingToLibrary(agent_entry->port_);
00643 
00644       ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);
00645     }
00646   }
00647 
00648   delete sink_message;
00649 
00650   // Step 2: Intermediate Processing
00651 
00652   // Set reinforcement flags
00653   if (msg->last_hop_ != LOCALHOST_ADDR){
00654     setReinforcementFlags(routing_entry, msg->last_hop_, NEW_MESSAGE);
00655   }
00656 
00657   // Forward DATA only to reinforced gradients
00658   gradient_itr = routing_entry->gradients_.begin();
00659   gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00660                        gradient_itr, &gradient_itr);
00661 
00662   if (gradient_entry){
00663     while (gradient_entry){
00664 
00665       // Found reinforced gradient, forward data message to this
00666       // neighbor only if the messages comes from a different neighbor
00667       if (gradient_entry->node_addr_ != msg->last_hop_){
00668     msg->next_hop_ = gradient_entry->node_addr_;
00669 
00670     // Check if we have forwarded the message to this neighbor already
00671     if (!forwarding_history->alreadyForwardedToNetwork(msg->next_hop_)){
00672       DiffPrint(DEBUG_NO_DETAILS,
00673             "Node%d: Forwarding data using Reinforced Gradient to node %d !\n",
00674             ((DiffusionRouting *)dr_)->getNodeId(), gradient_entry->node_addr_);
00675 
00676       ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00677 
00678       // Add the node to the forwarding history
00679       forwarding_history->forwardingToNetwork(msg->next_hop_);
00680     }
00681       }
00682 
00683       // Move to the next one
00684       gradient_itr++;
00685       gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00686                            gradient_itr, &gradient_itr);
00687     }
00688   }
00689   else{
00690     // We could not find a reinforced path, so we send a negative
00691     // reinforcement to last_hop
00692     if ((!has_sink) && (msg->last_hop_ != LOCALHOST_ADDR)){
00693       negative_reinforcement_msg = new Message(DIFFUSION_VERSION,
00694                            NEGATIVE_REINFORCEMENT,
00695                            0, 0,
00696                            routing_entry->attrs_->size(),
00697                            pkt_count_,
00698                            random_id_,
00699                            msg->last_hop_,
00700                            LOCALHOST_ADDR);
00701       negative_reinforcement_msg->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
00702 
00703       DiffPrint(DEBUG_NO_DETAILS,
00704         "Sending Negative Reinforcement to node %d !\n",
00705         msg->last_hop_);
00706 
00707       ((DiffusionRouting *)dr_)->sendMessage(negative_reinforcement_msg,
00708                          filter_handle_);
00709 
00710       pkt_count_++;
00711       delete negative_reinforcement_msg;
00712     }
00713   }
00714 }
00715 
00716 void GradientFilter::sendPositiveReinforcement(NRAttrVec *reinf_attrs,
00717                            int32_t data_rdm_id,
00718                            int32_t data_pkt_num,
00719                            int32_t destination)
00720 {
00721   ReinforcementBlob *reinforcement_blob;
00722   NRAttribute *reinforcement_attr;
00723   TimerCallback *reinforcement_timer;
00724   Message *pos_reinf_message;
00725   NRAttrVec *attrs;
00726 
00727   reinforcement_blob = new ReinforcementBlob(data_rdm_id, data_pkt_num);
00728 
00729   reinforcement_attr = ReinforcementAttr.make(NRAttribute::IS,
00730                           (void *) reinforcement_blob,
00731                           sizeof(ReinforcementBlob));
00732 
00733   attrs = CopyAttrs(reinf_attrs);
00734   attrs->push_back(reinforcement_attr);
00735 
00736   pos_reinf_message = new Message(DIFFUSION_VERSION, POSITIVE_REINFORCEMENT,
00737                   0, 0, attrs->size(), pkt_count_,
00738                   random_id_, destination, LOCALHOST_ADDR);
00739   pos_reinf_message->msg_attr_vec_ = CopyAttrs(attrs);
00740 
00741   DiffPrint(DEBUG_NO_DETAILS, "Sending Positive Reinforcement to node %d !\n",
00742         destination);
00743 
00744   // Create timer for sending this message
00745   reinforcement_timer = new TppMessageSendTimer(this, pos_reinf_message);
00746 
00747   // Add timer to the event queue
00748   ((DiffusionRouting *)dr_)->addTimer(POS_REINFORCEMENT_SEND_DELAY +
00749                       (int) ((POS_REINFORCEMENT_JITTER * (GetRand() * 1.0 / RAND_MAX) - (POS_REINFORCEMENT_JITTER / 2))),
00750                       reinforcement_timer);
00751   pkt_count_++;
00752   ClearAttrs(attrs);
00753   delete attrs;
00754   delete reinforcement_blob;
00755 }
00756 
00757 GradientEntry * GradientFilter::findReinforcedGradients(GradientList *gradients,
00758                             GradientList::iterator start,
00759                             GradientList::iterator *place)
00760 {
00761   GradientList::iterator gradient_itr;
00762   GradientEntry *gradient_entry;
00763 
00764   for (gradient_itr = start; gradient_itr != gradients->end(); ++gradient_itr){
00765     gradient_entry = *gradient_itr;
00766     if (gradient_entry->reinforced_){
00767       *place = gradient_itr;
00768       return gradient_entry;
00769     }
00770   }
00771 
00772   return NULL;
00773 }
00774 
00775 GradientEntry * GradientFilter::findReinforcedGradient(int32_t node_addr,
00776                                TppRoutingEntry *routing_entry)
00777 {
00778   GradientList::iterator gradient_itr;
00779   GradientEntry *gradient_entry;
00780 
00781   gradient_itr = routing_entry->gradients_.begin();
00782   gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00783                        gradient_itr, &gradient_itr);
00784 
00785   if (gradient_entry){
00786     while(gradient_entry){
00787       if (gradient_entry->node_addr_ == node_addr)
00788     return gradient_entry;
00789 
00790       // This is not the gradient we are looking for, keep looking
00791       gradient_itr++;
00792       gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
00793                            gradient_itr, &gradient_itr);
00794     }
00795   }
00796 
00797   return NULL;
00798 }
00799 
00800 void GradientFilter::deleteGradient(TppRoutingEntry *routing_entry,
00801                     GradientEntry *gradient_entry)
00802 {
00803   GradientList::iterator gradient_itr;
00804   GradientEntry *current_entry;
00805 
00806   for (gradient_itr = routing_entry->gradients_.begin();
00807        gradient_itr != routing_entry->gradients_.end(); ++gradient_itr){
00808     current_entry = *gradient_itr;
00809     if (current_entry == gradient_entry){
00810       gradient_itr = routing_entry->gradients_.erase(gradient_itr);
00811       delete gradient_entry;
00812       return;
00813     }
00814   }
00815   DiffPrint(DEBUG_ALWAYS,
00816         "Error: deleteGradient could not find gradient to delete !\n");
00817 }
00818 
00819 void GradientFilter::setReinforcementFlags(TppRoutingEntry *routing_entry,
00820                        int32_t last_hop, int new_message)
00821 {
00822   DataNeighborList::iterator data_neighbor_itr;
00823   DataNeighborEntry *data_neighbor_entry;
00824 
00825   for (data_neighbor_itr = routing_entry->data_neighbors_.begin();
00826        data_neighbor_itr != routing_entry->data_neighbors_.end();
00827        ++data_neighbor_itr){
00828     data_neighbor_entry = *data_neighbor_itr;
00829     if (data_neighbor_entry->neighbor_id_ == last_hop){
00830       if (data_neighbor_entry->data_flag_ > 0)
00831     return;
00832       data_neighbor_entry->data_flag_ = new_message;
00833       return;
00834     }
00835   }
00836 
00837   // We need to add a new data neighbor
00838   data_neighbor_entry = new DataNeighborEntry(last_hop, new_message);
00839 
00840   routing_entry->data_neighbors_.push_back(data_neighbor_entry);
00841 }
00842 
00843 void GradientFilter::sendInterest(NRAttrVec *attrs, TppRoutingEntry *routing_entry)
00844 {
00845   AgentList::iterator agent_itr;
00846   AgentEntry *agent_entry;
00847 
00848   Message *msg = new Message(DIFFUSION_VERSION, INTEREST, 0, 0,
00849                  attrs->size(), 0, 0, LOCALHOST_ADDR,
00850                  LOCALHOST_ADDR);
00851 
00852   msg->msg_attr_vec_ = CopyAttrs(attrs);
00853 
00854   for (agent_itr = routing_entry->agents_.begin(); agent_itr != routing_entry->agents_.end(); ++agent_itr){
00855     agent_entry = *agent_itr;
00856 
00857     msg->next_port_ = agent_entry->port_;
00858 
00859     ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
00860   }
00861 
00862   delete msg;
00863 }
00864 
00865 void GradientFilter::sendDisinterest(NRAttrVec *attrs,
00866                      TppRoutingEntry *routing_entry)
00867 {
00868   NRAttrVec *new_attrs;
00869   NRSimpleAttribute<int> *nrclass = NULL;
00870 
00871   new_attrs = CopyAttrs(attrs);
00872 
00873   nrclass = NRClassAttr.find(new_attrs);
00874   if (!nrclass){
00875     DiffPrint(DEBUG_ALWAYS,
00876           "Error: sendDisinterest couldn't find the class attribute !\n");
00877     ClearAttrs(new_attrs);
00878     delete new_attrs;
00879     return;
00880   }
00881 
00882   // Change the class_key value
00883   nrclass->setVal(NRAttribute::DISINTEREST_CLASS);
00884 
00885   sendInterest(new_attrs, routing_entry);
00886    
00887   ClearAttrs(new_attrs);
00888   delete new_attrs;
00889 }
00890 
00891 void GradientFilter::recv(Message *msg, handle h)
00892 {
00893   if (h != filter_handle_){
00894     DiffPrint(DEBUG_ALWAYS,
00895           "Error: received msg for handle %d, subscribed to handle %d !\n",
00896           h, filter_handle_);
00897     return;
00898   }
00899 
00900   if (msg->new_message_ == 1)
00901     processNewMessage(msg);
00902   else
00903     processOldMessage(msg);
00904 }
00905 
00906 void GradientFilter::processOldMessage(Message *msg)
00907 {
00908   TppRoutingEntry *routing_entry;
00909   RoutingTable::iterator routing_itr;
00910 
00911   switch (msg->msg_type_){
00912 
00913   case INTEREST:
00914 
00915     DiffPrint(DEBUG_NO_DETAILS, "Node%d: Received Old Interest !\n", ((DiffusionRouting *)dr_)->getNodeId());
00916 
00917     if (msg->last_hop_ == LOCALHOST_ADDR){
00918       // Old interest should not come from local agent
00919       DiffPrint(DEBUG_ALWAYS, "Warning: Old Interest from local agent !\n");
00920       break;
00921     }
00922 
00923     // Get the routing entry for these attrs      
00924     routing_entry = findRoutingEntry(msg->msg_attr_vec_);
00925     if (routing_entry)
00926       updateGradient(routing_entry, msg->last_hop_, false);
00927 
00928     break;
00929 
00930   case DATA: 
00931 
00932     DiffPrint(DEBUG_NO_DETAILS, "Node%d: Received an old Data message !\n", ((DiffusionRouting *)dr_)->getNodeId());
00933 
00934     // Find the correct routing entry
00935     routing_itr = routing_list_.begin();
00936     routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00937                       &routing_itr);
00938 
00939     while (routing_entry){
00940       DiffPrint(DEBUG_NO_DETAILS,
00941         "Set flags to %d to OLD_MESSAGE !\n", msg->last_hop_);
00942 
00943       // Set reinforcement flags
00944       if (msg->last_hop_ != LOCALHOST_ADDR){
00945     setReinforcementFlags(routing_entry, msg->last_hop_, OLD_MESSAGE);
00946       }
00947 
00948       // Continue going through other data types
00949       routing_itr++;
00950       routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
00951                     &routing_itr);
00952     }
00953 
00954     break;
00955 
00956   case PUSH_EXPLORATORY_DATA:
00957 
00958     // Just drop it
00959     DiffPrint(DEBUG_NO_DETAILS,
00960           "Received an old Push Exploratory Data. Loop detected !\n");
00961     
00962     break;
00963 
00964   case EXPLORATORY_DATA:
00965     
00966     // Just drop it
00967     DiffPrint(DEBUG_NO_DETAILS,
00968           "Received an old Exploratory Data. Loop detected !\n");
00969 
00970     break;
00971 
00972   case POSITIVE_REINFORCEMENT:
00973 
00974     DiffPrint(DEBUG_IMPORTANT, "Received an old Positive Reinforcement !\n");
00975 
00976     break;
00977 
00978   case NEGATIVE_REINFORCEMENT:
00979 
00980     DiffPrint(DEBUG_IMPORTANT, "Received an old Negative Reinforcement !\n");
00981 
00982     DiffPrint(DEBUG_IMPORTANT, "pkt_num = %d, rdm_id = %d !\n",
00983           msg->pkt_num_, msg->rdm_id_);
00984 
00985     break;
00986 
00987   default: 
00988 
00989     break;
00990   }
00991 }
00992 
00993 void GradientFilter::processNewMessage(Message *msg)
00994 {
00995   NRSimpleAttribute<void *> *reinforcement_attr = NULL;
00996   DataForwardingHistory *forwarding_history;
00997   NRSimpleAttribute<int> *nrclass = NULL;
00998   NRSimpleAttribute<int> *nrscope = NULL;
00999   ReinforcementBlob *reinforcement_blob;
01000   RoutingTable::iterator routing_itr;
01001   TppRoutingEntry *routing_entry;
01002   GradientList::iterator gradient_itr;
01003   GradientEntry *gradient_entry;
01004   NRAttrVec::iterator place;
01005   HashEntry *hash_entry;
01006   AttributeEntry *attribute_entry;
01007   Message *my_msg;
01008   TimerCallback *interest_timer, *subscription_timer;
01009   unsigned int key[2];
01010   bool new_data_type = false;
01011 
01012   switch (msg->msg_type_){
01013 
01014   case INTEREST:
01015 
01016     DiffPrint(DEBUG_NO_DETAILS, "Received Interest !\n");
01017 
01018     nrclass = NRClassAttr.find(msg->msg_attr_vec_);
01019     nrscope = NRScopeAttr.find(msg->msg_attr_vec_);
01020 
01021     if (!nrclass || !nrscope){
01022       DiffPrint(DEBUG_ALWAYS,
01023         "Warning: Can't find CLASS/SCOPE attributes in the message !\n");
01024       return;
01025     }
01026 
01027     // Step 1: Look for the same data type
01028     routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01029 
01030     if (!routing_entry){
01031       // Create a new routing entry for this data type
01032       routing_entry = new TppRoutingEntry;
01033       routing_entry->attrs_ = CopyAttrs(msg->msg_attr_vec_);
01034       routing_list_.push_back(routing_entry);
01035       new_data_type = true;
01036     }
01037 
01038     if (msg->last_hop_ == LOCALHOST_ADDR){
01039       // From local agent
01040       updateAgent(routing_entry, msg->source_port_);
01041     }
01042     else{
01043       // From outside, we just add the new gradient
01044       updateGradient(routing_entry, msg->last_hop_, false);
01045     }
01046 
01047     if ((nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
01048     (nrclass->getOp() == NRAttribute::IS)){
01049 
01050       // Global interest messages should always be forwarded
01051       if (nrscope->getVal() == NRAttribute::GLOBAL_SCOPE){
01052 
01053     interest_timer = new TppInterestForwardTimer(this, CopyMessage(msg));
01054 
01055     ((DiffusionRouting *)dr_)->addTimer(INTEREST_FORWARD_DELAY +
01056                         (int) ((INTEREST_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (INTEREST_FORWARD_JITTER / 2))),
01057                         interest_timer);
01058       }
01059     }
01060     else{
01061       if ((nrclass->getOp() != NRAttribute::IS) &&
01062       (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE) &&
01063       (new_data_type)){
01064 
01065     subscription_timer = new TppSubscriptionExpirationTimer(this,
01066                                  CopyAttrs(msg->msg_attr_vec_));
01067     
01068     ((DiffusionRouting *)dr_)->addTimer(SUBSCRIPTION_DELAY +
01069                         (int) (SUBSCRIPTION_DELAY * (GetRand() * 1.0 / RAND_MAX)),
01070                         subscription_timer);
01071       }
01072 
01073       // Subscriptions don't have to match other subscriptions
01074       break;
01075     }
01076 
01077     // Step 2: Match other routing tables
01078     routing_itr = routing_list_.begin();
01079     routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01080                       &routing_itr);
01081 
01082     while (routing_entry){
01083       // Got a match
01084       attribute_entry = findMatchingSubscription(routing_entry,
01085                          msg->msg_attr_vec_);
01086 
01087       // Do we already have this subscription
01088       if (attribute_entry){
01089     GetTime(&(attribute_entry->tv_));
01090       }
01091       else{
01092     // Create a new attribute entry, add it to the attribute list
01093     // and send an interest message to the local agent
01094     attribute_entry = new AttributeEntry(CopyAttrs(msg->msg_attr_vec_));
01095     routing_entry->attr_list_.push_back(attribute_entry);
01096     sendInterest(attribute_entry->attrs_, routing_entry);
01097       }
01098       // Move to the next TppRoutingEntry
01099       routing_itr++;
01100       routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01101                     &routing_itr);
01102     }
01103 
01104       break;
01105 
01106   case DATA:
01107 
01108     DiffPrint(DEBUG_NO_DETAILS, "Node%d: Received Data !\n",((DiffusionRouting *)dr_)->getNodeId());
01109 
01110     // Create data message forwarding cache
01111     forwarding_history = new DataForwardingHistory;
01112 
01113     // Find the correct routing entry
01114     routing_itr = routing_list_.begin();
01115     routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01116                       &routing_itr);
01117 
01118     while (routing_entry){
01119       forwardData(msg, routing_entry, forwarding_history);
01120       routing_itr++;
01121       routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01122                     &routing_itr);
01123     }
01124 
01125     delete forwarding_history;
01126 
01127     break;
01128 
01129   case EXPLORATORY_DATA:
01130 
01131     DiffPrint(DEBUG_NO_DETAILS, "Received Exploratory Data !\n");
01132 
01133     // Create data message forwarding cache
01134     forwarding_history = new DataForwardingHistory;
01135 
01136     // Find the correct routing entry
01137     routing_itr = routing_list_.begin();
01138     routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01139                       &routing_itr);
01140 
01141     while (routing_entry){
01142       forwardExploratoryData(msg, routing_entry, forwarding_history);
01143       routing_itr++;
01144       routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
01145                     &routing_itr);
01146     }
01147 
01148     // Delete data forwarding cache
01149     delete forwarding_history;
01150 
01151     break;
01152 
01153   case PUSH_EXPLORATORY_DATA:
01154 
01155     DiffPrint(DEBUG_NO_DETAILS, "Received Push Exploratory Data !\n");
01156 
01157     // Create data message forwarding cache
01158     forwarding_history = new DataForwardingHistory;
01159 
01160     // Forward data message
01161     forwardPushExploratoryData(msg, forwarding_history);
01162 
01163     // Delete data forwarding cache
01164     delete forwarding_history;
01165 
01166     break;
01167 
01168   case POSITIVE_REINFORCEMENT:
01169 
01170     DiffPrint(DEBUG_NO_DETAILS, "Received a Positive Reinforcement !\n");
01171 
01172     // Step 0: Look for reinforcement attribute
01173     place = msg->msg_attr_vec_->begin();
01174     reinforcement_attr = ReinforcementAttr.find_from(msg->msg_attr_vec_,
01175                              place, &place);
01176     if (!reinforcement_attr){
01177       DiffPrint(DEBUG_ALWAYS,
01178         "Error: Received an invalid Positive Reinforcement message !\n");
01179       return;
01180     }
01181 
01182     // Step 1: Extract reinforcement blob from message and look for an
01183     // entry in our hash table
01184     reinforcement_blob = (ReinforcementBlob *) reinforcement_attr->getVal();
01185 
01186     key[0] = reinforcement_blob->pkt_num_;
01187     key[1] = reinforcement_blob->rdm_id_;
01188 
01189     hash_entry = getHash(key[0], key[1]);
01190 
01191     // Step 2: Remove the reinforcement attribute from the message
01192     msg->msg_attr_vec_->erase(place);
01193 
01194     // Step 3: Find a routing entry that matches this message
01195     routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01196 
01197     if (!routing_entry){
01198       // So, if we do not know about this data type, this must be a
01199       // reinforcement message to a PUSHED_EXPLORATORY_DATA message
01200 
01201       // Check for class/scope (all interest message should have it)
01202       nrclass = NRClassAttr.find(msg->msg_attr_vec_);
01203       nrscope = NRScopeAttr.find(msg->msg_attr_vec_);
01204 
01205       if (!nrclass || !nrscope){
01206     DiffPrint(DEBUG_ALWAYS,
01207           "Warning: Can't find CLASS/SCOPE attributes in the message !\n");
01208     return;
01209       }
01210 
01211       // Create new Routing Entry
01212       routing_entry = new TppRoutingEntry;
01213       routing_entry->attrs_ = CopyAttrs(msg->msg_attr_vec_);
01214       routing_list_.push_back(routing_entry);
01215     }
01216 
01217     // Add reinforced gradient to last_hop
01218     updateGradient(routing_entry, msg->last_hop_, true);
01219 
01220     // Add the reinforcement attribute back to the message
01221     msg->msg_attr_vec_->push_back(reinforcement_attr);
01222 
01223     // If we have no record of this message it is either because we
01224     // originated the message (in which case, no further action is
01225     // required) or because we dropped it a long time ago because of
01226     // our hashing configuration parameters (in this case, we can't do
01227     // anything)
01228     if (hash_entry){
01229       msg->next_hop_ = hash_entry->last_hop_;
01230 
01231       DiffPrint(DEBUG_NO_DETAILS,
01232         "Forwarding Positive Reinforcement to node %d !\n",
01233         hash_entry->last_hop_);
01234 
01235       ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
01236     }
01237 
01238     break;
01239 
01240   case NEGATIVE_REINFORCEMENT:
01241 
01242     DiffPrint(DEBUG_NO_DETAILS, "Received a Negative Reinforcement !\n");
01243 
01244     routing_entry = findRoutingEntry(msg->msg_attr_vec_);
01245 
01246     if (routing_entry){
01247       gradient_entry = findReinforcedGradient(msg->last_hop_, routing_entry);
01248 
01249       if (gradient_entry){
01250     // Remove reinforced gradient to last_hop
01251     deleteGradient(routing_entry, gradient_entry);
01252 
01253     gradient_entry = findReinforcedGradients(&routing_entry->gradients_,
01254                          routing_entry->gradients_.begin(),
01255                          &gradient_itr);
01256 
01257     // If there are no other reinforced outgoing gradients
01258     // we need to send our own negative reinforcement
01259     if (!gradient_entry){
01260       my_msg = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
01261                    0, 0, routing_entry->attrs_->size(), pkt_count_,
01262                    random_id_, BROADCAST_ADDR, LOCALHOST_ADDR);
01263       my_msg->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
01264 
01265       DiffPrint(DEBUG_NO_DETAILS,
01266             "Forwarding Negative Reinforcement to ALL !\n");
01267 
01268       ((DiffusionRouting *)dr_)->sendMessage(my_msg, filter_handle_);
01269 
01270       pkt_count_++;
01271       delete my_msg;
01272     }
01273       }
01274     }
01275 
01276     break;
01277 
01278   default:
01279 
01280     break;
01281   }
01282 }
01283 
01284 HashEntry * GradientFilter::getHash(unsigned int pkt_num,
01285                     unsigned int rdm_id)
01286 {
01287    unsigned int key[2];
01288    
01289    key[0] = pkt_num;
01290    key[1] = rdm_id;
01291    
01292    Tcl_HashEntry *entryPtr = Tcl_FindHashEntry(&htable_, (char *)key);
01293    
01294    if (entryPtr == NULL)
01295       return NULL;
01296    
01297    return ((HashEntry *) Tcl_GetHashValue(entryPtr));
01298 }
01299 
01300 void GradientFilter::putHash(HashEntry *new_hash_entry,
01301                  unsigned int pkt_num,
01302                  unsigned int rdm_id)
01303 {
01304    Tcl_HashEntry *tcl_hash_entry;
01305    HashEntry *hash_entry;
01306    HashList::iterator hash_itr;
01307    unsigned int key[2];
01308    int new_hash_key;
01309  
01310    if (hash_list_.size() == HASH_TABLE_DATA_MAX_SIZE){
01311       // Hash table reached maximum size
01312       
01313       for (int i = 0; ((i < HASH_TABLE_DATA_REMOVE_AT_ONCE)
01314                && (hash_list_.size() > 0)); i++){
01315      hash_itr = hash_list_.begin();
01316      tcl_hash_entry = *hash_itr;
01317      hash_entry = (HashEntry *) Tcl_GetHashValue(tcl_hash_entry);
01318      delete hash_entry;
01319      hash_list_.erase(hash_itr);
01320      Tcl_DeleteHashEntry(tcl_hash_entry);
01321       }
01322    }
01323   
01324    key[0] = pkt_num;
01325    key[1] = rdm_id;
01326    
01327    tcl_hash_entry = Tcl_CreateHashEntry(&htable_, (char *) key, &new_hash_key);
01328 
01329    if (new_hash_key == 0){
01330       DiffPrint(DEBUG_IMPORTANT, "Key already exists in hash !\n");
01331       return;
01332    }
01333 
01334    Tcl_SetHashValue(tcl_hash_entry, new_hash_entry);
01335    hash_list_.push_back(tcl_hash_entry);
01336 }
01337 
01338 handle GradientFilter::setupFilter()
01339 {
01340   NRAttrVec attrs;
01341   handle h;
01342 
01343   // For the gradient filter, we use a single attribute with an "IS"
01344   // operator. This causes this filter to match every single packet
01345   // getting to diffusion
01346   attrs.push_back(NRClassAttr.make(NRAttribute::IS,
01347                    NRAttribute::INTEREST_CLASS));
01348 
01349   h = ((DiffusionRouting *)dr_)->addFilter(&attrs,
01350                        GRADIENT_FILTER_PRIORITY, filter_callback_);
01351 
01352   ClearAttrs(&attrs);
01353   return h;
01354 }
01355 
01356 #ifndef NS_DIFFUSION
01357 void GradientFilter::run()
01358 {
01359   // Doesn't do anything
01360   while (1){
01361     sleep(1000);
01362   }
01363 }
01364 #endif // !NS_DIFFUSION
01365 
01366 #ifdef NS_DIFFUSION
01367 GradientFilter::GradientFilter(const char *diffrtg)
01368 {
01369   DiffAppAgent *agent;
01370 #else
01371 GradientFilter::GradientFilter(int argc, char **argv)
01372 {
01373 #endif // NS_DIFFUSION
01374   struct timeval tv;
01375   TimerCallback *reinforcement_timer, *gradient_timer;
01376 
01377   GetTime(&tv);
01378   SetSeed(&tv);
01379   pkt_count_ = GetRand();
01380   random_id_ = GetRand();
01381 
01382   // Create Diffusion Routing class
01383 #ifdef NS_DIFFUSION
01384   agent = (DiffAppAgent *)TclObject::lookup(diffrtg);
01385   dr_ = agent->dr();
01386 #else
01387   parseCommandLine(argc, argv);
01388   dr_ = NR::createNR(diffusion_port_);
01389 #endif // NS_DIFFUSION
01390 
01391   // Create callback classes and set up pointers
01392   filter_callback_ = new GradientFilterReceive(this);
01393 
01394   // Initialize Hashing structures
01395   Tcl_InitHashTable(&htable_, 2);
01396 
01397   // Set up the filter
01398   filter_handle_ = setupFilter();
01399 
01400   // Print filter information
01401   DiffPrint(DEBUG_IMPORTANT,
01402         "Gradient filter subscribed to *, received handle %d\n",
01403         filter_handle_);
01404 
01405   // Add timers for keeping state up-to-date
01406   gradient_timer = new TppGradientExpirationCheckTimer(this);
01407   ((DiffusionRouting *)dr_)->addTimer(GRADIENT_DELAY, gradient_timer);
01408 
01409   reinforcement_timer = new TppReinforcementCheckTimer(this);
01410   ((DiffusionRouting *)dr_)->addTimer(REINFORCEMENT_DELAY, reinforcement_timer);
01411 
01412   GetTime(&tv);
01413 
01414   DiffPrint(DEBUG_ALWAYS, "Gradient filter initialized at time %ld:%ld!\n",
01415         tv.tv_sec, tv.tv_usec);
01416 }
01417 
01418 #ifndef USE_SINGLE_ADDRESS_SPACE
01419 int main(int argc, char **argv)
01420 {
01421   GradientFilter *app;
01422 
01423   // Initialize and run the Gradient Filter
01424   app = new GradientFilter(argc, argv);
01425   app->run();
01426 
01427   return 0;
01428 }
01429 #endif // !USE_SINGLE_ADDRESS_SPACE

Generated on Tue Mar 6 16:47:53 2007 for ns2 Network Simulator 2.29 by  doxygen 1.4.6