From: Sage Weil Date: Tue, 16 Oct 2012 20:03:53 +0000 (-0700) Subject: mds: explicitly queue messages for unconnected clients X-Git-Tag: v0.54~40 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=db976663a50ce7ab2448c42cb0c6bfd4f58d25a5;p=ceph.git mds: explicitly queue messages for unconnected clients Previously, the messenger would queue messages for a destination that didn't exist when you were a server; that changed a while back with the wip-msgr merge (circa v0.52). The result is that when we force open client sessions and queue messages, they are dropped on the floor and the client--when it does connect--gets confusing stuff from the MDS. Instead, explicitly queue and send these messages. Also, *always* send via the Connection* instead of the inst. Fixes: #2681 Signed-off-by: Sage Weil --- diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index c02611a409b7..df9002e8d595 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -411,7 +411,7 @@ void MDS::send_message_client_counted(Message *m, Session *session) if (session->connection) { messenger->send_message(m, session->connection); } else { - messenger->send_message(m, session->inst); + session->preopen_out_queue.push_back(m); } } @@ -421,7 +421,7 @@ void MDS::send_message_client(Message *m, Session *session) if (session->connection) { messenger->send_message(m, session->connection); } else { - messenger->send_message(m, session->inst); + session->preopen_out_queue.push_back(m); } } @@ -2101,6 +2101,12 @@ void MDS::ms_handle_accept(Connection *con) if (s->connection != con) { dout(10) << " session connection " << s->connection << " -> " << con << dendl; s->connection = con; + + // send out any queued messages + while (!s->preopen_out_queue.empty()) { + messenger->send_message(s->preopen_out_queue.front(), con); + s->preopen_out_queue.pop_front(); + } } } } diff --git a/src/mds/Server.cc b/src/mds/Server.cc index 4a983b25414a..8f36b0237043 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -270,7 +270,7 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve assert(session->is_opening()); mds->sessionmap.set_state(session, Session::STATE_OPEN); mds->sessionmap.touch_session(session); - mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), session->inst); + mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), session->connection); } else if (session->is_closing() || session->is_killing()) { // kill any lingering capabilities, leases, requests @@ -351,7 +351,7 @@ void Server::finish_force_open_sessions(map& cm, dout(10) << "force_open_sessions opened " << session->inst << dendl; mds->sessionmap.set_state(session, Session::STATE_OPEN); mds->sessionmap.touch_session(session); - mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), session->inst); + session->preopen_out_queue.push_back(new MClientSession(CEPH_SESSION_OPEN)); } } else { dout(10) << "force_open_sessions skipping already-open " << session->inst << dendl; diff --git a/src/mds/SessionMap.h b/src/mds/SessionMap.h index 6bca3611e59e..4735fed4a097 100644 --- a/src/mds/SessionMap.h +++ b/src/mds/SessionMap.h @@ -86,6 +86,8 @@ public: Connection *connection; xlist::item item_session_list; + list preopen_out_queue; ///< messages for client, queued before they connect + elist requests; interval_set pending_prealloc_inos; // journaling prealloc, will be added to prealloc_inos @@ -187,6 +189,10 @@ public: lease_seq(0) { } ~Session() { assert(!item_session_list.is_on_list()); + while (!preopen_out_queue.empty()) { + preopen_out_queue.front()->put(); + preopen_out_queue.pop_front(); + } } void clear() {