]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: never (re)open pipe when sending message directly to Connection*
authorSage Weil <sage@newdream.net>
Thu, 22 Apr 2010 20:10:40 +0000 (13:10 -0700)
committerSage Weil <sage@newdream.net>
Thu, 22 Apr 2010 20:10:40 +0000 (13:10 -0700)
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 1e0a57cbdbfa19edca1b54060b6d7311999ce738..46d08a3b91eb2e1353c99bbbf3e9a50b437a16d9 100644 (file)
@@ -390,8 +390,7 @@ int SimpleMessenger::send_message(Message *m, const entity_inst_t& dest)
          << " " << m 
          << dendl;
 
-  submit_message(m, dest);
-
+  submit_message(m, dest.addr, dest.name.type(), false);
   return 0;
 }
 
@@ -402,14 +401,12 @@ int SimpleMessenger::send_message(Message *m, Connection *con)
 
   if (!m->get_priority()) m->set_priority(get_default_send_priority());
 
-
   dout(1) << "--> " << con->get_peer_addr() << " -- " << *m
          << " -- ?+" << m->get_data().length()
          << " " << m
          << dendl;
 
-  submit_message(m, (Pipe **)&con->pipe, con->get_peer_addr(),
-                con->get_peer_type());
+  submit_message(m, (SimpleMessenger::Pipe *)con->pipe);
   return 0;
 }
 
@@ -427,27 +424,7 @@ int SimpleMessenger::lazy_send_message(Message *m, const entity_inst_t& dest)
          << " " << m 
           << dendl;
 
-  submit_message(m, dest, true);
-
-  return 0;
-}
-
-int SimpleMessenger::lazy_send_message(Message *m, Connection *con)
-{
-  //set envelope
-  m->get_header().src = get_myname();
-
-  if (!m->get_priority()) m->set_priority(get_default_send_priority());
-
-
-  dout(1) << "lazy "
-         << "--> " << con->get_peer_addr() << " -- " << *m
-         << " -- ?+" << m->get_data().length()
-         << " " << m
-         << dendl;
-
-  submit_message(m, (Pipe **)&con->pipe, con->get_peer_addr(),
-                con->get_peer_type(), true);
+  submit_message(m, dest.addr, dest.name.type(), true);
   return 0;
 }
 
@@ -2288,15 +2265,31 @@ bool SimpleMessenger::register_entity(entity_name_t name)
   return true;
 }
 
-void SimpleMessenger::submit_message(Message *m, Pipe **ppipe,
-                                    const entity_addr_t& dest_addr,
-                                    int dest_type, bool lazy)
+void SimpleMessenger::submit_message(Message *m, Pipe *pipe)
+{ 
+  lock.Lock();
+  {
+    pipe->pipe_lock.Lock();
+    if (pipe->state == Pipe::STATE_CLOSED) {
+      dout(20) << "submit_message " << *m << " ignoring closed pipe " << pipe->peer_addr << dendl;
+      pipe->unregister_pipe();
+      pipe->pipe_lock.Unlock();
+      m->put();
+    } else {
+      dout(20) << "submit_message " << *m << " remote " << pipe->peer_addr << dendl;
+      pipe->_send(m);
+      pipe->pipe_lock.Unlock();
+    }
+  }
+  lock.Unlock();
+}
+
+void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr, int dest_type, bool lazy)
 {
   assert(m->nref.test() == 1); //this is just to make sure that a changeset
   //is working properly; if you start using the refcounting more and have multiple
   //people hanging on to a message, ditch the assert!
 
-
   lock.Lock();
   {
     // local?
@@ -2310,28 +2303,24 @@ void SimpleMessenger::submit_message(Message *m, Pipe **ppipe,
         assert(0);  // hmpf, this is probably mds->mon beacon from newsyn.
         m->put();
       }
-    }
-    else { // remote pipe.
-      Pipe *pipe = NULL;
-      if (ppipe) pipe = *ppipe;
-      if (!pipe && !rank_pipe.count(dest_addr)) goto no_pipe;
-      else {
-        // connected?
-        if (!pipe) pipe = rank_pipe[ dest_addr ];
+    } else {
+      // remote pipe.
+      Pipe *pipe = 0;
+      if (rank_pipe.count(dest_addr)) {
+       pipe = rank_pipe[ dest_addr ];
        pipe->pipe_lock.Lock();
        if (pipe->state == Pipe::STATE_CLOSED) {
-         dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", ignoring old closed pipe." << dendl;
+         dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", ignoring closed pipe." << dendl;
          pipe->unregister_pipe();
          pipe->pipe_lock.Unlock();
          pipe = 0;
        } else {
          dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", have pipe." << dendl;
-
+         
          pipe->_send(m);
          pipe->pipe_lock.Unlock();
        }
       }
-    no_pipe:
       if (!pipe) {
        if (lazy) {
          dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", lazy, dropping." << dendl;
index 0f7364a227f0ccca86d2772b3777004e2492353b..2a01336f3c3d291cf258e40029447dab0e0af044 100644 (file)
@@ -474,7 +474,10 @@ private:
   int send_message(Message *m, const entity_inst_t& dest);
   int send_message(Message *m, Connection *con);
   int lazy_send_message(Message *m, const entity_inst_t& dest);
-  int lazy_send_message(Message *m, Connection *con);
+  int lazy_send_message(Message *m, Connection *con) {
+    return send_message(m, con);
+  }
+
   /***********************/
 
 private:
@@ -528,11 +531,8 @@ public:
 
   bool register_entity(entity_name_t addr);
 
-  void submit_message(Message *m, const entity_inst_t& addr, bool lazy=false) {
-    submit_message(m, NULL, addr.addr, addr.name.type(), lazy);
-  }
-  void submit_message(Message *m, Pipe **ppipe, const entity_addr_t& dest_addr,
-                     int dest_type, bool lazy=false);
+  void submit_message(Message *m, const entity_addr_t& addr, int dest_type, bool lazy);
+  void submit_message(Message *m, Pipe *pipe);
                      
   int send_keepalive(const entity_inst_t& addr);