class C_MDS_session_finish : public Context {
MDS *mds;
Session *session;
+ __u64 state_seq;
bool open;
version_t cmapv;
interval_set<inodeno_t> inos;
version_t inotablev;
public:
- C_MDS_session_finish(MDS *m, Session *se, bool s, version_t mv) :
- mds(m), session(se), open(s), cmapv(mv), inotablev(0) { }
- C_MDS_session_finish(MDS *m, Session *se, bool s, version_t mv, interval_set<inodeno_t>& i, version_t iv) :
- mds(m), session(se), open(s), cmapv(mv), inos(i), inotablev(iv) { }
+ C_MDS_session_finish(MDS *m, Session *se, __u64 sseq, bool s, version_t mv) :
+ mds(m), session(se), state_seq(sseq), open(s), cmapv(mv), inotablev(0) { }
+ C_MDS_session_finish(MDS *m, Session *se, __u64 sseq, bool s, version_t mv, interval_set<inodeno_t>& i, version_t iv) :
+ mds(m), session(se), state_seq(sseq), open(s), cmapv(mv), inos(i), inotablev(iv) { }
void finish(int r) {
assert(r == 0);
- mds->server->_session_logged(session, open, cmapv, inos, inotablev);
+ mds->server->_session_logged(session, state_seq, open, cmapv, inos, inotablev);
}
};
return;
}
+ __u64 sseq = 0;
switch (m->get_op()) {
case CEPH_SESSION_REQUEST_OPEN:
if (session->is_opening() || session->is_open()) {
}
if (session->is_closed())
mds->sessionmap.add_session(session);
- mds->sessionmap.set_state(session, Session::STATE_OPENING);
+ sseq = mds->sessionmap.set_state(session, Session::STATE_OPENING);
mds->sessionmap.touch_session(session);
pv = ++mds->sessionmap.projected;
mdlog->start_submit_entry(new ESession(m->get_source_inst(), true, pv),
- new C_MDS_session_finish(mds, session, true, pv));
+ new C_MDS_session_finish(mds, session, sseq, true, pv));
mdlog->flush();
break;
<< ", BUGGY!" << dendl;
assert(0);
}
- mds->sessionmap.set_state(session, Session::STATE_CLOSING);
+ sseq = mds->sessionmap.set_state(session, Session::STATE_CLOSING);
pv = ++mds->sessionmap.projected;
interval_set<inodeno_t> both = session->prealloc_inos;
piv = 0;
mdlog->start_submit_entry(new ESession(m->get_source_inst(), false, pv, both, piv),
- new C_MDS_session_finish(mds, session, false, pv, both, piv));
+ new C_MDS_session_finish(mds, session, sseq, false, pv, both, piv));
mdlog->flush();
}
break;
}
}
-void Server::_session_logged(Session *session, bool open, version_t pv, interval_set<inodeno_t>& inos, version_t piv)
+void Server::_session_logged(Session *session, __u64 state_seq, bool open, version_t pv, interval_set<inodeno_t>& inos, version_t piv)
{
- dout(10) << "_session_logged " << session->inst << " " << (open ? "open":"close")
+ dout(10) << "_session_logged " << session->inst << " state_seq " << state_seq << " " << (open ? "open":"close")
<< " " << pv << dendl;
// apply
- if (open) {
+ if (session->get_state_seq() != state_seq) {
+ dout(10) << " journaled state_seq " << state_seq << " != current " << session->get_state_seq()
+ << ", noop" << dendl;
+ // close must have been canceled (by an import?), or any number of other things..
+ } else if (open) {
assert(session->is_opening());
mds->sessionmap.set_state(session, Session::STATE_OPEN);
mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), session->inst);
} else if (session->is_closing() || session->is_stale_closing()) {
-
// kill any lingering capabilities, leases, requests
while (!session->caps.empty()) {
Capability *cap = session->caps.front();
session->clear();
mds->sessionmap.remove_session(session);
} else {
- // close must have been canceled (by an import?) ...
- assert(!open);
+ assert(0);
}
mds->sessionmap.version++; // noop
}
p != sessions.end();
++p) {
Session *session = *p;
- if (session->is_closing()) continue;
- mds->sessionmap.set_state(session, Session::STATE_CLOSING);
+ if (session->is_closing() || session->is_closed())
+ continue;
+ __u64 sseq = mds->sessionmap.set_state(session, Session::STATE_CLOSING);
version_t pv = ++mds->sessionmap.projected;
mdlog->start_submit_entry(new ESession(session->inst, false, pv),
- new C_MDS_session_finish(mds, session, false, pv));
+ new C_MDS_session_finish(mds, session, sseq, false, pv));
mdlog->flush();
}
}
}
}
-void Server::end_session (Session *session)
+void Server::end_session(Session *session)
{
- assert (session);
+ assert(session);
mds->sessionmap.set_state(session, Session::STATE_STALE_PURGING);
if (session->prealloc_inos.empty()) {
_finish_session_purge(session);
{
dout(10) << "_finish_session_purge " << session->inst << dendl;
assert(session->is_stale_purging());
- mds->sessionmap.set_state(session, Session::STATE_STALE_CLOSING);
+ __u64 sseq = mds->sessionmap.set_state(session, Session::STATE_STALE_CLOSING);
version_t pv = ++mds->sessionmap.projected;
mdlog->start_submit_entry(new ESession(session->inst, false, pv),
- new C_MDS_session_finish(mds, session, false, pv));
+ new C_MDS_session_finish(mds, session, sseq, false, pv));
mdlog->flush();
}
dout(7) << " client had no session, removing from session map" << dendl;
assert(session); // ?
version_t pv = ++mds->sessionmap.projected;
+ __u64 sseq = session->get_state_seq();
mdlog->start_submit_entry(new ESession(session->inst, false, pv),
- new C_MDS_session_finish(mds, session, false, pv));
+ new C_MDS_session_finish(mds, session, sseq, false, pv));
mdlog->flush();
} else {