filter_core.cc

Go to the documentation of this file.
00001 // 
00002 // filter_core.cc  : Main Diffusion program
00003 // authors         : Chalermek Intanagonwiwat and Fabio Silva
00004 //
00005 // Copyright (C) 2000-2003 by the University of Southern California
00006 // $Id: filter_core.cc,v 1.4 2005/09/13 04:53:47 tomh Exp $
00007 //
00008 // This program is free software; you can redistribute it and/or
00009 // modify it under the terms of the GNU General Public License,
00010 // version 2, as published by the Free Software Foundation.
00011 //
00012 // This program is distributed in the hope that it will be useful,
00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 // GNU General Public License for more details.
00016 //
00017 // You should have received a copy of the GNU General Public License along
00018 // with this program; if not, write to the Free Software Foundation, Inc.,
00019 // 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
00020 //
00021 // Linking this file statically or dynamically with other modules is making
00022 // a combined work based on this file.  Thus, the terms and conditions of
00023 // the GNU General Public License cover the whole combination.
00024 //
00025 // In addition, as a special exception, the copyright holders of this file
00026 // give you permission to combine this file with free software programs or
00027 // libraries that are released under the GNU LGPL and with code included in
00028 // the standard release of ns-2 under the Apache 2.0 license or under
00029 // otherwise-compatible licenses with advertising requirements (or modified
00030 // versions of such code, with unchanged license).  You may copy and
00031 // distribute such a system following the terms of the GNU GPL for this
00032 // file and the licenses of the other code concerned, provided that you
00033 // include the source code of that other code when and as the GNU GPL
00034 // requires distribution of source code.
00035 //
00036 // Note that people who make modified versions of this file are not
00037 // obligated to grant this special exception for their modified versions;
00038 // it is their choice whether to do so.  The GNU General Public License
00039 // gives permission to release a modified version without this exception;
00040 // this exception also makes it possible to release a modified version
00041 // which carries forward this exception.
00042 
00043 #include "filter_core.hh"
00044 
00045 #ifndef NS_DIFFUSION
00046 DiffusionCoreAgent *agent;
00047 #endif // !NS_DIFFUSION
00048 
00049 class HashEntry {
00050 public:
00051   bool dummy;
00052 
00053   HashEntry() { 
00054     dummy  = false;
00055   }
00056 };
00057 
00058 class NeighborEntry {
00059 public:
00060   int32_t id;
00061   struct timeval tmv;
00062 
00063   NeighborEntry(int _id) : id(_id)
00064   {
00065     GetTime(&tmv);
00066   }
00067 };
00068 
00069 int NeighborsTimeoutTimer::expire()
00070 {
00071   agent_->neighborsTimeout();
00072 
00073   return 0;
00074 }
00075 
00076 int FilterTimeoutTimer::expire()
00077 {
00078   agent_->filterTimeout();
00079 
00080   return 0;
00081 }
00082 
00083 int DiffusionStopTimer::expire()
00084 {
00085   agent_->timeToStop();
00086 #ifndef NS_DIFFUSION
00087   exit(0);
00088 #endif // !NS_DIFFUSION
00089 
00090   // Never gets here !
00091   return 0;
00092 }
00093 
00094 void DiffusionCoreAgent::timeToStop()
00095 {
00096 #ifdef STATS
00097   char out_filename[100];
00098   FILE *outfile = NULL;
00099 
00100   if (stats_){
00101     sprintf(out_filename, "/tmp/diffusion-%d.out", my_id_);
00102     outfile = fopen(out_filename, "w");
00103 
00104     if (outfile == NULL){
00105       DiffPrint(DEBUG_ALWAYS,
00106         "Diffusion Error: Cannot create %s\n", out_filename);
00107       return;
00108     }
00109 
00110     stats_->printStats(stdout);
00111     if (outfile){
00112       stats_->printStats(outfile);
00113       fclose(outfile);
00114     }
00115   }
00116 #endif // STATS
00117 }
00118 
00119 #ifndef NS_DIFFUSION
00120 
00121 void signal_handler(int p)
00122 {
00123   agent->timeToStop();
00124   exit(0);
00125 }
00126 
00127 void DiffusionCoreAgent::usage(char *s)
00128 {
00129 
00130   DiffPrint(DEBUG_ALWAYS, "Usage: %s [-d debug] [-f filename] [-t stoptime] [-v] [-h] [-p port]", s);
00131 #ifdef IO_LOG
00132   DiffPrint(DEBUG_ALWAYS, " [-l]");
00133 #endif // IO_LOG
00134 #ifdef STATS
00135   DiffPrint(DEBUG_ALWAYS, " [-s] [-i warm_up_time]");
00136 #endif // STATS
00137   DiffPrint(DEBUG_ALWAYS, "\n\n");
00138   DiffPrint(DEBUG_ALWAYS, "\t-d - Sets debug level (0-10)\n");
00139   DiffPrint(DEBUG_ALWAYS, "\t-t - Stops after stoptime seconds\n");
00140   DiffPrint(DEBUG_ALWAYS, "\t-f - Uses filename as the config file\n");
00141   DiffPrint(DEBUG_ALWAYS, "\t-v - Prints diffusion version\n");
00142   DiffPrint(DEBUG_ALWAYS, "\t-h - Prints this information\n");
00143   DiffPrint(DEBUG_ALWAYS, "\t-p - Sets diffusion port to port\n");
00144 #ifdef IO_LOG
00145   DiffPrint(DEBUG_ALWAYS, "\t-l - Turns on i/o logging\n");
00146 #endif // IO_LOG
00147 #ifdef STATS
00148   DiffPrint(DEBUG_ALWAYS, "\t-s - Disables statistics\n");
00149   DiffPrint(DEBUG_ALWAYS, "\t-i - Ignores traffic from the first warm_up_time seconds for stats\n");
00150 #endif // STATS
00151 
00152   DiffPrint(DEBUG_ALWAYS, "\n");
00153 
00154   exit(0);
00155 }
00156 
00157 void DiffusionCoreAgent::run()
00158 {
00159   DeviceList::iterator device_itr;
00160   DiffPacket in_pkt;
00161   fd_set fds;
00162   bool flag;
00163   int status, max_sock, fd;
00164   struct timeval tv;
00165 
00166   // Main Select Loop
00167   while (1){
00168 
00169     // Wait for incoming packets
00170     FD_ZERO(&fds);
00171     max_sock = 0;
00172 
00173     // Figure out how much time to wait
00174     timers_manager_->nextTimerTime(&tv);
00175     if (tv.tv_sec == 0 && tv.tv_usec == 0){
00176       // Timer has expired !
00177       timers_manager_->executeAllExpiredTimers();
00178       continue;
00179     }
00180 
00181     for (device_itr = in_devices_.begin();
00182      device_itr != in_devices_.end(); ++device_itr){
00183       (*device_itr)->addInFDS(&fds, &max_sock);
00184     }
00185 
00186     status = select(max_sock+1, &fds, NULL, NULL, &tv);
00187 
00188     if (status == 0){
00189       // We process all expired timers
00190       timers_manager_->executeAllExpiredTimers();
00191     }
00192 
00193     // Check for new packets
00194     if (status > 0){
00195       do{
00196     flag = false;
00197     for (device_itr = in_devices_.begin();
00198          device_itr != in_devices_.end(); ++device_itr){
00199       fd = (*device_itr)->checkInFDS(&fds);
00200       if (fd != -1){
00201         // Message waiting
00202         in_pkt = (*device_itr)->recvPacket(fd);
00203 
00204         if (in_pkt)
00205           recvPacket(in_pkt);
00206 
00207         // Clear this fd
00208         FD_CLR(fd, &fds);
00209         status--;
00210         flag = true;
00211       }
00212     }
00213       } while ((status > 0) && (flag == true));
00214     }
00215 
00216     // This should not happen
00217     if (status < 0){
00218       DiffPrint(DEBUG_IMPORTANT, "Select returned %d\n", status);
00219     }
00220   }
00221 }
00222 #endif // !NS_DIFFUSION
00223 
00224 void DiffusionCoreAgent::neighborsTimeout()
00225 {
00226   struct timeval tmv;
00227   NeighborEntry *neighbor_entry;
00228   NeighborList::iterator neighbor_itr;
00229 
00230   DiffPrint(DEBUG_MORE_DETAILS, "Neighbors Timeout !\n");
00231 
00232   GetTime(&tmv);
00233 
00234   neighbor_itr = neighbor_list_.begin();
00235 
00236   while(neighbor_itr != neighbor_list_.end()){
00237     neighbor_entry = *neighbor_itr;
00238     if (tmv.tv_sec > neighbor_entry->tmv.tv_sec + NEIGHBORS_TIMEOUT){
00239       // This neighbor expired
00240       neighbor_itr = neighbor_list_.erase(neighbor_itr);
00241       delete neighbor_entry;
00242     }
00243     else{
00244       neighbor_itr++;
00245     }
00246   }
00247 }
00248 
00249 void DiffusionCoreAgent::filterTimeout()
00250 {
00251   struct timeval tmv;
00252   FilterEntry *filter_entry;
00253   FilterList::iterator filter_itr;
00254 
00255   DiffPrint(DEBUG_MORE_DETAILS, "Filter Timeout !\n");
00256 
00257   GetTime(&tmv);
00258 
00259   filter_itr = filter_list_.begin();
00260 
00261   while(filter_itr != filter_list_.end()){
00262     filter_entry = *filter_itr;
00263     if (tmv.tv_sec > filter_entry->tmv_.tv_sec + FILTER_TIMEOUT){
00264 
00265       // This filter expired
00266       DiffPrint(DEBUG_NO_DETAILS, "Filter %d, %d, %d timed out !\n",
00267         filter_entry->agent_, filter_entry->handle_,
00268         filter_entry->priority_);
00269       filter_itr = filter_list_.erase(filter_itr);
00270       delete filter_entry;
00271     }
00272     else{
00273       filter_itr++;
00274     }
00275   }
00276 }
00277 
00278 void DiffusionCoreAgent::sendMessage(Message *msg)
00279 {
00280   Tcl_HashEntry *tcl_hash_entry;
00281   unsigned int key[2];
00282   Message *send_message;
00283   
00284   send_message = new Message(DIFFUSION_VERSION, msg->msg_type_, diffusion_port_,
00285                  0, 0, msg->pkt_num_, msg->rdm_id_,
00286                  msg->next_hop_, 0);
00287 
00288   send_message->msg_attr_vec_ = CopyAttrs(msg->msg_attr_vec_);
00289   send_message->num_attr_ = send_message->msg_attr_vec_->size();
00290   send_message->data_len_ = CalculateSize(send_message->msg_attr_vec_);
00291 
00292   // Adjust message size for logging and check hash
00293   key[0] = msg->pkt_num_;
00294   key[1] = msg->rdm_id_;
00295   tcl_hash_entry = Tcl_FindHashEntry(&htable_, (char *) key);
00296   if (tcl_hash_entry)
00297     msg->new_message_ = 0;
00298   else
00299     msg->new_message_ = 1;
00300 
00301   send_message->new_message_ = msg->new_message_;
00302 
00303   // Check if message goes to an agent or the network
00304   if (msg->next_port_){
00305     // Message goes to an agent
00306     send_message->last_hop_ = LOCALHOST_ADDR;
00307 
00308     // If it's a local message, it has to go to a local agent
00309     if (send_message->next_hop_ != LOCALHOST_ADDR){
00310       DiffPrint(DEBUG_ALWAYS, "Error: Message destination is a local agent but next_hop != LOCALHOST_ADDR !\n");
00311       delete send_message;
00312       return;
00313     }
00314 
00315     // Send the message to the agent specified
00316     sendMessageToLibrary(send_message, msg->next_port_);
00317   }
00318   else{
00319     // Message goes to the network
00320     send_message->last_hop_ = my_id_;
00321 
00322 #ifdef STATS
00323     if (stats_)
00324       stats_->logOutgoingMessage(send_message);
00325 #endif // STATS
00326 
00327     // Add message to the hash table      
00328     if (tcl_hash_entry == NULL)
00329       putHash(key[0], key[1]);
00330     else
00331       DiffPrint(DEBUG_DETAILS, "Node%d: Message being sent is an old message !\n", my_id_);
00332 
00333     // Send Message
00334     sendMessageToNetwork(send_message);
00335   }
00336 
00337   delete send_message;
00338 }
00339 
00340 void DiffusionCoreAgent::forwardMessage(Message *msg, FilterEntry *filter_entry)
00341 {
00342   RedirectMessage *original_hdr;
00343   NRAttribute *original_header_attr;
00344   Message *send_message;
00345 
00346   // Create an attribute with the original header
00347   original_hdr = new RedirectMessage(msg->new_message_, msg->msg_type_,
00348                      msg->source_port_, msg->data_len_,
00349                      msg->num_attr_, msg->rdm_id_,
00350                      msg->pkt_num_, msg->next_hop_,
00351                      msg->last_hop_, filter_entry->handle_,
00352                      msg->next_port_);
00353 
00354   original_header_attr = OriginalHdrAttr.make(NRAttribute::IS,
00355                           (void *)original_hdr,
00356                           sizeof(RedirectMessage));
00357 
00358   send_message = new Message(DIFFUSION_VERSION, REDIRECT, diffusion_port_, 0,
00359                  0, pkt_count_, random_id_, LOCALHOST_ADDR, my_id_);
00360 
00361   // Increment pkt_counter
00362   pkt_count_++;
00363 
00364   // Duplicate the message's attributes
00365   send_message->msg_attr_vec_ = CopyAttrs(msg->msg_attr_vec_);
00366   
00367   // Add the extra attribute
00368   send_message->msg_attr_vec_->push_back(original_header_attr);
00369   send_message->num_attr_ = send_message->msg_attr_vec_->size();
00370   send_message->data_len_ = CalculateSize(send_message->msg_attr_vec_);
00371 
00372   sendMessageToLibrary(send_message, filter_entry->agent_);
00373 
00374   delete send_message;
00375   delete original_hdr;
00376 }
00377 
00378 #ifndef NS_DIFFUSION
00379 void DiffusionCoreAgent::sendMessageToLibrary(Message *msg, u_int16_t agent_id)
00380 {
00381   DiffPacket out_pkt = NULL;
00382   struct hdr_diff *dfh;
00383   int len;
00384   char *pos;
00385 
00386   out_pkt = AllocateBuffer(msg->msg_attr_vec_);
00387   dfh = HDR_DIFF(out_pkt);
00388 
00389   pos = (char *) out_pkt;
00390   pos = pos + sizeof(struct hdr_diff);
00391 
00392   len = PackAttrs(msg->msg_attr_vec_, pos);
00393 
00394   LAST_HOP(dfh) = htonl(msg->last_hop_);
00395   NEXT_HOP(dfh) = htonl(msg->next_hop_);
00396   DIFF_VER(dfh) = msg->version_;
00397   MSG_TYPE(dfh) = msg->msg_type_;
00398   DATA_LEN(dfh) = htons(len);
00399   PKT_NUM(dfh) = htonl(msg->pkt_num_);
00400   RDM_ID(dfh) = htonl(msg->rdm_id_);
00401   NUM_ATTR(dfh) = htons(msg->num_attr_);
00402   SRC_PORT(dfh) = htons(msg->source_port_);
00403 
00404   sendPacketToLibrary(out_pkt, sizeof(struct hdr_diff) + len, agent_id);
00405 
00406   delete [] out_pkt;
00407 }
00408 #else
00409 void DiffusionCoreAgent::sendMessageToLibrary(Message *msg, u_int16_t agent_id)
00410 {
00411   Message *send_message;
00412   DeviceList::iterator device_itr;
00413   int len;
00414 
00415   send_message = CopyMessage(msg);
00416   len = CalculateSize(send_message->msg_attr_vec_);
00417   len = len + sizeof(struct hdr_diff);
00418 
00419   for (device_itr = local_out_devices_.begin();
00420        device_itr != local_out_devices_.end(); ++device_itr){
00421     (*device_itr)->sendPacket((DiffPacket) send_message, len, agent_id);
00422   }
00423 }
00424 #endif // !NS_DIFFUSION
00425 
00426 #ifndef NS_DIFFUSION
00427 void DiffusionCoreAgent::sendMessageToNetwork(Message *msg)
00428 {
00429   DiffPacket out_pkt = NULL;
00430   struct hdr_diff *dfh;
00431   int len;
00432   char *pos;
00433 
00434   out_pkt = AllocateBuffer(msg->msg_attr_vec_);
00435   dfh = HDR_DIFF(out_pkt);
00436 
00437   pos = (char *) out_pkt;
00438   pos = pos + sizeof(struct hdr_diff);
00439 
00440   len = PackAttrs(msg->msg_attr_vec_, pos);
00441 
00442   LAST_HOP(dfh) = htonl(msg->last_hop_);
00443   NEXT_HOP(dfh) = htonl(msg->next_hop_);
00444   DIFF_VER(dfh) = msg->version_;
00445   MSG_TYPE(dfh) = msg->msg_type_;
00446   DATA_LEN(dfh) = htons(len);
00447   PKT_NUM(dfh) = htonl(msg->pkt_num_);
00448   RDM_ID(dfh) = htonl(msg->rdm_id_);
00449   NUM_ATTR(dfh) = htons(msg->num_attr_);
00450   SRC_PORT(dfh) = htons(msg->source_port_);
00451 
00452   sendPacketToNetwork(out_pkt, sizeof(struct hdr_diff) + len, msg->next_hop_);
00453 
00454   delete [] out_pkt;
00455 }
00456 #else
00457 void DiffusionCoreAgent::sendMessageToNetwork(Message *msg)
00458 {
00459   Message *send_message;
00460   int len;
00461   int32_t dst;
00462   DeviceList::iterator device_itr;
00463 
00464   send_message = CopyMessage(msg);
00465   len = CalculateSize(send_message->msg_attr_vec_);
00466   len = len + sizeof(struct hdr_diff);
00467   dst = send_message->next_hop_;
00468 
00469   for (device_itr = out_devices_.begin();
00470        device_itr != out_devices_.end(); ++device_itr){
00471     (*device_itr)->sendPacket((DiffPacket) send_message, len, dst);
00472   }
00473 }
00474 #endif // !NS_DIFFUSION
00475 
00476 void DiffusionCoreAgent::sendPacketToLibrary(DiffPacket pkt, int len,
00477                          u_int16_t dst)
00478 {
00479   DeviceList::iterator device_itr;
00480 
00481   for (device_itr = local_out_devices_.begin();
00482        device_itr != local_out_devices_.end(); ++device_itr){
00483     (*device_itr)->sendPacket(pkt, len, dst);
00484   }
00485 }
00486 
00487 void DiffusionCoreAgent::sendPacketToNetwork(DiffPacket pkt, int len, int dst)
00488 {
00489   DeviceList::iterator device_itr;
00490 
00491   for (device_itr = out_devices_.begin();
00492        device_itr != out_devices_.end(); ++device_itr){
00493     (*device_itr)->sendPacket(pkt, len, dst);
00494   }
00495 }
00496 
00497 void DiffusionCoreAgent::updateNeighbors(int id)
00498 {
00499   NeighborList::iterator neighbor_itr;
00500   NeighborEntry *neighbor_entry;
00501 
00502   if (id == LOCALHOST_ADDR || id == my_id_)
00503     return;
00504 
00505   for (neighbor_itr = neighbor_list_.begin();
00506        neighbor_itr != neighbor_list_.end(); ++neighbor_itr){
00507     if ((*neighbor_itr)->id == id)
00508       break;
00509   }
00510 
00511   if (neighbor_itr == neighbor_list_.end()){
00512     // This is a new neighbor
00513     neighbor_entry = new NeighborEntry(id);
00514     neighbor_list_.push_front(neighbor_entry);
00515   }
00516   else{
00517     // Just update the neighbor timeout
00518     GetTime(&((*neighbor_itr)->tmv));
00519   }
00520 }
00521 
00522 FilterEntry * DiffusionCoreAgent::findFilter(int16_t handle, u_int16_t agent)
00523 {
00524   FilterList::iterator filter_itr;
00525   FilterEntry *filter_entry;
00526 
00527   for (filter_itr = filter_list_.begin();
00528        filter_itr != filter_list_.end(); ++filter_itr){
00529     filter_entry = *filter_itr;
00530     if (handle != filter_entry->handle_ || agent != filter_entry->agent_)
00531       continue;
00532 
00533     // Found
00534     return filter_entry;
00535   }
00536   return NULL;
00537 }
00538 
00539 FilterEntry * DiffusionCoreAgent::deleteFilter(int16_t handle, u_int16_t agent)
00540 {
00541   FilterList::iterator filter_itr = filter_list_.begin();
00542   FilterEntry *filter_entry = NULL;
00543 
00544   while (filter_itr != filter_list_.end()){
00545     filter_entry = *filter_itr;
00546     if (handle == filter_entry->handle_ && agent == filter_entry->agent_){
00547       filter_list_.erase(filter_itr);
00548       break;
00549     }
00550     filter_entry = NULL;
00551     filter_itr++;
00552   }
00553   return filter_entry;
00554 }
00555 
00556 bool DiffusionCoreAgent::addFilter(NRAttrVec *attrs, u_int16_t agent,
00557                    int16_t handle, u_int16_t priority)
00558 {
00559   FilterList::iterator filter_itr;
00560   FilterEntry *filter_entry;
00561 
00562   filter_itr = filter_list_.begin();
00563   while (filter_itr != filter_list_.end()){
00564     filter_entry = *filter_itr;
00565     if (filter_entry->priority_ == priority)
00566       return false;
00567     filter_itr++;
00568   }
00569 
00570   filter_entry = new FilterEntry(handle, priority, agent);
00571 
00572   // Copy the Attribute Vector
00573   filter_entry->filter_attrs_ = CopyAttrs(attrs);
00574 
00575   // Add this filter to the filter list
00576   filter_list_.push_back(filter_entry);
00577 
00578   return true;
00579 }
00580 
00581 FilterList::iterator DiffusionCoreAgent::findMatchingFilter(NRAttrVec *attrs,
00582                                 FilterList::iterator filter_itr)
00583 {
00584   FilterEntry *filter_entry;
00585 
00586   for (;filter_itr != filter_list_.end(); ++filter_itr){
00587     filter_entry = *filter_itr;
00588 
00589     if (OneWayMatch(filter_entry->filter_attrs_, attrs)){
00590       // That's a match !
00591       break;
00592     }
00593   }
00594   return filter_itr;
00595 }
00596 
00597 bool DiffusionCoreAgent::restoreOriginalHeader(Message *msg)
00598 {
00599   NRAttrVec::iterator attr_itr = msg->msg_attr_vec_->begin();
00600   NRSimpleAttribute<void *> *original_header_attr = NULL;
00601   RedirectMessage *original_hdr;
00602 
00603   // Find original Header
00604   original_header_attr = OriginalHdrAttr.find_from(msg->msg_attr_vec_,
00605                            attr_itr, &attr_itr);
00606   if (!original_header_attr){
00607     DiffPrint(DEBUG_ALWAYS, "Error: DiffusionCoreAgent::ProcessControlMessage couldn't find the OriginalHdrAttr !\n");
00608     return false;
00609   }
00610 
00611   // Restore original Header
00612   original_hdr = (RedirectMessage *) original_header_attr->getVal();
00613 
00614   msg->msg_type_ = original_hdr->msg_type_;
00615   msg->source_port_ = original_hdr->source_port_;
00616   msg->pkt_num_ = original_hdr->pkt_num_;
00617   msg->rdm_id_ = original_hdr->rdm_id_;
00618   msg->next_hop_ = original_hdr->next_hop_;
00619   msg->last_hop_ = original_hdr->last_hop_;
00620   msg->new_message_ = original_hdr->new_message_;
00621   msg->num_attr_ = original_hdr->num_attr_;
00622   msg->data_len_ = original_hdr->data_len_;
00623   msg->next_port_ = original_hdr->next_port_;
00624 
00625   // Delete attribute from original set
00626   msg->msg_attr_vec_->erase(attr_itr);
00627   delete original_header_attr;
00628 
00629   return true;
00630 }
00631 
00632 FilterList * DiffusionCoreAgent::getFilterList(NRAttrVec *attrs)
00633 {
00634   FilterList *matching_filter_list = new FilterList;
00635   FilterList::iterator known_filters_itr, filter_list_itr;
00636   FilterEntry *matching_filter_entry, *filter_entry;
00637 
00638   // We need to come up with a list of filters to call
00639   // F1 will be called before F2 if F1->priority > F2->priority
00640 
00641   known_filters_itr = findMatchingFilter(attrs, filter_list_.begin());
00642 
00643   while (known_filters_itr != filter_list_.end()){
00644     // We have a match !
00645     matching_filter_entry = *known_filters_itr;
00646 
00647     for (filter_list_itr = matching_filter_list->begin();
00648      filter_list_itr != matching_filter_list->end(); ++filter_list_itr){
00649       filter_entry = *filter_list_itr;
00650 
00651       // Figure out where to insert 
00652       if (matching_filter_entry->priority_ > filter_entry->priority_)
00653     break;
00654     }
00655 
00656     // Insert matching filter in the list
00657     matching_filter_list->insert(filter_list_itr, matching_filter_entry);
00658 
00659     // Continue the search
00660     known_filters_itr++;
00661     known_filters_itr = findMatchingFilter(attrs, known_filters_itr);
00662   }
00663   return matching_filter_list;
00664 }
00665 
00666 u_int16_t DiffusionCoreAgent::getNextFilterPriority(int16_t handle,
00667                             u_int16_t priority,
00668                             u_int16_t agent)
00669 {
00670   FilterList::iterator filter_itr;
00671   FilterEntry *filter_entry;
00672 
00673   if ((priority < FILTER_MIN_PRIORITY) ||
00674       (priority > FILTER_KEEP_PRIORITY))
00675     return FILTER_INVALID_PRIORITY;
00676 
00677   if (priority < FILTER_KEEP_PRIORITY)
00678     return (priority - 1);
00679 
00680   filter_itr = filter_list_.begin();
00681 
00682   while (filter_itr != filter_list_.end()){
00683     filter_entry = *filter_itr;
00684 
00685     if ((filter_entry->handle_ == handle) && (filter_entry->agent_ == agent)){
00686       // Found this filter
00687       return (filter_entry->priority_ - 1);
00688     }
00689 
00690     filter_itr++;
00691   }
00692 
00693   return FILTER_INVALID_PRIORITY;
00694 }
00695 
00696 void DiffusionCoreAgent::processMessage(Message *msg)
00697 {
00698   FilterList *filter_list;
00699   FilterList::iterator filter_list_itr;
00700   FilterEntry *filter_entry;
00701 
00702   filter_list = getFilterList(msg->msg_attr_vec_);
00703 
00704   // Ok, we have a list of Filters to call. Send this message
00705   // to the first filter on this list
00706   if (filter_list->size() > 0){
00707     filter_list_itr = filter_list->begin();
00708     filter_entry = *filter_list_itr;
00709 
00710     forwardMessage(msg, filter_entry);
00711     filter_list->clear();
00712   }
00713   delete filter_list;
00714 }
00715 
00716 void DiffusionCoreAgent::processControlMessage(Message *msg)
00717 {
00718   NRSimpleAttribute<void *> *ctrl_msg_attr = NULL;
00719   NRAttrVec::iterator attr_itr;
00720   ControlMessage *control_blob = NULL;
00721   FilterList *filter_list;
00722   FilterList::iterator filter_list_itr;
00723   FilterEntry *filter_entry;
00724   int command, param1, param2;
00725   u_int16_t priority, source_port, new_priority;
00726   int16_t handle;
00727   bool filter_is_last = false;
00728 
00729   // Control messages should not come from other nodes
00730   if (msg->last_hop_ != LOCALHOST_ADDR){
00731     DiffPrint(DEBUG_ALWAYS,
00732           "Error: Received control message from another node !\n");
00733     return;
00734   }
00735 
00736   // Find the control attribute
00737   attr_itr = msg->msg_attr_vec_->begin();
00738   ctrl_msg_attr = ControlMsgAttr.find_from(msg->msg_attr_vec_,
00739                        attr_itr, &attr_itr);
00740 
00741   if (!ctrl_msg_attr){
00742     // Control message is invalid
00743     DiffPrint(DEBUG_ALWAYS, "Error: Control message received is invalid !\n");
00744     return;
00745   }
00746 
00747   // Extract the control message info
00748   control_blob = (ControlMessage *) ctrl_msg_attr->getVal();
00749   command = control_blob->command_;
00750   param1 = control_blob->param1_;
00751   param2 = control_blob->param2_;
00752 
00753   // Filter API definitions
00754   //
00755   // command = ADD_UPDATE_FILTER
00756   // param1  = priority
00757   // param2  = handle
00758   // attrs   = other attrs specify the filter
00759   // 
00760   // Remarks: If this filter is already present for this module,
00761   //          we don't create a new one. A filter is identified
00762   //          by the handle and the originating agent. The filter
00763   //          gets refreshed if it already exists. If attrs and
00764   //          handle are the same, we update the priority.
00765   //
00766   //
00767   // command = REMOVE_FILTER
00768   // param1  = handle
00769   //
00770   // Remarks: Remove the filter identified by (agent, handle)
00771   //          If it's not found, a warning message is generated.
00772   //
00773   //
00774   // Remarks: Send message from a local App to another App or
00775   //          a neighbor. If agent_id is zero, the packet goes
00776   //          out to the network. Otherwise, it goes to the
00777   //          agent_id located on this node.
00778   //
00779   //
00780   // command = SEND_MESSAGE
00781   // param1  = handle
00782   // param2  = priority
00783   //
00784   // Remarks: Send this message to the next filter or to a local
00785   //          application. We have to assemble the list again
00786   //          and figure out the current agent's position on the
00787   //          list. Then, we send to the next guy. If there is
00788   //          no other filter in the list, we try to send it to
00789   //          the network, if next_hop contains a node address.
00790 
00791   logControlMessage(msg, command, param1, param2);
00792 
00793   // First we remove the control attribute from the message
00794   msg->msg_attr_vec_->erase(attr_itr);
00795   delete ctrl_msg_attr;
00796 
00797   switch(command){
00798   case ADD_UPDATE_FILTER:
00799 
00800     priority = param1;
00801     handle = param2;
00802 
00803     filter_entry = findFilter(handle, msg->source_port_);
00804 
00805     if (filter_entry){
00806       // Filter already present, must be an update message
00807       if (PerfectMatch(filter_entry->filter_attrs_, msg->msg_attr_vec_)){
00808     // Attrs also match, let's update the filter's timeout
00809     GetTime(&(filter_entry->tmv_));
00810 
00811     // Check if the priority has changed...
00812     if (priority == filter_entry->priority_){
00813       // Nothing to do !
00814       DiffPrint(DEBUG_SOME_DETAILS, "Filter %d, %d, %d refreshed.\n",
00815             filter_entry->agent_, filter_entry->handle_,
00816             filter_entry->priority_);
00817     }
00818     else{
00819       // Update the priority
00820       DiffPrint(DEBUG_NO_DETAILS,
00821             "Updated priority of filter %d, %d, %d to %d\n",
00822             msg->source_port_, handle, filter_entry->priority_, priority);
00823       filter_entry->priority_ = priority;
00824     }
00825 
00826     break;
00827       }
00828       else{
00829     // Filter attributes have changed ! This is not allowed !
00830     DiffPrint(DEBUG_ALWAYS,
00831           "Filter attributes cannot change during an update !\n");
00832     break;
00833       }
00834     }
00835     else{
00836       // This is a new filter
00837       if (!addFilter(msg->msg_attr_vec_, msg->source_port_, handle, priority)){
00838     DiffPrint(DEBUG_ALWAYS, "Failed to add filter %d, %d, %d\n",
00839           msg->source_port_, handle, priority);
00840       }
00841       else{
00842     DiffPrint(DEBUG_NO_DETAILS, "Adding filter %d, %d, %d\n",
00843           msg->source_port_, handle, priority);
00844       }
00845     }
00846 
00847     break;
00848 
00849   case REMOVE_FILTER:
00850 
00851     handle = param1;
00852     filter_entry = deleteFilter(handle, msg->source_port_);
00853     if (filter_entry){
00854       // Filter deleted
00855       DiffPrint(DEBUG_NO_DETAILS, "Filter %d, %d, %d deleted.\n",
00856         filter_entry->agent_, filter_entry->handle_,
00857         filter_entry->priority_);
00858 
00859       delete filter_entry;
00860     }
00861     else{
00862       DiffPrint(DEBUG_ALWAYS, "Couldn't find filter to delete !\n");
00863     }
00864 
00865     break;
00866 
00867   case SEND_MESSAGE:
00868 
00869     handle = param1;
00870     priority = param2;
00871     source_port = msg->source_port_;
00872 
00873     if (!restoreOriginalHeader(msg))
00874       break;
00875 
00876     new_priority = getNextFilterPriority(handle, priority, source_port);
00877 
00878     if (new_priority == FILTER_INVALID_PRIORITY)
00879       break;
00880 
00881     // Now process the incoming message
00882     filter_list = getFilterList(msg->msg_attr_vec_);
00883 
00884     // Find the filter after the 'current' filter on the list
00885     if (filter_list->size() > 0){
00886       for (filter_list_itr = filter_list->begin();
00887        filter_list_itr != filter_list->end(); ++filter_list_itr){
00888     filter_entry = *filter_list_itr;
00889     if (filter_entry->priority_ <= new_priority){
00890       forwardMessage(msg, filter_entry);
00891       break;
00892     }
00893       }
00894 
00895       if (filter_list_itr == filter_list->end())
00896     filter_is_last = true;
00897 
00898     }
00899     else{
00900       filter_is_last = true;
00901     }
00902 
00903     if (filter_is_last){
00904       // Forward message to the network or the destination application
00905       sendMessage(msg);
00906     }
00907 
00908     filter_list->clear();
00909 
00910     delete filter_list;
00911 
00912     break;
00913 
00914   case ADD_TO_BLACKLIST:
00915 
00916     DiffPrint(DEBUG_IMPORTANT, "Diffusion: Adding node %d to blacklist !\n",
00917           param1);
00918     black_list_.push_front(param1);
00919 
00920     break;
00921 
00922   case CLEAR_BLACKLIST:
00923 
00924     DiffPrint(DEBUG_IMPORTANT, "Diffusion: Clearing blacklist !\n");
00925 
00926     black_list_.clear();
00927 
00928     break;
00929 
00930   default:
00931 
00932     DiffPrint(DEBUG_ALWAYS, "Error: Unknown control message received !\n");
00933 
00934     break;
00935   }
00936 }
00937 
00938 void DiffusionCoreAgent::logControlMessage(Message *msg, int command,
00939                        int param1, int param2)
00940 {
00941   // Logs the incoming message
00942 }
00943 
00944 #ifdef NS_DIFFUSION
00945 DiffusionCoreAgent::DiffusionCoreAgent(DiffRoutingAgent *diffrtg, int nodeid)
00946 {
00947 #else
00948 DiffusionCoreAgent::DiffusionCoreAgent(int argc, char **argv)
00949 {
00950   int opt;
00951   int debug_level;
00952 #endif // NS_DIFFUSION
00953   DeviceList *in_devices, *out_devices, *local_out_devices;
00954   DiffusionIO *device;
00955   TimerCallback *callback;
00956   char *node_id_env;
00957 #ifdef USE_EMSIM
00958   char *sim_id = getenv("SIM_ID");
00959   char *sim_group = getenv("SIM_GROUP");
00960   int32_t group_id;
00961 #endif // USE_EMSIM
00962   long stop_time;
00963   struct timeval tv;
00964 #ifdef IO_LOG
00965   IOLog *pseudo_io_device;
00966   bool use_io_log = false;
00967 #endif // IO_LOG
00968 #ifdef STATS
00969   bool use_io_stats = true;
00970   int stats_warm_up_time = 0;
00971 #endif // STATS
00972   //bool node_id_configured = false;
00973 
00974   opterr = 0;
00975   config_file_ = NULL;
00976   stop_time = 0;
00977 
00978   node_id_env = getenv("node_addr");
00979   diffusion_port_ = DEFAULT_DIFFUSION_PORT;
00980 
00981 #ifndef NS_DIFFUSION
00982   // Parse command line options
00983   while (1){
00984     opt = getopt(argc, argv, COMMAND_LINE_ARGS);
00985 
00986     switch(opt){
00987 
00988     case 'p':
00989 
00990       diffusion_port_ = (u_int16_t) atoi(optarg);
00991       if ((diffusion_port_ < 1024) || (diffusion_port_ >= 65535)){
00992     DiffPrint(DEBUG_ALWAYS,
00993           "Diffusion Error: Port must be between 1024 and 65535 !\n");
00994     exit(-1);
00995       }
00996 
00997       break;
00998 
00999     case 't':
01000 
01001       stop_time = atol(optarg);
01002       if (stop_time <= 0){
01003     DiffPrint(DEBUG_ALWAYS, "Diffusion Error: stop time must be > 0\n");
01004     exit(-1);
01005       }
01006       else{
01007     DiffPrint(DEBUG_ALWAYS, "%s will stop after %ld seconds\n",
01008           PROGRAM, stop_time);
01009       }
01010 
01011       break;
01012 
01013 #ifdef IO_LOG
01014     case 'l':
01015 
01016       use_io_log = true;
01017 
01018       break;
01019 
01020 #endif // IO_LOG
01021 
01022 #ifdef STATS
01023     case 's':
01024 
01025       use_io_stats = false;
01026 
01027       break;
01028 
01029     case 'i':
01030 
01031       stats_warm_up_time = atoi(optarg);
01032       if (stats_warm_up_time < 0){
01033     DiffPrint(DEBUG_ALWAYS, "Diffusion Error: warm_up_time must be > 0\n");
01034     exit(-1);
01035       }
01036 
01037       break;
01038 #endif // STATS
01039 
01040     case 'h':
01041 
01042       usage(argv[0]);
01043 
01044       break;
01045 
01046     case 'v':
01047 
01048       DiffPrint(DEBUG_ALWAYS, "\n%s %s\n", PROGRAM, RELEASE);
01049       exit(0);
01050 
01051       break;
01052 
01053     case 'd':
01054 
01055       debug_level = atoi(optarg);
01056 
01057       if (debug_level < 1 || debug_level > 10){
01058     DiffPrint(DEBUG_ALWAYS,
01059           "Error: Debug level outside range or missing !\n");
01060     usage(argv[0]);
01061       }
01062 
01063       global_debug_level = debug_level;
01064 
01065       break;
01066 
01067     case 'f':
01068 
01069       if (!strncasecmp(optarg, "-", 1)){
01070     DiffPrint(DEBUG_ALWAYS, "Error: Parameter is missing !\n");
01071     usage(argv[0]);
01072       }
01073 
01074       config_file_ = strdup(optarg);
01075 
01076       break;
01077 
01078     case '?':
01079 
01080       DiffPrint(DEBUG_ALWAYS,
01081         "Error: %c isn't a valid option or its parameter is missing !\n",
01082         optopt);
01083       usage(argv[0]);
01084 
01085       break;
01086 
01087     case ':':
01088 
01089       DiffPrint(DEBUG_ALWAYS, "Parameter missing !\n");
01090       usage(argv[0]);
01091 
01092       break;
01093     }
01094 
01095     if (opt == -1)
01096       break;
01097   }
01098 
01099   if (!config_file_)
01100     config_file_ = strdup(DEFAULT_CONFIG_FILE);
01101 
01102   // Get diffusion ID
01103   if (!node_id_configured){
01104     // Try to get id from environment variable
01105     if (node_id_env != NULL){
01106       my_id_ = atoi(node_id_env);
01107       node_id_configured = true;
01108     }
01109   }
01110 
01111 #ifdef USE_EMSIM
01112   if (!node_id_configured){
01113     // Try to read groups and node id from emsim environment variables
01114     if (sim_id && sim_group){
01115       my_id_ = atoi(sim_id);
01116       group_id = atoi(sim_group);
01117       diffusion_port_ = diffusion_port_ + my_id_ + (100 * group_id);
01118       node_id_configured = true;
01119     }
01120   }
01121 #endif // USE_EMSIM
01122 
01123   // Use random node id if user has not specified it
01124   if (!node_id_configured){
01125     DiffPrint(DEBUG_ALWAYS, "Diffusion : node_addr not set. Using random id.\n");
01126 
01127     // Generate random ID
01128     do{
01129       GetTime(&tv);
01130       SetSeed(&tv);
01131       my_id_ = GetRand();
01132     }
01133     while(my_id_ == LOCALHOST_ADDR || my_id_ == BROADCAST_ADDR);
01134   }
01135 
01136 #else
01137   my_id_ = nodeid;
01138 #endif // !NS_DIFFUSION
01139 
01140   // Initialize variables
01141   lon_ = 0.0;
01142   lat_ = 0.0;
01143 
01144 #ifdef STATS
01145   if (use_io_stats)
01146     stats_ = new DiffusionStats(my_id_, stats_warm_up_time);
01147   else
01148     stats_ = NULL;
01149 #endif // STATS
01150 
01151   GetTime(&tv);
01152   SetSeed(&tv);
01153   pkt_count_ = GetRand();
01154   random_id_ = GetRand();
01155 
01156   Tcl_InitHashTable(&htable_, 2);
01157 
01158   // Initialize EventQueue
01159   timers_manager_ = new TimerManager;
01160 
01161   // Create regular timers
01162   callback = new NeighborsTimeoutTimer(this);
01163   timers_manager_->addTimer(NEIGHBORS_DELAY, callback);
01164 
01165   callback = new FilterTimeoutTimer(this);
01166   timers_manager_->addTimer(FILTER_DELAY, callback);
01167 
01168   if (stop_time > 0){
01169     callback = new DiffusionStopTimer(this);
01170     timers_manager_->addTimer((stop_time * 1000), callback);
01171   }
01172 
01173   GetTime(&tv);
01174 
01175   // Print Initialization message
01176   DiffPrint(DEBUG_ALWAYS, "Diffusion : starting at time %ld:%ld\n",
01177         tv.tv_sec, tv.tv_usec);
01178   DiffPrint(DEBUG_ALWAYS, "Diffusion : Node id = %d\n", my_id_);
01179 
01180   // Initialize diffusion io devices
01181 #ifdef IO_LOG
01182   if (use_io_log){
01183     pseudo_io_device = new IOLog(my_id_);
01184     in_devices_.push_back(pseudo_io_device);
01185     out_devices_.push_back(pseudo_io_device);
01186 
01187     in_devices = &(pseudo_io_device->in_devices_);
01188     out_devices = &(pseudo_io_device->out_devices_);
01189     local_out_devices = &(local_out_devices_);
01190   }
01191   else{
01192     in_devices = &(in_devices_);
01193     out_devices = &(out_devices_);
01194     local_out_devices = &(local_out_devices_);
01195   }
01196 #else
01197   in_devices = &(in_devices_);
01198   out_devices = &(out_devices_);
01199   local_out_devices = &(local_out_devices_);
01200 #endif // IO_LOG
01201 
01202 #ifdef NS_DIFFUSION
01203   device = new LocalApp(diffrtg);
01204   local_out_devices->push_back(device);
01205 
01206   device = new LinkLayerAbs(diffrtg);
01207   out_devices->push_back(device);
01208 #endif // NS_DIFFUSION
01209 
01210 #ifdef UDP
01211   device = new UDPLocal(&diffusion_port_);
01212   in_devices->push_back(device);
01213   local_out_devices->push_back(device);
01214 
01215 #ifdef WIRED
01216   device = new UDPWired(config_file_);
01217   out_devices->push_back(device);
01218 #endif // WIRED
01219 #endif // UDP
01220 
01221 #ifdef USE_RPC
01222   device = new RPCIO();
01223   in_devices->push_back(device);
01224   out_devices->push_back(device);
01225 #endif // USE_RPC
01226 
01227 #ifdef USE_MOTE_NIC
01228   device = new MOTEIO();
01229   in_devices->push_back(device);
01230   out_devices->push_back(device);
01231 #endif // USE_MOTE_NIC
01232 
01233 #ifdef USE_SMAC
01234   device = new SMAC();
01235   in_devices->push_back(device);
01236   out_devices->push_back(device);
01237 #endif // USE_SMAC
01238 
01239 #ifdef USE_EMSTAR
01240 #ifdef USE_EMSIM
01241   device = new Emstar(my_id_, group_id, true);
01242 #else
01243   device = new Emstar();
01244 #endif // USE_EMSIM
01245   in_devices->push_back(device);
01246   out_devices->push_back(device);
01247 #endif // USE_EMSTAR
01248 
01249 #ifdef USE_WINSNG2
01250   device = new WINSNG2();
01251   in_devices->push_back(device);
01252   out_devices->push_back(device);
01253 #endif // USE_WINSNG2
01254 }
01255 
01256 HashEntry * DiffusionCoreAgent::getHash(unsigned int pkt_num,
01257                      unsigned int rdm_id)
01258 {
01259   unsigned int key[2];
01260 
01261   key[0] = pkt_num;
01262   key[1] = rdm_id;
01263 
01264   Tcl_HashEntry *entryPtr = Tcl_FindHashEntry(&htable_, (char *)key);
01265 
01266   if (entryPtr == NULL)
01267     return NULL;
01268 
01269   return (HashEntry *)Tcl_GetHashValue(entryPtr);
01270 }
01271 
01272 void DiffusionCoreAgent::putHash(unsigned int pkt_num,
01273                  unsigned int rdm_id)
01274 {
01275   Tcl_HashEntry *tcl_hash_entry;
01276   HashEntry *hash_entry;
01277   HashList::iterator hash_itr;
01278   unsigned int key[2];
01279   int new_hash_key;
01280 
01281   if (hash_list_.size() == HASH_TABLE_MAX_SIZE){
01282     // Hash table reached maximum size
01283 
01284     for (int i = 0; ((i < HASH_TABLE_REMOVE_AT_ONCE)
01285              && (hash_list_.size() > 0)); i++){
01286       hash_itr = hash_list_.begin();
01287       tcl_hash_entry = *hash_itr;
01288       hash_entry = (HashEntry *) Tcl_GetHashValue(tcl_hash_entry);
01289       delete hash_entry;
01290       hash_list_.erase(hash_itr);
01291       Tcl_DeleteHashEntry(tcl_hash_entry);
01292     }
01293   }
01294 
01295   key[0] = pkt_num;
01296   key[1] = rdm_id;
01297 
01298   tcl_hash_entry = Tcl_CreateHashEntry(&htable_, (char *)key, &new_hash_key);
01299 
01300   if (new_hash_key == 0){
01301     DiffPrint(DEBUG_IMPORTANT, "Key already exists in hash !\n");
01302     return;
01303   }
01304 
01305   hash_entry = new HashEntry;
01306 
01307   Tcl_SetHashValue(tcl_hash_entry, hash_entry);
01308   hash_list_.push_back(tcl_hash_entry);
01309 }
01310 
01311 #ifndef NS_DIFFUSION
01312 void DiffusionCoreAgent::recvPacket(DiffPacket pkt)
01313 {
01314   struct hdr_diff *dfh = HDR_DIFF(pkt);
01315   Message *rcv_message = NULL;
01316   int8_t version, msg_type;
01317   u_int16_t data_len, num_attr, source_port;
01318   int32_t rdm_id, pkt_num, next_hop, last_hop;   
01319 
01320   // Read header
01321   version = DIFF_VER(dfh);
01322   msg_type = MSG_TYPE(dfh);
01323   source_port = ntohs(SRC_PORT(dfh));
01324   pkt_num = ntohl(PKT_NUM(dfh));
01325   rdm_id = ntohl(RDM_ID(dfh));
01326   num_attr = ntohs(NUM_ATTR(dfh));
01327   next_hop = ntohl(NEXT_HOP(dfh));
01328   last_hop = ntohl(LAST_HOP(dfh));
01329   data_len = ntohs(DATA_LEN(dfh));
01330 
01331   // Packet is good, create a message
01332   rcv_message = new Message(version, msg_type, source_port, data_len,
01333                 num_attr, pkt_num, rdm_id, next_hop, last_hop);
01334 
01335   // Read all attributes into the Message structure
01336   rcv_message->msg_attr_vec_ = UnpackAttrs(pkt, num_attr);
01337 
01338   // Process the incoming message
01339   recvMessage(rcv_message);
01340 
01341   // Don't forget to message when we're done
01342   delete rcv_message;
01343 
01344   delete [] pkt;
01345 }
01346 #endif // !NS_DIFFUSION
01347 
01348 void DiffusionCoreAgent::recvMessage(Message *msg)
01349 {
01350   BlackList::iterator black_list_itr;
01351   Tcl_HashEntry *tcl_hash_entry;
01352   unsigned int key[2];
01353 
01354   // Check version
01355   if (msg->version_ != DIFFUSION_VERSION)
01356     return;
01357 
01358   // Check for ID conflict
01359   if (msg->last_hop_ == my_id_){
01360     DiffPrint(DEBUG_ALWAYS, "Error: A diffusion ID conflict has been detected !\n");
01361     exit(-1);
01362   }
01363 
01364   // Address filtering
01365   if ((msg->next_hop_ != BROADCAST_ADDR) &&
01366       (msg->next_hop_ != LOCALHOST_ADDR) &&
01367       (msg->next_hop_ != my_id_))
01368     return;
01369 
01370   // Blacklist filtering
01371   black_list_itr = black_list_.begin();
01372   while (black_list_itr != black_list_.end()){
01373     if (*black_list_itr == msg->last_hop_){
01374       DiffPrint(DEBUG_DETAILS, "Ignoring message from blacklisted node %d !\n",
01375         msg->last_hop_);
01376       return;
01377     }
01378     black_list_itr++;
01379   }
01380 
01381   // Control Messages are unique and don't go to the hash
01382   if (msg->msg_type_ != CONTROL){
01383     // Hash table keeps info about packets
01384   
01385     key[0] = msg->pkt_num_;
01386     key[1] = msg->rdm_id_;
01387     tcl_hash_entry = Tcl_FindHashEntry(&htable_, (char *) key);
01388 
01389     if (tcl_hash_entry != NULL){
01390       DiffPrint(DEBUG_DETAILS, "Node%d: Received old message !\n", my_id_);
01391       msg->new_message_ = 0;
01392     }
01393     else{
01394       // Add message to the hash table
01395       putHash(key[0], key[1]);
01396       msg->new_message_ = 1;
01397     }
01398   }
01399 
01400 #ifdef STATS
01401   if (stats_)
01402     stats_->logIncomingMessage(msg);
01403 #endif // STATS
01404 
01405   // Check if it's a control of a regular message
01406   if (msg->msg_type_ == CONTROL)
01407     processControlMessage(msg);
01408   else
01409     processMessage(msg);
01410 }
01411 
01412 #ifndef USE_SINGLE_ADDRESS_SPACE
01413 int main(int argc, char **argv)
01414 {
01415   agent = new DiffusionCoreAgent(argc, argv);
01416 
01417   signal(SIGINT, signal_handler);
01418 
01419   agent->run();
01420 
01421   return 0;
01422 }
01423 #endif // !USE_SINGLE_ADDRESS_SPACE

Generated on Tue Mar 6 16:47:45 2007 for ns2 Network Simulator 2.29 by  doxygen 1.4.6