]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
client: instantiate MetaSession for opening connections
authorSage Weil <sage@inktank.com>
Fri, 8 Mar 2013 20:56:54 +0000 (12:56 -0800)
committerSage Weil <sage@inktank.com>
Wed, 13 Mar 2013 23:36:56 +0000 (16:36 -0700)
The previous kludge where a waiting_for_session key indicated that we
had an open in progress was... kludgey.

Introduce some helpers to do the session creation/open.

Move the waiting list to be a session member, and clean up associated
code.

Signed-off-by: Sage Weil <sage@inktank.com>
src/client/Client.cc
src/client/Client.h
src/client/MetaSession.cc
src/client/MetaSession.h

index 43930bd8cc33b4856ca1cf3bb45f1b197888cfb3..f115e10aca986dfb6ff11d4eb5e45a58036d3ad6 100644 (file)
@@ -1111,31 +1111,23 @@ void Client::connect_mds_targets(int mds)
        q != info.export_targets.end();
        q++) {
     if (mds_sessions.count(*q) == 0 &&
-       waiting_for_session.count(*q) == 0 &&
        mdsmap->is_clientreplay_or_active_or_stopping(*q)) {
       ldout(cct, 10) << "check_mds_sessions opening mds." << mds
-              << " export target mds." << *q << dendl;
-      messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_OPEN),
-                             mdsmap->get_inst(*q));
-      waiting_for_session[*q].size();
+                    << " export target mds." << *q << dendl;
+      _open_mds_session(*q);
     }
   }
 }
 
 void Client::dump_mds_sessions(Formatter *f)
 {
-  f->open_array_section("open_sessions");
+  f->open_array_section("sessions");
   for (map<int,MetaSession*>::const_iterator p = mds_sessions.begin(); p != mds_sessions.end(); ++p) {
     f->open_object_section("session");
     p->second->dump(f);
     f->close_section();
   }
   f->close_section();
-  f->open_array_section("opening_sessions");
-  for (map<int,list<Cond*> >::const_iterator p = waiting_for_session.begin(); p != waiting_for_session.end(); ++p) {
-    f->dump_int("mds", p->first);
-  }
-  f->close_section();
   f->dump_int("mdsmap_epoch", mdsmap->get_epoch());
 }
 void Client::dump_mds_requests(Formatter *f)
@@ -1286,7 +1278,7 @@ int Client::make_request(MetaRequest *request,
     }
 
     // open a session?
-    if (mds_sessions.count(mds) == 0) {
+    if (!have_open_session(mds)) {
       Cond cond;
 
       if (!mdsmap->is_active_or_stopping(mds)) {
@@ -1301,20 +1293,18 @@ int Client::make_request(MetaRequest *request,
        }
       }
       
-      if (waiting_for_session.count(mds) == 0) {
-       ldout(cct, 10) << "opening session to mds." << mds << dendl;
-       messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_OPEN),
-                               mdsmap->get_inst(mds));
-      }
-      
+      MetaSession *session = _get_or_open_mds_session(mds);
+
       // wait
-      waiting_for_session[mds].push_back(&cond);
-      while (waiting_for_session.count(mds)) {
-       ldout(cct, 10) << "waiting for session to mds." << mds << " to open" << dendl;
-       cond.Wait(client_lock);
+      if (session->state == MetaSession::STATE_OPENING) {
+       session->waiting_for_open.push_back(&cond);
+       while (session->state == MetaSession::STATE_OPENING) {
+         ldout(cct, 10) << "waiting for session to mds." << mds << " to open" << dendl;
+         cond.Wait(client_lock);
+       }
       }
 
-      if (mds_sessions.count(mds) == 0)
+      if (!have_open_session(mds))
        continue;
     }
 
@@ -1461,6 +1451,35 @@ void Client::encode_cap_releases(MetaRequest *req, int mds)
           << req << ", mds " << mds <<dendl;
 }
 
