]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: new OMAP storage for sessionmap 3718/head
authorJohn Spray <john.spray@redhat.com>
Thu, 12 Feb 2015 10:06:27 +0000 (10:06 +0000)
committerJohn Spray <john.spray@redhat.com>
Fri, 6 Mar 2015 11:03:56 +0000 (11:03 +0000)
Fixes: #10649
Signed-off-by: John Spray <john.spray@redhat.com>
src/common/config_opts.h
src/mds/Server.cc
src/mds/SessionMap.cc
src/mds/SessionMap.h
src/mds/journal.cc
src/test/encoding/types.h
src/tools/cephfs/TableTool.cc

index 6f3fca80ffc62e939dd040c4add9675b4e966712..909d0c8b2e22f19cf8c30d8024e7869f77dac563 100644 (file)
@@ -357,6 +357,7 @@ OPTION(mds_beacon_grace, OPT_FLOAT, 15)
 OPTION(mds_enforce_unique_name, OPT_BOOL, true)
 OPTION(mds_blacklist_interval, OPT_FLOAT, 24.0*60.0)  // how long to blacklist failed nodes
 OPTION(mds_session_timeout, OPT_FLOAT, 60)    // cap bits and leases time out if client idle
+OPTION(mds_sessionmap_keys_per_op, OPT_U32, 1024)    // how many sessions should I try to load/store in a single OMAP operation?
 OPTION(mds_revoke_cap_timeout, OPT_FLOAT, 60)    // detect clients which aren't revoking caps
 OPTION(mds_recall_state_timeout, OPT_FLOAT, 60)    // detect clients which aren't trimming caps
 OPTION(mds_freeze_tree_timeout, OPT_FLOAT, 30)    // detecting freeze tree deadlock
index 846ade3cb0aaa768e4f571865a61b82467bd6733..176760b616a963db6b2555fe65ea16723dd1cdc7 100644 (file)
@@ -265,9 +265,9 @@ void Server::handle_client_session(MClientSession *m)
     if (session->is_closed())
       mds->sessionmap.add_session(session);
 
+    pv = mds->sessionmap.mark_projected(session);
     sseq = mds->sessionmap.set_state(session, Session::STATE_OPENING);
     mds->sessionmap.touch_session(session);
-    pv = ++mds->sessionmap.projected;
     mdlog->start_submit_entry(new ESession(m->get_source_inst(), true, pv, m->client_meta),
                              new C_MDS_session_finish(mds, session, sseq, true, pv));
     mdlog->flush();
@@ -357,6 +357,8 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve
   dout(10) << "_session_logged " << session->info.inst << " state_seq " << state_seq << " " << (open ? "open":"close")
           << " " << pv << dendl;
 
+  mds->sessionmap.mark_dirty(session);
+
   if (piv) {
     mds->inotable->apply_release_ids(inos);
     assert(mds->inotable->get_version() == piv);
@@ -430,18 +432,29 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve
   } else {
     assert(0);
   }
-  mds->sessionmap.version++;  // noop
 }
 
+/**
+ * Inject sessions from some source other than actual connections.
+ *
+ * For example:
+ *  - sessions inferred from journal replay
+ *  - sessions learned from other MDSs during rejoin
+ *  - sessions learned from other MDSs during dir/caps migration
+ *  - sessions learned from other MDSs during a cross-MDS rename
+ */
 version_t Server::prepare_force_open_sessions(map<client_t,entity_inst_t>& cm,
                                              map<client_t,uint64_t>& sseqmap)
 {
-  version_t pv = ++mds->sessionmap.projected;
+  version_t pv = mds->sessionmap.get_projected();
+
   dout(10) << "prepare_force_open_sessions " << pv 
           << " on " << cm.size() << " clients"
           << dendl;
   for (map<client_t,entity_inst_t>::iterator p = cm.begin(); p != cm.end(); ++p) {
+
     Session *session = mds->sessionmap.get_or_add_session(p->second);
+    pv = mds->sessionmap.mark_projected(session);
     if (session->is_closed() || 
        session->is_closing() ||
        session->is_killing())
@@ -451,7 +464,6 @@ version_t Server::prepare_force_open_sessions(map<client_t,entity_inst_t>& cm,
             session->is_opening() ||
             session->is_stale());
     session->inc_importing();
-//  mds->sessionmap.touch_session(session);
   }
   return pv;
 }
