From: Yan, Zheng Date: Mon, 17 Jul 2017 08:26:14 +0000 (+0800) Subject: mds: cache snaptable in snapclient X-Git-Tag: v13.1.0~413^2~38 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ca1126fdb0de88be82b295985d0e61bf9127852e;p=ceph.git mds: cache snaptable in snapclient The idea is caching both snap infos and pending updates in snapclient. The snapclient also tracks updates that are being committed, it applies these commits to its cached snap infos. Steps to update snaptable are: - mds.x acquire locks (xlock on snaplock of affected snaprealm inode) - mds.x prepares snaptable update. (send preare to snapserver and waits for 'agree' reply) - snapserver sends notification about the update to all mds and waits for ACKs. (not implemented by this patch) - snapserver send 'agree' reply to mds.x - mds.x journals corresponding - mds.x commits the snaptable update and notifies all mds that it commits that update. then mds drops locks. When receiving committing notification, mds applies the committing update to its cached snap infos. By this way, cached snap infos get synchronized before snaplock become readable. Signed-off-by: "Yan, Zheng" --- diff --git a/src/mds/CMakeLists.txt b/src/mds/CMakeLists.txt index f9e71ae9675..6aea92c8221 100644 --- a/src/mds/CMakeLists.txt +++ b/src/mds/CMakeLists.txt @@ -29,6 +29,7 @@ set(mds_srcs SimpleLock.cc SnapRealm.cc SnapServer.cc + SnapClient.cc snap.cc SessionMap.cc MDSContext.cc diff --git a/src/mds/MDSRank.cc b/src/mds/MDSRank.cc index f4766911182..e63061b2e82 100644 --- a/src/mds/MDSRank.cc +++ b/src/mds/MDSRank.cc @@ -1181,6 +1181,9 @@ void MDSRank::starting_done() mdlog->start_new_segment(); }))); } + + // sync snaptable cache + snapclient->sync(new C_MDSInternalNoop); } @@ -1338,6 +1341,8 @@ void MDSRank::replay_done() mdsmap->get_num_failed_mds() == 0) { // just me! dout(2) << "i am alone, moving to state reconnect" << dendl; request_state(MDSMap::STATE_RECONNECT); + // sync snaptable cache + snapclient->sync(new C_MDSInternalNoop); } else { dout(2) << "i am not alone, moving to state resolve" << dendl; request_state(MDSMap::STATE_RESOLVE); @@ -1350,7 +1355,6 @@ void MDSRank::reopen_log() mdcache->rollback_uncommitted_fragments(); } - void MDSRank::resolve_start() { dout(1) << "resolve_start" << dendl; @@ -1360,10 +1364,13 @@ void MDSRank::resolve_start() mdcache->resolve_start(new C_MDS_VoidFn(this, &MDSRank::resolve_done)); finish_contexts(g_ceph_context, waiting_for_resolve); } + void MDSRank::resolve_done() { dout(1) << "resolve_done" << dendl; request_state(MDSMap::STATE_RECONNECT); + // sync snaptable cache + snapclient->sync(new C_MDSInternalNoop); } void MDSRank::reconnect_start() @@ -1501,6 +1508,8 @@ void MDSRank::creating_done() { dout(1)<< "creating_done" << dendl; request_state(MDSMap::STATE_ACTIVE); + // sync snaptable cache + snapclient->sync(new C_MDSInternalNoop); } void MDSRank::boot_create() diff --git a/src/mds/MDSTable.cc b/src/mds/MDSTable.cc index bf1e04ef371..be12dbfc0d6 100644 --- a/src/mds/MDSTable.cc +++ b/src/mds/MDSTable.cc @@ -114,6 +114,7 @@ void MDSTable::save_2(int r, version_t v) void MDSTable::reset() { reset_state(); + projected_version = version; state = STATE_ACTIVE; } diff --git a/src/mds/MDSTableClient.cc b/src/mds/MDSTableClient.cc index 7d0353e61d2..7d5c03a36d0 100644 --- a/src/mds/MDSTableClient.cc +++ b/src/mds/MDSTableClient.cc @@ -187,6 +187,8 @@ void MDSTableClient::commit(version_t tid, LogSegment *ls) pending_commit[tid] = ls; ls->pending_commit_tids[table].insert(tid); + notify_commit(tid); + assert(g_conf->mds_kill_mdstable_at != 4); if (server_ready) { @@ -206,6 +208,8 @@ void MDSTableClient::got_journaled_agree(version_t tid, LogSegment *ls) dout(10) << "got_journaled_agree " << tid << dendl; ls->pending_commit_tids[table].insert(tid); pending_commit[tid] = ls; + + notify_commit(tid); } void MDSTableClient::got_journaled_ack(version_t tid) diff --git a/src/mds/MDSTableClient.h b/src/mds/MDSTableClient.h index ce15c54d874..0bd4f78018a 100644 --- a/src/mds/MDSTableClient.h +++ b/src/mds/MDSTableClient.h @@ -85,6 +85,7 @@ public: // child must implement virtual void resend_queries() = 0; virtual void handle_query_result(MMDSTableRequest *m) = 0; + virtual void notify_commit(version_t tid) = 0; // and friendly front-end for _prepare. diff --git a/src/mds/Server.cc b/src/mds/Server.cc index 70f7bd0ad30..1a94ca4e578 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -919,6 +919,13 @@ void Server::handle_client_reconnect(MClientReconnect *m) return; } + // opening snaprealm past parents needs to use snaptable + if (!mds->snapclient->is_synced()) { + dout(10) << " snaptable isn't synced, waiting" << dendl; + mds->snapclient->wait_for_sync(new C_MDS_RetryMessage(mds, m)); + return; + } + // notify client of success with an OPEN m->get_connection()->send_message(new MClientSession(CEPH_SESSION_OPEN)); session->last_cap_renew = ceph_clock_now(); @@ -997,7 +1004,15 @@ void Server::reconnect_gather_finish() { dout(7) << "reconnect_gather_finish. failed on " << failed_reconnects << " clients" << dendl; assert(reconnect_done); - reconnect_done->complete(0); + + if (!mds->snapclient->is_synced()) { + // make sure snaptable cache is populated. snaprealms will be + // extensively used in rejoin stage. + dout(7) << " snaptable cache isn't synced, delaying state transition" << dendl; + mds->snapclient->wait_for_sync(reconnect_done); + } else { + reconnect_done->complete(0); + } reconnect_done = NULL; } @@ -8669,6 +8684,12 @@ void Server::handle_client_mksnap(MDRequestRef& mdr) decode(snapid, p); dout(10) << " stid " << stid << " snapid " << snapid << dendl; + // FIXME: notify all other mds the change + if (stid > mds->snapclient->get_cached_version()) { + mds->snapclient->refresh(stid, new C_MDS_RetryRequest(mdcache, mdr)); + return; + } + // journal SnapInfo info; info.ino = diri->ino(); @@ -8806,6 +8827,12 @@ void Server::handle_client_rmsnap(MDRequestRef& mdr) decode(seq, p); dout(10) << " stid is " << stid << ", seq is " << seq << dendl; + // FIXME: notify all other mds the change + if (stid > mds->snapclient->get_cached_version()) { + mds->snapclient->refresh(stid, new C_MDS_RetryRequest(mdcache, mdr)); + return; + } + // journal auto &pi = diri->project_inode(false, true); pi.inode.version = diri->pre_dirty(); @@ -8946,6 +8973,12 @@ void Server::handle_client_renamesnap(MDRequestRef& mdr) version_t stid = mdr->more()->stid; dout(10) << " stid is " << stid << dendl; + // FIXME: notify all other mds the change + if (stid > mds->snapclient->get_cached_version()) { + mds->snapclient->refresh(stid, new C_MDS_RetryRequest(mdcache, mdr)); + return; + } + // journal auto &pi = diri->project_inode(false, true); pi.inode.ctime = mdr->get_op_stamp(); diff --git a/src/mds/SnapClient.cc b/src/mds/SnapClient.cc new file mode 100644 index 00000000000..51bc29e886c --- /dev/null +++ b/src/mds/SnapClient.cc @@ -0,0 +1,257 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "MDSMap.h" +#include "MDSRank.h" +#include "msg/Messenger.h" +#include "messages/MMDSTableRequest.h" +#include "SnapClient.h" + +#include "common/config.h" +#include "include/assert.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mds +#undef dout_prefix +#define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".snapclient " + +void SnapClient::resend_queries() +{ + if (!waiting_for_version.empty() || (!synced && sync_reqid > 0)) { + version_t want; + if (!waiting_for_version.empty()) + want = MAX(cached_version, waiting_for_version.rbegin()->first); + else + want = MAX(cached_version, 1); + refresh(want, NULL); + if (!synced) + sync_reqid = last_reqid; + } +} + +void SnapClient::handle_query_result(MMDSTableRequest *m) +{ + dout(10) << __func__ << " " << *m << dendl; + + char type; + using ceph::decode; + bufferlist::iterator p = m->bl.begin(); + decode(type, p); + + switch (type) { + case 'U': // uptodate + assert(cached_version == m->get_tid()); + break; + case 'F': // full + { + set old_snaps; + if (cached_version > 0) + get_snaps(old_snaps); + + decode(cached_snaps, p); + decode(cached_pending_update, p); + decode(cached_pending_destroy, p); + cached_version = m->get_tid(); + + // increase destroy_seq if any snapshot gets destroyed. + if (!old_snaps.empty() && old_snaps != filter(old_snaps)) + destroy_seq++; + } + break; + default: + ceph_abort(); + }; + + if (!committing_tids.empty()) { + for (auto p = committing_tids.begin(); + p != committing_tids.end() && *p < cached_version; ) { + if (!cached_pending_update.count(*p) && !cached_pending_destroy.count(*p)) { + // pending update/destroy have been committed. + committing_tids.erase(p++); + } else { + ++p; + } + } + } + + if (m->reqid >= sync_reqid) + synced = true; + + if (synced && !waiting_for_version.empty()) { + std::list finished; + for (auto p = waiting_for_version.begin(); + p != waiting_for_version.end(); ) { + if (p->first > cached_version) + break; + finished.splice(finished.end(), p->second); + waiting_for_version.erase(p++); + } + if (!finished.empty()) + mds->queue_waiters(finished); + } +} + +void SnapClient::notify_commit(version_t tid) +{ + dout(10) << __func__ << " tid " << tid << dendl; + + assert(cached_version == 0 || cached_version >= tid); + if (cached_version == 0) { + committing_tids.insert(tid); + } else if (cached_pending_update.count(tid)) { + committing_tids.insert(tid); + } else if (cached_pending_destroy.count(tid)) { + committing_tids.insert(tid); + destroy_seq++; + } else if (cached_version > tid) { + // no need to record the tid if it has already been committed. + } else { + ceph_abort(); + } +} + +void SnapClient::refresh(version_t want, MDSInternalContextBase *onfinish) +{ + dout(10) << __func__ << " want " << want << dendl; + + assert(want >= cached_version); + if (onfinish) + waiting_for_version[want].push_back(onfinish); + + if (!server_ready) + return; + + mds_rank_t ts = mds->mdsmap->get_tableserver(); + MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_QUERY, ++last_reqid, 0); + using ceph::encode; + char op = 'F'; + encode(op, req->bl); + encode(cached_version, req->bl); + mds->send_message_mds(req, ts); +} + +void SnapClient::sync(MDSInternalContextBase *onfinish) +{ + dout(10) << __func__ << dendl; + + refresh(MAX(cached_version, 1), onfinish); + synced = false; + if (server_ready) + sync_reqid = last_reqid; + else + sync_reqid = (last_reqid == ~0ULL) ? 1 : last_reqid + 1; +} + +void SnapClient::get_snaps(set& result) const +{ + assert(cached_version > 0); + for (auto& p : cached_snaps) + result.insert(p.first); + + for (auto tid : committing_tids) { + auto q = cached_pending_update.find(tid); + if (q != cached_pending_update.end()) + result.insert(q->second.snapid); + + auto r = cached_pending_destroy.find(tid); + if (r != cached_pending_destroy.end()) + result.erase(r->second.first); + } +} + +set SnapClient::filter(const set& snaps) const +{ + assert(cached_version > 0); + if (snaps.empty()) + return snaps; + + set result; + + for (auto p : snaps) { + if (cached_snaps.count(p)) + result.insert(p); + } + + for (auto tid : committing_tids) { + auto q = cached_pending_update.find(tid); + if (q != cached_pending_update.end()) { + if (snaps.count(q->second.snapid)) + result.insert(q->second.snapid); + } + + auto r = cached_pending_destroy.find(tid); + if (r != cached_pending_destroy.end()) + result.erase(r->second.first); + } + + dout(10) << __func__ << " " << snaps << " -> " << result << dendl; + return result; +} + +const SnapInfo* SnapClient::get_snap_info(snapid_t snapid) const +{ + assert(cached_version > 0); + + const SnapInfo* result = NULL; + auto it = cached_snaps.find(snapid); + if (it != cached_snaps.end()) + result = &it->second; + + for (auto tid : committing_tids) { + auto q = cached_pending_update.find(tid); + if (q != cached_pending_update.end() && q->second.snapid == snapid) { + result = &q->second; + break; + } + + auto r = cached_pending_destroy.find(tid); + if (r != cached_pending_destroy.end() && r->second.first == snapid) { + result = NULL; + break; + } + } + + dout(10) << __func__ << " snapid " << snapid << " -> " << result << dendl; + return result; +} + +void SnapClient::get_snap_infos(map& infomap, + const set& snaps) const +{ + assert(cached_version > 0); + + if (snaps.empty()) + return; + + map result; + for (auto p : snaps) { + auto it = cached_snaps.find(p); + if (it != cached_snaps.end()) + result[p] = &it->second; + } + + for (auto tid : committing_tids) { + auto q = cached_pending_update.find(tid); + if (q != cached_pending_update.end()) { + if (snaps.count(q->second.snapid)) + result[q->second.snapid] = &q->second; + } + + auto r = cached_pending_destroy.find(tid); + if (r != cached_pending_destroy.end()) + result.erase(r->second.first); + } + + infomap.insert(result.begin(), result.end()); +} diff --git a/src/mds/SnapClient.h b/src/mds/SnapClient.h index 4cda3956ab5..6182493ec40 100644 --- a/src/mds/SnapClient.h +++ b/src/mds/SnapClient.h @@ -25,11 +25,27 @@ class MDSRank; class LogSegment; class SnapClient : public MDSTableClient { + version_t cached_version; + map cached_snaps; + map cached_pending_update; + map > cached_pending_destroy; + + set committing_tids; + + map > waiting_for_version; + + uint64_t destroy_seq; + + uint64_t sync_reqid; + bool synced; public: - explicit SnapClient(MDSRank *m) : MDSTableClient(m, TABLE_SNAP) {} + explicit SnapClient(MDSRank *m) : + MDSTableClient(m, TABLE_SNAP), + cached_version(0), destroy_seq(1), sync_reqid(0), synced(false) {} - void resend_queries() override {} - void handle_query_result(MMDSTableRequest *m) override {} + void resend_queries() override; + void handle_query_result(MMDSTableRequest *m) override; + void notify_commit(version_t tid) override; void prepare_create(inodeno_t dirino, std::string_view name, utime_t stamp, version_t *pstid, bufferlist *pbl, MDSInternalContextBase *onfinish) { @@ -70,6 +86,23 @@ public: encode(stamp, bl); _prepare(bl, pstid, NULL, onfinish); } + + version_t get_cached_version() const { return cached_version; } + uint64_t get_destroy_seq() const { return destroy_seq; } + void refresh(version_t want, MDSInternalContextBase *onfinish); + + void sync(MDSInternalContextBase *onfinish); + + bool is_synced() const { return synced; } + void wait_for_sync(MDSInternalContextBase *c) { + assert(!synced); + waiting_for_version[MAX(cached_version, 1)].push_back(c); + } + + void get_snaps(set& snaps) const; + set filter(const set& snaps) const; + const SnapInfo* get_snap_info(snapid_t snapid) const; + void get_snap_infos(map& infomap, const set& snaps) const; }; #endif diff --git a/src/mds/SnapServer.cc b/src/mds/SnapServer.cc index 1f36546f1f5..79e58d91b62 100644 --- a/src/mds/SnapServer.cc +++ b/src/mds/SnapServer.cc @@ -58,6 +58,7 @@ void SnapServer::reset_state() if (first_free > last_snap) last_snap = first_free; } + version++; } @@ -247,10 +248,36 @@ void SnapServer::_server_update(bufferlist& bl) void SnapServer::handle_query(MMDSTableRequest *req) { - req->put(); -} + char op; + bufferlist::iterator p = req->bl.begin(); + decode(op, p); + MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_QUERY_REPLY, req->reqid, version); + switch (op) { + case 'F': // full + version_t have_version; + decode(have_version, p); + assert(have_version <= version); + if (have_version == version) { + char type = 'U'; + encode(type, reply->bl); + } else { + char type = 'F'; + encode(type, reply->bl); + encode(snaps, reply->bl); + encode(pending_update, reply->bl); + encode(pending_destroy, reply->bl); + } + // FIXME: implement incremental change + break; + default: + ceph_abort(); + }; + + mds->send_message(reply, req->get_connection()); + req->put(); +} void SnapServer::check_osd_map(bool force) {