00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043 #include "filter_core.hh"
00044
00045 #ifndef NS_DIFFUSION
00046 DiffusionCoreAgent *agent;
00047 #endif // !NS_DIFFUSION
00048
// Value object attached to each entry of the packet hash table (see
// putHash()). It carries a single placeholder flag that starts out false.
class HashEntry {
public:
  bool dummy;

  HashEntry() : dummy(false) {}
};
00057
00058 class NeighborEntry {
00059 public:
00060 int32_t id;
00061 struct timeval tmv;
00062
00063 NeighborEntry(int _id) : id(_id)
00064 {
00065 GetTime(&tmv);
00066 }
00067 };
00068
00069 int NeighborsTimeoutTimer::expire()
00070 {
00071 agent_->neighborsTimeout();
00072
00073 return 0;
00074 }
00075
00076 int FilterTimeoutTimer::expire()
00077 {
00078 agent_->filterTimeout();
00079
00080 return 0;
00081 }
00082
00083 int DiffusionStopTimer::expire()
00084 {
00085 agent_->timeToStop();
00086 #ifndef NS_DIFFUSION
00087 exit(0);
00088 #endif // !NS_DIFFUSION
00089
00090
00091 return 0;
00092 }
00093
00094 void DiffusionCoreAgent::timeToStop()
00095 {
00096 #ifdef STATS
00097 char out_filename[100];
00098 FILE *outfile = NULL;
00099
00100 if (stats_){
00101 sprintf(out_filename, "/tmp/diffusion-%d.out", my_id_);
00102 outfile = fopen(out_filename, "w");
00103
00104 if (outfile == NULL){
00105 DiffPrint(DEBUG_ALWAYS,
00106 "Diffusion Error: Cannot create %s\n", out_filename);
00107 return;
00108 }
00109
00110 stats_->printStats(stdout);
00111 if (outfile){
00112 stats_->printStats(outfile);
00113 fclose(outfile);
00114 }
00115 }
00116 #endif // STATS
00117 }
00118
00119 #ifndef NS_DIFFUSION
00120
00121 void signal_handler(int p)
00122 {
00123 agent->timeToStop();
00124 exit(0);
00125 }
00126
00127 void DiffusionCoreAgent::usage(char *s)
00128 {
00129
00130 DiffPrint(DEBUG_ALWAYS, "Usage: %s [-d debug] [-f filename] [-t stoptime] [-v] [-h] [-p port]", s);
00131 #ifdef IO_LOG
00132 DiffPrint(DEBUG_ALWAYS, " [-l]");
00133 #endif // IO_LOG
00134 #ifdef STATS
00135 DiffPrint(DEBUG_ALWAYS, " [-s] [-i warm_up_time]");
00136 #endif // STATS
00137 DiffPrint(DEBUG_ALWAYS, "\n\n");
00138 DiffPrint(DEBUG_ALWAYS, "\t-d - Sets debug level (0-10)\n");
00139 DiffPrint(DEBUG_ALWAYS, "\t-t - Stops after stoptime seconds\n");
00140 DiffPrint(DEBUG_ALWAYS, "\t-f - Uses filename as the config file\n");
00141 DiffPrint(DEBUG_ALWAYS, "\t-v - Prints diffusion version\n");
00142 DiffPrint(DEBUG_ALWAYS, "\t-h - Prints this information\n");
00143 DiffPrint(DEBUG_ALWAYS, "\t-p - Sets diffusion port to port\n");
00144 #ifdef IO_LOG
00145 DiffPrint(DEBUG_ALWAYS, "\t-l - Turns on i/o logging\n");
00146 #endif // IO_LOG
00147 #ifdef STATS
00148 DiffPrint(DEBUG_ALWAYS, "\t-s - Disables statistics\n");
00149 DiffPrint(DEBUG_ALWAYS, "\t-i - Ignores traffic from the first warm_up_time seconds for stats\n");
00150 #endif // STATS
00151
00152 DiffPrint(DEBUG_ALWAYS, "\n");
00153
00154 exit(0);
00155 }
00156
00157 void DiffusionCoreAgent::run()
00158 {
00159 DeviceList::iterator device_itr;
00160 DiffPacket in_pkt;
00161 fd_set fds;
00162 bool flag;
00163 int status, max_sock, fd;
00164 struct timeval tv;
00165
00166
00167 while (1){
00168
00169
00170 FD_ZERO(&fds);
00171 max_sock = 0;
00172
00173
00174 timers_manager_->nextTimerTime(&tv);
00175 if (tv.tv_sec == 0 && tv.tv_usec == 0){
00176
00177 timers_manager_->executeAllExpiredTimers();
00178 continue;
00179 }
00180
00181 for (device_itr = in_devices_.begin();
00182 device_itr != in_devices_.end(); ++device_itr){
00183 (*device_itr)->addInFDS(&fds, &max_sock);
00184 }
00185
00186 status = select(max_sock+1, &fds, NULL, NULL, &tv);
00187
00188 if (status == 0){
00189
00190 timers_manager_->executeAllExpiredTimers();
00191 }
00192
00193
00194 if (status > 0){
00195 do{
00196 flag = false;
00197 for (device_itr = in_devices_.begin();
00198 device_itr != in_devices_.end(); ++device_itr){
00199 fd = (*device_itr)->checkInFDS(&fds);
00200 if (fd != -1){
00201
00202 in_pkt = (*device_itr)->recvPacket(fd);
00203
00204 if (in_pkt)
00205 recvPacket(in_pkt);
00206
00207
00208 FD_CLR(fd, &fds);
00209 status--;
00210 flag = true;
00211 }
00212 }
00213 } while ((status > 0) && (flag == true));
00214 }
00215
00216
00217 if (status < 0){
00218 DiffPrint(DEBUG_IMPORTANT, "Select returned %d\n", status);
00219 }
00220 }
00221 }
00222 #endif // !NS_DIFFUSION
00223
00224 void DiffusionCoreAgent::neighborsTimeout()
00225 {
00226 struct timeval tmv;
00227 NeighborEntry *neighbor_entry;
00228 NeighborList::iterator neighbor_itr;
00229
00230 DiffPrint(DEBUG_MORE_DETAILS, "Neighbors Timeout !\n");
00231
00232 GetTime(&tmv);
00233
00234 neighbor_itr = neighbor_list_.begin();
00235
00236 while(neighbor_itr != neighbor_list_.end()){
00237 neighbor_entry = *neighbor_itr;
00238 if (tmv.tv_sec > neighbor_entry->tmv.tv_sec + NEIGHBORS_TIMEOUT){
00239
00240 neighbor_itr = neighbor_list_.erase(neighbor_itr);
00241 delete neighbor_entry;
00242 }
00243 else{
00244 neighbor_itr++;
00245 }
00246 }
00247 }
00248
00249 void DiffusionCoreAgent::filterTimeout()
00250 {
00251 struct timeval tmv;
00252 FilterEntry *filter_entry;
00253 FilterList::iterator filter_itr;
00254
00255 DiffPrint(DEBUG_MORE_DETAILS, "Filter Timeout !\n");
00256
00257 GetTime(&tmv);
00258
00259 filter_itr = filter_list_.begin();
00260
00261 while(filter_itr != filter_list_.end()){
00262 filter_entry = *filter_itr;
00263 if (tmv.tv_sec > filter_entry->tmv_.tv_sec + FILTER_TIMEOUT){
00264
00265
00266 DiffPrint(DEBUG_NO_DETAILS, "Filter %d, %d, %d timed out !\n",
00267 filter_entry->agent_, filter_entry->handle_,
00268 filter_entry->priority_);
00269 filter_itr = filter_list_.erase(filter_itr);
00270 delete filter_entry;
00271 }
00272 else{
00273 filter_itr++;
00274 }
00275 }
00276 }
00277
00278 void DiffusionCoreAgent::sendMessage(Message *msg)
00279 {
00280 Tcl_HashEntry *tcl_hash_entry;
00281 unsigned int key[2];
00282 Message *send_message;
00283
00284 send_message = new Message(DIFFUSION_VERSION, msg->msg_type_, diffusion_port_,
00285 0, 0, msg->pkt_num_, msg->rdm_id_,
00286 msg->next_hop_, 0);
00287
00288 send_message->msg_attr_vec_ = CopyAttrs(msg->msg_attr_vec_);
00289 send_message->num_attr_ = send_message->msg_attr_vec_->size();
00290 send_message->data_len_ = CalculateSize(send_message->msg_attr_vec_);
00291
00292
00293 key[0] = msg->pkt_num_;
00294 key[1] = msg->rdm_id_;
00295 tcl_hash_entry = Tcl_FindHashEntry(&htable_, (char *) key);
00296 if (tcl_hash_entry)
00297 msg->new_message_ = 0;
00298 else
00299 msg->new_message_ = 1;
00300
00301 send_message->new_message_ = msg->new_message_;
00302
00303
00304 if (msg->next_port_){
00305
00306 send_message->last_hop_ = LOCALHOST_ADDR;
00307
00308
00309 if (send_message->next_hop_ != LOCALHOST_ADDR){
00310 DiffPrint(DEBUG_ALWAYS, "Error: Message destination is a local agent but next_hop != LOCALHOST_ADDR !\n");
00311 delete send_message;
00312 return;
00313 }
00314
00315
00316 sendMessageToLibrary(send_message, msg->next_port_);
00317 }
00318 else{
00319
00320 send_message->last_hop_ = my_id_;
00321
00322 #ifdef STATS
00323 if (stats_)
00324 stats_->logOutgoingMessage(send_message);
00325 #endif // STATS
00326
00327
00328 if (tcl_hash_entry == NULL)
00329 putHash(key[0], key[1]);
00330 else
00331 DiffPrint(DEBUG_DETAILS, "Node%d: Message being sent is an old message !\n", my_id_);
00332
00333
00334 sendMessageToNetwork(send_message);
00335 }
00336
00337 delete send_message;
00338 }
00339
00340 void DiffusionCoreAgent::forwardMessage(Message *msg, FilterEntry *filter_entry)
00341 {
00342 RedirectMessage *original_hdr;
00343 NRAttribute *original_header_attr;
00344 Message *send_message;
00345
00346
00347 original_hdr = new RedirectMessage(msg->new_message_, msg->msg_type_,
00348 msg->source_port_, msg->data_len_,
00349 msg->num_attr_, msg->rdm_id_,
00350 msg->pkt_num_, msg->next_hop_,
00351 msg->last_hop_, filter_entry->handle_,
00352 msg->next_port_);
00353
00354 original_header_attr = OriginalHdrAttr.make(NRAttribute::IS,
00355 (void *)original_hdr,
00356 sizeof(RedirectMessage));
00357
00358 send_message = new Message(DIFFUSION_VERSION, REDIRECT, diffusion_port_, 0,
00359 0, pkt_count_, random_id_, LOCALHOST_ADDR, my_id_);
00360
00361
00362 pkt_count_++;
00363
00364
00365 send_message->msg_attr_vec_ = CopyAttrs(msg->msg_attr_vec_);
00366
00367
00368 send_message->msg_attr_vec_->push_back(original_header_attr);
00369 send_message->num_attr_ = send_message->msg_attr_vec_->size();
00370 send_message->data_len_ = CalculateSize(send_message->msg_attr_vec_);
00371
00372 sendMessageToLibrary(send_message, filter_entry->agent_);
00373
00374 delete send_message;
00375 delete original_hdr;
00376 }
00377
00378 #ifndef NS_DIFFUSION
00379 void DiffusionCoreAgent::sendMessageToLibrary(Message *msg, u_int16_t agent_id)
00380 {
00381 DiffPacket out_pkt = NULL;
00382 struct hdr_diff *dfh;
00383 int len;
00384 char *pos;
00385
00386 out_pkt = AllocateBuffer(msg->msg_attr_vec_);
00387 dfh = HDR_DIFF(out_pkt);
00388
00389 pos = (char *) out_pkt;
00390 pos = pos + sizeof(struct hdr_diff);
00391
00392 len = PackAttrs(msg->msg_attr_vec_, pos);
00393
00394 LAST_HOP(dfh) = htonl(msg->last_hop_);
00395 NEXT_HOP(dfh) = htonl(msg->next_hop_);
00396 DIFF_VER(dfh) = msg->version_;
00397 MSG_TYPE(dfh) = msg->msg_type_;
00398 DATA_LEN(dfh) = htons(len);
00399 PKT_NUM(dfh) = htonl(msg->pkt_num_);
00400 RDM_ID(dfh) = htonl(msg->rdm_id_);
00401 NUM_ATTR(dfh) = htons(msg->num_attr_);
00402 SRC_PORT(dfh) = htons(msg->source_port_);
00403
00404 sendPacketToLibrary(out_pkt, sizeof(struct hdr_diff) + len, agent_id);
00405
00406 delete [] out_pkt;
00407 }
00408 #else
00409 void DiffusionCoreAgent::sendMessageToLibrary(Message *msg, u_int16_t agent_id)
00410 {
00411 Message *send_message;
00412 DeviceList::iterator device_itr;
00413 int len;
00414
00415 send_message = CopyMessage(msg);
00416 len = CalculateSize(send_message->msg_attr_vec_);
00417 len = len + sizeof(struct hdr_diff);
00418
00419 for (device_itr = local_out_devices_.begin();
00420 device_itr != local_out_devices_.end(); ++device_itr){
00421 (*device_itr)->sendPacket((DiffPacket) send_message, len, agent_id);
00422 }
00423 }
00424 #endif // !NS_DIFFUSION
00425
00426 #ifndef NS_DIFFUSION
00427 void DiffusionCoreAgent::sendMessageToNetwork(Message *msg)
00428 {
00429 DiffPacket out_pkt = NULL;
00430 struct hdr_diff *dfh;
00431 int len;
00432 char *pos;
00433
00434 out_pkt = AllocateBuffer(msg->msg_attr_vec_);
00435 dfh = HDR_DIFF(out_pkt);
00436
00437 pos = (char *) out_pkt;
00438 pos = pos + sizeof(struct hdr_diff);
00439
00440 len = PackAttrs(msg->msg_attr_vec_, pos);
00441
00442 LAST_HOP(dfh) = htonl(msg->last_hop_);
00443 NEXT_HOP(dfh) = htonl(msg->next_hop_);
00444 DIFF_VER(dfh) = msg->version_;
00445 MSG_TYPE(dfh) = msg->msg_type_;
00446 DATA_LEN(dfh) = htons(len);
00447 PKT_NUM(dfh) = htonl(msg->pkt_num_);
00448 RDM_ID(dfh) = htonl(msg->rdm_id_);
00449 NUM_ATTR(dfh) = htons(msg->num_attr_);
00450 SRC_PORT(dfh) = htons(msg->source_port_);
00451
00452 sendPacketToNetwork(out_pkt, sizeof(struct hdr_diff) + len, msg->next_hop_);
00453
00454 delete [] out_pkt;
00455 }
00456 #else
00457 void DiffusionCoreAgent::sendMessageToNetwork(Message *msg)
00458 {
00459 Message *send_message;
00460 int len;
00461 int32_t dst;
00462 DeviceList::iterator device_itr;
00463
00464 send_message = CopyMessage(msg);
00465 len = CalculateSize(send_message->msg_attr_vec_);
00466 len = len + sizeof(struct hdr_diff);
00467 dst = send_message->next_hop_;
00468
00469 for (device_itr = out_devices_.begin();
00470 device_itr != out_devices_.end(); ++device_itr){
00471 (*device_itr)->sendPacket((DiffPacket) send_message, len, dst);
00472 }
00473 }
00474 #endif // !NS_DIFFUSION
00475
00476 void DiffusionCoreAgent::sendPacketToLibrary(DiffPacket pkt, int len,
00477 u_int16_t dst)
00478 {
00479 DeviceList::iterator device_itr;
00480
00481 for (device_itr = local_out_devices_.begin();
00482 device_itr != local_out_devices_.end(); ++device_itr){
00483 (*device_itr)->sendPacket(pkt, len, dst);
00484 }
00485 }
00486
00487 void DiffusionCoreAgent::sendPacketToNetwork(DiffPacket pkt, int len, int dst)
00488 {
00489 DeviceList::iterator device_itr;
00490
00491 for (device_itr = out_devices_.begin();
00492 device_itr != out_devices_.end(); ++device_itr){
00493 (*device_itr)->sendPacket(pkt, len, dst);
00494 }
00495 }
00496
00497 void DiffusionCoreAgent::updateNeighbors(int id)
00498 {
00499 NeighborList::iterator neighbor_itr;
00500 NeighborEntry *neighbor_entry;
00501
00502 if (id == LOCALHOST_ADDR || id == my_id_)
00503 return;
00504
00505 for (neighbor_itr = neighbor_list_.begin();
00506 neighbor_itr != neighbor_list_.end(); ++neighbor_itr){
00507 if ((*neighbor_itr)->id == id)
00508 break;
00509 }
00510
00511 if (neighbor_itr == neighbor_list_.end()){
00512
00513 neighbor_entry = new NeighborEntry(id);
00514 neighbor_list_.push_front(neighbor_entry);
00515 }
00516 else{
00517
00518 GetTime(&((*neighbor_itr)->tmv));
00519 }
00520 }
00521
00522 FilterEntry * DiffusionCoreAgent::findFilter(int16_t handle, u_int16_t agent)
00523 {
00524 FilterList::iterator filter_itr;
00525 FilterEntry *filter_entry;
00526
00527 for (filter_itr = filter_list_.begin();
00528 filter_itr != filter_list_.end(); ++filter_itr){
00529 filter_entry = *filter_itr;
00530 if (handle != filter_entry->handle_ || agent != filter_entry->agent_)
00531 continue;
00532
00533
00534 return filter_entry;
00535 }
00536 return NULL;
00537 }
00538
00539 FilterEntry * DiffusionCoreAgent::deleteFilter(int16_t handle, u_int16_t agent)
00540 {
00541 FilterList::iterator filter_itr = filter_list_.begin();
00542 FilterEntry *filter_entry = NULL;
00543
00544 while (filter_itr != filter_list_.end()){
00545 filter_entry = *filter_itr;
00546 if (handle == filter_entry->handle_ && agent == filter_entry->agent_){
00547 filter_list_.erase(filter_itr);
00548 break;
00549 }
00550 filter_entry = NULL;
00551 filter_itr++;
00552 }
00553 return filter_entry;
00554 }
00555
00556 bool DiffusionCoreAgent::addFilter(NRAttrVec *attrs, u_int16_t agent,
00557 int16_t handle, u_int16_t priority)
00558 {
00559 FilterList::iterator filter_itr;
00560 FilterEntry *filter_entry;
00561
00562 filter_itr = filter_list_.begin();
00563 while (filter_itr != filter_list_.end()){
00564 filter_entry = *filter_itr;
00565 if (filter_entry->priority_ == priority)
00566 return false;
00567 filter_itr++;
00568 }
00569
00570 filter_entry = new FilterEntry(handle, priority, agent);
00571
00572
00573 filter_entry->filter_attrs_ = CopyAttrs(attrs);
00574
00575
00576 filter_list_.push_back(filter_entry);
00577
00578 return true;
00579 }
00580
00581 FilterList::iterator DiffusionCoreAgent::findMatchingFilter(NRAttrVec *attrs,
00582 FilterList::iterator filter_itr)
00583 {
00584 FilterEntry *filter_entry;
00585
00586 for (;filter_itr != filter_list_.end(); ++filter_itr){
00587 filter_entry = *filter_itr;
00588
00589 if (OneWayMatch(filter_entry->filter_attrs_, attrs)){
00590
00591 break;
00592 }
00593 }
00594 return filter_itr;
00595 }
00596
00597 bool DiffusionCoreAgent::restoreOriginalHeader(Message *msg)
00598 {
00599 NRAttrVec::iterator attr_itr = msg->msg_attr_vec_->begin();
00600 NRSimpleAttribute<void *> *original_header_attr = NULL;
00601 RedirectMessage *original_hdr;
00602
00603
00604 original_header_attr = OriginalHdrAttr.find_from(msg->msg_attr_vec_,
00605 attr_itr, &attr_itr);
00606 if (!original_header_attr){
00607 DiffPrint(DEBUG_ALWAYS, "Error: DiffusionCoreAgent::ProcessControlMessage couldn't find the OriginalHdrAttr !\n");
00608 return false;
00609 }
00610
00611
00612 original_hdr = (RedirectMessage *) original_header_attr->getVal();
00613
00614 msg->msg_type_ = original_hdr->msg_type_;
00615 msg->source_port_ = original_hdr->source_port_;
00616 msg->pkt_num_ = original_hdr->pkt_num_;
00617 msg->rdm_id_ = original_hdr->rdm_id_;
00618 msg->next_hop_ = original_hdr->next_hop_;
00619 msg->last_hop_ = original_hdr->last_hop_;
00620 msg->new_message_ = original_hdr->new_message_;
00621 msg->num_attr_ = original_hdr->num_attr_;
00622 msg->data_len_ = original_hdr->data_len_;
00623 msg->next_port_ = original_hdr->next_port_;
00624
00625
00626 msg->msg_attr_vec_->erase(attr_itr);
00627 delete original_header_attr;
00628
00629 return true;
00630 }
00631
00632 FilterList * DiffusionCoreAgent::getFilterList(NRAttrVec *attrs)
00633 {
00634 FilterList *matching_filter_list = new FilterList;
00635 FilterList::iterator known_filters_itr, filter_list_itr;
00636 FilterEntry *matching_filter_entry, *filter_entry;
00637
00638
00639
00640
00641 known_filters_itr = findMatchingFilter(attrs, filter_list_.begin());
00642
00643 while (known_filters_itr != filter_list_.end()){
00644
00645 matching_filter_entry = *known_filters_itr;
00646
00647 for (filter_list_itr = matching_filter_list->begin();
00648 filter_list_itr != matching_filter_list->end(); ++filter_list_itr){
00649 filter_entry = *filter_list_itr;
00650
00651
00652 if (matching_filter_entry->priority_ > filter_entry->priority_)
00653 break;
00654 }
00655
00656
00657 matching_filter_list->insert(filter_list_itr, matching_filter_entry);
00658
00659
00660 known_filters_itr++;
00661 known_filters_itr = findMatchingFilter(attrs, known_filters_itr);
00662 }
00663 return matching_filter_list;
00664 }
00665
00666 u_int16_t DiffusionCoreAgent::getNextFilterPriority(int16_t handle,
00667 u_int16_t priority,
00668 u_int16_t agent)
00669 {
00670 FilterList::iterator filter_itr;
00671 FilterEntry *filter_entry;
00672
00673 if ((priority < FILTER_MIN_PRIORITY) ||
00674 (priority > FILTER_KEEP_PRIORITY))
00675 return FILTER_INVALID_PRIORITY;
00676
00677 if (priority < FILTER_KEEP_PRIORITY)
00678 return (priority - 1);
00679
00680 filter_itr = filter_list_.begin();
00681
00682 while (filter_itr != filter_list_.end()){
00683 filter_entry = *filter_itr;
00684
00685 if ((filter_entry->handle_ == handle) && (filter_entry->agent_ == agent)){
00686
00687 return (filter_entry->priority_ - 1);
00688 }
00689
00690 filter_itr++;
00691 }
00692
00693 return FILTER_INVALID_PRIORITY;
00694 }
00695
00696 void DiffusionCoreAgent::processMessage(Message *msg)
00697 {
00698 FilterList *filter_list;
00699 FilterList::iterator filter_list_itr;
00700 FilterEntry *filter_entry;
00701
00702 filter_list = getFilterList(msg->msg_attr_vec_);
00703
00704
00705
00706 if (filter_list->size() > 0){
00707 filter_list_itr = filter_list->begin();
00708 filter_entry = *filter_list_itr;
00709
00710 forwardMessage(msg, filter_entry);
00711 filter_list->clear();
00712 }
00713 delete filter_list;
00714 }
00715
00716 void DiffusionCoreAgent::processControlMessage(Message *msg)
00717 {
00718 NRSimpleAttribute<void *> *ctrl_msg_attr = NULL;
00719 NRAttrVec::iterator attr_itr;
00720 ControlMessage *control_blob = NULL;
00721 FilterList *filter_list;
00722 FilterList::iterator filter_list_itr;
00723 FilterEntry *filter_entry;
00724 int command, param1, param2;
00725 u_int16_t priority, source_port, new_priority;
00726 int16_t handle;
00727 bool filter_is_last = false;
00728
00729
00730 if (msg->last_hop_ != LOCALHOST_ADDR){
00731 DiffPrint(DEBUG_ALWAYS,
00732 "Error: Received control message from another node !\n");
00733 return;
00734 }
00735
00736
00737 attr_itr = msg->msg_attr_vec_->begin();
00738 ctrl_msg_attr = ControlMsgAttr.find_from(msg->msg_attr_vec_,
00739 attr_itr, &attr_itr);
00740
00741 if (!ctrl_msg_attr){
00742
00743 DiffPrint(DEBUG_ALWAYS, "Error: Control message received is invalid !\n");
00744 return;
00745 }
00746
00747
00748 control_blob = (ControlMessage *) ctrl_msg_attr->getVal();
00749 command = control_blob->command_;
00750 param1 = control_blob->param1_;
00751 param2 = control_blob->param2_;
00752
00753
00754
00755
00756
00757
00758
00759
00760
00761
00762
00763
00764
00765
00766
00767
00768
00769
00770
00771
00772
00773
00774
00775
00776
00777
00778
00779
00780
00781
00782
00783
00784
00785
00786
00787
00788
00789
00790
00791 logControlMessage(msg, command, param1, param2);
00792
00793
00794 msg->msg_attr_vec_->erase(attr_itr);
00795 delete ctrl_msg_attr;
00796
00797 switch(command){
00798 case ADD_UPDATE_FILTER:
00799
00800 priority = param1;
00801 handle = param2;
00802
00803 filter_entry = findFilter(handle, msg->source_port_);
00804
00805 if (filter_entry){
00806
00807 if (PerfectMatch(filter_entry->filter_attrs_, msg->msg_attr_vec_)){
00808
00809 GetTime(&(filter_entry->tmv_));
00810
00811
00812 if (priority == filter_entry->priority_){
00813
00814 DiffPrint(DEBUG_SOME_DETAILS, "Filter %d, %d, %d refreshed.\n",
00815 filter_entry->agent_, filter_entry->handle_,
00816 filter_entry->priority_);
00817 }
00818 else{
00819
00820 DiffPrint(DEBUG_NO_DETAILS,
00821 "Updated priority of filter %d, %d, %d to %d\n",
00822 msg->source_port_, handle, filter_entry->priority_, priority);
00823 filter_entry->priority_ = priority;
00824 }
00825
00826 break;
00827 }
00828 else{
00829
00830 DiffPrint(DEBUG_ALWAYS,
00831 "Filter attributes cannot change during an update !\n");
00832 break;
00833 }
00834 }
00835 else{
00836
00837 if (!addFilter(msg->msg_attr_vec_, msg->source_port_, handle, priority)){
00838 DiffPrint(DEBUG_ALWAYS, "Failed to add filter %d, %d, %d\n",
00839 msg->source_port_, handle, priority);
00840 }
00841 else{
00842 DiffPrint(DEBUG_NO_DETAILS, "Adding filter %d, %d, %d\n",
00843 msg->source_port_, handle, priority);
00844 }
00845 }
00846
00847 break;
00848
00849 case REMOVE_FILTER:
00850
00851 handle = param1;
00852 filter_entry = deleteFilter(handle, msg->source_port_);
00853 if (filter_entry){
00854
00855 DiffPrint(DEBUG_NO_DETAILS, "Filter %d, %d, %d deleted.\n",
00856 filter_entry->agent_, filter_entry->handle_,
00857 filter_entry->priority_);
00858
00859 delete filter_entry;
00860 }
00861 else{
00862 DiffPrint(DEBUG_ALWAYS, "Couldn't find filter to delete !\n");
00863 }
00864
00865 break;
00866
00867 case SEND_MESSAGE:
00868
00869 handle = param1;
00870 priority = param2;
00871 source_port = msg->source_port_;
00872
00873 if (!restoreOriginalHeader(msg))
00874 break;
00875
00876 new_priority = getNextFilterPriority(handle, priority, source_port);
00877
00878 if (new_priority == FILTER_INVALID_PRIORITY)
00879 break;
00880
00881
00882 filter_list = getFilterList(msg->msg_attr_vec_);
00883
00884
00885 if (filter_list->size() > 0){
00886 for (filter_list_itr = filter_list->begin();
00887 filter_list_itr != filter_list->end(); ++filter_list_itr){
00888 filter_entry = *filter_list_itr;
00889 if (filter_entry->priority_ <= new_priority){
00890 forwardMessage(msg, filter_entry);
00891 break;
00892 }
00893 }
00894
00895 if (filter_list_itr == filter_list->end())
00896 filter_is_last = true;
00897
00898 }
00899 else{
00900 filter_is_last = true;
00901 }
00902
00903 if (filter_is_last){
00904
00905 sendMessage(msg);
00906 }
00907
00908 filter_list->clear();
00909
00910 delete filter_list;
00911
00912 break;
00913
00914 case ADD_TO_BLACKLIST:
00915
00916 DiffPrint(DEBUG_IMPORTANT, "Diffusion: Adding node %d to blacklist !\n",
00917 param1);
00918 black_list_.push_front(param1);
00919
00920 break;
00921
00922 case CLEAR_BLACKLIST:
00923
00924 DiffPrint(DEBUG_IMPORTANT, "Diffusion: Clearing blacklist !\n");
00925
00926 black_list_.clear();
00927
00928 break;
00929
00930 default:
00931
00932 DiffPrint(DEBUG_ALWAYS, "Error: Unknown control message received !\n");
00933
00934 break;
00935 }
00936 }
00937
00938 void DiffusionCoreAgent::logControlMessage(Message *msg, int command,
00939 int param1, int param2)
00940 {
00941
00942 }
00943
00944 #ifdef NS_DIFFUSION
00945 DiffusionCoreAgent::DiffusionCoreAgent(DiffRoutingAgent *diffrtg, int nodeid)
00946 {
00947 #else
00948 DiffusionCoreAgent::DiffusionCoreAgent(int argc, char **argv)
00949 {
00950 int opt;
00951 int debug_level;
00952 #endif // NS_DIFFUSION
00953 DeviceList *in_devices, *out_devices, *local_out_devices;
00954 DiffusionIO *device;
00955 TimerCallback *callback;
00956 char *node_id_env;
00957 #ifdef USE_EMSIM
00958 char *sim_id = getenv("SIM_ID");
00959 char *sim_group = getenv("SIM_GROUP");
00960 int32_t group_id;
00961 #endif // USE_EMSIM
00962 long stop_time;
00963 struct timeval tv;
00964 #ifdef IO_LOG
00965 IOLog *pseudo_io_device;
00966 bool use_io_log = false;
00967 #endif // IO_LOG
00968 #ifdef STATS
00969 bool use_io_stats = true;
00970 int stats_warm_up_time = 0;
00971 #endif // STATS
00972
00973
00974 opterr = 0;
00975 config_file_ = NULL;
00976 stop_time = 0;
00977
00978 node_id_env = getenv("node_addr");
00979 diffusion_port_ = DEFAULT_DIFFUSION_PORT;
00980
00981 #ifndef NS_DIFFUSION
00982
00983 while (1){
00984 opt = getopt(argc, argv, COMMAND_LINE_ARGS);
00985
00986 switch(opt){
00987
00988 case 'p':
00989
00990 diffusion_port_ = (u_int16_t) atoi(optarg);
00991 if ((diffusion_port_ < 1024) || (diffusion_port_ >= 65535)){
00992 DiffPrint(DEBUG_ALWAYS,
00993 "Diffusion Error: Port must be between 1024 and 65535 !\n");
00994 exit(-1);
00995 }
00996
00997 break;
00998
00999 case 't':
01000
01001 stop_time = atol(optarg);
01002 if (stop_time <= 0){
01003 DiffPrint(DEBUG_ALWAYS, "Diffusion Error: stop time must be > 0\n");
01004 exit(-1);
01005 }
01006 else{
01007 DiffPrint(DEBUG_ALWAYS, "%s will stop after %ld seconds\n",
01008 PROGRAM, stop_time);
01009 }
01010
01011 break;
01012
01013 #ifdef IO_LOG
01014 case 'l':
01015
01016 use_io_log = true;
01017
01018 break;
01019
01020 #endif // IO_LOG
01021
01022 #ifdef STATS
01023 case 's':
01024
01025 use_io_stats = false;
01026
01027 break;
01028
01029 case 'i':
01030
01031 stats_warm_up_time = atoi(optarg);
01032 if (stats_warm_up_time < 0){
01033 DiffPrint(DEBUG_ALWAYS, "Diffusion Error: warm_up_time must be > 0\n");
01034 exit(-1);
01035 }
01036
01037 break;
01038 #endif // STATS
01039
01040 case 'h':
01041
01042 usage(argv[0]);
01043
01044 break;
01045
01046 case 'v':
01047
01048 DiffPrint(DEBUG_ALWAYS, "\n%s %s\n", PROGRAM, RELEASE);
01049 exit(0);
01050
01051 break;
01052
01053 case 'd':
01054
01055 debug_level = atoi(optarg);
01056
01057 if (debug_level < 1 || debug_level > 10){
01058 DiffPrint(DEBUG_ALWAYS,
01059 "Error: Debug level outside range or missing !\n");
01060 usage(argv[0]);
01061 }
01062
01063 global_debug_level = debug_level;
01064
01065 break;
01066
01067 case 'f':
01068
01069 if (!strncasecmp(optarg, "-", 1)){
01070 DiffPrint(DEBUG_ALWAYS, "Error: Parameter is missing !\n");
01071 usage(argv[0]);
01072 }
01073
01074 config_file_ = strdup(optarg);
01075
01076 break;
01077
01078 case '?':
01079
01080 DiffPrint(DEBUG_ALWAYS,
01081 "Error: %c isn't a valid option or its parameter is missing !\n",
01082 optopt);
01083 usage(argv[0]);
01084
01085 break;
01086
01087 case ':':
01088
01089 DiffPrint(DEBUG_ALWAYS, "Parameter missing !\n");
01090 usage(argv[0]);
01091
01092 break;
01093 }
01094
01095 if (opt == -1)
01096 break;
01097 }
01098
01099 if (!config_file_)
01100 config_file_ = strdup(DEFAULT_CONFIG_FILE);
01101
01102
01103 if (!node_id_configured){
01104
01105 if (node_id_env != NULL){
01106 my_id_ = atoi(node_id_env);
01107 node_id_configured = true;
01108 }
01109 }
01110
01111 #ifdef USE_EMSIM
01112 if (!node_id_configured){
01113
01114 if (sim_id && sim_group){
01115 my_id_ = atoi(sim_id);
01116 group_id = atoi(sim_group);
01117 diffusion_port_ = diffusion_port_ + my_id_ + (100 * group_id);
01118 node_id_configured = true;
01119 }
01120 }
01121 #endif // USE_EMSIM
01122
01123
01124 if (!node_id_configured){
01125 DiffPrint(DEBUG_ALWAYS, "Diffusion : node_addr not set. Using random id.\n");
01126
01127
01128 do{
01129 GetTime(&tv);
01130 SetSeed(&tv);
01131 my_id_ = GetRand();
01132 }
01133 while(my_id_ == LOCALHOST_ADDR || my_id_ == BROADCAST_ADDR);
01134 }
01135
01136 #else
01137 my_id_ = nodeid;
01138 #endif // !NS_DIFFUSION
01139
01140
01141 lon_ = 0.0;
01142 lat_ = 0.0;
01143
01144 #ifdef STATS
01145 if (use_io_stats)
01146 stats_ = new DiffusionStats(my_id_, stats_warm_up_time);
01147 else
01148 stats_ = NULL;
01149 #endif // STATS
01150
01151 GetTime(&tv);
01152 SetSeed(&tv);
01153 pkt_count_ = GetRand();
01154 random_id_ = GetRand();
01155
01156 Tcl_InitHashTable(&htable_, 2);
01157
01158
01159 timers_manager_ = new TimerManager;
01160
01161
01162 callback = new NeighborsTimeoutTimer(this);
01163 timers_manager_->addTimer(NEIGHBORS_DELAY, callback);
01164
01165 callback = new FilterTimeoutTimer(this);
01166 timers_manager_->addTimer(FILTER_DELAY, callback);
01167
01168 if (stop_time > 0){
01169 callback = new DiffusionStopTimer(this);
01170 timers_manager_->addTimer((stop_time * 1000), callback);
01171 }
01172
01173 GetTime(&tv);
01174
01175
01176 DiffPrint(DEBUG_ALWAYS, "Diffusion : starting at time %ld:%ld\n",
01177 tv.tv_sec, tv.tv_usec);
01178 DiffPrint(DEBUG_ALWAYS, "Diffusion : Node id = %d\n", my_id_);
01179
01180
01181 #ifdef IO_LOG
01182 if (use_io_log){
01183 pseudo_io_device = new IOLog(my_id_);
01184 in_devices_.push_back(pseudo_io_device);
01185 out_devices_.push_back(pseudo_io_device);
01186
01187 in_devices = &(pseudo_io_device->in_devices_);
01188 out_devices = &(pseudo_io_device->out_devices_);
01189 local_out_devices = &(local_out_devices_);
01190 }
01191 else{
01192 in_devices = &(in_devices_);
01193 out_devices = &(out_devices_);
01194 local_out_devices = &(local_out_devices_);
01195 }
01196 #else
01197 in_devices = &(in_devices_);
01198 out_devices = &(out_devices_);
01199 local_out_devices = &(local_out_devices_);
01200 #endif // IO_LOG
01201
01202 #ifdef NS_DIFFUSION
01203 device = new LocalApp(diffrtg);
01204 local_out_devices->push_back(device);
01205
01206 device = new LinkLayerAbs(diffrtg);
01207 out_devices->push_back(device);
01208 #endif // NS_DIFFUSION
01209
01210 #ifdef UDP
01211 device = new UDPLocal(&diffusion_port_);
01212 in_devices->push_back(device);
01213 local_out_devices->push_back(device);
01214
01215 #ifdef WIRED
01216 device = new UDPWired(config_file_);
01217 out_devices->push_back(device);
01218 #endif // WIRED
01219 #endif // UDP
01220
01221 #ifdef USE_RPC
01222 device = new RPCIO();
01223 in_devices->push_back(device);
01224 out_devices->push_back(device);
01225 #endif // USE_RPC
01226
01227 #ifdef USE_MOTE_NIC
01228 device = new MOTEIO();
01229 in_devices->push_back(device);
01230 out_devices->push_back(device);
01231 #endif // USE_MOTE_NIC
01232
01233 #ifdef USE_SMAC
01234 device = new SMAC();
01235 in_devices->push_back(device);
01236 out_devices->push_back(device);
01237 #endif // USE_SMAC
01238
01239 #ifdef USE_EMSTAR
01240 #ifdef USE_EMSIM
01241 device = new Emstar(my_id_, group_id, true);
01242 #else
01243 device = new Emstar();
01244 #endif // USE_EMSIM
01245 in_devices->push_back(device);
01246 out_devices->push_back(device);
01247 #endif // USE_EMSTAR
01248
01249 #ifdef USE_WINSNG2
01250 device = new WINSNG2();
01251 in_devices->push_back(device);
01252 out_devices->push_back(device);
01253 #endif // USE_WINSNG2
01254 }
01255
01256 HashEntry * DiffusionCoreAgent::getHash(unsigned int pkt_num,
01257 unsigned int rdm_id)
01258 {
01259 unsigned int key[2];
01260
01261 key[0] = pkt_num;
01262 key[1] = rdm_id;
01263
01264 Tcl_HashEntry *entryPtr = Tcl_FindHashEntry(&htable_, (char *)key);
01265
01266 if (entryPtr == NULL)
01267 return NULL;
01268
01269 return (HashEntry *)Tcl_GetHashValue(entryPtr);
01270 }
01271
01272 void DiffusionCoreAgent::putHash(unsigned int pkt_num,
01273 unsigned int rdm_id)
01274 {
01275 Tcl_HashEntry *tcl_hash_entry;
01276 HashEntry *hash_entry;
01277 HashList::iterator hash_itr;
01278 unsigned int key[2];
01279 int new_hash_key;
01280
01281 if (hash_list_.size() == HASH_TABLE_MAX_SIZE){
01282
01283
01284 for (int i = 0; ((i < HASH_TABLE_REMOVE_AT_ONCE)
01285 && (hash_list_.size() > 0)); i++){
01286 hash_itr = hash_list_.begin();
01287 tcl_hash_entry = *hash_itr;
01288 hash_entry = (HashEntry *) Tcl_GetHashValue(tcl_hash_entry);
01289 delete hash_entry;
01290 hash_list_.erase(hash_itr);
01291 Tcl_DeleteHashEntry(tcl_hash_entry);
01292 }
01293 }
01294
01295 key[0] = pkt_num;
01296 key[1] = rdm_id;
01297
01298 tcl_hash_entry = Tcl_CreateHashEntry(&htable_, (char *)key, &new_hash_key);
01299
01300 if (new_hash_key == 0){
01301 DiffPrint(DEBUG_IMPORTANT, "Key already exists in hash !\n");
01302 return;
01303 }
01304
01305 hash_entry = new HashEntry;
01306
01307 Tcl_SetHashValue(tcl_hash_entry, hash_entry);
01308 hash_list_.push_back(tcl_hash_entry);
01309 }
01310
01311 #ifndef NS_DIFFUSION
01312 void DiffusionCoreAgent::recvPacket(DiffPacket pkt)
01313 {
01314 struct hdr_diff *dfh = HDR_DIFF(pkt);
01315 Message *rcv_message = NULL;
01316 int8_t version, msg_type;
01317 u_int16_t data_len, num_attr, source_port;
01318 int32_t rdm_id, pkt_num, next_hop, last_hop;
01319
01320
01321 version = DIFF_VER(dfh);
01322 msg_type = MSG_TYPE(dfh);
01323 source_port = ntohs(SRC_PORT(dfh));
01324 pkt_num = ntohl(PKT_NUM(dfh));
01325 rdm_id = ntohl(RDM_ID(dfh));
01326 num_attr = ntohs(NUM_ATTR(dfh));
01327 next_hop = ntohl(NEXT_HOP(dfh));
01328 last_hop = ntohl(LAST_HOP(dfh));
01329 data_len = ntohs(DATA_LEN(dfh));
01330
01331
01332 rcv_message = new Message(version, msg_type, source_port, data_len,
01333 num_attr, pkt_num, rdm_id, next_hop, last_hop);
01334
01335
01336 rcv_message->msg_attr_vec_ = UnpackAttrs(pkt, num_attr);
01337
01338
01339 recvMessage(rcv_message);
01340
01341
01342 delete rcv_message;
01343
01344 delete [] pkt;
01345 }
01346 #endif // !NS_DIFFUSION
01347
01348 void DiffusionCoreAgent::recvMessage(Message *msg)
01349 {
01350 BlackList::iterator black_list_itr;
01351 Tcl_HashEntry *tcl_hash_entry;
01352 unsigned int key[2];
01353
01354
01355 if (msg->version_ != DIFFUSION_VERSION)
01356 return;
01357
01358
01359 if (msg->last_hop_ == my_id_){
01360 DiffPrint(DEBUG_ALWAYS, "Error: A diffusion ID conflict has been detected !\n");
01361 exit(-1);
01362 }
01363
01364
01365 if ((msg->next_hop_ != BROADCAST_ADDR) &&
01366 (msg->next_hop_ != LOCALHOST_ADDR) &&
01367 (msg->next_hop_ != my_id_))
01368 return;
01369
01370
01371 black_list_itr = black_list_.begin();
01372 while (black_list_itr != black_list_.end()){
01373 if (*black_list_itr == msg->last_hop_){
01374 DiffPrint(DEBUG_DETAILS, "Ignoring message from blacklisted node %d !\n",
01375 msg->last_hop_);
01376 return;
01377 }
01378 black_list_itr++;
01379 }
01380
01381
01382 if (msg->msg_type_ != CONTROL){
01383
01384
01385 key[0] = msg->pkt_num_;
01386 key[1] = msg->rdm_id_;
01387 tcl_hash_entry = Tcl_FindHashEntry(&htable_, (char *) key);
01388
01389 if (tcl_hash_entry != NULL){
01390 DiffPrint(DEBUG_DETAILS, "Node%d: Received old message !\n", my_id_);
01391 msg->new_message_ = 0;
01392 }
01393 else{
01394
01395 putHash(key[0], key[1]);
01396 msg->new_message_ = 1;
01397 }
01398 }
01399
01400 #ifdef STATS
01401 if (stats_)
01402 stats_->logIncomingMessage(msg);
01403 #endif // STATS
01404
01405
01406 if (msg->msg_type_ == CONTROL)
01407 processControlMessage(msg);
01408 else
01409 processMessage(msg);
01410 }
01411
01412 #ifndef USE_SINGLE_ADDRESS_SPACE
01413 int main(int argc, char **argv)
01414 {
01415 agent = new DiffusionCoreAgent(argc, argv);
01416
01417 signal(SIGINT, signal_handler);
01418
01419 agent->run();
01420
01421 return 0;
01422 }
01423 #endif // !USE_SINGLE_ADDRESS_SPACE