@@ -466,8 +478,13 @@ void Server::finish_force_open_sessions(map<client_t,entity_inst_t>& cm,
    * trying to force open a session...  
    */
   dout(10) << "finish_force_open_sessions on " << cm.size() << " clients,"
-          << " v " << mds->sessionmap.version << " -> " << (mds->sessionmap.version+1) << dendl;
+          << " initial v " << mds->sessionmap.get_version() << dendl;
+  
+
+  int sessions_inserted = 0;
   for (map<client_t,entity_inst_t>::iterator p = cm.begin(); p != cm.end(); ++p) {
+    sessions_inserted++;
+
     Session *session = mds->sessionmap.get_session(p->second.name);
     assert(session);
     
@@ -487,10 +504,15 @@ void Server::finish_force_open_sessions(map<client_t,entity_inst_t>& cm,
       dout(10) << "force_open_sessions skipping already-open " << session->info.inst << dendl;
       assert(session->is_open() || session->is_stale());
     }
-    if (dec_import)
+
+    if (dec_import) {
       session->dec_importing();
+    }
+
+    mds->sessionmap.mark_dirty(session);
   }
-  mds->sessionmap.version++;
+
+  dout(10) << __func__ << ": final v " << mds->sessionmap.get_version() << dendl;
 }
 
 class C_MDS_TerminatedSessions : public ServerContext {
@@ -612,7 +634,7 @@ void Server::kill_session(Session *session, Context *on_safe)
 void Server::journal_close_session(Session *session, int state, Context *on_safe)
 {
   uint64_t sseq = mds->sessionmap.set_state(session, state);
-  version_t pv = ++mds->sessionmap.projected;
+  version_t pv = mds->sessionmap.mark_projected(session);
   version_t piv = 0;
 
   // release alloc and pending-alloc inos for this session
@@ -2135,7 +2157,8 @@ CInode* Server::prepare_new_inode(MDRequestRef& mdr, CDir *dir, inodeno_t useino
   if (mdr->session->info.prealloc_inos.size()) {
     mdr->used_prealloc_ino = 
       in->inode.ino = mdr->session->take_ino(useino);  // prealloc -> used
-    mds->sessionmap.projected++;
+    mds->sessionmap.mark_projected(mdr->session);
+
     dout(10) << "prepare_new_inode used_prealloc " << mdr->used_prealloc_ino
             << " (" << mdr->session->info.prealloc_inos
             << ", " << mdr->session->info.prealloc_inos.size() << " left)"
@@ -2159,7 +2182,7 @@ CInode* Server::prepare_new_inode(MDRequestRef& mdr, CDir *dir, inodeno_t useino
     mds->inotable->project_alloc_ids(mdr->prealloc_inos, got);
     assert(mdr->prealloc_inos.size());  // or else fix projected increment semantics
     mdr->session->pending_prealloc_inos.insert(mdr->prealloc_inos);
-    mds->sessionmap.projected++;
+    mds->sessionmap.mark_projected(mdr->session);
     dout(10) << "prepare_new_inode prealloc " << mdr->prealloc_inos << dendl;
   }
 
@@ -2224,14 +2247,14 @@ CInode* Server::prepare_new_inode(MDRequestRef& mdr, CDir *dir, inodeno_t useino
 
 void Server::journal_allocated_inos(MDRequestRef& mdr, EMetaBlob *blob)
 {
-  dout(20) << "journal_allocated_inos sessionmapv " << mds->sessionmap.projected
+  dout(20) << "journal_allocated_inos sessionmapv " << mds->sessionmap.get_projected()
           << " inotablev " << mds->inotable->get_projected_version()
           << dendl;
   blob->set_ino_alloc(mdr->alloc_ino,
                      mdr->used_prealloc_ino,
                      mdr->prealloc_inos,
                      mdr->client_request->get_source(),
-                     mds->sessionmap.projected,
+                     mds->sessionmap.get_projected(),
                      mds->inotable->get_projected_version());
 }
 
@@ -2249,13 +2272,13 @@ void Server::apply_allocated_inos(MDRequestRef& mdr)
     assert(session);
     session->pending_prealloc_inos.subtract(mdr->prealloc_inos);
     session->info.prealloc_inos.insert(mdr->prealloc_inos);
-    mds->sessionmap.version++;
+    mds->sessionmap.mark_dirty(session);
     mds->inotable->apply_alloc_ids(mdr->prealloc_inos);
   }
   if (mdr->used_prealloc_ino) {
     assert(session);
     session->info.used_inos.erase(mdr->used_prealloc_ino);
-    mds->sessionmap.version++;
+    mds->sessionmap.mark_dirty(session);
   }
 }
 
@@ -6125,7 +6148,7 @@ void Server::handle_client_rename(MDRequestRef& mdr)
   
   _rename_prepare(mdr, &le->metablob, &le->client_map, srcdn, destdn, straydn);
   if (le->client_map.length())
-    le->cmapv = mds->sessionmap.projected;
+    le->cmapv = mds->sessionmap.get_projected();
 
   // -- commit locally --
   C_MDS_rename_finish *fin = new C_MDS_rename_finish(mds, mdr, srcdn, destdn, straydn);
index 4fa43d9cd544a79d3045e8dc7043dfc0d602d329..6651ac76e72af3bda1205d4bc60ed841525f2cee 100644 (file)
@@ -69,13 +69,145 @@ object_t SessionMap::get_object_name()
 
 class C_IO_SM_Load : public SessionMapIOContext {
 public:
-  bufferlist bl;
-  C_IO_SM_Load(SessionMap *cm) : SessionMapIOContext(cm) {}
+  const bool first;  //< Am I the initial (header) load?
+  int header_r;  //< Return value from OMAP header read
+  int values_r;  //< Return value from OMAP value read
+  bufferlist header_bl;
+  std::map<std::string, bufferlist> session_vals;
+
+  C_IO_SM_Load(SessionMap *cm, const bool f)
+    : SessionMapIOContext(cm), first(f), header_r(0), values_r(0) {}
+
   void finish(int r) {
-    sessionmap->_load_finish(r, bl);
+    sessionmap->_load_finish(r, header_r, values_r, first, header_bl, session_vals);
   }
 };
 
+
+/**
+ * Decode OMAP header.  Call this once when loading.
+ */
+void SessionMapStore::decode_header(
+      bufferlist &header_bl)
+{
+  bufferlist::iterator q = header_bl.begin();
+  DECODE_START(1, q)
+  ::decode(version, q);
+  DECODE_FINISH(q);
+}
+
+void SessionMapStore::encode_header(
+    bufferlist *header_bl)
+{
+  ENCODE_START(1, 1, *header_bl);
+  ::encode(version, *header_bl);
+  ENCODE_FINISH(*header_bl);
+}
+
+/**
+ * Decode and insert some serialized OMAP values.  Call this
+ * repeatedly to insert batched loads.
+ */
+void SessionMapStore::decode_values(std::map<std::string, bufferlist> &session_vals)
+{
+  for (std::map<std::string, bufferlist>::iterator i = session_vals.begin();
+       i != session_vals.end(); ++i) {
+
+    entity_inst_t inst;
+
+    bool parsed = inst.name.parse(i->first);
+    if (!parsed) {
+      derr << "Corrupt entity name '" << i->first << "' in sessionmap" << dendl;
+      throw buffer::malformed_input("Corrupt entity name in sessionmap");
+    }
+
+    Session *s = get_or_add_session(inst);
+    if (s->is_closed())
+      s->set_state(Session::STATE_OPEN);
+    bufferlist::iterator q = i->second.begin();
+    s->decode(q);
+  }
+}
+
+/**
+ * An OMAP read finished.
+ */
+void SessionMap::_load_finish(
+    int operation_r,
+    int header_r,
+    int values_r,
+    bool first,
+    bufferlist &header_bl,
+    std::map<std::string, bufferlist> &session_vals)
+{
+  if (operation_r < 0) {
+    derr << "_load_finish got " << cpp_strerror(operation_r) << dendl;
+    assert(0 == "failed to load sessionmap");
+  }
+
+  // Decode header
+  if (first) {
+    if (header_r != 0) {
+      derr << __func__ << ": header error: " << cpp_strerror(header_r) << dendl;
+      assert(0 == "error reading header!");
+    }
+
+    if(header_bl.length() == 0) {
+      dout(4) << __func__ << ": header missing, loading legacy..." << dendl;
+      load_legacy();
+      return;
+    }
+
+    decode_header(header_bl);
+    dout(10) << __func__ << " loaded version " << version << dendl;
+  }
+
+  if (values_r != 0) {
+    derr << __func__ << ": error reading values: "
+      << cpp_strerror(values_r) << dendl;
+    assert(0 == "error reading values");
+  }
+
+  // Decode session_vals
+  decode_values(session_vals);
+
+  if (session_vals.size() == g_conf->mds_sessionmap_keys_per_op) {
+    // Issue another read if we're not at the end of the omap
+    const std::string last_key = session_vals.rbegin()->first;
+    dout(10) << __func__ << ": continue omap load from '"
+             << last_key << "'" << dendl;
+    object_t oid = get_object_name();
+    object_locator_t oloc(mds->mdsmap->get_metadata_pool());
+    C_IO_SM_Load *c = new C_IO_SM_Load(this, false);
+    ObjectOperation op;
+    op.omap_get_vals(last_key, "", g_conf->mds_sessionmap_keys_per_op,
+        &c->session_vals, &c->values_r);
+    mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, NULL, 0,
+        new C_OnFinisher(c, &mds->finisher));
+  } else {
+    // I/O is complete.  Update `by_state`
+    dout(10) << __func__ << ": omap load complete" << dendl;
+    for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin();
+         i != session_map.end(); ++i) {
+      Session *s = i->second;
+      if (by_state.count(s->get_state()) == 0)
+        by_state[s->get_state()] = new xlist<Session*>;
+      by_state[s->get_state()]->push_back(&s->item_session_list);
+    }
+
+    // Population is complete.  Trigger load waiters.
+    dout(10) << __func__ << ": v " << version 
+          << ", " << session_map.size() << " sessions" << dendl;
+    projected = committing = committed = version;
+    dump();
+    finish_contexts(g_ceph_context, waiting_for_load);
+  }
+}
+
+/**
+ * Populate session state from OMAP records in this
+ * rank's sessionmap object.
+ */
 void SessionMap::load(MDSInternalContextBase *onload)
 {
   dout(10) << "load" << dendl;
@@ -83,14 +215,47 @@ void SessionMap::load(MDSInternalContextBase *onload)
   if (onload)
     waiting_for_load.push_back(onload);
   
-  C_IO_SM_Load *c = new C_IO_SM_Load(this);
+  C_IO_SM_Load *c = new C_IO_SM_Load(this, true);
   object_t oid = get_object_name();
   object_locator_t oloc(mds->mdsmap->get_metadata_pool());
+
+  ObjectOperation op;
+  op.omap_get_header(&c->header_bl, &c->header_r);
+  op.omap_get_vals("", "", g_conf->mds_sessionmap_keys_per_op,
+      &c->session_vals, &c->values_r);
+
+  mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, NULL, 0, new C_OnFinisher(c, &mds->finisher));
+}
+
+class C_IO_SM_LoadLegacy : public SessionMapIOContext {
+public:
+  bufferlist bl;
+  C_IO_SM_LoadLegacy(SessionMap *cm) : SessionMapIOContext(cm) {}
+  void finish(int r) {
+    sessionmap->_load_legacy_finish(r, bl);
+  }
+};
+
+
+/**
+ * Load legacy (object data blob) SessionMap format, assuming
+ * that waiting_for_load has already been populated with
+ * the relevant completion.  This is the fallback if we do not
+ * find an OMAP header when attempting to load normally.
+ */
+void SessionMap::load_legacy()
+{
+  dout(10) << __func__ << dendl;
+
+  C_IO_SM_LoadLegacy *c = new C_IO_SM_LoadLegacy(this);
+  object_t oid = get_object_name();
+  object_locator_t oloc(mds->mdsmap->get_metadata_pool());
+
   mds->objecter->read_full(oid, oloc, CEPH_NOSNAP, &c->bl, 0,
                           new C_OnFinisher(c, &mds->finisher));
 }
 
