]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: cache snaptable in snapclient
authorYan, Zheng <zyan@redhat.com>
Mon, 17 Jul 2017 08:26:14 +0000 (16:26 +0800)
committerYan, Zheng <zyan@redhat.com>
Fri, 9 Feb 2018 09:46:55 +0000 (17:46 +0800)
The idea is caching both snap infos and pending updates in snapclient.
The snapclient also tracks updates that are being committed, it applies
these commits to its cached snap infos. Steps to update snaptable are:

 - mds.x acquire locks (xlock on snaplock of affected snaprealm inode)
 - mds.x prepares snaptable update. (send preare to snapserver and waits
   for 'agree' reply)
 - snapserver sends notification about the update to all mds and waits
   for ACKs. (not implemented by this patch)
 - snapserver send 'agree' reply to mds.x
 - mds.x journals corresponding
 - mds.x commits the snaptable update and notifies all mds that it
   commits that update. then mds drops locks.

When receiving committing notification, mds applies the committing
update to its cached snap infos. By this way, cached snap infos get
synchronized before snaplock become readable.

Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
src/mds/CMakeLists.txt
src/mds/MDSRank.cc
src/mds/MDSTable.cc
src/mds/MDSTableClient.cc
src/mds/MDSTableClient.h
src/mds/Server.cc
src/mds/SnapClient.cc [new file with mode: 0644]
src/mds/SnapClient.h
src/mds/SnapServer.cc

index f9e71ae96757d149ae70b0a9f0ee198fac03feb6..6aea92c8221e04f9baae8e6f93343063fd6bebee 100644 (file)
@@ -29,6 +29,7 @@ set(mds_srcs
   SimpleLock.cc
   SnapRealm.cc
   SnapServer.cc
+  SnapClient.cc
   snap.cc
   SessionMap.cc
   MDSContext.cc
index f476691118229b2bdddec99a094b2f36927f4088..e63061b2e8228c49fbc56c84a88edbcb4fac71b5 100644 (file)
@@ -1181,6 +1181,9 @@ void MDSRank::starting_done()
                               mdlog->start_new_segment();
                           })));
   }
+
+  // sync snaptable cache
+  snapclient->sync(new C_MDSInternalNoop);
 }
 
 
@@ -1338,6 +1341,8 @@ void MDSRank::replay_done()
       mdsmap->get_num_failed_mds() == 0) { // just me!
     dout(2) << "i am alone, moving to state reconnect" << dendl;
     request_state(MDSMap::STATE_RECONNECT);
+    // sync snaptable cache
+    snapclient->sync(new C_MDSInternalNoop);
   } else {
     dout(2) << "i am not alone, moving to state resolve" << dendl;
     request_state(MDSMap::STATE_RESOLVE);
@@ -1350,7 +1355,6 @@ void MDSRank::reopen_log()
   mdcache->rollback_uncommitted_fragments();
 }
 
-
 void MDSRank::resolve_start()
 {
   dout(1) << "resolve_start" << dendl;
@@ -1360,10 +1364,13 @@ void MDSRank::resolve_start()
   mdcache->resolve_start(new C_MDS_VoidFn(this, &MDSRank::resolve_done));
   finish_contexts(g_ceph_context, waiting_for_resolve);
 }
+
 void MDSRank::resolve_done()
 {
   dout(1) << "resolve_done" << dendl;
   request_state(MDSMap::STATE_RECONNECT);
+  // sync snaptable cache
+  snapclient->sync(new C_MDSInternalNoop);
 }
 
 void MDSRank::reconnect_start()
@@ -1501,6 +1508,8 @@ void MDSRank::creating_done()
 {
   dout(1)<< "creating_done" << dendl;
   request_state(MDSMap::STATE_ACTIVE);
+  // sync snaptable cache
+  snapclient->sync(new C_MDSInternalNoop);
 }
 
 void MDSRank::boot_create()
index bf1e04ef3712cef3df05ac07e5242cfd5f6d11e3..be12dbfc0d63e5af5a18ec4d84fd72eff60ff03f 100644 (file)
@@ -114,6 +114,7 @@ void MDSTable::save_2(int r, version_t v)
 void MDSTable::reset()
 {
   reset_state();
+  projected_version = version;
   state = STATE_ACTIVE;
 }
 
index 7d0353e61d2e69d1b9898533e708fb7dda30bf5a..7d5c03a36d0d29d867abf3242e49622cf1533fe1 100644 (file)
@@ -187,6 +187,8 @@ void MDSTableClient::commit(version_t tid, LogSegment *ls)
   pending_commit[tid] = ls;
   ls->pending_commit_tids[table].insert(tid);
 
+  notify_commit(tid);
+
   assert(g_conf->mds_kill_mdstable_at != 4);
 
   if (server_ready) {
@@ -206,6 +208,8 @@ void MDSTableClient::got_journaled_agree(version_t tid, LogSegment *ls)
   dout(10) << "got_journaled_agree " << tid << dendl;
   ls->pending_commit_tids[table].insert(tid);
   pending_commit[tid] = ls;
+
+  notify_commit(tid);
 }
 
 void MDSTableClient::got_journaled_ack(version_t tid)
index ce15c54d87490c1af15fc5832f9c06034c295158..0bd4f78018a1a3d28a4a5318aebe84bd19e32cf0 100644 (file)
@@ -85,6 +85,7 @@ public:
   // child must implement
   virtual void resend_queries() = 0;
   virtual void handle_query_result(MMDSTableRequest *m) = 0;
+  virtual void notify_commit(version_t tid) = 0;
 
   // and friendly front-end for _prepare.
 
index 70f7bd0ad3063e48789fe92c12d1e8ee93c170cc..1a94ca4e578676847c49c35ddc7ada65ecb7e939 100644 (file)
@@ -919,6 +919,13 @@ void Server::handle_client_reconnect(MClientReconnect *m)
     return;
   }
 
+  // opening snaprealm past parents needs to use snaptable
+  if (!mds->snapclient->is_synced()) {
+    dout(10) << " snaptable isn't synced, waiting" << dendl;
+    mds->snapclient->wait_for_sync(new C_MDS_RetryMessage(mds, m));
+    return;
+  }
+
   // notify client of success with an OPEN
   m->get_connection()->send_message(new MClientSession(CEPH_SESSION_OPEN));
   session->last_cap_renew = ceph_clock_now();
@@ -997,7 +1004,15 @@ void Server::reconnect_gather_finish()
 {
   dout(7) << "reconnect_gather_finish.  failed on " << failed_reconnects << " clients" << dendl;
   assert(reconnect_done);
-  reconnect_done->complete(0);
+
+  if (!mds->snapclient->is_synced()) {
+    // make sure snaptable cache is populated. snaprealms will be
+    // extensively used in rejoin stage.
+    dout(7) << " snaptable cache isn't synced, delaying state transition" << dendl;
+    mds->snapclient->wait_for_sync(reconnect_done);
+  } else {
+    reconnect_done->complete(0);
+  }
   reconnect_done = NULL;
 }
 
@@ -8669,6 +8684,12 @@ void Server::handle_client_mksnap(MDRequestRef& mdr)
   decode(snapid, p);
   dout(10) << " stid " << stid << " snapid " << snapid << dendl;
 
+  // FIXME: notify all other mds the change
+  if (stid > mds->snapclient->get_cached_version()) {
+    mds->snapclient->refresh(stid, new C_MDS_RetryRequest(mdcache, mdr));
+    return;
+  }
+
   // journal
   SnapInfo info;
   info.ino = diri->ino();
@@ -8806,6 +8827,12 @@ void Server::handle_client_rmsnap(MDRequestRef& mdr)
   decode(seq, p);  
   dout(10) << " stid is " << stid << ", seq is " << seq << dendl;
 
+  // FIXME: notify all other mds the change
+  if (stid > mds->snapclient->get_cached_version()) {
+    mds->snapclient->refresh(stid, new C_MDS_RetryRequest(mdcache, mdr));
+    return;
+  }
+
   // journal
   auto &pi = diri->project_inode(false, true);
   pi.inode.version = diri->pre_dirty();
@@ -8946,6 +8973,12 @@ void Server::handle_client_renamesnap(MDRequestRef& mdr)
   version_t stid = mdr->more()->stid;
   dout(10) << " stid is " << stid << dendl;
 
+  // FIXME: notify all other mds the change
+  if (stid > mds->snapclient->get_cached_version()) {
+    mds->snapclient->refresh(stid, new C_MDS_RetryRequest(mdcache, mdr));
+    return;
+  }
+
   // journal
   auto &pi = diri->project_inode(false, true);
   pi.inode.ctime = mdr->get_op_stamp();
diff --git a/src/mds/SnapClient.cc b/src/mds/SnapClient.cc
new file mode 100644 (file)
index 0000000..51bc29e
--- /dev/null
@@ -0,0 +1,257 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "MDSMap.h"
+#include "MDSRank.h"
+#include "msg/Messenger.h"
+#include "messages/MMDSTableRequest.h"
+#include "SnapClient.h"
+
+#include "common/config.h"
+#include "include/assert.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mds
+#undef dout_prefix
+#define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".snapclient "
+
+void SnapClient::resend_queries()
+{
+  if (!waiting_for_version.empty() || (!synced && sync_reqid > 0)) {
+    version_t want;
+    if (!waiting_for_version.empty())
+      want = MAX(cached_version, waiting_for_version.rbegin()->first);
+    else
+      want = MAX(cached_version, 1);
+    refresh(want, NULL);
+    if (!synced)
+      sync_reqid = last_reqid;
+  }
+}
+
+void SnapClient::handle_query_result(MMDSTableRequest *m)
+{
+  dout(10) << __func__ << " " << *m << dendl;
+
+  char type;
+  using ceph::decode;
+  bufferlist::iterator p = m->bl.begin();
+  decode(type, p);
+
+  switch (type) {
+  case 'U': // uptodate
+    assert(cached_version == m->get_tid());
+    break;
+  case 'F': // full
+    {
+      set<snapid_t> old_snaps;
+      if (cached_version > 0)
+       get_snaps(old_snaps);
+
+      decode(cached_snaps, p);
+      decode(cached_pending_update, p);
+      decode(cached_pending_destroy, p);
+      cached_version = m->get_tid();
+
+      // increase destroy_seq if any snapshot gets destroyed.
+      if (!old_snaps.empty() && old_snaps != filter(old_snaps))
+       destroy_seq++;
+    }
+    break;
+  default:
+    ceph_abort();
+  };
+
+  if (!committing_tids.empty()) {
+    for (auto p = committing_tids.begin();
+        p != committing_tids.end() && *p < cached_version; ) {
+      if (!cached_pending_update.count(*p) && !cached_pending_destroy.count(*p)) {
+       // pending update/destroy have been committed.
+       committing_tids.erase(p++);
+      } else {
+       ++p;
+      }
+    }
+  }
+
+  if (m->reqid >= sync_reqid)
+    synced = true;
+
+  if (synced && !waiting_for_version.empty()) {
+    std::list<MDSInternalContextBase*> finished;
+    for (auto p = waiting_for_version.begin();
+       p != waiting_for_version.end(); ) {
+      if (p->first > cached_version)
+       break;
+      finished.splice(finished.end(), p->second);
+      waiting_for_version.erase(p++);
+    }
+    if (!finished.empty())
+      mds->queue_waiters(finished);
+  }
+}
+
+void SnapClient::notify_commit(version_t tid)
+{
+  dout(10) << __func__ << " tid " << tid << dendl;
+
+  assert(cached_version == 0 || cached_version >= tid);
+  if (cached_version == 0) {
+    committing_tids.insert(tid);
+  } else if (cached_pending_update.count(tid)) {
+    committing_tids.insert(tid);
+  } else if (cached_pending_destroy.count(tid)) {
+    committing_tids.insert(tid);
+    destroy_seq++;
+  } else if (cached_version > tid) {
+    // no need to record the tid if it has already been committed.
+  } else {
+    ceph_abort();
+  }
+}
+
+void SnapClient::refresh(version_t want, MDSInternalContextBase *onfinish)
+{
+  dout(10) << __func__ << " want " << want << dendl;
+
+  assert(want >= cached_version);
+  if (onfinish)
+    waiting_for_version[want].push_back(onfinish);
+
+  if (!server_ready)
+    return;
+
+  mds_rank_t ts = mds->mdsmap->get_tableserver();
+  MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_QUERY, ++last_reqid, 0);
+  using ceph::encode;
+  char op = 'F';
+  encode(op, req->bl);
+  encode(cached_version, req->bl);
+  mds->send_message_mds(req, ts);
+}
+
+void SnapClient::sync(MDSInternalContextBase *onfinish)
+{
+  dout(10) << __func__ << dendl;
+
+  refresh(MAX(cached_version, 1), onfinish);
+  synced = false;
+  if (server_ready)
+    sync_reqid = last_reqid;
+  else
+    sync_reqid = (last_reqid == ~0ULL) ? 1 : last_reqid + 1;
+}
+
+void SnapClient::get_snaps(set<snapid_t>& result) const
+{
+  assert(cached_version > 0);
+  for (auto& p : cached_snaps)
+    result.insert(p.first);
+
+  for (auto tid : committing_tids) {
+    auto q = cached_pending_update.find(tid);
+    if (q != cached_pending_update.end())
+      result.insert(q->second.snapid);
+
+    auto r = cached_pending_destroy.find(tid);
+    if (r != cached_pending_destroy.end())
+      result.erase(r->second.first);
+  }
+}
+
+set<snapid_t> SnapClient::filter(const set<snapid_t>& snaps) const
+{
+  assert(cached_version > 0);
+  if (snaps.empty())
+    return snaps;
+
+  set<snapid_t> result;
+
+  for (auto p : snaps) {
+    if (cached_snaps.count(p))
+      result.insert(p);
+  }
+
+  for (auto tid : committing_tids) {
+    auto q = cached_pending_update.find(tid);
+    if (q != cached_pending_update.end()) {
+      if (snaps.count(q->second.snapid))
+       result.insert(q->second.snapid);
+    }
+
+    auto r = cached_pending_destroy.find(tid);
+    if (r != cached_pending_destroy.end())
+      result.erase(r->second.first);
+  }
+
+  dout(10) << __func__ << " " << snaps << " -> " << result <<  dendl;
+  return result;
+}
+
+const SnapInfo* SnapClient::get_snap_info(snapid_t snapid) const
+{
+  assert(cached_version > 0);
+
+  const SnapInfo* result = NULL;
+  auto it = cached_snaps.find(snapid);
+  if (it != cached_snaps.end())
+    result = &it->second;
+
+  for (auto tid : committing_tids) {
+    auto q = cached_pending_update.find(tid);
+    if (q != cached_pending_update.end() && q->second.snapid == snapid) {
+      result = &q->second;
+      break;
+    }
+
+    auto r = cached_pending_destroy.find(tid);
+    if (r != cached_pending_destroy.end() && r->second.first == snapid) {
+      result = NULL;
+      break;
+    }
+  }
+
+  dout(10) << __func__ << " snapid " << snapid << " -> " << result <<  dendl;
+  return result;
+}
+
+void SnapClient::get_snap_infos(map<snapid_t, const SnapInfo*>& infomap,
+                               const set<snapid_t>& snaps) const
+{
+  assert(cached_version > 0);
+
+  if (snaps.empty())
+    return;
+
+  map<snapid_t, const SnapInfo*> result;
+  for (auto p : snaps) {
+    auto it = cached_snaps.find(p);
+    if (it != cached_snaps.end())
+      result[p] = &it->second;
+  }
+
+  for (auto tid : committing_tids) {
+    auto q = cached_pending_update.find(tid);
+    if (q != cached_pending_update.end()) {
+      if (snaps.count(q->second.snapid))
+       result[q->second.snapid] = &q->second;
+    }
+
+    auto r = cached_pending_destroy.find(tid);
+    if (r != cached_pending_destroy.end())
+      result.erase(r->second.first);
+  }
+
+  infomap.insert(result.begin(), result.end());
+}
index 4cda3956ab5b87433d799b816deba84e35c6860a..6182493ec4015363df8456f89044645332946a66 100644 (file)
@@ -25,11 +25,27 @@ class MDSRank;
 class LogSegment;
 
 class SnapClient : public MDSTableClient {
+  version_t cached_version;
+  map<snapid_t, SnapInfo> cached_snaps;
+  map<version_t, SnapInfo> cached_pending_update;
+  map<version_t, pair<snapid_t,snapid_t> > cached_pending_destroy;
+
+  set<version_t> committing_tids;
+
+  map<version_t, std::list<MDSInternalContextBase*> > waiting_for_version;
+
+  uint64_t destroy_seq;
+
+  uint64_t sync_reqid;
+  bool synced;
 public:
-  explicit SnapClient(MDSRank *m) : MDSTableClient(m, TABLE_SNAP) {}
+  explicit SnapClient(MDSRank *m) :
+    MDSTableClient(m, TABLE_SNAP),
+    cached_version(0), destroy_seq(1), sync_reqid(0),  synced(false) {}
 
-  void resend_queries() override {}
-  void handle_query_result(MMDSTableRequest *m) override {}
+  void resend_queries() override;
+  void handle_query_result(MMDSTableRequest *m) override;
+  void notify_commit(version_t tid) override;
 
   void prepare_create(inodeno_t dirino, std::string_view name, utime_t stamp,
                      version_t *pstid, bufferlist *pbl, MDSInternalContextBase *onfinish) {
@@ -70,6 +86,23 @@ public:
     encode(stamp, bl);
     _prepare(bl, pstid, NULL, onfinish);
   }
+
+  version_t get_cached_version() const { return cached_version; }
+  uint64_t get_destroy_seq() const { return destroy_seq; }
+  void refresh(version_t want, MDSInternalContextBase *onfinish);
+
+  void sync(MDSInternalContextBase *onfinish);
+
+  bool is_synced() const { return synced; }
+  void wait_for_sync(MDSInternalContextBase *c) {
+    assert(!synced);
+    waiting_for_version[MAX(cached_version, 1)].push_back(c);
+  }
+
+  void get_snaps(set<snapid_t>& snaps) const;
+  set<snapid_t> filter(const set<snapid_t>& snaps) const;
+  const SnapInfo* get_snap_info(snapid_t snapid) const;
+  void get_snap_infos(map<snapid_t, const SnapInfo*>& infomap, const set<snapid_t>& snaps) const;
 };
 
 #endif
