]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: update for multiple filesystems
authorJohn Spray <john.spray@redhat.com>
Wed, 17 Feb 2016 16:05:43 +0000 (16:05 +0000)
committerJohn Spray <john.spray@redhat.com>
Thu, 10 Mar 2016 12:11:23 +0000 (12:11 +0000)
This is a big logic change throughout MDSMonitor,
and additonally some new commands to handle
per-fs variants of what used to be global things.

Signed-off-by: John Spray <john.spray@redhat.com>
src/mon/MDSMonitor.cc
src/mon/MDSMonitor.h
src/mon/MonCommands.h
src/mon/Monitor.cc
src/mon/OSDMonitor.cc

index b27d7dbb80bb7e65474949f02ac7b452a778cd2a..8653b8c8ff45c737b99c55857b3109910a80cd56 100644 (file)
@@ -1,4 +1,4 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 /*
  * Ceph - scalable distributed file system
@@ -28,6 +28,7 @@
 #include "common/cmdparse.h"
 
 #include "messages/MMDSMap.h"
+#include "messages/MFSMap.h"
 #include "messages/MMDSBeacon.h"
 #include "messages/MMDSLoadTargets.h"
 #include "messages/MMonCommand.h"
 
 #define dout_subsys ceph_subsys_mon
 #undef dout_prefix
-#define dout_prefix _prefix(_dout, mon, mdsmap)
-static ostream& _prefix(std::ostream *_dout, Monitor *mon, MDSMap const& mdsmap) {
+#define dout_prefix _prefix(_dout, mon, fsmap)
+static ostream& _prefix(std::ostream *_dout, Monitor *mon, FSMap const& fsmap) {
   return *_dout << "mon." << mon->name << "@" << mon->rank
                << "(" << mon->get_state_name()
-               << ").mds e" << mdsmap.get_epoch() << " ";
+               << ").mds e" << fsmap.get_epoch() << " ";
 }
 
-
 /*
  * Specialized implementation of cmd_getval to allow us to parse
  * out strongly-typedef'd types
@@ -75,29 +75,43 @@ static const string MDS_METADATA_PREFIX("mds_metadata");
 
 // my methods
 
-void MDSMonitor::print_map(MDSMap &m, int dbl)
+void MDSMonitor::print_map(FSMap &m, int dbl)
 {
   dout(dbl) << "print_map\n";
   m.print(*_dout);
   *_dout << dendl;
 }
 
-void MDSMonitor::create_new_fs(MDSMap &m, const std::string &name, int metadata_pool, int data_pool)
+void MDSMonitor::create_new_fs(FSMap &fsm, const std::string &name,
+    int metadata_pool, int data_pool)
 {
-  m.enabled = true;
-  m.fs_name = name;
-  m.max_mds = g_conf->max_mds;
-  m.created = ceph_clock_now(g_ceph_context);
-  m.data_pools.insert(data_pool);
-  m.metadata_pool = metadata_pool;
-  m.cas_pool = -1;
-  m.compat = get_mdsmap_compat_set_default();
-
-  m.session_timeout = g_conf->mds_session_timeout;
-  m.session_autoclose = g_conf->mds_session_autoclose;
-  m.max_file_size = g_conf->mds_max_file_size;
-
-  print_map(m);
+  auto fs = std::make_shared<Filesystem>();
+  fs->mds_map.fs_name = name;
+  fs->mds_map.max_mds = g_conf->max_mds;
+  fs->mds_map.data_pools.insert(data_pool);
+  fs->mds_map.metadata_pool = metadata_pool;
+  fs->mds_map.cas_pool = -1;
+  fs->mds_map.max_file_size = g_conf->mds_max_file_size;
+  fs->mds_map.compat = fsm.compat;
+  fs->mds_map.created = ceph_clock_now(g_ceph_context);
+  fs->mds_map.modified = ceph_clock_now(g_ceph_context);
+  fs->mds_map.session_timeout = g_conf->mds_session_timeout;
+  fs->mds_map.session_autoclose = g_conf->mds_session_autoclose;
+  fs->mds_map.enabled = true;
+  fs->fscid = fsm.next_filesystem_id++;
+  fsm.filesystems[fs->fscid] = fs;
+
+  // ANONYMOUS is only for upgrades from legacy mdsmaps, we should
+  // have initialized next_filesystem_id such that it's never used here.
+  assert(fs->fscid != FS_CLUSTER_ID_ANONYMOUS);
+
+  // Created first filesystem?  Set it as the one
+  // for legacy clients to use
+  if (fsm.filesystems.size() == 1) {
+    fsm.legacy_client_fscid = fs->fscid;
+  }
+
+  print_map(fsm);
 }
 
 
@@ -105,21 +119,18 @@ void MDSMonitor::create_new_fs(MDSMap &m, const std::string &name, int metadata_
 void MDSMonitor::create_initial()
 {
   dout(10) << "create_initial" << dendl;
-
-  // Initial state is a disable MDS map
-  assert(mdsmap.get_enabled() == false);
 }
 
 
 void MDSMonitor::update_from_paxos(bool *need_bootstrap)
 {
   version_t version = get_last_committed();
-  if (version == mdsmap.epoch)
+  if (version == fsmap.epoch)
     return;
-  assert(version >= mdsmap.epoch);
 
   dout(10) << __func__ << " version " << version
-          << ", my e " << mdsmap.epoch << dendl;
+          << ", my e " << fsmap.epoch << dendl;
+  assert(version >= fsmap.epoch);
 
   // read and decode
   mdsmap_bl.clear();
@@ -128,11 +139,12 @@ void MDSMonitor::update_from_paxos(bool *need_bootstrap)
 
   assert(mdsmap_bl.length() > 0);
   dout(10) << __func__ << " got " << version << dendl;
-  mdsmap.decode(mdsmap_bl);
+  fsmap.decode(mdsmap_bl);
 
   // new map
   dout(4) << "new map" << dendl;
-  print_map(mdsmap, 0);
+  print_map(fsmap, 0);
+  fsmap.sanity();
 
   check_subs();
   update_logger();
@@ -145,28 +157,36 @@ void MDSMonitor::init()
 
 void MDSMonitor::create_pending()
 {
-  pending_mdsmap = mdsmap;
-  pending_mdsmap.epoch++;
-  dout(10) << "create_pending e" << pending_mdsmap.epoch << dendl;
+  pending_fsmap = fsmap;
+  pending_fsmap.epoch++;
+
+  dout(10) << "create_pending e" << pending_fsmap.epoch << dendl;
 }
 
 void MDSMonitor::encode_pending(MonitorDBStore::TransactionRef t)
 {
-  dout(10) << "encode_pending e" << pending_mdsmap.epoch << dendl;
+  dout(10) << "encode_pending e" << pending_fsmap.epoch << dendl;
 
-  pending_mdsmap.modified = ceph_clock_now(g_ceph_context);
 
   // print map iff 'debug mon = 30' or higher
-  print_map(pending_mdsmap, 30);
+  print_map(pending_fsmap, 30);
+  pending_fsmap.sanity();
+
+  // Set 'modified' on maps modified this epoch
+  for (auto &i : fsmap.filesystems) {
+    if (i.second->mds_map.epoch == fsmap.epoch) {
+      i.second->mds_map.modified = ceph_clock_now(g_ceph_context);
+    }
+  }
 
   // apply to paxos
-  assert(get_last_committed() + 1 == pending_mdsmap.epoch);
+  assert(get_last_committed() + 1 == pending_fsmap.epoch);
   bufferlist mdsmap_bl;
-  pending_mdsmap.encode(mdsmap_bl, mon->get_quorum_features());
+  pending_fsmap.encode(mdsmap_bl, mon->get_quorum_features());
 
   /* put everything in the transaction */
-  put_version(t, pending_mdsmap.epoch, mdsmap_bl);
-  put_last_committed(t, pending_mdsmap.epoch);
+  put_version(t, pending_fsmap.epoch, mdsmap_bl);
+  put_last_committed(t, pending_fsmap.epoch);
 
   // Encode MDSHealth data
   for (std::map<uint64_t, MDSHealth>::iterator i = pending_daemon_health.begin();
@@ -206,10 +226,20 @@ void MDSMonitor::update_logger()
 {
   dout(10) << "update_logger" << dendl;
 
-  mon->cluster_logger->set(l_cluster_num_mds_up, mdsmap.get_num_up_mds());
-  mon->cluster_logger->set(l_cluster_num_mds_in, mdsmap.get_num_in_mds());
-  mon->cluster_logger->set(l_cluster_num_mds_failed, mdsmap.get_num_failed_mds());
-  mon->cluster_logger->set(l_cluster_mds_epoch, mdsmap.get_epoch());
+  uint64_t up = 0;
+  uint64_t in = 0;
+  uint64_t failed = 0;
+  for (const auto &i : fsmap.filesystems) {
+    const MDSMap &mds_map = i.second->mds_map;
+
+    up += mds_map.get_num_up_mds();
+    in += mds_map.get_num_in_mds();
+    failed += mds_map.get_num_failed_mds();
+  }
+  mon->cluster_logger->set(l_cluster_num_mds_up, up);
+  mon->cluster_logger->set(l_cluster_num_mds_in, in);
+  mon->cluster_logger->set(l_cluster_num_mds_failed, failed);
+  mon->cluster_logger->set(l_cluster_mds_epoch, fsmap.get_epoch());
 }
 
 bool MDSMonitor::preprocess_query(MonOpRequestRef op)
@@ -253,6 +283,7 @@ bool MDSMonitor::preprocess_beacon(MonOpRequestRef op)
   mds_gid_t gid = m->get_global_id();
   version_t seq = m->get_seq();
   MDSMap::mds_info_t info;
+  epoch_t effective_epoch = 0;
 
   // check privileges, ignore if fails
   MonSession *session = m->get_session();
@@ -280,8 +311,8 @@ bool MDSMonitor::preprocess_beacon(MonOpRequestRef op)
   }
 
   // check compat
