__u64 sseq = 0;
switch (m->get_op()) {
case CEPH_SESSION_REQUEST_OPEN:
- if (session->is_opening() || session->is_open()) {
- dout(10) << "already open|opening, dropping this req" << dendl;
+ if (session->is_opening() ||
+ session->is_open() ||
+ session->is_stale() ||
+ session->is_killing()) {
+ dout(10) << "currently open|opening|stale|killing, dropping this req" << dendl;
return;
}
+ assert(session->is_closed() ||
+ session->is_closing());
sseq = mds->sessionmap.set_state(session, Session::STATE_OPENING);
mds->sessionmap.touch_session(session);
pv = ++mds->sessionmap.projected;
break;
case CEPH_SESSION_REQUEST_RENEWCAPS:
- if (session->is_open() || session->is_stale()) {
- assert(session->is_stale() || session->is_open());
+ if (session->is_open() ||
+ session->is_stale()) {
mds->sessionmap.touch_session(session);
if (session->is_stale()) {
mds->sessionmap.set_state(session, Session::STATE_OPEN);
case CEPH_SESSION_REQUEST_CLOSE:
{
- if (session->is_closed() || session->is_closing()) {
- dout(10) << "already closed|closing, dropping this req" << dendl;
+ if (session->is_closed() ||
+ session->is_closing() ||
+ session->is_killing()) {
+ dout(10) << "already closed|closing|killing, dropping this req" << dendl;
return;
}
+ assert(session->is_open() ||
+ session->is_stale() ||
+ session->is_opening());
if (m->get_seq() < session->get_push_seq()) {
dout(10) << "old push seq " << m->get_seq() << " < " << session->get_push_seq()
<< ", dropping" << dendl;
}
if (m->get_seq() != session->get_push_seq()) {
dout(0) << "old push seq " << m->get_seq() << " != " << session->get_push_seq()
- << ", BUGGY!" << dendl;
+ << ", BUGGY!" << dendl;
assert(0);
}
sseq = mds->sessionmap.set_state(session, Session::STATE_CLOSING);
}
}
-void Server::_session_logged(Session *session, __u64 state_seq, bool open, version_t pv, interval_set<inodeno_t>& inos, version_t piv)
+void Server::_session_logged(Session *session, __u64 state_seq, bool open, version_t pv,
+ interval_set<inodeno_t>& inos, version_t piv)
{
dout(10) << "_session_logged " << session->inst << " state_seq " << state_seq << " " << (open ? "open":"close")
<< " " << pv << dendl;
assert(session->is_opening());
mds->sessionmap.set_state(session, Session::STATE_OPEN);
mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), session->inst);
- } else if (session->is_closing() || session->is_stale_closing()) {
+ } else if (session->is_closing() ||
+ session->is_killing()) {
// kill any lingering capabilities, leases, requests
while (!session->caps.empty()) {
Capability *cap = session->caps.front();
mds->messenger->send_message(new MClientSession(CEPH_SESSION_CLOSE), session->inst);
mds->sessionmap.set_state(session, Session::STATE_CLOSED);
session->clear();
- } else if (session->is_stale_closing()) {
+ } else if (session->is_killing()) {
// destroy session, close connection
mds->messenger->mark_down(session->inst.addr);
mds->sessionmap.remove_session(session);
+ } else {
+ assert(0);
}
} else {
assert(0);
<< dendl;
for (map<client_t,entity_inst_t>::iterator p = cm.begin(); p != cm.end(); ++p) {
Session *session = mds->sessionmap.get_or_add_session(p->second);
- if (session->is_closed() || session->is_closing())
+ if (session->is_closed() ||
+ session->is_closing() ||
+ session->is_killing())
mds->sessionmap.set_state(session, Session::STATE_OPENING);
mds->sessionmap.touch_session(session);
}
p != sessions.end();
++p) {
Session *session = *p;
- if (session->is_closing() || session->is_closed())
+ if (session->is_closing() ||
+ session->is_killing() ||
+ session->is_closed())
continue;
__u64 sseq = mds->sessionmap.set_state(session, Session::STATE_CLOSING);
version_t pv = ++mds->sessionmap.projected;
}
-struct C_MDS_session_purged : public Context {
- Server *server;
- Session *session;
- C_MDS_session_purged(Server *s, Session *ss) : server(s), session(ss) {}
- void finish(int r) {
- server->_finish_session_purge(session);
- }
-};
-
void Server::find_idle_sessions()
{
dout(10) << "find_idle_sessions" << dendl;
void Server::end_session(Session *session)
{
assert(session);
- mds->sessionmap.set_state(session, Session::STATE_STALE_PURGING);
- if (session->prealloc_inos.empty()) {
- _finish_session_purge(session);
- } else {
- C_Gather *fin = new C_Gather(new C_MDS_session_purged(this, session));
- for (map<inodeno_t,inodeno_t>::iterator p = session->prealloc_inos.m.begin();
- p != session->prealloc_inos.m.end();
- p++) {
- inodeno_t last = p->first + p->second;
- for (inodeno_t i = p->first; i < last; i = i + 1)
- mds->mdcache->purge_prealloc_ino(i, fin->new_sub());
- }
- }
-}
-
-void Server::_finish_session_purge(Session *session)
-{
- dout(10) << "_finish_session_purge " << session->inst << dendl;
- assert(session->is_stale_purging());
- __u64 sseq = mds->sessionmap.set_state(session, Session::STATE_STALE_CLOSING);
+ __u64 sseq = mds->sessionmap.set_state(session, Session::STATE_KILLING);
version_t pv = ++mds->sessionmap.projected;
mdlog->start_submit_entry(new ESession(session->inst, false, pv),
new C_MDS_session_finish(mds, session, sseq, false, pv));
mdlog->flush();
}
-
void Server::reconnect_clients()
{
mds->sessionmap.get_client_set(client_reconnect_gather);
delete req;
return;
}
- if (session->is_closed() || session->is_closing() || session->is_stale_purging() ||
- session->is_stale_closing()) {
- dout(5) << "session closed|closing|stale_purging|stale_closing, dropping" << dendl;
+ if (session->is_closed() ||
+ session->is_closing() ||
+ session->is_killing()) {
+ dout(5) << "session closed|closing|killing, dropping" << dendl;
delete req;
return;
}
// -- state etc --
public:
/*
-
- closed <---------------------+
- | |
- v |
- opening <---+ |
- | | |
- v | |
- open --> closing ------------+
- |^
- v|
- stale -> stale_purging --> stale_closing --> <deleted>
+ +-----------------------------------+
+ | |
+ closed <---------------------+ |
+ | | |
+ v | |
+ opening <---+ | |
+ | | | |
+ v | | |
+ open --> closing ------------+ |
+ |^ |
+ v| v
+ stale -> killing --------------> <deleted>
+
+ (closed) -> importing -> (opening or open or <deleted>)
*/
static const int STATE_CLOSED = 0;
static const int STATE_OPEN = 2;
static const int STATE_CLOSING = 3; // journaling close
static const int STATE_STALE = 4;
- static const int STATE_STALE_PURGING = 5;
- static const int STATE_STALE_CLOSING = 6;
+ static const int STATE_KILLING = 5;
const char *get_state_name(int s) {
switch (s) {
case STATE_OPEN: return "open";
case STATE_CLOSING: return "closing";
case STATE_STALE: return "stale";
- case STATE_STALE_PURGING: return "stale_purging";
- case STATE_STALE_CLOSING: return "stale_closing";
+ case STATE_KILLING: return "killing";
default: return "???";
}
}
bool is_open() { return state == STATE_OPEN; }
bool is_closing() { return state == STATE_CLOSING; }
bool is_stale() { return state == STATE_STALE; }
- bool is_stale_purging() { return state == STATE_STALE_PURGING; }
- bool is_stale_closing() { return state == STATE_STALE_CLOSING; }
+ bool is_killing() { return state == STATE_KILLING; }
// -- caps --
private: