The idea is caching both snap infos and pending updates in snapclient.
The snapclient also tracks updates that are being committed, it applies
these commits to its cached snap infos. Steps to update snaptable are:
- mds.x acquire locks (xlock on snaplock of affected snaprealm inode)
- mds.x prepares snaptable update. (send preare to snapserver and waits
for 'agree' reply)
- snapserver sends notification about the update to all mds and waits
for ACKs. (not implemented by this patch)
- snapserver send 'agree' reply to mds.x
- mds.x journals corresponding
- mds.x commits the snaptable update and notifies all mds that it
commits that update. then mds drops locks.
When receiving committing notification, mds applies the committing
update to its cached snap infos. By this way, cached snap infos get
synchronized before snaplock become readable.
Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
SimpleLock.cc
SnapRealm.cc
SnapServer.cc
+ SnapClient.cc
snap.cc
SessionMap.cc
MDSContext.cc
mdlog->start_new_segment();
})));
}
+
+ // sync snaptable cache
+ snapclient->sync(new C_MDSInternalNoop);
}
mdsmap->get_num_failed_mds() == 0) { // just me!
dout(2) << "i am alone, moving to state reconnect" << dendl;
request_state(MDSMap::STATE_RECONNECT);
+ // sync snaptable cache
+ snapclient->sync(new C_MDSInternalNoop);
} else {
dout(2) << "i am not alone, moving to state resolve" << dendl;
request_state(MDSMap::STATE_RESOLVE);
mdcache->rollback_uncommitted_fragments();
}
-
void MDSRank::resolve_start()
{
dout(1) << "resolve_start" << dendl;
mdcache->resolve_start(new C_MDS_VoidFn(this, &MDSRank::resolve_done));
finish_contexts(g_ceph_context, waiting_for_resolve);
}
+
void MDSRank::resolve_done()
{
dout(1) << "resolve_done" << dendl;
request_state(MDSMap::STATE_RECONNECT);
+ // sync snaptable cache
+ snapclient->sync(new C_MDSInternalNoop);
}
void MDSRank::reconnect_start()
{
dout(1)<< "creating_done" << dendl;
request_state(MDSMap::STATE_ACTIVE);
+ // sync snaptable cache
+ snapclient->sync(new C_MDSInternalNoop);
}
void MDSRank::boot_create()
void MDSTable::reset()
{
reset_state();
+ projected_version = version;
state = STATE_ACTIVE;
}
pending_commit[tid] = ls;
ls->pending_commit_tids[table].insert(tid);
+ notify_commit(tid);
+
assert(g_conf->mds_kill_mdstable_at != 4);
if (server_ready) {
dout(10) << "got_journaled_agree " << tid << dendl;
ls->pending_commit_tids[table].insert(tid);
pending_commit[tid] = ls;
+
+ notify_commit(tid);
}
void MDSTableClient::got_journaled_ack(version_t tid)
// child must implement
virtual void resend_queries() = 0;
virtual void handle_query_result(MMDSTableRequest *m) = 0;
+ virtual void notify_commit(version_t tid) = 0;
// and friendly front-end for _prepare.
return;
}
+ // opening snaprealm past parents needs to use snaptable
+ if (!mds->snapclient->is_synced()) {
+ dout(10) << " snaptable isn't synced, waiting" << dendl;
+ mds->snapclient->wait_for_sync(new C_MDS_RetryMessage(mds, m));
+ return;
+ }
+
// notify client of success with an OPEN
m->get_connection()->send_message(new MClientSession(CEPH_SESSION_OPEN));
session->last_cap_renew = ceph_clock_now();
{
dout(7) << "reconnect_gather_finish. failed on " << failed_reconnects << " clients" << dendl;
assert(reconnect_done);
- reconnect_done->complete(0);
+
+ if (!mds->snapclient->is_synced()) {
+ // make sure snaptable cache is populated. snaprealms will be
+ // extensively used in rejoin stage.
+ dout(7) << " snaptable cache isn't synced, delaying state transition" << dendl;
+ mds->snapclient->wait_for_sync(reconnect_done);
+ } else {
+ reconnect_done->complete(0);
+ }
reconnect_done = NULL;
}
decode(snapid, p);
dout(10) << " stid " << stid << " snapid " << snapid << dendl;
+ // FIXME: notify all other mds the change
+ if (stid > mds->snapclient->get_cached_version()) {
+ mds->snapclient->refresh(stid, new C_MDS_RetryRequest(mdcache, mdr));
+ return;
+ }
+
// journal
SnapInfo info;
info.ino = diri->ino();
decode(seq, p);
dout(10) << " stid is " << stid << ", seq is " << seq << dendl;
+ // FIXME: notify all other mds the change
+ if (stid > mds->snapclient->get_cached_version()) {
+ mds->snapclient->refresh(stid, new C_MDS_RetryRequest(mdcache, mdr));
+ return;
+ }
+
// journal
auto &pi = diri->project_inode(false, true);
pi.inode.version = diri->pre_dirty();
version_t stid = mdr->more()->stid;
dout(10) << " stid is " << stid << dendl;
+ // FIXME: notify all other mds the change
+ if (stid > mds->snapclient->get_cached_version()) {
+ mds->snapclient->refresh(stid, new C_MDS_RetryRequest(mdcache, mdr));
+ return;
+ }
+
// journal
auto &pi = diri->project_inode(false, true);
pi.inode.ctime = mdr->get_op_stamp();
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "MDSMap.h"
+#include "MDSRank.h"
+#include "msg/Messenger.h"
+#include "messages/MMDSTableRequest.h"
+#include "SnapClient.h"
+
+#include "common/config.h"
+#include "include/assert.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mds
+#undef dout_prefix
+#define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".snapclient "
+
+void SnapClient::resend_queries()
+{
+ if (!waiting_for_version.empty() || (!synced && sync_reqid > 0)) {
+ version_t want;
+ if (!waiting_for_version.empty())
+ want = MAX(cached_version, waiting_for_version.rbegin()->first);
+ else
+ want = MAX(cached_version, 1);
+ refresh(want, NULL);
+ if (!synced)
+ sync_reqid = last_reqid;
+ }
+}
+
+void SnapClient::handle_query_result(MMDSTableRequest *m)
+{
+ dout(10) << __func__ << " " << *m << dendl;
+
+ char type;
+ using ceph::decode;
+ bufferlist::iterator p = m->bl.begin();
+ decode(type, p);
+
+ switch (type) {
+ case 'U': // uptodate
+ assert(cached_version == m->get_tid());
+ break;
+ case 'F': // full
+ {
+ set<snapid_t> old_snaps;
+ if (cached_version > 0)
+ get_snaps(old_snaps);
+
+ decode(cached_snaps, p);
+ decode(cached_pending_update, p);
+ decode(cached_pending_destroy, p);
+ cached_version = m->get_tid();
+
+ // increase destroy_seq if any snapshot gets destroyed.
+ if (!old_snaps.empty() && old_snaps != filter(old_snaps))
+ destroy_seq++;
+ }
+ break;
+ default:
+ ceph_abort();
+ };
+
+ if (!committing_tids.empty()) {
+ for (auto p = committing_tids.begin();
+ p != committing_tids.end() && *p < cached_version; ) {
+ if (!cached_pending_update.count(*p) && !cached_pending_destroy.count(*p)) {
+ // pending update/destroy have been committed.
+ committing_tids.erase(p++);
+ } else {
+ ++p;
+ }
+ }
+ }
+
+ if (m->reqid >= sync_reqid)
+ synced = true;
+
+ if (synced && !waiting_for_version.empty()) {
+ std::list<MDSInternalContextBase*> finished;
+ for (auto p = waiting_for_version.begin();
+ p != waiting_for_version.end(); ) {
+ if (p->first > cached_version)
+ break;
+ finished.splice(finished.end(), p->second);
+ waiting_for_version.erase(p++);
+ }
+ if (!finished.empty())
+ mds->queue_waiters(finished);
+ }
+}
+
+void SnapClient::notify_commit(version_t tid)
+{
+ dout(10) << __func__ << " tid " << tid << dendl;
+
+ assert(cached_version == 0 || cached_version >= tid);
+ if (cached_version == 0) {
+ committing_tids.insert(tid);
+ } else if (cached_pending_update.count(tid)) {
+ committing_tids.insert(tid);
+ } else if (cached_pending_destroy.count(tid)) {
+ committing_tids.insert(tid);
+ destroy_seq++;
+ } else if (cached_version > tid) {
+ // no need to record the tid if it has already been committed.
+ } else {
+ ceph_abort();
+ }
+}
+
+void SnapClient::refresh(version_t want, MDSInternalContextBase *onfinish)
+{
+ dout(10) << __func__ << " want " << want << dendl;
+
+ assert(want >= cached_version);
+ if (onfinish)
+ waiting_for_version[want].push_back(onfinish);
+
+ if (!server_ready)
+ return;
+
+ mds_rank_t ts = mds->mdsmap->get_tableserver();
+ MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_QUERY, ++last_reqid, 0);
+ using ceph::encode;
+ char op = 'F';
+ encode(op, req->bl);
+ encode(cached_version, req->bl);
+ mds->send_message_mds(req, ts);
+}
+
+void SnapClient::sync(MDSInternalContextBase *onfinish)
+{
+ dout(10) << __func__ << dendl;
+
+ refresh(MAX(cached_version, 1), onfinish);
+ synced = false;
+ if (server_ready)
+ sync_reqid = last_reqid;
+ else
+ sync_reqid = (last_reqid == ~0ULL) ? 1 : last_reqid + 1;
+}
+
+void SnapClient::get_snaps(set<snapid_t>& result) const
+{
+ assert(cached_version > 0);
+ for (auto& p : cached_snaps)
+ result.insert(p.first);
+
+ for (auto tid : committing_tids) {
+ auto q = cached_pending_update.find(tid);
+ if (q != cached_pending_update.end())
+ result.insert(q->second.snapid);
+
+ auto r = cached_pending_destroy.find(tid);
+ if (r != cached_pending_destroy.end())
+ result.erase(r->second.first);
+ }
+}
+
+set<snapid_t> SnapClient::filter(const set<snapid_t>& snaps) const
+{
+ assert(cached_version > 0);
+ if (snaps.empty())
+ return snaps;
+
+ set<snapid_t> result;
+
+ for (auto p : snaps) {
+ if (cached_snaps.count(p))
+ result.insert(p);
+ }
+
+ for (auto tid : committing_tids) {
+ auto q = cached_pending_update.find(tid);
+ if (q != cached_pending_update.end()) {
+ if (snaps.count(q->second.snapid))
+ result.insert(q->second.snapid);
+ }
+
+ auto r = cached_pending_destroy.find(tid);
+ if (r != cached_pending_destroy.end())
+ result.erase(r->second.first);
+ }
+
+ dout(10) << __func__ << " " << snaps << " -> " << result << dendl;
+ return result;
+}
+
+const SnapInfo* SnapClient::get_snap_info(snapid_t snapid) const
+{
+ assert(cached_version > 0);
+
+ const SnapInfo* result = NULL;
+ auto it = cached_snaps.find(snapid);
+ if (it != cached_snaps.end())
+ result = &it->second;
+
+ for (auto tid : committing_tids) {
+ auto q = cached_pending_update.find(tid);
+ if (q != cached_pending_update.end() && q->second.snapid == snapid) {
+ result = &q->second;
+ break;
+ }
+
+ auto r = cached_pending_destroy.find(tid);
+ if (r != cached_pending_destroy.end() && r->second.first == snapid) {
+ result = NULL;
+ break;
+ }
+ }
+
+ dout(10) << __func__ << " snapid " << snapid << " -> " << result << dendl;
+ return result;
+}
+
+void SnapClient::get_snap_infos(map<snapid_t, const SnapInfo*>& infomap,
+ const set<snapid_t>& snaps) const
+{
+ assert(cached_version > 0);
+
+ if (snaps.empty())
+ return;
+
+ map<snapid_t, const SnapInfo*> result;
+ for (auto p : snaps) {
+ auto it = cached_snaps.find(p);
+ if (it != cached_snaps.end())
+ result[p] = &it->second;
+ }
+
+ for (auto tid : committing_tids) {
+ auto q = cached_pending_update.find(tid);
+ if (q != cached_pending_update.end()) {
+ if (snaps.count(q->second.snapid))
+ result[q->second.snapid] = &q->second;
+ }
+
+ auto r = cached_pending_destroy.find(tid);
+ if (r != cached_pending_destroy.end())
+ result.erase(r->second.first);
+ }
+
+ infomap.insert(result.begin(), result.end());
+}
class LogSegment;
class SnapClient : public MDSTableClient {
+ version_t cached_version;
+ map<snapid_t, SnapInfo> cached_snaps;
+ map<version_t, SnapInfo> cached_pending_update;
+ map<version_t, pair<snapid_t,snapid_t> > cached_pending_destroy;
+
+ set<version_t> committing_tids;
+
+ map<version_t, std::list<MDSInternalContextBase*> > waiting_for_version;
+
+ uint64_t destroy_seq;
+
+ uint64_t sync_reqid;
+ bool synced;
public:
- explicit SnapClient(MDSRank *m) : MDSTableClient(m, TABLE_SNAP) {}
+ explicit SnapClient(MDSRank *m) :
+ MDSTableClient(m, TABLE_SNAP),
+ cached_version(0), destroy_seq(1), sync_reqid(0), synced(false) {}
- void resend_queries() override {}
- void handle_query_result(MMDSTableRequest *m) override {}
+ void resend_queries() override;
+ void handle_query_result(MMDSTableRequest *m) override;
+ void notify_commit(version_t tid) override;
void prepare_create(inodeno_t dirino, std::string_view name, utime_t stamp,
version_t *pstid, bufferlist *pbl, MDSInternalContextBase *onfinish) {
encode(stamp, bl);
_prepare(bl, pstid, NULL, onfinish);
}
+
+ version_t get_cached_version() const { return cached_version; }
+ uint64_t get_destroy_seq() const { return destroy_seq; }
+ void refresh(version_t want, MDSInternalContextBase *onfinish);
+
+ void sync(MDSInternalContextBase *onfinish);
+
+ bool is_synced() const { return synced; }
+ void wait_for_sync(MDSInternalContextBase *c) {
+ assert(!synced);
+ waiting_for_version[MAX(cached_version, 1)].push_back(c);
+ }
+
+ void get_snaps(set<snapid_t>& snaps) const;
+ set<snapid_t> filter(const set<snapid_t>& snaps) const;
+ const SnapInfo* get_snap_info(snapid_t snapid) const;
+ void get_snap_infos(map<snapid_t, const SnapInfo*>& infomap, const set<snapid_t>& snaps) const;
};
#endif
if (first_free > last_snap)
last_snap = first_free;
}
+ version++;
}
void SnapServer::handle_query(MMDSTableRequest *req)
{
- req->put();
-}
+ char op;
+ bufferlist::iterator p = req->bl.begin();
+ decode(op, p);
+ MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_QUERY_REPLY, req->reqid, version);
+ switch (op) {
+ case 'F': // full
+ version_t have_version;
+ decode(have_version, p);
+ assert(have_version <= version);
+ if (have_version == version) {
+ char type = 'U';
+ encode(type, reply->bl);
+ } else {
+ char type = 'F';
+ encode(type, reply->bl);
+ encode(snaps, reply->bl);
+ encode(pending_update, reply->bl);
+ encode(pending_destroy, reply->bl);
+ }
+ // FIXME: implement incremental change
+ break;
+ default:
+ ceph_abort();
+ };
+
+ mds->send_message(reply, req->get_connection());
+ req->put();
+}
void SnapServer::check_osd_map(bool force)
{