]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: add NEW session state
authorSage Weil <sage@newdream.net>
Wed, 23 Dec 2009 19:52:19 +0000 (11:52 -0800)
committerSage Weil <sage@newdream.net>
Tue, 29 Dec 2009 04:05:32 +0000 (20:05 -0800)
We want to register new sessions in the map so that they can be found when
we replay ESession events in the log.  Otherwise a client that connects
to the mds early will have one Session* on the connection, and another in
the map, and reconnect will fail.

Make sure we remove NEW sessions from the map on connection reset, so that
things gets freed.

Also, take mds_lock in ms_verify_authorizer.

src/mds/MDS.cc
src/mds/Server.cc
src/mds/SessionMap.h
src/mds/journal.cc

index d3d51c4f17bbc72145a3892e4b88990ca0e91ed6..6120b4c2f679ad75af23d71ae6d9420a305008d4 100644 (file)
@@ -1504,8 +1504,10 @@ bool MDS::ms_handle_reset(Connection *con)
     objecter->ms_handle_reset(con);
   } else if (con->get_peer_type() == CEPH_ENTITY_TYPE_CLIENT) {
     Session *session = (Session *)con->get_priv();
-    if (!session || session->is_closed())
+    if (!session || session->is_closed() || session->is_new())
       messenger->mark_down(con->get_peer_addr());
+    if (session->is_new())
+      sessionmap.remove_session(session);
     if (session)
       session->put();
   }
