]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: Only allow one local endpoint; now has simpler, less iterative code!
authorGreg Farnum <gregf@hq.newdream.net>
Sat, 5 Dec 2009 02:23:12 +0000 (18:23 -0800)
committerGreg Farnum <gregf@hq.newdream.net>
Sat, 5 Dec 2009 02:23:12 +0000 (18:23 -0800)
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 7e9eb341096607c3870f2ecd99e8a4e15df0abe3..d7813dd29dec3a24317f68456638c65953abf668 100644 (file)
@@ -210,7 +210,7 @@ void *SimpleMessenger::Accepter::entry()
       }
       
       rank->lock.Lock();
-      if (rank->num_local > 0) {
+      if (rank->local_endpoint) {
        Pipe *p = new Pipe(rank, Pipe::STATE_ACCEPTING);
        p->sd = sd;
        p->start_reader();
@@ -1103,11 +1103,10 @@ int SimpleMessenger::Pipe::connect()
       assert(connect_seq == reply.connect_seq);
       backoff = utime_t();
       dout(20) << "connect success " << connect_seq << ", lossy = " << policy.lossy << dendl;
-
-      for (unsigned i=0; i<rank->local.size(); i++) 
-       if (rank->local[i])
-         rank->local[i]->queue_connect(connection_state->get());
-
+      
+      if (rank->local_endpoint)
+       rank->local_endpoint->queue_connect(connection_state->get());
+      
       if (!reader_running) {
        dout(20) << "connect starting reader" << dendl;
        start_reader();
@@ -1261,9 +1260,8 @@ void SimpleMessenger::Pipe::fail()
 
   discard_queue();
   
-  for (unsigned i=0; i<rank->local.size(); i++) 
-    if (rank->local[i])
-      rank->local[i]->queue_reset(connection_state->get());
+  if (rank->local_endpoint)
+    rank->local_endpoint->queue_reset(connection_state->get());
 }
 
 void SimpleMessenger::Pipe::was_session_reset()
@@ -1273,9 +1271,8 @@ void SimpleMessenger::Pipe::was_session_reset()
   dout(10) << "was_session_reset" << dendl;
   discard_queue();
 
-  for (unsigned i=0; i<rank->local.size(); i++) 
-    if (rank->local[i])
-      rank->local[i]->queue_remote_reset(connection_state->get());
+  if (rank->local_endpoint)
+    rank->local_endpoint->queue_remote_reset(connection_state->get());
 
   out_seq = 0;
   in_seq = 0;
@@ -1411,25 +1408,10 @@ void SimpleMessenger::Pipe::reader()
               << dendl;
       
       // deliver
-      Endpoint *entity = 0;
-      
-      rank->lock.Lock();
-      {
-       unsigned erank = m->get_header().dst_erank;
-       if (erank < rank->max_local && rank->local[erank]) {
-         // find entity
-         entity = rank->local[erank];
-         entity->get();
-       } else {
-         derr(0) << "reader got message " << *m << ", which isn't local" << dendl;
-       }
-      }
-      rank->lock.Unlock();
-      
-      if (entity) {
-       entity->queue_message(m);        // queue
-       entity->put();
-      }
+      if (rank->local_endpoint)
+       rank->local_endpoint->queue_message(m);
+      else derr(0) << "reader got message " << *m
+                  << "which isn't local" << dendl;
 
       lock.Lock();
     } 
@@ -2128,14 +2110,8 @@ SimpleMessenger::Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr,
 
 AuthAuthorizer *SimpleMessenger::get_authorizer(int peer_type, bool force_new)
 {
-  AuthAuthorizer *a;
-  for (unsigned r = 0; r < max_local; r++) {
-    if (!local[r])
-      continue;
-    a = local[r]->ms_deliver_get_authorizer(peer_type, force_new);
-    if (a)
-      return a;
-  }
+  if (local_endpoint)
+    return local_endpoint->ms_deliver_get_authorizer(peer_type, force_new);
   return 0;
 }
 
@@ -2143,11 +2119,8 @@ bool SimpleMessenger::verify_authorizer(Connection *con, int peer_type,
                                        int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
                                        bool& isvalid)
 {
-  for (unsigned r = 0; r < max_local; r++) {
-    if (!local[r])
-      continue;
-    return local[r]->ms_deliver_verify_authorizer(con, peer_type, protocol, authorizer, authorizer_reply, isvalid);
-  }
+  if (local_endpoint)
+    return local_endpoint->ms_deliver_verify_authorizer(con, peer_type, protocol, authorizer, authorizer_reply, isvalid);
   return false;
 }
 
@@ -2160,9 +2133,13 @@ SimpleMessenger::Endpoint *SimpleMessenger::register_entity(entity_name_t name)
   dout(10) << "register_entity " << name << dendl;
   lock.Lock();
   
+  if (local_endpoint) { //already have an Endpoint set
+    lock.Unlock();
+    return NULL;
+  }
+
   // create messenger
-  int erank = max_local;
-  Endpoint *msgr = new Endpoint(this, name, erank);
+  Endpoint *msgr = new Endpoint(this, name, 0);
 
   // now i know my type.
   if (my_type >= 0)
@@ -2170,19 +2147,12 @@ SimpleMessenger::Endpoint *SimpleMessenger::register_entity(entity_name_t name)
   else
     my_type = name.type();
 
-  // add to directory
-  max_local++;
-  local.resize(max_local);
-  stopped.resize(max_local);
-
   msgr->get();
-  local[erank] = msgr;
-  stopped[erank] = false;
+  local_endpoint = msgr;
+  endpoint_stopped = false;
 
   dout(10) << "register_entity " << name << " at " << msgr->get_myaddr() << dendl;
 
-  num_local++;
-  
   lock.Unlock();
   return msgr;
 }
