00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053 #ifndef lint
00054 static const char rcsid[] =
00055 "@(#) $Header: /nfs/jade/vint/CVSROOT/ns-2/mcast/srm-ssm.cc,v 1.11 2005/08/25 18:58:08 johnh Exp $ (USC/ISI)";
00056 #endif
00057
00058 #include <stdlib.h>
00059 #include <assert.h>
00060
00061 #include <stdio.h>
00062
00063 #include "config.h"
00064 #include "tclcl.h"
00065 #include "agent.h"
00066 #include "packet.h"
00067 #include "ip.h"
00068 #include "srm.h"
00069 #include "srm-ssm.h"
00070 #include "trace.h"
00071
00072 int hdr_srm_ext::offset_;
00073 static class SRMEXTHeaderClass : public PacketHeaderClass {
00074 public:
00075 SRMEXTHeaderClass() : PacketHeaderClass("PacketHeader/SRMEXT",
00076 sizeof(hdr_srm_ext)) {
00077 bind_offset(&hdr_srm_ext::offset_);
00078 }
00079 } class_srmexthdr;
00080
00081 static class SSMSRMAgentClass : public TclClass {
00082 public:
00083 SSMSRMAgentClass() : TclClass("Agent/SRM/SSM") {}
00084 TclObject* create(int, const char*const*) {
00085 return (new SSMSRMAgent());
00086 }
00087 } class_srm_ssm_agent;
00088
00089
00090 SSMSRMAgent::SSMSRMAgent()
00091 : SRMAgent(), glb_sessCtr_(-1), loc_sessCtr_(-1), rep_sessCtr_(-1)
00092 {
00093 bind("group_scope_",&groupScope_);
00094 bind("local_scope_",&localScope_);
00095 bind("scope_flag_",&scopeFlag_);
00096 bind("rep_id_", &repid_);
00097 }
00098
00099 int SSMSRMAgent::command(int argc, const char*const* argv)
00100 {
00101 Tcl& tcl = Tcl::instance();
00102
00103 if (strcmp(argv[1], "send") == 0) {
00104 if (strcmp(argv[2], "session") == 0) {
00105 send_sess();
00106 return TCL_OK;
00107 }
00108 if (strcmp(argv[2], "request") == 0) {
00109 int round = atoi(argv[3]);
00110 int sender = atoi(argv[4]);
00111 int msgid = atoi(argv[5]);
00112 send_ctrl(SRM_RQST, round, sender, msgid, 0);
00113 return TCL_OK;
00114 }
00115 if (strcmp(argv[2], "repair") == 0) {
00116 int round = atoi(argv[3]);
00117 int sender = atoi(argv[4]);
00118 int msgid = atoi(argv[5]);
00119 send_ctrl(SRM_REPR, round, sender, msgid, packetSize_);
00120 return TCL_OK;
00121 }
00122 tcl.resultf("%s: invalid send request %s", name_, argv[2]);
00123 return TCL_ERROR;
00124
00125
00126
00127
00128
00129
00130
00131 }
00132
00133 if (argc == 2) {
00134 if (strcmp(argv[1], "start") == 0) {
00135 sip_->sender_ = addr();
00136 sip_->distance_ = 0.0;
00137
00138
00139
00140
00141
00142 groupScope_ = 32;
00143 senderFlag_ = 0;
00144 printf("%s is %d and rep-status %d\n", name_, addr(), scopeFlag_);
00145 return TCL_OK;
00146 }
00147 if (strcmp(argv[1], "ch-rep") == 0) {
00148 if(scopeFlag_ == SRM_GLOBAL) {
00149 sip_->repid_ = repid_ = addr();
00150 sip_->scopeFlag_ = SRM_GLOBAL;
00151 } else {
00152 sip_->repid_ = repid_;
00153 sip_->scopeFlag_ = SRM_LOCAL;
00154 }
00155 return TCL_OK;
00156 }
00157 if (strcmp(argv[1], "distances?") == 0) {
00158 if (sip_->sender_ < 0) {
00159 tcl.result("");
00160 return TCL_OK;
00161 }
00162 for (SRMinfo* sp = sip_; sp; sp = sp->next_) {
00163 if((sp->distanceFlag_ == REP_DISTANCE) ||
00164 (sp->distanceFlag_ == SELF_DISTANCE)) {
00165
00166 tcl.resultf("%s %d %f", tcl.result(),
00167 sp->sender_, sp->distance_);
00168 } else {
00169 SRMinfo* rsp = get_state(sp->repid_);
00170 tcl.resultf("%s %d %f", tcl.result(),
00171 sp->sender_, rsp->distance_);
00172 }
00173 }
00174 return TCL_OK;
00175 }
00176
00177 }
00178 if (argc == 3) {
00179 if (strcmp(argv[1], "distance?") == 0) {
00180 int sender = atoi(argv[2]);
00181 SRMinfo* sp = get_state(sender);
00182 if((sp->distanceFlag_ == REP_DISTANCE) ||
00183 (sp->distanceFlag_ == SELF_DISTANCE)) {
00184 tcl.resultf("%f", sp->distance_);
00185 } else {
00186 SRMinfo* rsp = get_state(sp->repid_);
00187 tcl.resultf("%f", rsp->distance_);
00188 }
00189 return TCL_OK;
00190 }
00191 }
00192 return SRMAgent::command(argc, argv);
00193 }
00194
00195 void SSMSRMAgent::recv(Packet* p, Handler* h)
00196 {
00197 hdr_ip* ih = hdr_ip::access(p);
00198 hdr_srm* sh = hdr_srm::access(p);
00199 hdr_srm_ext* seh = hdr_srm_ext::access(p);
00200
00201 if (ih->daddr() == 0) {
00202
00203 sh->type() = SRM_DATA;
00204 sh->sender() = addr();
00205 sh->seqnum() = ++dataCtr_;
00206 seh->repid() = repid_;
00207 ih->dst() = dst_;
00208 ih->src() = here_;
00209 target_->recv(p, h);
00210 } else {
00211
00212 #if 0
00213 static char *foo[] = {"NONE", "DATA", "SESS", "RQST", "REPR"};
00214 fprintf(stdout, "%7.4f %s %d recvd SRM_%s <%d, %d> from %d\n",
00215 Scheduler::instance().clock(), name_, addr_, foo[sh->type()],
00216 sh->sender(), sh->seqnum(), ih->src());
00217 fflush(stdout);
00218 #endif
00219
00220 switch (sh->type()) {
00221 case SRM_DATA:
00222 recv_data(sh->sender(), sh->seqnum(), seh->repid(), p->accessdata());
00223 Packet::free(p);
00224 break;
00225 case SRM_RQST:
00226 recv_rqst(ih->saddr(), sh->round(), sh->sender(), sh->seqnum(),
00227 seh->repid());
00228 Packet::free(p);
00229 break;
00230 case SRM_REPR:
00231 recv_repr(sh->round(), sh->sender(), sh->seqnum(), p->accessdata());
00232 Packet::free(p);
00233 break;
00234 case SRM_SESS:
00235
00236
00237
00238 recv_sess(sh->seqnum(), (int*) p->accessdata(), p);
00239 break;
00240 }
00241 }
00242 }
00243
00244
00245 void SSMSRMAgent::recv_data(int sender, int id, int repid, u_char* data)
00246 {
00247 SRMinfo* sp = get_state(sender);
00248
00249 sp->repid_ = repid;
00250 SRMAgent::recv_data(sender,id,data);
00251 }
00252
00253 void SSMSRMAgent::send_ctrl(int type, int round, int sender, int msgid, int size)
00254 {
00255 Packet* p = Agent::allocpkt();
00256 hdr_srm* sh = hdr_srm::access(p);
00257 hdr_srm_ext* seh = hdr_srm_ext::access(p);
00258 sh->type() = type;
00259 sh->sender() = sender;
00260 sh->seqnum() = msgid;
00261 sh->round() = round;
00262 seh->repid() = repid_;
00263
00264 hdr_cmn* ch = hdr_cmn::access(p);
00265 ch->size() = sizeof(hdr_srm) + size;
00266 target_->recv(p, (Handler*)NULL);
00267 }
00268
00269 void SSMSRMAgent::recv_rqst(int requestor, int round, int sender,
00270 int msgid, int repid)
00271 {
00272
00273 SRMinfo* rsp = get_state(requestor);
00274 rsp->repid_ = repid;
00275 SRMAgent::recv_rqst(requestor,round, sender,msgid);
00276 }
00277
00278
00279
00280
00281
00282
00283
00284 void SSMSRMAgent::send_sess()
00285 {
00286 if (scopeFlag_ == SRM_GLOBAL) {
00287 send_glb_sess();
00288 send_rep_sess();
00289 } else {
00290 send_loc_sess();
00291 }
00292
00293 }
00294
00295 #define SESSINFO_SIZE 5
00296 #define SESS_CONST 2
00297
00298 void SSMSRMAgent::send_glb_sess()
00299 {
00300 int size = (SESS_CONST + groupSize_ * SESSINFO_SIZE) * sizeof(int);
00301
00302 int num_entries;
00303 Packet* p = Agent::allocpkt(size);
00304 hdr_srm* sh = hdr_srm::access(p);
00305 hdr_srm_ext* seh = hdr_srm_ext::access(p);
00306
00307 #if 0
00308 printf("sending global session message\n");
00309 #endif
00310 sh->type() = SRM_SESS;
00311 sh->sender() = addr();
00312 sh->seqnum() = ++glb_sessCtr_;
00313 seh->repid() = repid_;
00314
00315 int* data = (int*) p->accessdata();
00316 *data++ = groupSize_;
00317 *data++ = SRM_GLOBAL;
00318 num_entries = 0;
00319 for (SRMinfo* sp = sip_; sp; sp = sp->next_) {
00320
00321 if ((sp->senderFlag_ ||
00322 (sp->scopeFlag_ == SRM_GLOBAL) ||
00323 (sp->sender_ == addr()))
00324 && (is_active(sp))) {
00325 *data++ = sp->sender_;
00326 *data++ = sp->ldata_;
00327 *data++ = sp->recvTime_;
00328 *data++ = sp->sendTime_;
00329 *data++ = sp->repid_;
00330 num_entries++;
00331 }
00332 }
00333 data = (int*) p->accessdata();
00334 data[0] = num_entries;
00335 data[1] = SRM_GLOBAL;
00336 size = (SESS_CONST + num_entries * SESSINFO_SIZE) * sizeof(int);
00337 data[5] = (int) (Scheduler::instance().clock()*1000);
00338
00339 hdr_cmn* ch = hdr_cmn::access(p);
00340 ch->size() += size+ sizeof(hdr_srm);
00341 hdr_ip* ih = hdr_ip::access(p);
00342 ih->ttl() = groupScope_;
00343
00344 ih->flowid() = SRM_GLOBAL;
00345 seh->ottl() = groupScope_;
00346 target_->recv(p, (Handler*)NULL);
00347 }
00348
00349
00350 void SSMSRMAgent::send_loc_sess()
00351 {
00352 int size = (SESS_CONST + groupSize_ * SESSINFO_SIZE) * sizeof(int);
00353
00354 int num_entries;
00355 Packet* p = Agent::allocpkt(size);
00356 hdr_srm* sh = hdr_srm::access(p);
00357 hdr_srm_ext* seh = hdr_srm_ext::access(p);
00358 sh->type() = SRM_SESS;
00359 sh->sender() = addr();
00360 sh->seqnum() = ++loc_sessCtr_;
00361 seh->repid() = repid_;
00362 #if 0
00363 printf("sending local session message\n");
00364 #endif
00365 int* data = (int*) p->accessdata();
00366
00367 *data++ = groupSize_;
00368 *data++ = SRM_LOCAL;
00369 num_entries = 0;
00370 for (SRMinfo* sp = sip_; sp; sp = sp->next_) {
00371
00372
00373 if ((sp->senderFlag_ ||
00374 (sp->scopeFlag_ == SRM_LOCAL) ||
00375 (sp->distanceFlag_ = SELF_DISTANCE) ||
00376
00377 (sp->sender_ == addr()) ||
00378
00379
00380 (repid_ == sp->sender_))
00381 && (is_active(sp))) {
00382 *data++ = sp->sender_;
00383 *data++ = sp->ldata_;
00384 *data++ = sp->recvTime_;
00385 *data++ = sp->sendTime_;
00386 *data++ = sp->repid_;
00387 num_entries++;
00388 }
00389 }
00390 data = (int*) p->accessdata();
00391 data[0] = num_entries;
00392 data[1] = SRM_LOCAL;
00393 size = (SESS_CONST + num_entries * SESSINFO_SIZE) * sizeof(int);
00394 data[5] = (int) (Scheduler::instance().clock()*1000);
00395
00396 hdr_cmn* ch = hdr_cmn::access(p);
00397 ch->size() += size+ sizeof(hdr_srm);
00398 hdr_ip* ih = hdr_ip::access(p);
00399 ih->ttl() = localScope_;
00400
00401 ih->flowid() = SRM_LOCAL;
00402 seh->ottl() = localScope_;
00403 target_->recv(p, (Handler*)NULL);
00404 }
00405
00406
00407 void SSMSRMAgent::send_rep_sess()
00408 {
00409 int size = (SESS_CONST + groupSize_ * SESSINFO_SIZE) * sizeof(int);
00410
00411 int num_entries, num_local_members;
00412 Packet* p = Agent::allocpkt(size);
00413 hdr_srm* sh = hdr_srm::access(p);
00414 hdr_srm_ext* seh = hdr_srm_ext::access(p);
00415 sh->type() = SRM_SESS;
00416 sh->sender() = addr();
00417 sh->seqnum() = ++rep_sessCtr_;
00418 seh->repid() = repid_;
00419 #if 0
00420 printf("sending rep_info session message\n");
00421 #endif
00422 int* data = (int*) p->accessdata();
00423 *data++ = groupSize_;
00424 *data++ = SRM_RINFO;
00425 num_entries = 0;
00426 num_local_members = 0;
00427 for (SRMinfo* sp = sip_; sp; sp = sp->next_) {
00428 if (sp->activeFlag_ == ACTIVE) {
00429
00430
00431 *data++ = sp->sender_;
00432 *data++ = sp->ldata_;
00433 if (sp->scopeFlag_ == SRM_GLOBAL) {
00434 *data++ = (int) (sp->distance_*1000);
00435 data++;
00436 } else {
00437
00438 *data++ = sp->recvTime_;
00439 *data++ = sp->sendTime_;
00440 num_local_members++;
00441 }
00442 *data++ = sp->repid_;
00443 num_entries++;
00444 }
00445 }
00446 if (num_local_members <= 0) {
00447 Packet::free(p);
00448 return;
00449 }
00450 data = (int*) p->accessdata();
00451 data[0] = num_entries;
00452 data[1] = SRM_RINFO;
00453 size = (SESS_CONST + num_entries * SESSINFO_SIZE) * sizeof(int);
00454 data[5] = (int) (Scheduler::instance().clock()*1000);
00455
00456 hdr_cmn* ch = hdr_cmn::access(p);
00457 ch->size() += size+ sizeof(hdr_srm);
00458 hdr_ip* ih = hdr_ip::access(p);
00459 ih->ttl() = localScope_;
00460
00461 ih->flowid() = SRM_RINFO;
00462 seh->ottl() = localScope_;
00463 target_->recv(p, (Handler*)NULL);
00464 }
00465
00466
00467 #define GET_SESSION_INFO \
00468 sender = *data++; \
00469 dataCnt = *data++; \
00470 rtime = *data++; \
00471 stime = *data++; \
00472 repid = *data++; \
00473 // printf("s:%d, d:%d, rt:%d, st:%d, rep:%d\n",sender,
00474
00475
00476
00477
00478 void SSMSRMAgent::recv_sess(int sessCtr, int* data, Packet* p)
00479 {
00480 int type = data[1];
00481
00482 switch (type) {
00483 case SRM_GLOBAL :
00484 recv_glb_sess(sessCtr,data,p);
00485 break;
00486 case SRM_LOCAL :
00487 recv_loc_sess(sessCtr,data,p);
00488 break;
00489 case SRM_RINFO :
00490 if (scopeFlag_ == SRM_GLOBAL) return;
00491 recv_rep_sess(sessCtr,data,p);
00492 break;
00493 }
00494 Packet::free(p);
00495 }
00496
00497 void SSMSRMAgent::recv_glb_sess(int sessCtr, int* data, Packet* p)
00498 {
00499 Tcl& tcl = Tcl::instance();
00500 SRMinfo* sp;
00501 int ttl;
00502
00503 hdr_ip* ih = hdr_ip::access(p);
00504 hdr_srm_ext* seh = hdr_srm_ext::access(p);
00505 ttl = seh->ottl() - ih->ttl();
00506
00507 int sender, dataCnt, rtime, stime,repid;
00508 int now, sentAt, sentBy;
00509 int cnt = *data++;
00510
00511 int i;
00512
00513
00514
00515
00516 GET_SESSION_INFO;
00517 if (sender == addr())
00518
00519 return;
00520 if (seh->repid() != repid) {
00521 fprintf(stdout,"%f Recvd a glb-sess with diff header(%d) != inside(%d)\n",
00522 Scheduler::instance().clock(),seh->repid(),repid);
00523
00524 return;
00525 }
00526 if (sender != repid) {
00527 fprintf(stdout,"%f Recvd a glb-sess with repid(%d) != address(%d)\n",
00528 Scheduler::instance().clock(),repid,sender);
00529
00530 return;
00531 }
00532
00533 sp = get_state(sender);
00534 if (sp->lglbsess_ > sessCtr)
00535 return;
00536 #if 0
00537 fprintf(stdout,"%s recv-gsess from %d\n",name_,sender);
00538 #endif
00539 tcl.evalf("%s recv-gsess %d %d", name_, sender, ttl);
00540
00541 if (sp->scopeFlag_ != SRM_GLOBAL) {
00542 sp->scopeFlag_ = SRM_GLOBAL;
00543 }
00544 sp->repid_ = repid;
00545
00546 now = (int) (Scheduler::instance().clock() * 1000);
00547 sentBy = sender;
00548 sentAt = stime;
00549
00550 sp->lglbsess_ = sessCtr;
00551 sp->recvTime_ = now;
00552 sp->sendTime_ = stime;
00553 for (i = sp->ldata_ + 1; i <= dataCnt; i++)
00554 if (! sp->ifReceived(i))
00555 tcl.evalf("%s request %d %d", name_, sender, i, sp->repid_);
00556 if (sp->ldata_ < dataCnt)
00557 sp->ldata_ = dataCnt;
00558
00559 for (i = 1; i < cnt; i++) {
00560 GET_SESSION_INFO;
00561 if (sender == addr() && now) {
00562 int rtt = (now - sentAt) + (rtime - stime);
00563 sp = get_state(sentBy);
00564 sp->distance_ = (double) rtt / 2 / 1000;
00565 sp->distanceFlag_ = SELF_DISTANCE;
00566 #if 0
00567 fprintf(stderr,
00568 "%7.4f %s compute distance to %d: %f\n",
00569 Scheduler::instance().clock(), name_,
00570 sentBy, sp->distance_);
00571 #endif
00572 continue;
00573 }
00574 sp = get_state(sender);
00575 for (int j = sp->ldata_ + 1; j <= dataCnt; j++)
00576 if (! sp->ifReceived(j))
00577 tcl.evalf("%s request %d %d", name_, sender, j, sp->repid_);
00578 if (sp->ldata_ < dataCnt)
00579 sp->ldata_ = dataCnt;
00580 }
00581 }
00582
00583
00584 void SSMSRMAgent::recv_loc_sess(int sessCtr, int* data, Packet* p)
00585 {
00586 Tcl& tcl = Tcl::instance();
00587 SRMinfo* sp;
00588 int ttl;
00589
00590 hdr_ip* ih = hdr_ip::access(p);
00591 hdr_srm_ext* seh = hdr_srm_ext::access(p);
00592 ttl = seh->ottl() - ih->ttl();
00593
00594 int sender, dataCnt, rtime, stime,repid;
00595 int now, sentAt, sentBy;
00596 int cnt = *data++;
00597 data++;
00598 int i;
00599
00600
00601
00602
00603
00604 GET_SESSION_INFO;
00605 if (sender == addr())
00606 return;
00607
00608 sp = get_state(sender);
00609 if (sp->llocsess_ > sessCtr)
00610 return;
00611 if (sp->scopeFlag_ != SRM_LOCAL) {
00612 sp->scopeFlag_ = SRM_LOCAL;
00613
00614 }
00615 sp->repid_ = repid;
00616
00617 #if 0
00618 fprintf(stdout,"%s recv-lsess from %d\n",name_,sender);
00619 #endif
00620 tcl.evalf("%s recv-lsess %d %d %d", name_, sender, repid, ttl);
00621
00622 now = (int) (Scheduler::instance().clock() * 1000);
00623 sentBy = sender;
00624 sentAt = stime;
00625
00626 sp->llocsess_ = sessCtr;
00627 sp->recvTime_ = now;
00628 sp->sendTime_ = stime;
00629 for (i = sp->ldata_ + 1; i <= dataCnt; i++)
00630 if (! sp->ifReceived(i))
00631 tcl.evalf("%s request %d %d", name_, sender, i, sp->repid_);
00632 if (sp->ldata_ < dataCnt)
00633 sp->ldata_ = dataCnt;
00634
00635 for (i = 1; i < cnt; i++) {
00636 GET_SESSION_INFO;
00637 if (sender == addr() && now) {
00638 int rtt = (now - sentAt) + (rtime - stime);
00639 sp = get_state(sentBy);
00640 sp->distance_ = (double) rtt / 2 / 1000;
00641 sp->distanceFlag_ = SELF_DISTANCE;
00642 #if 0
00643 fprintf(stderr,
00644 "%7.4f %s compute distance to %d: %f\n",
00645 Scheduler::instance().clock(), name_,
00646 sentBy, sp->distance_);
00647 #endif
00648 continue;
00649 }
00650 sp = get_state(sender);
00651 for (int j = sp->ldata_ + 1; j <= dataCnt; j++)
00652 if (! sp->ifReceived(j))
00653 tcl.evalf("%s request %d %d", name_, sender, j, sp->repid_);
00654 if (sp->ldata_ < dataCnt)
00655 sp->ldata_ = dataCnt;
00656 }
00657 }
00658
00659
00660
00661
00662
00663 void SSMSRMAgent::recv_rep_sess(int sessCtr, int* data, Packet*)
00664 {
00665 Tcl& tcl = Tcl::instance();
00666 SRMinfo* sp;
00667
00668 int sender, dataCnt, rtime, stime,repid;
00669 int now, sentAt, sentBy;
00670 int cnt = *data++;
00671 data++;
00672 int i;
00673
00674
00675
00676
00677
00678 GET_SESSION_INFO;
00679 if (sender == addr())
00680 return;
00681 if (sender != repid_)
00682 return;
00683 if (sender != repid) {
00684 fprintf(stdout,"Recvd a rep-sess with repid(%d) != address(%d)\n",
00685 repid,sender);
00686 abort();
00687 }
00688 sp = get_state(sender);
00689 if (sp->lrepsess_ > sessCtr)
00690 return;
00691 if (sp->scopeFlag_ != SRM_GLOBAL)
00692 sp->scopeFlag_ = SRM_GLOBAL;
00693
00694 now = (int) (Scheduler::instance().clock() * 1000);
00695 sentBy = sender;
00696 sentAt = stime;
00697
00698 sp->lrepsess_ = sessCtr;
00699 sp->recvTime_ = now;
00700 sp->sendTime_ = stime;
00701 for (i = sp->ldata_ + 1; i <= dataCnt; i++)
00702 if (! sp->ifReceived(i))
00703 tcl.evalf("%s request %d %d", name_, sender, i, sp->repid_);
00704 if (sp->ldata_ < dataCnt)
00705 sp->ldata_ = dataCnt;
00706
00707 for (i = 1; i < cnt; i++) {
00708 GET_SESSION_INFO;
00709 if (sender == addr() && now) {
00710 int rtt = (now - sentAt) + (rtime - stime);
00711 sp = get_state(sentBy);
00712 sp->distance_ = (double) rtt / 2 / 1000;
00713 sp->distanceFlag_ = SELF_DISTANCE;
00714 #if 0
00715 fprintf(stderr,
00716 "%7.4f %s compute distance to %d: %f\n",
00717 Scheduler::instance().clock(), name_,
00718 sentBy, sp->distance_);
00719 #endif
00720 continue;
00721 }
00722 if ((sender == repid) && (sender != sentBy)) {
00723 sp = get_state(sender);
00724 if (!(is_active(sp) && (sp->distanceFlag_ == SELF_DISTANCE))) {
00725 sp->distance_ = (double) rtime/1000;
00726
00727 sp->distanceFlag_ = REP_DISTANCE;
00728
00729 }
00730 }
00731 sp = get_state(sender);
00732 for (int j = sp->ldata_ + 1; j <= dataCnt; j++)
00733 if (! sp->ifReceived(j))
00734 tcl.evalf("%s request %d %d", name_, sender, j, sp->repid_);
00735 if (sp->ldata_ < dataCnt)
00736 sp->ldata_ = dataCnt;
00737 }
00738 }
00739
00740
00741
00742 #define sessionDelay 1000
00743
00744
00745 void SSMSRMAgent::timeout_info()
00746 {
00747 int now;
00748 now = (int) (Scheduler::instance().clock() * 1000);
00749 for (SRMinfo* sp = sip_->next_; sp; sp = sp->next_) {
00750 if ((now - sp->recvTime_) >= 3*sessionDelay) {
00751 sp->activeFlag_ = INACTIVE;
00752 groupSize_--;
00753 }
00754
00755 }
00756 }
00757
00758 int SSMSRMAgent::is_active(SRMinfo *sp)
00759 {
00760 int now;
00761 now = (int) (Scheduler::instance().clock() * 1000);
00762 if ((sp->sender_ != addr()) && ((now - sp->recvTime_) >= 3*sessionDelay)) {
00763 return 0;
00764 } else {
00765 return 1;
00766 }
00767 }