-void SessionMap::_load_finish(int r, bufferlist &bl)
+void SessionMap::_load_legacy_finish(int r, bufferlist &bl)
 { 
   bufferlist::iterator blp = bl.begin();
   if (r < 0) {
@@ -98,13 +263,24 @@ void SessionMap::_load_finish(int r, bufferlist &bl)
     assert(0 == "failed to load sessionmap");
   }
   dump();
-  decode(blp);  // note: this sets last_cap_renew = now()
+  decode_legacy(blp);  // note: this sets last_cap_renew = now()
   dout(10) << "_load_finish v " << version 
           << ", " << session_map.size() << " sessions, "
           << bl.length() << " bytes"
           << dendl;
   projected = committing = committed = version;
   dump();
+
+  // Mark all sessions dirty, so that on next save() we will write
+  // a complete OMAP version of the data loaded from the legacy format
+  for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin();
+       i != session_map.end(); ++i) {
+    // Don't use mark_dirty because on this occasion we want to ignore the
+    // keys_per_op limit and do one big write (upgrade must be atomic)
+    dirty_sessions.insert(i->first);
+  }
+  loaded_legacy = true;
+
   finish_contexts(g_ceph_context, waiting_for_load);
 }
 
@@ -124,7 +300,7 @@ public:
 
 void SessionMap::save(MDSInternalContextBase *onsave, version_t needv)
 {
-  dout(10) << "save needv " << needv << ", v " << version << dendl;
+  dout(10) << __func__ << ": needv " << needv << ", v " << version << dendl;
  
   if (needv && committing >= needv) {
     assert(committing > committed);
@@ -133,21 +309,76 @@ void SessionMap::save(MDSInternalContextBase *onsave, version_t needv)
   }
 
   commit_waiters[version].push_back(onsave);
-  
-  bufferlist bl;
-  
-  encode(bl);
+
   committing = version;
   SnapContext snapc;
   object_t oid = get_object_name();
   object_locator_t oloc(mds->mdsmap->get_metadata_pool());
 
-  mds->objecter->write_full(oid, oloc,
-                           snapc,
-                           bl, ceph_clock_now(g_ceph_context), 0,
-                           NULL,
-                           new C_OnFinisher(new C_IO_SM_Save(this, version),
-                                            &mds->finisher));
+  ObjectOperation op;
+
+  /* Compose OSD OMAP transaction for full write */
+  bufferlist header_bl;
+  encode_header(&header_bl);
+  op.omap_set_header(header_bl);
+
+  /* If we loaded a legacy sessionmap, then erase the old data.  If
+   * an old-versioned MDS tries to read it, it'll fail out safely
+   * with an end_of_buffer exception */
+  if (loaded_legacy) {
+    dout(4) << __func__ << " erasing legacy sessionmap" << dendl;
+    op.truncate(0);
+    loaded_legacy = false;  // only need to truncate once.
+  }
+
+  dout(20) << " updating keys:" << dendl;
+  map<string, bufferlist> to_set;
+  for(std::set<entity_name_t>::const_iterator i = dirty_sessions.begin();
+      i != dirty_sessions.end(); ++i) {
+    const entity_name_t name = *i;
+    const Session *session = session_map[name];
+
+    if (session->is_open() ||
+       session->is_closing() ||
+       session->is_stale() ||
+       session->is_killing()) {
+      dout(20) << "  " << name << dendl;
+      // Serialize K
+      std::ostringstream k;
+      k << name;
+
+      // Serialize V
+      bufferlist bl;
+      session->info.encode(bl);
+
+      // Add to RADOS op
+      to_set[k.str()] = bl;
+    } else {
+      dout(20) << "  " << name << " (ignoring)" << dendl;
+    }
+  }
+  if (!to_set.empty()) {
+    op.omap_set(to_set);
+  }
+
+  dout(20) << " removing keys:" << dendl;
+  set<string> to_remove;
+  for(std::set<entity_name_t>::const_iterator i = null_sessions.begin();
+      i != null_sessions.end(); ++i) {
+    dout(20) << "  " << *i << dendl;
+    std::ostringstream k;
+    k << *i;
+    to_remove.insert(k.str());
+  }
+  if (!to_remove.empty()) {
+    op.omap_rm_keys(to_remove);
+  }
+
+  dirty_sessions.clear();
+  null_sessions.clear();
+
+  mds->objecter->mutate(oid, oloc, op, snapc, ceph_clock_now(g_ceph_context),
+      0, NULL, new C_OnFinisher(new C_IO_SM_Save(this, version), &mds->finisher));
 }
 
 void SessionMap::_save_finish(version_t v)
