oldmap->get_stopped_mds_set(oldstopped);
mdsmap->get_stopped_mds_set(stopped);
for (set<mds_rank_t>::iterator p = stopped.begin(); p != stopped.end(); ++p)
- if (oldstopped.count(*p) == 0) // newly so?
+ if (oldstopped.count(*p) == 0) { // newly so?
mdcache->migrator->handle_mds_failure_or_stop(*p);
+ if (mdsmap->get_tableserver() == whoami)
+ snapserver->handle_mds_failure_or_stop(*p);
+ }
}
{
mdcache->handle_mds_failure(who);
+ if (mdsmap->get_tableserver() == whoami)
+ snapserver->handle_mds_failure_or_stop(who);
+
snapclient->handle_mds_failure(who);
}
case TABLESERVER_OP_QUERY_REPLY:
handle_query_result(m);
break;
+
+ case TABLESERVER_OP_NOTIFY_PREP:
+ handle_notify_prep(m);
+ break;
case TABLESERVER_OP_AGREE:
if (pending_prepare.count(reqid)) {
// child must implement
virtual void resend_queries() = 0;
virtual void handle_query_result(MMDSTableRequest *m) = 0;
+ virtual void handle_notify_prep(MMDSTableRequest *m) = 0;
virtual void notify_commit(version_t tid) = 0;
// and friendly front-end for _prepare.
case TABLESERVER_OP_PREPARE: return handle_prepare(req);
case TABLESERVER_OP_COMMIT: return handle_commit(req);
case TABLESERVER_OP_ROLLBACK: return handle_rollback(req);
+ case TABLESERVER_OP_NOTIFY_ACK: return handle_notify_ack(req);
default: assert(0 == "unrecognized mds_table_server request op");
}
}
MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_AGREE, req->reqid, tid);
reply->bl = req->bl;
- mds->send_message_mds(reply, from);
+
+ if (_notify_prep(tid)) {
+ auto& p = pending_notifies[tid];
+ p.notify_ack_gather = active_clients;
+ p.mds = from;
+ p.reply = reply;
+ } else {
+ mds->send_message_mds(reply, from);
+ }
req->put();
}
+// Process a TABLESERVER_OP_NOTIFY_ACK message from a client mds.
+//
+// The sender's rank is removed from the ack-gather set recorded for the
+// message's tid in pending_notifies.  Once that set becomes empty, either the
+// stored onfinish context is completed or, when no context is set, the stored
+// reply is sent to the rank recorded in the entry; the entry is then dropped.
+// The message reference is released on every path.
+void MDSTableServer::handle_notify_ack(MMDSTableRequest *m)
+{
+  dout(7) << __func__ << " " << *m << dendl;
+
+  const mds_rank_t from = mds_rank_t(m->get_source().num());
+  const version_t tid = m->get_tid();
+
+  auto it = pending_notifies.find(tid);
+  if (it == pending_notifies.end()) {
+    // nothing is pending for this tid; the ack is dropped without logging.
+    m->put();
+    return;
+  }
+
+  auto& info = it->second;
+  if (info.notify_ack_gather.erase(from) == 0) {
+    // the sender was not (or is no longer) in the gather set.
+    dout(0) << "got unexpected notify ack for tid " << tid << " from mds." << from << dendl;
+    m->put();
+    return;
+  }
+
+  if (info.notify_ack_gather.empty()) {
+    // that was the last outstanding ack for this tid.
+    if (info.onfinish)
+      info.onfinish->complete(0);
+    else
+      mds->send_message_mds(info.reply, info.mds);
+    pending_notifies.erase(it);
+  }
+
+  m->put();
+}
+
class C_Commit : public MDSLogContextBase {
MDSTableServer *server;
MMDSTableRequest *req;
_note_server_update(bl);
}
-
// recovery
+// Completion context that resumes table-server recovery: finishing it calls
+// MDSTableServer::_do_server_recovery() on the server it was created for.
+// finish_recovery() stores one as the onfinish of a pending notify so that
+// recovery continues once the notify acks have been gathered.
+class C_ServerRecovery : public MDSInternalContextBase {
+  MDSTableServer *server;
+  MDSRank *get_mds() override { return server->mds; }
+public:
+  // explicit: a bare MDSTableServer* should never silently convert to a context.
+  explicit C_ServerRecovery(MDSTableServer *s) : server(s) {}
+  // The result code r is not inspected.
+  void finish(int r) override {
+    server->_do_server_recovery();
+  }
+};
+
+// Complete server-side recovery.
+//
+// For every pending prepare that belongs to a rank in active_clients, resend
+// the AGREE reply (payload rebuilt through _get_reply_buffer).  Then send each
+// active client a SERVER_READY carrying one past the highest reqid seen for it
+// in pending_for_mds (0 when it has no pending entry), and mark the server as
+// recovered.
+void MDSTableServer::_do_server_recovery()
+{
+  dout(7) << __func__ << " " << active_clients << dendl;
+  map<mds_rank_t, uint64_t> next_reqids;
+
+  // iterate by const reference: no per-iteration copy of the map entry.
+  for (const auto& p : pending_for_mds) {
+    const auto& pending = p.second;
+    const mds_rank_t who = pending.mds;
+    if (!active_clients.count(who))
+      continue;
+
+    // single lookup; operator[] creates the slot with 0 on first use.
+    uint64_t& next_reqid = next_reqids[who];
+    if (pending.reqid >= next_reqid)
+      next_reqid = pending.reqid + 1;
+
+    const version_t tid = pending.tid;
+    MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_AGREE, pending.reqid, tid);
+    _get_reply_buffer(tid, &reply->bl);
+    mds->send_message_mds(reply, who);
+  }
+
+  for (auto p : active_clients) {
+    // operator[] yields 0 for clients that had no pending prepare.
+    MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_SERVER_READY, next_reqids[p]);
+    mds->send_message_mds(reply, p);
+  }
+  recovered = true;
+}
+
void MDSTableServer::finish_recovery(set<mds_rank_t>& active)
{
- dout(7) << "finish_recovery" << dendl;
- for (set<mds_rank_t>::iterator p = active.begin(); p != active.end(); ++p)
- handle_mds_recovery(*p); // resend agrees for everyone.
+  dout(7) << __func__ << dendl;
+
+  // remember which ranks are table clients from now on.
+  active_clients = active;
+
+  // We cannot tell whether the surviving mds received every 'notify prep'
+  // message, so when prepares are still pending the notification is sent
+  // again before recovery completes.
+  const bool notify_sent = !pending_for_mds.empty() && _notify_prep(version);
+  if (!notify_sent) {
+    _do_server_recovery();
+    return;
+  }
+
+  // Recovery resumes (via C_ServerRecovery) once every active client acked.
+  auto& info = pending_notifies[version];
+  info.notify_ack_gather = active_clients;
+  info.mds = MDS_RANK_NONE;
+  info.onfinish = new C_ServerRecovery(this);
}
void MDSTableServer::handle_mds_recovery(mds_rank_t who)
{
dout(7) << "handle_mds_recovery mds." << who << dendl;
+ active_clients.insert(who);
+ if (!recovered) {
+ dout(7) << " still not recovered, delaying" << dendl;
+ return;
+ }
+
uint64_t next_reqid = 0;
// resend agrees for recovered mds
- for (map<version_t,mds_table_pending_t>::iterator p = pending_for_mds.begin();
- p != pending_for_mds.end();
- ++p) {
+ for (auto p = pending_for_mds.begin(); p != pending_for_mds.end(); ++p) {
if (p->second.mds != who)
continue;
+ assert(!pending_notifies.count(p->second.tid));
+
if (p->second.reqid >= next_reqid)
next_reqid = p->second.reqid + 1;
+
MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_AGREE, p->second.reqid, p->second.tid);
_get_reply_buffer(p->second.tid, &reply->bl);
mds->send_message_mds(reply, who);
MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_SERVER_READY, next_reqid);
mds->send_message_mds(reply, who);
}
+
+// React to rank `who` failing or stopping.
+//
+// The rank is removed from active_clients.  Each pending notify is then
+// examined:
+//  - if `who` is the rank the stored reply was destined for, the entry is
+//    dropped and its reply message is turned into a rollback request;
+//  - otherwise `who` is removed from the entry's ack-gather set, and if that
+//    empties the set the entry is finished exactly as a final ack would.
+void MDSTableServer::handle_mds_failure_or_stop(mds_rank_t who)
+{
+  dout(7) << __func__ << " mds." << who << dendl;
+
+  active_clients.erase(who);
+
+  list<MMDSTableRequest*> to_rollback;
+  auto it = pending_notifies.begin();
+  while (it != pending_notifies.end()) {
+    auto& info = it->second;
+
+    if (info.mds == who) {
+      // haven't sent reply yet.
+      to_rollback.push_back(info.reply);
+      it = pending_notifies.erase(it);
+      continue;
+    }
+
+    // the failed mds will reload snaptable when it recovers.
+    // so we can remove it from the gather set.
+    if (info.notify_ack_gather.erase(who) && info.notify_ack_gather.empty()) {
+      if (info.onfinish)
+        info.onfinish->complete(0);
+      else
+        mds->send_message_mds(info.reply, info.mds);
+      it = pending_notifies.erase(it);
+      continue;
+    }
+
+    ++it;
+  }
+
+  // reuse each unsent reply message as the rollback request for its tid.
+  for (MMDSTableRequest *req : to_rollback) {
+    req->op = TABLESERVER_OP_ROLLBACK;
+    handle_rollback(req);
+  }
+}
class MDSTableServer : public MDSTable {
protected:
int table;
+ bool recovered;
+ set<mds_rank_t> active_clients;
private:
map<version_t,mds_table_pending_t> pending_for_mds; // ** child should encode this! **
set<version_t> committing_tids;
+  // Bookkeeping for one outstanding 'notify prep' round, keyed by tid in
+  // pending_notifies.
+  struct notify_info_t {
+    // ranks whose notify ack has not been received yet
+    set<mds_rank_t> notify_ack_gather;
+    // rank the deferred reply is sent to; MDS_RANK_NONE when there is none
+    mds_rank_t mds;
+    // reply held back until all acks arrive (may be null)
+    MMDSTableRequest *reply;
+    // if set, completed instead of sending reply once all acks arrive
+    MDSInternalContextBase *onfinish;
+    // initialise every scalar member so a default-constructed entry never
+    // exposes an indeterminate rank to comparisons such as `mds == who`.
+    notify_info_t() : mds(MDS_RANK_NONE), reply(nullptr), onfinish(nullptr) {}
+  };
+ map<version_t, notify_info_t> pending_notifies;
+
void handle_prepare(MMDSTableRequest *m);
void _prepare_logged(MMDSTableRequest *m, version_t tid);
friend class C_Prepare;
void _server_update_logged(bufferlist& bl);
friend class C_ServerUpdate;
+ void handle_notify_ack(MMDSTableRequest *m);
+
public:
virtual void handle_query(MMDSTableRequest *m) = 0;
virtual void _prepare(bufferlist &bl, uint64_t reqid, mds_rank_t bymds) = 0;
virtual void _commit(version_t tid, MMDSTableRequest *req=NULL) = 0;
virtual void _rollback(version_t tid) = 0;
virtual void _server_update(bufferlist& bl) { ceph_abort(); }
+  // Hook for subclasses that must notify clients about a prepared update.
+  // Returning true tells the caller to hold its reply until the notify acks
+  // have been gathered; the default returns false, so the reply is sent at once.
+  virtual bool _notify_prep(version_t tid) { return false; }
void _note_prepare(mds_rank_t mds, uint64_t reqid, bool replay=false) {
version++;
projected_version = version;
}
- MDSTableServer(MDSRank *m, int tab) : MDSTable(m, get_mdstable_name(tab), false), table(tab) {}
+ MDSTableServer(MDSRank *m, int tab) :
+ MDSTable(m, get_mdstable_name(tab), false), table(tab), recovered(false) {}
~MDSTableServer() override {}
void handle_request(MMDSTableRequest *m);
// recovery
void finish_recovery(set<mds_rank_t>& active);
+ void _do_server_recovery();
+ friend class C_ServerRecovery;
+
void handle_mds_recovery(mds_rank_t who);
+ void handle_mds_failure_or_stop(mds_rank_t who);
};
#endif
decode(snapid, p);
dout(10) << " stid " << stid << " snapid " << snapid << dendl;
- // FIXME: notify all other mds the change
- if (stid > mds->snapclient->get_cached_version()) {
- mds->snapclient->refresh(stid, new C_MDS_RetryRequest(mdcache, mdr));
- return;
- }
+ assert(mds->snapclient->get_cached_version() >= stid);
// journal
SnapInfo info;
decode(seq, p);
dout(10) << " stid is " << stid << ", seq is " << seq << dendl;
- // FIXME: notify all other mds the change
- if (stid > mds->snapclient->get_cached_version()) {
- mds->snapclient->refresh(stid, new C_MDS_RetryRequest(mdcache, mdr));
- return;
- }
+ assert(mds->snapclient->get_cached_version() >= stid);
// journal
auto &pi = diri->project_inode(false, true);
version_t stid = mdr->more()->stid;
dout(10) << " stid is " << stid << dendl;
- // FIXME: notify all other mds the change
- if (stid > mds->snapclient->get_cached_version()) {
- mds->snapclient->refresh(stid, new C_MDS_RetryRequest(mdcache, mdr));
- return;
- }
+ assert(mds->snapclient->get_cached_version() >= stid);
// journal
auto &pi = diri->project_inode(false, true);
}
}
- if (m->reqid >= sync_reqid)
+ if (m->op == TABLESERVER_OP_QUERY_REPLY && m->reqid >= sync_reqid)
synced = true;
if (synced && !waiting_for_version.empty()) {
}
}
+// Handle a 'notify prep' message from the table server: feed the message
+// through handle_query_result(), then acknowledge it on the connection it
+// arrived on with a NOTIFY_ACK carrying the same tid.
+void SnapClient::handle_notify_prep(MMDSTableRequest *m)
+{
+  dout(10) << __func__ << " " << *m << dendl;
+
+  // the payload is processed by the same path as a query reply.
+  handle_query_result(m);
+
+  const version_t tid = m->get_tid();
+  MMDSTableRequest *ack =
+    new MMDSTableRequest(table, TABLESERVER_OP_NOTIFY_ACK, 0, tid);
+  mds->send_message(ack, m->get_connection());
+}
+
void SnapClient::notify_commit(version_t tid)
{
dout(10) << __func__ << " tid " << tid << dendl;
void resend_queries() override;
void handle_query_result(MMDSTableRequest *m) override;
+ void handle_notify_prep(MMDSTableRequest *m) override;
void notify_commit(version_t tid) override;
void prepare_create(inodeno_t dirino, std::string_view name, utime_t stamp,
}
}
+// Send a TABLESERVER_OP_NOTIFY_PREP to every rank in active_clients.
+//
+// Each message carries the current table version and a payload made of a
+// one-byte type tag followed by snaps, pending_update and pending_destroy.
+// `tid` must equal the current version.  Always returns true, so the caller
+// waits for the clients' acks.
+bool SnapServer::_notify_prep(version_t tid)
+{
+  using ceph::encode;
+
+  assert(version == tid);
+
+  // encode the payload once; every message gets its own copy of it.
+  bufferlist payload;
+  char type = 'F';  // NOTE(review): presumably marks a full table dump - confirm against the client decoder
+  encode(type, payload);
+  encode(snaps, payload);
+  encode(pending_update, payload);
+  encode(pending_destroy, payload);
+
+  for (const mds_rank_t rank : active_clients) {
+    MMDSTableRequest *notify =
+      new MMDSTableRequest(table, TABLESERVER_OP_NOTIFY_PREP, 0, version);
+    notify->bl = payload;
+    mds->send_message_mds(notify, rank);
+  }
+  return true;
+}
+
void SnapServer::handle_query(MMDSTableRequest *req)
{
+ using ceph::encode;
+ using ceph::decode;
char op;
bufferlist::iterator p = req->bl.begin();
decode(op, p);
version_t last_checked_osdmap;
-public:
- SnapServer(MDSRank *m, MonClient *monc)
- : MDSTableServer(m, TABLE_SNAP), mon_client(monc), last_checked_osdmap(0)
- {}
-
- void reset_state() override;
void encode_server_state(bufferlist& bl) const override {
ENCODE_START(3, 3, bl);
encode(last_snap, bl);
DECODE_FINISH(bl);
}
- // To permit enc/decoding in isolation in dencoder
- SnapServer() : MDSTableServer(NULL, TABLE_SNAP), last_checked_osdmap(0) {}
- void encode(bufferlist& bl) const {
- encode_server_state(bl);
- }
- void decode(bufferlist::iterator& bl) {
- decode_server_state(bl);
- }
- void dump(Formatter *f) const;
- static void generate_test_instances(list<SnapServer*>& ls);
-
// server bits
void _prepare(bufferlist &bl, uint64_t reqid, mds_rank_t bymds) override;
void _get_reply_buffer(version_t tid, bufferlist *pbl) const override;
void _commit(version_t tid, MMDSTableRequest *req=NULL) override;
void _rollback(version_t tid) override;
void _server_update(bufferlist& bl) override;
+ bool _notify_prep(version_t tid) override;
void handle_query(MMDSTableRequest *m) override;
+public:
+ SnapServer(MDSRank *m, MonClient *monc)
+ : MDSTableServer(m, TABLE_SNAP), mon_client(monc), last_checked_osdmap(0) {}
+ SnapServer() : MDSTableServer(NULL, TABLE_SNAP), last_checked_osdmap(0) {}
+
+ void reset_state() override;
+
void check_osd_map(bool force);
+
+ void encode(bufferlist& bl) const {
+ encode_server_state(bl);
+ }
+ void decode(bufferlist::iterator& bl) {
+ decode_server_state(bl);
+ }
+
+ void dump(Formatter *f) const;
+ static void generate_test_instances(list<SnapServer*>& ls);
};
WRITE_CLASS_ENCODER(SnapServer)
TABLESERVER_OP_ROLLBACK = 7,
TABLESERVER_OP_SERVER_UPDATE = 8,
TABLESERVER_OP_SERVER_READY = -9,
+ TABLESERVER_OP_NOTIFY_ACK = 10,
+ TABLESERVER_OP_NOTIFY_PREP = -11,
};
inline const char *get_mdstableserver_opname(int op) {
case TABLESERVER_OP_ROLLBACK: return "rollback";
case TABLESERVER_OP_SERVER_UPDATE: return "server_update";
case TABLESERVER_OP_SERVER_READY: return "server_ready";
+ case TABLESERVER_OP_NOTIFY_ACK: return "notify_ack";
+ case TABLESERVER_OP_NOTIFY_PREP: return "notify_prep";
default: ceph_abort(); return 0;
}
}