]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
client: split large reconnect into multiple messages
authorYan, Zheng <zyan@redhat.com>
Thu, 20 Dec 2018 13:09:31 +0000 (21:09 +0800)
committerYan, Zheng <zyan@redhat.com>
Thu, 14 Feb 2019 05:59:26 +0000 (13:59 +0800)
Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
src/client/Client.cc
src/mds/Server.cc
src/mds/cephfs_features.h
src/messages/MClientReconnect.h

index 713af0fa7f1845b5326ff4c2c76304f550654461..9b2c298025c66116e21664eb6365a9787084f42c 100644 (file)
@@ -2781,7 +2781,10 @@ void Client::send_reconnect(MetaSession *session)
   //make sure unsafe requests get saved
   resend_unsafe_requests(session);
 
+  early_kick_flushing_caps(session);
+
   auto m = MClientReconnect::create();
+  bool allow_multi = session->mds_features.test(CEPHFS_FEATURE_MULTI_RECONNECT);
 
   // i have an open session.
   ceph::unordered_set<inodeno_t> did_snaprealm;
@@ -2791,6 +2794,14 @@ void Client::send_reconnect(MetaSession *session)
     Inode *in = p->second;
     auto it = in->caps.find(mds);
     if (it != in->caps.end()) {
+      if (allow_multi &&
+         m->get_approx_size() >= (std::numeric_limits<int>::max() >> 1)) {
+       m->mark_more();
+       session->con->send_message2(std::move(m));
+
+       m = MClientReconnect::create();
+      }
+
       Cap &cap = it->second;
       ldout(cct, 10) << " caps on " << p->first
               << " " << ccap_string(cap.issued)
@@ -2834,9 +2845,8 @@ void Client::send_reconnect(MetaSession *session)
     }
   }
 
-  early_kick_flushing_caps(session);
-
-  m->set_encoding_version(0); // use connection features to choose encoding
+  if (!allow_multi)
+    m->set_encoding_version(0); // use connection features to choose encoding
   session->con->send_message2(std::move(m));
 
   mount_cond.Signal();
@@ -4463,7 +4473,8 @@ void Client::early_kick_flushing_caps(MetaSession *session)
 
   for (xlist<Inode*>::iterator p = session->flushing_caps.begin(); !p.end(); ++p) {
     Inode *in = *p;
-    ceph_assert(in->auth_cap);
+    Cap *cap = in->auth_cap;
+    ceph_assert(cap);
 
     // if flushing caps were revoked, we re-send the cap flush in client reconnect
     // stage. This guarantees that MDS processes the cap flush message before issuing
@@ -4476,6 +4487,13 @@ void Client::early_kick_flushing_caps(MetaSession *session)
 
     session->early_flushing_caps.insert(in);
 
+    // send_reconnect() also will reset these sequence numbers. make sure
+    // sequence numbers in cap flush message match later reconnect message.
+    cap->seq = 0;
+    cap->issue_seq = 0;
+    cap->mseq = 0;
+    cap->issued = cap->implemented;
+
     if (in->cap_snaps.size())
       flush_snaps(in, true);
     if (in->flushing_caps)
index dad962be33e197ded3dc981467295fb9e082a903..186fbcf873b60614c86a93c13eff42d05104d1b5 100644 (file)
@@ -1198,7 +1198,8 @@ void Server::reconnect_clients(MDSInternalContext *reconnect_done_)
 
 void Server::handle_client_reconnect(const MClientReconnect::const_ref &m)
 {
-  dout(7) << "handle_client_reconnect " << m->get_source() << dendl;
+  dout(7) << "handle_client_reconnect " << m->get_source()
+         << (m->has_more() ? " (more)" : "") << dendl;
   client_t from = m->get_source().num();
   Session *session = mds->get_session(m);
   ceph_assert(session);
@@ -1249,21 +1250,23 @@ void Server::handle_client_reconnect(const MClientReconnect::const_ref &m)
   }
 
   if (deny) {
-    auto m = MClientSession::create(CEPH_SESSION_CLOSE);
-    mds->send_message_client(m, session);
+    auto r = MClientSession::create(CEPH_SESSION_CLOSE);
+    mds->send_message_client(r, session);
     if (session->is_open())
       kill_session(session, nullptr);
     return;
   }
 
-  // notify client of success with an OPEN
-  auto reply = MClientSession::create(CEPH_SESSION_OPEN);
-  if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
-    reply->supported_features = supported_features;
-  mds->send_message_client(reply, session);
+  if (!m->has_more()) {
+    // notify client of success with an OPEN
+    auto reply = MClientSession::create(CEPH_SESSION_OPEN);
+    if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
+      reply->supported_features = supported_features;
+    mds->send_message_client(reply, session);
+    mds->clog->debug() << "reconnect by " << session->info.inst << " after " << delay;
+  }
 
   session->last_cap_renew = clock::now();
-  mds->clog->debug() << "reconnect by " << session->info.inst << " after " << delay;
   
   // snaprealms
   for (const auto &r : m->realms) {
@@ -1316,14 +1319,17 @@ void Server::handle_client_reconnect(const MClientReconnect::const_ref &m)
       mdcache->rejoin_recovered_caps(p.first, from, p.second, MDS_RANK_NONE);
     }
   }