@@ -160,37 +391,13 @@ void SessionMap::_save_finish(version_t v)
 }
 
 
-// -------------------
-
-void SessionMapStore::encode(bufferlist& bl) const
-{
-  uint64_t pre = -1;     // for 0.19 compatibility; we forgot an encoding prefix.
-  ::encode(pre, bl);
-
-  ENCODE_START(3, 3, bl);
-  ::encode(version, bl);
-
-  for (ceph::unordered_map<entity_name_t,Session*>::const_iterator p = session_map.begin(); 
-       p != session_map.end(); 
-       ++p) {
-    if (p->second->is_open() ||
-       p->second->is_closing() ||
-       p->second->is_stale() ||
-       p->second->is_killing()) {
-      ::encode(p->first, bl);
-      p->second->info.encode(bl);
-    }
-  }
-  ENCODE_FINISH(bl);
-}
-
 /**
  * Deserialize sessions, and update by_state index
  */
-void SessionMap::decode(bufferlist::iterator &p)
+void SessionMap::decode_legacy(bufferlist::iterator &p)
 {
   // Populate `sessions`
-  SessionMapStore::decode(p);
+  SessionMapStore::decode_legacy(p);
 
   // Update `by_state`
   for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin();
@@ -212,7 +419,7 @@ uint64_t SessionMap::set_state(Session *session, int s) {
   return session->get_state_seq();
 }
 
-void SessionMapStore::decode(bufferlist::iterator& p)
+void SessionMapStore::decode_legacy(bufferlist::iterator& p)
 {
   utime_t now = ceph_clock_now(g_ceph_context);
   uint64_t pre;
@@ -354,6 +561,10 @@ void SessionMap::remove_session(Session *s)
   s->item_session_list.remove_myself();
   session_map.erase(s->info.inst.name);
   s->put();
+  if (dirty_sessions.count(s->info.inst.name)) {
+    dirty_sessions.erase(s->info.inst.name);
+  }
+  null_sessions.insert(s->info.inst.name);
 }
 
 void SessionMap::touch_session(Session *session)
@@ -449,3 +660,50 @@ void Session::decode(bufferlist::iterator &p)
   _update_human_name();
 }
 
