From 9cb5742a61c9d872d6cb7e0962f3623de66d41d0 Mon Sep 17 00:00:00 2001 From: John Spray Date: Thu, 12 Feb 2015 10:06:27 +0000 Subject: [PATCH] mds: new OMAP storage for sessionmap Fixes: #10649 Signed-off-by: John Spray --- src/common/config_opts.h | 1 + src/mds/Server.cc | 53 ++++-- src/mds/SessionMap.cc | 346 +++++++++++++++++++++++++++++----- src/mds/SessionMap.h | 132 +++++++++++-- src/mds/journal.cc | 77 ++++---- src/test/encoding/types.h | 3 - src/tools/cephfs/TableTool.cc | 102 +++++++++- 7 files changed, 600 insertions(+), 114 deletions(-) diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 6f3fca80ffc62..909d0c8b2e22f 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -357,6 +357,7 @@ OPTION(mds_beacon_grace, OPT_FLOAT, 15) OPTION(mds_enforce_unique_name, OPT_BOOL, true) OPTION(mds_blacklist_interval, OPT_FLOAT, 24.0*60.0) // how long to blacklist failed nodes OPTION(mds_session_timeout, OPT_FLOAT, 60) // cap bits and leases time out if client idle +OPTION(mds_sessionmap_keys_per_op, OPT_U32, 1024) // how many sessions should I try to load/store in a single OMAP operation? 
OPTION(mds_revoke_cap_timeout, OPT_FLOAT, 60) // detect clients which aren't revoking caps OPTION(mds_recall_state_timeout, OPT_FLOAT, 60) // detect clients which aren't trimming caps OPTION(mds_freeze_tree_timeout, OPT_FLOAT, 30) // detecting freeze tree deadlock diff --git a/src/mds/Server.cc b/src/mds/Server.cc index 846ade3cb0aaa..176760b616a96 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -265,9 +265,9 @@ void Server::handle_client_session(MClientSession *m) if (session->is_closed()) mds->sessionmap.add_session(session); + pv = mds->sessionmap.mark_projected(session); sseq = mds->sessionmap.set_state(session, Session::STATE_OPENING); mds->sessionmap.touch_session(session); - pv = ++mds->sessionmap.projected; mdlog->start_submit_entry(new ESession(m->get_source_inst(), true, pv, m->client_meta), new C_MDS_session_finish(mds, session, sseq, true, pv)); mdlog->flush(); @@ -357,6 +357,8 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve dout(10) << "_session_logged " << session->info.inst << " state_seq " << state_seq << " " << (open ? "open":"close") << " " << pv << dendl; + mds->sessionmap.mark_dirty(session); + if (piv) { mds->inotable->apply_release_ids(inos); assert(mds->inotable->get_version() == piv); @@ -430,18 +432,29 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve } else { assert(0); } - mds->sessionmap.version++; // noop } +/** + * Inject sessions from some source other than actual connections. 
+ * + * For example: + * - sessions inferred from journal replay + * - sessions learned from other MDSs during rejoin + * - sessions learned from other MDSs during dir/caps migration + * - sessions learned from other MDSs during a cross-MDS rename + */ version_t Server::prepare_force_open_sessions(map& cm, map& sseqmap) { - version_t pv = ++mds->sessionmap.projected; + version_t pv = mds->sessionmap.get_projected(); + dout(10) << "prepare_force_open_sessions " << pv << " on " << cm.size() << " clients" << dendl; for (map::iterator p = cm.begin(); p != cm.end(); ++p) { + Session *session = mds->sessionmap.get_or_add_session(p->second); + pv = mds->sessionmap.mark_projected(session); if (session->is_closed() || session->is_closing() || session->is_killing()) @@ -451,7 +464,6 @@ version_t Server::prepare_force_open_sessions(map& cm, session->is_opening() || session->is_stale()); session->inc_importing(); -// mds->sessionmap.touch_session(session); } return pv; } @@ -466,8 +478,13 @@ void Server::finish_force_open_sessions(map& cm, * trying to force open a session... 
*/ dout(10) << "finish_force_open_sessions on " << cm.size() << " clients," - << " v " << mds->sessionmap.version << " -> " << (mds->sessionmap.version+1) << dendl; + << " initial v " << mds->sessionmap.get_version() << dendl; + + + int sessions_inserted = 0; for (map::iterator p = cm.begin(); p != cm.end(); ++p) { + sessions_inserted++; + Session *session = mds->sessionmap.get_session(p->second.name); assert(session); @@ -487,10 +504,15 @@ void Server::finish_force_open_sessions(map& cm, dout(10) << "force_open_sessions skipping already-open " << session->info.inst << dendl; assert(session->is_open() || session->is_stale()); } - if (dec_import) + + if (dec_import) { session->dec_importing(); + } + + mds->sessionmap.mark_dirty(session); } - mds->sessionmap.version++; + + dout(10) << __func__ << ": final v " << mds->sessionmap.get_version() << dendl; } class C_MDS_TerminatedSessions : public ServerContext { @@ -612,7 +634,7 @@ void Server::kill_session(Session *session, Context *on_safe) void Server::journal_close_session(Session *session, int state, Context *on_safe) { uint64_t sseq = mds->sessionmap.set_state(session, state); - version_t pv = ++mds->sessionmap.projected; + version_t pv = mds->sessionmap.mark_projected(session); version_t piv = 0; // release alloc and pending-alloc inos for this session @@ -2135,7 +2157,8 @@ CInode* Server::prepare_new_inode(MDRequestRef& mdr, CDir *dir, inodeno_t useino if (mdr->session->info.prealloc_inos.size()) { mdr->used_prealloc_ino = in->inode.ino = mdr->session->take_ino(useino); // prealloc -> used - mds->sessionmap.projected++; + mds->sessionmap.mark_projected(mdr->session); + dout(10) << "prepare_new_inode used_prealloc " << mdr->used_prealloc_ino << " (" << mdr->session->info.prealloc_inos << ", " << mdr->session->info.prealloc_inos.size() << " left)" @@ -2159,7 +2182,7 @@ CInode* Server::prepare_new_inode(MDRequestRef& mdr, CDir *dir, inodeno_t useino mds->inotable->project_alloc_ids(mdr->prealloc_inos, got); 
assert(mdr->prealloc_inos.size()); // or else fix projected increment semantics mdr->session->pending_prealloc_inos.insert(mdr->prealloc_inos); - mds->sessionmap.projected++; + mds->sessionmap.mark_projected(mdr->session); dout(10) << "prepare_new_inode prealloc " << mdr->prealloc_inos << dendl; } @@ -2224,14 +2247,14 @@ CInode* Server::prepare_new_inode(MDRequestRef& mdr, CDir *dir, inodeno_t useino void Server::journal_allocated_inos(MDRequestRef& mdr, EMetaBlob *blob) { - dout(20) << "journal_allocated_inos sessionmapv " << mds->sessionmap.projected + dout(20) << "journal_allocated_inos sessionmapv " << mds->sessionmap.get_projected() << " inotablev " << mds->inotable->get_projected_version() << dendl; blob->set_ino_alloc(mdr->alloc_ino, mdr->used_prealloc_ino, mdr->prealloc_inos, mdr->client_request->get_source(), - mds->sessionmap.projected, + mds->sessionmap.get_projected(), mds->inotable->get_projected_version()); } @@ -2249,13 +2272,13 @@ void Server::apply_allocated_inos(MDRequestRef& mdr) assert(session); session->pending_prealloc_inos.subtract(mdr->prealloc_inos); session->info.prealloc_inos.insert(mdr->prealloc_inos); - mds->sessionmap.version++; + mds->sessionmap.mark_dirty(session); mds->inotable->apply_alloc_ids(mdr->prealloc_inos); } if (mdr->used_prealloc_ino) { assert(session); session->info.used_inos.erase(mdr->used_prealloc_ino); - mds->sessionmap.version++; + mds->sessionmap.mark_dirty(session); } } @@ -6125,7 +6148,7 @@ void Server::handle_client_rename(MDRequestRef& mdr) _rename_prepare(mdr, &le->metablob, &le->client_map, srcdn, destdn, straydn); if (le->client_map.length()) - le->cmapv = mds->sessionmap.projected; + le->cmapv = mds->sessionmap.get_projected(); // -- commit locally -- C_MDS_rename_finish *fin = new C_MDS_rename_finish(mds, mdr, srcdn, destdn, straydn); diff --git a/src/mds/SessionMap.cc b/src/mds/SessionMap.cc index 4fa43d9cd544a..6651ac76e72af 100644 --- a/src/mds/SessionMap.cc +++ b/src/mds/SessionMap.cc @@ -69,13 +69,145 
@@ object_t SessionMap::get_object_name() class C_IO_SM_Load : public SessionMapIOContext { public: - bufferlist bl; - C_IO_SM_Load(SessionMap *cm) : SessionMapIOContext(cm) {} + const bool first; //< Am I the initial (header) load? + int header_r; //< Return value from OMAP header read + int values_r; //< Return value from OMAP value read + bufferlist header_bl; + std::map session_vals; + + C_IO_SM_Load(SessionMap *cm, const bool f) + : SessionMapIOContext(cm), first(f), header_r(0), values_r(0) {} + void finish(int r) { - sessionmap->_load_finish(r, bl); + sessionmap->_load_finish(r, header_r, values_r, first, header_bl, session_vals); } }; + +/** + * Decode OMAP header. Call this once when loading. + */ +void SessionMapStore::decode_header( + bufferlist &header_bl) +{ + bufferlist::iterator q = header_bl.begin(); + DECODE_START(1, q) + ::decode(version, q); + DECODE_FINISH(q); +} + +void SessionMapStore::encode_header( + bufferlist *header_bl) +{ + ENCODE_START(1, 1, *header_bl); + ::encode(version, *header_bl); + ENCODE_FINISH(*header_bl); +} + +/** + * Decode and insert some serialized OMAP values. Call this + * repeatedly to insert batched loads. + */ +void SessionMapStore::decode_values(std::map &session_vals) +{ + for (std::map::iterator i = session_vals.begin(); + i != session_vals.end(); ++i) { + + entity_inst_t inst; + + bool parsed = inst.name.parse(i->first); + if (!parsed) { + derr << "Corrupt entity name '" << i->first << "' in sessionmap" << dendl; + throw buffer::malformed_input("Corrupt entity name in sessionmap"); + } + + Session *s = get_or_add_session(inst); + if (s->is_closed()) + s->set_state(Session::STATE_OPEN); + bufferlist::iterator q = i->second.begin(); + s->decode(q); + } +} + +/** + * An OMAP read finished. 
+ */ +void SessionMap::_load_finish( + int operation_r, + int header_r, + int values_r, + bool first, + bufferlist &header_bl, + std::map &session_vals) +{ + if (operation_r < 0) { + derr << "_load_finish got " << cpp_strerror(operation_r) << dendl; + assert(0 == "failed to load sessionmap"); + } + + // Decode header + if (first) { + if (header_r != 0) { + derr << __func__ << ": header error: " << cpp_strerror(header_r) << dendl; + assert(0 == "error reading header!"); + } + + if(header_bl.length() == 0) { + dout(4) << __func__ << ": header missing, loading legacy..." << dendl; + load_legacy(); + return; + } + + decode_header(header_bl); + dout(10) << __func__ << " loaded version " << version << dendl; + } + + if (values_r != 0) { + derr << __func__ << ": error reading values: " + << cpp_strerror(values_r) << dendl; + assert(0 == "error reading values"); + } + + // Decode session_vals + decode_values(session_vals); + + if (session_vals.size() == g_conf->mds_sessionmap_keys_per_op) { + // Issue another read if we're not at the end of the omap + const std::string last_key = session_vals.rbegin()->first; + dout(10) << __func__ << ": continue omap load from '" + << last_key << "'" << dendl; + object_t oid = get_object_name(); + object_locator_t oloc(mds->mdsmap->get_metadata_pool()); + C_IO_SM_Load *c = new C_IO_SM_Load(this, false); + ObjectOperation op; + op.omap_get_vals(last_key, "", g_conf->mds_sessionmap_keys_per_op, + &c->session_vals, &c->values_r); + mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, NULL, 0, + new C_OnFinisher(c, &mds->finisher)); + } else { + // I/O is complete. Update `by_state` + dout(10) << __func__ << ": omap load complete" << dendl; + for (ceph::unordered_map::iterator i = session_map.begin(); + i != session_map.end(); ++i) { + Session *s = i->second; + if (by_state.count(s->get_state()) == 0) + by_state[s->get_state()] = new xlist; + by_state[s->get_state()]->push_back(&s->item_session_list); + } + + // Population is complete. 
Trigger load waiters. + dout(10) << __func__ << ": v " << version + << ", " << session_map.size() << " sessions" << dendl; + projected = committing = committed = version; + dump(); + finish_contexts(g_ceph_context, waiting_for_load); + } +} + +/** + * Populate session state from OMAP records in this + * rank's sessionmap object. + */ void SessionMap::load(MDSInternalContextBase *onload) { dout(10) << "load" << dendl; @@ -83,14 +215,47 @@ void SessionMap::load(MDSInternalContextBase *onload) if (onload) waiting_for_load.push_back(onload); - C_IO_SM_Load *c = new C_IO_SM_Load(this); + C_IO_SM_Load *c = new C_IO_SM_Load(this, true); object_t oid = get_object_name(); object_locator_t oloc(mds->mdsmap->get_metadata_pool()); + + ObjectOperation op; + op.omap_get_header(&c->header_bl, &c->header_r); + op.omap_get_vals("", "", g_conf->mds_sessionmap_keys_per_op, + &c->session_vals, &c->values_r); + + mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, NULL, 0, new C_OnFinisher(c, &mds->finisher)); +} + +class C_IO_SM_LoadLegacy : public SessionMapIOContext { +public: + bufferlist bl; + C_IO_SM_LoadLegacy(SessionMap *cm) : SessionMapIOContext(cm) {} + void finish(int r) { + sessionmap->_load_legacy_finish(r, bl); + } +}; + + +/** + * Load legacy (object data blob) SessionMap format, assuming + * that waiting_for_load has already been populated with + * the relevant completion. This is the fallback if we do not + * find an OMAP header when attempting to load normally. 
+ */ +void SessionMap::load_legacy() +{ + dout(10) << __func__ << dendl; + + C_IO_SM_LoadLegacy *c = new C_IO_SM_LoadLegacy(this); + object_t oid = get_object_name(); + object_locator_t oloc(mds->mdsmap->get_metadata_pool()); + mds->objecter->read_full(oid, oloc, CEPH_NOSNAP, &c->bl, 0, new C_OnFinisher(c, &mds->finisher)); } -void SessionMap::_load_finish(int r, bufferlist &bl) +void SessionMap::_load_legacy_finish(int r, bufferlist &bl) { bufferlist::iterator blp = bl.begin(); if (r < 0) { @@ -98,13 +263,24 @@ void SessionMap::_load_finish(int r, bufferlist &bl) assert(0 == "failed to load sessionmap"); } dump(); - decode(blp); // note: this sets last_cap_renew = now() + decode_legacy(blp); // note: this sets last_cap_renew = now() dout(10) << "_load_finish v " << version << ", " << session_map.size() << " sessions, " << bl.length() << " bytes" << dendl; projected = committing = committed = version; dump(); + + // Mark all sessions dirty, so that on next save() we will write + // a complete OMAP version of the data loaded from the legacy format + for (ceph::unordered_map::iterator i = session_map.begin(); + i != session_map.end(); ++i) { + // Don't use mark_dirty because on this occasion we want to ignore the + // keys_per_op limit and do one big write (upgrade must be atomic) + dirty_sessions.insert(i->first); + } + loaded_legacy = true; + finish_contexts(g_ceph_context, waiting_for_load); } @@ -124,7 +300,7 @@ public: void SessionMap::save(MDSInternalContextBase *onsave, version_t needv) { - dout(10) << "save needv " << needv << ", v " << version << dendl; + dout(10) << __func__ << ": needv " << needv << ", v " << version << dendl; if (needv && committing >= needv) { assert(committing > committed); @@ -133,21 +309,76 @@ void SessionMap::save(MDSInternalContextBase *onsave, version_t needv) } commit_waiters[version].push_back(onsave); - - bufferlist bl; - - encode(bl); + committing = version; SnapContext snapc; object_t oid = get_object_name(); object_locator_t 
oloc(mds->mdsmap->get_metadata_pool()); - mds->objecter->write_full(oid, oloc, - snapc, - bl, ceph_clock_now(g_ceph_context), 0, - NULL, - new C_OnFinisher(new C_IO_SM_Save(this, version), - &mds->finisher)); + ObjectOperation op; + + /* Compose OSD OMAP transaction for full write */ + bufferlist header_bl; + encode_header(&header_bl); + op.omap_set_header(header_bl); + + /* If we loaded a legacy sessionmap, then erase the old data. If + * an old-versioned MDS tries to read it, it'll fail out safely + * with an end_of_buffer exception */ + if (loaded_legacy) { + dout(4) << __func__ << " erasing legacy sessionmap" << dendl; + op.truncate(0); + loaded_legacy = false; // only need to truncate once. + } + + dout(20) << " updating keys:" << dendl; + map to_set; + for(std::set::const_iterator i = dirty_sessions.begin(); + i != dirty_sessions.end(); ++i) { + const entity_name_t name = *i; + const Session *session = session_map[name]; + + if (session->is_open() || + session->is_closing() || + session->is_stale() || + session->is_killing()) { + dout(20) << " " << name << dendl; + // Serialize K + std::ostringstream k; + k << name; + + // Serialize V + bufferlist bl; + session->info.encode(bl); + + // Add to RADOS op + to_set[k.str()] = bl; + } else { + dout(20) << " " << name << " (ignoring)" << dendl; + } + } + if (!to_set.empty()) { + op.omap_set(to_set); + } + + dout(20) << " removing keys:" << dendl; + set to_remove; + for(std::set::const_iterator i = null_sessions.begin(); + i != null_sessions.end(); ++i) { + dout(20) << " " << *i << dendl; + std::ostringstream k; + k << *i; + to_remove.insert(k.str()); + } + if (!to_remove.empty()) { + op.omap_rm_keys(to_remove); + } + + dirty_sessions.clear(); + null_sessions.clear(); + + mds->objecter->mutate(oid, oloc, op, snapc, ceph_clock_now(g_ceph_context), + 0, NULL, new C_OnFinisher(new C_IO_SM_Save(this, version), &mds->finisher)); } void SessionMap::_save_finish(version_t v) @@ -160,37 +391,13 @@ void 
SessionMap::_save_finish(version_t v) } -// ------------------- - -void SessionMapStore::encode(bufferlist& bl) const -{ - uint64_t pre = -1; // for 0.19 compatibility; we forgot an encoding prefix. - ::encode(pre, bl); - - ENCODE_START(3, 3, bl); - ::encode(version, bl); - - for (ceph::unordered_map::const_iterator p = session_map.begin(); - p != session_map.end(); - ++p) { - if (p->second->is_open() || - p->second->is_closing() || - p->second->is_stale() || - p->second->is_killing()) { - ::encode(p->first, bl); - p->second->info.encode(bl); - } - } - ENCODE_FINISH(bl); -} - /** * Deserialize sessions, and update by_state index */ -void SessionMap::decode(bufferlist::iterator &p) +void SessionMap::decode_legacy(bufferlist::iterator &p) { // Populate `sessions` - SessionMapStore::decode(p); + SessionMapStore::decode_legacy(p); // Update `by_state` for (ceph::unordered_map::iterator i = session_map.begin(); @@ -212,7 +419,7 @@ uint64_t SessionMap::set_state(Session *session, int s) { return session->get_state_seq(); } -void SessionMapStore::decode(bufferlist::iterator& p) +void SessionMapStore::decode_legacy(bufferlist::iterator& p) { utime_t now = ceph_clock_now(g_ceph_context); uint64_t pre; @@ -354,6 +561,11 @@ void SessionMap::remove_session(Session *s) s->item_session_list.remove_myself(); session_map.erase(s->info.inst.name); + // Read s->info before put(): put() may drop the last reference to s. + if (dirty_sessions.count(s->info.inst.name)) { + dirty_sessions.erase(s->info.inst.name); + } + null_sessions.insert(s->info.inst.name); s->put(); } void SessionMap::touch_session(Session *session) @@ -449,3 +661,50 @@ void Session::decode(bufferlist::iterator &p) _update_human_name(); } +void SessionMap::_mark_dirty(Session *s) +{ + if (dirty_sessions.size() >= g_conf->mds_sessionmap_keys_per_op) { + // Pre-empt the usual save() call from journal segment trim, in + // order to avoid building up an oversized OMAP update operation + // from too many sessions modified at once + save(new C_MDSInternalNoop, version); + } + + 
dirty_sessions.insert(s->info.inst.name); +} + +void SessionMap::mark_dirty(Session *s) +{ + dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name + << " v=" << version << dendl; + + _mark_dirty(s); + version++; + s->pop_pv(version); +} + +void SessionMap::replay_dirty_session(Session *s) +{ + dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name + << " v=" << version << dendl; + + _mark_dirty(s); + + replay_advance_version(); +} + +void SessionMap::replay_advance_version() +{ + version++; + projected = version; +} + +version_t SessionMap::mark_projected(Session *s) +{ + dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name + << " pv=" << projected << " -> " << projected + 1 << dendl; + ++projected; + s->push_pv(projected); + return projected; +} + diff --git a/src/mds/SessionMap.h b/src/mds/SessionMap.h index c990adacff935..e5a78ff831617 100644 --- a/src/mds/SessionMap.h +++ b/src/mds/SessionMap.h @@ -86,8 +86,27 @@ private: void _update_human_name(); std::string human_name; + // Versions in which this session was projected: used to verify + // that appropriate mark_dirty calls follow. 
+ std::deque projected; + public: + void push_pv(version_t pv) + { + if (!projected.empty()) { + assert(projected.back() != pv); + } + projected.push_back(pv); + } + + void pop_pv(version_t v) + { + assert(!projected.empty()); + assert(projected.front() == v); + projected.pop_front(); + } + inline int get_state() const {return state;} void set_state(int new_state) { @@ -154,12 +173,12 @@ public: int get_state() { return state; } const char *get_state_name() { return get_state_name(state); } uint64_t get_state_seq() { return state_seq; } - bool is_closed() { return state == STATE_CLOSED; } - bool is_opening() { return state == STATE_OPENING; } - bool is_open() { return state == STATE_OPEN; } - bool is_closing() { return state == STATE_CLOSING; } - bool is_stale() { return state == STATE_STALE; } - bool is_killing() { return state == STATE_KILLING; } + bool is_closed() const { return state == STATE_CLOSED; } + bool is_opening() const { return state == STATE_OPENING; } + bool is_open() const { return state == STATE_OPEN; } + bool is_closing() const { return state == STATE_CLOSING; } + bool is_stale() const { return state == STATE_STALE; } + bool is_killing() const { return state == STATE_KILLING; } void inc_importing() { ++importing_count; @@ -253,7 +272,6 @@ public: last_cap_renew = utime_t(); } - }; /* @@ -267,13 +285,18 @@ class MDS; * encode/decode outside of live MDS instance. 
*/ class SessionMapStore { +protected: + version_t version; public: ceph::unordered_map session_map; - version_t version; mds_rank_t rank; - virtual void encode(bufferlist& bl) const; - virtual void decode(bufferlist::iterator& blp); + version_t get_version() const {return version;} + + virtual void encode_header(bufferlist *header_bl); + virtual void decode_header(bufferlist &header_bl); + virtual void decode_values(std::map &session_vals); + virtual void decode_legacy(bufferlist::iterator& blp); void dump(Formatter *f) const; void set_rank(mds_rank_t r) @@ -309,18 +332,45 @@ class SessionMap : public SessionMapStore { public: MDS *mds; -public: // i am lazy +protected: version_t projected, committing, committed; +public: map* > by_state; uint64_t set_state(Session *session, int state); map > commit_waiters; SessionMap(MDS *m) : mds(m), - projected(0), committing(0), committed(0) + projected(0), committing(0), committed(0), + loaded_legacy(false) { } + void set_version(const version_t v) + { + version = projected = v; + } + + void set_projected(const version_t v) + { + projected = v; + } + + version_t get_projected() const + { + return projected; + } + + version_t get_committed() const + { + return committed; + } + + version_t get_committing() const + { + return committed; + } + // sessions - void decode(bufferlist::iterator& blp); + void decode_legacy(bufferlist::iterator& blp); bool empty() { return session_map.empty(); } const ceph::unordered_map &get_sessions() const { @@ -428,10 +478,62 @@ public: // i am lazy object_t get_object_name(); void load(MDSInternalContextBase *onload); - void _load_finish(int r, bufferlist &bl); + void _load_finish( + int operation_r, + int header_r, + int values_r, + bool first, + bufferlist &header_bl, + std::map &session_vals); + + void load_legacy(); + void _load_legacy_finish(int r, bufferlist &bl); + void save(MDSInternalContextBase *onsave, version_t needv=0); void _save_finish(version_t v); - + +protected: + std::set 
dirty_sessions; + std::set null_sessions; + bool loaded_legacy; + void _mark_dirty(Session *session); +public: + + /** + * Advance the version, and mark this session + * as dirty within the new version. + * + * Dirty means journalled but needing writeback + * to the backing store. Must have called + * mark_projected previously for this session. + */ + void mark_dirty(Session *session); + + /** + * Advance the projected version, and mark this + * session as projected within the new version + * + * Projected means the session is updated in memory + * but we're waiting for the journal write of the update + * to finish. Must subsequently call mark_dirty + * for sessions in the same global order as calls + * to mark_projected. + */ + version_t mark_projected(Session *session); + + /** + * During replay, advance versions to account + * for a session modification, and mark the + * session dirty. + */ + void replay_dirty_session(Session *session); + + /** + * During replay, if a session no longer present + * would have consumed a version, advance `version` + * and `projected` to account for that. 
+ */ + void replay_advance_version(); }; diff --git a/src/mds/journal.cc b/src/mds/journal.cc index 1870ec70832d3..8b14c012a40ef 100644 --- a/src/mds/journal.cc +++ b/src/mds/journal.cc @@ -229,10 +229,10 @@ void LogSegment::try_to_expire(MDS *mds, MDSGatherBuilder &gather_bld, int op_pr } // sessionmap - if (sessionmapv > mds->sessionmap.committed) { + if (sessionmapv > mds->sessionmap.get_committed()) { dout(10) << "try_to_expire saving sessionmap, need " << sessionmapv - << ", committed is " << mds->sessionmap.committed - << " (" << mds->sessionmap.committing << ")" + << ", committed is " << mds->sessionmap.get_committed() + << " (" << mds->sessionmap.get_committing() << ")" << dendl; mds->sessionmap.save(gather_bld.new_sub(), sessionmapv); } @@ -1498,12 +1498,12 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg, MDSlaveUpdate *slaveup) } } if (sessionmapv) { - if (mds->sessionmap.version >= sessionmapv) { + if (mds->sessionmap.get_version() >= sessionmapv) { dout(10) << "EMetaBlob.replay sessionmap v " << sessionmapv - << " <= table " << mds->sessionmap.version << dendl; - } else if (mds->sessionmap.version + 2 >= sessionmapv) { + << " <= table " << mds->sessionmap.get_version() << dendl; + } else if (mds->sessionmap.get_version() + 2 >= sessionmapv) { dout(10) << "EMetaBlob.replay sessionmap v " << sessionmapv - << " -(1|2) == table " << mds->sessionmap.version + << " -(1|2) == table " << mds->sessionmap.get_version() << " prealloc " << preallocated_inos << " used " << used_preallocated_ino << dendl; @@ -1524,26 +1524,28 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg, MDSlaveUpdate *slaveup) assert(i == used_preallocated_ino); session->info.used_inos.clear(); } - mds->sessionmap.projected = ++mds->sessionmap.version; + mds->sessionmap.replay_dirty_session(session); } if (!preallocated_inos.empty()) { session->info.prealloc_inos.insert(preallocated_inos); - mds->sessionmap.projected = ++mds->sessionmap.version; + 
mds->sessionmap.replay_dirty_session(session); } + } else { dout(10) << "EMetaBlob.replay no session for " << client_name << dendl; - if (used_preallocated_ino) - mds->sessionmap.projected = ++mds->sessionmap.version; + if (used_preallocated_ino) { + mds->sessionmap.replay_advance_version(); + } if (!preallocated_inos.empty()) - mds->sessionmap.projected = ++mds->sessionmap.version; + mds->sessionmap.replay_advance_version(); } - assert(sessionmapv == mds->sessionmap.version); + assert(sessionmapv == mds->sessionmap.get_version()); } else { mds->clog->error() << "journal replay sessionmap v " << sessionmapv - << " -(1|2) > table " << mds->sessionmap.version << "\n"; + << " -(1|2) > table " << mds->sessionmap.get_version() << "\n"; assert(g_conf->mds_wipe_sessions); mds->sessionmap.wipe(); - mds->sessionmap.version = mds->sessionmap.projected = sessionmapv; + mds->sessionmap.set_version(sessionmapv); } } @@ -1616,14 +1618,12 @@ void ESession::update_segment() void ESession::replay(MDS *mds) { - if (mds->sessionmap.version >= cmapv) { - dout(10) << "ESession.replay sessionmap " << mds->sessionmap.version + if (mds->sessionmap.get_version() >= cmapv) { + dout(10) << "ESession.replay sessionmap " << mds->sessionmap.get_version() << " >= " << cmapv << ", noop" << dendl; } else { - dout(10) << "ESession.replay sessionmap " << mds->sessionmap.version + dout(10) << "ESession.replay sessionmap " << mds->sessionmap.get_version() << " < " << cmapv << " " << (open ? 
"open":"close") << " " << client_inst << dendl; - mds->sessionmap.projected = ++mds->sessionmap.version; - assert(mds->sessionmap.version == cmapv); Session *session; if (open) { session = mds->sessionmap.get_or_add_session(client_inst); @@ -1636,6 +1636,7 @@ void ESession::replay(MDS *mds) if (session->connection == NULL) { dout(10) << " removed session " << session->info.inst << dendl; mds->sessionmap.remove_session(session); + session = NULL; } else { session->clear(); // the client has reconnected; keep the Session, but reset dout(10) << " reset session " << session->info.inst << " (they reconnected)" << dendl; @@ -1645,6 +1646,12 @@ void ESession::replay(MDS *mds) << " from time " << stamp << ", ignoring"; } } + if (session) { + mds->sessionmap.replay_dirty_session(session); + } else { + mds->sessionmap.replay_advance_version(); + } + assert(mds->sessionmap.get_version() == cmapv); } if (inos.size() && inotablev) { @@ -1769,15 +1776,15 @@ void ESessions::update_segment() void ESessions::replay(MDS *mds) { - if (mds->sessionmap.version >= cmapv) { - dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.version + if (mds->sessionmap.get_version() >= cmapv) { + dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.get_version() << " >= " << cmapv << ", noop" << dendl; } else { - dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.version + dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.get_version() << " < " << cmapv << dendl; mds->sessionmap.open_sessions(client_map); - assert(mds->sessionmap.version == cmapv); - mds->sessionmap.projected = mds->sessionmap.version; + assert(mds->sessionmap.get_version() == cmapv); + mds->sessionmap.set_projected(mds->sessionmap.get_version()); } update_segment(); } @@ -2035,11 +2042,11 @@ void EUpdate::replay(MDS *mds) } if (client_map.length()) { - if (mds->sessionmap.version >= cmapv) { + if (mds->sessionmap.get_version() >= cmapv) { dout(10) << "EUpdate.replay sessionmap v " << cmapv - 
<< " <= table " << mds->sessionmap.version << dendl; + << " <= table " << mds->sessionmap.get_version() << dendl; } else { - dout(10) << "EUpdate.replay sessionmap " << mds->sessionmap.version + dout(10) << "EUpdate.replay sessionmap " << mds->sessionmap.get_version() << " < " << cmapv << dendl; // open client sessions? map cm; @@ -2049,8 +2056,8 @@ void EUpdate::replay(MDS *mds) mds->server->prepare_force_open_sessions(cm, seqm); mds->server->finish_force_open_sessions(cm, seqm); - assert(mds->sessionmap.version == cmapv); - mds->sessionmap.projected = mds->sessionmap.version; + assert(mds->sessionmap.get_version() == cmapv); + mds->sessionmap.set_projected(mds->sessionmap.get_version()); } } } @@ -2854,18 +2861,18 @@ void EImportStart::replay(MDS *mds) mds_authority_t(mds->get_nodeid(), mds->get_nodeid())); // open client sessions? - if (mds->sessionmap.version >= cmapv) { - dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.version + if (mds->sessionmap.get_version() >= cmapv) { + dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.get_version() << " >= " << cmapv << ", noop" << dendl; } else { - dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.version + dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.get_version() << " < " << cmapv << dendl; map cm; bufferlist::iterator blp = client_map.begin(); ::decode(cm, blp); mds->sessionmap.open_sessions(cm); - assert(mds->sessionmap.version == cmapv); - mds->sessionmap.projected = mds->sessionmap.version; + assert(mds->sessionmap.get_version() == cmapv); + mds->sessionmap.set_projected(mds->sessionmap.get_version()); } update_segment(); } diff --git a/src/test/encoding/types.h b/src/test/encoding/types.h index bcc03472c2d9a..10bc22f2582df 100644 --- a/src/test/encoding/types.h +++ b/src/test/encoding/types.h @@ -180,9 +180,6 @@ TYPE(InoTable) #include "mds/SnapServer.h" TYPEWITHSTRAYDATA(SnapServer) -#include "mds/SessionMap.h" -TYPE(SessionMapStore) - #include 
"mds/events/ECommitted.h" TYPE(ECommitted) #include "mds/events/EExport.h" diff --git a/src/tools/cephfs/TableTool.cc b/src/tools/cephfs/TableTool.cc index 4b22de04f7231..8d257cfd55e31 100644 --- a/src/tools/cephfs/TableTool.cc +++ b/src/tools/cephfs/TableTool.cc @@ -261,14 +261,112 @@ public: } }; +template +class TableHandlerOmap +{ +private: + // The RADOS object ID for the table + std::string object_name; + + // The rank in question (may be NONE) + mds_rank_t rank; + + // Whether this is an MDSTable subclass (i.e. has leading version field to decode) + bool mds_table; + +public: + TableHandlerOmap(mds_rank_t r, std::string const &name, bool mds_table_) + : rank(r), mds_table(mds_table_) + { + // Compose object name of the table we will dump + std::ostringstream oss; + oss << "mds"; + if (rank != MDS_RANK_NONE) { + oss << rank; + } + oss << "_" << name; + object_name = oss.str(); + } + + int load_and_dump(librados::IoCtx *io, Formatter *f) + { + assert(io != NULL); + assert(f != NULL); + + // Read in the header + bufferlist header_bl; + int r = io->omap_get_header(object_name, &header_bl); + if (r != 0) { + derr << "error reading header: " << cpp_strerror(r) << dendl; + return r; + } + + // Decode the header + A table_inst; + table_inst.set_rank(rank); + try { + table_inst.decode_header(header_bl); + } catch (buffer::error &e) { + derr << "table " << object_name << " is corrupt" << dendl; + return -EIO; + } + + // Read and decode OMAP values in chunks + std::string last_key = ""; + while(true) { + std::map values; + int r = io->omap_get_vals(object_name, last_key, + g_conf->mds_sessionmap_keys_per_op, &values); + + if (r != 0) { + derr << "error reading values: " << cpp_strerror(r) << dendl; + return r; + } + + if (values.empty()) { + break; + } + + try { + table_inst.decode_values(values); + } catch (buffer::error &e) { + derr << "table " << object_name << " is corrupt" << dendl; + return -EIO; + } + last_key = values.rbegin()->first; + } + + 
table_inst.dump(f); + + return 0; + } + + int reset(librados::IoCtx *io) + { + A table_inst; + table_inst.set_rank(rank); + table_inst.reset_state(); + + bufferlist header_bl; + table_inst.encode_header(&header_bl); + + // Compose a transaction to clear and write header + librados::ObjectWriteOperation op; + op.omap_clear(); + op.omap_set_header(header_bl); + + return io->operate(object_name, &op); + } +}; + int TableTool::_show_session_table(mds_rank_t rank, Formatter *f) { - return TableHandler(rank, "sessionmap", false).load_and_dump(&io, f); + return TableHandlerOmap(rank, "sessionmap", false).load_and_dump(&io, f); } int TableTool::_reset_session_table(mds_rank_t rank, Formatter *f) { - return TableHandler(rank, "sessionmap", false).reset(&io); + return TableHandlerOmap(rank, "sessionmap", false).reset(&io); } int TableTool::_show_ino_table(mds_rank_t rank, Formatter *f) -- 2.39.5