]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: atomically queue first message with connect_rank
authorSage Weil <sage@inktank.com>
Sun, 23 Dec 2012 05:24:52 +0000 (21:24 -0800)
committerSage Weil <sage@inktank.com>
Sat, 29 Dec 2012 01:21:00 +0000 (17:21 -0800)
Atomically queue the first message on the new pipe, without dropping
and retaking pipe_lock.

Signed-off-by: Sage Weil <sage@inktank.com>
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index cb4574540095a17ca1fa63b2d9b87f1376dc7597..a6e2f984505919c5da08afb673bcde38c844bf81 100644 (file)
@@ -315,7 +315,8 @@ Pipe *SimpleMessenger::add_accept_pipe(int sd)
  */
 Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr,
                                    int type,
-                                   Connection *con)
+                                   Connection *con,
+                                   Message *first)
 {
   assert(lock.is_locked());
   assert(addr != my_inst.addr);
@@ -329,6 +330,8 @@ Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr,
   pipe->set_peer_addr(addr);
   pipe->policy = get_policy(type);
   pipe->start_writer();
+  if (first)
+    pipe->_send(first);
   pipe->pipe_lock.Unlock();
   pipe->register_pipe();
   pipes.insert(pipe);
@@ -369,7 +372,7 @@ Connection *SimpleMessenger::get_connection(const entity_inst_t& dest)
       pipe = p->second;
       ldout(cct, 10) << "get_connection " << dest << " existing " << pipe << dendl;
     } else {
-      pipe = connect_rank(dest.addr, dest.name.type(), NULL);
+      pipe = connect_rank(dest.addr, dest.name.type(), NULL, NULL);
       ldout(cct, 10) << "get_connection " << dest << " new " << pipe << dendl;
     }
     Mutex::Locker l(pipe->pipe_lock);
@@ -420,9 +423,7 @@ void SimpleMessenger::submit_message(Message *m, Connection *con,
     m->put();
   } else {
     ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", new pipe." << dendl;
-    // not connected.
-    Pipe *pipe = connect_rank(dest_addr, dest_type, con);
-    pipe->send(m);
+    connect_rank(dest_addr, dest_type, con, m);
   }
 }
 
index d35fa039e485ee14ca8e51f9fcb5fcfa4fed17e2..ca27cae7b8d89ad320929c7e3e5743cb91df895a 100644 (file)
@@ -431,11 +431,12 @@ private:
    * @param type The peer type of the entity at the address.
    * @param con An existing Connection to associate with the new Pipe. If
    * NULL, it creates a new Connection.
+   * @param msg an initial message to queue on the new pipe
    *
    * @return a pointer to the newly-created Pipe. Caller does not own a
    * reference; take one if you need it.
    */
-  Pipe *connect_rank(const entity_addr_t& addr, int type, Connection *con);
+  Pipe *connect_rank(const entity_addr_t& addr, int type, Connection *con, Message *first);
   /**
    * Send a message, lazily or not.
    * This just glues [lazy_]send_message together and passes