+void SessionMap::_mark_dirty(Session *s)
+{
+  if (dirty_sessions.size() >= g_conf->mds_sessionmap_keys_per_op) {
+    // Pre-empt the usual save() call from journal segment trim, in
+    // order to avoid building up an oversized OMAP update operation
+    // from too many sessions modified at once
+    save(new C_MDSInternalNoop, version);
+  }
+
+  dirty_sessions.insert(s->info.inst.name);
+}
+
+void SessionMap::mark_dirty(Session *s)
+{
+  dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name
+    << " v=" << version << dendl;
+
+  _mark_dirty(s);
+  version++;
+  s->pop_pv(version);
+}
+
+void SessionMap::replay_dirty_session(Session *s)
+{
+  dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name
+    << " v=" << version << dendl;
+
+  _mark_dirty(s);
+
+  replay_advance_version();
+}
+
+void SessionMap::replay_advance_version()
+{
+  version++;
+  projected = version;
+}
+
+version_t SessionMap::mark_projected(Session *s)
+{
+  dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name
+    << " pv=" << projected << " -> " << projected + 1 << dendl;
+  ++projected;
+  s->push_pv(projected);
+  return projected;
+}
+
index c990adacff93537e03d5dc371863202dd77fb173..e5a78ff8316175db6b08d090d03fb4ab0ce045eb 100644 (file)
@@ -86,8 +86,27 @@ private:
   void _update_human_name();
   std::string human_name;
 
+  // Versions in this this session was projected: used to verify
+  // that appropriate mark_dirty calls follow.
+  std::deque<version_t> projected;
+
 public:
 
