]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: explicitly queue messages for unconnected clients
authorSage Weil <sage@inktank.com>
Tue, 16 Oct 2012 20:03:53 +0000 (13:03 -0700)
committerSage Weil <sage@inktank.com>
Tue, 16 Oct 2012 20:04:43 +0000 (13:04 -0700)
Previously, the messenger would queue messages for a destination that
didn't exist when you were a server; that changed a while back with the
wip-msgr merge (circa v0.52).  The result is that when we force open
client sessions and queue messages, they are dropped on the floor and the
client--when it does connect--gets confusing stuff from the MDS.

Instead, explicitly queue and send these messages.  Also, *always* send
via the Connection* instead of the inst.

Fixes: #2681
Signed-off-by: Sage Weil <sage@inktank.com>
src/mds/MDS.cc
src/mds/Server.cc
src/mds/SessionMap.h

index c02611a409b7e3a8fe402886a2db918bdc1a276d..df9002e8d59575b100afafc33d48c85a658e8d3f 100644 (file)
@@ -411,7 +411,7 @@ void MDS::send_message_client_counted(Message *m, Session *session)
   if (session->connection) {
     messenger->send_message(m, session->connection);
   } else {
-    messenger->send_message(m, session->inst);
+    session->preopen_out_queue.push_back(m);
   }
 }
 
@@ -421,7 +421,7 @@ void MDS::send_message_client(Message *m, Session *session)
  if (session->connection) {
     messenger->send_message(m, session->connection);
   } else {
-    messenger->send_message(m, session->inst);
+    session->preopen_out_queue.push_back(m);
   }
 }
 
@@ -2101,6 +2101,12 @@ void MDS::ms_handle_accept(Connection *con)
     if (s->connection != con) {
       dout(10) << " session connection " << s->connection << " -> " << con << dendl;
       s->connection = con;
+
+      // send out any queued messages
+      while (!s->preopen_out_queue.empty()) {
+       messenger->send_message(s->preopen_out_queue.front(), con);
+       s->preopen_out_queue.pop_front();
+      }
     }
   }
 }
index 4a983b25414ac393ca11c79fff1c95753c8196d5..8f36b023704329bcc3e2cb1f54502e533e6e265e 100644 (file)
@@ -270,7 +270,7 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve
     assert(session->is_opening());
     mds->sessionmap.set_state(session, Session::STATE_OPEN);
     mds->sessionmap.touch_session(session);
-    mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), session->inst);
+    mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), session->connection);
   } else if (session->is_closing() ||
             session->is_killing()) {
     // kill any lingering capabilities, leases, requests
@@ -351,7 +351,7 @@ void Server::finish_force_open_sessions(map<client_t,entity_inst_t>& cm,
        dout(10) << "force_open_sessions opened " << session->inst << dendl;
        mds->sessionmap.set_state(session, Session::STATE_OPEN);
        mds->sessionmap.touch_session(session);
-       mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), session->inst);
+       session->preopen_out_queue.push_back(new MClientSession(CEPH_SESSION_OPEN));
       }
     } else {
       dout(10) << "force_open_sessions skipping already-open " << session->inst << dendl;
index 6bca3611e59e39b8b9ab672feee454ce514aefc7..4735fed4a097bfa974b76fd437595b22b7f1a506 100644 (file)
@@ -86,6 +86,8 @@ public:
   Connection *connection;
   xlist<Session*>::item item_session_list;
 
+  list<Message*> preopen_out_queue;  ///< messages for client, queued before they connect
+
   elist<MDRequest*> requests;
 
   interval_set<inodeno_t> pending_prealloc_inos; // journaling prealloc, will be added to prealloc_inos
@@ -187,6 +189,10 @@ public:
     lease_seq(0) { }
   ~Session() {
     assert(!item_session_list.is_on_list());
+    while (!preopen_out_queue.empty()) {
+      preopen_out_queue.front()->put();
+      preopen_out_queue.pop_front();
+    }
   }
 
   void clear() {