}
rank->lock.Lock();
- if (rank->num_local > 0) {
+ if (rank->local_endpoint) {
Pipe *p = new Pipe(rank, Pipe::STATE_ACCEPTING);
p->sd = sd;
p->start_reader();
assert(connect_seq == reply.connect_seq);
backoff = utime_t();
dout(20) << "connect success " << connect_seq << ", lossy = " << policy.lossy << dendl;
-
- for (unsigned i=0; i<rank->local.size(); i++)
- if (rank->local[i])
- rank->local[i]->queue_connect(connection_state->get());
-
+
+ if (rank->local_endpoint)
+ rank->local_endpoint->queue_connect(connection_state->get());
+
if (!reader_running) {
dout(20) << "connect starting reader" << dendl;
start_reader();
discard_queue();
- for (unsigned i=0; i<rank->local.size(); i++)
- if (rank->local[i])
- rank->local[i]->queue_reset(connection_state->get());
+ if (rank->local_endpoint)
+ rank->local_endpoint->queue_reset(connection_state->get());
}
void SimpleMessenger::Pipe::was_session_reset()
dout(10) << "was_session_reset" << dendl;
discard_queue();
- for (unsigned i=0; i<rank->local.size(); i++)
- if (rank->local[i])
- rank->local[i]->queue_remote_reset(connection_state->get());
+ if (rank->local_endpoint)
+ rank->local_endpoint->queue_remote_reset(connection_state->get());
out_seq = 0;
in_seq = 0;
<< dendl;
// deliver
- Endpoint *entity = 0;
-
- rank->lock.Lock();
- {
- unsigned erank = m->get_header().dst_erank;
- if (erank < rank->max_local && rank->local[erank]) {
- // find entity
- entity = rank->local[erank];
- entity->get();
- } else {
- derr(0) << "reader got message " << *m << ", which isn't local" << dendl;
- }
- }
- rank->lock.Unlock();
-
- if (entity) {
- entity->queue_message(m); // queue
- entity->put();
- }
+ if (rank->local_endpoint)
+ rank->local_endpoint->queue_message(m);
+ else derr(0) << "reader got message " << *m
+ << "which isn't local" << dendl;
lock.Lock();
}
AuthAuthorizer *SimpleMessenger::get_authorizer(int peer_type, bool force_new)
{
- AuthAuthorizer *a;
- for (unsigned r = 0; r < max_local; r++) {
- if (!local[r])
- continue;
- a = local[r]->ms_deliver_get_authorizer(peer_type, force_new);
- if (a)
- return a;
- }
+ if (local_endpoint)
+ return local_endpoint->ms_deliver_get_authorizer(peer_type, force_new);
return 0;
}
int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
bool& isvalid)
{
- for (unsigned r = 0; r < max_local; r++) {
- if (!local[r])
- continue;
- return local[r]->ms_deliver_verify_authorizer(con, peer_type, protocol, authorizer, authorizer_reply, isvalid);
- }
+ if (local_endpoint)
+ return local_endpoint->ms_deliver_verify_authorizer(con, peer_type, protocol, authorizer, authorizer_reply, isvalid);
return false;
}
dout(10) << "register_entity " << name << dendl;
lock.Lock();
+ if (local_endpoint) { //already have an Endpoint set
+ lock.Unlock();
+ return NULL;
+ }
+
// create messenger
- int erank = max_local;
- Endpoint *msgr = new Endpoint(this, name, erank);
+ Endpoint *msgr = new Endpoint(this, name, 0);
// now i know my type.
if (my_type >= 0)
else
my_type = name.type();
- // add to directory
- max_local++;
- local.resize(max_local);
- stopped.resize(max_local);
-
msgr->get();
- local[erank] = msgr;
- stopped[erank] = false;
+ local_endpoint = msgr;
+ endpoint_stopped = false;
dout(10) << "register_entity " << name << " at " << msgr->get_myaddr() << dendl;
- num_local++;
-
lock.Unlock();
return msgr;
}
// remove from local directory.
assert(msgr->my_rank >= 0);
- assert(local[msgr->my_rank] == msgr);
- local[msgr->my_rank] = 0;
- stopped[msgr->my_rank] = true;
- num_local--;
+ assert(local_endpoint == msgr);
+ local_endpoint = 0;
+ endpoint_stopped = true;
msgr->my_rank = -1;
assert(msgr->nref.test() > 1);
{
// local?
if (rank_addr.is_local_to(dest_addr)) {
- if (dest_addr.get_erank() < max_local && local[dest_addr.get_erank()]) {
+ if (dest_addr.get_erank() == 0 && local_endpoint) {
// local
dout(20) << "submit_message " << *m << " local" << dendl;
- local[dest_addr.get_erank()]->queue_message(m);
+ local_endpoint->queue_message(m);
} else {
- derr(0) << "submit_message " << *m << " " << dest_addr << " local but not in local map? dropping." << dendl;
- //assert(0); // hmpf, this is probably mds->mon beacon from newsyn.
+ derr(0) << "submit_message " << *m << " " << dest_addr << " local but wrong erank? dropping." << dendl;
+ assert(0); // hmpf, this is probably mds->mon beacon from newsyn.
delete m;
}
}
// reap dead pipes
reaper();
- if (num_local == 0) {
+ if (!local_endpoint) {
dout(10) << "wait: everything stopped" << dendl;
break; // everything stopped.
}
- dout(10) << "wait: local still has " << local.size() << " items, waiting" << dendl;
+ dout(10) << "wait: local_endpoint still active" << dendl;
wait_cond.Wait(lock);
}
lock.Unlock();