+  void push_pv(version_t pv)
+  {
+    if (!projected.empty()) {
+      assert(projected.back() != pv);
+    }
+    projected.push_back(pv);
+  }
+
+  void pop_pv(version_t v)
+  {
+    assert(!projected.empty());
+    assert(projected.front() == v);
+    projected.pop_front();
+  }
+
   inline int get_state() const {return state;}
   void set_state(int new_state)
   {
@@ -154,12 +173,12 @@ public:
   int get_state() { return state; }
   const char *get_state_name() { return get_state_name(state); }
   uint64_t get_state_seq() { return state_seq; }
-  bool is_closed() { return state == STATE_CLOSED; }
-  bool is_opening() { return state == STATE_OPENING; }
-  bool is_open() { return state == STATE_OPEN; }
-  bool is_closing() { return state == STATE_CLOSING; }
-  bool is_stale() { return state == STATE_STALE; }
-  bool is_killing() { return state == STATE_KILLING; }
+  bool is_closed() const { return state == STATE_CLOSED; }
+  bool is_opening() const { return state == STATE_OPENING; }
+  bool is_open() const { return state == STATE_OPEN; }
+  bool is_closing() const { return state == STATE_CLOSING; }
+  bool is_stale() const { return state == STATE_STALE; }
+  bool is_killing() const { return state == STATE_KILLING; }
 
   void inc_importing() {
     ++importing_count;
@@ -253,7 +272,6 @@ public:
     last_cap_renew = utime_t();
 
   }
-
 };
 
 /*
@@ -267,13 +285,18 @@ class MDS;
  * encode/decode outside of live MDS instance.
  */
 class SessionMapStore {
+protected:
+  version_t version;
 public:
   ceph::unordered_map<entity_name_t, Session*> session_map;
-  version_t version;
   mds_rank_t rank;
 
-  virtual void encode(bufferlist& bl) const;
-  virtual void decode(bufferlist::iterator& blp);
+  version_t get_version() const {return version;}
+
+  virtual void encode_header(bufferlist *header_bl);
+  virtual void decode_header(bufferlist &header_bl);
+  virtual void decode_values(std::map<std::string, bufferlist> &session_vals);
+  virtual void decode_legacy(bufferlist::iterator& blp);
   void dump(Formatter *f) const;
 
   void set_rank(mds_rank_t r)
@@ -309,18 +332,45 @@ class SessionMap : public SessionMapStore {
 public:
   MDS *mds;
 
-public:  // i am lazy
+protected:
   version_t projected, committing, committed;
+public:
   map<int,xlist<Session*>* > by_state;
   uint64_t set_state(Session *session, int state);
   map<version_t, list<MDSInternalContextBase*> > commit_waiters;
 
   SessionMap(MDS *m) : mds(m),
-                      projected(0), committing(0), committed(0) 
+                      projected(0), committing(0), committed(0),
+                       loaded_legacy(false)
   { }
 
+  void set_version(const version_t v)
+  {
+    version = projected = v;
+  }
+
+  void set_projected(const version_t v)
+  {
+    projected = v;
+  }
+
+  version_t get_projected() const
+  {
+    return projected;
+  }
+
+  version_t get_committed() const
+  {
+    return committed;
+  }
+
+  version_t get_committing() const
+  {
+    return committed;
+  }
+
   // sessions
-  void decode(bufferlist::iterator& blp);
+  void decode_legacy(bufferlist::iterator& blp);
   bool empty() { return session_map.empty(); }
   const ceph::unordered_map<entity_name_t, Session*> &get_sessions() const
   {
@@ -428,10 +478,62 @@ public:  // i am lazy
   object_t get_object_name();
 
   void load(MDSInternalContextBase *onload);
-  void _load_finish(int r, bufferlist &bl);
+  void _load_finish(
+      int operation_r,
+      int header_r,
+      int values_r,
+      bool first,
+      bufferlist &header_bl,
+      std::map<std::string, bufferlist> &session_vals);
+
+  void load_legacy();
+  void _load_legacy_finish(int r, bufferlist &bl);
+
   void save(MDSInternalContextBase *onsave, version_t needv=0);
   void _save_finish(version_t v);
+
+protected:
+  std::set<entity_name_t> dirty_sessions;
+  std::set<entity_name_t> null_sessions;
+  bool loaded_legacy;
+  void _mark_dirty(Session *session);
+public:
+
+  /**
+   * Advance the version, and mark this session
+   * as dirty within the new version.
+   *
+   * Dirty means journalled but needing writeback
+   * to the backing store.  Must have called
+   * mark_projected previously for this session.
+   */
+  void mark_dirty(Session *session);
+
+  /**
+   * Advance the projected version, and mark this
+   * session as projected within the new version
+   *
+   * Projected means the session is updated in memory
+   * but we're waiting for the journal write of the update
+   * to finish.  Must subsequently call mark_dirty
+   * for sessions in the same global order as calls
+   * to mark_projected.
+   */
+  version_t mark_projected(Session *session);
+
+  /**
+   * During replay, advance versions to account
+   * for a session modification, and mark the
+   * session dirty.
+   */
+  void replay_dirty_session(Session *session);
+
+  /**
+   * During replay, if a session no longer present
+   * would have consumed a version, advance `version`
+   * and `projected` to account for that.
+   */
+  void replay_advance_version();
 };
 
 
index 1870ec70832d3aa24bbdc14125cdc1a517f93431..8b14c012a40ef9883d9c2fdd64e072312e40084b 100644 (file)
@@ -229,10 +229,10 @@ void LogSegment::try_to_expire(MDS *mds, MDSGatherBuilder &gather_bld, int op_pr
   }
 
   // sessionmap
-  if (sessionmapv > mds->sessionmap.committed) {
+  if (sessionmapv > mds->sessionmap.get_committed()) {
     dout(10) << "try_to_expire saving sessionmap, need " << sessionmapv 
-             << ", committed is " << mds->sessionmap.committed
-             << " (" << mds->sessionmap.committing << ")"
+             << ", committed is " << mds->sessionmap.get_committed()
+             << " (" << mds->sessionmap.get_committing() << ")"
              << dendl;
     mds->sessionmap.save(gather_bld.new_sub(), sessionmapv);
   }
@@ -1498,12 +1498,12 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg, MDSlaveUpdate *slaveup)
     }
   }
   if (sessionmapv) {
-    if (mds->sessionmap.version >= sessionmapv) {
+    if (mds->sessionmap.get_version() >= sessionmapv) {
       dout(10) << "EMetaBlob.replay sessionmap v " << sessionmapv
-              << " <= table " << mds->sessionmap.version << dendl;
-    } else if (mds->sessionmap.version + 2 >= sessionmapv) {
+              << " <= table " << mds->sessionmap.get_version() << dendl;
+    } else if (mds->sessionmap.get_version() + 2 >= sessionmapv) {
       dout(10) << "EMetaBlob.replay sessionmap v " << sessionmapv
-              << " -(1|2) == table " << mds->sessionmap.version
+              << " -(1|2) == table " << mds->sessionmap.get_version()
               << " prealloc " << preallocated_inos
               << " used " << used_preallocated_ino
               << dendl;
@@ -1524,26 +1524,28 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg, MDSlaveUpdate *slaveup)
            assert(i == used_preallocated_ino);
            session->info.used_inos.clear();
          }
-         mds->sessionmap.projected = ++mds->sessionmap.version;
+          mds->sessionmap.replay_dirty_session(session);
        }
        if (!preallocated_inos.empty()) {
          session->info.prealloc_inos.insert(preallocated_inos);
-         mds->sessionmap.projected = ++mds->sessionmap.version;
+          mds->sessionmap.replay_dirty_session(session);
        }
