objecter->ms_handle_reset(con);
} else if (con->get_peer_type() == CEPH_ENTITY_TYPE_CLIENT) {
Session *session = (Session *)con->get_priv();
- if (!session || session->is_closed())
+ if (!session || session->is_closed() || session->is_new())
messenger->mark_down(con->get_peer_addr());
+ if (session->is_new())
+ sessionmap.remove_session(session);
if (session)
session->put();
}
int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
bool& is_valid)
{
+ Mutex::Locker l(mds_lock);
+
AuthAuthorizeHandler *authorize_handler = get_authorize_handler(protocol);
if (!authorize_handler) {
is_valid = false;
s->inst.name = n;
dout(10) << " new session " << s << " for " << s->inst << dendl;
con->set_priv(s);
+ sessionmap.add_session(s);
} else {
dout(10) << " existing session " << s << " for " << s->inst << dendl;
con->set_priv(s->get());
{
Session *session = (Session *)m->get_connection()->get_priv();
if (session) {
- dout(20) << "get_session got " << session->inst << " from message" << dendl;
+ dout(20) << "get_session have " << session << " " << session->inst << " state " << session->get_state() << dendl;
session->put(); // not carry ref
} else {
dout(20) << "get_session dne for " << m->get_source_inst() << dendl;
<< " on " << cm.size() << " clients"
<< dendl;
for (map<client_t,entity_inst_t>::iterator p = cm.begin(); p != cm.end(); ++p) {
- Session *session = mds->sessionmap.get_or_add_session(p->second);
+ Session *session = mds->sessionmap.get_or_add_open_session(p->second);
if (session->is_closed() || session->is_closing())
mds->sessionmap.set_state(session, Session::STATE_OPENING);
mds->sessionmap.touch_session(session);
dout(1) << " no longer in reconnect state, ignoring reconnect, sending close" << dendl;
ss << "denied reconnect attempt (mds is " << ceph_mds_state_name(mds->get_state())
<< ") from " << m->get_source_inst();
- } else {
+ } else if (!session) {
dout(1) << " no session for " << m->get_source() << ", ignoring reconnect, sending close" << dendl;
ss << "denied reconnect attempt from " << m->get_source_inst() << " (no session)";
- }
+ } else if (session->is_closed()) {
+ dout(1) << " no session for " << m->get_source() << ", ignoring reconnect, sending close" << dendl;
+ ss << "denied reconnect attempt from " << m->get_source_inst() << " (session closed)";
+ } else
+ assert(0);
ss << " after " << delay << " (allowed interval " << g_conf.mds_reconnect_timeout << ")";
mds->logclient.log(LOG_INFO, ss);
mds->messenger->send_message(new MClientSession(CEPH_SESSION_CLOSE), m->get_source_inst());
class Session : public RefCountedObject {
// -- state etc --
public:
- static const int STATE_CLOSED = 0;
+ static const int STATE_NEW = 0;
static const int STATE_OPENING = 1; // journaling open
static const int STATE_OPEN = 2;
static const int STATE_CLOSING = 3; // journaling close
static const int STATE_STALE = 4;
static const int STATE_STALE_PURGING = 5;
static const int STATE_STALE_CLOSING = 6;
+ static const int STATE_CLOSED = 7;
private:
int state;
client_t get_client() { return client_t(inst.name.num()); }
- bool is_closed() { return state == STATE_CLOSED; }
+ int get_state() { return state; }
+ bool is_new() { return state == STATE_NEW; }
bool is_opening() { return state == STATE_OPENING; }
bool is_open() { return state == STATE_OPEN; }
bool is_closing() { return state == STATE_CLOSING; }
bool is_stale() { return state == STATE_STALE; }
bool is_stale_purging() { return state == STATE_STALE_PURGING; }
bool is_stale_closing() { return state == STATE_STALE_CLOSING; }
+ bool is_closed() { return state == STATE_CLOSED; }
// -- caps --
private:
Session() :
- state(STATE_CLOSED),
+ state(STATE_NEW),
session_list_item(this),
cap_push_seq(0) { }
~Session() {
return session_map[w];
return 0;
}
- Session* get_or_add_session(entity_inst_t i) {
+ Session* get_or_add_open_session(entity_inst_t i) {
if (session_map.count(i.name))
return session_map[i.name];
Session *s = session_map[i.name] = new Session;
s->inst = i;
+ set_state(s, Session::STATE_OPEN);
+ s->last_cap_renew = g_clock.now();
return s;
}
void add_session(Session *s) {
assert(session_map.count(s->inst.name) == 0);
session_map[s->inst.name] = s;
+ by_state[s->state].push_back(&s->session_list_item);
s->get();
}
void remove_session(Session *s) {
void open_sessions(map<client_t,entity_inst_t>& client_map) {
for (map<client_t,entity_inst_t>::iterator p = client_map.begin();
p != client_map.end();
- ++p) {
- Session *session = get_or_add_session(p->second);
- session->inst = p->second;
- set_state(session, Session::STATE_OPEN);
- }
+ ++p)
+ get_or_add_open_session(p->second);
version++;
}
mds->sessionmap.projected = ++mds->sessionmap.version;
assert(mds->sessionmap.version == cmapv);
if (open) {
- Session *session = mds->sessionmap.get_or_add_session(client_inst);
- session->last_cap_renew = g_clock.now();
- mds->sessionmap.set_state(session, Session::STATE_OPEN);
+ Session *session = mds->sessionmap.get_or_add_open_session(client_inst);
+ dout(10) << "session " << session << " state " << session->get_state() << dendl;
} else {
Session *session = mds->sessionmap.get_session(client_inst.name);
if (session)