From 7bf0b0854d1f2706a3a2302bcbf92dd5c8c888ef Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Sat, 22 Dec 2012 21:24:52 -0800 Subject: [PATCH] msgr: atomically queue first message with connect_rank Atomically queue the first message on the new pipe, without dropping and retaking pipe_lock. Signed-off-by: Sage Weil --- src/msg/SimpleMessenger.cc | 11 ++++++----- src/msg/SimpleMessenger.h | 3 ++- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index cb4574540095a..a6e2f98450591 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -315,7 +315,8 @@ Pipe *SimpleMessenger::add_accept_pipe(int sd) */ Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr, int type, - Connection *con) + Connection *con, + Message *first) { assert(lock.is_locked()); assert(addr != my_inst.addr); @@ -329,6 +330,8 @@ Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr, pipe->set_peer_addr(addr); pipe->policy = get_policy(type); pipe->start_writer(); + if (first) + pipe->_send(first); pipe->pipe_lock.Unlock(); pipe->register_pipe(); pipes.insert(pipe); @@ -369,7 +372,7 @@ Connection *SimpleMessenger::get_connection(const entity_inst_t& dest) pipe = p->second; ldout(cct, 10) << "get_connection " << dest << " existing " << pipe << dendl; } else { - pipe = connect_rank(dest.addr, dest.name.type(), NULL); + pipe = connect_rank(dest.addr, dest.name.type(), NULL, NULL); ldout(cct, 10) << "get_connection " << dest << " new " << pipe << dendl; } Mutex::Locker l(pipe->pipe_lock); @@ -420,9 +423,7 @@ void SimpleMessenger::submit_message(Message *m, Connection *con, m->put(); } else { ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", new pipe." << dendl; - // not connected. - Pipe *pipe = connect_rank(dest_addr, dest_type, con); - pipe->send(m); + connect_rank(dest_addr, dest_type, con, m); } } diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index d35fa039e485e..ca27cae7b8d89 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -431,11 +431,12 @@ private: * @param type The peer type of the entity at the address. * @param con An existing Connection to associate with the new Pipe. If * NULL, it creates a new Connection. + * @param msg an initial message to queue on the new pipe * * @return a pointer to the newly-created Pipe. Caller does not own a * reference; take one if you need it. */ - Pipe *connect_rank(const entity_addr_t& addr, int type, Connection *con); + Pipe *connect_rank(const entity_addr_t& addr, int type, Connection *con, Message *first); /** * Send a message, lazily or not. * This just glues [lazy_]send_message together and passes -- 2.39.5