From 9662ad40e05e8ea29b6dc312e3a71577b478cbc3 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 9 Feb 2010 10:03:12 -0800 Subject: [PATCH] mds: behave when we pipeline session updates to journal --- src/mds/Server.cc | 52 ++++++++++++++++++++++++-------------------- src/mds/Server.h | 3 ++- src/mds/SessionMap.h | 8 +++++-- 3 files changed, 37 insertions(+), 26 deletions(-) diff --git a/src/mds/Server.cc b/src/mds/Server.cc index 7380d8dba043f..377d6a324016c 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -144,18 +144,19 @@ void Server::dispatch(Message *m) class C_MDS_session_finish : public Context { MDS *mds; Session *session; + __u64 state_seq; bool open; version_t cmapv; interval_set inos; version_t inotablev; public: - C_MDS_session_finish(MDS *m, Session *se, bool s, version_t mv) : - mds(m), session(se), open(s), cmapv(mv), inotablev(0) { } - C_MDS_session_finish(MDS *m, Session *se, bool s, version_t mv, interval_set& i, version_t iv) : - mds(m), session(se), open(s), cmapv(mv), inos(i), inotablev(iv) { } + C_MDS_session_finish(MDS *m, Session *se, __u64 sseq, bool s, version_t mv) : + mds(m), session(se), state_seq(sseq), open(s), cmapv(mv), inotablev(0) { } + C_MDS_session_finish(MDS *m, Session *se, __u64 sseq, bool s, version_t mv, interval_set& i, version_t iv) : + mds(m), session(se), state_seq(sseq), open(s), cmapv(mv), inos(i), inotablev(iv) { } void finish(int r) { assert(r == 0); - mds->server->_session_logged(session, open, cmapv, inos, inotablev); + mds->server->_session_logged(session, state_seq, open, cmapv, inos, inotablev); } }; @@ -185,6 +186,7 @@ void Server::handle_client_session(MClientSession *m) return; } + __u64 sseq = 0; switch (m->get_op()) { case CEPH_SESSION_REQUEST_OPEN: if (session->is_opening() || session->is_open()) { @@ -193,11 +195,11 @@ void Server::handle_client_session(MClientSession *m) } if (session->is_closed()) mds->sessionmap.add_session(session); - mds->sessionmap.set_state(session, Session::STATE_OPENING); + sseq = mds->sessionmap.set_state(session, Session::STATE_OPENING); mds->sessionmap.touch_session(session); pv = ++mds->sessionmap.projected; mdlog->start_submit_entry(new ESession(m->get_source_inst(), true, pv), - new C_MDS_session_finish(mds, session, true, pv)); + new C_MDS_session_finish(mds, session, sseq, true, pv)); mdlog->flush(); break; @@ -231,7 +233,7 @@ void Server::handle_client_session(MClientSession *m) << ", BUGGY!" << dendl; assert(0); } - mds->sessionmap.set_state(session, Session::STATE_CLOSING); + sseq = mds->sessionmap.set_state(session, Session::STATE_CLOSING); pv = ++mds->sessionmap.projected; interval_set both = session->prealloc_inos; @@ -243,7 +245,7 @@ void Server::handle_client_session(MClientSession *m) piv = 0; mdlog->start_submit_entry(new ESession(m->get_source_inst(), false, pv, both, piv), - new C_MDS_session_finish(mds, session, false, pv, both, piv)); + new C_MDS_session_finish(mds, session, sseq, false, pv, both, piv)); mdlog->flush(); } break; @@ -253,18 +255,21 @@ void Server::handle_client_session(MClientSession *m) } } -void Server::_session_logged(Session *session, bool open, version_t pv, interval_set& inos, version_t piv) +void Server::_session_logged(Session *session, __u64 state_seq, bool open, version_t pv, interval_set& inos, version_t piv) { - dout(10) << "_session_logged " << session->inst << " " << (open ? "open":"close") + dout(10) << "_session_logged " << session->inst << " state_seq " << state_seq << " " << (open ? "open":"close") << " " << pv << dendl; // apply - if (open) { + if (session->get_state_seq() != state_seq) { + dout(10) << " journaled state_seq " << state_seq << " != current " << session->get_state_seq() + << ", noop" << dendl; + // close must have been canceled (by an import?), or any number of other things.. + } else if (open) { assert(session->is_opening()); mds->sessionmap.set_state(session, Session::STATE_OPEN); mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), session->inst); } else if (session->is_closing() || session->is_stale_closing()) { - // kill any lingering capabilities, leases, requests while (!session->caps.empty()) { Capability *cap = session->caps.front(); @@ -295,8 +300,7 @@ void Server::_session_logged(Session *session, bool open, version_t pv, interval session->clear(); mds->sessionmap.remove_session(session); } else { - // close must have been canceled (by an import?) ... - assert(!open); + assert(0); } mds->sessionmap.version++; // noop } @@ -349,11 +353,12 @@ void Server::terminate_sessions() p != sessions.end(); ++p) { Session *session = *p; - if (session->is_closing()) continue; - mds->sessionmap.set_state(session, Session::STATE_CLOSING); + if (session->is_closing() || session->is_closed()) + continue; + __u64 sseq = mds->sessionmap.set_state(session, Session::STATE_CLOSING); version_t pv = ++mds->sessionmap.projected; mdlog->start_submit_entry(new ESession(session->inst, false, pv), - new C_MDS_session_finish(mds, session, false, pv)); + new C_MDS_session_finish(mds, session, sseq, false, pv)); mdlog->flush(); } } @@ -419,9 +424,9 @@ void Server::find_idle_sessions() } } -void Server::end_session (Session *session) +void Server::end_session(Session *session) { - assert (session); + assert(session); mds->sessionmap.set_state(session, Session::STATE_STALE_PURGING); if (session->prealloc_inos.empty()) { _finish_session_purge(session); @@ -441,10 +446,10 @@ void Server::_finish_session_purge(Session *session) { dout(10) << "_finish_session_purge " << session->inst << dendl; assert(session->is_stale_purging()); - mds->sessionmap.set_state(session, Session::STATE_STALE_CLOSING); + __u64 sseq = mds->sessionmap.set_state(session, Session::STATE_STALE_CLOSING); version_t pv = ++mds->sessionmap.projected; mdlog->start_submit_entry(new ESession(session->inst, false, pv), - new C_MDS_session_finish(mds, session, false, pv)); + new C_MDS_session_finish(mds, session, sseq, false, pv)); mdlog->flush(); } @@ -507,8 +512,9 @@ void Server::handle_client_reconnect(MClientReconnect *m) dout(7) << " client had no session, removing from session map" << dendl; assert(session); // ? version_t pv = ++mds->sessionmap.projected; + __u64 sseq = session->get_state_seq(); mdlog->start_submit_entry(new ESession(session->inst, false, pv), - new C_MDS_session_finish(mds, session, false, pv)); + new C_MDS_session_finish(mds, session, sseq, false, pv)); mdlog->flush(); } else { diff --git a/src/mds/Server.h b/src/mds/Server.h index 4aa9378d3add1..ad562e261bcc7 100644 --- a/src/mds/Server.h +++ b/src/mds/Server.h @@ -69,7 +69,8 @@ public: Session *get_session(Message *m); void handle_client_session(class MClientSession *m); - void _session_logged(Session *session, bool open, version_t pv, interval_set& inos,version_t piv); + void _session_logged(Session *session, __u64 state_seq, + bool open, version_t pv, interval_set& inos,version_t piv); void _finish_session_purge(Session *); version_t prepare_force_open_sessions(map &cm); void finish_force_open_sessions(map &cm); diff --git a/src/mds/SessionMap.h b/src/mds/SessionMap.h index 14b8bc9d30058..5c5e4b87fa88e 100644 --- a/src/mds/SessionMap.h +++ b/src/mds/SessionMap.h @@ -50,6 +50,7 @@ public: private: int state; + __u64 state_seq; friend class SessionMap; public: entity_inst_t inst; @@ -89,6 +90,7 @@ public: client_t get_client() { return client_t(inst.name.num()); } int get_state() { return state; } + __u64 get_state_seq() { return state_seq; } bool is_new() { return state == STATE_NEW; } bool is_opening() { return state == STATE_OPENING; } bool is_open() { return state == STATE_OPEN; } @@ -137,7 +139,7 @@ public: Session() : - state(STATE_NEW), + state(STATE_NEW), state_seq(0), session_list_item(this), cap_push_seq(0) { } ~Session() { @@ -241,11 +243,13 @@ public: if (by_state[state].empty()) return 0; return by_state[state].front(); } - void set_state(Session *session, int s) { + __u64 set_state(Session *session, int s) { if (session->state != s) { session->state = s; + session->state_seq++; by_state[s].push_back(&session->session_list_item); } + return session->state_seq; } void dump(); -- 2.39.5