+
       } else {
        dout(10) << "EMetaBlob.replay no session for " << client_name << dendl;
-       if (used_preallocated_ino)
-         mds->sessionmap.projected = ++mds->sessionmap.version;
+       if (used_preallocated_ino) {
+         mds->sessionmap.replay_advance_version();
+        }
        if (!preallocated_inos.empty())
-         mds->sessionmap.projected = ++mds->sessionmap.version;
+         mds->sessionmap.replay_advance_version();
       }
-      assert(sessionmapv == mds->sessionmap.version);
+      assert(sessionmapv == mds->sessionmap.get_version());
     } else {
       mds->clog->error() << "journal replay sessionmap v " << sessionmapv
-                       << " -(1|2) > table " << mds->sessionmap.version << "\n";
+                       << " -(1|2) > table " << mds->sessionmap.get_version() << "\n";
       assert(g_conf->mds_wipe_sessions);
       mds->sessionmap.wipe();
-      mds->sessionmap.version = mds->sessionmap.projected = sessionmapv;
+      mds->sessionmap.set_version(sessionmapv);
     }
   }
 
@@ -1616,14 +1618,12 @@ void ESession::update_segment()
 
 void ESession::replay(MDS *mds)
 {
-  if (mds->sessionmap.version >= cmapv) {
-    dout(10) << "ESession.replay sessionmap " << mds->sessionmap.version 
+  if (mds->sessionmap.get_version() >= cmapv) {
+    dout(10) << "ESession.replay sessionmap " << mds->sessionmap.get_version() 
             << " >= " << cmapv << ", noop" << dendl;
   } else {
-    dout(10) << "ESession.replay sessionmap " << mds->sessionmap.version
+    dout(10) << "ESession.replay sessionmap " << mds->sessionmap.get_version()
             << " < " << cmapv << " " << (open ? "open":"close") << " " << client_inst << dendl;
-    mds->sessionmap.projected = ++mds->sessionmap.version;
-    assert(mds->sessionmap.version == cmapv);
     Session *session;
     if (open) {
       session = mds->sessionmap.get_or_add_session(client_inst);
@@ -1636,6 +1636,7 @@ void ESession::replay(MDS *mds)
        if (session->connection == NULL) {
          dout(10) << " removed session " << session->info.inst << dendl;
          mds->sessionmap.remove_session(session);
+          session = NULL;
        } else {
          session->clear();    // the client has reconnected; keep the Session, but reset
          dout(10) << " reset session " << session->info.inst << " (they reconnected)" << dendl;
@@ -1645,6 +1646,12 @@ void ESession::replay(MDS *mds)
                          << " from time " << stamp << ", ignoring";
       }
     }
+    if (session) {
+      mds->sessionmap.replay_dirty_session(session);
+    } else {
+      mds->sessionmap.replay_advance_version();
+    }
+    assert(mds->sessionmap.get_version() == cmapv);
   }
   
   if (inos.size() && inotablev) {
@@ -1769,15 +1776,15 @@ void ESessions::update_segment()
 
 void ESessions::replay(MDS *mds)
 {
-  if (mds->sessionmap.version >= cmapv) {
-    dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.version
+  if (mds->sessionmap.get_version() >= cmapv) {
+    dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.get_version()
             << " >= " << cmapv << ", noop" << dendl;
   } else {
-    dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.version
+    dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.get_version()
             << " < " << cmapv << dendl;
     mds->sessionmap.open_sessions(client_map);
-    assert(mds->sessionmap.version == cmapv);
-    mds->sessionmap.projected = mds->sessionmap.version;
+    assert(mds->sessionmap.get_version() == cmapv);
+    mds->sessionmap.set_projected(mds->sessionmap.get_version());
   }
   update_segment();
 }
@@ -2035,11 +2042,11 @@ void EUpdate::replay(MDS *mds)
   }
   
   if (client_map.length()) {
-    if (mds->sessionmap.version >= cmapv) {
+    if (mds->sessionmap.get_version() >= cmapv) {
       dout(10) << "EUpdate.replay sessionmap v " << cmapv
-              << " <= table " << mds->sessionmap.version << dendl;
+              << " <= table " << mds->sessionmap.get_version() << dendl;
     } else {
-      dout(10) << "EUpdate.replay sessionmap " << mds->sessionmap.version
+      dout(10) << "EUpdate.replay sessionmap " << mds->sessionmap.get_version()
               << " < " << cmapv << dendl;
       // open client sessions?
       map<client_t,entity_inst_t> cm;
@@ -2049,8 +2056,8 @@ void EUpdate::replay(MDS *mds)
       mds->server->prepare_force_open_sessions(cm, seqm);
       mds->server->finish_force_open_sessions(cm, seqm);
 
-      assert(mds->sessionmap.version == cmapv);
-      mds->sessionmap.projected = mds->sessionmap.version;
+      assert(mds->sessionmap.get_version() == cmapv);
+      mds->sessionmap.set_projected(mds->sessionmap.get_version());
     }
   }
 }
@@ -2854,18 +2861,18 @@ void EImportStart::replay(MDS *mds)
                                            mds_authority_t(mds->get_nodeid(), mds->get_nodeid()));
 
   // open client sessions?
-  if (mds->sessionmap.version >= cmapv) {
-    dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.version 
+  if (mds->sessionmap.get_version() >= cmapv) {
+    dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.get_version() 
             << " >= " << cmapv << ", noop" << dendl;
   } else {
-    dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.version 
+    dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.get_version() 
             << " < " << cmapv << dendl;
     map<client_t,entity_inst_t> cm;
     bufferlist::iterator blp = client_map.begin();
     ::decode(cm, blp);
     mds->sessionmap.open_sessions(cm);
-    assert(mds->sessionmap.version == cmapv);
-    mds->sessionmap.projected = mds->sessionmap.version;
+    assert(mds->sessionmap.get_version() == cmapv);
+    mds->sessionmap.set_projected(mds->sessionmap.get_version());
   }
   update_segment();
 }
index bcc03472c2d9a11fd6bc0f07efc1ce7bb4c10710..10bc22f2582dfdfa3710ddc5c113430f3c120f8f 100644 (file)
@@ -180,9 +180,6 @@ TYPE(InoTable)
 #include "mds/SnapServer.h"
 TYPEWITHSTRAYDATA(SnapServer)
 
-#include "mds/SessionMap.h"
-TYPE(SessionMapStore)
-
 #include "mds/events/ECommitted.h"
 TYPE(ECommitted)
 #include "mds/events/EExport.h"
index 4b22de04f7231bf4edec8fd0133b10ffcb7edc64..8d257cfd55e31896a6dd97ec1cbedaf4a252ccf7 100644 (file)
@@ -261,14 +261,112 @@ public:
   }
 };
 