index 1f36546f1f58e8f95cb874fbc5d1552b910327d2..79e58d91b62830ce719c3723cad3a90fc67ae2bb 100644 (file)
@@ -58,6 +58,7 @@ void SnapServer::reset_state()
     if (first_free > last_snap)
       last_snap = first_free;
   }
+  version++;
 }
 
 
@@ -247,10 +248,36 @@ void SnapServer::_server_update(bufferlist& bl)
 
 void SnapServer::handle_query(MMDSTableRequest *req)
 {
-  req->put();
-}
+  char op;
+  bufferlist::iterator p = req->bl.begin();
+  decode(op, p);
 
+  MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_QUERY_REPLY, req->reqid, version);
 
+  switch (op) {
+    case 'F': // full
+      version_t have_version;
+      decode(have_version, p);
+      assert(have_version <= version);
+      if (have_version == version) {
+       char type = 'U';
+       encode(type, reply->bl);
+      } else {
+       char type = 'F';
+       encode(type, reply->bl);
+       encode(snaps, reply->bl);
+       encode(pending_update, reply->bl);
+       encode(pending_destroy, reply->bl);
+      }
+      // FIXME: implement incremental change
+      break;
+    default:
+      ceph_abort();
+  };
+
+  mds->send_message(reply, req->get_connection());
+  req->put();
+}
 
 void SnapServer::check_osd_map(bool force)
 {