-  if (!m->get_compat().writeable(mdsmap.compat)) {
-    dout(1) << " mds " << m->get_source_inst() << " can't write to mdsmap " << mdsmap.compat << dendl;
+  if (!m->get_compat().writeable(fsmap.compat)) {
+    dout(1) << " mds " << m->get_source_inst() << " can't write to fsmap " << fsmap.compat << dendl;
     goto ignore;
   }
 
@@ -289,23 +320,23 @@ bool MDSMonitor::preprocess_beacon(MonOpRequestRef op)
   if (!mon->is_leader())
     return false;
 
-  if (pending_mdsmap.test_flag(CEPH_MDSMAP_DOWN)) {
-    dout(7) << " mdsmap DOWN flag set, ignoring mds " << m->get_source_inst() << " beacon" << dendl;
-    goto ignore;
-  }
-
   // booted, but not in map?
-  if (pending_mdsmap.is_dne_gid(gid)) {
+  if (!pending_fsmap.gid_exists(gid)) {
     if (state != MDSMap::STATE_BOOT) {
-      dout(7) << "mds_beacon " << *m << " is not in mdsmap (state "
+      dout(7) << "mds_beacon " << *m << " is not in fsmap (state "
               << ceph_mds_state_name(state) << ")" << dendl;
-      mon->send_reply(op, new MMDSMap(mon->monmap->fsid, &mdsmap));
+
+      MDSMap null_map;
+      null_map.epoch = fsmap.epoch;
+      null_map.compat = fsmap.compat;
+      mon->send_reply(op, new MMDSMap(mon->monmap->fsid, &null_map));
       return true;
     } else {
       return false;  // not booted yet.
     }
   }
-  info = pending_mdsmap.get_info_gid(gid);
+  dout(10) << __func__ << ": GID exists in map: " << gid << dendl;
+  info = pending_fsmap.get_info_gid(gid);
 
   // old seq?
   if (info.state_seq > seq) {
@@ -313,10 +344,19 @@ bool MDSMonitor::preprocess_beacon(MonOpRequestRef op)
     goto ignore;
   }
 
-  if (mdsmap.get_epoch() != m->get_last_epoch_seen()) {
-    dout(10) << "mds_beacon " << *m
-            << " ignoring requested state, because mds hasn't seen latest map" << dendl;
-    goto reply;
+  // Work out the latest epoch that this daemon should have seen
+  {
+    fs_cluster_id_t fscid = pending_fsmap.mds_roles.at(gid);
+    if (fscid == FS_CLUSTER_ID_NONE) {
+      effective_epoch = pending_fsmap.standby_epochs.at(gid);
+    } else {
+      effective_epoch = pending_fsmap.get_filesystem(fscid)->mds_map.epoch;
+    }
+    if (effective_epoch != m->get_last_epoch_seen()) {
+      dout(10) << "mds_beacon " << *m
+               << " ignoring requested state, because mds hasn't seen latest map" << dendl;
+      goto reply;
+    }
   }
 
   if (info.laggy()) {
@@ -347,19 +387,8 @@ bool MDSMonitor::preprocess_beacon(MonOpRequestRef op)
       goto reply;
     }
     
-    if (info.state == MDSMap::STATE_STANDBY &&
-       (state == MDSMap::STATE_STANDBY_REPLAY ||
-           state == MDSMap::STATE_ONESHOT_REPLAY) &&
-       (pending_mdsmap.is_degraded() ||
-        ((m->get_standby_for_rank() >= 0) &&
-            pending_mdsmap.get_state(m->get_standby_for_rank()) < MDSMap::STATE_ACTIVE))) {
-      dout(10) << "mds_beacon can't standby-replay mds." << m->get_standby_for_rank() << " at this time (cluster degraded, or mds not active)" << dendl;
-      dout(10) << "pending_mdsmap.is_degraded()==" << pending_mdsmap.is_degraded()
-          << " rank state: " << ceph_mds_state_name(pending_mdsmap.get_state(m->get_standby_for_rank())) << dendl;
-      goto reply;
-    }
     _note_beacon(m);
-    return false;  // need to update map
+    return false;
   }
 
   // Comparing known daemon health with m->get_health()
@@ -373,10 +402,11 @@ bool MDSMonitor::preprocess_beacon(MonOpRequestRef op)
 
  reply:
   // note time and reply
+  assert(effective_epoch > 0);
   _note_beacon(m);
   mon->send_reply(op,
                  new MMDSBeacon(mon->monmap->fsid, m->get_global_id(), m->get_name(),
-                                mdsmap.get_epoch(), state, seq,
+                                effective_epoch, state, seq,
                                 CEPH_FEATURES_SUPPORTED_DEFAULT));
   return true;
 
@@ -391,7 +421,6 @@ bool MDSMonitor::preprocess_offload_targets(MonOpRequestRef op)
   op->mark_mdsmon_event(__func__);
   MMDSLoadTargets *m = static_cast<MMDSLoadTargets*>(op->get_req());
   dout(10) << "preprocess_offload_targets " << *m << " from " << m->get_orig_source() << dendl;
-  mds_gid_t gid;
   
   // check privileges, ignore message if fails
   MonSession *session = m->get_session();
@@ -403,9 +432,8 @@ bool MDSMonitor::preprocess_offload_targets(MonOpRequestRef op)
     goto done;
   }
 
-  gid = m->global_id;
-  if (mdsmap.mds_info.count(gid) &&
-      m->targets == mdsmap.mds_info[gid].export_targets)
+  if (fsmap.gid_exists(m->global_id) &&
+      m->targets == fsmap.get_info_gid(m->global_id).export_targets)
     goto done;
 
   return false;
@@ -452,12 +480,6 @@ bool MDSMonitor::prepare_beacon(MonOpRequestRef op)
   MDSMap::DaemonState state = m->get_state();
   version_t seq = m->get_seq();
 
-  // Ignore beacons if filesystem is disabled
-  if (!mdsmap.get_enabled()) {
-    dout(1) << "warning, MDS " << m->get_orig_source_inst() << " up but filesystem disabled" << dendl;
-    return false;
-  }
-
   // Store health
   dout(20) << __func__ << " got health from gid " << gid << " with " << m->get_health().metrics.size() << " metrics." << dendl;
   pending_daemon_health[gid] = m->get_health();
@@ -467,7 +489,7 @@ bool MDSMonitor::prepare_beacon(MonOpRequestRef op)
     // zap previous instance of this name?
     if (g_conf->mds_enforce_unique_name) {
       bool failed_mds = false;
-      while (mds_gid_t existing = pending_mdsmap.find_mds_gid_by_name(m->get_name())) {
+      while (mds_gid_t existing = pending_fsmap.find_mds_gid_by_name(m->get_name())) {
         if (!mon->osdmon()->is_writeable()) {
           mon->osdmon()->wait_for_writeable(op, new C_RetryMessage(this, op));
           return false;
@@ -481,26 +503,35 @@ bool MDSMonitor::prepare_beacon(MonOpRequestRef op)
       }
     }
 
-    // add
-    MDSMap::mds_info_t& info = pending_mdsmap.mds_info[gid];
-    info.global_id = gid;
-    info.name = m->get_name();
-    info.rank = -1;
-    info.addr = addr;
-    info.mds_features = m->get_mds_features();
-    info.state = MDSMap::STATE_STANDBY;
-    info.state_seq = seq;
-    info.standby_for_rank = m->get_standby_for_rank();
-    info.standby_for_name = m->get_standby_for_name();
+    // Add this daemon to the map
+    if (pending_fsmap.mds_roles.count(gid) == 0) {
+      MDSMap::mds_info_t new_info;
+      new_info.global_id = gid;
+      new_info.name = m->get_name();
+      new_info.addr = addr;
+      new_info.mds_features = m->get_mds_features();
+      new_info.state = MDSMap::STATE_STANDBY;
+      new_info.state_seq = seq;
+      new_info.standby_for_rank = m->get_standby_for_rank();
+      new_info.standby_for_name = m->get_standby_for_name();
+      pending_fsmap.insert(new_info);
+    }
 
+    // Resolve standby_for_name to a rank
+    const MDSMap::mds_info_t &info = pending_fsmap.get_info_gid(gid);
     if (!info.standby_for_name.empty()) {
-      const MDSMap::mds_info_t *leaderinfo = mdsmap.find_by_name(info.standby_for_name);
+      const MDSMap::mds_info_t *leaderinfo = fsmap.find_by_name(
+          info.standby_for_name);
       if (leaderinfo && (leaderinfo->rank >= 0)) {
-        info.standby_for_rank =
-            mdsmap.find_by_name(info.standby_for_name)->rank;
-        if (mdsmap.is_followable(info.standby_for_rank)) {
-          info.state = MDSMap::STATE_STANDBY_REPLAY;
-        }
+        auto fscid = pending_fsmap.mds_roles.at(leaderinfo->global_id);
+        auto fs = pending_fsmap.get_filesystem(fscid);
+        bool followable = fs->mds_map.is_followable(leaderinfo->rank);
+
+        pending_fsmap.modify_daemon(gid, [fscid, leaderinfo, followable](
+              MDSMap::mds_info_t *info) {
+            info->standby_for_rank = leaderinfo->rank;
+            info->standby_for_ns = fscid;
+        });
       }
     }
 
@@ -509,17 +540,18 @@ bool MDSMonitor::prepare_beacon(MonOpRequestRef op)
     last_beacon[gid].seq = seq;
 
     // new incompat?
-    if (!pending_mdsmap.compat.writeable(m->get_compat())) {
-      dout(10) << " mdsmap " << pending_mdsmap.compat << " can't write to new mds' " << m->get_compat()
-              << ", updating mdsmap and killing old mds's"
+    if (!pending_fsmap.compat.writeable(m->get_compat())) {
+      dout(10) << " fsmap " << pending_fsmap.compat
+               << " can't write to new mds' " << m->get_compat()
+              << ", updating fsmap and killing old mds's"
               << dendl;
-      pending_mdsmap.compat = m->get_compat();
+      pending_fsmap.update_compat(m->get_compat());
     }
 
     update_metadata(m->get_global_id(), m->get_sys_info());
   } else {
-    // state change
-    MDSMap::mds_info_t& info = pending_mdsmap.get_info_gid(gid);
+    // state update
+    const MDSMap::mds_info_t &info = pending_fsmap.get_info_gid(gid);
 
     if (info.state == MDSMap::STATE_STOPPING && state != MDSMap::STATE_STOPPED ) {
       // we can't transition to any other states from STOPPING
@@ -531,7 +563,11 @@ bool MDSMonitor::prepare_beacon(MonOpRequestRef op)
 
     if (info.laggy()) {
       dout(10) << "prepare_beacon clearing laggy flag on " << addr << dendl;
-      info.clear_laggy();
+      pending_fsmap.modify_daemon(info.global_id, [](MDSMap::mds_info_t *info)
+        {
+          info->clear_laggy();
+        }
+      );
     }
   
     dout(10) << "prepare_beacon mds." << info.rank
@@ -540,37 +576,70 @@ bool MDSMonitor::prepare_beacon(MonOpRequestRef op)
             << "  standby_for_rank=" << m->get_standby_for_rank()
             << dendl;
     if (state == MDSMap::STATE_STOPPED) {
-      pending_mdsmap.up.erase(info.rank);
-      pending_mdsmap.in.erase(info.rank);
-      pending_mdsmap.stopped.insert(info.rank);
-      pending_mdsmap.mds_info.erase(gid);  // last! info is a ref into this map
+      pending_fsmap.stop(gid);
       last_beacon.erase(gid);
     } else if (state == MDSMap::STATE_STANDBY_REPLAY) {
       if (m->get_standby_for_rank() == MDSMap::MDS_STANDBY_NAME) {
-        /* convert name to rank. If we don't have it, do nothing. The
-        mds will stay in standby and keep requesting the state change */
         dout(20) << "looking for mds " << m->get_standby_for_name()
                   << " to STANDBY_REPLAY for" << dendl;
-        const MDSMap::mds_info_t *found_mds = NULL;
-        if ((found_mds = mdsmap.find_by_name(m->get_standby_for_name())) &&
-            (found_mds->rank >= 0) &&
-           mdsmap.is_followable(found_mds->rank)) {
-          info.standby_for_rank = found_mds->rank;
+        auto target_info = pending_fsmap.find_by_name(m->get_standby_for_name());
+        if (target_info == nullptr) {
+          // This name is unknown, do nothing, stay in standby
+          return false;
+        }
+
+        auto target_ns = pending_fsmap.mds_roles.at(target_info->global_id);
+        if (target_ns == FS_CLUSTER_ID_NONE) {
+          // The named daemon is not in a Filesystem, do nothing.
+          return false;
+        }
+
+        auto target_fs = pending_fsmap.get_filesystem(target_ns);
+        if (target_fs->mds_map.is_followable(info.rank)) {
           dout(10) <<" found mds " << m->get_standby_for_name()
-                       << "; it has rank " << info.standby_for_rank << dendl;
-          info.state = MDSMap::STATE_STANDBY_REPLAY;
-          info.state_seq = seq;
+                       << "; it has rank " << target_info->rank << dendl;
+          pending_fsmap.modify_daemon(info.global_id,
+              [target_info, target_ns, seq](MDSMap::mds_info_t *info) {
+            info->standby_for_rank = target_info->rank;
+            info->standby_for_ns = target_ns;
+            info->state = MDSMap::STATE_STANDBY_REPLAY;
+            info->state_seq = seq;
+          });
+        } else {
+          // The named daemon has a rank but isn't followable, do nothing
+          return false;
+        }
+      } else if (m->get_standby_for_rank() >= 0) {
+        // TODO get this from MDS message
+        // >>
+        fs_cluster_id_t target_ns = FS_CLUSTER_ID_NONE;
+        // <<
+
+        mds_role_t target_role = {
+          target_ns == FS_CLUSTER_ID_NONE ?
+            pending_fsmap.legacy_client_fscid : info.standby_for_ns,
+          m->get_standby_for_rank()};
+
+        if (target_role.fscid != FS_CLUSTER_ID_NONE) {
+          auto fs = pending_fsmap.get_filesystem(target_role.fscid);
+          if (fs->mds_map.is_followable(target_role.rank)) {
+            pending_fsmap.modify_daemon(info.global_id,
+                [target_role, seq](MDSMap::mds_info_t *info) {
+              info->standby_for_rank = target_role.rank;
+              info->standby_for_ns = target_role.fscid;
+              info->state = MDSMap::STATE_STANDBY_REPLAY;
+              info->state_seq = seq;
+            });
+          } else {
+            // We know who they want to follow, but he's not in a suitable state
+            return false;
+          }
         } else {
+          // Couldn't resolve to a particular filesystem
           return false;
         }
-      } else if (m->get_standby_for_rank() >= 0 &&
-                mdsmap.is_followable(m->get_standby_for_rank())) {
-        /* switch to standby-replay for this MDS*/
-        info.state = MDSMap::STATE_STANDBY_REPLAY;
-        info.state_seq = seq;
-        info.standby_for_rank = m->get_standby_for_rank();
       } else { //it's a standby for anybody, and is already in the list
-        assert(pending_mdsmap.get_mds_info().count(info.global_id));
+        assert(pending_fsmap.get_mds_info().count(info.global_id));
         return false;
       }
     } else if (state == MDSMap::STATE_DAMAGED) {
@@ -586,27 +655,17 @@ bool MDSMonitor::prepare_beacon(MonOpRequestRef op)
       dout(4) << __func__ << ": marking rank "
               << info.rank << " damaged" << dendl;
 
-
-      // Blacklist this MDS daemon
       const utime_t until = ceph_clock_now(g_ceph_context);
-      pending_mdsmap.last_failure_osd_epoch = mon->osdmon()->blacklist(
-          info.addr, until);
+      const auto blacklist_epoch = mon->osdmon()->blacklist(info.addr, until);
       request_proposal(mon->osdmon());
-
-      // Clear out daemon state and add rank to damaged list
-      pending_mdsmap.up.erase(info.rank);
-      pending_mdsmap.damaged.insert(info.rank);
+      pending_fsmap.damaged(gid, blacklist_epoch);
       last_beacon.erase(gid);
 
-      // Call erase() last because the `info` reference becomes invalid
-      // after we remove the instance from the map.
-      pending_mdsmap.mds_info.erase(gid);
-
       // Respond to MDS, so that it knows it can continue to shut down
       mon->send_reply(op,
                      new MMDSBeacon(
                        mon->monmap->fsid, m->get_global_id(),
-                       m->get_name(), mdsmap.get_epoch(), state, seq,
+                       m->get_name(), fsmap.get_epoch(), state, seq,
                        CEPH_FEATURES_SUPPORTED_DEFAULT));
     } else if (state == MDSMap::STATE_DNE) {
       if (!mon->osdmon()->is_writeable()) {
@@ -624,16 +683,18 @@ bool MDSMonitor::prepare_beacon(MonOpRequestRef op)
       mon->send_reply(op,
                      new MMDSBeacon(
                        mon->monmap->fsid, m->get_global_id(),
-                       m->get_name(), mdsmap.get_epoch(), state, seq,
+                       m->get_name(), fsmap.get_epoch(), state, seq,
                        CEPH_FEATURES_SUPPORTED_DEFAULT));
     } else {
-      info.state = state;
-      info.state_seq = seq;
+      pending_fsmap.modify_daemon(gid, [state, seq](MDSMap::mds_info_t *info) {
+        info->state = state;
+        info->state_seq = seq;
+      });
     }
   }
 
   dout(7) << "prepare_beacon pending map now:" << dendl;
-  print_map(pending_mdsmap);
+  print_map(pending_fsmap);
   
   wait_for_finished_proposal(op, new C_Updated(this, op));
 
@@ -645,9 +706,9 @@ bool MDSMonitor::prepare_offload_targets(MonOpRequestRef op)
   op->mark_mdsmon_event(__func__);
   MMDSLoadTargets *m = static_cast<MMDSLoadTargets*>(op->get_req());
   mds_gid_t gid = m->global_id;
-  if (pending_mdsmap.mds_info.count(gid)) {
+  if (pending_fsmap.gid_has_rank(gid)) {
     dout(10) << "prepare_offload_targets " << gid << " " << m->targets << dendl;
-    pending_mdsmap.mds_info[gid].export_targets = m->targets;
+    pending_fsmap.update_export_targets(gid, m->targets);
   } else {
     dout(10) << "prepare_offload_targets " << gid << " not in map" << dendl;
   }
@@ -670,12 +731,15 @@ void MDSMonitor::_updated(MonOpRequestRef op)
 
   if (m->get_state() == MDSMap::STATE_STOPPED) {
     // send the map manually (they're out of the map, so they won't get it automatic)
-    mon->send_reply(op, new MMDSMap(mon->monmap->fsid, &mdsmap));
+    MDSMap null_map;
+    null_map.epoch = fsmap.epoch;
+    null_map.compat = fsmap.compat;
+    mon->send_reply(op, new MMDSMap(mon->monmap->fsid, &null_map));
   } else {
     mon->send_reply(op, new MMDSBeacon(mon->monmap->fsid,
                                       m->get_global_id(),
                                       m->get_name(),
-                                      mdsmap.get_epoch(),
+                                      fsmap.get_epoch(),
                                       m->get_state(),
                                       m->get_seq(),
                                       CEPH_FEATURES_SUPPORTED_DEFAULT));
@@ -688,23 +752,26 @@ void MDSMonitor::on_active()
   update_logger();
 
   if (mon->is_leader())
-    mon->clog->info() << "mdsmap " << mdsmap << "\n";
+    mon->clog->info() << "fsmap " << fsmap << "\n";
 }
 
 void MDSMonitor::get_health(list<pair<health_status_t, string> >& summary,
                            list<pair<health_status_t, string> > *detail,
                            CephContext* cct) const
 {
-  mdsmap.get_health(summary, detail);
+  fsmap.get_health(summary, detail);
 
   // For each MDS GID...
-  for (std::map<mds_gid_t, MDSMap::mds_info_t>::const_iterator i = mdsmap.mds_info.begin();
-      i != mdsmap.mds_info.end(); ++i) {
+  const auto info_map = fsmap.get_mds_info();
+  for (const auto &i : info_map) {
+    const auto &gid = i.first;
+    const auto &info = i.second;
+
     // Decode MDSHealth
     bufferlist bl;
-    mon->store->get(MDS_HEALTH_PREFIX, stringify(i->first), bl);
+    mon->store->get(MDS_HEALTH_PREFIX, stringify(gid), bl);
     if (!bl.length()) {
-      derr << "Missing health data for MDS " << i->first << dendl;
+      derr << "Missing health data for MDS " << gid << dendl;
       continue;
     }
     MDSHealth health;
@@ -712,7 +779,7 @@ void MDSMonitor::get_health(list<pair<health_status_t, string> >& summary,
     health.decode(bl_i);
 
     for (std::list<MDSHealthMetric>::iterator j = health.metrics.begin(); j != health.metrics.end(); ++j) {
-      int const rank = i->second.rank;
+      int const rank = info.rank;
       std::ostringstream message;
       message << "mds" << rank << ": " << j->message;
       summary.push_back(std::make_pair(j->sev, message.str()));
@@ -742,14 +809,129 @@ void MDSMonitor::get_health(list<pair<health_status_t, string> >& summary,
 
 void MDSMonitor::dump_info(Formatter *f)
 {
-  f->open_object_section("mdsmap");
-  mdsmap.dump(f);
+  f->open_object_section("fsmap");
+  fsmap.dump(f);
   f->close_section();
 
   f->dump_unsigned("mdsmap_first_committed", get_first_committed());
   f->dump_unsigned("mdsmap_last_committed", get_last_committed());
 }
 
+class FileSystemCommandHandler
+{
+protected:
+  std::string prefix;
+
+  /**
+   * Parse true|yes|1 style boolean string from `bool_str`
+   * `result` must be non-null.
+   * `ss` will be populated with error message on error.
+   *
+   * @return 0 on success, else -EINVAL
+   */
+  int parse_bool(
+      const std::string &bool_str,
+      bool *result,
+      std::ostream &ss)
+  {
+    assert(result != nullptr);
+
+    string interr;
+    int64_t n = strict_strtoll(bool_str.c_str(), 10, &interr);
+
+    if (bool_str == "false" || bool_str == "no"
+        || (interr.length() == 0 && n == 0)) {
+      *result = false;
+      return 0;
+    } else if (bool_str == "true" || bool_str == "yes"
+        || (interr.length() == 0 && n == 1)) {
+      *result = true;
+      return 0;
+    } else {
+      ss << "value must be false|no|0 or true|yes|1";
+      return -EINVAL;
+    }
+  }
+
+  /**
+   * Return 0 if the pool is suitable for use with CephFS, or
+   * in case of errors return a negative error code, and populate
+   * the passed stringstream with an explanation.
+   */
+  int _check_pool(
+      OSDMap &osd_map,
+      const int64_t pool_id,
+      std::stringstream *ss) const
+  {
+    assert(ss != NULL);
+
+    const pg_pool_t *pool = osd_map.get_pg_pool(pool_id);
+    if (!pool) {
+      *ss << "pool id '" << pool_id << "' does not exist";
+      return -ENOENT;
+    }
+
+    const string& pool_name = osd_map.get_pool_name(pool_id);
+
+    if (pool->is_erasure()) {
+      // EC pools are only acceptable with a cache tier overlay
+      if (!pool->has_tiers() || !pool->has_read_tier() || !pool->has_write_tier()) {
+        *ss << "pool '" << pool_name << "' (id '" << pool_id << "')"
+           << " is an erasure-code pool";
+        return -EINVAL;
+      }
+
+      // That cache tier overlay must be writeback, not readonly (it's the
+      // write operations like modify+truncate we care about support for)
+      const pg_pool_t *write_tier = osd_map.get_pg_pool(
+          pool->write_tier);
+      assert(write_tier != NULL);  // OSDMonitor shouldn't allow DNE tier
+      if (write_tier->cache_mode == pg_pool_t::CACHEMODE_FORWARD
+          || write_tier->cache_mode == pg_pool_t::CACHEMODE_READONLY) {
+        *ss << "EC pool '" << pool_name << "' has a write tier ("
+            << osd_map.get_pool_name(pool->write_tier)
+            << ") that is configured "
+               "to forward writes.  Use a cache mode such as 'writeback' for "
+               "CephFS";
+        return -EINVAL;
+      }
+    }
+
+    if (pool->is_tier()) {
+      *ss << " pool '" << pool_name << "' (id '" << pool_id
+        << "') is already in use as a cache tier.";
+      return -EINVAL;
+    }
+
+    // Nothing special about this pool, so it is permissible
+    return 0;
+  }
+
+  virtual std::string const &get_prefix() {return prefix;}
+
+public:
+  FileSystemCommandHandler(const std::string &prefix_)
+    : prefix(prefix_)
+  {
+
+  }
+  virtual ~FileSystemCommandHandler()
+  {
+  }
+
+  bool can_handle(std::string const &prefix_)
+  {
+    return get_prefix() == prefix_;
+  }
+
+  virtual int handle(
+    Monitor *mon,
+    FSMap &fsmap,
+    MonOpRequestRef op,
+    map<string, cmd_vartype> &cmdmap,
+    std::stringstream &ss) = 0;
+};
+
 bool MDSMonitor::preprocess_command(MonOpRequestRef op)
 {
   op->mark_mdsmon_event(__func__);
@@ -785,14 +967,14 @@ bool MDSMonitor::preprocess_command(MonOpRequestRef op)
       f->close_section();
       f->flush(ds);
     } else {
-      ds << mdsmap;
+      ds << fsmap;
     }
     r = 0;
   } else if (prefix == "mds dump") {
     int64_t epocharg;
     epoch_t epoch;
 
-    MDSMap *p = &mdsmap;
+    FSMap *p = &fsmap;
     if (cmd_getval(g_ceph_context, cmdmap, "epoch", epocharg)) {
       epoch = epocharg;
       bufferlist b;
@@ -803,14 +985,61 @@ bool MDSMonitor::preprocess_command(MonOpRequestRef op)
       } else {
        assert(err == 0);
        assert(b.length());
-       p = new MDSMap;
+       p = new FSMap;
        p->decode(b);
       }
     }
     if (p) {
       stringstream ds;
+      const MDSMap *mdsmap = nullptr;
+      MDSMap blank;
+      blank.epoch = fsmap.epoch;
+      if (fsmap.legacy_client_fscid != FS_CLUSTER_ID_NONE) {
+        mdsmap = &(fsmap.filesystems[fsmap.legacy_client_fscid]->mds_map);
+      } else {
+        mdsmap = &blank;
+      }
       if (f != NULL) {
        f->open_object_section("mdsmap");
+       mdsmap->dump(f.get());
+       f->close_section();
+       f->flush(ds);
+       r = 0;
+      } else {
+       mdsmap->print(ds);
+       r = 0;
+      } 
+      if (r == 0) {
+       rdata.append(ds);
+       ss << "dumped fsmap epoch " << p->get_epoch();
+      }
+      if (p != &fsmap) {
+       delete p;
+      }
+    }
+  } else if (prefix == "fs dump") {
+    int64_t epocharg;
+    epoch_t epoch;
+
+    FSMap *p = &fsmap;
+    if (cmd_getval(g_ceph_context, cmdmap, "epoch", epocharg)) {
+      epoch = epocharg;
+      bufferlist b;
+      int err = get_version(epoch, b);
+      if (err == -ENOENT) {
+       p = 0;
+       r = -ENOENT;
+      } else {
+       assert(err == 0);
+       assert(b.length());
+       p = new FSMap;
+       p->decode(b);
+      }
+    }
+    if (p) {
+      stringstream ds;
+      if (f != NULL) {
+       f->open_object_section("fsmap");
        p->dump(f.get());
        f->close_section();
        f->flush(ds);
@@ -821,9 +1050,9 @@ bool MDSMonitor::preprocess_command(MonOpRequestRef op)
       } 
       if (r == 0) {
        rdata.append(ds);
-       ss << "dumped mdsmap epoch " << p->get_epoch();
+       ss << "dumped fsmap epoch " << p->get_epoch();
       }
-      if (p != &mdsmap)
+      if (p != &fsmap)
        delete p;
     }
   } else if (prefix == "mds metadata") {
@@ -847,15 +1076,15 @@ bool MDSMonitor::preprocess_command(MonOpRequestRef op)
       } else {
        assert(err == 0);
        assert(b.length());
-       MDSMap mm;
+       FSMap mm;
        mm.decode(b);
        mm.encode(rdata, m->get_connection()->get_features());
-       ss << "got mdsmap epoch " << mm.get_epoch();
+       ss << "got fsmap epoch " << mm.get_epoch();
        r = 0;
       }
     } else {
-      mdsmap.encode(rdata, m->get_connection()->get_features());
-      ss << "got mdsmap epoch " << mdsmap.get_epoch();
+      fsmap.encode(rdata, m->get_connection()->get_features());
+      ss << "got fsmap epoch " << fsmap.get_epoch();
       r = 0;
     }
   } else if (prefix == "mds tell") {
@@ -866,12 +1095,10 @@ bool MDSMonitor::preprocess_command(MonOpRequestRef op)
 
     if (whostr == "*") {
       r = -ENOENT;
-      const map<mds_gid_t, MDSMap::mds_info_t> mds_info = mdsmap.get_mds_info();
-      for (map<mds_gid_t, MDSMap::mds_info_t>::const_iterator i = mds_info.begin();
-          i != mds_info.end();
-          ++i) {
+      const auto info_map = fsmap.get_mds_info();
+      for (const auto &i : info_map) {
        m->cmd = args_vec;
-       mon->send_command(i->second.get_inst(), m->cmd);
+       mon->send_command(i.second.get_inst(), m->cmd);
        r = 0;
       }
       if (r == -ENOENT) {
@@ -880,48 +1107,76 @@ bool MDSMonitor::preprocess_command(MonOpRequestRef op)
        ss << "ok";
       }
     } else {
-      errno = 0;
-      long who_l = strtol(whostr.c_str(), 0, 10);
-      if (!errno && who_l >= 0) {
-        mds_rank_t who = mds_rank_t(who_l);
-       if (mdsmap.is_up(who)) {
-         m->cmd = args_vec;
-         mon->send_command(mdsmap.get_inst(who), m->cmd);
-         r = 0;
-         ss << "ok";
-       } else {
-         ss << "mds." << who << " not up";
-         r = -ENOENT;
-       }
-      } else ss << "specify mds number or *";
+      if (fsmap.legacy_client_fscid) {
+        auto fs = fsmap.filesystems.at(fsmap.legacy_client_fscid);
+        errno = 0;
+        long who_l = strtol(whostr.c_str(), 0, 10);
+        if (!errno && who_l >= 0) {
+          mds_rank_t who = mds_rank_t(who_l);
+          if (fs->mds_map.is_up(who)) {
+            m->cmd = args_vec;
+            mon->send_command(fs->mds_map.get_inst(who), m->cmd);
+            r = 0;
+            ss << "ok";
+          } else {
+            ss << "mds." << who << " not up";
+            r = -ENOENT;
+          }
+        } else {
+          ss << "specify mds number or *";
+        }
+      } else {
+        ss << "no legacy filesystem set";
+      }
     }
   } else if (prefix == "mds compat show") {
       if (f) {
        f->open_object_section("mds_compat");
-       mdsmap.compat.dump(f.get());
+       fsmap.compat.dump(f.get());
        f->close_section();
        f->flush(ds);
       } else {
-       ds << mdsmap.compat;
+       ds << fsmap.compat;
       }
       r = 0;
+  } else if (prefix == "fs get") {
+    string fs_name;
+    cmd_getval(g_ceph_context, cmdmap, "fs_name", fs_name);
+    auto fs = fsmap.get_filesystem(fs_name);
+    if (fs == nullptr) {
+      ss << "filesystem '" << fs_name << "' not found";
+      r = -ENOENT;
+    } else {
+      if (f != nullptr) {
+        f->open_object_section("filesystem");
+        fs->dump(f.get());
+        f->close_section();
+        f->flush(ds);
+        r = 0;
+      } else {
+        fs->print(ds);
+        r = 0;
+      }
+    }
   } else if (prefix == "fs ls") {
     if (f) {
       f->open_array_section("filesystems");
       {
-        if (mdsmap.get_enabled()) {
+        for (const auto i : fsmap.filesystems) {
+          const auto fs = i.second;
           f->open_object_section("filesystem");
           {
-            f->dump_string("name", mdsmap.fs_name);
-            const string &md_pool_name = mon->osdmon()->osdmap.get_pool_name(mdsmap.metadata_pool);
+            const MDSMap &mds_map = fs->mds_map;
+            f->dump_string("name", mds_map.fs_name);
             /* Output both the names and IDs of pools, for use by
              * humans and machines respectively */
-            f->dump_string("metadata_pool", md_pool_name);
-            f->dump_int("metadata_pool_id", mdsmap.metadata_pool);
+            f->dump_string("metadata_pool", mon->osdmon()->osdmap.get_pool_name(
+                  mds_map.metadata_pool));
+            f->dump_int("metadata_pool_id", mds_map.metadata_pool);
             f->open_array_section("data_pool_ids");
             {
-              for (std::set<int64_t>::iterator dpi = mdsmap.data_pools.begin();
-                   dpi != mdsmap.data_pools.end(); ++dpi) {
+              for (auto dpi = mds_map.data_pools.begin();
+                   dpi != mds_map.data_pools.end(); ++dpi) {
                 f->dump_int("data_pool_id", *dpi);
               }
             }
@@ -929,10 +1184,11 @@ bool MDSMonitor::preprocess_command(MonOpRequestRef op)
 
             f->open_array_section("data_pools");
             {
-                for (std::set<int64_t>::iterator dpi = mdsmap.data_pools.begin();
-                   dpi != mdsmap.data_pools.end(); ++dpi) {
-                  const string &pool_name = mon->osdmon()->osdmap.get_pool_name(*dpi);
-                  f->dump_string("data_pool", pool_name);
+                for (auto dpi = mds_map.data_pools.begin();
+                   dpi != mds_map.data_pools.end(); ++dpi) {
+                  const auto &name = mon->osdmon()->osdmap.get_pool_name(
+                      *dpi);
+                  f->dump_string("data_pool", name);
                 }
             }
 
@@ -944,17 +1200,23 @@ bool MDSMonitor::preprocess_command(MonOpRequestRef op)
       f->close_section();
       f->flush(ds);
     } else {
-      if (mdsmap.get_enabled()) {
-        const string &md_pool_name = mon->osdmon()->osdmap.get_pool_name(mdsmap.metadata_pool);
+      for (const auto i : fsmap.filesystems) {
+        const auto fs = i.second;
+        const MDSMap &mds_map = fs->mds_map;
+        const string &md_pool_name = mon->osdmon()->osdmap.get_pool_name(
+            mds_map.metadata_pool);
         
-        ds << "name: " << mdsmap.fs_name << ", metadata pool: " << md_pool_name << ", data pools: [";
-        for (std::set<int64_t>::iterator dpi = mdsmap.data_pools.begin();
-           dpi != mdsmap.data_pools.end(); ++dpi) {
+        ds << "name: " << mds_map.fs_name << ", metadata pool: "
+           << md_pool_name << ", data pools: [";
+        for (std::set<int64_t>::iterator dpi = mds_map.data_pools.begin();
+           dpi != mds_map.data_pools.end(); ++dpi) {
           const string &pool_name = mon->osdmon()->osdmap.get_pool_name(*dpi);
           ds << pool_name << " ";
         }
         ds << "]" << std::endl;
-      } else {
+      }
+
+      if (fsmap.filesystems.empty()) {
         ds << "No filesystems enabled" << std::endl;
       }
     }
@@ -971,86 +1233,75 @@ bool MDSMonitor::preprocess_command(MonOpRequestRef op)
     return false;
 }
 
-void MDSMonitor::fail_mds_gid(mds_gid_t gid)
+bool MDSMonitor::fail_mds_gid(mds_gid_t gid)
 {
-  assert(pending_mdsmap.mds_info.count(gid));
-  MDSMap::mds_info_t& info = pending_mdsmap.mds_info[gid];
-  dout(10) << "fail_mds_gid " << gid << " mds." << info.name << " rank " << info.rank << dendl;
-
-  utime_t until = ceph_clock_now(g_ceph_context);
-  until += g_conf->mds_blacklist_interval;
-
-  pending_mdsmap.last_failure_osd_epoch = mon->osdmon()->blacklist(info.addr, until);
-
-  if (info.rank >= 0) {
-    if (info.state == MDSMap::STATE_CREATING) {
-      // If this gid didn't make it past CREATING, then forget
-      // the rank ever existed so that next time it's handed out
-      // to a gid it'll go back into CREATING.
-      pending_mdsmap.in.erase(info.rank);
-    } else {
-      // Put this rank into the failed list so that the next available STANDBY will
-      // pick it up.
-      pending_mdsmap.failed.insert(info.rank);
-    }
-    pending_mdsmap.up.erase(info.rank);
+  const MDSMap::mds_info_t info = pending_fsmap.get_info_gid(gid);
+  dout(10) << "fail_mds_gid " << gid << " mds." << info.name << " role " << info.rank << dendl;
+
+  epoch_t blacklist_epoch = 0;
+  if (info.rank >= 0 && info.state != MDSMap::STATE_STANDBY_REPLAY) {
+    utime_t until = ceph_clock_now(g_ceph_context);
+    until += g_conf->mds_blacklist_interval;
+    blacklist_epoch = mon->osdmon()->blacklist(info.addr, until);
   }
 
-  pending_mdsmap.mds_info.erase(gid);
-
+  pending_fsmap.erase(gid, blacklist_epoch);
   last_beacon.erase(gid);
+  if (pending_daemon_health.count(gid)) {
+    pending_daemon_health.erase(gid);
+    pending_daemon_health_rm.insert(gid);
+  }
+
+  return blacklist_epoch != 0;
 }
 
 mds_gid_t MDSMonitor::gid_from_arg(const std::string& arg, std::ostream &ss)
 {
+  // Try parsing as a role
+  mds_role_t role;
+  std::ostringstream ignore_err;  // Don't spam 'ss' with parse_role errors
+  int r = parse_role(arg, &role, ignore_err);
+  if (r == 0) {
+    // See if a GID is assigned to this role
+    auto fs = pending_fsmap.get_filesystem(role.fscid);
+    assert(fs != nullptr);  // parse_role ensures it exists
+    if (fs->mds_map.is_up(role.rank)) {
+      dout(10) << __func__ << ": validated rank/GID " << role
+               << " as a rank" << dendl;
+      return fs->mds_map.get_mds_info(role.rank).global_id;
+    }
+  }
+
+  // Try parsing as a gid
   std::string err;
-  unsigned long long rank_or_gid = strict_strtoll(arg.c_str(), 10, &err);
+  unsigned long long maybe_gid = strict_strtoll(arg.c_str(), 10, &err);
   if (!err.empty()) {
-    // Try to interpret the arg as an MDS name
-    const MDSMap::mds_info_t *mds_info = mdsmap.find_by_name(arg);
+    // Not a role or a GID, try as a daemon name
+    const MDSMap::mds_info_t *mds_info = fsmap.find_by_name(arg);
     if (!mds_info) {
       ss << "MDS named '" << arg
         << "' does not exist, or is not up";
       return MDS_GID_NONE;
     }
-    if (mds_info->rank >= 0) {
-      dout(10) << __func__ << ": resolved MDS name '" << arg << "' to rank " << rank_or_gid << dendl;
-      rank_or_gid = (unsigned long long)(mds_info->rank);
-    } else {
-      dout(10) << __func__ << ": resolved MDS name '" << arg << "' to GID " << rank_or_gid << dendl;
-      rank_or_gid = mds_info->global_id;
-    }
+    dout(10) << __func__ << ": resolved MDS name '" << arg
+             << "' to GID " << mds_info->global_id << dendl;
+    return mds_info->global_id;
   } else {
+    // Not a role, but parses as a an integer, might be a GID
     dout(10) << __func__ << ": treating MDS reference '" << arg
-            << "' as an integer " << rank_or_gid << dendl;
-  }
-
-  if (mon->is_leader()) {
-    if (pending_mdsmap.up.count(mds_rank_t(rank_or_gid))) {
-      dout(10) << __func__ << ": validated rank/GID " << rank_or_gid
-              << " as a rank" << dendl;
-      mds_gid_t gid = pending_mdsmap.up[mds_rank_t(rank_or_gid)];
-      if (pending_mdsmap.mds_info.count(gid)) {
-       return gid;
-      } else {
-       dout(10) << __func__ << ": GID " << rank_or_gid << " was removed." << dendl;
-       return MDS_GID_NONE;
+            << "' as an integer " << maybe_gid << dendl;
+    if (mon->is_leader()) {
+      if (pending_fsmap.gid_exists(mds_gid_t(maybe_gid))) {
+        return mds_gid_t(maybe_gid);
+      }
+    } else {
+      if (fsmap.gid_exists(mds_gid_t(maybe_gid))) {
+        return mds_gid_t(maybe_gid);
       }
-    } else if (pending_mdsmap.mds_info.count(mds_gid_t(rank_or_gid))) {
-      dout(10) << __func__ << ": validated rank/GID " << rank_or_gid
-              << " as a GID" << dendl;
-      return mds_gid_t(rank_or_gid);
-    }
-  } else {
-    // mon is a peon
-    if (mdsmap.have_inst(mds_rank_t(rank_or_gid))) {
-      return mdsmap.get_info(mds_rank_t(rank_or_gid)).global_id;
-    } else if (mdsmap.get_state_gid(mds_gid_t(rank_or_gid))) {
-      return mds_gid_t(rank_or_gid);
     }
   }
 
-  dout(1) << __func__ << ": rank/GID " << rank_or_gid
+  dout(1) << __func__ << ": rank/GID " << arg
          << " not a existent rank or GID" << dendl;
   return MDS_GID_NONE;
 }
@@ -1096,6 +1347,20 @@ bool MDSMonitor::prepare_command(MonOpRequestRef op)
     return true;
   }
 
+  for (auto h : handlers) {
+    if (h->can_handle(prefix)) {
+      r = h->handle(mon, pending_fsmap, op, cmdmap, ss);
+      if (r == -EAGAIN) {
+        // message has been enqueued for retry; return.
+        dout(4) << __func__ << " enqueue for retry by management_command" << dendl;
+        return false;
+      } else {
+        // Successful or not, we're done: respond.
+        goto out;
+      }
+    }
+  }
+
   /* Execute filesystem add/remove, or pass through to filesystem_command */
   r = management_command(op, prefix, cmdmap, ss);
   if (r >= 0)
@@ -1103,6 +1368,7 @@ bool MDSMonitor::prepare_command(MonOpRequestRef op)
   
   if (r == -EAGAIN) {
     // message has been enqueued for retry; return.
+    dout(4) << __func__ << " enqueue for retry by management_command" << dendl;
     return false;
   } else if (r != -ENOSYS) {
     // MDSMonitor::management_command() returns -ENOSYS if it knows nothing
@@ -1113,18 +1379,36 @@ bool MDSMonitor::prepare_command(MonOpRequestRef op)
     goto out;
   }
 
-  if (!pending_mdsmap.get_enabled()) {
-    ss << "No filesystem configured: use `ceph fs new` to create a filesystem";
-    r = -ENOENT;
-  } else {
-    r = filesystem_command(op, prefix, cmdmap, ss);
-    if (r < 0 && r == -EAGAIN) {
-      // Do not reply, the message has been enqueued for retry
-      return false;
+  r = filesystem_command(op, prefix, cmdmap, ss);
+  if (r >= 0) {
+    goto out;
+  } else if (r == -EAGAIN) {
+    // Do not reply, the message has been enqueued for retry
+    dout(4) << __func__ << " enqueue for retry by filesystem_command" << dendl;
+    return false;
+  } else if (r != -ENOSYS) {
+    goto out;
+  }
+
+  // Only handle legacy commands if there is a filesystem configured
+  if (pending_fsmap.legacy_client_fscid == FS_CLUSTER_ID_NONE) {
+    if (pending_fsmap.filesystems.size() == 0) {
+      ss << "No filesystem configured: use `ceph fs new` to create a filesystem";
+    } else {
+      ss << "No filesystem set for use with legacy commands";
     }
+    r = -EINVAL;
+    goto out;
+  }
+
+  r = legacy_filesystem_command(op, prefix, cmdmap, ss);
+
+  if (r == -ENOSYS && ss.str().empty()) {
+    ss << "unrecognized command";
   }
 
 out:
+  dout(4) << __func__ << " done, r=" << r << dendl;
   /* Compose response */
   string rs;
   getline(ss, rs);
@@ -1195,6 +1479,42 @@ int MDSMonitor::_check_pool(
   return 0;
 }
 
+class FlagSetHandler : public FileSystemCommandHandler
+{
+  public:
+  FlagSetHandler()
+    : FileSystemCommandHandler("fs flag set")
+  {
+  }
+
+  int handle(
+      Monitor *mon,
+      FSMap &fsmap,
+      MonOpRequestRef op,
+      map<string, cmd_vartype> &cmdmap,
+      std::stringstream &ss)
+  {
+    string flag_name;
+    cmd_getval(g_ceph_context, cmdmap, "flag_name", flag_name);
+
+    string flag_val;
+    cmd_getval(g_ceph_context, cmdmap, "val", flag_val);
+
+    if (flag_name == "enable_multiple") {
+      bool flag_bool = false;
+      int r = parse_bool(flag_val, &flag_bool, ss);
+      if (r != 0) {
+        ss << "Invalid boolean value '" << flag_val << "'";
+        return r;
+      }
+      fsmap.set_enable_multiple(flag_bool);
+      return 0;
+    } else {
+      ss << "Unknown flag '" << flag_name << "'";
+      return -EINVAL;
+    }
+  }
+};
 
 /**
  * Handle a command for creating or removing a filesystem.
@@ -1211,56 +1531,13 @@ int MDSMonitor::management_command(
     std::stringstream &ss)
 {
   op->mark_mdsmon_event(__func__);
-  if (prefix == "mds newfs") {
-    /* Legacy `newfs` command, takes pool numbers instead of
-     * names, assumes fs name to be MDS_FS_NAME_DEFAULT, and
-     * can overwrite existing filesystem settings */
-    MDSMap newmap;
-    int64_t metadata, data;
-
-    if (!cmd_getval(g_ceph_context, cmdmap, "metadata", metadata)) {
-      ss << "error parsing 'metadata' value '"
-         << cmd_vartype_stringify(cmdmap["metadata"]) << "'";
-      return -EINVAL;
-    }
-    if (!cmd_getval(g_ceph_context, cmdmap, "data", data)) {
-      ss << "error parsing 'data' value '"
-         << cmd_vartype_stringify(cmdmap["data"]) << "'";
-      return -EINVAL;
-    }
-    int r = _check_pool(data, &ss);
-    if (r < 0) {
-      return r;
-    }
-
-    r = _check_pool(metadata, &ss);
-    if (r < 0) {
-      return r;
-    }
-
-    // be idempotent.. success if it already exists and matches
-    if (mdsmap.get_enabled() &&
-       mdsmap.get_metadata_pool() == metadata &&
-       mdsmap.get_first_data_pool() == data &&
-       mdsmap.fs_name == MDS_FS_NAME_DEFAULT) {
-      ss << "filesystem '" << MDS_FS_NAME_DEFAULT << "' already exists";
-      return 0;
-    }
 
-    string sure;
-    cmd_getval(g_ceph_context, cmdmap, "sure", sure);
-    if (pending_mdsmap.get_enabled() && sure != "--yes-i-really-mean-it") {
-      ss << "this is DANGEROUS and will wipe out the mdsmap's fs, and may clobber data in the new pools you specify.  add --yes-i-really-mean-it if you do.";
-      return -EPERM;
-    } else {
-      newmap.inc = pending_mdsmap.inc;
-      pending_mdsmap = newmap;
-      pending_mdsmap.epoch = mdsmap.epoch + 1;
-      create_new_fs(pending_mdsmap, MDS_FS_NAME_DEFAULT, metadata, data);
-      ss << "new fs with metadata pool " << metadata << " and data pool " << data;
-      return 0;
-    }
+  if (prefix == "mds newfs") {
+    // newfs is the legacy command that in single-filesystem times
+    // used to be equivalent to doing an "fs rm ; fs new".  We
+    // can't do this in a sane way in multi-filesystem world.
+    ss << "'newfs' no longer available.  Please use 'fs new'.";
+    return -EINVAL;
   } else if (prefix == "fs new") {
     string metadata_name;
     cmd_getval(g_ceph_context, cmdmap, "metadata", metadata_name);
@@ -1291,19 +1568,25 @@ int MDSMonitor::management_command(
         return -EINVAL;
     }
 
-    if (pending_mdsmap.get_enabled()
-        && pending_mdsmap.fs_name == fs_name
-        && *(pending_mdsmap.data_pools.begin()) == data
-        && pending_mdsmap.metadata_pool == metadata) {
-      // Identical FS created already, this is a no-op
-      ss << "filesystem '" << fs_name << "' already exists";
-      return 0;
+    if (pending_fsmap.any_filesystems()
+        && !pending_fsmap.get_enable_multiple()) {
+      ss << "Creation of multiple filesystems is disabled.  To enable "
+            "this experimental feature, use 'ceph fs flag set enable_multiple "
+            "true'";
+      return -EINVAL;
     }
 
-    if (pending_mdsmap.get_enabled()) {
-      /* We currently only support one filesystem, so cannot create a second */
-      ss << "A filesystem already exists, use `ceph fs rm` if you wish to delete it";
-      return -EINVAL;
+    if (pending_fsmap.get_filesystem(fs_name)) {
+      auto fs = pending_fsmap.get_filesystem(fs_name);
+      if (*(fs->mds_map.data_pools.begin()) == data
+          && fs->mds_map.metadata_pool == metadata) {
+        // Identical FS created already, this is a no-op
+        ss << "filesystem '" << fs_name << "' already exists";
+        return 0;
+      } else {
+        ss << "filesystem already exists with name '" << fs_name << "'";
+        return -EINVAL;
+      }
     }
 
     pg_pool_t const *data_pool = mon->osdmon()->osdmap.get_pg_pool(data);
@@ -1343,12 +1626,7 @@ int MDSMonitor::management_command(
     }
 
     // All checks passed, go ahead and create.
-    MDSMap newmap;
-    newmap.inc = pending_mdsmap.inc;
-    pending_mdsmap = newmap;
-    pending_mdsmap.epoch = mdsmap.epoch + 1;
-    pending_mdsmap.last_failure_osd_epoch = mdsmap.last_failure_osd_epoch;
-    create_new_fs(pending_mdsmap, fs_name, metadata, data);
+    create_new_fs(pending_fsmap, fs_name, metadata, data);
     ss << "new fs with metadata pool " << metadata << " and data pool " << data;
     return 0;
   } else if (prefix == "fs rm") {
@@ -1357,14 +1635,15 @@ int MDSMonitor::management_command(
     //  syntax should apply to multi-FS future)
     string fs_name;
     cmd_getval(g_ceph_context, cmdmap, "fs_name", fs_name);
-    if (!pending_mdsmap.get_enabled() || fs_name != pending_mdsmap.fs_name) {
+    auto fs = pending_fsmap.get_filesystem(fs_name);
+    if (fs == nullptr) {
         // Consider absence success to make deletes idempotent
         ss << "filesystem '" << fs_name << "' does not exist";
         return 0;
     }
 
     // Check that no MDS daemons are active
-    if (!pending_mdsmap.up.empty()) {
+    if (!fs->mds_map.up.empty()) {
       ss << "all MDS daemons must be inactive before removing filesystem";
       return -EINVAL;
     }
@@ -1378,27 +1657,31 @@ int MDSMonitor::management_command(
       return -EPERM;
     }
 
-    MDSMap newmap;
-    newmap.inc = pending_mdsmap.inc;
-    pending_mdsmap = newmap;
-    pending_mdsmap.epoch = mdsmap.epoch + 1;
-    assert(pending_mdsmap.get_enabled() == false);
-    pending_mdsmap.metadata_pool = -1;
-    pending_mdsmap.cas_pool = -1;
-    pending_mdsmap.created = ceph_clock_now(g_ceph_context);
+    if (pending_fsmap.legacy_client_fscid == fs->fscid) {
+      pending_fsmap.legacy_client_fscid = FS_CLUSTER_ID_NONE;
+    }
+
+    // There may be standby_replay daemons left here
+    for (const auto &i : fs->mds_map.mds_info) {
+      assert(i.second.state == MDSMap::STATE_STANDBY_REPLAY);
+      fail_mds_gid(i.first);
+    }
+
+    pending_fsmap.filesystems.erase(fs->fscid);
 
     return 0;
   } else if (prefix == "fs reset") {
     string fs_name;
     cmd_getval(g_ceph_context, cmdmap, "fs_name", fs_name);
-    if (!pending_mdsmap.get_enabled() || fs_name != pending_mdsmap.fs_name) {
+    auto fs = pending_fsmap.get_filesystem(fs_name);
+    if (fs == nullptr) {
         ss << "filesystem '" << fs_name << "' does not exist";
         // Unlike fs rm, we consider this case an error
         return -ENOENT;
     }
 
     // Check that no MDS daemons are active
-    if (!pending_mdsmap.up.empty()) {
+    if (!fs->mds_map.up.empty()) {
       ss << "all MDS daemons must be inactive before resetting filesystem: set the cluster_down flag"
             " and use `ceph mds fail` to make this so";
       return -EINVAL;
@@ -1413,32 +1696,35 @@ int MDSMonitor::management_command(
       return -EPERM;
     }
 
-    MDSMap newmap;
+    FSMap newmap;
+
+    auto new_fs = std::make_shared<Filesystem>();
 
     // Populate rank 0 as existing (so don't go into CREATING)
     // but failed (so that next available MDS is assigned the rank)
-    newmap.in.insert(mds_rank_t(0));
-    newmap.failed.insert(mds_rank_t(0));
+    new_fs->mds_map.in.insert(mds_rank_t(0));
+    new_fs->mds_map.failed.insert(mds_rank_t(0));
 
     // Carry forward what makes sense
-    newmap.data_pools = mdsmap.data_pools;
-    newmap.metadata_pool = mdsmap.metadata_pool;
-    newmap.cas_pool = mdsmap.cas_pool;
-    newmap.fs_name = mdsmap.fs_name;
-    newmap.created = ceph_clock_now(g_ceph_context);
-    newmap.epoch = mdsmap.epoch + 1;
-    newmap.inc = mdsmap.inc;
-    newmap.enabled = mdsmap.enabled;
-    newmap.inline_data_enabled = mdsmap.inline_data_enabled;
-    newmap.compat = get_mdsmap_compat_set_default();
-    newmap.session_timeout = g_conf->mds_session_timeout;
-    newmap.session_autoclose = g_conf->mds_session_autoclose;
-    newmap.max_file_size = g_conf->mds_max_file_size;
-
-    // Persist the new MDSMap
-    pending_mdsmap = newmap;
+    new_fs->fscid = fs->fscid;
+    new_fs->mds_map.inc = fs->mds_map.inc;
+    new_fs->mds_map.inline_data_enabled = fs->mds_map.inline_data_enabled;
+    new_fs->mds_map.max_mds = g_conf->max_mds;
+    new_fs->mds_map.data_pools = fs->mds_map.data_pools;
+    new_fs->mds_map.metadata_pool = fs->mds_map.metadata_pool;
+    new_fs->mds_map.cas_pool = fs->mds_map.cas_pool;
+    new_fs->mds_map.fs_name = fs->mds_map.fs_name;
+    new_fs->mds_map.max_file_size = g_conf->mds_max_file_size;
+    new_fs->mds_map.compat = fsmap.compat;
+    new_fs->mds_map.created = ceph_clock_now(g_ceph_context);
+    new_fs->mds_map.modified = ceph_clock_now(g_ceph_context);
+    new_fs->mds_map.session_timeout = g_conf->mds_session_timeout;
+    new_fs->mds_map.session_autoclose = g_conf->mds_session_autoclose;
+    new_fs->mds_map.enabled = true;
+
+    // Persist the new FSMap
+    pending_fsmap.filesystems[new_fs->fscid] = new_fs;
     return 0;
-
   } else {
     return -ENOSYS;
   }
@@ -1446,70 +1732,56 @@ int MDSMonitor::management_command(
 
 
 /**
- * Handle a command that affects the filesystem (i.e. a filesystem
- * must exist for the command to act upon).
+ * Given one of the following forms:
+ *   <fs name>:<rank>
+ *   <fs id>:<rank>
+ *   <rank>
  *
- * @retval 0        Command was successfully handled and has side effects
- * @retval -EAGAIN  Messages has been requeued for retry
- * @retval -ENOSYS  Unknown command
- * @retval < 0      An error has occurred; **ss** may have been set.
+ * Parse into a mds_role_t.  The rank-only form is only valid
+ * if legacy_client_ns is set.
  */
-int MDSMonitor::filesystem_command(
-    MonOpRequestRef op,
-    std::string const &prefix,
-    map<string, cmd_vartype> &cmdmap,
-    std::stringstream &ss)
+int MDSMonitor::parse_role(
+    const std::string &role_str,
+    mds_role_t *role,
+    std::ostream &ss)
 {
-  op->mark_mdsmon_event(__func__);
-  MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
-  int r = 0;
-  string whostr;
-  cmd_getval(g_ceph_context, cmdmap, "who", whostr);
-  if (prefix == "mds stop" ||
-      prefix == "mds deactivate") {
+  const FSMap *relevant_fsmap = &fsmap;
+  if (mon->is_leader()) {
+    relevant_fsmap = &pending_fsmap;
+  }
+  return relevant_fsmap->parse_role(role_str, role, ss);
+}
 
-    int who_i = parse_pos_long(whostr.c_str(), &ss);
-    if (who_i < 0) {
+
+class SetHandler : public FileSystemCommandHandler
+{
+public:
+  SetHandler()
+    : FileSystemCommandHandler("fs set")
+  {}
+
+  virtual int handle(
+      Monitor *mon,
+      FSMap &fsmap,
+      MonOpRequestRef op,
+      map<string, cmd_vartype> &cmdmap,
+      std::stringstream &ss)
+  {
+    std::string fs_name;
+    if (!cmd_getval(g_ceph_context, cmdmap, "fs_name", fs_name) || fs_name.empty()) {
+      ss << "Missing filesystem name";
       return -EINVAL;
     }
-    mds_rank_t who = mds_rank_t(who_i);
-    if (!pending_mdsmap.is_active(who)) {
-      r = -EEXIST;
-      ss << "mds." << who << " not active (" 
-        << ceph_mds_state_name(pending_mdsmap.get_state(who)) << ")";
-    } else if (pending_mdsmap.get_root() == who ||
-               pending_mdsmap.get_tableserver() == who) {
-      r = -EINVAL;
-      ss << "can't tell the root (" << pending_mdsmap.get_root()
-        << ") or tableserver (" << pending_mdsmap.get_tableserver()
-        << ") to deactivate";
-    } else if (pending_mdsmap.get_num_in_mds() <= size_t(pending_mdsmap.get_max_mds())) {
-      r = -EBUSY;
-      ss << "must decrease max_mds or else MDS will immediately reactivate";
-    } else {
-      r = 0;
-      mds_gid_t gid = pending_mdsmap.up[who];
-      ss << "telling mds." << who << " " << pending_mdsmap.mds_info[gid].addr << " to deactivate";
-      pending_mdsmap.mds_info[gid].state = MDSMap::STATE_STOPPING;
+
+    auto fs = fsmap.get_filesystem(fs_name);
+    if (fs == nullptr) {
+      ss << "Not found: '" << fs_name << "'";
+      return -ENOENT;
     }
 
-  } else if (prefix == "mds set_max_mds") {
-    // NOTE: see also "mds set max_mds", which can modify the same field.
-    int64_t maxmds;
-    if (!cmd_getval(g_ceph_context, cmdmap, "maxmds", maxmds) || maxmds < 0) {
-      return -EINVAL;
-    }
-    if (maxmds > MAX_MDS) {
-      ss << "may not have more than " << MAX_MDS << " MDS ranks";
-      return -EINVAL;
-    }
-    pending_mdsmap.max_mds = maxmds;
-    r = 0;
-    ss << "max_mds = " << pending_mdsmap.max_mds;
-  } else if (prefix == "mds set") {
-    string var;
-    if (!cmd_getval(g_ceph_context, cmdmap, "var", var) || var.empty()) {
-      ss << "Invalid variable";
+    string var;
+    if (!cmd_getval(g_ceph_context, cmdmap, "var", var) || var.empty()) {
+      ss << "Invalid variable";
       return -EINVAL;
     }
     string val;
@@ -1529,9 +1801,20 @@ int MDSMonitor::filesystem_command(
         ss << "may not have more than " << MAX_MDS << " MDS ranks";
         return -EINVAL;
       }
-      pending_mdsmap.max_mds = n;
+      fsmap.modify_filesystem(
+          fs->fscid,
+          [n](std::shared_ptr<Filesystem> fs)
+      {
+        fs->mds_map.set_max_mds(n);
+      });
     } else if (var == "inline_data") {
-      if (val == "true" || val == "yes" || (!interr.length() && n == 1)) {
+      bool enable_inline = false;
+      int r = parse_bool(val, &enable_inline, ss);
+      if (r != 0) {
+        return r;
+      }
+
+      if (enable_inline) {
        string confirm;
        if (!cmd_getval(g_ceph_context, cmdmap, "confirm", confirm) ||
            confirm != "--yes-i-really-mean-it") {
@@ -1539,15 +1822,26 @@ int MDSMonitor::filesystem_command(
          return -EPERM;
        }
        ss << "inline data enabled";
-       pending_mdsmap.set_inline_data_enabled(true);
-       pending_mdsmap.compat.incompat.insert(MDS_FEATURE_INCOMPAT_INLINE);
-      } else if (val == "false" || val == "no" ||
-                (!interr.length() && n == 0)) {
-       ss << "inline data disabled";
-       pending_mdsmap.set_inline_data_enabled(false);
+
+        fsmap.modify_filesystem(
+            fs->fscid,
+            [](std::shared_ptr<Filesystem> fs)
+        {
+          fs->mds_map.set_inline_data_enabled(true);
+        });
+
+        // Update `compat`
+        CompatSet c = fsmap.get_compat();
+        c.incompat.insert(MDS_FEATURE_INCOMPAT_INLINE);
+        fsmap.update_compat(c);
       } else {
-       ss << "value must be false|no|0 or true|yes|1";
-       return -EINVAL;
+       ss << "inline data disabled";
+        fsmap.modify_filesystem(
+            fs->fscid,
+            [](std::shared_ptr<Filesystem> fs)
+        {
+          fs->mds_map.set_inline_data_enabled(false);
+        });
       }
     } else if (var == "max_file_size") {
       if (interr.length()) {
@@ -1558,39 +1852,299 @@ int MDSMonitor::filesystem_command(
        ss << var << " must at least " << CEPH_MIN_STRIPE_UNIT;
        return -ERANGE;
       }
-      pending_mdsmap.max_file_size = n;
+      fsmap.modify_filesystem(
+          fs->fscid,
+          [n](std::shared_ptr<Filesystem> fs)
+      {
+        fs->mds_map.set_max_filesize(n);
+      });
     } else if (var == "allow_new_snaps") {
-      if (val == "false" || val == "no" || (interr.length() == 0 && n == 0)) {
-       pending_mdsmap.clear_snaps_allowed();
+      bool enable_snaps = false;
+      int r = parse_bool(val, &enable_snaps, ss);
+      if (r != 0) {
+        return r;
+      }
+
+      if (!enable_snaps) {
+        fsmap.modify_filesystem(
+            fs->fscid,
+            [](std::shared_ptr<Filesystem> fs)
+        {
+          fs->mds_map.clear_snaps_allowed();
+        });
        ss << "disabled new snapshots";
-      } else if (val == "true" || val == "yes" || (interr.length() == 0 && n == 1)) {
+      } else {
        string confirm;
        if (!cmd_getval(g_ceph_context, cmdmap, "confirm", confirm) ||
            confirm != "--yes-i-really-mean-it") {
          ss << "Snapshots are unstable and will probably break your FS! Set to --yes-i-really-mean-it if you are sure you want to enable them";
          return -EPERM;
        }
-       pending_mdsmap.set_snaps_allowed();
+        fsmap.modify_filesystem(
+            fs->fscid,
+            [](std::shared_ptr<Filesystem> fs)
+        {
+          fs->mds_map.set_snaps_allowed();
+        });
        ss << "enabled new snapshots";
-      } else {
-       ss << "value must be true|yes|1 or false|no|0";
-       return -EINVAL;
       }
+    } else if (var == "cluster_down") {
+      bool is_down = false;
+      int r = parse_bool(val, &is_down, ss);
+      if (r != 0) {
+        return r;
+      }
+
+      fsmap.modify_filesystem(
+          fs->fscid,
+          [is_down](std::shared_ptr<Filesystem> fs)
+      {
+        if (is_down) {
+          fs->mds_map.set_flag(CEPH_MDSMAP_DOWN);
+        } else {
+          fs->mds_map.clear_flag(CEPH_MDSMAP_DOWN);
+        }
+      });
+
+      ss << "marked " << (is_down ? "down" : "up");
     } else {
       ss << "unknown variable " << var;
       return -EINVAL;
     }
-    r = 0;
+
+    return 0;
+  }
+};
+
+class AddDataPoolHandler : public FileSystemCommandHandler
+{
+  public:
+  AddDataPoolHandler()
+    : FileSystemCommandHandler("fs add_data_pool")
+  {}
+
+  int handle(
+      Monitor *mon,
+      FSMap &fsmap,
+      MonOpRequestRef op,
+      map<string, cmd_vartype> &cmdmap,
+      std::stringstream &ss)
+  {
+    string poolname;
+    cmd_getval(g_ceph_context, cmdmap, "pool", poolname);
+
+    std::string fs_name;
+    if (!cmd_getval(g_ceph_context, cmdmap, "fs_name", fs_name)
+        || fs_name.empty()) {
+      ss << "Missing filesystem name";
+      return -EINVAL;
+    }
+
+    auto fs = fsmap.get_filesystem(fs_name);
+    if (fs == nullptr) {
+      ss << "Not found: '" << fs_name << "'";
+      return -ENOENT;
+    }
+
+    int64_t poolid = mon->osdmon()->osdmap.lookup_pg_pool_name(poolname);
+    if (poolid < 0) {
+      string err;
+      poolid = strict_strtol(poolname.c_str(), 10, &err);
+      if (err.length()) {
+       ss << "pool '" << poolname << "' does not exist";
+       return -ENOENT;
+      }
+    }
+
+    int r = _check_pool(mon->osdmon()->osdmap, poolid, &ss);
+    if (r != 0) {
+      return r;
+    }
+
+    fsmap.modify_filesystem(
+        fs->fscid,
+        [poolid](std::shared_ptr<Filesystem> fs)
+    {
+      fs->mds_map.add_data_pool(poolid);
+    });
+
+    ss << "added data pool " << poolid << " to fsmap";
+
+    return 0;
+  }
+};
+
+
+class RemoveDataPoolHandler : public FileSystemCommandHandler
+{
+  public:
+  RemoveDataPoolHandler()
+    : FileSystemCommandHandler("fs remove_data_pool")
+  {}
+
+  int handle(
+      Monitor *mon,
+      FSMap &fsmap,
+      MonOpRequestRef op,
+      map<string, cmd_vartype> &cmdmap,
+      std::stringstream &ss)
+  {
+    string poolname;
+    cmd_getval(g_ceph_context, cmdmap, "pool", poolname);
+
+    std::string fs_name;
+    if (!cmd_getval(g_ceph_context, cmdmap, "fs_name", fs_name)
+        || fs_name.empty()) {
+      ss << "Missing filesystem name";
+      return -EINVAL;
+    }
+
+    auto fs = fsmap.get_filesystem(fs_name);
+    if (fs == nullptr) {
+      ss << "Not found: '" << fs_name << "'";
+      return -ENOENT;
+    }
+
+    int64_t poolid = mon->osdmon()->osdmap.lookup_pg_pool_name(poolname);
+    if (poolid < 0) {
+      string err;
+      poolid = strict_strtol(poolname.c_str(), 10, &err);
+      if (err.length()) {
+       poolid = -1;
+       ss << "pool '" << poolname << "' does not exist";
+        return -ENOENT;
+      } else if (poolid < 0) {
+        ss << "invalid pool id '" << poolid << "'";
+        return -EINVAL;
+      }
+    }
+
+    assert(poolid >= 0);  // Checked by parsing code above
+
+    if (fs->mds_map.get_first_data_pool() == poolid) {
+      poolid = -1;
+      ss << "cannot remove default data pool";
+      return -EINVAL;
+    }
+
+
+    int r = 0;
+    fsmap.modify_filesystem(fs->fscid,
+        [&r, poolid](std::shared_ptr<Filesystem> fs)
+    {
+      r = fs->mds_map.remove_data_pool(poolid);
+    });
+    if (r == -ENOENT) {
+      // It was already removed, succeed in silence
+      return 0;
+    } else if (r == 0) {
+      // We removed it, succeed
+      ss << "removed data pool " << poolid << " from fsmap";
+      return 0;
+    } else {
+      // Unexpected error, bubble up
+      return r;
+    }
+  }
+};
+
+
+/**
+ * For commands that refer to a particular filesystem,
+ * enable wrapping to implement the legacy version of
+ * the command (like "mds add_data_pool" vs "fs add_data_pool")
+ *
+ * The wrapped handler must expect a fs_name argument in
+ * its command map.
+ */
+template<typename T>
+class LegacyHandler : public T
+{
+  std::string legacy_prefix;
+
+  public:
+  LegacyHandler(const std::string &new_prefix)
+    : T()
+  {
+    legacy_prefix = new_prefix;
+  }
+
+  virtual std::string const &get_prefix() {return legacy_prefix;}
+
+  int handle(
+      Monitor *mon,
+      FSMap &fsmap,
+      MonOpRequestRef op,
+      map<string, cmd_vartype> &cmdmap,
+      std::stringstream &ss)
+  {
+    auto fs = fsmap.get_legacy_filesystem();
+    if (fs == nullptr) {
+      ss << "No filesystem configured";
+      return -ENOENT;
+    }
+    std::map<string, cmd_vartype> modified = cmdmap;
+    modified["fs_name"] = fs->mds_map.get_fs_name();
+    return T::handle(mon, fsmap, op, modified, ss);
+  }
+};
+
+int MDSMonitor::filesystem_command(
+    MonOpRequestRef op,
+    std::string const &prefix,
+    map<string, cmd_vartype> &cmdmap,
+    std::stringstream &ss)
+{
+  dout(4) << __func__ << " prefix='" << prefix << "'" << dendl;
+  op->mark_mdsmon_event(__func__);
+  MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
+  int r = 0;
+  string whostr;
+  cmd_getval(g_ceph_context, cmdmap, "who", whostr);
+
+  if (prefix == "mds stop" ||
+      prefix == "mds deactivate") {
+
+    mds_role_t role;
+    r = parse_role(whostr, &role, ss);
+    if (r < 0 ) {
+      return r;
+    }
+    auto fs = pending_fsmap.get_filesystem(role.fscid);
+
+    if (!fs->mds_map.is_active(role.rank)) {
+      r = -EEXIST;
+      ss << "mds." << role << " not active (" 
+        << ceph_mds_state_name(fs->mds_map.get_state(role.rank)) << ")";
+    } else if (fs->mds_map.get_root() == role.rank ||
+               fs->mds_map.get_tableserver() == role.rank) {
+      r = -EINVAL;
+      ss << "can't tell the root (" << fs->mds_map.get_root()
+        << ") or tableserver (" << fs->mds_map.get_tableserver()
+        << ") to deactivate";
+    } else if (fs->mds_map.get_num_in_mds() <= size_t(fs->mds_map.get_max_mds())) {
+      r = -EBUSY;
+      ss << "must decrease max_mds or else MDS will immediately reactivate";
+    } else {
+      r = 0;
+      mds_gid_t gid = fs->mds_map.up.at(role.rank);
+      ss << "telling mds." << role << " "
+         << pending_fsmap.get_info_gid(gid).addr << " to deactivate";
+
+      pending_fsmap.modify_daemon(gid, [](MDSMap::mds_info_t *info) {
+        info->state = MDSMap::STATE_STOPPING;
+      });
+    }
   } else if (prefix == "mds setmap") {
     string confirm;
     if (!cmd_getval(g_ceph_context, cmdmap, "confirm", confirm) ||
-       confirm != "--yes-i-really-mean-it") {
+       confirm != "--yes-i-really-mean-it") {
       ss << "WARNING: this can make your filesystem inaccessible! "
-           "Add --yes-i-really-mean-it if you are sure you wish to continue.";
+           "Add --yes-i-really-mean-it if you are sure you wish to continue.";
       return -EINVAL;;
     }
-
-    MDSMap map;
+    
+    FSMap map;
     try {
       map.decode(m->get_data());
     } catch(buffer::error &e) {
@@ -1602,15 +2156,14 @@ int MDSMonitor::filesystem_command(
     if (cmd_getval(g_ceph_context, cmdmap, "epoch", epochnum))
       e = epochnum;
 
-    if (pending_mdsmap.epoch == e) {
-      map.epoch = pending_mdsmap.epoch;  // make sure epoch is correct
-      pending_mdsmap = map;
+    if (pending_fsmap.epoch == e) {
+      map.epoch = pending_fsmap.epoch;  // make sure epoch is correct
+      pending_fsmap = map;
       ss << "set mds map";
     } else {
-      ss << "next mdsmap epoch " << pending_mdsmap.epoch << " != " << e;
+      ss << "next fsmap epoch " << pending_fsmap.epoch << " != " << e;
       return -EINVAL;
     }
-
   } else if (prefix == "mds set_state") {
     mds_gid_t gid;
     if (!cmd_getval(g_ceph_context, cmdmap, "gid", gid)) {
@@ -1624,14 +2177,15 @@ int MDSMonitor::filesystem_command(
          << cmd_vartype_stringify(cmdmap["state"]) << "'";
       return -EINVAL;
     }
-    if (!pending_mdsmap.is_dne_gid(gid)) {
-      MDSMap::mds_info_t& info = pending_mdsmap.get_info_gid(gid);
-      info.state = state;
+    if (pending_fsmap.gid_exists(gid)) {
+      pending_fsmap.modify_daemon(gid, [state](MDSMap::mds_info_t *info) {
+        info->state = state;
+      });
       stringstream ss;
-      ss << "set mds gid " << gid << " to state " << state << " " << ceph_mds_state_name(state);
+      ss << "set mds gid " << gid << " to state " << state << " "
+         << ceph_mds_state_name(state);
       return 0;
     }
-
   } else if (prefix == "mds fail") {
     string who;
     cmd_getval(g_ceph_context, cmdmap, "who", who);
@@ -1640,18 +2194,6 @@ int MDSMonitor::filesystem_command(
       mon->osdmon()->wait_for_writeable(op, new C_RetryMessage(this, op));
       return -EAGAIN; // don't propose yet; wait for message to be retried
     }
-
-  } else if (prefix == "mds repaired") {
-    mds_rank_t rank;
-    cmd_getval(g_ceph_context, cmdmap, "rank", rank);
-    if (pending_mdsmap.damaged.count(rank)) {
-      dout(4) << "repaired: restoring rank " << rank << dendl;
-      pending_mdsmap.damaged.erase(rank);
-      pending_mdsmap.failed.insert(rank);
-    } else {
-      dout(4) << "repaired: no-op on rank " << rank << dendl;
-    }
-    r = 0;
   } else if (prefix == "mds rm") {
     mds_gid_t gid;
     if (!cmd_getval(g_ceph_context, cmdmap, "gid", gid)) {
@@ -1659,54 +2201,47 @@ int MDSMonitor::filesystem_command(
          << cmd_vartype_stringify(cmdmap["gid"]) << "'";
       return -EINVAL;
     }
-    int state = pending_mdsmap.get_state_gid(gid);
-    if (state == 0) {
+    if (!pending_fsmap.gid_exists(gid)) {
       ss << "mds gid " << gid << " dne";
       r = 0;
-    } else if (state > 0) {
-      ss << "cannot remove active mds." << pending_mdsmap.get_info_gid(gid).name
-        << " rank " << pending_mdsmap.get_info_gid(gid).rank;
-      return -EBUSY;
     } else {
-      pending_mdsmap.mds_info.erase(gid);
-      stringstream ss;
-      ss << "removed mds gid " << gid;
-      return 0;
+      MDSMap::DaemonState state = pending_fsmap.get_info_gid(gid).state;
+      if (state > 0) {
+        ss << "cannot remove active mds." << pending_fsmap.get_info_gid(gid).name
+           << " rank " << pending_fsmap.get_info_gid(gid).rank;
+        return -EBUSY;
+      } else {
+        pending_fsmap.erase(gid, {});
+        stringstream ss;
+        ss << "removed mds gid " << gid;
+        return 0;
+      }
     }
   } else if (prefix == "mds rmfailed") {
-    mds_rank_t who;
     string confirm;
     if (!cmd_getval(g_ceph_context, cmdmap, "confirm", confirm) ||
-       confirm != "--yes-i-really-mean-it") {
-         ss << "WARNING: this can make your filesystem inaccessible! "
-               "Add --yes-i-really-mean-it if you are sure you wish to continue.";
-         return -EPERM;
+       confirm != "--yes-i-really-mean-it") {
+         ss << "WARNING: this can make your filesystem inaccessible! "
+               "Add --yes-i-really-mean-it if you are sure you wish to continue.";
+         return -EPERM;
     }
-    if (!cmd_getval(g_ceph_context, cmdmap, "who", who)) {
-      ss << "error parsing 'who' value '"
-         << cmd_vartype_stringify(cmdmap["who"]) << "'";
+    
+    std::string role_str;
+    cmd_getval(g_ceph_context, cmdmap, "who", role_str);
+    mds_role_t role;
+    int r = parse_role(role_str, &role, ss);
+    if (r < 0) {
+      ss << "invalid role '" << role_str << "'";
       return -EINVAL;
     }
-    pending_mdsmap.failed.erase(who);
+
+    std::shared_ptr<Filesystem> fs = pending_fsmap.get_filesystem(role.fscid);
+    assert(fs != nullptr);
+
+    fs->mds_map.failed.erase(role.rank);
     stringstream ss;
-    ss << "removed failed mds." << who;
+    ss << "removed failed mds." << role;
     return 0;
-  } else if (prefix == "mds cluster_down") {
-    if (pending_mdsmap.test_flag(CEPH_MDSMAP_DOWN)) {
-      ss << "mdsmap already marked DOWN";
-    } else {
-      pending_mdsmap.set_flag(CEPH_MDSMAP_DOWN);
-      ss << "marked mdsmap DOWN";
-    }
-    r = 0;
-  } else if (prefix == "mds cluster_up") {
-    if (pending_mdsmap.test_flag(CEPH_MDSMAP_DOWN)) {
-      pending_mdsmap.clear_flag(CEPH_MDSMAP_DOWN);
-      ss << "unmarked mdsmap DOWN";
-    } else {
-      ss << "mdsmap not marked DOWN";
-    }
-    r = 0;
   } else if (prefix == "mds compat rm_compat") {
     int64_t f;
     if (!cmd_getval(g_ceph_context, cmdmap, "feature", f)) {
@@ -1714,11 +2249,13 @@ int MDSMonitor::filesystem_command(
          << cmd_vartype_stringify(cmdmap["feature"]) << "'";
       return -EINVAL;
     }
-    if (pending_mdsmap.compat.compat.contains(f)) {
+    if (pending_fsmap.compat.compat.contains(f)) {
       ss << "removing compat feature " << f;
-      pending_mdsmap.compat.compat.remove(f);
+      CompatSet modified = pending_fsmap.compat;
+      modified.compat.remove(f);
+      pending_fsmap.update_compat(modified);
     } else {
-      ss << "compat feature " << f << " not present in " << pending_mdsmap.compat;
+      ss << "compat feature " << f << " not present in " << pending_fsmap.compat;
     }
     r = 0;
   } else if (prefix == "mds compat rm_incompat") {
@@ -1728,63 +2265,114 @@ int MDSMonitor::filesystem_command(
          << cmd_vartype_stringify(cmdmap["feature"]) << "'";
       return -EINVAL;
     }
-    if (pending_mdsmap.compat.incompat.contains(f)) {
+    if (pending_fsmap.compat.incompat.contains(f)) {
       ss << "removing incompat feature " << f;
-      pending_mdsmap.compat.incompat.remove(f);
+      CompatSet modified = pending_fsmap.compat;
+      modified.incompat.remove(f);
+      pending_fsmap.update_compat(modified);
     } else {
-      ss << "incompat feature " << f << " not present in " << pending_mdsmap.compat;
+      ss << "incompat feature " << f << " not present in " << pending_fsmap.compat;
     }
     r = 0;
-
-  } else if (prefix == "mds add_data_pool") {
-    string poolname;
-    cmd_getval(g_ceph_context, cmdmap, "pool", poolname);
-    int64_t poolid = mon->osdmon()->osdmap.lookup_pg_pool_name(poolname);
-    if (poolid < 0) {
-      string err;
-      poolid = strict_strtol(poolname.c_str(), 10, &err);
-      if (err.length()) {
-       ss << "pool '" << poolname << "' does not exist";
-       return -ENOENT;
-      }
-    }
-
-    r = _check_pool(poolid, &ss);
-    if (r != 0) {
+  } else if (prefix == "mds repaired") {
+    std::string role_str;
+    cmd_getval(g_ceph_context, cmdmap, "rank", role_str);
+    mds_role_t role;
+    r = parse_role(role_str, &role, ss);
+    if (r < 0) {
       return r;
     }
 
-    pending_mdsmap.add_data_pool(poolid);
-    ss << "added data pool " << poolid << " to mdsmap";
-  } else if (prefix == "mds remove_data_pool") {
-    string poolname;
-    cmd_getval(g_ceph_context, cmdmap, "pool", poolname);
-    int64_t poolid = mon->osdmon()->osdmap.lookup_pg_pool_name(poolname);
-    if (poolid < 0) {
-      string err;
-      poolid = strict_strtol(poolname.c_str(), 10, &err);
-      if (err.length()) {
-       r = -ENOENT;
-       poolid = -1;
-       ss << "pool '" << poolname << "' does not exist";
-      }
+    bool modified = pending_fsmap.undamaged(role.fscid, role.rank);
+    if (modified) {
+      dout(4) << "repaired: restoring rank " << role << dendl;
+    } else {
+      dout(4) << "repaired: no-op on rank " << role << dendl;
     }
 
-    if (pending_mdsmap.get_first_data_pool() == poolid) {
-      r = -EINVAL;
-      poolid = -1;
-      ss << "cannot remove default data pool";
-    }
+    r = 0;
+  } else {
+    return -ENOSYS;
+  }
 
-    if (poolid >= 0) {
-      r = pending_mdsmap.remove_data_pool(poolid);
-      if (r == -ENOENT)
-       r = 0;
-      if (r == 0)
-       ss << "removed data pool " << poolid << " from mdsmap";
+  return r;
+}
+
+/**
+ * Helper to legacy_filesystem_command
+ */
+void MDSMonitor::modify_legacy_filesystem(
+    std::function<void(std::shared_ptr<Filesystem> )> fn)
+{
+  pending_fsmap.modify_filesystem(
+    pending_fsmap.legacy_client_fscid,
+    fn
+  );
+}
+
+
+
+/**
+ * Handle a command that affects the filesystem (i.e. a filesystem
+ * must exist for the command to act upon).
+ *
+ * @retval 0        Command was successfully handled and has side effects
+ * @retval -EAGAIN  Messages has been requeued for retry
+ * @retval -ENOSYS  Unknown command
+ * @retval < 0      An error has occurred; **ss** may have been set.
+ */
+int MDSMonitor::legacy_filesystem_command(
+    MonOpRequestRef op,
+    std::string const &prefix,
+    map<string, cmd_vartype> &cmdmap,
+    std::stringstream &ss)
+{
+  dout(4) << __func__ << " prefix='" << prefix << "'" << dendl;
+  op->mark_mdsmon_event(__func__);
+  int r = 0;
+  string whostr;
+  cmd_getval(g_ceph_context, cmdmap, "who", whostr);
+
+  assert (pending_fsmap.legacy_client_fscid != FS_CLUSTER_ID_NONE);
+
+  if (prefix == "mds set_max_mds") {
+    // NOTE: deprecated by "fs set max_mds"
+    int64_t maxmds;
+    if (!cmd_getval(g_ceph_context, cmdmap, "maxmds", maxmds) || maxmds < 0) {
+      return -EINVAL;
     }
+    if (maxmds > MAX_MDS) {
+      ss << "may not have more than " << MAX_MDS << " MDS ranks";
+      return -EINVAL;
+    }
+
+    modify_legacy_filesystem(
+        [maxmds](std::shared_ptr<Filesystem> fs)
+    {
+      fs->mds_map.set_max_mds(maxmds);
+    });
+
+    r = 0;
+    ss << "max_mds = " << maxmds;
+  } else if (prefix == "mds cluster_down") {
+    // NOTE: deprecated by "fs set cluster_down"
+    modify_legacy_filesystem(
+        [](std::shared_ptr<Filesystem> fs)
+    {
+      fs->mds_map.set_flag(CEPH_MDSMAP_DOWN);
+    });
+    ss << "marked fsmap DOWN";
+    r = 0;
+  } else if (prefix == "mds cluster_up") {
+    // NOTE: deprecated by "fs set cluster_up"
+    modify_legacy_filesystem(
+        [](std::shared_ptr<Filesystem> fs)
+    {
+      fs->mds_map.clear_flag(CEPH_MDSMAP_DOWN);
+    });
+    ss << "unmarked fsmap DOWN";
+    r = 0;
   } else {
-    ss << "unrecognized command";
     return -ENOSYS;
   }
 
@@ -1794,28 +2382,142 @@ int MDSMonitor::filesystem_command(
 
 void MDSMonitor::check_subs()
 {
-  string type = "mdsmap";
-  if (mon->session_map.subs.count(type) == 0)
-    return;
-  xlist<Subscription*>::iterator p = mon->session_map.subs[type]->begin();
-  while (!p.end()) {
-    Subscription *sub = *p;
-    ++p;
-    check_sub(sub);
+  std::list<std::string> types;
+
+  // Subscriptions may be to "fsmap" (MDS and legacy clients),
+  // "fsmap.<namespace>", or to "fsmap" for the full state of all
+  // filesystems.  Build a list of all the types we service
+  // subscriptions for.
+  types.push_back("mdsmap");
+  types.push_back("fsmap");
+  for (const auto &i : fsmap.filesystems) {
+    auto fscid = i.first;
+    std::ostringstream oss;
+    oss << "fsmap." << fscid;
+    types.push_back(oss.str());
+  }
+
+  for (const auto &type : types) {
+    if (mon->session_map.subs.count(type) == 0)
+      return;
+    xlist<Subscription*>::iterator p = mon->session_map.subs[type]->begin();
+    while (!p.end()) {
+      Subscription *sub = *p;
+      ++p;
+      check_sub(sub);
+    }
   }
 }
 
+
 void MDSMonitor::check_sub(Subscription *sub)
 {
-  if (sub->next <= mdsmap.get_epoch()) {
-    sub->session->con->send_message(new MMDSMap(mon->monmap->fsid, &mdsmap));
-    if (sub->onetime)
+  dout(20) << __func__ << ": " << sub->type << dendl;
+
+  if (sub->type == "fsmap") {
+    if (sub->next <= fsmap.get_epoch()) {
+      sub->session->con->send_message(new MFSMap(mon->monmap->fsid, &fsmap));
+      if (sub->onetime) {
+        mon->session_map.remove_sub(sub);
+      } else {
+        sub->next = fsmap.get_epoch() + 1;
+      }
+    }
+  } else {
+    if (sub->next > fsmap.get_epoch()) {
+      return;
+    }
+
+    const bool is_mds = sub->session->inst.name.is_mds();
+    mds_gid_t mds_gid = MDS_GID_NONE;
+    fs_cluster_id_t fscid = FS_CLUSTER_ID_NONE;
+    if (is_mds) {
+      // What (if any) namespace are you assigned to?
+      auto mds_info = fsmap.get_mds_info();
+      for (const auto &i : mds_info) {
+        if (i.second.addr == sub->session->inst.addr) {
+          mds_gid = i.first;
+          fscid = fsmap.mds_roles.at(mds_gid);
+        }
+      }
+    } else {
+      // You're a client.  Did you request a particular
+      // namespace?
+      if (sub->type.find("mdsmap.") == 0) {
+        auto namespace_id_str = sub->type.substr(std::string("mdsmap.").size());
+        dout(10) << __func__ << ": namespace_id " << namespace_id_str << dendl;
+        std::string err;
+        fscid = strict_strtoll(namespace_id_str.c_str(), 10, &err);
+        if (!err.empty()) {
+          // Client asked for a non-existent namespace, send them nothing
+          dout(1) << "Invalid client subscription '" << sub->type
+                  << "'" << dendl;
+          return;
+        }
+        if (fsmap.filesystems.count(fscid) == 0) {
+          // Client asked for a non-existent namespace, send them nothing
+          // TODO: something more graceful for when a client has a filesystem
+          // mounted, and the fileysstem is deleted.  Add a "shut down you fool"
+          // flag to MMDSMap?
+          dout(1) << "Client subscribed to non-existent namespace '" <<
+                  fscid << "'" << dendl;
+          return;
+        }
+      } else {
+        // Unqualified request for "mdsmap": give it the one marked
+        // for use by legacy clients.
+        if (fsmap.legacy_client_fscid != FS_CLUSTER_ID_NONE) {
+          fscid = fsmap.legacy_client_fscid;
+        } else {
+          dout(1) << "Client subscribed for legacy filesystem but "
+                     "none is configured" << dendl;
+          return;
+        }
+      }
+    }
+    dout(10) << __func__ << ": is_mds=" << is_mds << ", fscid= " << fscid << dendl;
+
+    // Work out the effective latest epoch
+    MDSMap *mds_map = nullptr;
+    MDSMap null_map;
+    null_map.compat = fsmap.compat;
+    if (fscid == FS_CLUSTER_ID_NONE) {
+      // For a client, we should have already dropped out
+      assert(is_mds);
+
+      if (fsmap.standby_daemons.count(mds_gid)) {
+        // For an MDS, we need to feed it an MDSMap with its own state in
+        null_map.mds_info[mds_gid] = fsmap.standby_daemons[mds_gid];
+        null_map.epoch = fsmap.standby_epochs[mds_gid];
+      } else {
+        null_map.epoch = fsmap.epoch;
+      }
+      mds_map = &null_map;
+    } else {
+      // Check the effective epoch 
+      mds_map = &(fsmap.filesystems.at(fscid)->mds_map);
+    }
+
+    dout(10) << __func__ << " selected MDS map epoch " <<
+      mds_map->epoch << " for namespace " << fscid << " for subscriber "
+      << sub->session->inst.name << " who wants epoch " << sub->next << dendl;
+
+    assert(mds_map != nullptr);
+    if (sub->next > mds_map->epoch) {
+      return;
+    }
+    auto msg = new MMDSMap(mon->monmap->fsid, mds_map);
+
+    sub->session->con->send_message(msg);
+    if (sub->onetime) {
       mon->session_map.remove_sub(sub);
-    else
-      sub->next = mdsmap.get_epoch() + 1;
+    } else {
+      sub->next = mds_map->get_epoch() + 1;
+    }
   }
 }
 
+
 void MDSMonitor::update_metadata(mds_gid_t gid,
                                 const map<string, string>& metadata)
 {
@@ -1836,7 +2538,7 @@ void MDSMonitor::remove_from_metadata(MonitorDBStore::TransactionRef t)
   bool update = false;
   for (map<mds_gid_t, Metadata>::iterator i = pending_metadata.begin();
        i != pending_metadata.end(); ) {
-    if (pending_mdsmap.get_state_gid(i->first) == MDSMap::STATE_NULL) {
+    if (!pending_fsmap.gid_exists(i->first)) {
       pending_metadata.erase(i++);
       update = true;
     } else {
@@ -1906,11 +2608,12 @@ int MDSMonitor::print_nodes(Formatter *f)
       continue;
     }
     const mds_gid_t gid = it->first;
-    if (mdsmap.get_state_gid(gid) == MDSMap::STATE_NULL) {
+    if (!fsmap.gid_exists(gid)) {
       dout(5) << __func__ << ": GID " << gid << " not existent" << dendl;
       continue;
     }
-    const MDSMap::mds_info_t& mds_info = mdsmap.get_info_gid(gid);
+    const MDSMap::mds_info_t& mds_info = fsmap.get_info_gid(gid);
+    // FIXME: include filesystem name with rank here
     mdses[hostname->second].push_back(mds_info.rank);
   }
 
@@ -1918,270 +2621,307 @@ int MDSMonitor::print_nodes(Formatter *f)
   return 0;
 }
 
-void MDSMonitor::tick()
+/**
+ * If a cluster is undersized (with respect to max_mds), then
+ * attempt to find daemons to grow it.
+ */
+bool MDSMonitor::maybe_expand_cluster(std::shared_ptr<Filesystem> fs)
 {
-  // make sure mds's are still alive
-  // ...if i am an active leader
-  if (!is_active()) return;
-
-  // Do nothing if the filesystem is disabled
-  if (!mdsmap.get_enabled()) return;
-
-  dout(10) << mdsmap << dendl;
-  
   bool do_propose = false;
 
-  if (!mon->is_leader()) return;
+  if (fs->mds_map.test_flag(CEPH_MDSMAP_DOWN)) {
+    return do_propose;
+  }
 
-  // expand mds cluster (add new nodes to @in)?
-  while (pending_mdsmap.get_num_in_mds() < size_t(pending_mdsmap.get_max_mds()) &&
-        !pending_mdsmap.is_degraded()) {
+  while (fs->mds_map.get_num_in_mds() < size_t(fs->mds_map.get_max_mds()) &&
+        !fs->mds_map.is_degraded()) {
     mds_rank_t mds = mds_rank_t(0);
     string name;
-    while (pending_mdsmap.is_in(mds))
+    while (fs->mds_map.is_in(mds)) {
       mds++;
-    mds_gid_t newgid = pending_mdsmap.find_replacement_for(mds, name,
+    }
+    mds_gid_t newgid = pending_fsmap.find_replacement_for({fs->fscid, mds}, name,
                          g_conf->mon_force_standby_active);
-    if (!newgid)
+    if (newgid == MDS_GID_NONE) {
       break;
+    }
 
-    MDSMap::mds_info_t& info = pending_mdsmap.mds_info[newgid];
-    dout(1) << "adding standby " << info.addr << " as mds." << mds << dendl;
-    
-    info.rank = mds;
-    if (pending_mdsmap.stopped.count(mds)) {
-      info.state = MDSMap::STATE_STARTING;
-      pending_mdsmap.stopped.erase(mds);
-    } else
-      info.state = MDSMap::STATE_CREATING;
-    info.inc = ++pending_mdsmap.inc[mds];
-    pending_mdsmap.in.insert(mds);
-    pending_mdsmap.up[mds] = newgid;
+    dout(1) << "adding standby " << pending_fsmap.get_info_gid(newgid).addr
+            << " as mds." << mds << dendl;
+    pending_fsmap.promote(newgid, fs, mds);
     do_propose = true;
   }
 
-  // check beacon timestamps
-  utime_t now = ceph_clock_now(g_ceph_context);
-  utime_t cutoff = now;
-  cutoff -= g_conf->mds_beacon_grace;
-
-  // make sure last_beacon is fully populated
-  for (map<mds_gid_t,MDSMap::mds_info_t>::iterator p = pending_mdsmap.mds_info.begin();
-       p != pending_mdsmap.mds_info.end();
-       ++p) {
-    if (last_beacon.count(p->first) == 0) {
-      const MDSMap::mds_info_t& info = p->second;
-      dout(10) << " adding " << p->second.addr << " mds." << info.rank << "." << info.inc
-              << " " << ceph_mds_state_name(info.state)
-              << " to last_beacon" << dendl;
-      last_beacon[p->first].stamp = ceph_clock_now(g_ceph_context);
-      last_beacon[p->first].seq = 0;
-    }
-  }
-
-  if (mon->osdmon()->is_writeable()) {
-
-    bool propose_osdmap = false;
-
-    map<mds_gid_t, beacon_info_t>::iterator p = last_beacon.begin();
-    while (p != last_beacon.end()) {
-      mds_gid_t gid = p->first;
-      utime_t since = p->second.stamp;
-      uint64_t seq = p->second.seq;
-      ++p;
-      
-      if (pending_mdsmap.mds_info.count(gid) == 0) {
-       // clean it out
-       last_beacon.erase(gid);
-       continue;
-      }
-
-      if (since >= cutoff)
-       continue;
+  return do_propose;
+}
 
-      MDSMap::mds_info_t& info = pending_mdsmap.mds_info[gid];
 
-      dout(10) << "no beacon from " << gid << " " << info.addr << " mds." << info.rank << "." << info.inc
-              << " " << ceph_mds_state_name(info.state)
-              << " since " << since << dendl;
-      
-      // are we in?
-      // and is there a non-laggy standby that can take over for us?
-      mds_gid_t sgid;
-      if (info.rank >= 0 &&
-         info.state != MDSMap::STATE_STANDBY &&
-         info.state != MDSMap::STATE_STANDBY_REPLAY &&
-         (sgid = pending_mdsmap.find_replacement_for(info.rank, info.name, 
-                    g_conf->mon_force_standby_active)) != MDS_GID_NONE) {
-       MDSMap::mds_info_t& si = pending_mdsmap.mds_info[sgid];
-       dout(10) << " replacing " << gid << " " << info.addr << " mds." << info.rank << "." << info.inc
-                << " " << ceph_mds_state_name(info.state)
-                << " with " << sgid << "/" << si.name << " " << si.addr << dendl;
-       switch (info.state) {
-       case MDSMap::STATE_CREATING:
-       case MDSMap::STATE_STARTING:
-         si.state = info.state;
-         break;
-       case MDSMap::STATE_REPLAY:
-       case MDSMap::STATE_RESOLVE:
-       case MDSMap::STATE_RECONNECT:
-       case MDSMap::STATE_REJOIN:
-       case MDSMap::STATE_CLIENTREPLAY:
-       case MDSMap::STATE_ACTIVE:
-       case MDSMap::STATE_STOPPING:
-       case MDSMap::STATE_DNE:
-         si.state = MDSMap::STATE_REPLAY;
-         break;
-       default:
-         assert(0);
-       }
+/**
+ * If a daemon is laggy, and a suitable replacement
+ * is available, fail this daemon (remove from map) and pass its
+ * role to another daemon.
+ */
+void MDSMonitor::maybe_replace_gid(mds_gid_t gid,
+    const beacon_info_t &beacon,
+    bool *mds_propose, bool *osd_propose)
+{
+  assert(mds_propose != nullptr);
+  assert(osd_propose != nullptr);
+
+  const MDSMap::mds_info_t info = pending_fsmap.get_info_gid(gid);
+  const auto fscid = pending_fsmap.mds_roles.at(gid);
+
+  dout(10) << "no beacon from " << gid << " " << info.addr << " mds."
+    << info.rank << "." << info.inc
+    << " " << ceph_mds_state_name(info.state)
+    << " since " << beacon.stamp << dendl;
+
+  // are we in?
+  // and is there a non-laggy standby that can take over for us?
+  mds_gid_t sgid;
+  if (info.rank >= 0 &&
+      info.state != MDSMap::STATE_STANDBY &&
+      info.state != MDSMap::STATE_STANDBY_REPLAY &&
+      !pending_fsmap.get_filesystem(fscid)->mds_map.test_flag(CEPH_MDSMAP_DOWN) &&
+      (sgid = pending_fsmap.find_replacement_for({fscid, info.rank}, info.name,
+                g_conf->mon_force_standby_active)) != MDS_GID_NONE) {
+    
+    MDSMap::mds_info_t si = pending_fsmap.get_info_gid(sgid);
+    dout(10) << " replacing " << gid << " " << info.addr << " mds."
+      << info.rank << "." << info.inc
+      << " " << ceph_mds_state_name(info.state)
+      << " with " << sgid << "/" << si.name << " " << si.addr << dendl;
+
+    // Remember what NS the old one was in
+    const fs_cluster_id_t fscid = pending_fsmap.mds_roles.at(gid);
+
+    // Remove the old one
+    *osd_propose |= fail_mds_gid(gid);
+
+    // Promote the replacement
+    const std::shared_ptr<Filesystem> fs = pending_fsmap.get_filesystem(fscid);
+    pending_fsmap.promote(sgid, fs, info.rank);
+
+    *mds_propose = true;
+  } else if (info.state == MDSMap::STATE_STANDBY_REPLAY) {
+    dout(10) << " failing " << gid << " " << info.addr << " mds." << info.rank << "." << info.inc
+      << " " << ceph_mds_state_name(info.state)
+      << dendl;
+    fail_mds_gid(gid);
+    *mds_propose = true;
+  } else {
+    if (info.state == MDSMap::STATE_STANDBY ||
+        info.state == MDSMap::STATE_STANDBY_REPLAY) {
+      // remove it
+      dout(10) << " removing " << gid << " " << info.addr << " mds." << info.rank << "." << info.inc
+        << " " << ceph_mds_state_name(info.state)
+        << " (laggy)" << dendl;
+      fail_mds_gid(gid);
+      *mds_propose = true;
+    } else if (!info.laggy()) {
+      dout(10) << " marking " << gid << " " << info.addr << " mds." << info.rank << "." << info.inc
+        << " " << ceph_mds_state_name(info.state)
+        << " laggy" << dendl;
+      pending_fsmap.modify_daemon(info.global_id, [](MDSMap::mds_info_t *info) {
+          info->laggy_since = ceph_clock_now(g_ceph_context);
+      });
+      *mds_propose = true;
+    }
+    last_beacon.erase(gid);
+  }
+}
 
-       info.state_seq = seq;
-       si.rank = info.rank;
-       si.inc = ++pending_mdsmap.inc[info.rank];
-       pending_mdsmap.up[info.rank] = sgid;
-       if (si.state > 0)
-         pending_mdsmap.last_failure = pending_mdsmap.epoch;
-       if (si.state > 0 ||
-           si.state == MDSMap::STATE_CREATING ||
-           si.state == MDSMap::STATE_STARTING) {
-         // blacklist laggy mds
-         utime_t until = now;
-         until += g_conf->mds_blacklist_interval;
-         pending_mdsmap.last_failure_osd_epoch = mon->osdmon()->blacklist(info.addr, until);
-         propose_osdmap = true;
-       }
-       pending_mdsmap.mds_info.erase(gid);
-        pending_daemon_health.erase(gid);
-        pending_daemon_health_rm.insert(gid);
-       last_beacon.erase(gid);
-       do_propose = true;
-      } else if (info.state == MDSMap::STATE_STANDBY_REPLAY) {
-       dout(10) << " failing " << gid << " " << info.addr << " mds." << info.rank << "." << info.inc
-                << " " << ceph_mds_state_name(info.state)
-                << dendl;
-       pending_mdsmap.mds_info.erase(gid);
-        pending_daemon_health.erase(gid);
-        pending_daemon_health_rm.insert(gid);
-       last_beacon.erase(gid);
-       do_propose = true;
-      } else {
-       if (info.state == MDSMap::STATE_STANDBY ||
-           info.state == MDSMap::STATE_STANDBY_REPLAY) {
-         // remove it
-         dout(10) << " removing " << gid << " " << info.addr << " mds." << info.rank << "." << info.inc
-                  << " " << ceph_mds_state_name(info.state)
-                  << " (laggy)" << dendl;
-         pending_mdsmap.mds_info.erase(gid);
-          pending_daemon_health.erase(gid);
-          pending_daemon_health_rm.insert(gid);
-         do_propose = true;
-       } else if (!info.laggy()) {
-         dout(10) << " marking " << gid << " " << info.addr << " mds." << info.rank << "." << info.inc
-                  << " " << ceph_mds_state_name(info.state)
-                  << " laggy" << dendl;
-         info.laggy_since = now;
-         do_propose = true;
-       }
-       last_beacon.erase(gid);
-      }
-    }
+bool MDSMonitor::maybe_promote_standby(std::shared_ptr<Filesystem> fs)
+{
+  assert(!fs->mds_map.test_flag(CEPH_MDSMAP_DOWN));
 
-    if (propose_osdmap)
-      request_proposal(mon->osdmon());
-  }
+  bool do_propose = false;
 
   // have a standby take over?
   set<mds_rank_t> failed;
-  pending_mdsmap.get_failed_mds_set(failed);
-  if (!failed.empty() && !pending_mdsmap.test_flag(CEPH_MDSMAP_DOWN)) {
+  fs->mds_map.get_failed_mds_set(failed);
+  if (!failed.empty()) {
     set<mds_rank_t>::iterator p = failed.begin();
     while (p != failed.end()) {
       mds_rank_t f = *p++;
-      string name;  // FIXME
-      mds_gid_t sgid = pending_mdsmap.find_replacement_for(f, name,
+      mds_gid_t sgid = pending_fsmap.find_replacement_for({fs->fscid, f}, {},
           g_conf->mon_force_standby_active);
       if (sgid) {
-       MDSMap::mds_info_t& si = pending_mdsmap.mds_info[sgid];
-       dout(0) << " taking over failed mds." << f << " with " << sgid << "/" << si.name << " " << si.addr << dendl;
-       si.state = MDSMap::STATE_REPLAY;
-       si.rank = f;
-       si.inc = ++pending_mdsmap.inc[f];
-       pending_mdsmap.in.insert(f);
-       pending_mdsmap.up[f] = sgid;
-       pending_mdsmap.failed.erase(f);
+        const MDSMap::mds_info_t si = pending_fsmap.get_info_gid(sgid);
+        dout(0) << " taking over failed mds." << f << " with " << sgid
+                << "/" << si.name << " " << si.addr << dendl;
+        pending_fsmap.promote(sgid, fs, f);
        do_propose = true;
       }
     }
   }
 
-  // have a standby follow someone?
+  // There were no failures to replace, so try using any available standbys
+  // as standby-replay daemons.
   if (failed.empty()) {
-    for (map<mds_gid_t,MDSMap::mds_info_t>::iterator j = pending_mdsmap.mds_info.begin();
-        j != pending_mdsmap.mds_info.end();
-        ++j) {
-      MDSMap::mds_info_t& info = j->second;
-      
-      if (info.state != MDSMap::STATE_STANDBY)
-       continue;
+    for (const auto &j : pending_fsmap.standby_daemons) {
+      const auto &gid = j.first;
+      const auto &info = j.second;
+      assert(info.state == MDSMap::STATE_STANDBY);
 
       /*
        * This mds is standby but has no rank assigned.
        * See if we can find it somebody to shadow
        */
-      dout(20) << "gid " << j->first << " is standby and following nobody" << dendl;
+      dout(20) << "gid " << gid << " is standby and following nobody" << dendl;
       
       // standby for someone specific?
       if (info.standby_for_rank >= 0) {
-       if (pending_mdsmap.is_followable(info.standby_for_rank) &&
-           try_standby_replay(info, pending_mdsmap.mds_info[pending_mdsmap.up[info.standby_for_rank]]))
-         do_propose = true;
+        // The mds_info_t may or may not tell us exactly which filesystem
+        // the standby_for_rank refers to: lookup via legacy_client_fscid
+        mds_role_t target_role = {
+          info.standby_for_ns == FS_CLUSTER_ID_NONE ?
+            pending_fsmap.legacy_client_fscid : info.standby_for_ns,
+          info.standby_for_rank};
+
+        // If we managed to resolve a full target role
+        if (target_role.fscid != FS_CLUSTER_ID_NONE) {
+          auto fs = pending_fsmap.get_filesystem(target_role.fscid);
+          if (fs->mds_map.is_followable(target_role.rank)) {
+            do_propose |= try_standby_replay(
+                info,
+                *fs,
+                fs->mds_map.get_info(target_role.rank));
+          }
+        }
+
        continue;
       }
 
       // check everyone
-      for (map<mds_gid_t,MDSMap::mds_info_t>::iterator i = pending_mdsmap.mds_info.begin();
-          i != pending_mdsmap.mds_info.end();
-          ++i) {
-       if (i->second.rank >= 0 && pending_mdsmap.is_followable(i->second.rank)) {
-         if ((info.standby_for_name.length() && info.standby_for_name != i->second.name) ||
-             info.standby_for_rank >= 0)
-           continue;   // we're supposed to follow someone else
-
-         if (info.standby_for_rank == MDSMap::MDS_STANDBY_ANY &&
-             try_standby_replay(info, i->second)) {
-           do_propose = true;
-           break;
-         }
-         continue;
-       }
+      for (auto fs_i : pending_fsmap.filesystems) {
+        const MDSMap &mds_map = fs_i.second->mds_map;
+        for (auto mds_i : mds_map.mds_info) {
+          MDSMap::mds_info_t &cand_info = mds_i.second;
+          if (cand_info.rank >= 0 && mds_map.is_followable(cand_info.rank)) {
+            if ((info.standby_for_name.length() && info.standby_for_name != cand_info.name) ||
+                info.standby_for_rank != MDS_RANK_NONE) {
+              continue;   // we're supposed to follow someone else
+            }
+
+            if (info.standby_for_rank == MDSMap::MDS_STANDBY_ANY &&
+                try_standby_replay(info, *(fs_i.second), cand_info)) {
+              do_propose = true;
+              break;
+            }
+            continue;
+          }
+        }
       }
     }
   }
 
-  if (do_propose)
+  return do_propose;
+}
+
+void MDSMonitor::tick()
+{
+  // make sure mds's are still alive
+  // ...if i am an active leader
+  if (!is_active()) return;
+
+  dout(10) << fsmap << dendl;
+
+  bool do_propose = false;
+
+  if (!mon->is_leader()) return;
+
+  // expand mds cluster (add new nodes to @in)?
+  for (auto i : pending_fsmap.filesystems) {
+    do_propose |= maybe_expand_cluster(i.second);
+  }
+
+  // check beacon timestamps
+  utime_t now = ceph_clock_now(g_ceph_context);
+  utime_t cutoff = now;
+  cutoff -= g_conf->mds_beacon_grace;
+
+  // make sure last_beacon is fully populated
+  for (const auto &p : pending_fsmap.mds_roles) {
+    auto &gid = p.first;
+    if (last_beacon.count(gid) == 0) {
+      last_beacon[gid].stamp = ceph_clock_now(g_ceph_context);
+      last_beacon[gid].seq = 0;
+    }
+  }
+
+  // If the OSDMap is writeable, we can blacklist things, so we can
+  // try failing any laggy MDS daemons.  Consider each one for failure.
+  if (mon->osdmon()->is_writeable()) {
+    bool propose_osdmap = false;
+
+    map<mds_gid_t, beacon_info_t>::iterator p = last_beacon.begin();
+    while (p != last_beacon.end()) {
+      mds_gid_t gid = p->first;
+      auto beacon_info = p->second;
+      ++p;
+
+      if (!pending_fsmap.gid_exists(gid)) {
+       // clean it out
+       last_beacon.erase(gid);
+       continue;
+      }
+
+      if (beacon_info.stamp < cutoff) {
+        maybe_replace_gid(gid, beacon_info, &do_propose, &propose_osdmap);
+      }
+    }
+
+    if (propose_osdmap) {
+      request_proposal(mon->osdmon());
+    }
+  }
+
+  for (auto i : pending_fsmap.filesystems) {
+    auto fs = i.second;
+    if (!fs->mds_map.test_flag(CEPH_MDSMAP_DOWN)) {
+      do_propose |= maybe_promote_standby(fs);
+    }
+  }
+
+  if (do_propose) {
     propose_pending();
+  }
 }
 
-bool MDSMonitor::try_standby_replay(MDSMap::mds_info_t& finfo, MDSMap::mds_info_t& ainfo)
+/**
+ * finfo: the would-be follower
+ * leader_fs: the Filesystem containing the would-be leader
+ * ainfo: the would-be leader
+ */
+bool MDSMonitor::try_standby_replay(
+    const MDSMap::mds_info_t& finfo,
+    const Filesystem &leader_fs,
+    const MDSMap::mds_info_t& ainfo)
 {
   // someone else already following?
-  mds_gid_t lgid = pending_mdsmap.find_standby_for(ainfo.rank, ainfo.name);
-  if (lgid) {
-    MDSMap::mds_info_t& sinfo = pending_mdsmap.mds_info[lgid];
-    dout(20) << " mds." << ainfo.rank
-            << " standby gid " << lgid << " with state "
-            << ceph_mds_state_name(sinfo.state)
-            << dendl;
-    if (sinfo.state == MDSMap::STATE_STANDBY_REPLAY) {
-      dout(20) << "  skipping this MDS since it has a follower!" << dendl;
-      return false; // this MDS already has a standby
-    }
+  if (leader_fs.has_standby_replay(ainfo.global_id)) {
+    dout(20) << " mds." << ainfo.rank << " already has a follower" << dendl;
+    return false;
+  } else {
+    // Assign the new role to the standby
+    dout(10) << "  setting to follow mds rank " << ainfo.rank << dendl;
+    pending_fsmap.assign_standby_replay(finfo.global_id, leader_fs.fscid, ainfo.rank);
+    return true;
   }
+}
 
-  // hey, we found an MDS without a standby. Pair them!
-  finfo.standby_for_rank = ainfo.rank;
-  dout(10) << "  setting to shadow mds rank " << finfo.standby_for_rank << dendl;
-  finfo.state = MDSMap::STATE_STANDBY_REPLAY;
-  return true;
+MDSMonitor::MDSMonitor(Monitor *mn, Paxos *p, string service_name)
+  : PaxosService(mn, p, service_name)
+{
+  handlers.push_back(std::make_shared<SetHandler>());
+  handlers.push_back(std::make_shared<LegacyHandler<SetHandler> >("mds set"));
+  handlers.push_back(std::make_shared<FlagSetHandler>());
+  handlers.push_back(std::make_shared<AddDataPoolHandler>());
+  handlers.push_back(std::make_shared<LegacyHandler<AddDataPoolHandler> >(
+        "mds add_data_pool"));
+  handlers.push_back(std::make_shared<RemoveDataPoolHandler>());
+  handlers.push_back(std::make_shared<LegacyHandler<RemoveDataPoolHandler> >(
+        "mds remove_data_pool"));
 }
+
index b755ba9e82c21027c52fc5a439782bca6238354b..2f4b19374bc62496847bdabc0788cf5c3435c0d4 100644 (file)
@@ -25,29 +25,30 @@ using namespace std;
 #include "include/types.h"
 #include "msg/Messenger.h"
 
-#include "mds/MDSMap.h"
+#include "mds/FSMap.h"
 
 #include "PaxosService.h"
 #include "Session.h"
 
 #include "messages/MMDSBeacon.h"
 
-class MMDSGetMap;
 class MMonCommand;
 class MMDSLoadTargets;
+class MMDSMap;
+class FileSystemCommandHandler;
 
 #define MDS_HEALTH_PREFIX "mds_health"
 
 class MDSMonitor : public PaxosService {
  public:
   // mds maps
-  MDSMap mdsmap;          // current
+  FSMap fsmap;          // current
   bufferlist mdsmap_bl;   // encoded
 
-  MDSMap pending_mdsmap;  // current + pending updates
+  FSMap pending_fsmap;  // current + pending updates
 
   // my helpers
-  void print_map(MDSMap &m, int dbl=7);
+  void print_map(FSMap &m, int dbl=7);
 
   class C_Updated : public Context {
     MDSMonitor *mm;
@@ -66,7 +67,7 @@ class MDSMonitor : public PaxosService {
     }
   };
 
-  void create_new_fs(MDSMap &m, const std::string &name, int metadata_pool, int data_pool);
+  void create_new_fs(FSMap &m, const std::string &name, int metadata_pool, int data_pool);
 
   version_t get_trim_to();
 
@@ -100,15 +101,31 @@ class MDSMonitor : public PaxosService {
                  list<pair<health_status_t,string> > *detail,
                  CephContext *cct) const override;
   int fail_mds(std::ostream &ss, const std::string &arg);
-  void fail_mds_gid(mds_gid_t gid);
+  /**
+   * Return true if a blacklist was done (i.e. OSD propose needed)
+   */
+  bool fail_mds_gid(mds_gid_t gid);
 
   bool preprocess_command(MonOpRequestRef op);
   bool prepare_command(MonOpRequestRef op);
+
+  int parse_role(
+      const std::string &role_str,
+      mds_role_t *role,
+      std::ostream &ss);
+
   int management_command(
       MonOpRequestRef op,
       std::string const &prefix,
       map<string, cmd_vartype> &cmdmap,
       std::stringstream &ss);
+  void modify_legacy_filesystem(
+      std::function<void(std::shared_ptr<Filesystem> )> fn);
+  int legacy_filesystem_command(
+      MonOpRequestRef op,
+      std::string const &prefix,
+      map<string, cmd_vartype> &cmdmap,
+      std::stringstream &ss);
   int filesystem_command(
       MonOpRequestRef op,
       std::string const &prefix,
@@ -122,14 +139,20 @@ class MDSMonitor : public PaxosService {
   };
   map<mds_gid_t, beacon_info_t> last_beacon;
 
-  bool try_standby_replay(MDSMap::mds_info_t& finfo, MDSMap::mds_info_t& ainfo);
+  bool try_standby_replay(
+      const MDSMap::mds_info_t& finfo,
+      const Filesystem &leader_fs,
+      const MDSMap::mds_info_t& ainfo);
+
+  std::list<std::shared_ptr<FileSystemCommandHandler> > handlers;
 
 public:
-  MDSMonitor(Monitor *mn, Paxos *p, string service_name)
-    : PaxosService(mn, p, service_name)
-  {
-  }
+  MDSMonitor(Monitor *mn, Paxos *p, string service_name);
 
+  bool maybe_promote_standby(std::shared_ptr<Filesystem> fs);
+  bool maybe_expand_cluster(std::shared_ptr<Filesystem> fs);
+  void maybe_replace_gid(mds_gid_t gid, const beacon_info_t &beacon,
+      bool *mds_propose, bool *osd_propose);
   void tick();     // check state, take actions
 
   void dump_info(Formatter *f);
@@ -140,6 +163,7 @@ public:
   void check_sub(Subscription *sub);
 
 private:
+  MDSMap *generate_mds_map(fs_cluster_id_t fscid);
   void update_metadata(mds_gid_t gid, const Metadata& metadata);
   void remove_from_metadata(MonitorDBStore::TransactionRef t);
   int load_metadata(map<mds_gid_t, Metadata>& m);
index a01b32c10b2040e84f5287f647b9d12a3f30e451..59e7aed51f917db498d546caee14d560f5db525f 100644 (file)
@@ -300,7 +300,11 @@ COMMAND("mon metadata name=id,type=CephString",
 COMMAND("mds stat", "show MDS status", "mds", "r", "cli,rest")
 COMMAND("mds dump " 
        "name=epoch,type=CephInt,req=false,range=0", \
-       "dump info, optionally from epoch", "mds", "r", "cli,rest")
+       "dump legacy MDS cluster info, optionally from epoch",
+        "mds", "r", "cli,rest")
+COMMAND("fs dump "
+       "name=epoch,type=CephInt,req=false,range=0", \
+       "dump all CephFS status, optionally from epoch", "mds", "r", "cli,rest")
 COMMAND("mds getmap " \
        "name=epoch,type=CephInt,req=false,range=0", \
        "get MDS map, optionally from epoch", "mds", "r", "cli,rest")
@@ -338,12 +342,12 @@ COMMAND("mds set_state " \
        "set mds state of <gid> to <numeric-state>", "mds", "rw", "cli,rest")
 COMMAND("mds fail name=who,type=CephString", \
        "force mds to status failed", "mds", "rw", "cli,rest")
-COMMAND("mds repaired name=rank,type=CephInt", \
+COMMAND("mds repaired name=rank,type=CephString", \
        "mark a damaged MDS rank as no longer damaged", "mds", "rw", "cli,rest")
 COMMAND("mds rm " \
        "name=gid,type=CephInt,range=0", \
        "remove nonactive mds", "mds", "rw", "cli,rest")
-COMMAND("mds rmfailed name=who,type=CephInt,range=0 name=confirm,type=CephString,req=false", \
+COMMAND("mds rmfailed name=who,type=CephString name=confirm,type=CephString,req=false", \
        "remove failed mds", "mds", "rw", "cli,rest")
 COMMAND("mds cluster_down", "take MDS cluster down", "mds", "rw", "cli,rest")
 COMMAND("mds cluster_up", "bring MDS cluster up", "mds", "rw", "cli,rest")
@@ -384,6 +388,27 @@ COMMAND("fs reset " \
 COMMAND("fs ls ", \
        "list filesystems", \
        "fs", "r", "cli,rest")
+COMMAND("fs get name=fs_name,type=CephString", \
+       "get info about one filesystem", \
+       "fs", "r", "cli,rest")
+COMMAND("fs set " \
+       "name=fs_name,type=CephString " \
+       "name=var,type=CephChoices,strings=max_mds|max_file_size"
+        "|allow_new_snaps|inline_data|cluster_down " \
+       "name=val,type=CephString "                                     \
+       "name=confirm,type=CephString,req=false",                       \
+       "set mds parameter <var> to <val>", "mds", "rw", "cli,rest")
+COMMAND("fs flag set name=flag_name,type=CephChoices,strings=enable_multiple "
+        "name=val,type=CephString", \
+       "Set a global CephFS flag", \
+       "fs", "rw", "cli,rest")
+COMMAND("fs add_data_pool name=fs_name,type=CephString " \
+       "name=pool,type=CephString", \
+       "add data pool <pool>", "mds", "rw", "cli,rest")
+COMMAND("fs remove_data_pool name=fs_name,type=CephString " \
+       "name=pool,type=CephString", \
+       "remove data pool <pool>", "mds", "rw", "cli,rest")
+
 /*
  * Monmap commands
  */
index 84a6112cd626d2021c53b68ac501cef5ee907edc..ceea9f1ce1b70ee03ab8939e97869082eadffcd7 100644 (file)
@@ -2461,8 +2461,8 @@ void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
     f->open_object_section("pgmap");
     pgmon()->pg_map.print_summary(f, NULL);
     f->close_section();
-    f->open_object_section("mdsmap");
-    mdsmon()->mdsmap.print_summary(f, NULL);
+    f->open_object_section("fsmap");
+    mdsmon()->fsmap.print_summary(f, NULL);
     f->close_section();
     f->close_section();
   } else {
@@ -2472,8 +2472,10 @@ void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
     ss << "     monmap " << *monmap << "\n";
     ss << "            election epoch " << get_epoch()
        << ", quorum " << get_quorum() << " " << get_quorum_names() << "\n";
-    if (mdsmon()->mdsmap.get_enabled())
-      ss << "     mdsmap " << mdsmon()->mdsmap << "\n";
+    if (mdsmon()->fsmap.any_filesystems()) {
+      ss << "     mdsmap " << mdsmon()->fsmap << "\n";
+    }
+
     osdmon()->osdmap.print_summary(NULL, ss);
     pgmon()->pg_map.print_summary(NULL, &ss);
   }
@@ -4244,9 +4246,12 @@ void Monitor::handle_subscribe(MonOpRequestRef op)
                               p->second.flags & CEPH_SUBSCRIBE_ONETIME,
                               m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
 
-    if (p->first == "mdsmap") {
+    if (p->first.find("mdsmap") == 0 || p->first == "fsmap") {
+      dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl;
       if ((int)s->is_capable("mds", MON_CAP_R)) {
-        mdsmon()->check_sub(s->sub_map["mdsmap"]);
+        Subscription *sub = s->sub_map[p->first];
+        assert(sub != nullptr);
+        mdsmon()->check_sub(sub);
       }
     } else if (p->first == "osdmap") {
       if ((int)s->is_capable("osd", MON_CAP_R)) {
@@ -4295,6 +4300,8 @@ void Monitor::handle_get_version(MonOpRequestRef op)
 
   if (m->what == "mdsmap") {
     svc = mdsmon();
+  } else if (m->what == "fsmap") {
+    svc = mdsmon();
   } else if (m->what == "osdmap") {
     svc = osdmon();
   } else if (m->what == "monmap") {
index f6bff252c78d1ecc04de3d119cada69eae4452c4..1b280b894251cb70dfa7da0655f2223be744d114 100644 (file)
@@ -7835,8 +7835,8 @@ int OSDMonitor::_check_remove_pool(int64_t pool, const pg_pool_t *p,
   const string& poolstr = osdmap.get_pool_name(pool);
 
   // If the Pool is in use by CephFS, refuse to delete it
-  MDSMap const &pending_mdsmap = mon->mdsmon()->pending_mdsmap;
-  if (pending_mdsmap.pool_in_use(pool)) {
+  FSMap const &pending_fsmap = mon->mdsmon()->pending_fsmap;
+  if (pending_fsmap.pool_in_use(pool)) {
     *ss << "pool '" << poolstr << "' is in use by CephFS";
     return -EBUSY;
   }
@@ -7884,8 +7884,8 @@ bool OSDMonitor::_check_become_tier(
   const std::string &tier_pool_name = osdmap.get_pool_name(tier_pool_id);
   const std::string &base_pool_name = osdmap.get_pool_name(base_pool_id);
 
-  const MDSMap &pending_mdsmap = mon->mdsmon()->pending_mdsmap;
-  if (pending_mdsmap.pool_in_use(tier_pool_id)) {
+  const FSMap &pending_fsmap = mon->mdsmon()->pending_fsmap;
+  if (pending_fsmap.pool_in_use(tier_pool_id)) {
     *ss << "pool '" << tier_pool_name << "' is in use by CephFS";
     *err = -EBUSY;
     return false;
@@ -7934,8 +7934,8 @@ bool OSDMonitor::_check_remove_tier(
   const std::string &base_pool_name = osdmap.get_pool_name(base_pool_id);
 
   // Apply CephFS-specific checks
-  const MDSMap &pending_mdsmap = mon->mdsmon()->pending_mdsmap;
-  if (pending_mdsmap.pool_in_use(base_pool_id)) {
+  const FSMap &pending_fsmap = mon->mdsmon()->pending_fsmap;
+  if (pending_fsmap.pool_in_use(base_pool_id)) {
     if (base_pool->type != pg_pool_t::TYPE_REPLICATED) {
       // If the underlying pool is erasure coded, we can't permit the
       // removal of the replicated tier that CephFS relies on to access it