dr.cc

Go to the documentation of this file.
00001 //
00002 // dr.cc           : Diffusion Routing Class
00003 // authors         : John Heidemann and Fabio Silva
00004 //
00005 // Copyright (C) 2000-2003 by the University of Southern California
00006 // $Id: dr.cc,v 1.17 2005/09/13 04:53:49 tomh Exp $
00007 //
00008 // This program is free software; you can redistribute it and/or
00009 // modify it under the terms of the GNU General Public License,
00010 // version 2, as published by the Free Software Foundation.
00011 //
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License along
00018 // with this program; if not, write to the Free Software Foundation, Inc.,
00019 // 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
00020 //
00021 // Linking this file statically or dynamically with other modules is making
00022 // a combined work based on this file.  Thus, the terms and conditions of
00023 // the GNU General Public License cover the whole combination.
00024 //
00025 // In addition, as a special exception, the copyright holders of this file
00026 // give you permission to combine this file with free software programs or
00027 // libraries that are released under the GNU LGPL and with code included in
00028 // the standard release of ns-2 under the Apache 2.0 license or under
00029 // otherwise-compatible licenses with advertising requirements (or modified
00030 // versions of such code, with unchanged license).  You may copy and
00031 // distribute such a system following the terms of the GNU GPL for this
00032 // file and the licenses of the other code concerned, provided that you
00033 // include the source code of that other code when and as the GNU GPL
00034 // requires distribution of source code.
00035 //
00036 // Note that people who make modified versions of this file are not
00037 // obligated to grant this special exception for their modified versions;
00038 // it is their choice whether to do so.  The GNU General Public License
00039 // gives permission to release a modified version without this exception;
00040 // this exception also makes it possible to release a modified version
00041 // which carries forward this exception.
00042 //
00043 
00044 #include <stdlib.h>
00045 #include <stdio.h>
00046 
00047 #include "dr.hh"
00048 
00049 class CallbackEntry {
00050 public:
00051   NR::Callback *cb_;
00052   NR::handle subscription_handle_;
00053 
00054   CallbackEntry(NR::Callback *cb, NR::handle subscription_handle) :
00055     cb_(cb), subscription_handle_(subscription_handle) {};
00056 };
00057 
00058 class HandleEntry {
00059 public:
00060   handle hdl_;
00061   bool valid_;
00062   NRAttrVec *attrs_;
00063   NR::Callback *cb_;
00064   struct timeval exploratory_time_;
00065   int32_t subscription_id_; // Used for One-Phase Pull
00066 
00067   HandleEntry()
00068   {
00069     GetTime(&exploratory_time_);
00070     valid_ = true;
00071     cb_ = NULL;
00072   };
00073 
00074   ~HandleEntry(){
00075 
00076     ClearAttrs(attrs_);
00077     delete attrs_;
00078   };
00079 };
00080 
00081 int InterestCallback::expire()
00082 {
00083   int retval;
00084 
00085   // Call the interestTimeout function
00086   retval = drt_->interestTimeout(handle_entry_);
00087 
00088   if (retval < 0)
00089     delete this;
00090 
00091   return retval;
00092 }
00093 
00094 int FilterKeepaliveCallback::expire()
00095 {
00096   int retval;
00097 
00098   // Call the filterTimeout function
00099   retval = drt_->filterKeepaliveTimeout(filter_entry_);
00100 
00101   if (retval < 0)
00102     delete this;
00103 
00104   return retval;
00105 }
00106 
00107 int OldAPITimer::expire()
00108 {
00109   int retval;
00110 
00111   // Call the callback function with the provided API
00112   retval = cb_->expire(0, p_);
00113 
00114   if (retval < 0)
00115     delete this;
00116 
00117   return retval;
00118 }
00119 
00120 #ifdef NS_DIFFUSION
00121 class DiffEventQueue;
00122 
00123 int DiffusionRouting::getNodeId() {
00124   return node_->address();
00125 }
00126 
00127 int DiffusionRouting::getAgentId(int id) {
00128   if (id != -1)
00129     agent_id_ = id;
00130   return agent_id_;
00131 }
00132 
00133 NR * NR::create_ns_NR(u_int16_t port, DiffAppAgent *da) {
00134   return(new DiffusionRouting(port, da));
00135 }
00136 #else
00137 NR *dr = NULL;
00138 
00139 #ifdef USE_THREADS
00140 void * ReceiveThread(void *dr)
00141 {
00142   // Never returns
00143   ((DiffusionRouting *)dr)->run(true, WAIT_FOREVER);
00144 
00145   return NULL;
00146 }
00147 #endif // USE_THREADS
00148 
00149 NR * NR::createNR(u_int16_t port)
00150 {
00151   // Create Diffusion Routing Class
00152   if (dr)
00153     return dr;
00154 
00155   dr = new DiffusionRouting(port);
00156 
00157 #ifdef USE_THREADS
00158   int retval;
00159   pthread_t thread;
00160 
00161   // Fork a thread for receiving Messages
00162   retval = pthread_create(&thread, NULL, &ReceiveThread, (void *)dr);
00163 
00164   if (retval){
00165     DiffPrint(DEBUG_ALWAYS, "Error creating receiving thread ! Aborting...\n");
00166     exit(-1);
00167   }
00168 #endif // USE_THREADS
00169 
00170   return dr;
00171 }
00172 #endif // NS_DIFFUSION
00173 
// Locks *mutex when compiled with USE_THREADS; otherwise does nothing.
void GetLock(pthread_mutex_t *mutex)
{
#ifndef USE_THREADS
  // Build without threads: nothing to lock
  (void) mutex;
#else
  pthread_mutex_lock(mutex);
#endif // USE_THREADS
}
00180 
// Unlocks *mutex when compiled with USE_THREADS; otherwise does nothing.
void ReleaseLock(pthread_mutex_t *mutex)
{
#ifndef USE_THREADS
  // Build without threads: nothing to unlock
  (void) mutex;
#else
  pthread_mutex_unlock(mutex);
#endif // USE_THREADS
}
00187 
00188 #ifdef NS_DIFFUSION
00189 DiffusionRouting::DiffusionRouting(u_int16_t port, DiffAppAgent *da)
00190 {
00191 #else
00192 DiffusionRouting::DiffusionRouting(u_int16_t port)
00193 {
00194 #ifdef USE_EMSIM
00195   char *sim_id;
00196   char *sim_group;
00197 #endif // USE_EMSIM
00198 #endif // NS_DIFFUSION
00199 
00200   struct timeval tv;
00201   DiffusionIO *device;
00202 
00203   // Initialize basic stuff
00204   next_handle_ = 1;
00205   GetTime(&tv);
00206   SetSeed(&tv);
00207   pkt_count_ = GetRand();
00208   random_id_ = GetRand();
00209   agent_id_ = 0;
00210 
00211   if (port == 0)
00212     port = DEFAULT_DIFFUSION_PORT;
00213 
00214   diffusion_port_ = port;
00215 
00216 #ifdef USE_EMSIM
00217   // Check if we are running in the emstar simulator
00218   sim_id = getenv("SIM_ID");
00219   sim_group = getenv("SIM_GROUP");
00220 
00221   // Update diffusion port if running inside the simulator
00222   if (sim_id && sim_group){
00223     diffusion_port_ = diffusion_port_ + atoi(sim_id) + (100 * atoi(sim_group));
00224   }
00225 #endif // USE_EMSIM
00226 
00227   // Initialize timer manager
00228   timers_manager_ = new TimerManager;
00229 
00230   // Initialize input device
00231 #ifdef NS_DIFFUSION
00232   device = new NsLocal(da);
00233   local_out_devices_.push_back(device);
00234 #endif // NS_DIFFUSION
00235 
00236 #ifdef UDP
00237   device = new UDPLocal(&agent_id_);
00238   in_devices_.push_back(device);
00239   local_out_devices_.push_back(device);
00240 #endif // UDP
00241 
00242   // Print initialization message
00243   DiffPrint(DEBUG_ALWAYS,
00244         "Diffusion Routing Agent initializing... Agent Id = %d\n",
00245         agent_id_);
00246 
00247 #ifdef USE_THREADS
00248   // Initialize Semaphores
00249   dr_mtx_ = new pthread_mutex_t;
00250   pthread_mutex_init(dr_mtx_, NULL);
00251 #endif // USE_THREADS
00252 }
00253 
00254 DiffusionRouting::~DiffusionRouting()
00255 {
00256   HandleList::iterator itr;
00257   HandleEntry *current;
00258 
00259   // Delete all Handles
00260   for (itr = sub_list_.begin(); itr != sub_list_.end(); ++itr){
00261     current = *itr;
00262     delete current;
00263   }
00264 
00265   for (itr = pub_list_.begin(); itr != pub_list_.end(); ++itr){
00266     current = *itr;
00267     delete current;
00268   }
00269 }
00270 
00271 handle DiffusionRouting::subscribe(NRAttrVec *subscribe_attrs, NR::Callback *cb)
00272 {
00273   NRSimpleAttribute<int> *nr_algorithm = NULL;
00274   TimerCallback *timer_callback;
00275   NRAttribute *scope_attr;
00276   HandleEntry *my_handle;
00277 
00278   // Get lock first
00279   GetLock(dr_mtx_);
00280 
00281   // Check the published attributes
00282   if (!checkSubscription(subscribe_attrs)){
00283     DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the subscribe attributes !\n");
00284     ReleaseLock(dr_mtx_);
00285     return FAIL;
00286   }
00287 
00288   // Create and Initialize the handle_entry structute
00289   my_handle = new HandleEntry;
00290   my_handle->hdl_ = next_handle_;
00291   next_handle_++;
00292   my_handle->cb_ = (NR::Callback *) cb;
00293   sub_list_.push_back(my_handle);
00294 
00295   // Copy the attributes
00296   my_handle->attrs_ = CopyAttrs(subscribe_attrs);
00297 
00298   // For subscriptions, scope is global if not specified
00299   if (!hasScope(subscribe_attrs)){
00300     scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::GLOBAL_SCOPE);
00301     my_handle->attrs_->push_back(scope_attr);
00302   }
00303 
00304   // For One-Phase Pull, we need a subscription id
00305   nr_algorithm = NRAlgorithmAttr.find(subscribe_attrs);
00306   if (nr_algorithm &&
00307       nr_algorithm->getVal() == NRAttribute::ONE_PHASE_PULL_ALGORITHM){
00308 
00309     my_handle->subscription_id_ = GetRand();
00310     my_handle->attrs_->push_back(NRSubscriptionAttr.make(NRAttribute::IS,
00311                              my_handle->subscription_id_));
00312   }
00313 
00314   // Create Interest Timer and add it to the queue
00315   timer_callback = new InterestCallback(this, my_handle);
00316   timers_manager_->addTimer(SMALL_TIMEOUT, timer_callback);
00317 
00318   // Release lock
00319   ReleaseLock(dr_mtx_);
00320 
00321   return my_handle->hdl_;
00322 }
00323 
00324 int DiffusionRouting::unsubscribe(handle subscription_handle)
00325 {
00326   HandleEntry *my_handle = NULL;
00327 
00328   // Get the lock first
00329   GetLock(dr_mtx_);
00330 
00331   my_handle = findHandle(subscription_handle, &sub_list_);
00332   if (!my_handle){
00333     // Handle doesn't exist, return FAIL
00334     ReleaseLock(dr_mtx_);
00335     return FAIL;
00336   }
00337 
00338   // Handle will be destroyed when next interest timeout happens
00339   my_handle->valid_ = false;
00340 
00341   // Release the lock
00342   ReleaseLock(dr_mtx_);
00343 
00344   return OK;
00345 }
00346 
00347 handle DiffusionRouting::publish(NRAttrVec *publish_attrs)
00348 {
00349   HandleEntry *my_handle;
00350   NRAttribute *scope_attr;
00351 
00352   // Get the lock first
00353   GetLock(dr_mtx_);
00354 
00355   // Check the published attributes
00356   if (!checkPublication(publish_attrs)){
00357     DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the publish attributes !\n");
00358     ReleaseLock(dr_mtx_);
00359     return FAIL;
00360   }
00361 
00362   // Create and Initialize the handle_entry structute
00363   my_handle = new HandleEntry;
00364   my_handle->hdl_ = next_handle_;
00365   next_handle_++;
00366   pub_list_.push_back(my_handle);
00367 
00368   // Copy the attributes
00369   my_handle->attrs_ = CopyAttrs(publish_attrs);
00370 
00371   // For publications, scope is local if not specified
00372   if (!hasScope(publish_attrs)){
00373     scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::NODE_LOCAL_SCOPE);
00374     my_handle->attrs_->push_back(scope_attr);
00375   }
00376 
00377   // Release the lock
00378   ReleaseLock(dr_mtx_);
00379 
00380   return my_handle->hdl_;
00381 }
00382 
00383 int DiffusionRouting::unpublish(handle publication_handle)
00384 {
00385   HandleEntry *my_handle = NULL;
00386 
00387   // Get the lock first
00388   GetLock(dr_mtx_);
00389 
00390   my_handle = removeHandle(publication_handle, &pub_list_);
00391   if (!my_handle){
00392     // Handle doesn't exist, return FAIL
00393     ReleaseLock(dr_mtx_);
00394     return FAIL;
00395   }
00396 
00397   // Free structures
00398   delete my_handle;
00399 
00400   // Release the lock
00401   ReleaseLock(dr_mtx_);
00402 
00403   return OK;
00404 }
00405 
00406 int DiffusionRouting::send(handle publication_handle,
00407                NRAttrVec *send_attrs)
00408 {
00409   NRSimpleAttribute<int> *nr_algorithm = NULL;
00410   NRSimpleAttribute<int> *rmst_id_attr = NULL;
00411   int8_t send_message_type = DATA;
00412   struct timeval current_time;
00413   HandleEntry *my_handle;
00414   Message *my_message;
00415 
00416   // Get the lock first
00417   GetLock(dr_mtx_);
00418 
00419   // Get attributes associated with handle
00420   my_handle = findHandle(publication_handle, &pub_list_);
00421   if (!my_handle){
00422     ReleaseLock(dr_mtx_);
00423     return FAIL;
00424   }
00425 
00426   // Check the send attributes
00427   if (!checkSend(send_attrs)){
00428     DiffPrint(DEBUG_ALWAYS,
00429           "Error : Invalid class/scope attributes in send attributes !\n");
00430     ReleaseLock(dr_mtx_);
00431     return FAIL;
00432   }
00433 
00434   // Check if it is time to send another exploratory data message
00435   GetTime(&current_time);
00436 
00437   // Check algorithms
00438   nr_algorithm = NRAlgorithmAttr.find(my_handle->attrs_);
00439   rmst_id_attr = RmstIdAttr.find(send_attrs);
00440 
00441   if (!nr_algorithm && !rmst_id_attr || nr_algorithm &&
00442       nr_algorithm->getVal() != NRAttribute::ONE_PHASE_PULL_ALGORITHM){
00443 
00444     // In One-Phase Pull, there are no exploratory messages
00445     if (TimevalCmp(&current_time, &(my_handle->exploratory_time_)) >= 0){
00446 
00447       // Check if it is a push data message or a regular data message
00448       if (isPushData(my_handle->attrs_)){
00449     // Push data message
00450 
00451     // Update time for the next push exploratory message
00452     GetTime(&(my_handle->exploratory_time_));
00453     my_handle->exploratory_time_.tv_sec += PUSH_EXPLORATORY_DELAY;
00454 
00455     send_message_type = PUSH_EXPLORATORY_DATA;
00456       }
00457       else{
00458     // Regular data message
00459 
00460     // Update time for the next exploratory message
00461     GetTime(&(my_handle->exploratory_time_));
00462     my_handle->exploratory_time_.tv_sec += EXPLORATORY_DATA_DELAY;
00463     
00464     send_message_type = EXPLORATORY_DATA;
00465       }
00466     }
00467   }
00468 
00469   // Initialize message structure
00470   my_message = new Message(DIFFUSION_VERSION, send_message_type, agent_id_,
00471                0, 0, pkt_count_, random_id_, LOCALHOST_ADDR,
00472                LOCALHOST_ADDR);
00473   // Increment pkt_counter
00474   pkt_count_++;
00475 
00476   // First, we duplicate the 'publish' attributes
00477   my_message->msg_attr_vec_ = CopyAttrs(my_handle->attrs_);
00478 
00479   // Now, we add the send attributes
00480   AddAttrs(my_message->msg_attr_vec_, send_attrs);
00481 
00482   // Compute the total number and size of the joined attribute sets
00483   my_message->num_attr_ = my_message->msg_attr_vec_->size();
00484   my_message->data_len_ = CalculateSize(my_message->msg_attr_vec_);
00485 
00486   // Release the lock
00487   ReleaseLock(dr_mtx_);
00488 
00489   // Send Packet
00490   sendMessageToDiffusion(my_message);
00491 
00492   delete my_message;
00493 
00494   return OK;
00495 }
00496 
00497 int DiffusionRouting::sendRmst(handle publication_handle,
00498                    NRAttrVec *send_attrs, int fragment_size)
00499 {
00500   NRSimpleAttribute<void *> *rmst_data_attr;
00501   NRSimpleAttribute<int> *frag_number_attr;
00502   NRSimpleAttribute<int> *max_frag_attr;
00503   void *frag_ptr, *blob_ptr;
00504   char *blob;
00505   timeval send_interval;
00506   int retval;
00507   int id = GetRand() % 500;
00508   int size;
00509   int num_frag;
00510   int max_frag_len;
00511 
00512   // Find RMST blob to send
00513   rmst_data_attr = RmstDataAttr.find(send_attrs);
00514 
00515   // We must have a RMST data attribute to send
00516   if(!rmst_data_attr){
00517     DiffPrint(DEBUG_ALWAYS, "sendRMST - can't find blob to send !\n");
00518     return FAIL;
00519   }
00520 
00521   // Copy RMST blob and calculate number of fragments
00522   blob_ptr = rmst_data_attr->getVal();
00523   size = rmst_data_attr->getLen();
00524   blob = new char[size];
00525   memcpy((void *)blob, blob_ptr, size);
00526   num_frag = (size + fragment_size - 1) / fragment_size;
00527 
00528   // We index starting at zero
00529   num_frag--;
00530   max_frag_len = size - (num_frag * fragment_size);
00531   DiffPrint(DEBUG_DETAILS,
00532         "sendRMST: rmst num_frag = %d, fragment_size = %d, max_frag_len = %d\n",
00533         num_frag, fragment_size, max_frag_len);
00534 
00535   // Prepare attribute vector with RMST attributes
00536   max_frag_attr = RmstMaxFragAttr.make(NRAttribute::IS, num_frag);
00537   send_attrs->push_back(max_frag_attr);
00538   send_attrs->push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP));
00539   frag_number_attr = RmstFragAttr.make(NRAttribute::IS, 0);
00540   send_attrs->push_back(frag_number_attr);
00541   send_attrs->push_back(RmstIdAttr.make(NRAttribute::IS, id));
00542 
00543   // Replace the large blob with a blob fragment
00544   frag_ptr = (void *)&blob[0];
00545 
00546   // The call to setVal will delete the original blob!!
00547   if (num_frag == 0)
00548     rmst_data_attr->setVal(frag_ptr, max_frag_len);
00549   else
00550     rmst_data_attr->setVal(frag_ptr, fragment_size);
00551 
00552   // Send 1st fragment
00553   retval = send(publication_handle, send_attrs);
00554 
00555   // Send other fragments
00556   for (int i = 1; i <= num_frag; i++){
00557 
00558     // Small delay between sending fragments
00559     send_interval.tv_sec = 0;
00560     send_interval.tv_usec = 25000;
00561     select(0, NULL, NULL, NULL, &send_interval);
00562 
00563     // Send next fragment
00564     frag_number_attr->setVal(i);
00565     frag_ptr = (void *)&blob[i * fragment_size];
00566     if (num_frag == i)
00567       rmst_data_attr->setVal(frag_ptr, max_frag_len);
00568     else
00569       rmst_data_attr->setVal(frag_ptr, fragment_size);
00570     retval = send(publication_handle, send_attrs);
00571   }
00572 
00573   ClearAttrs(send_attrs);
00574   delete blob;
00575 
00576   return OK;
00577 }
00578 
00579 int DiffusionRouting::addToBlacklist(int32_t node)
00580 {
00581   ControlMessage *control_blob;
00582   NRAttribute *ctrl_msg_attr;
00583   Message *my_message;
00584   NRAttrVec *attrs;
00585 
00586   control_blob = new ControlMessage(ADD_TO_BLACKLIST, node, 0);
00587 
00588   ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
00589                       (void *)control_blob,
00590                       sizeof(ControlMessage));
00591   attrs = new NRAttrVec;
00592   attrs->push_back(ctrl_msg_attr);
00593 
00594   my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00595                0, pkt_count_, random_id_, LOCALHOST_ADDR,
00596                LOCALHOST_ADDR);
00597 
00598   // Increment pkt_counter
00599   pkt_count_++;
00600 
00601   // Add attributes to the message
00602   my_message->msg_attr_vec_ = attrs;
00603   my_message->num_attr_ = attrs->size();
00604   my_message->data_len_ = CalculateSize(attrs);
00605 
00606   // Send Packet
00607   sendMessageToDiffusion(my_message);
00608 
00609   // Delete message
00610   delete my_message;
00611   delete control_blob;
00612 
00613   return OK;
00614 }
00615 
00616 int DiffusionRouting::clearBlacklist()
00617 {
00618   ControlMessage *control_blob;
00619   NRAttribute *ctrl_msg_attr;
00620   Message *my_message;
00621   NRAttrVec *attrs;
00622   
00623   control_blob = new ControlMessage(CLEAR_BLACKLIST, 0, 0);
00624 
00625   ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
00626                       (void *)control_blob,
00627                       sizeof(ControlMessage));
00628   attrs = new NRAttrVec;
00629   attrs->push_back(ctrl_msg_attr);
00630 
00631   my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00632                0, pkt_count_, random_id_, LOCALHOST_ADDR,
00633                LOCALHOST_ADDR);
00634 
00635   // Increment pkt_counter
00636   pkt_count_++;
00637 
00638   // Add attributes to the message
00639   my_message->msg_attr_vec_ = attrs;
00640   my_message->num_attr_ = attrs->size();
00641   my_message->data_len_ = CalculateSize(attrs);
00642 
00643   // Send Packet
00644   sendMessageToDiffusion(my_message);
00645 
00646   // Delete message
00647   delete my_message;
00648   delete control_blob;
00649   
00650   return OK;
00651 }
00652 
00653 handle DiffusionRouting::addFilter(NRAttrVec *filter_attrs, u_int16_t priority,
00654                    FilterCallback *cb)
00655 {
00656   FilterEntry *filter_entry;
00657   NRAttrVec *attrs;
00658   NRAttribute *ctrl_msg_attr;
00659   ControlMessage *control_blob;
00660   Message *my_message;
00661   TimerCallback *timer_callback;
00662 
00663   // Check parameters
00664   if (!filter_attrs || !cb || priority < FILTER_MIN_PRIORITY || priority > FILTER_MAX_PRIORITY){
00665     DiffPrint(DEBUG_ALWAYS, "Received invalid parameters when adding filter !\n");
00666     return FAIL;
00667   }
00668 
00669   // Get lock first
00670   GetLock(dr_mtx_);
00671 
00672   // Create and Initialize the handle_entry structute
00673   filter_entry = new FilterEntry(next_handle_, priority, agent_id_);
00674   next_handle_++;
00675   filter_entry->cb_ = (FilterCallback *) cb;
00676   filter_list_.push_back(filter_entry);
00677 
00678   // Copy attributes (keep them for matching later)
00679   filter_entry->filter_attrs_ = CopyAttrs(filter_attrs);
00680 
00681   // Copy the attributes (and add the control attr)
00682   attrs = CopyAttrs(filter_attrs);
00683   control_blob = new ControlMessage(ADD_UPDATE_FILTER,
00684                     priority, filter_entry->handle_);
00685 
00686   ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
00687                       (void *)control_blob,
00688                       sizeof(ControlMessage));
00689 
00690   attrs->push_back(ctrl_msg_attr);
00691 
00692   // Initialize message structure
00693   my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00694                0, pkt_count_, random_id_, LOCALHOST_ADDR,
00695                LOCALHOST_ADDR);
00696 
00697   // Increment pkt_counter
00698   pkt_count_++;
00699 
00700   // Add attributes to the message
00701   my_message->msg_attr_vec_ = attrs;
00702   my_message->num_attr_ = attrs->size();
00703   my_message->data_len_ = CalculateSize(attrs);
00704 
00705   // Release the lock
00706   ReleaseLock(dr_mtx_);
00707 
00708   // Send Packet
00709   sendMessageToDiffusion(my_message);
00710 
00711   // Add keepalive timer to the event queue
00712   timer_callback = new FilterKeepaliveCallback(this, filter_entry);
00713   timers_manager_->addTimer(FILTER_KEEPALIVE_DELAY, timer_callback);
00714 
00715   // Delete message, attribute set and controlblob
00716   delete my_message;
00717   delete control_blob;
00718 
00719   return filter_entry->handle_;
00720 }
00721 
00722 int DiffusionRouting::removeFilter(handle filter_handle)
00723 {
00724   FilterEntry *filter_entry = NULL;
00725   ControlMessage *control_blob;
00726   NRAttribute *ctrl_msg_attr;
00727   NRAttrVec *attrs;
00728   Message *my_message;
00729 
00730   // Get lock first
00731   GetLock(dr_mtx_);
00732 
00733   filter_entry = findFilter(filter_handle);
00734   if (!filter_entry){
00735     // Handle doesn't exist, return FAIL
00736     ReleaseLock(dr_mtx_);
00737     return FAIL;
00738   }
00739 
00740   control_blob = new ControlMessage(REMOVE_FILTER, filter_entry->handle_, 0);
00741 
00742   ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
00743                       (void *)control_blob,
00744                       sizeof(ControlMessage));
00745 
00746   attrs = new NRAttrVec;
00747   attrs->push_back(ctrl_msg_attr);
00748 
00749   my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00750                0, pkt_count_, random_id_, LOCALHOST_ADDR,
00751                LOCALHOST_ADDR);
00752 
00753   // Increment pkt_counter
00754   pkt_count_++;
00755 
00756   // Add attributes to the message
00757   my_message->msg_attr_vec_ = attrs;
00758   my_message->num_attr_ = attrs->size();
00759   my_message->data_len_ = CalculateSize(attrs);
00760 
00761   // Handle will be destroyed when next keepalive timer happens
00762   filter_entry->valid_ = false;
00763 
00764   // Send Packet
00765   sendMessageToDiffusion(my_message);
00766 
00767   // Release the lock
00768   ReleaseLock(dr_mtx_);
00769 
00770   // Delete message
00771   delete my_message;
00772   delete control_blob;
00773 
00774   return OK;
00775 }
00776 
00777 handle DiffusionRouting::addTimer(int timeout, TimerCallback *callback)
00778 {
00779   return (timers_manager_->addTimer(timeout, callback));
00780 }
00781 
00782 handle DiffusionRouting::addTimer(int timeout, void *p, TimerCallbacks *cb)
00783 {
00784   TimerCallback *callback;
00785 
00786   callback = new OldAPITimer(cb, p);
00787 
00788   return (addTimer(timeout, callback));
00789 }
00790 
00791 bool DiffusionRouting::removeTimer(handle hdl)
00792 {
00793   return (timers_manager_->removeTimer(hdl));
00794 }
00795 
00796 int DiffusionRouting::filterKeepaliveTimeout(FilterEntry *filter_entry)
00797 {
00798   FilterEntry *my_entry = NULL;
00799   ControlMessage *control_blob;
00800   NRAttribute *ctrl_msg_attr;
00801   NRAttrVec *attrs;
00802   Message *my_message;
00803 
00804   // Acquire lock first
00805   GetLock(dr_mtx_);
00806 
00807   if (filter_entry->valid_){
00808     // Send keepalive
00809     control_blob = new ControlMessage(ADD_UPDATE_FILTER,
00810                       filter_entry->priority_,
00811                       filter_entry->handle_);
00812 
00813     ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
00814                     (void *)control_blob,
00815                     sizeof(ControlMessage));
00816 
00817     attrs = CopyAttrs(filter_entry->filter_attrs_);
00818     attrs->push_back(ctrl_msg_attr);
00819 
00820     my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00821                  0, pkt_count_, random_id_, LOCALHOST_ADDR,
00822                  LOCALHOST_ADDR);
00823 
00824     // Increment pkt_counter
00825     pkt_count_++;
00826 
00827     // Add attributes to the message
00828     my_message->msg_attr_vec_ = attrs;
00829     my_message->num_attr_ = attrs->size();
00830     my_message->data_len_ = CalculateSize(attrs);
00831 
00832     // Send Message
00833     sendMessageToDiffusion(my_message);
00834 
00835     delete my_message;
00836     delete control_blob;
00837 
00838     // Release lock
00839     ReleaseLock(dr_mtx_);
00840 
00841     // Reschedule another filter keepalive timer in event queue
00842     return (FILTER_KEEPALIVE_DELAY);
00843   }
00844   else{
00845     // Filter was removed
00846     my_entry = deleteFilter(filter_entry->handle_);
00847 
00848     // We should have removed the correct handle
00849     if (my_entry != filter_entry){
00850       DiffPrint(DEBUG_ALWAYS, "DiffusionRouting::KeepaliveTimeout: Handles should match !\n");
00851       exit(-1);
00852     }
00853 
00854     delete my_entry;
00855 
00856     // Release lock
00857     ReleaseLock(dr_mtx_);
00858 
00859     return -1;
00860   }
00861 }
00862 
00863 int DiffusionRouting::interestTimeout(HandleEntry *handle_entry)
00864 {
00865   HandleEntry *my_handle = NULL;
00866   Message *my_message;
00867 
00868   // Acquire lock first
00869   GetLock(dr_mtx_);
00870 
00871   if (handle_entry->valid_){
00872     // Send the interest message if entry is still valid
00873     my_message = new Message(DIFFUSION_VERSION, INTEREST, agent_id_, 0,
00874                  0, pkt_count_, random_id_, LOCALHOST_ADDR,
00875                  LOCALHOST_ADDR);
00876 
00877     // Increment pkt_counter
00878     pkt_count_++;
00879 
00880     // Add attributes to the message
00881     my_message->msg_attr_vec_ = CopyAttrs(handle_entry->attrs_);
00882     my_message->num_attr_ = handle_entry->attrs_->size();
00883     my_message->data_len_ = CalculateSize(handle_entry->attrs_);
00884 
00885     // Send Packet
00886     sendMessageToDiffusion(my_message);
00887 
00888     delete my_message;
00889 
00890     // Release lock
00891     ReleaseLock(dr_mtx_);
00892 
00893     // Reschedule this timer in the queue
00894     return (int) (floor(-1 * (log(1 - (GetRand() * 1.0 / RAND_MAX))) /
00895             INTEREST_LAMBDA));
00896   }
00897   else{
00898     // Interest was canceled. Just delete it from the handle_list
00899     my_handle = removeHandle(handle_entry->hdl_, &sub_list_);
00900 
00901     // We should have removed the correct handle
00902     if (my_handle != handle_entry){
00903       DiffPrint(DEBUG_ALWAYS,
00904         "Error: interestTimeout: Handles should match !\n");
00905       exit(-1);
00906     }
00907 
00908     delete my_handle;
00909 
00910     // Release lock
00911     ReleaseLock(dr_mtx_);
00912 
00913     // Delete timer from the queue
00914     return -1;
00915   }
00916 }
00917 
00918 int DiffusionRouting::sendMessage(Message *msg, handle h,
00919                   u_int16_t priority)
00920 {
00921   RedirectMessage *original_hdr;
00922   NRAttribute *original_attr, *ctrl_msg_attr;
00923   ControlMessage *control_blob;
00924   NRAttrVec *attrs;
00925   Message *my_message;
00926 
00927   if ((priority < FILTER_MIN_PRIORITY) ||
00928       (priority > FILTER_KEEP_PRIORITY))
00929     return FAIL;
00930 
00931   // Create an attribute with the original header
00932   original_hdr = new RedirectMessage(msg->new_message_, msg->msg_type_,
00933                      msg->source_port_, msg->data_len_,
00934                      msg->num_attr_, msg->rdm_id_,
00935                      msg->pkt_num_, msg->next_hop_,
00936                      msg->last_hop_, 0,
00937                      msg->next_port_);
00938 
00939   original_attr = OriginalHdrAttr.make(NRAttribute::IS, (void *)original_hdr,
00940                        sizeof(RedirectMessage));
00941 
00942   // Create the attribute with the control message
00943   control_blob = new ControlMessage(SEND_MESSAGE, h, priority);
00944 
00945   ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS, (void *)control_blob,
00946                       sizeof(ControlMessage));
00947 
00948   // Copy Attributes and add originalAttr and controlAttr
00949   attrs = CopyAttrs(msg->msg_attr_vec_);
00950   attrs->push_back(original_attr);
00951   attrs->push_back(ctrl_msg_attr);
00952 
00953   my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00954                0, pkt_count_, random_id_, LOCALHOST_ADDR,
00955                LOCALHOST_ADDR);
00956 
00957   // Increment pkt_counter
00958   pkt_count_++;
00959 
00960   // Add attributes to the message
00961   my_message->msg_attr_vec_ = attrs;
00962   my_message->num_attr_ = attrs->size();
00963   my_message->data_len_ = CalculateSize(attrs);
00964 
00965   // Send Packet
00966   sendMessageToDiffusion(my_message);
00967 
00968   delete my_message;
00969   delete control_blob;
00970   delete original_hdr;
00971 
00972   return OK;
00973 }
00974 
00975 #ifndef NS_DIFFUSION
00976 void DiffusionRouting::doIt()
00977 {
00978   run(true, WAIT_FOREVER);
00979 }
00980 
00981 void DiffusionRouting::doOne(long timeout)
00982 {
00983   run(false, timeout);
00984 }
00985 
00986 void DiffusionRouting::run(bool wait_condition, long max_timeout)
00987 {
00988   DeviceList::iterator itr;
00989   int status, max_sock, fd;
00990   bool flag;
00991   DiffPacket in_pkt;
00992   fd_set fds;
00993   struct timeval tv;
00994   struct timeval max_tv;
00995 
00996   do{
00997     FD_ZERO(&fds);
00998     max_sock = 0;
00999 
01000     // Set the maximum timeout value
01001     max_tv.tv_sec = (int) (max_timeout / 1000);
01002     max_tv.tv_usec = (int) ((max_timeout % 1000) * 1000);
01003 
01004     for (itr = in_devices_.begin(); itr != in_devices_.end(); ++itr){
01005       (*itr)->addInFDS(&fds, &max_sock);
01006     }
01007 
01008     // Check for the next timer
01009     timers_manager_->nextTimerTime(&tv);
01010 
01011     if (tv.tv_sec == MAXVALUE){
01012       // If we don't have any timers, we wait for POLLING_INTERVAL
01013       if (max_timeout == WAIT_FOREVER){
01014     tv.tv_sec = POLLING_INTERVAL;
01015     tv.tv_usec = 0;
01016       }
01017       else{
01018     tv = max_tv;
01019       }
01020     }
01021     else{
01022       if ((max_timeout != WAIT_FOREVER) && (TimevalCmp(&tv, &max_tv) > 0)){
01023     // max_timeout value is smaller than next timer's time, so we
01024     // use themax_timeout value instead
01025     tv = max_tv;
01026       }
01027     }
01028 
01029     status = select(max_sock+1, &fds, NULL, NULL, &tv);
01030 
01031     if (status == 0){
01032       // Process all timers that have expired
01033       timers_manager_->executeAllExpiredTimers();
01034     }
01035 
01036     if (status > 0){
01037       do{
01038     flag = false;
01039     for (itr = in_devices_.begin(); itr != in_devices_.end(); ++itr){
01040       fd = (*itr)->checkInFDS(&fds);
01041       if (fd != -1){
01042         // Message waiting
01043         in_pkt = (*itr)->recvPacket(fd);
01044         recvPacket(in_pkt);
01045 
01046         // Clear this fd
01047         FD_CLR(fd, &fds);
01048         status--;
01049         flag = true;
01050       }
01051     }
01052       } while ((status > 0) && (flag == true));
01053     }
01054     else
01055       if (status < 0){
01056     DiffPrint(DEBUG_IMPORTANT, "Select returned %d\n", status);
01057       }
01058   } while (wait_condition);
01059 }
01060 
01061 #endif // NS_DIFFUSION
01062 
01063 #ifndef NS_DIFFUSION
01064 void DiffusionRouting::sendMessageToDiffusion(Message *msg)
01065 {
01066   DiffPacket out_pkt = NULL;
01067   struct hdr_diff *dfh;
01068   char *pos;
01069   int len;
01070 
01071   out_pkt = AllocateBuffer(msg->msg_attr_vec_);
01072   dfh = HDR_DIFF(out_pkt);
01073 
01074   pos = (char *) out_pkt;
01075   pos = pos + sizeof(struct hdr_diff);
01076 
01077   len = PackAttrs(msg->msg_attr_vec_, pos);
01078 
01079   LAST_HOP(dfh) = htonl(msg->last_hop_);
01080   NEXT_HOP(dfh) = htonl(msg->next_hop_);
01081   DIFF_VER(dfh) = msg->version_;
01082   MSG_TYPE(dfh) = msg->msg_type_;
01083   DATA_LEN(dfh) = htons(len);
01084   PKT_NUM(dfh) = htonl(msg->pkt_num_);
01085   RDM_ID(dfh) = htonl(msg->rdm_id_);
01086   NUM_ATTR(dfh) = htons(msg->num_attr_);
01087   SRC_PORT(dfh) = htons(msg->source_port_);
01088 
01089   sendPacketToDiffusion(out_pkt, sizeof(struct hdr_diff) + len, diffusion_port_);
01090 
01091   delete [] out_pkt;
01092 }
01093 #else
01094 void DiffusionRouting::sendMessageToDiffusion(Message *msg)
01095 {
01096   Message *my_msg;
01097   DeviceList::iterator itr;
01098   int len;
01099 
01100   my_msg = CopyMessage(msg);
01101   len = CalculateSize(my_msg->msg_attr_vec_);
01102   len = len + sizeof(struct hdr_diff);
01103 
01104   for (itr = local_out_devices_.begin(); itr != local_out_devices_.end(); ++itr){
01105     (*itr)->sendPacket((DiffPacket) my_msg, len, diffusion_port_);
01106   }
01107 }
01108 #endif // !NS_DIFFUSION
01109 
01110 void DiffusionRouting::sendPacketToDiffusion(DiffPacket pkt, int len, int dst)
01111 {
01112   DeviceList::iterator itr;
01113 
01114   for (itr = local_out_devices_.begin(); itr != local_out_devices_.end(); ++itr){
01115     (*itr)->sendPacket(pkt, len, dst);
01116   }
01117 }
01118 
01119 #ifndef NS_DIFFUSION
01120 void DiffusionRouting::recvPacket(DiffPacket pkt)
01121 {
01122   struct hdr_diff *dfh = HDR_DIFF(pkt);
01123   Message *rcv_message = NULL;
01124   int8_t version, msg_type;
01125   u_int16_t data_len, num_attr, source_port;
01126   int32_t pkt_num, rdm_id, next_hop, last_hop;
01127 
01128   // Read header
01129   version = DIFF_VER(dfh);
01130   msg_type = MSG_TYPE(dfh);
01131   source_port = ntohs(SRC_PORT(dfh));
01132   pkt_num = ntohl(PKT_NUM(dfh));
01133   rdm_id = ntohl(RDM_ID(dfh));
01134   num_attr = ntohs(NUM_ATTR(dfh));
01135   next_hop = ntohl(NEXT_HOP(dfh));
01136   last_hop = ntohl(LAST_HOP(dfh));
01137   data_len = ntohs(DATA_LEN(dfh));
01138 
01139   // Create a message structure from the incoming packet
01140   rcv_message = new Message(version, msg_type, source_port, data_len,
01141                 num_attr, pkt_num, rdm_id, next_hop, last_hop);
01142 
01143   // Read all attributes into the Message structure
01144   rcv_message->msg_attr_vec_ = UnpackAttrs(pkt, num_attr);
01145 
01146   // Process the incoming message
01147   recvMessage(rcv_message);
01148 
01149   // We are done
01150   delete rcv_message;
01151   delete [] pkt;
01152 }
01153 #endif // !NS_DIFFUSION
01154 
01155 void DiffusionRouting::recvMessage(Message *msg)
01156 {
01157   // Check version
01158   if (msg->version_ != DIFFUSION_VERSION)
01159     return;
01160 
01161   // Check destination
01162   if (msg->next_hop_ != LOCALHOST_ADDR)
01163     return;
01164 
01165   // Process the incoming message
01166   if (msg->msg_type_ == REDIRECT)
01167     processControlMessage(msg);
01168   else
01169     processMessage(msg);
01170 }
01171 
01172 void DiffusionRouting::processControlMessage(Message *msg)
01173 {
01174   NRSimpleAttribute<void *> *original_header_attr = NULL;
01175   NRAttrVec::iterator place = msg->msg_attr_vec_->begin();
01176   RedirectMessage *original_header;
01177   FilterEntry *entry;
01178   handle my_handle;
01179 
01180   // Find the attribute containing the original packet header
01181   original_header_attr = OriginalHdrAttr.find_from(msg->msg_attr_vec_,
01182                            place, &place);
01183   if (!original_header_attr){
01184     DiffPrint(DEBUG_ALWAYS, "Error: Received an invalid REDIRECT message !\n");
01185     return;
01186   }
01187 
01188   // Restore original message header
01189   original_header = (RedirectMessage *) original_header_attr->getVal();
01190   my_handle = original_header->handle_;
01191   msg->msg_type_ = original_header->msg_type_;
01192   msg->source_port_ = original_header->source_port_;
01193   msg->pkt_num_ = original_header->pkt_num_;
01194   msg->rdm_id_ = original_header->rdm_id_;
01195   msg->next_hop_ = original_header->next_hop_;
01196   msg->last_hop_ = original_header->last_hop_;
01197   msg->num_attr_ = original_header->num_attr_;
01198   msg->new_message_ = original_header->new_message_;
01199   msg->next_port_ = original_header->next_port_;
01200 
01201   // Delete attribute from the original set
01202   msg->msg_attr_vec_->erase(place);
01203   delete original_header_attr;
01204 
01205   // Find the right callback
01206   GetLock(dr_mtx_);
01207 
01208   entry = findFilter(my_handle);
01209   if (entry && entry->valid_){
01210     // Just to confirm
01211     if (OneWayMatch(entry->filter_attrs_, msg->msg_attr_vec_)){
01212       ReleaseLock(dr_mtx_);
01213       entry->cb_->recv(msg, my_handle);
01214       return;
01215     }
01216     else{
01217       DiffPrint(DEBUG_ALWAYS,
01218         "Warning: Filter doesn't match incoming message's attributes !\n");
01219     }
01220   }
01221   else{
01222     DiffPrint(DEBUG_IMPORTANT,
01223           "Report: Cannot find filter (possibly deleted ?)\n");
01224   }
01225 
01226   ReleaseLock(dr_mtx_);
01227 }
01228 
01229 void DiffusionRouting::processMessage(Message *msg)
01230 {
01231   NRSimpleAttribute<int> *rmst_id_attr = NULL;
01232   CallbackList::iterator cbl_itr;
01233   HandleList::iterator sub_itr;
01234   NRAttrVec *callback_attrs;
01235   HandleEntry *entry; 
01236   CallbackEntry *aux;
01237   CallbackList cbl;
01238 
01239   // First, acquire the lock
01240   GetLock(dr_mtx_);
01241 
01242   for (sub_itr = sub_list_.begin(); sub_itr != sub_list_.end(); ++sub_itr){
01243     entry = *sub_itr;
01244     if ((entry->valid_) && (MatchAttrs(msg->msg_attr_vec_, entry->attrs_)))
01245       if (entry->cb_){
01246     aux = new CallbackEntry(entry->cb_, entry->hdl_);
01247     cbl.push_back(aux);
01248       }
01249   }
01250 
01251   // We can release the lock now
01252   ReleaseLock(dr_mtx_);
01253 
01254   // Check for RMST id attribute
01255   rmst_id_attr = RmstIdAttr.find(msg->msg_attr_vec_);
01256   cbl_itr = cbl.begin();
01257 
01258   // Process RMST fragment if we have callbacks and this message has an RmstId
01259   if (rmst_id_attr && (cbl_itr != cbl.end())){
01260     if (!processRmst(msg)){
01261       cbl.clear();
01262       return;
01263     }
01264   }
01265 
01266   // Now we just call all callback functions
01267   for (cbl_itr = cbl.begin(); cbl_itr != cbl.end(); ++cbl_itr){
01268     // Copy attributes
01269     callback_attrs = CopyAttrs(msg->msg_attr_vec_);
01270 
01271     // Call app-specific callback function
01272     aux = *cbl_itr;
01273     aux->cb_->recv(callback_attrs, aux->subscription_handle_);
01274     delete aux;
01275 
01276     // Clean up callback attributes
01277     ClearAttrs(callback_attrs);
01278     delete callback_attrs;
01279   }
01280 
01281   // We are done
01282   cbl.clear();
01283 }
01284 
01285 bool DiffusionRouting::processRmst(Message *msg)
01286 {
01287   NRSimpleAttribute<void *> *data_buf_attr = NULL;
01288   NRSimpleAttribute<int> *max_frag_attr = NULL;
01289   NRSimpleAttribute<int> *rmst_id_attr = NULL;
01290   NRSimpleAttribute<int> *frag_attr = NULL;
01291   int rmst_no, frag_no, data_buf_len, count;
01292   void *blob_ptr, *tmp_frag_ptr;
01293   Int2RecRmst::iterator rmst_iterator;
01294   Int2Frag::iterator frag_iterator;
01295   char *dstPtr;
01296   int dstSize;
01297   RecRmst *rmst_ptr;
01298 
01299   // Read Rmst attributes
01300   data_buf_attr = RmstDataAttr.find(msg->msg_attr_vec_);
01301   rmst_id_attr = RmstIdAttr.find(msg->msg_attr_vec_);
01302   frag_attr = RmstFragAttr.find(msg->msg_attr_vec_);
01303   rmst_no = rmst_id_attr->getVal();
01304   frag_no = frag_attr->getVal();
01305   blob_ptr = data_buf_attr->getVal();
01306   data_buf_len = data_buf_attr->getLen();
01307 
01308   // See if we are receiving this blob, if not start a new RecRmst
01309   rmst_iterator = rec_rmst_map_.find(rmst_no);
01310   if (rmst_iterator == rec_rmst_map_.end()){
01311     rmst_ptr = new RecRmst(rmst_no);
01312     rec_rmst_map_.insert(Int2RecRmst::value_type(rmst_no, rmst_ptr));
01313   }
01314   else
01315     rmst_ptr = (*rmst_iterator).second;
01316 
01317   if (frag_no == 0){
01318     max_frag_attr = RmstMaxFragAttr.find(msg->msg_attr_vec_);
01319     rmst_ptr->max_frag_ = max_frag_attr->getVal();
01320     rmst_ptr->mtu_len_ = data_buf_len;
01321   }
01322 
01323   // Copy fragment to map
01324   tmp_frag_ptr = new char[data_buf_len];
01325   memcpy(tmp_frag_ptr, blob_ptr, data_buf_len);
01326   rmst_ptr->frag_map_.insert(Int2Frag::value_type(frag_no, tmp_frag_ptr));
01327 
01328   if (frag_no == rmst_ptr->max_frag_)
01329     rmst_ptr->max_frag_len_ = data_buf_len;
01330 
01331   count = rmst_ptr->frag_map_.size();
01332 
01333   // If this is the last rmst fragment, create the entire rmst
01334   if (count == (rmst_ptr->max_frag_ + 1)){
01335     
01336     DiffPrint(DEBUG_DETAILS, 
01337           "RMST #%d is complete, creating big blob !\n", rmst_no);
01338 
01339     // Allocate memory for the big blob
01340     dstSize = rmst_ptr->max_frag_ * rmst_ptr->mtu_len_ + rmst_ptr->max_frag_len_;
01341     dstPtr = new char[dstSize];
01342     
01343     // Copy all but last fragment to a buffer
01344     for (int i = 0; i < rmst_ptr->max_frag_; i++){
01345       frag_iterator = rmst_ptr->frag_map_.find(i);
01346       tmp_frag_ptr = (*frag_iterator).second;
01347       memcpy((void *)&dstPtr[i * rmst_ptr->mtu_len_],
01348          (void *)tmp_frag_ptr, rmst_ptr->mtu_len_);
01349     }
01350 
01351     // Now, copy the last fragment to the buffer
01352     frag_iterator = rmst_ptr->frag_map_.find(rmst_ptr->max_frag_);
01353     tmp_frag_ptr = (*frag_iterator).second;
01354     memcpy((void *)&dstPtr[rmst_ptr->max_frag_ * rmst_ptr->mtu_len_],
01355        (void *)tmp_frag_ptr, rmst_ptr->max_frag_len_);
01356 
01357     // Since we copied everything from the map - clean it up
01358     rec_rmst_map_.erase(rmst_iterator);
01359     delete rmst_ptr;
01360 
01361     // Now we substitute the last fragment with the reconstructed blob
01362     data_buf_attr->setVal(dstPtr, dstSize);
01363 
01364     // Deliver this to the application
01365     return true;
01366   }
01367 
01368   // We don't have the entire blob
01369   return false;
01370 }
01371 
01372 HandleEntry * DiffusionRouting::removeHandle(handle my_handle, HandleList *hl)
01373 {
01374   HandleList::iterator itr;
01375   HandleEntry *entry;
01376 
01377   for (itr = hl->begin(); itr != hl->end(); ++itr){
01378     entry = *itr;
01379     if (entry->hdl_ == my_handle){
01380       hl->erase(itr);
01381       return entry;
01382     }
01383   }
01384   return NULL;
01385 }
01386 
01387 HandleEntry * DiffusionRouting::findHandle(handle my_handle, HandleList *hl)
01388 {
01389   HandleList::iterator itr;
01390   HandleEntry *entry;
01391 
01392   for (itr = hl->begin(); itr != hl->end(); ++itr){
01393     entry = *itr;
01394     if (entry->hdl_ == my_handle)
01395       return entry;
01396   }
01397   return NULL;
01398 }
01399 
01400 FilterEntry * DiffusionRouting::deleteFilter(handle my_handle)
01401 {
01402   FilterList::iterator itr;
01403   FilterEntry *entry;
01404 
01405   for (itr = filter_list_.begin(); itr != filter_list_.end(); ++itr){
01406     entry = *itr;
01407     if (entry->handle_ == my_handle){
01408       filter_list_.erase(itr);
01409       return entry;
01410     }
01411   }
01412   return NULL;
01413 }
01414 
01415 FilterEntry * DiffusionRouting::findFilter(handle my_handle)
01416 {
01417   FilterList::iterator itr;
01418   FilterEntry *entry;
01419 
01420   for (itr = filter_list_.begin(); itr != filter_list_.end(); ++itr){
01421     entry = *itr;
01422     if (entry->handle_ == my_handle)
01423       return entry;
01424   }
01425   return NULL;
01426 }
01427 
01428 bool DiffusionRouting::hasScope(NRAttrVec *attrs)
01429 {
01430   NRAttribute *temp = NULL;
01431 
01432   temp = NRScopeAttr.find(attrs);
01433   if (temp)
01434     return true;
01435 
01436   return false;
01437 }
01438 
01439 bool DiffusionRouting::checkSubscription(NRAttrVec *attrs)
01440 {
01441   NRSimpleAttribute<int> *nrclass = NULL;
01442   NRSimpleAttribute<int> *nrscope = NULL;
01443 
01444   // We first try to locate both class and scope attributes
01445   nrclass = NRClassAttr.find(attrs);
01446   nrscope = NRScopeAttr.find(attrs);
01447 
01448   // There must be a class attribute in subscriptions
01449   if (!nrclass)
01450     return false;
01451 
01452   if (nrscope){
01453     // This subcription has both class and scope attribute. So, we
01454     // check if class/scope attributes comply with the Diffusion
01455     // Routing API
01456 
01457     // Must check scope's operator. The API requires it to be "IS"
01458     if (nrscope->getOp() != NRAttribute::IS)
01459       return false;
01460 
01461     // Ok, so first check if this is a global subscription
01462     if ((nrscope->getVal() == NRAttribute::GLOBAL_SCOPE) &&
01463     (nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
01464     (nrclass->getOp() == NRAttribute::IS))
01465       return true;
01466 
01467     // Check for local subscriptions
01468     if (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE)
01469       return true;
01470 
01471     // Just to be sure we did not miss any case
01472     return false;
01473   }
01474 
01475   // If there is no scope attribute, we will insert one later if this
01476   // subscription looks like a global subscription
01477   if ((nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
01478       (nrclass->getOp() == NRAttribute::IS))
01479     return true;
01480 
01481   return false;
01482 }
01483 
01484 bool DiffusionRouting::checkPublication(NRAttrVec *attrs)
01485 {
01486   NRSimpleAttribute<int> *nrclass = NULL;
01487   NRSimpleAttribute<int> *nrscope = NULL;
01488 
01489   // We first try to locate both class and scope attributes
01490   nrclass = NRClassAttr.find(attrs);
01491   nrscope = NRScopeAttr.find(attrs);
01492 
01493   // There must be a class attribute in the publication
01494   if (!nrclass)
01495     return false;
01496 
01497   // In addition, the Diffusion Routing API requires the class
01498   // attribute to be set to "IS DATA_CLASS"
01499   if ((nrclass->getVal() != NRAttribute::DATA_CLASS) ||
01500       (nrclass->getOp() != NRAttribute::IS))
01501     return false;
01502 
01503   if (nrscope){
01504     // Ok, so this publication has both class and scope attributes. We
01505     // now have to check if they comply to the Diffusion Routing API
01506     // semantics for publish
01507 
01508     // Must check scope's operator. The API requires it to be "IS"
01509     if (nrscope->getOp() != NRAttribute::IS)
01510       return false;
01511 
01512     // We accept both global and local scope data messages
01513     if ((nrscope->getVal() == NRAttribute::GLOBAL_SCOPE) ||
01514     (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE))
01515       return true;
01516 
01517     // Just not to miss any case
01518     return false;
01519   }
01520 
01521   // A publish without a scope attribute is fine, we will include a
01522   // default NODE_LOCAL_SCOPE attribute later
01523   return true;
01524 }
01525 
01526 bool DiffusionRouting::checkSend(NRAttrVec *attrs)
01527 {
01528   NRSimpleAttribute<int> *nrclass = NULL;
01529   NRSimpleAttribute<int> *nrscope = NULL;
01530 
01531   // Currently only checks for Class and Scope attributes
01532   nrclass = NRClassAttr.find(attrs);
01533   nrscope = NRScopeAttr.find(attrs);
01534 
01535   if (nrclass || nrscope)
01536     return false;
01537 
01538   return true;
01539 }
01540 
01541 bool DiffusionRouting::isPushData(NRAttrVec *attrs)
01542 {
01543   NRSimpleAttribute<int> *nrclass = NULL;
01544   NRSimpleAttribute<int> *nrscope = NULL;
01545 
01546   // Currently only checks for Class and Scope attributes
01547   nrclass = NRClassAttr.find(attrs);
01548   nrscope = NRScopeAttr.find(attrs);
01549 
01550   // We should have both class and scope
01551   if (nrclass && nrscope){
01552     if (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE)
01553       return false;
01554     return true;
01555   }
01556   else{
01557     DiffPrint(DEBUG_ALWAYS, "Error: Cannot find class/scope attributes !\n");
01558     return false;
01559   }
01560 }

Generated on Tue Mar 6 16:47:44 2007 for ns2 Network Simulator 2.29 by  doxygen 1.4.6