+bool Client::have_open_session(int mds)
+{
+  return
+    mds_sessions.count(mds) &&
+    mds_sessions[mds]->state == MetaSession::STATE_OPEN;
+}
+
+MetaSession *Client::_get_or_open_mds_session(int mds)
+{
+  if (mds_sessions.count(mds))
+    return mds_sessions[mds];
+  return _open_mds_session(mds);
+}
+
+MetaSession *Client::_open_mds_session(int mds)
+{
+  ldout(cct, 10) << "_open_mds_session mds." << mds << dendl;
+  assert(mds_sessions.count(mds) == 0);
+  MetaSession *session = new MetaSession;
+  session->mds_num = mds;
+  session->seq = 0;
+  session->inst = mdsmap->get_inst(mds);
+  session->state = MetaSession::STATE_OPENING;
+  mds_sessions[mds] = session;
+  messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_OPEN),
+                         session->inst);
+  return session;
+}
+
 void Client::_closed_mds_session(int mds, MetaSession *s)
 {
   mount_cond.Signal();
@@ -1475,62 +1494,41 @@ void Client::handle_client_session(MClientSession *m)
 {
   int from = m->get_source().num();
   ldout(cct, 10) << "handle_client_session " << *m << " from mds." << from << dendl;
-  MetaSession *mds_session = NULL;
-  if (mds_sessions.count(from))
-    mds_session = mds_sessions[from];
+
+  if (mds_sessions.count(from) == 0) {
+    ldout(cct, 10) << " discarding session message from sessionless mds " << m->get_source_inst() << dendl;
+    m->put();
+    return;
+  }
+  MetaSession *session = mds_sessions[from];
 
   switch (m->get_op()) {
   case CEPH_SESSION_OPEN:
-    if (!mds_session) {
-      mds_sessions[from] = mds_session = new MetaSession();
-      mds_session->mds_num = from;
-      mds_session->seq = 0;
-      mds_session->inst = m->get_source_inst();
-    }
-    renew_caps(from);
+    renew_caps(session);
     if (unmounting) {
-      mds_session->closing = true;
-      messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_CLOSE, mds_session->seq),
+      session->state = MetaSession::STATE_CLOSING;
+      messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_CLOSE, session->seq),
                               mdsmap->get_inst(from));
     } else {
+      session->state = MetaSession::STATE_OPEN;
       connect_mds_targets(from);
     }
     break;
 
   case CEPH_SESSION_CLOSE:
-    _closed_mds_session(from, mds_session);
+    _closed_mds_session(from, session);
     break;
 
   case CEPH_SESSION_RENEWCAPS:
-    if (!mds_session) {
-      ldout(cct, 0) << "handle_client_session " << *m
-                   << " no open session for mds." << from << dendl;
-      break;
-    }
-    if (mds_session->cap_renew_seq == m->get_seq()) {
-      mds_session->cap_ttl =
-       mds_session->last_cap_renew_request + mdsmap->get_session_timeout();
+    if (session->cap_renew_seq == m->get_seq()) {
+      session->cap_ttl =
+       session->last_cap_renew_request + mdsmap->get_session_timeout();
       wake_inode_waiters(from);
     }
     break;
 
   case CEPH_SESSION_STALE:
-    /* check that if we don't have a valid mds_session for this mds,
-     * we're still waiting for the open session reply */
-    if (mds_session) {
-      renew_caps(from);
-    } else {
-      if (waiting_for_session.count(from) != 0) {
-       ldout(cct, 10) << "handle_client_session " << *m
-                      << " still waiting for open session reply" << dendl;
-       // don't signal waiters, just return
-       m->put();
-       return;
-      } else {
-       // how can we get a stale mds without an open or pending session?
-       assert(mds_session);
-      }
-    }
+    renew_caps(session);
     break;
 
   case CEPH_SESSION_RECALL_STATE:
@@ -1542,8 +1540,7 @@ void Client::handle_client_session(MClientSession *m)
   }
 
   // kick waiting threads
-  signal_cond_list(waiting_for_session[from]);
-  waiting_for_session.erase(from);
+  signal_cond_list(session->waiting_for_open);
 
   m->put();
 }
@@ -1843,6 +1840,7 @@ void Client::handle_mds_map(MMDSMap* m)
       if (oldstate < MDSMap::STATE_ACTIVE) {
        kick_requests(p->first, false);
        kick_flushing_caps(p->first);
+       signal_cond_list(p->second->waiting_for_open);
       }
       connect_mds_targets(p->first);
     }
@@ -1853,20 +1851,6 @@ void Client::handle_mds_map(MMDSMap* m)
   ls.swap(waiting_for_mdsmap);
   signal_cond_list(ls);
 
-  map<int,list<Cond*> >::iterator p = waiting_for_session.begin();
-  while (p != waiting_for_session.end()) {
-    int oldstate = oldmap->get_state(p->first);
-    int newstate = mdsmap->get_state(p->first);
-    if (newstate >= MDSMap::STATE_ACTIVE &&
-       oldstate < MDSMap::STATE_ACTIVE) {
-      ldout(cct, 20) << "kick_opening_mds_session mds." << p->first << dendl;
-      signal_cond_list(p->second);
-      waiting_for_session.erase(p++);
-    } else {
-      p++;
-    }
-  }
-
   delete oldmap;
   m->put();
 
@@ -1977,7 +1961,7 @@ void Client::got_mds_push(int mds)
 
   s->seq++;
   ldout(cct, 10) << " mds." << mds << " seq now " << s->seq << dendl;
-  if (s->closing)
+  if (s->state == MetaSession::STATE_CLOSING)
     messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_CLOSE, s->seq),
                            s->inst);
 }
@@ -3750,8 +3734,8 @@ void Client::unmount()
        ++p) {
     ldout(cct, 2) << "sending client_session close to mds." << p->first
            << " seq " << p->second->seq << dendl;
-    if (!p->second->closing) {
-      p->second->closing = true;
+    if (p->second->state != MetaSession::STATE_CLOSING) {
+      p->second->state = MetaSession::STATE_CLOSING;
       messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_CLOSE, p->second->seq),
                               mdsmap->get_inst(p->first));
     }
