From 51cc9398941ad496b74efe7f027d7e3856ee6188 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 31 Dec 2007 14:52:26 -0800 Subject: [PATCH] clientmap replaced with sessionmap; untested --- src/mds/LogSegment.h | 4 +- src/mds/MDCache.cc | 6 +- src/mds/MDCache.h | 10 ++- src/mds/MDS.cc | 14 ++-- src/mds/MDS.h | 1 - src/mds/Migrator.cc | 4 +- src/mds/Server.cc | 167 +++++++++++++++++++++++-------------------- src/mds/Server.h | 2 +- src/mds/SessionMap.h | 79 ++++++++++++++++---- src/mds/journal.cc | 69 +++++++++--------- 10 files changed, 213 insertions(+), 143 deletions(-) diff --git a/src/mds/LogSegment.h b/src/mds/LogSegment.h index c4cf1d50897ff..4723960c79363 100644 --- a/src/mds/LogSegment.h +++ b/src/mds/LogSegment.h @@ -54,7 +54,7 @@ class LogSegment { // table version version_t allocv; - version_t clientmapv; + version_t sessionmapv; version_t anchortablev; // try to expire @@ -62,7 +62,7 @@ class LogSegment { // cons LogSegment(off_t off) : offset(off), end(off), num_events(0), - allocv(0), clientmapv(0), anchortablev(0) + allocv(0), sessionmapv(0), anchortablev(0) { } }; diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc index af4dfdec9ce03..bf5947df107f5 100644 --- a/src/mds/MDCache.cc +++ b/src/mds/MDCache.cc @@ -1252,7 +1252,7 @@ void MDCache::handle_resolve(MMDSResolve *m) for (list::iterator p = m->slave_requests.begin(); p != m->slave_requests.end(); ++p) { - if (mds->clientmap.have_completed_request(*p)) { + if (mds->sessionmap.have_completed_request(*p)) { // COMMIT dout(10) << " ambiguous slave request " << *p << " will COMMIT" << dendl; ack->add_commit(*p); @@ -2664,7 +2664,9 @@ void MDCache::rejoin_import_cap(CInode *in, int client, inode_caps_reconnect_t& in->client_caps[client].wanted()); reap->set_mds( frommds ); // reap from whom? - mds->messenger->send_message(reap, mds->clientmap.get_inst(client)); + Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(client)); + assert(session); + mds->messenger->send_message(reap, session->inst); } void MDCache::rejoin_send_acks() diff --git a/src/mds/MDCache.h b/src/mds/MDCache.h index 35c324f150686..59996c9733c6f 100644 --- a/src/mds/MDCache.h +++ b/src/mds/MDCache.h @@ -39,6 +39,7 @@ class Renamer; class Logger; class Message; +class Session; class MMDSResolve; class MMDSResolveAck; @@ -80,6 +81,7 @@ struct PVList { */ struct MDRequest { metareqid_t reqid; + Session *session; // -- i am a client (master) request MClientRequest *client_request; // client request (if any) @@ -116,6 +118,8 @@ struct MDRequest { bool committing; bool aborted; + // break rarely-used fields into a separately allocated structure + // to save memory for most ops struct More { set slaves; // mds nodes that have slave requests to me (implies client_request) set waiting_on_slave; // peers i'm waiting for slavereq replies from. @@ -148,19 +152,19 @@ struct MDRequest { // --------------------------------------------------- MDRequest() : - client_request(0), ref(0), + session(0), client_request(0), ref(0), slave_request(0), slave_to_mds(-1), ls(0), done_locking(false), committing(false), aborted(false), _more(0) {} MDRequest(metareqid_t ri, MClientRequest *req) : - reqid(ri), client_request(req), ref(0), + reqid(ri), session(0), client_request(req), ref(0), slave_request(0), slave_to_mds(-1), ls(0), done_locking(false), committing(false), aborted(false), _more(0) {} MDRequest(metareqid_t ri, int by) : - reqid(ri), client_request(0), ref(0), + reqid(ri), session(0), client_request(0), ref(0), slave_request(0), slave_to_mds(by), ls(0), done_locking(false), committing(false), aborted(false), diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index 28ffc0d755ed9..cabca2dd284a4 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -71,7 +71,7 @@ // cons/des MDS::MDS(int whoami, Messenger *m, MonMap *mm) : timer(mds_lock), - clientmap(this), sessionmap(this) { + sessionmap(this) { this->whoami = whoami; @@ -281,7 +281,7 @@ void MDS::send_message_client(Message *m, int client) { version_t seq = sessionmap.inc_push_seq(client); dout(10) << "send_message_client client" << client << " seq " << seq << " " << *m << dendl; - messenger->send_message(m, sessionmap.get_inst(entity_name_t::CLIENT(client))); + messenger->send_message(m, sessionmap.get_session(entity_name_t::CLIENT(client))->inst); } void MDS::send_message_client(Message *m, entity_inst_t clientinst) @@ -653,12 +653,12 @@ void MDS::bcast_mds_map() dout(7) << "bcast_mds_map " << mdsmap->get_epoch() << dendl; // share the map with mounted clients - set clients; - sessionmap.get_session_set(clients); - for (set::const_iterator p = clients.begin(); + set clients; + sessionmap.get_client_session_set(clients); + for (set::const_iterator p = clients.begin(); p != clients.end(); ++p) - messenger->send_message(new MMDSMap(mdsmap), sessionmap.get_inst(*p)); + messenger->send_message(new MMDSMap(mdsmap), (*p)->inst); last_client_mdsmap_bcast = mdsmap->get_epoch(); } @@ -754,7 +754,7 @@ void MDS::boot_create() idalloc->reset(); idalloc->save(fin->new_sub()); - // write empty clientmap + // write empty sessionmap sessionmap.save(fin->new_sub()); // fixme: fake out anchortable diff --git a/src/mds/MDS.h b/src/mds/MDS.h index f0f5316fc6eb6..5a69301e681b0 100644 --- a/src/mds/MDS.h +++ b/src/mds/MDS.h @@ -200,7 +200,6 @@ class MDS : public Dispatcher { void reset_tick(); // -- client map -- - ClientMap clientmap; SessionMap sessionmap; epoch_t last_client_mdsmap_bcast; //void log_clientmap(Context *c); diff --git a/src/mds/Migrator.cc b/src/mds/Migrator.cc index 78cd7fcfcbcc0..5565d541bdf03 100644 --- a/src/mds/Migrator.cc +++ b/src/mds/Migrator.cc @@ -885,7 +885,7 @@ void Migrator::encode_export_inode_caps(CInode *in, bufferlist& bl, for (map::iterator it = in->client_caps.begin(); it != in->client_caps.end(); it++) - exported_client_map[it->first] = mds->clientmap.get_inst(it->first); + exported_client_map[it->first] = mds->sessionmap.get_inst(entity_name_t::CLIENT(it->first)); } void Migrator::finish_export_inode_caps(CInode *in) @@ -2276,7 +2276,7 @@ void Migrator::handle_export_caps(MExportCaps *ex) */ C_M_LoggedImportCaps *finish = new C_M_LoggedImportCaps(this, in, ex->get_source().num()); - ESessions *le = new ESessions(mds->clientmap.inc_projected()); + ESessions *le = new ESessions(++mds->sessionmap.projected); // decode new caps bufferlist::iterator blp = ex->cap_bl.begin(); diff --git a/src/mds/Server.cc b/src/mds/Server.cc index 602f3d2662c96..2e5ae550cc207 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -123,15 +123,15 @@ void Server::dispatch(Message *m) class C_MDS_session_finish : public Context { MDS *mds; - entity_inst_t client_inst; + Session *session; bool open; version_t cmapv; public: - C_MDS_session_finish(MDS *m, entity_inst_t ci, bool s, version_t mv) : - mds(m), client_inst(ci), open(s), cmapv(mv) { } + C_MDS_session_finish(MDS *m, Session *se, bool s, version_t mv) : + mds(m), session(se), open(s), cmapv(mv) { } void finish(int r) { assert(r == 0); - mds->server->_session_logged(client_inst, open, cmapv); + mds->server->_session_logged(session, open, cmapv); } }; @@ -139,41 +139,38 @@ public: void Server::handle_client_session(MClientSession *m) { dout(3) << "handle_client_session " << *m << " from " << m->get_source() << dendl; - int from = m->get_source().num(); bool open = false; Session *session = mds->sessionmap.get_session(m->get_source()); + assert(m->get_source().is_client()); // should _not_ come from an mds! switch (m->op) { case MClientSession::OP_REQUEST_OPEN: open = true; - if (!session) { - dout(10) << "already open, dropping this req" << dendl; + if (session && (session->is_opening() || session->is_open())) { + dout(10) << "already open|opening, dropping this req" << dendl; delete m; return; } - if (mds->clientmap.is_opening(from)) { - dout(10) << "already opening, dropping this req" << dendl; - delete m; - return; - } - mds->clientmap.add_opening(from); + assert(!session); // ? + session = mds->sessionmap.get_or_add_session(m->get_source()); + session->inst = m->get_source_inst(); + session->state = Session::STATE_OPENING; break; case MClientSession::OP_REQUEST_CLOSE: - if (mds->clientmap.is_closing(from)) { - dout(10) << "already closing, dropping this req" << dendl; + if (!session || session->is_closing()) { + dout(10) << "already closing|dne, dropping this req" << dendl; delete m; return; } - if (m->seq < mds->clientmap.get_push_seq(from)) { - dout(10) << "old push seq " << m->seq << " < " << mds->clientmap.get_push_seq(from) + if (m->seq < session->get_push_seq()) { + dout(10) << "old push seq " << m->seq << " < " << session->get_push_seq() << ", dropping" << dendl; delete m; return; } - assert(m->seq == mds->clientmap.get_push_seq(from)); - - mds->clientmap.add_closing(from); + assert(m->seq == session->get_push_seq()); + session->state = Session::STATE_CLOSING; break; default: @@ -181,64 +178,76 @@ void Server::handle_client_session(MClientSession *m) } // journal it - version_t cmapv = mds->clientmap.inc_projected(); - dout(10) << " clientmap v " << mds->clientmap.get_version() << " pv " << cmapv << dendl; - mdlog->submit_entry(new ESession(m->get_source_inst(), open, cmapv), - new C_MDS_session_finish(mds, m->get_source_inst(), open, cmapv)); + version_t pv = ++mds->sessionmap.projected; + dout(10) << " sessionmap v " << mds->sessionmap.version << " pv " << pv << dendl; + mdlog->submit_entry(new ESession(m->get_source_inst(), open, pv), + new C_MDS_session_finish(mds, session, open, pv)); delete m; if (logger) logger->inc("hcsess"); } -void Server::_session_logged(entity_inst_t client_inst, bool open, version_t cmapv) +void Server::_session_logged(Session *session, bool open, version_t pv) { - dout(10) << "_session_logged " << client_inst << " " << (open ? "open":"close") - << " " << cmapv + dout(10) << "_session_logged " << session->inst << " " << (open ? "open":"close") + << " " << pv << dendl; // apply - int from = client_inst.name.num(); if (open) { - assert(mds->clientmap.is_opening(from)); - mds->clientmap.open_session(client_inst); - } else if (mds->clientmap.is_closing(from)) { - mds->clientmap.close_session(from); - - // purge completed requests from clientmap - mds->clientmap.trim_completed_requests(client_inst.name, 0); + assert(session->is_opening()); + session->state = Session::STATE_OPEN; + mds->sessionmap.version++; + } else if (session->is_closing()) { + mds->sessionmap.remove_session(session); + mds->sessionmap.version++; } else { // close must have been canceled (by an import?) ... assert(!open); - mds->clientmap.noop(); + mds->sessionmap.version++; // noop } - assert(cmapv == mds->clientmap.get_version()); + assert(pv == mds->sessionmap.version); // reply if (open) - mds->messenger->send_message(new MClientSession(MClientSession::OP_OPEN), client_inst); + mds->messenger->send_message(new MClientSession(MClientSession::OP_OPEN), session->inst); else - mds->messenger->send_message(new MClientSession(MClientSession::OP_CLOSE), client_inst); + mds->messenger->send_message(new MClientSession(MClientSession::OP_CLOSE), session->inst); } void Server::prepare_force_open_sessions(map& cm) { - version_t cmapv = mds->clientmap.inc_projected(); - dout(10) << "prepare_force_open_sessions " << cmapv + /* + * FIXME ! + */ + version_t pv = ++mds->sessionmap.projected; + dout(10) << "prepare_force_open_sessions " << pv << " on " << cm.size() << " clients" << dendl; for (map::iterator p = cm.begin(); p != cm.end(); ++p) { - mds->clientmap.add_opening(p->first); + Session *session = mds->sessionmap.get_or_add_session(p->second.name); + if (session->state == Session::STATE_UNDEF || + session->state == Session::STATE_CLOSING) + session->state = Session::STATE_OPENING; } } void Server::finish_force_open_sessions(map& cm) { - version_t v = mds->clientmap.get_version(); - dout(10) << "finish_force_open_sessions on " << cm.size() << " clients, v " << v << " -> " << (v+1) << dendl; + /* + * FIXME: need to carefully consider the race conditions between a + * client trying to close a session and an MDS doing an import + * trying to force open a session... + */ + dout(10) << "finish_force_open_sessions on " << cm.size() << " clients," + << " v " << mds->sessionmap.version << " -> " << (mds->sessionmap.version+1) << dendl; for (map::iterator p = cm.begin(); p != cm.end(); ++p) { - if (mds->clientmap.is_closing(p->first)) { - dout(15) << "force_open_sessions canceling close on " << p->second << dendl; + Session *session = mds->sessionmap.get_session(p->second.name); + assert(session); + /* + if (session->is_closing()) { + dout(15) << "force_open_sessions canceling close on " << session->inst << dendl; mds->clientmap.remove_closing(p->first); continue; } @@ -246,12 +255,14 @@ void Server::finish_force_open_sessions(map& cm) dout(15) << "force_open_sessions have session " << p->second << dendl; continue; } - - dout(10) << "force_open_sessions opening " << p->second << dendl; - mds->clientmap.open_session(p->second); - mds->messenger->send_message(new MClientSession(MClientSession::OP_OPEN), p->second); + */ + if (session->is_opening()) { + dout(10) << "force_open_sessions opening " << session->inst << dendl; + session->state = Session::STATE_OPEN; + mds->messenger->send_message(new MClientSession(MClientSession::OP_OPEN), session->inst); + } } - mds->clientmap.set_version(v+1); + mds->sessionmap.version++; } @@ -260,35 +271,34 @@ void Server::terminate_sessions() dout(2) << "terminate_sessions" << dendl; // kill them off. clients will retry etc. - for (set::const_iterator p = mds->clientmap.get_session_set().begin(); - p != mds->clientmap.get_session_set().end(); + set sessions; + mds->sessionmap.get_client_session_set(sessions); + for (set::const_iterator p = sessions.begin(); + p != sessions.end(); ++p) { - if (mds->clientmap.is_closing(*p)) - continue; - mds->clientmap.add_closing(*p); - version_t cmapv = mds->clientmap.inc_projected(); - mdlog->submit_entry(new ESession(mds->clientmap.get_inst(*p), false, cmapv), - new C_MDS_session_finish(mds, mds->clientmap.get_inst(*p), false, cmapv)); + Session *session = *p; + if (session->is_closing()) continue; + session->state = Session::STATE_CLOSING; + version_t pv = ++mds->sessionmap.projected; + mdlog->submit_entry(new ESession(session->inst, false, pv), + new C_MDS_session_finish(mds, session, false, pv)); } } void Server::reconnect_clients() { - // reconnect with clients - if (mds->clientmap.get_session_set().empty()) { + mds->sessionmap.get_client_set(client_reconnect_gather); + + if (client_reconnect_gather.empty()) { dout(7) << "reconnect_clients -- no sessions, doing nothing." << dendl; reconnect_gather_finish(); return; } dout(7) << "reconnect_clients -- sending mdsmap to clients with sessions" << dendl; - mds->bcast_mds_map(); // send mdsmap to all client sessions - - // init gather list reconnect_start = g_clock.now(); - client_reconnect_gather = mds->clientmap.get_session_set(); dout(1) << "reconnect_clients -- " << client_reconnect_gather.size() << " sessions" << dendl; } @@ -296,15 +306,14 @@ void Server::handle_client_reconnect(MClientReconnect *m) { dout(7) << "handle_client_reconnect " << m->get_source() << dendl; int from = m->get_source().num(); + Session *session = mds->sessionmap.get_session(m->get_source()); if (m->closed) { - dout(7) << " client had no session, removing from clientmap" << dendl; - - mds->clientmap.add_closing(from); - version_t cmapv = mds->clientmap.inc_projected(); - mdlog->submit_entry(new ESession(mds->clientmap.get_inst(from), false, cmapv), - new C_MDS_session_finish(mds, mds->clientmap.get_inst(from), false, cmapv)); - + dout(7) << " client had no session, removing from session map" << dendl; + assert(session); // ? + version_t pv = ++mds->sessionmap.projected; + mdlog->submit_entry(new ESession(session->inst, false, pv), + new C_MDS_session_finish(mds, session, false, pv)); } else { // caps @@ -439,9 +448,9 @@ void Server::reply_request(MDRequest *mdr, MClientReply *reply, CInode *tracei) << " (" << strerror(-reply->get_result()) << ") " << *req << dendl; - // note result code in clientmap? + // note result code in session map? if (!req->is_idempotent()) - mds->clientmap.add_completed_request(mdr->reqid); + mdr->session->add_completed_request(mdr->reqid.tid); /* if (tracei && !tracei->hack_accessed) { @@ -512,8 +521,9 @@ void Server::handle_client_request(MClientRequest *req) } // active session? + Session *session = 0; if (req->get_client_inst().name.is_client() && - !mds->clientmap.have_session(req->get_client_inst().name.num())) { + !(session = mds->sessionmap.get_session(req->get_client_inst().name))) { dout(5) << "no session for " << req->get_client_inst().name << ", dropping" << dendl; delete req; return; @@ -530,7 +540,8 @@ void Server::handle_client_request(MClientRequest *req) // retry? if (req->get_retry_attempt()) { - if (mds->clientmap.have_completed_request(req->get_reqid())) { + assert(session); + if (session->have_completed_request(req->get_reqid().tid)) { dout(5) << "already completed " << req->get_reqid() << dendl; mds->messenger->send_message(new MClientReply(req, 0), req->get_client_inst()); delete req; @@ -540,13 +551,13 @@ void Server::handle_client_request(MClientRequest *req) // trim completed_request list if (req->get_oldest_client_tid() > 0) { dout(15) << " oldest_client_tid=" << req->get_oldest_client_tid() << dendl; - mds->clientmap.trim_completed_requests(req->get_client_inst().name, - req->get_oldest_client_tid()); + session->trim_completed_requests(req->get_oldest_client_tid()); } // register + dispatch MDRequest *mdr = mdcache->request_start(req); if (!mdr) return; + mdr->session = session; if (ref) { dout(10) << "inode op on ref " << *ref << dendl; diff --git a/src/mds/Server.h b/src/mds/Server.h index d2252f33df7bc..27d3651764a70 100644 --- a/src/mds/Server.h +++ b/src/mds/Server.h @@ -56,7 +56,7 @@ public: set reconnected_caps; void handle_client_session(class MClientSession *m); - void _session_logged(entity_inst_t ci, bool open, version_t cmapv); + void _session_logged(Session *session, bool open, version_t cmapv); void prepare_force_open_sessions(map &cm); void finish_force_open_sessions(map &cm); void terminate_sessions(); diff --git a/src/mds/SessionMap.h b/src/mds/SessionMap.h index c4aa9472459b4..d3e8cb0dc1b9a 100644 --- a/src/mds/SessionMap.h +++ b/src/mds/SessionMap.h @@ -28,20 +28,23 @@ using __gnu_cxx::hash_map; class CInode; class Session { - // -- state -- + // -- state etc -- public: + static const int STATE_UNDEF = 0; static const int STATE_OPENING = 1; static const int STATE_OPEN = 2; static const int STATE_CLOSING = 3; static const int STATE_STALE = 4; // ? static const int STATE_RECONNECTING = 5; -private: int state; utime_t last_alive; // last alive -public: entity_inst_t inst; + bool is_opening() { return state == STATE_OPENING; } + bool is_open() { return state == STATE_OPEN; } + bool is_closing() { return state == STATE_CLOSING; } + // -- caps -- private: version_t cap_push_seq; // cap push seq # @@ -75,19 +78,25 @@ public: } finish_contexts(fls); } - void add_trim_waiter(metareqid_t ri, Context *c) { - waiting_for_trim[ri.tid] = c; + void add_trim_waiter(tid_t tid, Context *c) { + waiting_for_trim[tid] = c; } - bool have_completed_request(metareqid_t ri) const { - return completed_requests.count(ri.tid); + bool have_completed_request(tid_t tid) const { + return completed_requests.count(tid); } + + Session() : + state(STATE_UNDEF), + cap_push_seq(0) { } }; class SessionMap { private: MDS *mds; hash_map session_map; + +public: // i am lazy version_t version, projected, committing, committed; map > commit_waiters; @@ -96,25 +105,53 @@ public: version(0), projected(0), committing(0), committed(0) { } + // sessions bool empty() { return session_map.empty(); } - Session* get_session(entity_name_t w) { if (session_map.count(w)) return &session_map[w]; return 0; } - Session* add_session(entity_name_t w) { + Session* get_or_add_session(entity_name_t w) { return &session_map[w]; } - void remove_session(entity_name_t w) { - session_map.erase(w); + Session* get_or_add_session(entity_inst_t i) { + Session *s = get_or_add_session(i.name); + s->inst = i; + return s; + } + void remove_session(Session *s) { + s->trim_completed_requests(0); + session_map.erase(s->inst.name); } - void get_session_set(set& s) { + void get_client_set(set& s) { for (hash_map::iterator p = session_map.begin(); p != session_map.end(); p++) - s.insert(p->first); + if (p->second.inst.name.is_client()) + s.insert(p->second.inst.name.num()); + } + void get_client_session_set(set& s) { + for (hash_map::iterator p = session_map.begin(); + p != session_map.end(); + p++) + if (p->second.inst.name.is_client()) + s.insert(&p->second); + } + + void open_sessions(map& client_map) { + for (map::iterator p = client_map.begin(); + p != client_map.end(); + ++p) { + Session *session = get_or_add_session(p->second); + if (session->state == Session::STATE_UNDEF || + session->state == Session::STATE_CLOSING) { + session->inst = p->second; + session->state = Session::STATE_OPEN; + } + } + version++; } // helpers @@ -128,7 +165,21 @@ public: version_t get_push_seq(int client) { return get_session(entity_name_t::CLIENT(client))->get_push_seq(); } - + bool have_completed_request(metareqid_t rid) { + Session *session = get_session(rid.name); + return session && session->have_completed_request(rid.tid); + } + void add_completed_request(metareqid_t rid) { + Session *session = get_session(rid.name); + assert(session); + session->add_completed_request(rid.tid); + } + void trim_completed_requests(entity_name_t c, tid_t tid) { + Session *session = get_session(c); + assert(session); + session->trim_completed_requests(tid); + } + // -- loading, saving -- inode_t inode; list waiting_for_load; diff --git a/src/mds/journal.cc b/src/mds/journal.cc index 3b39679dcd61f..4e1b50b3a4cb5 100644 --- a/src/mds/journal.cc +++ b/src/mds/journal.cc @@ -158,14 +158,14 @@ C_Gather *LogSegment::try_to_expire(MDS *mds) mds->idalloc->save(gather->new_sub(), allocv); } - // clientmap - if (clientmapv > mds->clientmap.get_committed()) { - dout(10) << "try_to_expire saving clientmap, need " << clientmapv - << ", committed is " << mds->clientmap.get_committed() - << " (" << mds->clientmap.get_committing() << ")" + // sessionmap + if (sessionmapv > mds->sessionmap.committed) { + dout(10) << "try_to_expire saving sessionmap, need " << sessionmapv + << ", committed is " << mds->sessionmap.committed + << " (" << mds->sessionmap.committing << ")" << dendl; if (!gather) gather = new C_Gather; - mds->clientmap.save(gather->new_sub(), clientmapv); + mds->sessionmap.save(gather->new_sub(), sessionmapv); } // pending commit atids @@ -362,7 +362,7 @@ bool EMetaBlob::has_expired(MDS *mds) for (list::iterator p = client_reqs.begin(); p != client_reqs.end(); ++p) { - if (mds->clientmap.have_completed_request(*p)) { + if (mds->sessionmap.have_completed_request(*p)) { dout(10) << "EMetaBlob.has_expired still have completed request " << *p << dendl; return false; @@ -504,10 +504,10 @@ void EMetaBlob::expire(MDS *mds, Context *c) for (list::iterator p = client_reqs.begin(); p != client_reqs.end(); ++p) { - if (mds->clientmap.have_completed_request(*p)) { + if (mds->sessionmap.have_completed_request(*p)) { dout(10) << "EMetaBlob.expire waiting on completed request " << *p << dendl; - mds->clientmap.add_trim_waiter(*p, gather->new_sub()); + mds->sessionmap.add_trim_waiter(*p, gather->new_sub()); } } @@ -740,7 +740,7 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg) for (list::iterator p = client_reqs.begin(); p != client_reqs.end(); ++p) - mds->clientmap.add_completed_request(*p); + mds->sessionmap.add_completed_request(*p); // update segment @@ -752,49 +752,52 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg) void ESession::update_segment() { - _segment->clientmapv = cmapv; + _segment->sessionmapv = cmapv; } void ESession::replay(MDS *mds) { - if (mds->clientmap.get_version() >= cmapv) { - dout(10) << "ESession.replay clientmap " << mds->clientmap.get_version() + if (mds->sessionmap.version >= cmapv) { + dout(10) << "ESession.replay sessionmap " << mds->sessionmap.version << " >= " << cmapv << ", noop" << dendl; // hrm, this isn't very pretty. if (!open) - mds->clientmap.trim_completed_requests(client_inst.name, 0); + mds->sessionmap.trim_completed_requests(client_inst.name, 0); } else { - dout(10) << "ESession.replay clientmap " << mds->clientmap.get_version() + dout(10) << "ESession.replay sessionmap " << mds->sessionmap.version << " < " << cmapv << dendl; - assert(mds->clientmap.get_version() + 1 == cmapv); + mds->sessionmap.projected = ++mds->sessionmap.version; + assert(mds->sessionmap.version == cmapv); if (open) { - mds->clientmap.open_session(client_inst); + Session *session = mds->sessionmap.get_or_add_session(client_inst.name); + session->inst = client_inst; + session->state = Session::STATE_OPEN; } else { - mds->clientmap.close_session(client_inst.name.num()); - mds->clientmap.trim_completed_requests(client_inst.name, 0); + Session *session = mds->sessionmap.get_session(client_inst.name); + if (session) + mds->sessionmap.remove_session(session); } - mds->clientmap.reset_projected(); // make it follow version. } } void ESessions::update_segment() { - _segment->clientmapv = cmapv; + _segment->sessionmapv = cmapv; } void ESessions::replay(MDS *mds) { - if (mds->clientmap.get_version() >= cmapv) { - dout(10) << "ESessions.replay clientmap " << mds->clientmap.get_version() + if (mds->sessionmap.version >= cmapv) { + dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.version << " >= " << cmapv << ", noop" << dendl; } else { - dout(10) << "ESessions.replay clientmap " << mds->clientmap.get_version() + dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.version << " < " << cmapv << dendl; - mds->clientmap.open_sessions(client_map); - assert(mds->clientmap.get_version() == cmapv); - mds->clientmap.reset_projected(); // make it follow version. + mds->sessionmap.open_sessions(client_map); + assert(mds->sessionmap.version == cmapv); + mds->sessionmap.projected = mds->sessionmap.version; } } @@ -1077,18 +1080,18 @@ void EImportStart::replay(MDS *mds) mds->mdcache->add_ambiguous_import(base, bounds); // open client sessions? - if (mds->clientmap.get_version() >= cmapv) { - dout(10) << "EImportStart.replay clientmap " << mds->clientmap.get_version() + if (mds->sessionmap.version >= cmapv) { + dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.version << " >= " << cmapv << ", noop" << dendl; } else { - dout(10) << "EImportStart.replay clientmap " << mds->clientmap.get_version() + dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.version << " < " << cmapv << dendl; map cm; bufferlist::iterator blp = client_map.begin(); ::_decode_simple(cm, blp); - mds->clientmap.open_sessions(cm); - assert(mds->clientmap.get_version() == cmapv); - mds->clientmap.reset_projected(); // make it follow version. + mds->sessionmap.open_sessions(cm); + assert(mds->sessionmap.version == cmapv); + mds->sessionmap.projected = mds->sessionmap.version; } } -- 2.39.5