q != info.export_targets.end();
q++) {
if (mds_sessions.count(*q) == 0 &&
- waiting_for_session.count(*q) == 0 &&
mdsmap->is_clientreplay_or_active_or_stopping(*q)) {
ldout(cct, 10) << "check_mds_sessions opening mds." << mds
- << " export target mds." << *q << dendl;
- messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_OPEN),
- mdsmap->get_inst(*q));
- waiting_for_session[*q].size();
+ << " export target mds." << *q << dendl;
+ _open_mds_session(*q);
}
}
}
void Client::dump_mds_sessions(Formatter *f)
{
- f->open_array_section("open_sessions");
+ f->open_array_section("sessions");
for (map<int,MetaSession*>::const_iterator p = mds_sessions.begin(); p != mds_sessions.end(); ++p) {
f->open_object_section("session");
p->second->dump(f);
f->close_section();
}
f->close_section();
- f->open_array_section("opening_sessions");
- for (map<int,list<Cond*> >::const_iterator p = waiting_for_session.begin(); p != waiting_for_session.end(); ++p) {
- f->dump_int("mds", p->first);
- }
- f->close_section();
f->dump_int("mdsmap_epoch", mdsmap->get_epoch());
}
void Client::dump_mds_requests(Formatter *f)
}
// open a session?
- if (mds_sessions.count(mds) == 0) {
+ if (!have_open_session(mds)) {
Cond cond;
if (!mdsmap->is_active_or_stopping(mds)) {
}
}
- if (waiting_for_session.count(mds) == 0) {
- ldout(cct, 10) << "opening session to mds." << mds << dendl;
- messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_OPEN),
- mdsmap->get_inst(mds));
- }
-
+ MetaSession *session = _get_or_open_mds_session(mds);
+
// wait
- waiting_for_session[mds].push_back(&cond);
- while (waiting_for_session.count(mds)) {
- ldout(cct, 10) << "waiting for session to mds." << mds << " to open" << dendl;
- cond.Wait(client_lock);
+ if (session->state == MetaSession::STATE_OPENING) {
+ session->waiting_for_open.push_back(&cond);
+ while (session->state == MetaSession::STATE_OPENING) {
+ ldout(cct, 10) << "waiting for session to mds." << mds << " to open" << dendl;
+ cond.Wait(client_lock);
+ }
}
- if (mds_sessions.count(mds) == 0)
+ if (!have_open_session(mds))
continue;
}
<< req << ", mds " << mds <<dendl;
}
+bool Client::have_open_session(int mds)
+{
+ return
+ mds_sessions.count(mds) &&
+ mds_sessions[mds]->state == MetaSession::STATE_OPEN;
+}
+
+MetaSession *Client::_get_or_open_mds_session(int mds)
+{
+ if (mds_sessions.count(mds))
+ return mds_sessions[mds];
+ return _open_mds_session(mds);
+}
+
+MetaSession *Client::_open_mds_session(int mds)
+{
+ ldout(cct, 10) << "_open_mds_session mds." << mds << dendl;
+ assert(mds_sessions.count(mds) == 0);
+ MetaSession *session = new MetaSession;
+ session->mds_num = mds;
+ session->seq = 0;
+ session->inst = mdsmap->get_inst(mds);
+ session->state = MetaSession::STATE_OPENING;
+ mds_sessions[mds] = session;
+ messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_OPEN),
+ session->inst);
+ return session;
+}
+
void Client::_closed_mds_session(int mds, MetaSession *s)
{
mount_cond.Signal();
{
int from = m->get_source().num();
ldout(cct, 10) << "handle_client_session " << *m << " from mds." << from << dendl;
- MetaSession *mds_session = NULL;
- if (mds_sessions.count(from))
- mds_session = mds_sessions[from];
+
+ if (mds_sessions.count(from) == 0) {
+ ldout(cct, 10) << " discarding session message from sessionless mds " << m->get_source_inst() << dendl;
+ m->put();
+ return;
+ }
+ MetaSession *session = mds_sessions[from];
switch (m->get_op()) {
case CEPH_SESSION_OPEN:
- if (!mds_session) {
- mds_sessions[from] = mds_session = new MetaSession();
- mds_session->mds_num = from;
- mds_session->seq = 0;
- mds_session->inst = m->get_source_inst();
- }
- renew_caps(from);
+ renew_caps(session);
if (unmounting) {
- mds_session->closing = true;
- messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_CLOSE, mds_session->seq),
+ session->state = MetaSession::STATE_CLOSING;
+ messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_CLOSE, session->seq),
mdsmap->get_inst(from));
} else {
+ session->state = MetaSession::STATE_OPEN;
connect_mds_targets(from);
}
break;
case CEPH_SESSION_CLOSE:
- _closed_mds_session(from, mds_session);
+ _closed_mds_session(from, session);
break;
case CEPH_SESSION_RENEWCAPS:
- if (!mds_session) {
- ldout(cct, 0) << "handle_client_session " << *m
- << " no open session for mds." << from << dendl;
- break;
- }
- if (mds_session->cap_renew_seq == m->get_seq()) {
- mds_session->cap_ttl =
- mds_session->last_cap_renew_request + mdsmap->get_session_timeout();
+ if (session->cap_renew_seq == m->get_seq()) {
+ session->cap_ttl =
+ session->last_cap_renew_request + mdsmap->get_session_timeout();
wake_inode_waiters(from);
}
break;
case CEPH_SESSION_STALE:
- /* check that if we don't have a valid mds_session for this mds,
- * we're still waiting for the open session reply */
- if (mds_session) {
- renew_caps(from);
- } else {
- if (waiting_for_session.count(from) != 0) {
- ldout(cct, 10) << "handle_client_session " << *m
- << " still waiting for open session reply" << dendl;
- // don't signal waiters, just return
- m->put();
- return;
- } else {
- // how can we get a stale mds without an open or pending session?
- assert(mds_session);
- }
- }
+ renew_caps(session);
break;
case CEPH_SESSION_RECALL_STATE:
}
// kick waiting threads
- signal_cond_list(waiting_for_session[from]);
- waiting_for_session.erase(from);
+ signal_cond_list(session->waiting_for_open);
m->put();
}
if (oldstate < MDSMap::STATE_ACTIVE) {
kick_requests(p->first, false);
kick_flushing_caps(p->first);
+ signal_cond_list(p->second->waiting_for_open);
}
connect_mds_targets(p->first);
}
ls.swap(waiting_for_mdsmap);
signal_cond_list(ls);
- map<int,list<Cond*> >::iterator p = waiting_for_session.begin();
- while (p != waiting_for_session.end()) {
- int oldstate = oldmap->get_state(p->first);
- int newstate = mdsmap->get_state(p->first);
- if (newstate >= MDSMap::STATE_ACTIVE &&
- oldstate < MDSMap::STATE_ACTIVE) {
- ldout(cct, 20) << "kick_opening_mds_session mds." << p->first << dendl;
- signal_cond_list(p->second);
- waiting_for_session.erase(p++);
- } else {
- p++;
- }
- }
-
delete oldmap;
m->put();
s->seq++;
ldout(cct, 10) << " mds." << mds << " seq now " << s->seq << dendl;
- if (s->closing)
+ if (s->state == MetaSession::STATE_CLOSING)
messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_CLOSE, s->seq),
s->inst);
}
++p) {
ldout(cct, 2) << "sending client_session close to mds." << p->first
<< " seq " << p->second->seq << dendl;
- if (!p->second->closing) {
- p->second->closing = true;
+ if (p->second->state != MetaSession::STATE_CLOSING) {
+ p->second->state = MetaSession::STATE_CLOSING;
messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_CLOSE, p->second->seq),
mdsmap->get_inst(p->first));
}
p++) {
ldout(cct, 15) << "renew_caps requesting from mds." << p->first << dendl;
if (mdsmap->get_state(p->first) >= MDSMap::STATE_REJOIN)
- renew_caps(p->first);
+ renew_caps(p->second);
}
}
-void Client::renew_caps(const int mds)
+void Client::renew_caps(MetaSession *session)
{
- ldout(cct, 10) << "renew_caps mds." << mds << dendl;
- MetaSession *session = mds_sessions[mds];
+ ldout(cct, 10) << "renew_caps mds." << session->mds_num << dendl;
session->last_cap_renew_request = ceph_clock_now(cct);
uint64_t seq = ++session->cap_renew_seq;
messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_RENEWCAPS, seq),
- mdsmap->get_inst(mds));
-
+ session->inst);
}
}
}
if (mds >= 0) {
- if (s->closing) {
+ if (s->state == MetaSession::STATE_CLOSING) {
ldout(cct, 1) << "reset from mds we were closing; we'll call that closed" << dendl;
_closed_mds_session(mds, s);
}