@@ -3839,19 +3823,17 @@ void Client::renew_caps()
        p++) {
     ldout(cct, 15) << "renew_caps requesting from mds." << p->first << dendl;
     if (mdsmap->get_state(p->first) >= MDSMap::STATE_REJOIN)
-      renew_caps(p->first);
+      renew_caps(p->second);
   }
 }
 
-void Client::renew_caps(const int mds)
+void Client::renew_caps(MetaSession *session)
 {
-  ldout(cct, 10) << "renew_caps mds." << mds << dendl;
-  MetaSession *session = mds_sessions[mds];
+  ldout(cct, 10) << "renew_caps mds." << session->mds_num << dendl;
   session->last_cap_renew_request = ceph_clock_now(cct);
   uint64_t seq = ++session->cap_renew_seq;
   messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_RENEWCAPS, seq),
-                         mdsmap->get_inst(mds));
-
+                         session->inst);
 }
 
 
@@ -7782,7 +7764,7 @@ void Client::ms_handle_remote_reset(Connection *con)
        }
       }
       if (mds >= 0) {
-       if (s->closing) {
+       if (s->state == MetaSession::STATE_CLOSING) {
          ldout(cct, 1) << "reset from mds we were closing; we'll call that closed" << dendl;
          _closed_mds_session(mds, s);
        }
index d44b87e202b77d33d171e644b0de18d44628fa66..63b699c4022811a76db90b136b7dd2dad35cbb81 100644 (file)
@@ -217,7 +217,7 @@ class Client : public Dispatcher {
   Context *tick_event;
   utime_t last_cap_renew;
   void renew_caps();
-  void renew_caps(int s);
+  void renew_caps(MetaSession *session);
   void flush_cap_releases();
 public:
   void tick();
@@ -229,10 +229,12 @@ public:
 
   // mds sessions
   map<int, MetaSession*> mds_sessions;  // mds -> push seq
-  map<int, list<Cond*> > waiting_for_session;
   list<Cond*> waiting_for_mdsmap;
 
+  bool have_open_session(int mds);
   void got_mds_push(int mds);
+  MetaSession *_get_or_open_mds_session(int mds);
+  MetaSession *_open_mds_session(int mds);
   void _closed_mds_session(int mds, MetaSession *s);
   void handle_client_session(MClientSession *m);
   void send_reconnect(int mds);
index 36338c0bd605c7c9815cf5569f6829808420744b..c6a9496710b5b5686ae4a50f8ecdab5fe50ed452 100644 (file)
@@ -7,6 +7,17 @@
 
 #include "common/Formatter.h"
 
+const char *MetaSession::get_state_name() const
+{
+  switch (state) {
+  case STATE_NEW: return "new";
+  case STATE_OPENING: return "opening";
+  case STATE_OPEN: return "open";
+  case STATE_CLOSING: return "closing";
+  default: return "unknown";
+  }
+}
+
 void MetaSession::dump(Formatter *f) const
 {
   f->dump_int("mds", mds_num);
@@ -17,7 +28,7 @@ void MetaSession::dump(Formatter *f) const
   f->dump_stream("last_cap_renew_request") << last_cap_renew_request;
   f->dump_unsigned("cap_renew_seq", cap_renew_seq);
   f->dump_int("num_caps", num_caps);
-  f->dump_int("closing", (int)closing);
+  f->dump_string("state", get_state_name());
   f->dump_int("was_stale", (int)was_stale);
 }
 
index 20a7a588d73279020ebd2c54b3a84b6369460c23..bb3fad241a0d1ffbe800da2aae5de84f24c3939e 100644 (file)
@@ -19,15 +19,24 @@ class MClientCapRelease;
 
 struct MetaSession {
   int mds_num;
+  Connection *con;
   version_t seq;
   uint64_t cap_gen;
   utime_t cap_ttl, last_cap_renew_request;
   uint64_t cap_renew_seq;
   int num_caps;
   entity_inst_t inst;
-  bool closing;
   bool was_stale;
 
+  enum {
+    STATE_NEW,
+    STATE_OPENING,
+    STATE_OPEN,
+    STATE_CLOSING,
+  } state;
+
+  list<Cond*> waiting_for_open;
+
   xlist<Cap*> caps;
   xlist<Inode*> flushing_caps;
   xlist<CapSnap*> flushing_capsnaps;
@@ -36,10 +45,17 @@ struct MetaSession {
 
   MClientCapRelease *release;
   
-  MetaSession() : mds_num(-1), seq(0), cap_gen(0), cap_renew_seq(0), num_caps(0),
-                closing(false), was_stale(false), release(NULL) {}
+  MetaSession()
+    : mds_num(-1), con(NULL),
+      seq(0), cap_gen(0), cap_renew_seq(0), num_caps(0),
+      was_stale(false),
+      state(STATE_NEW),
+      release(NULL)
+  {}
   ~MetaSession();
 
+  const char *get_state_name() const;
+
   void dump(Formatter *f) const;
 };