//make sure unsafe requests get saved
resend_unsafe_requests(session);
+ early_kick_flushing_caps(session);
+
auto m = MClientReconnect::create();
+ bool allow_multi = session->mds_features.test(CEPHFS_FEATURE_MULTI_RECONNECT);
// i have an open session.
ceph::unordered_set<inodeno_t> did_snaprealm;
Inode *in = p->second;
auto it = in->caps.find(mds);
if (it != in->caps.end()) {
+ if (allow_multi &&
+ m->get_approx_size() >= (std::numeric_limits<int>::max() >> 1)) {
+ m->mark_more();
+ session->con->send_message2(std::move(m));
+
+ m = MClientReconnect::create();
+ }
+
Cap &cap = it->second;
ldout(cct, 10) << " caps on " << p->first
<< " " << ccap_string(cap.issued)
}
}
- early_kick_flushing_caps(session);
-
- m->set_encoding_version(0); // use connection features to choose encoding
+ if (!allow_multi)
+ m->set_encoding_version(0); // use connection features to choose encoding
session->con->send_message2(std::move(m));
mount_cond.Signal();
for (xlist<Inode*>::iterator p = session->flushing_caps.begin(); !p.end(); ++p) {
Inode *in = *p;
- ceph_assert(in->auth_cap);
+ Cap *cap = in->auth_cap;
+ ceph_assert(cap);
// if flushing caps were revoked, we re-send the cap flush in client reconnect
// stage. This guarantees that MDS processes the cap flush message before issuing
session->early_flushing_caps.insert(in);
+ // send_reconnect() also will reset these sequence numbers. make sure
+ // sequence numbers in cap flush message match later reconnect message.
+ cap->seq = 0;
+ cap->issue_seq = 0;
+ cap->mseq = 0;
+ cap->issued = cap->implemented;
+
if (in->cap_snaps.size())
flush_snaps(in, true);
if (in->flushing_caps)
void Server::handle_client_reconnect(const MClientReconnect::const_ref &m)
{
- dout(7) << "handle_client_reconnect " << m->get_source() << dendl;
+ dout(7) << "handle_client_reconnect " << m->get_source()
+ << (m->has_more() ? " (more)" : "") << dendl;
client_t from = m->get_source().num();
Session *session = mds->get_session(m);
ceph_assert(session);
}
if (deny) {
- auto m = MClientSession::create(CEPH_SESSION_CLOSE);
- mds->send_message_client(m, session);
+ auto r = MClientSession::create(CEPH_SESSION_CLOSE);
+ mds->send_message_client(r, session);
if (session->is_open())
kill_session(session, nullptr);
return;
}
- // notify client of success with an OPEN
- auto reply = MClientSession::create(CEPH_SESSION_OPEN);
- if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
- reply->supported_features = supported_features;
- mds->send_message_client(reply, session);
+ if (!m->has_more()) {
+ // notify client of success with an OPEN
+ auto reply = MClientSession::create(CEPH_SESSION_OPEN);
+ if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
+ reply->supported_features = supported_features;
+ mds->send_message_client(reply, session);
+ mds->clog->debug() << "reconnect by " << session->info.inst << " after " << delay;
+ }
session->last_cap_renew = clock::now();
- mds->clog->debug() << "reconnect by " << session->info.inst << " after " << delay;
// snaprealms
for (const auto &r : m->realms) {
mdcache->rejoin_recovered_caps(p.first, from, p.second, MDS_RANK_NONE);
}
}
- mdcache->rejoin_recovered_client(session->get_client(), session->info.inst);
reconnect_last_seen = clock::now();
- // remove from gather set
- client_reconnect_gather.erase(from);
- if (client_reconnect_gather.empty())
- reconnect_gather_finish();
+ if (!m->has_more()) {
+ mdcache->rejoin_recovered_client(session->get_client(), session->info.inst);
+
+ // remove from gather set
+ client_reconnect_gather.erase(from);
+ if (client_reconnect_gather.empty())
+ reconnect_gather_finish();
+ }
}
void Server::infer_supported_features(Session *session, client_metadata_t& client_metadata)
#define CEPHFS_FEATURE_REPLY_ENCODING 9
#define CEPHFS_FEATURE_RECLAIM_CLIENT 10
#define CEPHFS_FEATURE_LAZY_CAP_WANTED 11
+#define CEPHFS_FEATURE_MULTI_RECONNECT 12
#define CEPHFS_FEATURES_ALL { \
0, 1, 2, 3, 4, \
CEPHFS_FEATURE_REPLY_ENCODING, \
CEPHFS_FEATURE_RECLAIM_CLIENT, \
CEPHFS_FEATURE_LAZY_CAP_WANTED, \
+ CEPHFS_FEATURE_MULTI_RECONNECT, \
}
#define CEPHFS_FEATURES_MDS_SUPPORTED CEPHFS_FEATURES_ALL
public:
friend factory;
private:
- static constexpr int HEAD_VERSION = 4;
+ static constexpr int HEAD_VERSION = 5;
static constexpr int COMPAT_VERSION = 4;
public:
- map<inodeno_t, cap_reconnect_t> caps; // only head inodes
+ map<inodeno_t, cap_reconnect_t> caps; // only head inodes
vector<snaprealm_reconnect_t> realms;
+ bool more = false;
MClientReconnect() :
- MessageInstance(CEPH_MSG_CLIENT_RECONNECT, HEAD_VERSION, COMPAT_VERSION) { }
+ MessageInstance(CEPH_MSG_CLIENT_RECONNECT, HEAD_VERSION, COMPAT_VERSION) {}
private:
~MClientReconnect() override {}
+ size_t cap_size = 0;
+ size_t realm_size = 0;
+ size_t approx_size = sizeof(__u32) + sizeof(__u32) + 1;
+
+ void calc_item_size() {
+ using ceph::encode;
+ {
+ bufferlist bl;
+ inodeno_t ino;
+ cap_reconnect_t cr;
+ encode(ino, bl);
+ encode(cr, bl);
+ cap_size = bl.length();
+ }
+ {
+ bufferlist bl;
+ snaprealm_reconnect_t sr;
+ encode(sr, bl);
+ realm_size = bl.length();
+ }
+ }
+
public:
std::string_view get_type_name() const override { return "client_reconnect"; }
void print(ostream& out) const override {
out << "client_reconnect("
- << caps.size() << " caps)";
+ << caps.size() << " caps " << realms.size() << " realms )";
}
// Force to use old encoding.
if (v <= 3)
header.compat_version = 0;
}
+ size_t get_approx_size() {
+ return approx_size;
+ }
+ void mark_more() { more = true; }
+ bool has_more() const { return more; }
void add_cap(inodeno_t ino, uint64_t cap_id, inodeno_t pathbase, const string& path,
int wanted, int issued, inodeno_t sr, snapid_t sf, bufferlist& lb)
{
caps[ino] = cap_reconnect_t(cap_id, pathbase, path, wanted, issued, sr, sf, lb);
+ if (!cap_size)
+ calc_item_size();
+ approx_size += cap_size + path.length() + lb.length();
}
void add_snaprealm(inodeno_t ino, snapid_t seq, inodeno_t parent) {
snaprealm_reconnect_t r;
r.realm.seq = seq;
r.realm.parent = parent;
realms.push_back(r);
+ if (!realm_size)
+ calc_item_size();
+ approx_size += realm_size;
}
void encode_payload(uint64_t features) override {
if (header.version >= 4) {
encode(caps, data);
encode(realms, data);
+ encode(more, data);
} else {
// compat crap
if (header.version == 3) {
if (header.version >= 4) {
decode(caps, p);
decode(realms, p);
+ if (header.version >= 5)
+ decode(more, p);
} else {
// compat crap
if (header.version == 3) {