@@ -1524,6 +1526,8 @@ bool MDS::ms_verify_authorizer(Connection *con, int peer_type,
                               int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
                               bool& is_valid)
 {
+  Mutex::Locker l(mds_lock);
+
   AuthAuthorizeHandler *authorize_handler = get_authorize_handler(protocol);
   if (!authorize_handler) {
     is_valid = false;
@@ -1545,6 +1549,7 @@ bool MDS::ms_verify_authorizer(Connection *con, int peer_type,
       s->inst.name = n;
       dout(10) << " new session " << s << " for " << s->inst << dendl;
       con->set_priv(s);
+      sessionmap.add_session(s);
     } else {
       dout(10) << " existing session " << s << " for " << s->inst << dendl;
       con->set_priv(s->get());
index 2e2885a1374fa1047c97e1c1dc9e75464afc98cd..cc67fec0834d9907fe9c7424596b3d0e5982277c 100644 (file)
@@ -163,7 +163,7 @@ Session *Server::get_session(Message *m)
 {
   Session *session = (Session *)m->get_connection()->get_priv();
   if (session) {
-    dout(20) << "get_session got " << session->inst << " from message" << dendl;
+    dout(20) << "get_session have " << session << " " << session->inst << " state " << session->get_state() << dendl;
     session->put();  // not carry ref
   } else {
     dout(20) << "get_session dne for " << m->get_source_inst() << dendl;
@@ -308,7 +308,7 @@ version_t Server::prepare_force_open_sessions(map<client_t,entity_inst_t>& cm)
           << " on " << cm.size() << " clients"
           << dendl;
   for (map<client_t,entity_inst_t>::iterator p = cm.begin(); p != cm.end(); ++p) {
-    Session *session = mds->sessionmap.get_or_add_session(p->second);
+    Session *session = mds->sessionmap.get_or_add_open_session(p->second);
     if (session->is_closed() || session->is_closing())
       mds->sessionmap.set_state(session, Session::STATE_OPENING);
     mds->sessionmap.touch_session(session);
@@ -488,10 +488,14 @@ void Server::handle_client_reconnect(MClientReconnect *m)
       dout(1) << " no longer in reconnect state, ignoring reconnect, sending close" << dendl;
       ss << "denied reconnect attempt (mds is " << ceph_mds_state_name(mds->get_state())
         << ") from " << m->get_source_inst();
-    } else {
+    } else if (!session) {
       dout(1) << " no session for " << m->get_source() << ", ignoring reconnect, sending close" << dendl;
       ss << "denied reconnect attempt from " << m->get_source_inst() << " (no session)";
-    }
+    } else if (session->is_closed()) {
+      dout(1) << " no session for " << m->get_source() << ", ignoring reconnect, sending close" << dendl;
+      ss << "denied reconnect attempt from " << m->get_source_inst() << " (session closed)";
+    } else
+      assert(0);
     ss << " after " << delay << " (allowed interval " << g_conf.mds_reconnect_timeout << ")";
     mds->logclient.log(LOG_INFO, ss);
     mds->messenger->send_message(new MClientSession(CEPH_SESSION_CLOSE), m->get_source_inst());
index d383abffb89f31ec1c59e2ebb062d3e67b7de1a9..14b8bc9d30058569b1d5638213d9ff34d8cece29 100644 (file)
@@ -39,13 +39,14 @@ class MDRequest;
 class Session : public RefCountedObject {
   // -- state etc --
 public:
-  static const int STATE_CLOSED = 0;
+  static const int STATE_NEW = 0;
   static const int STATE_OPENING = 1;   // journaling open
   static const int STATE_OPEN = 2;
   static const int STATE_CLOSING = 3;   // journaling close
   static const int STATE_STALE = 4;
   static const int STATE_STALE_PURGING = 5;
   static const int STATE_STALE_CLOSING = 6;
+  static const int STATE_CLOSED = 7;
 
 private:
   int state;
@@ -87,13 +88,15 @@ public:
 
   client_t get_client() { return client_t(inst.name.num()); }
 
-  bool is_closed() { return state == STATE_CLOSED; }
+  int get_state() { return state; }
+  bool is_new() { return state == STATE_NEW; }
   bool is_opening() { return state == STATE_OPENING; }
   bool is_open() { return state == STATE_OPEN; }
   bool is_closing() { return state == STATE_CLOSING; }
   bool is_stale() { return state == STATE_STALE; }
   bool is_stale_purging() { return state == STATE_STALE_PURGING; }
   bool is_stale_closing() { return state == STATE_STALE_CLOSING; }
+  bool is_closed() { return state == STATE_CLOSED; }
 
   // -- caps --
 private:
@@ -134,7 +137,7 @@ public:
 
 
   Session() : 
-    state(STATE_CLOSED), 
+    state(STATE_NEW), 
     session_list_item(this),
     cap_push_seq(0) { }
   ~Session() {
@@ -205,16 +208,19 @@ public:
       return session_map[w];
     return 0;
   }
-  Session* get_or_add_session(entity_inst_t i) {
+  Session* get_or_add_open_session(entity_inst_t i) {
     if (session_map.count(i.name))
       return session_map[i.name];
     Session *s = session_map[i.name] = new Session;
     s->inst = i;
+    set_state(s, Session::STATE_OPEN);
+    s->last_cap_renew = g_clock.now();
     return s;
   }
   void add_session(Session *s) {
     assert(session_map.count(s->inst.name) == 0);
     session_map[s->inst.name] = s;
+    by_state[s->state].push_back(&s->session_list_item);
     s->get();
   }
   void remove_session(Session *s) {
@@ -261,11 +267,8 @@ public:
   void open_sessions(map<client_t,entity_inst_t>& client_map) {
     for (map<client_t,entity_inst_t>::iterator p = client_map.begin(); 
         p != client_map.end(); 
-        ++p) {
-      Session *session = get_or_add_session(p->second);
-      session->inst = p->second;
-      set_state(session, Session::STATE_OPEN);
-    }
+        ++p)
+      get_or_add_open_session(p->second);
     version++;
   }
 
index fe59ba5f9f67d3fd40ac544352a8624d2eb2ba5a..542e597903689decd645af33235b43f7686c3058 100644 (file)
@@ -709,9 +709,8 @@ void ESession::replay(MDS *mds)
     mds->sessionmap.projected = ++mds->sessionmap.version;
     assert(mds->sessionmap.version == cmapv);
     if (open) {
-      Session *session = mds->sessionmap.get_or_add_session(client_inst);
-      session->last_cap_renew = g_clock.now();
-      mds->sessionmap.set_state(session, Session::STATE_OPEN);
+      Session *session = mds->sessionmap.get_or_add_open_session(client_inst);
+      dout(10) << "session " << session << " state " << session->get_state() << dendl;
     } else {
       Session *session = mds->sessionmap.get_session(client_inst.name);
       if (session)