+template <typename A>
+class TableHandlerOmap
+{
+private:
+  // The RADOS object ID for the table
+  std::string object_name;
+
+  // The rank in question (may be NONE)
+  mds_rank_t rank;
+
+  // Whether this is an MDSTable subclass (i.e. has leading version field to decode)
+  bool mds_table;
+
+public:
+  TableHandlerOmap(mds_rank_t r, std::string const &name, bool mds_table_)
+    : rank(r), mds_table(mds_table_)
+  {
+    // Compose object name of the table we will dump
+    std::ostringstream oss;
+    oss << "mds";
+    if (rank != MDS_RANK_NONE) {
+      oss << rank;
+    }
+    oss << "_" << name;
+    object_name = oss.str();
+  }
+
+  int load_and_dump(librados::IoCtx *io, Formatter *f)
+  {
+    assert(io != NULL);
+    assert(f != NULL);
+
+    // Read in the header
+    bufferlist header_bl;
+    int r = io->omap_get_header(object_name, &header_bl);
+    if (r != 0) {
+      derr << "error reading header: " << cpp_strerror(r) << dendl;
+      return r;
+    }
+
+    // Decode the header
+    A table_inst;
+    table_inst.set_rank(rank);
+    try {
+      table_inst.decode_header(header_bl);
+    } catch (buffer::error &e) {
+      derr << "table " << object_name << " is corrupt" << dendl;
+      return -EIO;
+    }
+
+    // Read and decode OMAP values in chunks
+    std::string last_key = "";
+    while(true) {
+      std::map<std::string, bufferlist> values;
+      int r = io->omap_get_vals(object_name, last_key,
+          g_conf->mds_sessionmap_keys_per_op, &values);
+
+      if (r != 0) {
+        derr << "error reading values: " << cpp_strerror(r) << dendl;
+        return r;
+      }
+
+      if (values.empty()) {
+        break;
+      }
+
+      try {
+        table_inst.decode_values(values);
+      } catch (buffer::error &e) {
+        derr << "table " << object_name << " is corrupt" << dendl;
+        return -EIO;
+      }
+      last_key = values.rbegin()->first;
+    }
+
+    table_inst.dump(f);
+
+    return 0;
+  }
+
+  int reset(librados::IoCtx *io)
+  {
+    A table_inst;
+    table_inst.set_rank(rank);
+    table_inst.reset_state();
+
+    bufferlist header_bl;
+    table_inst.encode_header(&header_bl);
+
+    // Compose a transaction to clear and write header
+    librados::ObjectWriteOperation op;
+    op.omap_clear();
+    op.omap_set_header(header_bl);
+    
+    return io->operate(object_name, &op);
+  }
+};
+
 int TableTool::_show_session_table(mds_rank_t rank, Formatter *f)
 {
-  return TableHandler<SessionMapStore>(rank, "sessionmap", false).load_and_dump(&io, f);
+  return TableHandlerOmap<SessionMapStore>(rank, "sessionmap", false).load_and_dump(&io, f);
 }
 
 int TableTool::_reset_session_table(mds_rank_t rank, Formatter *f)
 {
-  return TableHandler<SessionMapStore>(rank, "sessionmap", false).reset(&io);
+  return TableHandlerOmap<SessionMapStore>(rank, "sessionmap", false).reset(&io);
 }
 
 int TableTool::_show_ino_table(mds_rank_t rank, Formatter *f)