00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044 #include <stdlib.h>
00045 #include <stdio.h>
00046
00047 #include "dr.hh"
00048
00049 class CallbackEntry {
00050 public:
00051 NR::Callback *cb_;
00052 NR::handle subscription_handle_;
00053
00054 CallbackEntry(NR::Callback *cb, NR::handle subscription_handle) :
00055 cb_(cb), subscription_handle_(subscription_handle) {};
00056 };
00057
00058 class HandleEntry {
00059 public:
00060 handle hdl_;
00061 bool valid_;
00062 NRAttrVec *attrs_;
00063 NR::Callback *cb_;
00064 struct timeval exploratory_time_;
00065 int32_t subscription_id_;
00066
00067 HandleEntry()
00068 {
00069 GetTime(&exploratory_time_);
00070 valid_ = true;
00071 cb_ = NULL;
00072 };
00073
00074 ~HandleEntry(){
00075
00076 ClearAttrs(attrs_);
00077 delete attrs_;
00078 };
00079 };
00080
00081 int InterestCallback::expire()
00082 {
00083 int retval;
00084
00085
00086 retval = drt_->interestTimeout(handle_entry_);
00087
00088 if (retval < 0)
00089 delete this;
00090
00091 return retval;
00092 }
00093
00094 int FilterKeepaliveCallback::expire()
00095 {
00096 int retval;
00097
00098
00099 retval = drt_->filterKeepaliveTimeout(filter_entry_);
00100
00101 if (retval < 0)
00102 delete this;
00103
00104 return retval;
00105 }
00106
00107 int OldAPITimer::expire()
00108 {
00109 int retval;
00110
00111
00112 retval = cb_->expire(0, p_);
00113
00114 if (retval < 0)
00115 delete this;
00116
00117 return retval;
00118 }
00119
00120 #ifdef NS_DIFFUSION
00121 class DiffEventQueue;
00122
00123 int DiffusionRouting::getNodeId() {
00124 return node_->address();
00125 }
00126
00127 int DiffusionRouting::getAgentId(int id) {
00128 if (id != -1)
00129 agent_id_ = id;
00130 return agent_id_;
00131 }
00132
00133 NR * NR::create_ns_NR(u_int16_t port, DiffAppAgent *da) {
00134 return(new DiffusionRouting(port, da));
00135 }
00136 #else
00137 NR *dr = NULL;
00138
00139 #ifdef USE_THREADS
00140 void * ReceiveThread(void *dr)
00141 {
00142
00143 ((DiffusionRouting *)dr)->run(true, WAIT_FOREVER);
00144
00145 return NULL;
00146 }
00147 #endif // USE_THREADS
00148
00149 NR * NR::createNR(u_int16_t port)
00150 {
00151
00152 if (dr)
00153 return dr;
00154
00155 dr = new DiffusionRouting(port);
00156
00157 #ifdef USE_THREADS
00158 int retval;
00159 pthread_t thread;
00160
00161
00162 retval = pthread_create(&thread, NULL, &ReceiveThread, (void *)dr);
00163
00164 if (retval){
00165 DiffPrint(DEBUG_ALWAYS, "Error creating receiving thread ! Aborting...\n");
00166 exit(-1);
00167 }
00168 #endif // USE_THREADS
00169
00170 return dr;
00171 }
00172 #endif // NS_DIFFUSION
00173
00174 void GetLock(pthread_mutex_t *mutex)
00175 {
00176 #ifdef USE_THREADS
00177 pthread_mutex_lock(mutex);
00178 #endif // USE_THREADS
00179 }
00180
00181 void ReleaseLock(pthread_mutex_t *mutex)
00182 {
00183 #ifdef USE_THREADS
00184 pthread_mutex_unlock(mutex);
00185 #endif // USE_THREADS
00186 }
00187
00188 #ifdef NS_DIFFUSION
00189 DiffusionRouting::DiffusionRouting(u_int16_t port, DiffAppAgent *da)
00190 {
00191 #else
00192 DiffusionRouting::DiffusionRouting(u_int16_t port)
00193 {
00194 #ifdef USE_EMSIM
00195 char *sim_id;
00196 char *sim_group;
00197 #endif // USE_EMSIM
00198 #endif // NS_DIFFUSION
00199
00200 struct timeval tv;
00201 DiffusionIO *device;
00202
00203
00204 next_handle_ = 1;
00205 GetTime(&tv);
00206 SetSeed(&tv);
00207 pkt_count_ = GetRand();
00208 random_id_ = GetRand();
00209 agent_id_ = 0;
00210
00211 if (port == 0)
00212 port = DEFAULT_DIFFUSION_PORT;
00213
00214 diffusion_port_ = port;
00215
00216 #ifdef USE_EMSIM
00217
00218 sim_id = getenv("SIM_ID");
00219 sim_group = getenv("SIM_GROUP");
00220
00221
00222 if (sim_id && sim_group){
00223 diffusion_port_ = diffusion_port_ + atoi(sim_id) + (100 * atoi(sim_group));
00224 }
00225 #endif // USE_EMSIM
00226
00227
00228 timers_manager_ = new TimerManager;
00229
00230
00231 #ifdef NS_DIFFUSION
00232 device = new NsLocal(da);
00233 local_out_devices_.push_back(device);
00234 #endif // NS_DIFFUSION
00235
00236 #ifdef UDP
00237 device = new UDPLocal(&agent_id_);
00238 in_devices_.push_back(device);
00239 local_out_devices_.push_back(device);
00240 #endif // UDP
00241
00242
00243 DiffPrint(DEBUG_ALWAYS,
00244 "Diffusion Routing Agent initializing... Agent Id = %d\n",
00245 agent_id_);
00246
00247 #ifdef USE_THREADS
00248
00249 dr_mtx_ = new pthread_mutex_t;
00250 pthread_mutex_init(dr_mtx_, NULL);
00251 #endif // USE_THREADS
00252 }
00253
00254 DiffusionRouting::~DiffusionRouting()
00255 {
00256 HandleList::iterator itr;
00257 HandleEntry *current;
00258
00259
00260 for (itr = sub_list_.begin(); itr != sub_list_.end(); ++itr){
00261 current = *itr;
00262 delete current;
00263 }
00264
00265 for (itr = pub_list_.begin(); itr != pub_list_.end(); ++itr){
00266 current = *itr;
00267 delete current;
00268 }
00269 }
00270
00271 handle DiffusionRouting::subscribe(NRAttrVec *subscribe_attrs, NR::Callback *cb)
00272 {
00273 NRSimpleAttribute<int> *nr_algorithm = NULL;
00274 TimerCallback *timer_callback;
00275 NRAttribute *scope_attr;
00276 HandleEntry *my_handle;
00277
00278
00279 GetLock(dr_mtx_);
00280
00281
00282 if (!checkSubscription(subscribe_attrs)){
00283 DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the subscribe attributes !\n");
00284 ReleaseLock(dr_mtx_);
00285 return FAIL;
00286 }
00287
00288
00289 my_handle = new HandleEntry;
00290 my_handle->hdl_ = next_handle_;
00291 next_handle_++;
00292 my_handle->cb_ = (NR::Callback *) cb;
00293 sub_list_.push_back(my_handle);
00294
00295
00296 my_handle->attrs_ = CopyAttrs(subscribe_attrs);
00297
00298
00299 if (!hasScope(subscribe_attrs)){
00300 scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::GLOBAL_SCOPE);
00301 my_handle->attrs_->push_back(scope_attr);
00302 }
00303
00304
00305 nr_algorithm = NRAlgorithmAttr.find(subscribe_attrs);
00306 if (nr_algorithm &&
00307 nr_algorithm->getVal() == NRAttribute::ONE_PHASE_PULL_ALGORITHM){
00308
00309 my_handle->subscription_id_ = GetRand();
00310 my_handle->attrs_->push_back(NRSubscriptionAttr.make(NRAttribute::IS,
00311 my_handle->subscription_id_));
00312 }
00313
00314
00315 timer_callback = new InterestCallback(this, my_handle);
00316 timers_manager_->addTimer(SMALL_TIMEOUT, timer_callback);
00317
00318
00319 ReleaseLock(dr_mtx_);
00320
00321 return my_handle->hdl_;
00322 }
00323
00324 int DiffusionRouting::unsubscribe(handle subscription_handle)
00325 {
00326 HandleEntry *my_handle = NULL;
00327
00328
00329 GetLock(dr_mtx_);
00330
00331 my_handle = findHandle(subscription_handle, &sub_list_);
00332 if (!my_handle){
00333
00334 ReleaseLock(dr_mtx_);
00335 return FAIL;
00336 }
00337
00338
00339 my_handle->valid_ = false;
00340
00341
00342 ReleaseLock(dr_mtx_);
00343
00344 return OK;
00345 }
00346
00347 handle DiffusionRouting::publish(NRAttrVec *publish_attrs)
00348 {
00349 HandleEntry *my_handle;
00350 NRAttribute *scope_attr;
00351
00352
00353 GetLock(dr_mtx_);
00354
00355
00356 if (!checkPublication(publish_attrs)){
00357 DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the publish attributes !\n");
00358 ReleaseLock(dr_mtx_);
00359 return FAIL;
00360 }
00361
00362
00363 my_handle = new HandleEntry;
00364 my_handle->hdl_ = next_handle_;
00365 next_handle_++;
00366 pub_list_.push_back(my_handle);
00367
00368
00369 my_handle->attrs_ = CopyAttrs(publish_attrs);
00370
00371
00372 if (!hasScope(publish_attrs)){
00373 scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::NODE_LOCAL_SCOPE);
00374 my_handle->attrs_->push_back(scope_attr);
00375 }
00376
00377
00378 ReleaseLock(dr_mtx_);
00379
00380 return my_handle->hdl_;
00381 }
00382
00383 int DiffusionRouting::unpublish(handle publication_handle)
00384 {
00385 HandleEntry *my_handle = NULL;
00386
00387
00388 GetLock(dr_mtx_);
00389
00390 my_handle = removeHandle(publication_handle, &pub_list_);
00391 if (!my_handle){
00392
00393 ReleaseLock(dr_mtx_);
00394 return FAIL;
00395 }
00396
00397
00398 delete my_handle;
00399
00400
00401 ReleaseLock(dr_mtx_);
00402
00403 return OK;
00404 }
00405
00406 int DiffusionRouting::send(handle publication_handle,
00407 NRAttrVec *send_attrs)
00408 {
00409 NRSimpleAttribute<int> *nr_algorithm = NULL;
00410 NRSimpleAttribute<int> *rmst_id_attr = NULL;
00411 int8_t send_message_type = DATA;
00412 struct timeval current_time;
00413 HandleEntry *my_handle;
00414 Message *my_message;
00415
00416
00417 GetLock(dr_mtx_);
00418
00419
00420 my_handle = findHandle(publication_handle, &pub_list_);
00421 if (!my_handle){
00422 ReleaseLock(dr_mtx_);
00423 return FAIL;
00424 }
00425
00426
00427 if (!checkSend(send_attrs)){
00428 DiffPrint(DEBUG_ALWAYS,
00429 "Error : Invalid class/scope attributes in send attributes !\n");
00430 ReleaseLock(dr_mtx_);
00431 return FAIL;
00432 }
00433
00434
00435 GetTime(¤t_time);
00436
00437
00438 nr_algorithm = NRAlgorithmAttr.find(my_handle->attrs_);
00439 rmst_id_attr = RmstIdAttr.find(send_attrs);
00440
00441 if (!nr_algorithm && !rmst_id_attr || nr_algorithm &&
00442 nr_algorithm->getVal() != NRAttribute::ONE_PHASE_PULL_ALGORITHM){
00443
00444
00445 if (TimevalCmp(¤t_time, &(my_handle->exploratory_time_)) >= 0){
00446
00447
00448 if (isPushData(my_handle->attrs_)){
00449
00450
00451
00452 GetTime(&(my_handle->exploratory_time_));
00453 my_handle->exploratory_time_.tv_sec += PUSH_EXPLORATORY_DELAY;
00454
00455 send_message_type = PUSH_EXPLORATORY_DATA;
00456 }
00457 else{
00458
00459
00460
00461 GetTime(&(my_handle->exploratory_time_));
00462 my_handle->exploratory_time_.tv_sec += EXPLORATORY_DATA_DELAY;
00463
00464 send_message_type = EXPLORATORY_DATA;
00465 }
00466 }
00467 }
00468
00469
00470 my_message = new Message(DIFFUSION_VERSION, send_message_type, agent_id_,
00471 0, 0, pkt_count_, random_id_, LOCALHOST_ADDR,
00472 LOCALHOST_ADDR);
00473
00474 pkt_count_++;
00475
00476
00477 my_message->msg_attr_vec_ = CopyAttrs(my_handle->attrs_);
00478
00479
00480 AddAttrs(my_message->msg_attr_vec_, send_attrs);
00481
00482
00483 my_message->num_attr_ = my_message->msg_attr_vec_->size();
00484 my_message->data_len_ = CalculateSize(my_message->msg_attr_vec_);
00485
00486
00487 ReleaseLock(dr_mtx_);
00488
00489
00490 sendMessageToDiffusion(my_message);
00491
00492 delete my_message;
00493
00494 return OK;
00495 }
00496
00497 int DiffusionRouting::sendRmst(handle publication_handle,
00498 NRAttrVec *send_attrs, int fragment_size)
00499 {
00500 NRSimpleAttribute<void *> *rmst_data_attr;
00501 NRSimpleAttribute<int> *frag_number_attr;
00502 NRSimpleAttribute<int> *max_frag_attr;
00503 void *frag_ptr, *blob_ptr;
00504 char *blob;
00505 timeval send_interval;
00506 int retval;
00507 int id = GetRand() % 500;
00508 int size;
00509 int num_frag;
00510 int max_frag_len;
00511
00512
00513 rmst_data_attr = RmstDataAttr.find(send_attrs);
00514
00515
00516 if(!rmst_data_attr){
00517 DiffPrint(DEBUG_ALWAYS, "sendRMST - can't find blob to send !\n");
00518 return FAIL;
00519 }
00520
00521
00522 blob_ptr = rmst_data_attr->getVal();
00523 size = rmst_data_attr->getLen();
00524 blob = new char[size];
00525 memcpy((void *)blob, blob_ptr, size);
00526 num_frag = (size + fragment_size - 1) / fragment_size;
00527
00528
00529 num_frag--;
00530 max_frag_len = size - (num_frag * fragment_size);
00531 DiffPrint(DEBUG_DETAILS,
00532 "sendRMST: rmst num_frag = %d, fragment_size = %d, max_frag_len = %d\n",
00533 num_frag, fragment_size, max_frag_len);
00534
00535
00536 max_frag_attr = RmstMaxFragAttr.make(NRAttribute::IS, num_frag);
00537 send_attrs->push_back(max_frag_attr);
00538 send_attrs->push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP));
00539 frag_number_attr = RmstFragAttr.make(NRAttribute::IS, 0);
00540 send_attrs->push_back(frag_number_attr);
00541 send_attrs->push_back(RmstIdAttr.make(NRAttribute::IS, id));
00542
00543
00544 frag_ptr = (void *)&blob[0];
00545
00546
00547 if (num_frag == 0)
00548 rmst_data_attr->setVal(frag_ptr, max_frag_len);
00549 else
00550 rmst_data_attr->setVal(frag_ptr, fragment_size);
00551
00552
00553 retval = send(publication_handle, send_attrs);
00554
00555
00556 for (int i = 1; i <= num_frag; i++){
00557
00558
00559 send_interval.tv_sec = 0;
00560 send_interval.tv_usec = 25000;
00561 select(0, NULL, NULL, NULL, &send_interval);
00562
00563
00564 frag_number_attr->setVal(i);
00565 frag_ptr = (void *)&blob[i * fragment_size];
00566 if (num_frag == i)
00567 rmst_data_attr->setVal(frag_ptr, max_frag_len);
00568 else
00569 rmst_data_attr->setVal(frag_ptr, fragment_size);
00570 retval = send(publication_handle, send_attrs);
00571 }
00572
00573 ClearAttrs(send_attrs);
00574 delete blob;
00575
00576 return OK;
00577 }
00578
00579 int DiffusionRouting::addToBlacklist(int32_t node)
00580 {
00581 ControlMessage *control_blob;
00582 NRAttribute *ctrl_msg_attr;
00583 Message *my_message;
00584 NRAttrVec *attrs;
00585
00586 control_blob = new ControlMessage(ADD_TO_BLACKLIST, node, 0);
00587
00588 ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
00589 (void *)control_blob,
00590 sizeof(ControlMessage));
00591 attrs = new NRAttrVec;
00592 attrs->push_back(ctrl_msg_attr);
00593
00594 my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00595 0, pkt_count_, random_id_, LOCALHOST_ADDR,
00596 LOCALHOST_ADDR);
00597
00598
00599 pkt_count_++;
00600
00601
00602 my_message->msg_attr_vec_ = attrs;
00603 my_message->num_attr_ = attrs->size();
00604 my_message->data_len_ = CalculateSize(attrs);
00605
00606
00607 sendMessageToDiffusion(my_message);
00608
00609
00610 delete my_message;
00611 delete control_blob;
00612
00613 return OK;
00614 }
00615
00616 int DiffusionRouting::clearBlacklist()
00617 {
00618 ControlMessage *control_blob;
00619 NRAttribute *ctrl_msg_attr;
00620 Message *my_message;
00621 NRAttrVec *attrs;
00622
00623 control_blob = new ControlMessage(CLEAR_BLACKLIST, 0, 0);
00624
00625 ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
00626 (void *)control_blob,
00627 sizeof(ControlMessage));
00628 attrs = new NRAttrVec;
00629 attrs->push_back(ctrl_msg_attr);
00630
00631 my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00632 0, pkt_count_, random_id_, LOCALHOST_ADDR,
00633 LOCALHOST_ADDR);
00634
00635
00636 pkt_count_++;
00637
00638
00639 my_message->msg_attr_vec_ = attrs;
00640 my_message->num_attr_ = attrs->size();
00641 my_message->data_len_ = CalculateSize(attrs);
00642
00643
00644 sendMessageToDiffusion(my_message);
00645
00646
00647 delete my_message;
00648 delete control_blob;
00649
00650 return OK;
00651 }
00652
00653 handle DiffusionRouting::addFilter(NRAttrVec *filter_attrs, u_int16_t priority,
00654 FilterCallback *cb)
00655 {
00656 FilterEntry *filter_entry;
00657 NRAttrVec *attrs;
00658 NRAttribute *ctrl_msg_attr;
00659 ControlMessage *control_blob;
00660 Message *my_message;
00661 TimerCallback *timer_callback;
00662
00663
00664 if (!filter_attrs || !cb || priority < FILTER_MIN_PRIORITY || priority > FILTER_MAX_PRIORITY){
00665 DiffPrint(DEBUG_ALWAYS, "Received invalid parameters when adding filter !\n");
00666 return FAIL;
00667 }
00668
00669
00670 GetLock(dr_mtx_);
00671
00672
00673 filter_entry = new FilterEntry(next_handle_, priority, agent_id_);
00674 next_handle_++;
00675 filter_entry->cb_ = (FilterCallback *) cb;
00676 filter_list_.push_back(filter_entry);
00677
00678
00679 filter_entry->filter_attrs_ = CopyAttrs(filter_attrs);
00680
00681
00682 attrs = CopyAttrs(filter_attrs);
00683 control_blob = new ControlMessage(ADD_UPDATE_FILTER,
00684 priority, filter_entry->handle_);
00685
00686 ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
00687 (void *)control_blob,
00688 sizeof(ControlMessage));
00689
00690 attrs->push_back(ctrl_msg_attr);
00691
00692
00693 my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00694 0, pkt_count_, random_id_, LOCALHOST_ADDR,
00695 LOCALHOST_ADDR);
00696
00697
00698 pkt_count_++;
00699
00700
00701 my_message->msg_attr_vec_ = attrs;
00702 my_message->num_attr_ = attrs->size();
00703 my_message->data_len_ = CalculateSize(attrs);
00704
00705
00706 ReleaseLock(dr_mtx_);
00707
00708
00709 sendMessageToDiffusion(my_message);
00710
00711
00712 timer_callback = new FilterKeepaliveCallback(this, filter_entry);
00713 timers_manager_->addTimer(FILTER_KEEPALIVE_DELAY, timer_callback);
00714
00715
00716 delete my_message;
00717 delete control_blob;
00718
00719 return filter_entry->handle_;
00720 }
00721
00722 int DiffusionRouting::removeFilter(handle filter_handle)
00723 {
00724 FilterEntry *filter_entry = NULL;
00725 ControlMessage *control_blob;
00726 NRAttribute *ctrl_msg_attr;
00727 NRAttrVec *attrs;
00728 Message *my_message;
00729
00730
00731 GetLock(dr_mtx_);
00732
00733 filter_entry = findFilter(filter_handle);
00734 if (!filter_entry){
00735
00736 ReleaseLock(dr_mtx_);
00737 return FAIL;
00738 }
00739
00740 control_blob = new ControlMessage(REMOVE_FILTER, filter_entry->handle_, 0);
00741
00742 ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
00743 (void *)control_blob,
00744 sizeof(ControlMessage));
00745
00746 attrs = new NRAttrVec;
00747 attrs->push_back(ctrl_msg_attr);
00748
00749 my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00750 0, pkt_count_, random_id_, LOCALHOST_ADDR,
00751 LOCALHOST_ADDR);
00752
00753
00754 pkt_count_++;
00755
00756
00757 my_message->msg_attr_vec_ = attrs;
00758 my_message->num_attr_ = attrs->size();
00759 my_message->data_len_ = CalculateSize(attrs);
00760
00761
00762 filter_entry->valid_ = false;
00763
00764
00765 sendMessageToDiffusion(my_message);
00766
00767
00768 ReleaseLock(dr_mtx_);
00769
00770
00771 delete my_message;
00772 delete control_blob;
00773
00774 return OK;
00775 }
00776
00777 handle DiffusionRouting::addTimer(int timeout, TimerCallback *callback)
00778 {
00779 return (timers_manager_->addTimer(timeout, callback));
00780 }
00781
00782 handle DiffusionRouting::addTimer(int timeout, void *p, TimerCallbacks *cb)
00783 {
00784 TimerCallback *callback;
00785
00786 callback = new OldAPITimer(cb, p);
00787
00788 return (addTimer(timeout, callback));
00789 }
00790
00791 bool DiffusionRouting::removeTimer(handle hdl)
00792 {
00793 return (timers_manager_->removeTimer(hdl));
00794 }
00795
00796 int DiffusionRouting::filterKeepaliveTimeout(FilterEntry *filter_entry)
00797 {
00798 FilterEntry *my_entry = NULL;
00799 ControlMessage *control_blob;
00800 NRAttribute *ctrl_msg_attr;
00801 NRAttrVec *attrs;
00802 Message *my_message;
00803
00804
00805 GetLock(dr_mtx_);
00806
00807 if (filter_entry->valid_){
00808
00809 control_blob = new ControlMessage(ADD_UPDATE_FILTER,
00810 filter_entry->priority_,
00811 filter_entry->handle_);
00812
00813 ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
00814 (void *)control_blob,
00815 sizeof(ControlMessage));
00816
00817 attrs = CopyAttrs(filter_entry->filter_attrs_);
00818 attrs->push_back(ctrl_msg_attr);
00819
00820 my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00821 0, pkt_count_, random_id_, LOCALHOST_ADDR,
00822 LOCALHOST_ADDR);
00823
00824
00825 pkt_count_++;
00826
00827
00828 my_message->msg_attr_vec_ = attrs;
00829 my_message->num_attr_ = attrs->size();
00830 my_message->data_len_ = CalculateSize(attrs);
00831
00832
00833 sendMessageToDiffusion(my_message);
00834
00835 delete my_message;
00836 delete control_blob;
00837
00838
00839 ReleaseLock(dr_mtx_);
00840
00841
00842 return (FILTER_KEEPALIVE_DELAY);
00843 }
00844 else{
00845
00846 my_entry = deleteFilter(filter_entry->handle_);
00847
00848
00849 if (my_entry != filter_entry){
00850 DiffPrint(DEBUG_ALWAYS, "DiffusionRouting::KeepaliveTimeout: Handles should match !\n");
00851 exit(-1);
00852 }
00853
00854 delete my_entry;
00855
00856
00857 ReleaseLock(dr_mtx_);
00858
00859 return -1;
00860 }
00861 }
00862
00863 int DiffusionRouting::interestTimeout(HandleEntry *handle_entry)
00864 {
00865 HandleEntry *my_handle = NULL;
00866 Message *my_message;
00867
00868
00869 GetLock(dr_mtx_);
00870
00871 if (handle_entry->valid_){
00872
00873 my_message = new Message(DIFFUSION_VERSION, INTEREST, agent_id_, 0,
00874 0, pkt_count_, random_id_, LOCALHOST_ADDR,
00875 LOCALHOST_ADDR);
00876
00877
00878 pkt_count_++;
00879
00880
00881 my_message->msg_attr_vec_ = CopyAttrs(handle_entry->attrs_);
00882 my_message->num_attr_ = handle_entry->attrs_->size();
00883 my_message->data_len_ = CalculateSize(handle_entry->attrs_);
00884
00885
00886 sendMessageToDiffusion(my_message);
00887
00888 delete my_message;
00889
00890
00891 ReleaseLock(dr_mtx_);
00892
00893
00894 return (int) (floor(-1 * (log(1 - (GetRand() * 1.0 / RAND_MAX))) /
00895 INTEREST_LAMBDA));
00896 }
00897 else{
00898
00899 my_handle = removeHandle(handle_entry->hdl_, &sub_list_);
00900
00901
00902 if (my_handle != handle_entry){
00903 DiffPrint(DEBUG_ALWAYS,
00904 "Error: interestTimeout: Handles should match !\n");
00905 exit(-1);
00906 }
00907
00908 delete my_handle;
00909
00910
00911 ReleaseLock(dr_mtx_);
00912
00913
00914 return -1;
00915 }
00916 }
00917
00918 int DiffusionRouting::sendMessage(Message *msg, handle h,
00919 u_int16_t priority)
00920 {
00921 RedirectMessage *original_hdr;
00922 NRAttribute *original_attr, *ctrl_msg_attr;
00923 ControlMessage *control_blob;
00924 NRAttrVec *attrs;
00925 Message *my_message;
00926
00927 if ((priority < FILTER_MIN_PRIORITY) ||
00928 (priority > FILTER_KEEP_PRIORITY))
00929 return FAIL;
00930
00931
00932 original_hdr = new RedirectMessage(msg->new_message_, msg->msg_type_,
00933 msg->source_port_, msg->data_len_,
00934 msg->num_attr_, msg->rdm_id_,
00935 msg->pkt_num_, msg->next_hop_,
00936 msg->last_hop_, 0,
00937 msg->next_port_);
00938
00939 original_attr = OriginalHdrAttr.make(NRAttribute::IS, (void *)original_hdr,
00940 sizeof(RedirectMessage));
00941
00942
00943 control_blob = new ControlMessage(SEND_MESSAGE, h, priority);
00944
00945 ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS, (void *)control_blob,
00946 sizeof(ControlMessage));
00947
00948
00949 attrs = CopyAttrs(msg->msg_attr_vec_);
00950 attrs->push_back(original_attr);
00951 attrs->push_back(ctrl_msg_attr);
00952
00953 my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
00954 0, pkt_count_, random_id_, LOCALHOST_ADDR,
00955 LOCALHOST_ADDR);
00956
00957
00958 pkt_count_++;
00959
00960
00961 my_message->msg_attr_vec_ = attrs;
00962 my_message->num_attr_ = attrs->size();
00963 my_message->data_len_ = CalculateSize(attrs);
00964
00965
00966 sendMessageToDiffusion(my_message);
00967
00968 delete my_message;
00969 delete control_blob;
00970 delete original_hdr;
00971
00972 return OK;
00973 }
00974
00975 #ifndef NS_DIFFUSION
00976 void DiffusionRouting::doIt()
00977 {
00978 run(true, WAIT_FOREVER);
00979 }
00980
00981 void DiffusionRouting::doOne(long timeout)
00982 {
00983 run(false, timeout);
00984 }
00985
00986 void DiffusionRouting::run(bool wait_condition, long max_timeout)
00987 {
00988 DeviceList::iterator itr;
00989 int status, max_sock, fd;
00990 bool flag;
00991 DiffPacket in_pkt;
00992 fd_set fds;
00993 struct timeval tv;
00994 struct timeval max_tv;
00995
00996 do{
00997 FD_ZERO(&fds);
00998 max_sock = 0;
00999
01000
01001 max_tv.tv_sec = (int) (max_timeout / 1000);
01002 max_tv.tv_usec = (int) ((max_timeout % 1000) * 1000);
01003
01004 for (itr = in_devices_.begin(); itr != in_devices_.end(); ++itr){
01005 (*itr)->addInFDS(&fds, &max_sock);
01006 }
01007
01008
01009 timers_manager_->nextTimerTime(&tv);
01010
01011 if (tv.tv_sec == MAXVALUE){
01012
01013 if (max_timeout == WAIT_FOREVER){
01014 tv.tv_sec = POLLING_INTERVAL;
01015 tv.tv_usec = 0;
01016 }
01017 else{
01018 tv = max_tv;
01019 }
01020 }
01021 else{
01022 if ((max_timeout != WAIT_FOREVER) && (TimevalCmp(&tv, &max_tv) > 0)){
01023
01024
01025 tv = max_tv;
01026 }
01027 }
01028
01029 status = select(max_sock+1, &fds, NULL, NULL, &tv);
01030
01031 if (status == 0){
01032
01033 timers_manager_->executeAllExpiredTimers();
01034 }
01035
01036 if (status > 0){
01037 do{
01038 flag = false;
01039 for (itr = in_devices_.begin(); itr != in_devices_.end(); ++itr){
01040 fd = (*itr)->checkInFDS(&fds);
01041 if (fd != -1){
01042
01043 in_pkt = (*itr)->recvPacket(fd);
01044 recvPacket(in_pkt);
01045
01046
01047 FD_CLR(fd, &fds);
01048 status--;
01049 flag = true;
01050 }
01051 }
01052 } while ((status > 0) && (flag == true));
01053 }
01054 else
01055 if (status < 0){
01056 DiffPrint(DEBUG_IMPORTANT, "Select returned %d\n", status);
01057 }
01058 } while (wait_condition);
01059 }
01060
01061 #endif // NS_DIFFUSION
01062
01063 #ifndef NS_DIFFUSION
01064 void DiffusionRouting::sendMessageToDiffusion(Message *msg)
01065 {
01066 DiffPacket out_pkt = NULL;
01067 struct hdr_diff *dfh;
01068 char *pos;
01069 int len;
01070
01071 out_pkt = AllocateBuffer(msg->msg_attr_vec_);
01072 dfh = HDR_DIFF(out_pkt);
01073
01074 pos = (char *) out_pkt;
01075 pos = pos + sizeof(struct hdr_diff);
01076
01077 len = PackAttrs(msg->msg_attr_vec_, pos);
01078
01079 LAST_HOP(dfh) = htonl(msg->last_hop_);
01080 NEXT_HOP(dfh) = htonl(msg->next_hop_);
01081 DIFF_VER(dfh) = msg->version_;
01082 MSG_TYPE(dfh) = msg->msg_type_;
01083 DATA_LEN(dfh) = htons(len);
01084 PKT_NUM(dfh) = htonl(msg->pkt_num_);
01085 RDM_ID(dfh) = htonl(msg->rdm_id_);
01086 NUM_ATTR(dfh) = htons(msg->num_attr_);
01087 SRC_PORT(dfh) = htons(msg->source_port_);
01088
01089 sendPacketToDiffusion(out_pkt, sizeof(struct hdr_diff) + len, diffusion_port_);
01090
01091 delete [] out_pkt;
01092 }
01093 #else
01094 void DiffusionRouting::sendMessageToDiffusion(Message *msg)
01095 {
01096 Message *my_msg;
01097 DeviceList::iterator itr;
01098 int len;
01099
01100 my_msg = CopyMessage(msg);
01101 len = CalculateSize(my_msg->msg_attr_vec_);
01102 len = len + sizeof(struct hdr_diff);
01103
01104 for (itr = local_out_devices_.begin(); itr != local_out_devices_.end(); ++itr){
01105 (*itr)->sendPacket((DiffPacket) my_msg, len, diffusion_port_);
01106 }
01107 }
01108 #endif // !NS_DIFFUSION
01109
01110 void DiffusionRouting::sendPacketToDiffusion(DiffPacket pkt, int len, int dst)
01111 {
01112 DeviceList::iterator itr;
01113
01114 for (itr = local_out_devices_.begin(); itr != local_out_devices_.end(); ++itr){
01115 (*itr)->sendPacket(pkt, len, dst);
01116 }
01117 }
01118
01119 #ifndef NS_DIFFUSION
01120 void DiffusionRouting::recvPacket(DiffPacket pkt)
01121 {
01122 struct hdr_diff *dfh = HDR_DIFF(pkt);
01123 Message *rcv_message = NULL;
01124 int8_t version, msg_type;
01125 u_int16_t data_len, num_attr, source_port;
01126 int32_t pkt_num, rdm_id, next_hop, last_hop;
01127
01128
01129 version = DIFF_VER(dfh);
01130 msg_type = MSG_TYPE(dfh);
01131 source_port = ntohs(SRC_PORT(dfh));
01132 pkt_num = ntohl(PKT_NUM(dfh));
01133 rdm_id = ntohl(RDM_ID(dfh));
01134 num_attr = ntohs(NUM_ATTR(dfh));
01135 next_hop = ntohl(NEXT_HOP(dfh));
01136 last_hop = ntohl(LAST_HOP(dfh));
01137 data_len = ntohs(DATA_LEN(dfh));
01138
01139
01140 rcv_message = new Message(version, msg_type, source_port, data_len,
01141 num_attr, pkt_num, rdm_id, next_hop, last_hop);
01142
01143
01144 rcv_message->msg_attr_vec_ = UnpackAttrs(pkt, num_attr);
01145
01146
01147 recvMessage(rcv_message);
01148
01149
01150 delete rcv_message;
01151 delete [] pkt;
01152 }
01153 #endif // !NS_DIFFUSION
01154
01155 void DiffusionRouting::recvMessage(Message *msg)
01156 {
01157
01158 if (msg->version_ != DIFFUSION_VERSION)
01159 return;
01160
01161
01162 if (msg->next_hop_ != LOCALHOST_ADDR)
01163 return;
01164
01165
01166 if (msg->msg_type_ == REDIRECT)
01167 processControlMessage(msg);
01168 else
01169 processMessage(msg);
01170 }
01171
01172 void DiffusionRouting::processControlMessage(Message *msg)
01173 {
01174 NRSimpleAttribute<void *> *original_header_attr = NULL;
01175 NRAttrVec::iterator place = msg->msg_attr_vec_->begin();
01176 RedirectMessage *original_header;
01177 FilterEntry *entry;
01178 handle my_handle;
01179
01180
01181 original_header_attr = OriginalHdrAttr.find_from(msg->msg_attr_vec_,
01182 place, &place);
01183 if (!original_header_attr){
01184 DiffPrint(DEBUG_ALWAYS, "Error: Received an invalid REDIRECT message !\n");
01185 return;
01186 }
01187
01188
01189 original_header = (RedirectMessage *) original_header_attr->getVal();
01190 my_handle = original_header->handle_;
01191 msg->msg_type_ = original_header->msg_type_;
01192 msg->source_port_ = original_header->source_port_;
01193 msg->pkt_num_ = original_header->pkt_num_;
01194 msg->rdm_id_ = original_header->rdm_id_;
01195 msg->next_hop_ = original_header->next_hop_;
01196 msg->last_hop_ = original_header->last_hop_;
01197 msg->num_attr_ = original_header->num_attr_;
01198 msg->new_message_ = original_header->new_message_;
01199 msg->next_port_ = original_header->next_port_;
01200
01201
01202 msg->msg_attr_vec_->erase(place);
01203 delete original_header_attr;
01204
01205
01206 GetLock(dr_mtx_);
01207
01208 entry = findFilter(my_handle);
01209 if (entry && entry->valid_){
01210
01211 if (OneWayMatch(entry->filter_attrs_, msg->msg_attr_vec_)){
01212 ReleaseLock(dr_mtx_);
01213 entry->cb_->recv(msg, my_handle);
01214 return;
01215 }
01216 else{
01217 DiffPrint(DEBUG_ALWAYS,
01218 "Warning: Filter doesn't match incoming message's attributes !\n");
01219 }
01220 }
01221 else{
01222 DiffPrint(DEBUG_IMPORTANT,
01223 "Report: Cannot find filter (possibly deleted ?)\n");
01224 }
01225
01226 ReleaseLock(dr_mtx_);
01227 }
01228
01229 void DiffusionRouting::processMessage(Message *msg)
01230 {
01231 NRSimpleAttribute<int> *rmst_id_attr = NULL;
01232 CallbackList::iterator cbl_itr;
01233 HandleList::iterator sub_itr;
01234 NRAttrVec *callback_attrs;
01235 HandleEntry *entry;
01236 CallbackEntry *aux;
01237 CallbackList cbl;
01238
01239
01240 GetLock(dr_mtx_);
01241
01242 for (sub_itr = sub_list_.begin(); sub_itr != sub_list_.end(); ++sub_itr){
01243 entry = *sub_itr;
01244 if ((entry->valid_) && (MatchAttrs(msg->msg_attr_vec_, entry->attrs_)))
01245 if (entry->cb_){
01246 aux = new CallbackEntry(entry->cb_, entry->hdl_);
01247 cbl.push_back(aux);
01248 }
01249 }
01250
01251
01252 ReleaseLock(dr_mtx_);
01253
01254
01255 rmst_id_attr = RmstIdAttr.find(msg->msg_attr_vec_);
01256 cbl_itr = cbl.begin();
01257
01258
01259 if (rmst_id_attr && (cbl_itr != cbl.end())){
01260 if (!processRmst(msg)){
01261 cbl.clear();
01262 return;
01263 }
01264 }
01265
01266
01267 for (cbl_itr = cbl.begin(); cbl_itr != cbl.end(); ++cbl_itr){
01268
01269 callback_attrs = CopyAttrs(msg->msg_attr_vec_);
01270
01271
01272 aux = *cbl_itr;
01273 aux->cb_->recv(callback_attrs, aux->subscription_handle_);
01274 delete aux;
01275
01276
01277 ClearAttrs(callback_attrs);
01278 delete callback_attrs;
01279 }
01280
01281
01282 cbl.clear();
01283 }
01284
01285 bool DiffusionRouting::processRmst(Message *msg)
01286 {
01287 NRSimpleAttribute<void *> *data_buf_attr = NULL;
01288 NRSimpleAttribute<int> *max_frag_attr = NULL;
01289 NRSimpleAttribute<int> *rmst_id_attr = NULL;
01290 NRSimpleAttribute<int> *frag_attr = NULL;
01291 int rmst_no, frag_no, data_buf_len, count;
01292 void *blob_ptr, *tmp_frag_ptr;
01293 Int2RecRmst::iterator rmst_iterator;
01294 Int2Frag::iterator frag_iterator;
01295 char *dstPtr;
01296 int dstSize;
01297 RecRmst *rmst_ptr;
01298
01299
01300 data_buf_attr = RmstDataAttr.find(msg->msg_attr_vec_);
01301 rmst_id_attr = RmstIdAttr.find(msg->msg_attr_vec_);
01302 frag_attr = RmstFragAttr.find(msg->msg_attr_vec_);
01303 rmst_no = rmst_id_attr->getVal();
01304 frag_no = frag_attr->getVal();
01305 blob_ptr = data_buf_attr->getVal();
01306 data_buf_len = data_buf_attr->getLen();
01307
01308
01309 rmst_iterator = rec_rmst_map_.find(rmst_no);
01310 if (rmst_iterator == rec_rmst_map_.end()){
01311 rmst_ptr = new RecRmst(rmst_no);
01312 rec_rmst_map_.insert(Int2RecRmst::value_type(rmst_no, rmst_ptr));
01313 }
01314 else
01315 rmst_ptr = (*rmst_iterator).second;
01316
01317 if (frag_no == 0){
01318 max_frag_attr = RmstMaxFragAttr.find(msg->msg_attr_vec_);
01319 rmst_ptr->max_frag_ = max_frag_attr->getVal();
01320 rmst_ptr->mtu_len_ = data_buf_len;
01321 }
01322
01323
01324 tmp_frag_ptr = new char[data_buf_len];
01325 memcpy(tmp_frag_ptr, blob_ptr, data_buf_len);
01326 rmst_ptr->frag_map_.insert(Int2Frag::value_type(frag_no, tmp_frag_ptr));
01327
01328 if (frag_no == rmst_ptr->max_frag_)
01329 rmst_ptr->max_frag_len_ = data_buf_len;
01330
01331 count = rmst_ptr->frag_map_.size();
01332
01333
01334 if (count == (rmst_ptr->max_frag_ + 1)){
01335
01336 DiffPrint(DEBUG_DETAILS,
01337 "RMST #%d is complete, creating big blob !\n", rmst_no);
01338
01339
01340 dstSize = rmst_ptr->max_frag_ * rmst_ptr->mtu_len_ + rmst_ptr->max_frag_len_;
01341 dstPtr = new char[dstSize];
01342
01343
01344 for (int i = 0; i < rmst_ptr->max_frag_; i++){
01345 frag_iterator = rmst_ptr->frag_map_.find(i);
01346 tmp_frag_ptr = (*frag_iterator).second;
01347 memcpy((void *)&dstPtr[i * rmst_ptr->mtu_len_],
01348 (void *)tmp_frag_ptr, rmst_ptr->mtu_len_);
01349 }
01350
01351
01352 frag_iterator = rmst_ptr->frag_map_.find(rmst_ptr->max_frag_);
01353 tmp_frag_ptr = (*frag_iterator).second;
01354 memcpy((void *)&dstPtr[rmst_ptr->max_frag_ * rmst_ptr->mtu_len_],
01355 (void *)tmp_frag_ptr, rmst_ptr->max_frag_len_);
01356
01357
01358 rec_rmst_map_.erase(rmst_iterator);
01359 delete rmst_ptr;
01360
01361
01362 data_buf_attr->setVal(dstPtr, dstSize);
01363
01364
01365 return true;
01366 }
01367
01368
01369 return false;
01370 }
01371
01372 HandleEntry * DiffusionRouting::removeHandle(handle my_handle, HandleList *hl)
01373 {
01374 HandleList::iterator itr;
01375 HandleEntry *entry;
01376
01377 for (itr = hl->begin(); itr != hl->end(); ++itr){
01378 entry = *itr;
01379 if (entry->hdl_ == my_handle){
01380 hl->erase(itr);
01381 return entry;
01382 }
01383 }
01384 return NULL;
01385 }
01386
01387 HandleEntry * DiffusionRouting::findHandle(handle my_handle, HandleList *hl)
01388 {
01389 HandleList::iterator itr;
01390 HandleEntry *entry;
01391
01392 for (itr = hl->begin(); itr != hl->end(); ++itr){
01393 entry = *itr;
01394 if (entry->hdl_ == my_handle)
01395 return entry;
01396 }
01397 return NULL;
01398 }
01399
01400 FilterEntry * DiffusionRouting::deleteFilter(handle my_handle)
01401 {
01402 FilterList::iterator itr;
01403 FilterEntry *entry;
01404
01405 for (itr = filter_list_.begin(); itr != filter_list_.end(); ++itr){
01406 entry = *itr;
01407 if (entry->handle_ == my_handle){
01408 filter_list_.erase(itr);
01409 return entry;
01410 }
01411 }
01412 return NULL;
01413 }
01414
01415 FilterEntry * DiffusionRouting::findFilter(handle my_handle)
01416 {
01417 FilterList::iterator itr;
01418 FilterEntry *entry;
01419
01420 for (itr = filter_list_.begin(); itr != filter_list_.end(); ++itr){
01421 entry = *itr;
01422 if (entry->handle_ == my_handle)
01423 return entry;
01424 }
01425 return NULL;
01426 }
01427
01428 bool DiffusionRouting::hasScope(NRAttrVec *attrs)
01429 {
01430 NRAttribute *temp = NULL;
01431
01432 temp = NRScopeAttr.find(attrs);
01433 if (temp)
01434 return true;
01435
01436 return false;
01437 }
01438
01439 bool DiffusionRouting::checkSubscription(NRAttrVec *attrs)
01440 {
01441 NRSimpleAttribute<int> *nrclass = NULL;
01442 NRSimpleAttribute<int> *nrscope = NULL;
01443
01444
01445 nrclass = NRClassAttr.find(attrs);
01446 nrscope = NRScopeAttr.find(attrs);
01447
01448
01449 if (!nrclass)
01450 return false;
01451
01452 if (nrscope){
01453
01454
01455
01456
01457
01458 if (nrscope->getOp() != NRAttribute::IS)
01459 return false;
01460
01461
01462 if ((nrscope->getVal() == NRAttribute::GLOBAL_SCOPE) &&
01463 (nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
01464 (nrclass->getOp() == NRAttribute::IS))
01465 return true;
01466
01467
01468 if (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE)
01469 return true;
01470
01471
01472 return false;
01473 }
01474
01475
01476
01477 if ((nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
01478 (nrclass->getOp() == NRAttribute::IS))
01479 return true;
01480
01481 return false;
01482 }
01483
01484 bool DiffusionRouting::checkPublication(NRAttrVec *attrs)
01485 {
01486 NRSimpleAttribute<int> *nrclass = NULL;
01487 NRSimpleAttribute<int> *nrscope = NULL;
01488
01489
01490 nrclass = NRClassAttr.find(attrs);
01491 nrscope = NRScopeAttr.find(attrs);
01492
01493
01494 if (!nrclass)
01495 return false;
01496
01497
01498
01499 if ((nrclass->getVal() != NRAttribute::DATA_CLASS) ||
01500 (nrclass->getOp() != NRAttribute::IS))
01501 return false;
01502
01503 if (nrscope){
01504
01505
01506
01507
01508
01509 if (nrscope->getOp() != NRAttribute::IS)
01510 return false;
01511
01512
01513 if ((nrscope->getVal() == NRAttribute::GLOBAL_SCOPE) ||
01514 (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE))
01515 return true;
01516
01517
01518 return false;
01519 }
01520
01521
01522
01523 return true;
01524 }
01525
01526 bool DiffusionRouting::checkSend(NRAttrVec *attrs)
01527 {
01528 NRSimpleAttribute<int> *nrclass = NULL;
01529 NRSimpleAttribute<int> *nrscope = NULL;
01530
01531
01532 nrclass = NRClassAttr.find(attrs);
01533 nrscope = NRScopeAttr.find(attrs);
01534
01535 if (nrclass || nrscope)
01536 return false;
01537
01538 return true;
01539 }
01540
01541 bool DiffusionRouting::isPushData(NRAttrVec *attrs)
01542 {
01543 NRSimpleAttribute<int> *nrclass = NULL;
01544 NRSimpleAttribute<int> *nrscope = NULL;
01545
01546
01547 nrclass = NRClassAttr.find(attrs);
01548 nrscope = NRScopeAttr.find(attrs);
01549
01550
01551 if (nrclass && nrscope){
01552 if (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE)
01553 return false;
01554 return true;
01555 }
01556 else{
01557 DiffPrint(DEBUG_ALWAYS, "Error: Cannot find class/scope attributes !\n");
01558 return false;
01559 }
01560 }