// kick anchortable (resent AGREEs)
if (mdsmap->get_tableserver() == whoami) {
- anchorserver->finish_recovery();
- snapserver->finish_recovery();
+ set<int> active;
+ mdsmap->get_mds_set(active, MDSMap::STATE_CLIENTREPLAY);
+ mdsmap->get_mds_set(active, MDSMap::STATE_ACTIVE);
+ mdsmap->get_mds_set(active, MDSMap::STATE_STOPPING);
+ anchorserver->finish_recovery(active);
+ snapserver->finish_recovery(active);
}
-
- // kick anchorclient (resent COMMITs)
- anchorclient->finish_recovery();
- snapclient->finish_recovery();
-
+
mdcache->start_recovered_truncates();
mdcache->do_file_recover();
mdcache->handle_mds_recovery(who);
- if (anchorserver) {
+ if (mdsmap->get_tableserver() == whoami) {
anchorserver->handle_mds_recovery(who);
snapserver->handle_mds_recovery(who);
}
- anchorclient->handle_mds_recovery(who);
- snapclient->handle_mds_recovery(who);
-
+
queue_waiters(waiting_for_active_peer[who]);
waiting_for_active_peer.erase(who);
}
}
break;
+ case TABLESERVER_OP_SERVER_READY:
+ if (last_reqid == ~0ULL)
+ last_reqid = reqid;
+
+ resend_queries();
+ resend_prepares();
+ resend_commits();
+ break;
+
default:
assert(0);
}
void MDSTableClient::_prepare(bufferlist& mutation, version_t *ptid, bufferlist *pbl,
Context *onfinish)
{
+ if (last_reqid == ~0ULL) {
+ dout(10) << "tableserver is not ready yet, waiting for request id" << dendl;
+ waiting_for_reqid.push_back(_pending_prepare(onfinish, ptid, pbl, mutation));
+ return;
+ }
+
uint64_t reqid = ++last_reqid;
dout(10) << "_prepare " << reqid << dendl;
ls->pending_commit_tids[table].insert(tid);
pending_commit[tid] = ls;
}
+
void MDSTableClient::got_journaled_ack(version_t tid)
{
dout(10) << "got_journaled_ack " << tid << dendl;
}
}
-void MDSTableClient::finish_recovery()
-{
- dout(7) << "finish_recovery" << dendl;
- resend_commits();
-}
-
void MDSTableClient::resend_commits()
{
for (map<version_t,LogSegment*>::iterator p = pending_commit.begin();
}
}
-void MDSTableClient::handle_mds_recovery(int who)
+void MDSTableClient::resend_prepares()
{
- dout(7) << "handle_mds_recovery mds." << who << dendl;
-
- if (who != mds->mdsmap->get_tableserver())
- return; // do nothing.
+ while (!waiting_for_reqid.empty()) {
+ pending_prepare[++last_reqid] = waiting_for_reqid.front();
+ waiting_for_reqid.pop_front();
+ }
- resend_queries();
-
- // prepares.
for (map<uint64_t, _pending_prepare>::iterator p = pending_prepare.begin();
p != pending_prepare.end();
++p) {
- dout(10) << "resending " << p->first << dendl;
+ dout(10) << "resending prepare on " << p->first << dendl;
MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_PREPARE, p->first);
req->bl = p->second.mutation;
mds->send_message_mds(req, mds->mdsmap->get_tableserver());
- }
-
- resend_commits();
+ }
}
bufferlist mutation;
_pending_prepare() : onfinish(0), ptid(0), pbl(0) {}
+ _pending_prepare(Context *c, version_t *pt, bufferlist *pb, bufferlist& m) :
+ onfinish(c), ptid(pt), pbl(pb), mutation(m) {}
};
map<uint64_t, _pending_prepare> pending_prepare;
+ list<_pending_prepare> waiting_for_reqid;
// pending commits
map<version_t, LogSegment*> pending_commit;
void _logged_ack(version_t tid);
public:
- MDSTableClient(MDS *m, int tab) : mds(m), table(tab), last_reqid(0) {}
+ MDSTableClient(MDS *m, int tab) : mds(m), table(tab), last_reqid(~0ULL) {}
virtual ~MDSTableClient() {}
void handle_request(MMDSTableRequest *m);
void _prepare(bufferlist& mutation, version_t *ptid, bufferlist *pbl, Context *onfinish);
void commit(version_t tid, LogSegment *ls);
- // for recovery (by other nodes)
- void handle_mds_recovery(int mds); // called when someone else recovers
void resend_commits();
+ void resend_prepares();
// for recovery (by me)
void got_journaled_agree(version_t tid, LogSegment *ls);
void wait_for_ack(version_t tid, Context *c) {
ack_waiters[tid].push_back(c);
}
- void finish_recovery(); // called when i recover and go active
void send_to_tableserver(MMDSTableRequest *req);
// recovery
-void MDSTableServer::finish_recovery()
+void MDSTableServer::finish_recovery(set<int>& active)
{
dout(7) << "finish_recovery" << dendl;
- handle_mds_recovery(-1); // resend agrees for everyone.
+ for (set<int>::iterator p = active.begin(); p != active.end(); ++p)
+ handle_mds_recovery(*p); // resend agrees for everyone.
}
void MDSTableServer::handle_mds_recovery(int who)
{
- if (who >= 0)
- dout(7) << "handle_mds_recovery mds." << who << dendl;
-
+ dout(7) << "handle_mds_recovery mds." << who << dendl;
+
+ uint64_t next_reqid = 0;
// resend agrees for recovered mds
for (map<version_t,mds_table_pending_t>::iterator p = pending_for_mds.begin();
p != pending_for_mds.end();
++p) {
- if (who >= 0 && p->second.mds != who)
+ if (p->second.mds != who)
continue;
+ if (p->second.reqid >= next_reqid)
+ next_reqid = p->second.reqid + 1;
MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_AGREE, p->second.reqid, p->second.tid);
- mds->send_message_mds(reply, p->second.mds);
+ mds->send_message_mds(reply, who);
}
+
+ MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_SERVER_READY, next_reqid);
+ mds->send_message_mds(reply, who);
}
}
// recovery
- void finish_recovery();
+ void finish_recovery(set<int>& active);
void handle_mds_recovery(int who);
};
TABLESERVER_OP_ACK = -6,
TABLESERVER_OP_ROLLBACK = 7,
TABLESERVER_OP_SERVER_UPDATE = 8,
+ TABLESERVER_OP_SERVER_READY = -9,
};
inline const char *get_mdstableserver_opname(int op) {
case TABLESERVER_OP_ACK: return "ack";
case TABLESERVER_OP_ROLLBACK: return "rollback";
case TABLESERVER_OP_SERVER_UPDATE: return "server_update";
+ case TABLESERVER_OP_SERVER_READY: return "server_ready";
default: assert(0); return 0;
}
};