adjust_subtree_auth(rootdir, CDIR_AUTH_UNKNOWN);
}
resolve_gather = recovery_set;
+
+ resolve_snapclient_commits = mds->snapclient->get_journaled_tids();
}
void MDCache::send_resolves()
{
send_slave_resolves();
+
+ if (!resolve_done) { // no resolve_done context is held
+ // I'm a survivor: ask the snapclient to sync its cache first; the sync callback re-enters via maybe_finish_slave_resolve()
+ mds->snapclient->sync(
+ new MDSInternalContextWrapper(mds,
+ new FunctionContext([this](int r) {
+ maybe_finish_slave_resolve(); // r is not inspected here
+ })
+ )
+ );
+ dout(10) << "send_resolves waiting for snapclient cache to sync" << dendl;
+ return; // the checks below are skipped on this path; the callback takes over
+ }
if (!resolve_ack_gather.empty()) {
dout(10) << "send_resolves still waiting for resolve ack from ("
<< resolve_ack_gather << ")" << dendl;
<< resolve_need_rollback << ")" << dendl;
return;
}
+
send_subtree_resolves();
}
p != resolves.end();
++p) {
MMDSResolve* m = p->second;
+ if (mds->is_resolve()) {
+ m->add_table_commits(TABLE_SNAP, resolve_snapclient_commits);
+ } else {
+ m->add_table_commits(TABLE_SNAP, mds->snapclient->get_journaled_tids());
+ }
m->subtrees = my_subtrees;
m->ambiguous_imports = my_ambig_imports;
dout(10) << "sending subtee resolve to mds." << p->first << dendl;
void MDCache::maybe_finish_slave_resolve() {
if (resolve_ack_gather.empty() && resolve_need_rollback.empty()) {
- send_subtree_resolves();
+ // send subtree resolves only once the snap cache is synced, or when resolve_done is set (I'm in resolve state)
+ if (mds->snapclient->is_synced() || resolve_done)
+ send_subtree_resolves();
process_delayed_resolve();
}
}
dout(10) << "noting ambiguous import on " << pi->first << " bounds " << pi->second << dendl;
other_ambiguous_imports[from][pi->first].swap( pi->second );
}
+
+ // Learn the other mds' pending snaptable commits. Later, when resolve finishes, we will
+ // reload the snaptable cache from the snapserver; this way the snaptable cache gets
+ // synced among all mds.
+ // Iterate by const reference: each table_client owns a set<version_t>, so iterating
+ // by value would copy that whole set on every pass.
+ for (const auto& p : m->table_clients) {
+   dout(10) << " noting " << get_mdstable_name(p.type)
+            << " pending_commits " << p.pending_commits << dendl;
+   MDSTableClient *client = mds->get_table_client(p.type);
+   // tids are plain integers, so passing each one by value is fine
+   for (auto q : p.pending_commits)
+     client->notify_commit(q);
+ }
// did i get them all?
resolve_gather.erase(from);
recalc_auth_bits(false);
resolve_done.release()->complete(0);
} else {
+ // I am survivor.
maybe_send_pending_rejoins();
}
}
bool resolves_pending;
set<mds_rank_t> resolve_gather; // nodes i need resolves from
set<mds_rank_t> resolve_ack_gather; // nodes i need a resolve_ack from
+ set<version_t> resolve_snapclient_commits; // copy of snapclient->get_journaled_tids(); sent as TABLE_SNAP commits in resolves while mds->is_resolve()
map<metareqid_t, mds_rank_t> resolve_need_rollback; // rollbacks i'm writing to the journal
map<mds_rank_t, MMDSResolve*> delayed_resolve;
dout(10) << "handle_request " << *m << dendl;
assert(m->table == table);
+ if (mds->get_state() < MDSMap::STATE_RESOLVE) {
+ if (mds->get_want_state() == CEPH_MDS_STATE_RESOLVE) {
+ mds->wait_for_resolve(new C_MDS_RetryMessage(mds, m));
+ } else {
+ m->put();
+ }
+ return;
+ }
+
version_t tid = m->get_tid();
uint64_t reqid = m->reqid;
ack_waiters[tid].push_back(c);
}
+ // Return the keys (tids) of every entry currently held in pending_commit.
+ set<version_t> get_journaled_tids() const {
+   set<version_t> journaled;
+   for (const auto& entry : pending_commit) {
+     // only the key is needed; the mapped value is ignored
+     journaled.insert(entry.first);
+   }
+   return journaled;
+ }
+
void handle_mds_failure(mds_rank_t mds);
// child must implement
decode(committing, bl);
}
};
- WRITE_CLASS_ENCODER(slave_request)
map<metareqid_t, slave_request> slave_requests;
+ // Per-table client state carried in a resolve message: the table type
+ // and the set of commit tids still pending for that table.
+ struct table_client {
+   __u8 type = 0;
+   set<version_t> pending_commits;
+
+   table_client() = default;
+   table_client(int _type, const set<version_t>& commits)
+     : type(_type), pending_commits(commits) {}
+
+   // Fields are written in the order: type, pending_commits.
+   void encode(bufferlist& bl) const {
+     using ceph::encode;
+     encode(type, bl);
+     encode(pending_commits, bl);
+   }
+   // Fields are read back in the same order encode() wrote them.
+   void decode(bufferlist::iterator& bl) {
+     using ceph::decode;
+     decode(type, bl);
+     decode(pending_commits, bl);
+   }
+ };
+
+ list<table_client> table_clients;
+
MMDSResolve() : Message(MSG_MDS_RESOLVE) {}
private:
~MMDSResolve() override {}
slave_requests[reqid].inode_caps.claim(bl);
}
+ // Append a table_client entry for `table` carrying a copy of `pending_commits`.
+ void add_table_commits(int table, const set<version_t>& pending_commits) {
+   // build the entry in place at the tail of the list
+   table_clients.emplace_back(table, pending_commits);
+ }
+
void encode_payload(uint64_t features) override {
using ceph::encode;
encode(subtrees, payload);
encode(ambiguous_imports, payload);
encode(slave_requests, payload);
+ encode(table_clients, payload); // appended after slave_requests; decode_payload() reads it at the same position. NOTE(review): written unconditionally, `features` is not consulted -- confirm all peers expect this field
}
void decode_payload() override {
using ceph::decode;
decode(subtrees, p);
decode(ambiguous_imports, p);
decode(slave_requests, p);
+ decode(table_clients, p); // mirrors encode_payload(): table_clients follows slave_requests. NOTE(review): decoded unconditionally -- confirm every sender includes this field
}
};
}
WRITE_CLASS_ENCODER(MMDSResolve::slave_request)
+WRITE_CLASS_ENCODER(MMDSResolve::table_client)
#endif