From e3ef14840aaf9f7d10f3b585cdde0dd1a3ab3289 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 22 Apr 2010 13:10:40 -0700 Subject: [PATCH] msgr: never (re)open pipe when sending message directly to Connection* --- src/msg/SimpleMessenger.cc | 71 ++++++++++++++++---------------------- src/msg/SimpleMessenger.h | 12 +++---- 2 files changed, 36 insertions(+), 47 deletions(-) diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 1e0a57cbdbfa1..46d08a3b91eb2 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -390,8 +390,7 @@ int SimpleMessenger::send_message(Message *m, const entity_inst_t& dest) << " " << m << dendl; - submit_message(m, dest); - + submit_message(m, dest.addr, dest.name.type(), false); return 0; } @@ -402,14 +401,12 @@ int SimpleMessenger::send_message(Message *m, Connection *con) if (!m->get_priority()) m->set_priority(get_default_send_priority()); - dout(1) << "--> " << con->get_peer_addr() << " -- " << *m << " -- ?+" << m->get_data().length() << " " << m << dendl; - submit_message(m, (Pipe **)&con->pipe, con->get_peer_addr(), - con->get_peer_type()); + submit_message(m, (SimpleMessenger::Pipe *)con->pipe); return 0; } @@ -427,27 +424,7 @@ int SimpleMessenger::lazy_send_message(Message *m, const entity_inst_t& dest) << " " << m << dendl; - submit_message(m, dest, true); - - return 0; -} - -int SimpleMessenger::lazy_send_message(Message *m, Connection *con) -{ - //set envelope - m->get_header().src = get_myname(); - - if (!m->get_priority()) m->set_priority(get_default_send_priority()); - - - dout(1) << "lazy " - << "--> " << con->get_peer_addr() << " -- " << *m - << " -- ?+" << m->get_data().length() - << " " << m - << dendl; - - submit_message(m, (Pipe **)&con->pipe, con->get_peer_addr(), - con->get_peer_type(), true); + submit_message(m, dest.addr, dest.name.type(), true); return 0; } @@ -2288,15 +2265,31 @@ bool SimpleMessenger::register_entity(entity_name_t name) return true; } -void SimpleMessenger::submit_message(Message *m, Pipe **ppipe, - const entity_addr_t& dest_addr, - int dest_type, bool lazy) +void SimpleMessenger::submit_message(Message *m, Pipe *pipe) +{ + lock.Lock(); + { + pipe->pipe_lock.Lock(); + if (pipe->state == Pipe::STATE_CLOSED) { + dout(20) << "submit_message " << *m << " ignoring closed pipe " << pipe->peer_addr << dendl; + pipe->unregister_pipe(); + pipe->pipe_lock.Unlock(); + m->put(); + } else { + dout(20) << "submit_message " << *m << " remote " << pipe->peer_addr << dendl; + pipe->_send(m); + pipe->pipe_lock.Unlock(); + } + } + lock.Unlock(); +} + +void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr, int dest_type, bool lazy) { assert(m->nref.test() == 1); //this is just to make sure that a changeset //is working properly; if you start using the refcounting more and have multiple //people hanging on to a message, ditch the assert! - lock.Lock(); { // local? @@ -2310,28 +2303,24 @@ void SimpleMessenger::submit_message(Message *m, Pipe **ppipe, assert(0); // hmpf, this is probably mds->mon beacon from newsyn. m->put(); } - } - else { // remote pipe. - Pipe *pipe = NULL; - if (ppipe) pipe = *ppipe; - if (!pipe && !rank_pipe.count(dest_addr)) goto no_pipe; - else { - // connected? - if (!pipe) pipe = rank_pipe[ dest_addr ]; + } else { + // remote pipe. + Pipe *pipe = 0; + if (rank_pipe.count(dest_addr)) { + pipe = rank_pipe[ dest_addr ]; pipe->pipe_lock.Lock(); if (pipe->state == Pipe::STATE_CLOSED) { - dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", ignoring old closed pipe." << dendl; + dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", ignoring closed pipe." << dendl; pipe->unregister_pipe(); pipe->pipe_lock.Unlock(); pipe = 0; } else { dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", have pipe." << dendl; - + pipe->_send(m); pipe->pipe_lock.Unlock(); } } - no_pipe: if (!pipe) { if (lazy) { dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", lazy, dropping." << dendl; diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 0f7364a227f0c..2a01336f3c3d2 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -474,7 +474,10 @@ private: int send_message(Message *m, const entity_inst_t& dest); int send_message(Message *m, Connection *con); int lazy_send_message(Message *m, const entity_inst_t& dest); - int lazy_send_message(Message *m, Connection *con); + int lazy_send_message(Message *m, Connection *con) { + return send_message(m, con); + } + /***********************/ private: @@ -528,11 +531,8 @@ public: bool register_entity(entity_name_t addr); - void submit_message(Message *m, const entity_inst_t& addr, bool lazy=false) { - submit_message(m, NULL, addr.addr, addr.name.type(), lazy); - } - void submit_message(Message *m, Pipe **ppipe, const entity_addr_t& dest_addr, - int dest_type, bool lazy=false); + void submit_message(Message *m, const entity_addr_t& addr, int dest_type, bool lazy); + void submit_message(Message *m, Pipe *pipe); int send_keepalive(const entity_inst_t& addr); -- 2.39.5