From e0862fd0e016da242b0bd299725a33ab24daa5bc Mon Sep 17 00:00:00 2001 From: Greg Farnum Date: Fri, 4 Dec 2009 18:23:12 -0800 Subject: [PATCH] msgr: Only allow one local endpoint; now has simpler, less iterative code! --- src/msg/SimpleMessenger.cc | 99 +++++++++++++------------------------- src/msg/SimpleMessenger.h | 9 ++-- 2 files changed, 39 insertions(+), 69 deletions(-) diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 7e9eb34109660..d7813dd29dec3 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -210,7 +210,7 @@ void *SimpleMessenger::Accepter::entry() } rank->lock.Lock(); - if (rank->num_local > 0) { + if (rank->local_endpoint) { Pipe *p = new Pipe(rank, Pipe::STATE_ACCEPTING); p->sd = sd; p->start_reader(); @@ -1103,11 +1103,10 @@ int SimpleMessenger::Pipe::connect() assert(connect_seq == reply.connect_seq); backoff = utime_t(); dout(20) << "connect success " << connect_seq << ", lossy = " << policy.lossy << dendl; - - for (unsigned i=0; ilocal.size(); i++) - if (rank->local[i]) - rank->local[i]->queue_connect(connection_state->get()); - + + if (rank->local_endpoint) + rank->local_endpoint->queue_connect(connection_state->get()); + if (!reader_running) { dout(20) << "connect starting reader" << dendl; start_reader(); @@ -1261,9 +1260,8 @@ void SimpleMessenger::Pipe::fail() discard_queue(); - for (unsigned i=0; ilocal.size(); i++) - if (rank->local[i]) - rank->local[i]->queue_reset(connection_state->get()); + if (rank->local_endpoint) + rank->local_endpoint->queue_reset(connection_state->get()); } void SimpleMessenger::Pipe::was_session_reset() @@ -1273,9 +1271,8 @@ void SimpleMessenger::Pipe::was_session_reset() dout(10) << "was_session_reset" << dendl; discard_queue(); - for (unsigned i=0; ilocal.size(); i++) - if (rank->local[i]) - rank->local[i]->queue_remote_reset(connection_state->get()); + if (rank->local_endpoint) + rank->local_endpoint->queue_remote_reset(connection_state->get()); out_seq = 0; in_seq = 0; @@ -1411,25 +1408,10 @@ void SimpleMessenger::Pipe::reader() << dendl; // deliver - Endpoint *entity = 0; - - rank->lock.Lock(); - { - unsigned erank = m->get_header().dst_erank; - if (erank < rank->max_local && rank->local[erank]) { - // find entity - entity = rank->local[erank]; - entity->get(); - } else { - derr(0) << "reader got message " << *m << ", which isn't local" << dendl; - } - } - rank->lock.Unlock(); - - if (entity) { - entity->queue_message(m); // queue - entity->put(); - } + if (rank->local_endpoint) + rank->local_endpoint->queue_message(m); + else derr(0) << "reader got message " << *m + << "which isn't local" << dendl; lock.Lock(); } @@ -2128,14 +2110,8 @@ SimpleMessenger::Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr, AuthAuthorizer *SimpleMessenger::get_authorizer(int peer_type, bool force_new) { - AuthAuthorizer *a; - for (unsigned r = 0; r < max_local; r++) { - if (!local[r]) - continue; - a = local[r]->ms_deliver_get_authorizer(peer_type, force_new); - if (a) - return a; - } + if (local_endpoint) + return local_endpoint->ms_deliver_get_authorizer(peer_type, force_new); return 0; } @@ -2143,11 +2119,8 @@ bool SimpleMessenger::verify_authorizer(Connection *con, int peer_type, int protocol, bufferlist& authorizer, bufferlist& authorizer_reply, bool& isvalid) { - for (unsigned r = 0; r < max_local; r++) { - if (!local[r]) - continue; - return local[r]->ms_deliver_verify_authorizer(con, peer_type, protocol, authorizer, authorizer_reply, isvalid); - } + if (local_endpoint) + return local_endpoint->ms_deliver_verify_authorizer(con, peer_type, protocol, authorizer, authorizer_reply, isvalid); return false; } @@ -2160,9 +2133,13 @@ SimpleMessenger::Endpoint *SimpleMessenger::register_entity(entity_name_t name) dout(10) << "register_entity " << name << dendl; lock.Lock(); + if (local_endpoint) { //already have an Endpoint set + lock.Unlock(); + return NULL; + } + // create messenger - int erank = max_local; - Endpoint *msgr = new Endpoint(this, name, erank); + Endpoint *msgr = new Endpoint(this, name, 0); // now i know my type. if (my_type >= 0) @@ -2170,19 +2147,12 @@ SimpleMessenger::Endpoint *SimpleMessenger::register_entity(entity_name_t name) else my_type = name.type(); - // add to directory - max_local++; - local.resize(max_local); - stopped.resize(max_local); - msgr->get(); - local[erank] = msgr; - stopped[erank] = false; + local_endpoint = msgr; + endpoint_stopped = false; dout(10) << "register_entity " << name << " at " << msgr->get_myaddr() << dendl; - num_local++; - lock.Unlock(); return msgr; } @@ -2195,10 +2165,9 @@ void SimpleMessenger::unregister_entity(Endpoint *msgr) // remove from local directory. assert(msgr->my_rank >= 0); - assert(local[msgr->my_rank] == msgr); - local[msgr->my_rank] = 0; - stopped[msgr->my_rank] = true; - num_local--; + assert(local_endpoint == msgr); + local_endpoint = 0; + endpoint_stopped = true; msgr->my_rank = -1; assert(msgr->nref.test() > 1); @@ -2225,13 +2194,13 @@ void SimpleMessenger::submit_message(Message *m, const entity_inst_t& dest, bool { // local? if (rank_addr.is_local_to(dest_addr)) { - if (dest_addr.get_erank() < max_local && local[dest_addr.get_erank()]) { + if (dest_addr.get_erank() == 0 && local_endpoint) { // local dout(20) << "submit_message " << *m << " local" << dendl; - local[dest_addr.get_erank()]->queue_message(m); + local_endpoint->queue_message(m); } else { - derr(0) << "submit_message " << *m << " " << dest_addr << " local but not in local map? dropping." << dendl; - //assert(0); // hmpf, this is probably mds->mon beacon from newsyn. + derr(0) << "submit_message " << *m << " " << dest_addr << " local but wrong erank? dropping." << dendl; + assert(0); // hmpf, this is probably mds->mon beacon from newsyn. delete m; } } @@ -2315,12 +2284,12 @@ void SimpleMessenger::wait() // reap dead pipes reaper(); - if (num_local == 0) { + if (!local_endpoint) { dout(10) << "wait: everything stopped" << dendl; break; // everything stopped. } - dout(10) << "wait: local still has " << local.size() << " items, waiting" << dendl; + dout(10) << "wait: local_endpoint still active" << dendl; wait_cond.Wait(lock); } lock.Unlock(); diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index bc4e5c5734dad..fd0fd89763926 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -379,9 +379,11 @@ private: entity_addr_t rank_addr; // local - unsigned max_local, num_local; + /*unsigned max_local, num_local; vector local; - vector stopped; + vector stopped; */ + Endpoint *local_endpoint; + bool endpoint_stopped; // remote hash_map rank_pipe; @@ -415,8 +417,7 @@ public: SimpleMessenger() : accepter(this), lock("SimpleMessenger::lock"), started(false), did_bind(false), need_addr(true), - max_local(0), num_local(0), - my_type(-1), + local_endpoint(NULL), my_type(-1), global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0) { } ~SimpleMessenger() { } -- 2.39.5