From 62a66f85fb14e9c642776eeda1fd3d20aa02abea Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Thu, 20 Dec 2018 21:09:31 +0800 Subject: [PATCH] client: split large reconnect into multiple messages Signed-off-by: "Yan, Zheng" --- src/client/Client.cc | 26 ++++++++++++++++--- src/mds/Server.cc | 34 +++++++++++++++---------- src/mds/cephfs_features.h | 2 ++ src/messages/MClientReconnect.h | 45 ++++++++++++++++++++++++++++++--- 4 files changed, 85 insertions(+), 22 deletions(-) diff --git a/src/client/Client.cc b/src/client/Client.cc index 713af0fa7f184..9b2c298025c66 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -2781,7 +2781,10 @@ void Client::send_reconnect(MetaSession *session) //make sure unsafe requests get saved resend_unsafe_requests(session); + early_kick_flushing_caps(session); + auto m = MClientReconnect::create(); + bool allow_multi = session->mds_features.test(CEPHFS_FEATURE_MULTI_RECONNECT); // i have an open session. ceph::unordered_set did_snaprealm; @@ -2791,6 +2794,14 @@ void Client::send_reconnect(MetaSession *session) Inode *in = p->second; auto it = in->caps.find(mds); if (it != in->caps.end()) { + if (allow_multi && + m->get_approx_size() >= (std::numeric_limits::max() >> 1)) { + m->mark_more(); + session->con->send_message2(std::move(m)); + + m = MClientReconnect::create(); + } + Cap &cap = it->second; ldout(cct, 10) << " caps on " << p->first << " " << ccap_string(cap.issued) @@ -2834,9 +2845,8 @@ void Client::send_reconnect(MetaSession *session) } } - early_kick_flushing_caps(session); - - m->set_encoding_version(0); // use connection features to choose encoding + if (!allow_multi) + m->set_encoding_version(0); // use connection features to choose encoding session->con->send_message2(std::move(m)); mount_cond.Signal(); @@ -4463,7 +4473,8 @@ void Client::early_kick_flushing_caps(MetaSession *session) for (xlist::iterator p = session->flushing_caps.begin(); !p.end(); ++p) { Inode *in = *p; - ceph_assert(in->auth_cap); + Cap *cap = in->auth_cap; + ceph_assert(cap); // if flushing caps were revoked, we re-send the cap flush in client reconnect // stage. This guarantees that MDS processes the cap flush message before issuing @@ -4476,6 +4487,13 @@ void Client::early_kick_flushing_caps(MetaSession *session) session->early_flushing_caps.insert(in); + // send_reconnect() also will reset these sequence numbers. make sure + // sequence numbers in cap flush message match later reconnect message. + cap->seq = 0; + cap->issue_seq = 0; + cap->mseq = 0; + cap->issued = cap->implemented; + if (in->cap_snaps.size()) flush_snaps(in, true); if (in->flushing_caps) diff --git a/src/mds/Server.cc b/src/mds/Server.cc index dad962be33e19..186fbcf873b60 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -1198,7 +1198,8 @@ void Server::reconnect_clients(MDSInternalContext *reconnect_done_) void Server::handle_client_reconnect(const MClientReconnect::const_ref &m) { - dout(7) << "handle_client_reconnect " << m->get_source() << dendl; + dout(7) << "handle_client_reconnect " << m->get_source() + << (m->has_more() ? " (more)" : "") << dendl; client_t from = m->get_source().num(); Session *session = mds->get_session(m); ceph_assert(session); @@ -1249,21 +1250,23 @@ void Server::handle_client_reconnect(const MClientReconnect::const_ref &m) } if (deny) { - auto m = MClientSession::create(CEPH_SESSION_CLOSE); - mds->send_message_client(m, session); + auto r = MClientSession::create(CEPH_SESSION_CLOSE); + mds->send_message_client(r, session); if (session->is_open()) kill_session(session, nullptr); return; } - // notify client of success with an OPEN - auto reply = MClientSession::create(CEPH_SESSION_OPEN); - if (session->info.has_feature(CEPHFS_FEATURE_MIMIC)) - reply->supported_features = supported_features; - mds->send_message_client(reply, session); + if (!m->has_more()) { + // notify client of success with an OPEN + auto reply = MClientSession::create(CEPH_SESSION_OPEN); + if (session->info.has_feature(CEPHFS_FEATURE_MIMIC)) + reply->supported_features = supported_features; + mds->send_message_client(reply, session); + mds->clog->debug() << "reconnect by " << session->info.inst << " after " << delay; + } session->last_cap_renew = clock::now(); - mds->clog->debug() << "reconnect by " << session->info.inst << " after " << delay; // snaprealms for (const auto &r : m->realms) { @@ -1316,14 +1319,17 @@ void Server::handle_client_reconnect(const MClientReconnect::const_ref &m) mdcache->rejoin_recovered_caps(p.first, from, p.second, MDS_RANK_NONE); } } - mdcache->rejoin_recovered_client(session->get_client(), session->info.inst); reconnect_last_seen = clock::now(); - // remove from gather set - client_reconnect_gather.erase(from); - if (client_reconnect_gather.empty()) - reconnect_gather_finish(); + if (!m->has_more()) { + mdcache->rejoin_recovered_client(session->get_client(), session->info.inst); + + // remove from gather set + client_reconnect_gather.erase(from); + if (client_reconnect_gather.empty()) + reconnect_gather_finish(); + } } void Server::infer_supported_features(Session *session, client_metadata_t& client_metadata) diff --git a/src/mds/cephfs_features.h b/src/mds/cephfs_features.h index 06922cd65cfa3..5a0a8599c4c2d 100644 --- a/src/mds/cephfs_features.h +++ b/src/mds/cephfs_features.h @@ -26,6 +26,7 @@ #define CEPHFS_FEATURE_REPLY_ENCODING 9 #define CEPHFS_FEATURE_RECLAIM_CLIENT 10 #define CEPHFS_FEATURE_LAZY_CAP_WANTED 11 +#define CEPHFS_FEATURE_MULTI_RECONNECT 12 #define CEPHFS_FEATURES_ALL { \ 0, 1, 2, 3, 4, \ @@ -36,6 +37,7 @@ CEPHFS_FEATURE_REPLY_ENCODING, \ CEPHFS_FEATURE_RECLAIM_CLIENT, \ CEPHFS_FEATURE_LAZY_CAP_WANTED, \ + CEPHFS_FEATURE_MULTI_RECONNECT, \ } #define CEPHFS_FEATURES_MDS_SUPPORTED CEPHFS_FEATURES_ALL diff --git a/src/messages/MClientReconnect.h b/src/messages/MClientReconnect.h index 0e100c1e9b682..ba3b0fb2cdac3 100644 --- a/src/messages/MClientReconnect.h +++ b/src/messages/MClientReconnect.h @@ -24,23 +24,46 @@ class MClientReconnect : public MessageInstance { public: friend factory; private: - static constexpr int HEAD_VERSION = 4; + static constexpr int HEAD_VERSION = 5; static constexpr int COMPAT_VERSION = 4; public: - map caps; // only head inodes + map caps; // only head inodes vector realms; + bool more = false; MClientReconnect() : - MessageInstance(CEPH_MSG_CLIENT_RECONNECT, HEAD_VERSION, COMPAT_VERSION) { } + MessageInstance(CEPH_MSG_CLIENT_RECONNECT, HEAD_VERSION, COMPAT_VERSION) {} private: ~MClientReconnect() override {} + size_t cap_size = 0; + size_t realm_size = 0; + size_t approx_size = sizeof(__u32) + sizeof(__u32) + 1; + + void calc_item_size() { + using ceph::encode; + { + bufferlist bl; + inodeno_t ino; + cap_reconnect_t cr; + encode(ino, bl); + encode(cr, bl); + cap_size = bl.length(); + } + { + bufferlist bl; + snaprealm_reconnect_t sr; + encode(sr, bl); + realm_size = bl.length(); + } + } + public: std::string_view get_type_name() const override { return "client_reconnect"; } void print(ostream& out) const override { out << "client_reconnect(" - << caps.size() << " caps)"; + << caps.size() << " caps " << realms.size() << " realms )"; } // Force to use old encoding. @@ -50,11 +73,19 @@ public: if (v <= 3) header.compat_version = 0; } + size_t get_approx_size() { + return approx_size; + } + void mark_more() { more = true; } + bool has_more() const { return more; } void add_cap(inodeno_t ino, uint64_t cap_id, inodeno_t pathbase, const string& path, int wanted, int issued, inodeno_t sr, snapid_t sf, bufferlist& lb) { caps[ino] = cap_reconnect_t(cap_id, pathbase, path, wanted, issued, sr, sf, lb); + if (!cap_size) + calc_item_size(); + approx_size += cap_size + path.length() + lb.length(); } void add_snaprealm(inodeno_t ino, snapid_t seq, inodeno_t parent) { snaprealm_reconnect_t r; @@ -62,6 +93,9 @@ public: r.realm.seq = seq; r.realm.parent = parent; realms.push_back(r); + if (!realm_size) + calc_item_size(); + approx_size += realm_size; } void encode_payload(uint64_t features) override { @@ -80,6 +114,7 @@ public: if (header.version >= 4) { encode(caps, data); encode(realms, data); + encode(more, data); } else { // compat crap if (header.version == 3) { @@ -107,6 +142,8 @@ public: if (header.version >= 4) { decode(caps, p); decode(realms, p); + if (header.version >= 5) + decode(more, p); } else { // compat crap if (header.version == 3) { -- 2.39.5