* > 0 -> in
* <= 0 -> out
*/
-#define CEPH_MDS_STATE_DNE 0 /* down, does not exist. */
-#define CEPH_MDS_STATE_STOPPED -1 /* down, once existed, but no subtrees.
- empty log. */
-#define CEPH_MDS_STATE_BOOT -4 /* up, boot announcement. */
-#define CEPH_MDS_STATE_STANDBY -5 /* up, idle. waiting for assignment. */
-#define CEPH_MDS_STATE_CREATING -6 /* up, creating MDS instance. */
-#define CEPH_MDS_STATE_STARTING -7 /* up, starting previously stopped mds. */
+#define CEPH_MDS_STATE_DNE 0 /* down, does not exist. */
+#define CEPH_MDS_STATE_STOPPED -1 /* down, once existed, but no subtrees.
+ empty log. */
+#define CEPH_MDS_STATE_BOOT -4 /* up, boot announcement. */
+#define CEPH_MDS_STATE_STANDBY -5 /* up, idle. waiting for assignment. */
+#define CEPH_MDS_STATE_CREATING -6 /* up, creating MDS instance. */
+#define CEPH_MDS_STATE_STARTING -7 /* up, starting previously stopped mds. */
#define CEPH_MDS_STATE_STANDBY_REPLAY -8 /* up, tailing active node's journal */
-#define CEPH_MDS_STATE_REPLAY 8 /* up, replaying journal. */
-#define CEPH_MDS_STATE_RESOLVE 9 /* up, disambiguating distributed
- operations (import, rename, etc.) */
-#define CEPH_MDS_STATE_RECONNECT 10 /* up, reconnect to clients */
-#define CEPH_MDS_STATE_REJOIN 11 /* up, rejoining distributed cache */
-#define CEPH_MDS_STATE_ACTIVE 12 /* up, active */
-#define CEPH_MDS_STATE_STOPPING 13 /* up, but exporting metadata */
+#define CEPH_MDS_STATE_REPLAY 8 /* up, replaying journal. */
+#define CEPH_MDS_STATE_RESOLVE 9 /* up, disambiguating distributed
+ operations (import, rename, etc.) */
+#define CEPH_MDS_STATE_RECONNECT 10 /* up, reconnect to clients */
+#define CEPH_MDS_STATE_REJOIN 11 /* up, rejoining distributed cache */
+#define CEPH_MDS_STATE_CLIENTREPLAY 12 /* up, replaying client operations */
+#define CEPH_MDS_STATE_ACTIVE 13 /* up, active */
+#define CEPH_MDS_STATE_STOPPING 14 /* up, but exporting metadata */
static inline const char *ceph_mds_state_name(int s)
{
case CEPH_MDS_STATE_RESOLVE: return "up:resolve";
case CEPH_MDS_STATE_RECONNECT: return "up:reconnect";
case CEPH_MDS_STATE_REJOIN: return "up:rejoin";
+ case CEPH_MDS_STATE_CLIENTREPLAY: return "up:clientreplay";
case CEPH_MDS_STATE_ACTIVE: return "up:active";
case CEPH_MDS_STATE_STOPPING: return "up:stopping";
default: return "";
hash_map<metareqid_t, MDRequest*> active_requests;
public:
+ int get_num_active_requests() { return active_requests.size(); } // number of entries currently in the active_requests map
+
MDRequest* request_start(MClientRequest *req);
MDRequest* request_start_slave(metareqid_t rid, int by);
MDRequest* request_start_internal(int op);
<< ceph_mds_state_name(state) << dendl;
want_state = state;
- // now active?
+ // did i just recover?
+ if ((is_active() || is_clientreplay()) &&
+ (oldstate == MDSMap::STATE_REJOIN ||
+ oldstate == MDSMap::STATE_RECONNECT))
+ recovery_done();
+
if (is_active()) {
- // did i just recover?
- if (oldstate == MDSMap::STATE_REJOIN ||
- oldstate == MDSMap::STATE_RECONNECT)
- recovery_done();
finish_contexts(waiting_for_active); // kick waiters
} else if (is_replay() || is_standby_replay()) {
replay_start();
resolve_start();
} else if (is_reconnect()) {
reconnect_start();
+ } else if (is_clientreplay()) {
+ clientreplay_start();
} else if (is_creating()) {
boot_create();
} else if (is_starting()) {
request_state(MDSMap::STATE_REJOIN); // move to rejoin state
mdcache->reconnect_clean_open_file_lists();
-
- /*
- if (mdsmap->get_num_in_mds() == 1 &&
- mdsmap->get_num_mds(MDSMap::STATE_FAILED) == 0) { // just me!
-
- // finish processing caps (normally, this happens during rejoin, but we're skipping that...)
- mdcache->rejoin_gather_finish();
-
- request_state(MDSMap::STATE_ACTIVE); // go active
- } else {
- request_state(MDSMap::STATE_REJOIN); // move to rejoin state
- }
- */
}
void MDS::rejoin_joint_start()
dout(1) << "rejoin_done" << dendl;
mdcache->show_subtrees();
mdcache->show_cache();
+
+ if (waiting_for_replay.empty())
+ request_state(MDSMap::STATE_ACTIVE);
+ else
+ request_state(MDSMap::STATE_CLIENTREPLAY);
+}
+
+void MDS::clientreplay_start() // run by the state-change handler once is_clientreplay() is true
+{
+ dout(1) << "clientreplay_start" << dendl;
+ queue_waiters(waiting_for_replay); // hand over the contexts parked via wait_for_replay()
+}
+
+void MDS::clientreplay_done() // called while in clientreplay once mdcache reports zero active requests
+{
+ dout(1) << "clientreplay_done" << dendl;
request_state(MDSMap::STATE_ACTIVE); // replay finished: ask to move on to the active state
}
void MDS::recovery_done()
{
dout(1) << "recovery_done -- successful recovery!" << dendl;
- assert(is_active());
+ assert(is_active() || is_clientreplay());
// kick anchortable (resent AGREEs)
if (mdsmap->get_tableserver() == whoami) {
bcast_mds_map();
mdcache->populate_mydir();
-
- queue_waiters(waiting_for_active);
}
void MDS::handle_mds_recovery(int who)
// finish any triggered contexts
- if (finished_queue.size()) {
+ static bool finishing = false;
+ if (!finishing && finished_queue.size()) {
dout(7) << "mds has " << finished_queue.size() << " queued contexts" << dendl;
dout(10) << finished_queue << dendl;
list<Context*> ls;
ls.splice(ls.begin(), finished_queue);
assert(finished_queue.empty());
+ finishing = true;
finish_contexts(ls);
+ finishing = false;
+ } else {
+ // done with all client replayed requests?
+ if (!finishing &&
+ is_clientreplay() &&
+ mdcache->is_open() &&
+ mdcache->get_num_active_requests() == 0 &&
+ want_state == MDSMap::STATE_CLIENTREPLAY)
+ clientreplay_done();
}
-
// hack: thrash exports
static utime_t start;
utime_t now = g_clock.now();
int state; // my confirmed state
int want_state; // the state i want
- list<Context*> waiting_for_active;
+ list<Context*> waiting_for_active, waiting_for_replay;
map<int, list<Context*> > waiting_for_active_peer;
list<Context*> waiting_for_nolaggy;
void wait_for_active_peer(int who, Context *c) {
waiting_for_active_peer[who].push_back(c); // appended to the per-`who` list of pending contexts
}
+ void wait_for_replay(Context *c) { // used to park replayed client requests that arrive during reconnect
+ waiting_for_replay.push_back(c); // later passed to queue_waiters() by clientreplay_start()
+ }
int get_state() { return state; } // the confirmed state; want_state holds the requested one
bool is_creating() { return state == MDSMap::STATE_CREATING; }
bool is_resolve() { return state == MDSMap::STATE_RESOLVE; }
bool is_reconnect() { return state == MDSMap::STATE_RECONNECT; }
bool is_rejoin() { return state == MDSMap::STATE_REJOIN; }
+ bool is_clientreplay() { return state == MDSMap::STATE_CLIENTREPLAY; } // up:clientreplay, entered after rejoin when replayed ops are queued
bool is_active() { return state == MDSMap::STATE_ACTIVE; }
bool is_stopping() { return state == MDSMap::STATE_STOPPING; }
void rejoin_done();
void recovery_done();
void handle_mds_recovery(int who);
+ void clientreplay_start();
+ void clientreplay_done();
void stopping_start();
void stopping_done();
static const int STATE_RESOLVE = CEPH_MDS_STATE_RESOLVE; // up, disambiguating distributed operations (import, rename, etc.)
static const int STATE_RECONNECT = CEPH_MDS_STATE_RECONNECT; // up, reconnect to clients
static const int STATE_REJOIN = CEPH_MDS_STATE_REJOIN; // up, replayed journal, rejoining distributed cache
+ static const int STATE_CLIENTREPLAY = CEPH_MDS_STATE_CLIENTREPLAY; // up, replaying client operations
static const int STATE_ACTIVE = CEPH_MDS_STATE_ACTIVE; // up, active
static const int STATE_STOPPING = CEPH_MDS_STATE_STOPPING; // up, exporting metadata (-> standby or out)
bool is_resolve(int m) { return get_state(m) == STATE_RESOLVE; }
bool is_reconnect(int m) { return get_state(m) == STATE_RECONNECT; }
bool is_rejoin(int m) { return get_state(m) == STATE_REJOIN; }
+ bool is_clientreplay(int m) { return get_state(m) == STATE_CLIENTREPLAY; }
bool is_active(int m) { return get_state(m) == STATE_ACTIVE; }
bool is_stopping(int m) { return get_state(m) == STATE_STOPPING; }
bool is_active_or_stopping(int m) { return is_active(m) || is_stopping(m); }
}
// active?
- if (!mds->is_active() && !mds->is_stopping()) {
- dout(3) << "not active yet, waiting" << dendl;
- mds->wait_for_active(new C_MDS_RetryMessage(mds, m));
- return;
+ if (!mds->is_active() &&
+ !(mds->is_stopping() && m->get_orig_source().is_mds())) {
+ if (mds->is_reconnect() &&
+ m->get_type() == CEPH_MSG_CLIENT_REQUEST &&
+ ((MClientRequest*)m)->is_replay()) {
+ dout(3) << "queuing replayed op" << dendl;
+ mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
+ return;
+ } else if (mds->is_clientreplay() &&
+ m->get_type() == CEPH_MSG_CLIENT_REQUEST &&
+ ((MClientRequest*)m)->is_replay()) {
+ // replaying!
+ } else {
+ dout(3) << "not active yet, waiting" << dendl;
+ mds->wait_for_active(new C_MDS_RetryMessage(mds, m));
+ return;
+ }
}
switch (m->get_type()) {
if (client_inst.name.is_mds())
return;
+ if (req->is_replay()) {
+ dout(10) << "early_reply - none for replay request" << dendl;
+ return;
+ }
+
MClientReply *reply = new MClientReply(mdr->client_request, 0);
reply->set_unsafe();
if (logger) logger->inc(l_mdss_hcreq);
- if (!mds->is_active() &&
- !(mds->is_stopping() && req->get_orig_source().is_mds())) {
- dout(5) << " not active (or stopping+mds), discarding request." << dendl;
- delete req;
- return;
- }
-
if (!mdcache->is_open()) {
dout(5) << "waiting for root" << dendl;
mdcache->wait_for_open(new C_MDS_RetryMessage(mds, req));
out << " " << get_filepath2();
if (head.num_retry)
out << " RETRY=" << (int)head.num_retry;
+ if (get_flags() & CEPH_MDS_FLAG_REPLAY)
+ out << " REPLAY";
out << ")";
}