From: Sage Weil Date: Fri, 8 Mar 2013 20:56:54 +0000 (-0800) Subject: client: instantiate MetaSession for opening connections X-Git-Tag: v0.60~81^2~17 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e47d4906845ef35a657702177963989b75907b0d;p=ceph.git client: instantiate MetaSession for opening connections The previous kludge where a waiting_for_session key indicated that we had an open in progress was... kludgey. Introduce some helpers to do the session creation/open. Move the waiting list to be a session member, and clean up associated code. Signed-off-by: Sage Weil --- diff --git a/src/client/Client.cc b/src/client/Client.cc index 43930bd8cc33..f115e10aca98 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -1111,31 +1111,23 @@ void Client::connect_mds_targets(int mds) q != info.export_targets.end(); q++) { if (mds_sessions.count(*q) == 0 && - waiting_for_session.count(*q) == 0 && mdsmap->is_clientreplay_or_active_or_stopping(*q)) { ldout(cct, 10) << "check_mds_sessions opening mds." << mds - << " export target mds." << *q << dendl; - messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_OPEN), - mdsmap->get_inst(*q)); - waiting_for_session[*q].size(); + << " export target mds." << *q << dendl; + _open_mds_session(*q); } } } void Client::dump_mds_sessions(Formatter *f) { - f->open_array_section("open_sessions"); + f->open_array_section("sessions"); for (map::const_iterator p = mds_sessions.begin(); p != mds_sessions.end(); ++p) { f->open_object_section("session"); p->second->dump(f); f->close_section(); } f->close_section(); - f->open_array_section("opening_sessions"); - for (map >::const_iterator p = waiting_for_session.begin(); p != waiting_for_session.end(); ++p) { - f->dump_int("mds", p->first); - } - f->close_section(); f->dump_int("mdsmap_epoch", mdsmap->get_epoch()); } void Client::dump_mds_requests(Formatter *f) @@ -1286,7 +1278,7 @@ int Client::make_request(MetaRequest *request, } // open a session? - if (mds_sessions.count(mds) == 0) { + if (!have_open_session(mds)) { Cond cond; if (!mdsmap->is_active_or_stopping(mds)) { @@ -1301,20 +1293,18 @@ int Client::make_request(MetaRequest *request, } } - if (waiting_for_session.count(mds) == 0) { - ldout(cct, 10) << "opening session to mds." << mds << dendl; - messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_OPEN), - mdsmap->get_inst(mds)); - } - + MetaSession *session = _get_or_open_mds_session(mds); + // wait - waiting_for_session[mds].push_back(&cond); - while (waiting_for_session.count(mds)) { - ldout(cct, 10) << "waiting for session to mds." << mds << " to open" << dendl; - cond.Wait(client_lock); + if (session->state == MetaSession::STATE_OPENING) { + session->waiting_for_open.push_back(&cond); + while (session->state == MetaSession::STATE_OPENING) { + ldout(cct, 10) << "waiting for session to mds." << mds << " to open" << dendl; + cond.Wait(client_lock); + } } - if (mds_sessions.count(mds) == 0) + if (!have_open_session(mds)) continue; } @@ -1461,6 +1451,35 @@ void Client::encode_cap_releases(MetaRequest *req, int mds) << req << ", mds " << mds <state == MetaSession::STATE_OPEN; +} + +MetaSession *Client::_get_or_open_mds_session(int mds) +{ + if (mds_sessions.count(mds)) + return mds_sessions[mds]; + return _open_mds_session(mds); +} + +MetaSession *Client::_open_mds_session(int mds) +{ + ldout(cct, 10) << "_open_mds_session mds." << mds << dendl; + assert(mds_sessions.count(mds) == 0); + MetaSession *session = new MetaSession; + session->mds_num = mds; + session->seq = 0; + session->inst = mdsmap->get_inst(mds); + session->state = MetaSession::STATE_OPENING; + mds_sessions[mds] = session; + messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_OPEN), + session->inst); + return session; +} + void Client::_closed_mds_session(int mds, MetaSession *s) { mount_cond.Signal(); @@ -1475,62 +1494,41 @@ void Client::handle_client_session(MClientSession *m) { int from = m->get_source().num(); ldout(cct, 10) << "handle_client_session " << *m << " from mds." << from << dendl; - MetaSession *mds_session = NULL; - if (mds_sessions.count(from)) - mds_session = mds_sessions[from]; + + if (mds_sessions.count(from) == 0) { + ldout(cct, 10) << " discarding session message from sessionless mds " << m->get_source_inst() << dendl; + m->put(); + return; + } + MetaSession *session = mds_sessions[from]; switch (m->get_op()) { case CEPH_SESSION_OPEN: - if (!mds_session) { - mds_sessions[from] = mds_session = new MetaSession(); - mds_session->mds_num = from; - mds_session->seq = 0; - mds_session->inst = m->get_source_inst(); - } - renew_caps(from); + renew_caps(session); if (unmounting) { - mds_session->closing = true; - messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_CLOSE, mds_session->seq), + session->state = MetaSession::STATE_CLOSING; + messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_CLOSE, session->seq), mdsmap->get_inst(from)); } else { + session->state = MetaSession::STATE_OPEN; connect_mds_targets(from); } break; case CEPH_SESSION_CLOSE: - _closed_mds_session(from, mds_session); + _closed_mds_session(from, session); break; case CEPH_SESSION_RENEWCAPS: - if (!mds_session) { - ldout(cct, 0) << "handle_client_session " << *m - << " no open session for mds." << from << dendl; - break; - } - if (mds_session->cap_renew_seq == m->get_seq()) { - mds_session->cap_ttl = - mds_session->last_cap_renew_request + mdsmap->get_session_timeout(); + if (session->cap_renew_seq == m->get_seq()) { + session->cap_ttl = + session->last_cap_renew_request + mdsmap->get_session_timeout(); wake_inode_waiters(from); } break; case CEPH_SESSION_STALE: - /* check that if we don't have a valid mds_session for this mds, - * we're still waiting for the open session reply */ - if (mds_session) { - renew_caps(from); - } else { - if (waiting_for_session.count(from) != 0) { - ldout(cct, 10) << "handle_client_session " << *m - << " still waiting for open session reply" << dendl; - // don't signal waiters, just return - m->put(); - return; - } else { - // how can we get a stale mds without an open or pending session? - assert(mds_session); - } - } + renew_caps(session); break; case CEPH_SESSION_RECALL_STATE: @@ -1542,8 +1540,7 @@ void Client::handle_client_session(MClientSession *m) } // kick waiting threads - signal_cond_list(waiting_for_session[from]); - waiting_for_session.erase(from); + signal_cond_list(session->waiting_for_open); m->put(); } @@ -1843,6 +1840,7 @@ void Client::handle_mds_map(MMDSMap* m) if (oldstate < MDSMap::STATE_ACTIVE) { kick_requests(p->first, false); kick_flushing_caps(p->first); + signal_cond_list(p->second->waiting_for_open); } connect_mds_targets(p->first); } @@ -1853,20 +1851,6 @@ void Client::handle_mds_map(MMDSMap* m) ls.swap(waiting_for_mdsmap); signal_cond_list(ls); - map >::iterator p = waiting_for_session.begin(); - while (p != waiting_for_session.end()) { - int oldstate = oldmap->get_state(p->first); - int newstate = mdsmap->get_state(p->first); - if (newstate >= MDSMap::STATE_ACTIVE && - oldstate < MDSMap::STATE_ACTIVE) { - ldout(cct, 20) << "kick_opening_mds_session mds." << p->first << dendl; - signal_cond_list(p->second); - waiting_for_session.erase(p++); - } else { - p++; - } - } - delete oldmap; m->put(); @@ -1977,7 +1961,7 @@ void Client::got_mds_push(int mds) s->seq++; ldout(cct, 10) << " mds." << mds << " seq now " << s->seq << dendl; - if (s->closing) + if (s->state == MetaSession::STATE_CLOSING) messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_CLOSE, s->seq), s->inst); } @@ -3750,8 +3734,8 @@ void Client::unmount() ++p) { ldout(cct, 2) << "sending client_session close to mds." << p->first << " seq " << p->second->seq << dendl; - if (!p->second->closing) { - p->second->closing = true; + if (p->second->state != MetaSession::STATE_CLOSING) { + p->second->state = MetaSession::STATE_CLOSING; messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_CLOSE, p->second->seq), mdsmap->get_inst(p->first)); } @@ -3839,19 +3823,17 @@ void Client::renew_caps() p++) { ldout(cct, 15) << "renew_caps requesting from mds." << p->first << dendl; if (mdsmap->get_state(p->first) >= MDSMap::STATE_REJOIN) - renew_caps(p->first); + renew_caps(p->second); } } -void Client::renew_caps(const int mds) +void Client::renew_caps(MetaSession *session) { - ldout(cct, 10) << "renew_caps mds." << mds << dendl; - MetaSession *session = mds_sessions[mds]; + ldout(cct, 10) << "renew_caps mds." << session->mds_num << dendl; session->last_cap_renew_request = ceph_clock_now(cct); uint64_t seq = ++session->cap_renew_seq; messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_RENEWCAPS, seq), - mdsmap->get_inst(mds)); - + session->inst); } @@ -7782,7 +7764,7 @@ void Client::ms_handle_remote_reset(Connection *con) } } if (mds >= 0) { - if (s->closing) { + if (s->state == MetaSession::STATE_CLOSING) { ldout(cct, 1) << "reset from mds we were closing; we'll call that closed" << dendl; _closed_mds_session(mds, s); } diff --git a/src/client/Client.h b/src/client/Client.h index d44b87e202b7..63b699c40228 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -217,7 +217,7 @@ class Client : public Dispatcher { Context *tick_event; utime_t last_cap_renew; void renew_caps(); - void renew_caps(int s); + void renew_caps(MetaSession *session); void flush_cap_releases(); public: void tick(); @@ -229,10 +229,12 @@ public: // mds sessions map mds_sessions; // mds -> push seq - map > waiting_for_session; list waiting_for_mdsmap; + bool have_open_session(int mds); void got_mds_push(int mds); + MetaSession *_get_or_open_mds_session(int mds); + MetaSession *_open_mds_session(int mds); void _closed_mds_session(int mds, MetaSession *s); void handle_client_session(MClientSession *m); void send_reconnect(int mds); diff --git a/src/client/MetaSession.cc b/src/client/MetaSession.cc index 36338c0bd605..c6a9496710b5 100644 --- a/src/client/MetaSession.cc +++ b/src/client/MetaSession.cc @@ -7,6 +7,17 @@ #include "common/Formatter.h" +const char *MetaSession::get_state_name() const +{ + switch (state) { + case STATE_NEW: return "new"; + case STATE_OPENING: return "opening"; + case STATE_OPEN: return "open"; + case STATE_CLOSING: return "closing"; + default: return "unknown"; + } +} + void MetaSession::dump(Formatter *f) const { f->dump_int("mds", mds_num); @@ -17,7 +28,7 @@ void MetaSession::dump(Formatter *f) const f->dump_stream("last_cap_renew_request") << last_cap_renew_request; f->dump_unsigned("cap_renew_seq", cap_renew_seq); f->dump_int("num_caps", num_caps); - f->dump_int("closing", (int)closing); + f->dump_string("state", get_state_name()); f->dump_int("was_stale", (int)was_stale); } diff --git a/src/client/MetaSession.h b/src/client/MetaSession.h index 20a7a588d732..bb3fad241a0d 100644 --- a/src/client/MetaSession.h +++ b/src/client/MetaSession.h @@ -19,15 +19,24 @@ class MClientCapRelease; struct MetaSession { int mds_num; + Connection *con; version_t seq; uint64_t cap_gen; utime_t cap_ttl, last_cap_renew_request; uint64_t cap_renew_seq; int num_caps; entity_inst_t inst; - bool closing; bool was_stale; + enum { + STATE_NEW, + STATE_OPENING, + STATE_OPEN, + STATE_CLOSING, + } state; + + list waiting_for_open; + xlist caps; xlist flushing_caps; xlist flushing_capsnaps; @@ -36,10 +45,17 @@ struct MetaSession { MClientCapRelease *release; - MetaSession() : mds_num(-1), seq(0), cap_gen(0), cap_renew_seq(0), num_caps(0), - closing(false), was_stale(false), release(NULL) {} + MetaSession() + : mds_num(-1), con(NULL), + seq(0), cap_gen(0), cap_renew_seq(0), num_caps(0), + was_stale(false), + state(STATE_NEW), + release(NULL) + {} ~MetaSession(); + const char *get_state_name() const; + void dump(Formatter *f) const; };