-  mdcache->rejoin_recovered_client(session->get_client(), session->info.inst);
 
   reconnect_last_seen = clock::now();
 
-  // remove from gather set
-  client_reconnect_gather.erase(from);
-  if (client_reconnect_gather.empty())
-    reconnect_gather_finish();
+  if (!m->has_more()) {
+    mdcache->rejoin_recovered_client(session->get_client(), session->info.inst);
+
+    // remove from gather set
+    client_reconnect_gather.erase(from);
+    if (client_reconnect_gather.empty())
+      reconnect_gather_finish();
+  }
 }
 
 void Server::infer_supported_features(Session *session, client_metadata_t& client_metadata)
index 06922cd65cfa3a28c76664aee29c93c602594f8b..5a0a8599c4c2dd3296e4c60394b9e80c64ae3f08 100644 (file)
@@ -26,6 +26,7 @@
 #define CEPHFS_FEATURE_REPLY_ENCODING   9
 #define CEPHFS_FEATURE_RECLAIM_CLIENT  10
 #define CEPHFS_FEATURE_LAZY_CAP_WANTED  11
+#define CEPHFS_FEATURE_MULTI_RECONNECT  12
 
 #define CEPHFS_FEATURES_ALL {          \
   0, 1, 2, 3, 4,                       \
@@ -36,6 +37,7 @@
   CEPHFS_FEATURE_REPLY_ENCODING,        \
   CEPHFS_FEATURE_RECLAIM_CLIENT,       \
   CEPHFS_FEATURE_LAZY_CAP_WANTED,      \
+  CEPHFS_FEATURE_MULTI_RECONNECT,      \
 }
 
 #define CEPHFS_FEATURES_MDS_SUPPORTED CEPHFS_FEATURES_ALL
index 0e100c1e9b682387fc3a59cb7d2b2b9b2fdb11e3..ba3b0fb2cdac385fdc07276f1baebea0ce3e4238 100644 (file)
@@ -24,23 +24,46 @@ class MClientReconnect : public MessageInstance<MClientReconnect> {
 public:
   friend factory;
 private:
-  static constexpr int HEAD_VERSION = 4;
+  static constexpr int HEAD_VERSION = 5;
   static constexpr int COMPAT_VERSION = 4;
 
 public:
-  map<inodeno_t, cap_reconnect_t>  caps;   // only head inodes
+  map<inodeno_t, cap_reconnect_t> caps; // only head inodes
   vector<snaprealm_reconnect_t> realms;
+  bool more = false;
 
   MClientReconnect() :
-    MessageInstance(CEPH_MSG_CLIENT_RECONNECT, HEAD_VERSION, COMPAT_VERSION) { }
+    MessageInstance(CEPH_MSG_CLIENT_RECONNECT, HEAD_VERSION, COMPAT_VERSION) {}
 private:
   ~MClientReconnect() override {}
 
+  size_t cap_size = 0;
+  size_t realm_size = 0;
+  size_t approx_size = sizeof(__u32) + sizeof(__u32) + 1;
+
+  void calc_item_size() {
+    using ceph::encode;
+    {
+      bufferlist bl;
+      inodeno_t ino;
+      cap_reconnect_t cr;
+      encode(ino, bl);
+      encode(cr, bl);
+      cap_size = bl.length();
+    }
+    {
+      bufferlist bl;
+      snaprealm_reconnect_t sr;
+      encode(sr, bl);
+      realm_size = bl.length();
+    }
+  }
+
 public:
   std::string_view get_type_name() const override { return "client_reconnect"; }
   void print(ostream& out) const override {
     out << "client_reconnect("
-       << caps.size() << " caps)";
+       << caps.size() << " caps " << realms.size() << " realms )";
   }
 
   // Force to use old encoding.
@@ -50,11 +73,19 @@ public:
     if (v <= 3)
       header.compat_version = 0;
   }
+  size_t get_approx_size() {
+    return approx_size;
+  }
+  void mark_more() { more = true; }
+  bool has_more() const { return more; }
 
   void add_cap(inodeno_t ino, uint64_t cap_id, inodeno_t pathbase, const string& path,
               int wanted, int issued, inodeno_t sr, snapid_t sf, bufferlist& lb)
   {
     caps[ino] = cap_reconnect_t(cap_id, pathbase, path, wanted, issued, sr, sf, lb);
+    if (!cap_size)
+      calc_item_size();
+    approx_size += cap_size + path.length() + lb.length();
   }
   void add_snaprealm(inodeno_t ino, snapid_t seq, inodeno_t parent) {
     snaprealm_reconnect_t r;
@@ -62,6 +93,9 @@ public:
     r.realm.seq = seq;
     r.realm.parent = parent;
     realms.push_back(r);
+    if (!realm_size)
+      calc_item_size();
+    approx_size += realm_size;
   }
 
   void encode_payload(uint64_t features) override {
@@ -80,6 +114,7 @@ public:
     if (header.version >= 4) {
        encode(caps, data);
        encode(realms, data);
+       encode(more, data);
     } else {
       // compat crap
       if (header.version == 3) {
@@ -107,6 +142,8 @@ public:
     if (header.version >= 4) {
       decode(caps, p);
       decode(realms, p);
+      if (header.version >= 5)
+       decode(more, p);
     } else {
       // compat crap
       if (header.version == 3) {