@@ -2195,10 +2165,9 @@ void SimpleMessenger::unregister_entity(Endpoint *msgr)
   
   // remove from local directory.
   assert(msgr->my_rank >= 0);
-  assert(local[msgr->my_rank] == msgr);
-  local[msgr->my_rank] = 0;
-  stopped[msgr->my_rank] = true;
-  num_local--;
+  assert(local_endpoint == msgr);
+  local_endpoint = 0;
+  endpoint_stopped = true;
   msgr->my_rank = -1;
 
   assert(msgr->nref.test() > 1);
@@ -2225,13 +2194,13 @@ void SimpleMessenger::submit_message(Message *m, const entity_inst_t& dest, bool
   {
     // local?
     if (rank_addr.is_local_to(dest_addr)) {
-      if (dest_addr.get_erank() < max_local && local[dest_addr.get_erank()]) {
+      if (dest_addr.get_erank() == 0 && local_endpoint) {
         // local
         dout(20) << "submit_message " << *m << " local" << dendl;
-       local[dest_addr.get_erank()]->queue_message(m);
+       local_endpoint->queue_message(m);
       } else {
-        derr(0) << "submit_message " << *m << " " << dest_addr << " local but not in local map?  dropping." << dendl;
-        //assert(0);  // hmpf, this is probably mds->mon beacon from newsyn.
+        derr(0) << "submit_message " << *m << " " << dest_addr << " local but wrong erank? dropping." << dendl;
+        assert(0);  // hmpf, this is probably mds->mon beacon from newsyn.
        delete m;
       }
     }
@@ -2315,12 +2284,12 @@ void SimpleMessenger::wait()
     // reap dead pipes
     reaper();
 
-    if (num_local == 0) {
+    if (!local_endpoint) {
       dout(10) << "wait: everything stopped" << dendl;
       break;   // everything stopped.
     }
 
-    dout(10) << "wait: local still has " << local.size() << " items, waiting" << dendl;
+    dout(10) << "wait: local_endpoint still active" << dendl;
     wait_cond.Wait(lock);
   }
   lock.Unlock();
index bc4e5c5734dadd5a03681c6f1b32565654e28477..fd0fd897639263be1933d64a4b86109d2a111a03 100644 (file)
@@ -379,9 +379,11 @@ private:
   entity_addr_t rank_addr;
   
   // local
-  unsigned max_local, num_local;
+  /*unsigned max_local, num_local;
   vector<Endpoint*> local;
-  vector<bool>             stopped;
+  vector<bool>             stopped; */
+  Endpoint *local_endpoint;
+  bool endpoint_stopped;
   
   // remote
   hash_map<entity_addr_t, Pipe*> rank_pipe;
@@ -415,8 +417,7 @@ public:
   SimpleMessenger() :
     accepter(this),
     lock("SimpleMessenger::lock"), started(false), did_bind(false), need_addr(true),
-    max_local(0), num_local(0),
-    my_type(-1),
+    local_endpoint(NULL), my_type(-1),
     global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0) { }
   ~SimpleMessenger() { }