OPTION(mds_enforce_unique_name, OPT_BOOL, true)
OPTION(mds_blacklist_interval, OPT_FLOAT, 24.0*60.0) // how long to blacklist failed nodes
OPTION(mds_session_timeout, OPT_FLOAT, 60) // cap bits and leases time out if client idle
+OPTION(mds_sessionmap_keys_per_op, OPT_U32, 1024) // how many sessions should I try to load/store in a single OMAP operation?
OPTION(mds_revoke_cap_timeout, OPT_FLOAT, 60) // detect clients which aren't revoking caps
OPTION(mds_recall_state_timeout, OPT_FLOAT, 60) // detect clients which aren't trimming caps
OPTION(mds_freeze_tree_timeout, OPT_FLOAT, 30) // detecting freeze tree deadlock
if (session->is_closed())
mds->sessionmap.add_session(session);
+ pv = mds->sessionmap.mark_projected(session);
sseq = mds->sessionmap.set_state(session, Session::STATE_OPENING);
mds->sessionmap.touch_session(session);
- pv = ++mds->sessionmap.projected;
mdlog->start_submit_entry(new ESession(m->get_source_inst(), true, pv, m->client_meta),
new C_MDS_session_finish(mds, session, sseq, true, pv));
mdlog->flush();
dout(10) << "_session_logged " << session->info.inst << " state_seq " << state_seq << " " << (open ? "open":"close")
<< " " << pv << dendl;
+ mds->sessionmap.mark_dirty(session);
+
if (piv) {
mds->inotable->apply_release_ids(inos);
assert(mds->inotable->get_version() == piv);
} else {
assert(0);
}
- mds->sessionmap.version++; // noop
}
+/**
+ * Inject sessions from some source other than actual connections.
+ *
+ * For example:
+ * - sessions inferred from journal replay
+ * - sessions learned from other MDSs during rejoin
+ * - sessions learned from other MDSs during dir/caps migration
+ * - sessions learned from other MDSs during a cross-MDS rename
+ */
version_t Server::prepare_force_open_sessions(map<client_t,entity_inst_t>& cm,
map<client_t,uint64_t>& sseqmap)
{
- version_t pv = ++mds->sessionmap.projected;
+ version_t pv = mds->sessionmap.get_projected();
+
dout(10) << "prepare_force_open_sessions " << pv
<< " on " << cm.size() << " clients"
<< dendl;
for (map<client_t,entity_inst_t>::iterator p = cm.begin(); p != cm.end(); ++p) {
+
Session *session = mds->sessionmap.get_or_add_session(p->second);
+ pv = mds->sessionmap.mark_projected(session);
if (session->is_closed() ||
session->is_closing() ||
session->is_killing())
session->is_opening() ||
session->is_stale());
session->inc_importing();
-// mds->sessionmap.touch_session(session);
}
return pv;
}
* trying to force open a session...
*/
dout(10) << "finish_force_open_sessions on " << cm.size() << " clients,"
- << " v " << mds->sessionmap.version << " -> " << (mds->sessionmap.version+1) << dendl;
+ << " initial v " << mds->sessionmap.get_version() << dendl;
+
+
+ int sessions_inserted = 0;
for (map<client_t,entity_inst_t>::iterator p = cm.begin(); p != cm.end(); ++p) {
+ sessions_inserted++;
+
Session *session = mds->sessionmap.get_session(p->second.name);
assert(session);
dout(10) << "force_open_sessions skipping already-open " << session->info.inst << dendl;
assert(session->is_open() || session->is_stale());
}
- if (dec_import)
+
+ if (dec_import) {
session->dec_importing();
+ }
+
+ mds->sessionmap.mark_dirty(session);
}
- mds->sessionmap.version++;
+
+ dout(10) << __func__ << ": final v " << mds->sessionmap.get_version() << dendl;
}
class C_MDS_TerminatedSessions : public ServerContext {
void Server::journal_close_session(Session *session, int state, Context *on_safe)
{
uint64_t sseq = mds->sessionmap.set_state(session, state);
- version_t pv = ++mds->sessionmap.projected;
+ version_t pv = mds->sessionmap.mark_projected(session);
version_t piv = 0;
// release alloc and pending-alloc inos for this session
if (mdr->session->info.prealloc_inos.size()) {
mdr->used_prealloc_ino =
in->inode.ino = mdr->session->take_ino(useino); // prealloc -> used
- mds->sessionmap.projected++;
+ mds->sessionmap.mark_projected(mdr->session);
+
dout(10) << "prepare_new_inode used_prealloc " << mdr->used_prealloc_ino
<< " (" << mdr->session->info.prealloc_inos
<< ", " << mdr->session->info.prealloc_inos.size() << " left)"
mds->inotable->project_alloc_ids(mdr->prealloc_inos, got);
assert(mdr->prealloc_inos.size()); // or else fix projected increment semantics
mdr->session->pending_prealloc_inos.insert(mdr->prealloc_inos);
- mds->sessionmap.projected++;
+ mds->sessionmap.mark_projected(mdr->session);
dout(10) << "prepare_new_inode prealloc " << mdr->prealloc_inos << dendl;
}
void Server::journal_allocated_inos(MDRequestRef& mdr, EMetaBlob *blob)
{
- dout(20) << "journal_allocated_inos sessionmapv " << mds->sessionmap.projected
+ dout(20) << "journal_allocated_inos sessionmapv " << mds->sessionmap.get_projected()
<< " inotablev " << mds->inotable->get_projected_version()
<< dendl;
blob->set_ino_alloc(mdr->alloc_ino,
mdr->used_prealloc_ino,
mdr->prealloc_inos,
mdr->client_request->get_source(),
- mds->sessionmap.projected,
+ mds->sessionmap.get_projected(),
mds->inotable->get_projected_version());
}
assert(session);
session->pending_prealloc_inos.subtract(mdr->prealloc_inos);
session->info.prealloc_inos.insert(mdr->prealloc_inos);
- mds->sessionmap.version++;
+ mds->sessionmap.mark_dirty(session);
mds->inotable->apply_alloc_ids(mdr->prealloc_inos);
}
if (mdr->used_prealloc_ino) {
assert(session);
session->info.used_inos.erase(mdr->used_prealloc_ino);
- mds->sessionmap.version++;
+ mds->sessionmap.mark_dirty(session);
}
}
_rename_prepare(mdr, &le->metablob, &le->client_map, srcdn, destdn, straydn);
if (le->client_map.length())
- le->cmapv = mds->sessionmap.projected;
+ le->cmapv = mds->sessionmap.get_projected();
// -- commit locally --
C_MDS_rename_finish *fin = new C_MDS_rename_finish(mds, mdr, srcdn, destdn, straydn);
class C_IO_SM_Load : public SessionMapIOContext {
public:
- bufferlist bl;
- C_IO_SM_Load(SessionMap *cm) : SessionMapIOContext(cm) {}
+ const bool first; //< Am I the initial (header) load?
+ int header_r; //< Return value from OMAP header read
+ int values_r; //< Return value from OMAP value read
+ bufferlist header_bl;
+ std::map<std::string, bufferlist> session_vals;
+
+ C_IO_SM_Load(SessionMap *cm, const bool f)
+ : SessionMapIOContext(cm), first(f), header_r(0), values_r(0) {}
+
void finish(int r) {
- sessionmap->_load_finish(r, bl);
+ sessionmap->_load_finish(r, header_r, values_r, first, header_bl, session_vals);
}
};
+
+/**
+ * Decode OMAP header. Call this once when loading.
+ */
+void SessionMapStore::decode_header(
+ bufferlist &header_bl)
+{
+ bufferlist::iterator q = header_bl.begin();
+ DECODE_START(1, q)
+ ::decode(version, q);
+ DECODE_FINISH(q);
+}
+
+void SessionMapStore::encode_header(
+ bufferlist *header_bl)
+{
+ ENCODE_START(1, 1, *header_bl);
+ ::encode(version, *header_bl);
+ ENCODE_FINISH(*header_bl);
+}
+
+/**
+ * Decode and insert some serialized OMAP values. Call this
+ * repeatedly to insert batched loads.
+ */
+void SessionMapStore::decode_values(std::map<std::string, bufferlist> &session_vals)
+{
+ for (std::map<std::string, bufferlist>::iterator i = session_vals.begin();
+ i != session_vals.end(); ++i) {
+
+ entity_inst_t inst;
+
+ bool parsed = inst.name.parse(i->first);
+ if (!parsed) {
+ derr << "Corrupt entity name '" << i->first << "' in sessionmap" << dendl;
+ throw buffer::malformed_input("Corrupt entity name in sessionmap");
+ }
+
+ Session *s = get_or_add_session(inst);
+ if (s->is_closed())
+ s->set_state(Session::STATE_OPEN);
+ bufferlist::iterator q = i->second.begin();
+ s->decode(q);
+ }
+}
+
+/**
+ * An OMAP read finished.
+ */
+void SessionMap::_load_finish(
+ int operation_r,
+ int header_r,
+ int values_r,
+ bool first,
+ bufferlist &header_bl,
+ std::map<std::string, bufferlist> &session_vals)
+{
+ if (operation_r < 0) {
+ derr << "_load_finish got " << cpp_strerror(operation_r) << dendl;
+ assert(0 == "failed to load sessionmap");
+ }
+
+ // Decode header
+ if (first) {
+ if (header_r != 0) {
+ derr << __func__ << ": header error: " << cpp_strerror(header_r) << dendl;
+ assert(0 == "error reading header!");
+ }
+
+ if(header_bl.length() == 0) {
+ dout(4) << __func__ << ": header missing, loading legacy..." << dendl;
+ load_legacy();
+ return;
+ }
+
+ decode_header(header_bl);
+ dout(10) << __func__ << " loaded version " << version << dendl;
+ }
+
+ if (values_r != 0) {
+ derr << __func__ << ": error reading values: "
+ << cpp_strerror(values_r) << dendl;
+ assert(0 == "error reading values");
+ }
+
+ // Decode session_vals
+ decode_values(session_vals);
+
+ if (session_vals.size() == g_conf->mds_sessionmap_keys_per_op) {
+ // Issue another read if we're not at the end of the omap
+ const std::string last_key = session_vals.rbegin()->first;
+ dout(10) << __func__ << ": continue omap load from '"
+ << last_key << "'" << dendl;
+ object_t oid = get_object_name();
+ object_locator_t oloc(mds->mdsmap->get_metadata_pool());
+ C_IO_SM_Load *c = new C_IO_SM_Load(this, false);
+ ObjectOperation op;
+ op.omap_get_vals(last_key, "", g_conf->mds_sessionmap_keys_per_op,
+ &c->session_vals, &c->values_r);
+ mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, NULL, 0,
+ new C_OnFinisher(c, &mds->finisher));
+ } else {
+ // I/O is complete. Update `by_state`
+ dout(10) << __func__ << ": omap load complete" << dendl;
+ for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin();
+ i != session_map.end(); ++i) {
+ Session *s = i->second;
+ if (by_state.count(s->get_state()) == 0)
+ by_state[s->get_state()] = new xlist<Session*>;
+ by_state[s->get_state()]->push_back(&s->item_session_list);
+ }
+
+ // Population is complete. Trigger load waiters.
+ dout(10) << __func__ << ": v " << version
+ << ", " << session_map.size() << " sessions" << dendl;
+ projected = committing = committed = version;
+ dump();
+ finish_contexts(g_ceph_context, waiting_for_load);
+ }
+}
+
+/**
+ * Populate session state from OMAP records in this
+ * rank's sessionmap object.
+ */
void SessionMap::load(MDSInternalContextBase *onload)
{
dout(10) << "load" << dendl;
if (onload)
waiting_for_load.push_back(onload);
- C_IO_SM_Load *c = new C_IO_SM_Load(this);
+ C_IO_SM_Load *c = new C_IO_SM_Load(this, true);
object_t oid = get_object_name();
object_locator_t oloc(mds->mdsmap->get_metadata_pool());
+
+ ObjectOperation op;
+ op.omap_get_header(&c->header_bl, &c->header_r);
+ op.omap_get_vals("", "", g_conf->mds_sessionmap_keys_per_op,
+ &c->session_vals, &c->values_r);
+
+ mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, NULL, 0, new C_OnFinisher(c, &mds->finisher));
+}
+
+class C_IO_SM_LoadLegacy : public SessionMapIOContext {
+public:
+ bufferlist bl;
+ C_IO_SM_LoadLegacy(SessionMap *cm) : SessionMapIOContext(cm) {}
+ void finish(int r) {
+ sessionmap->_load_legacy_finish(r, bl);
+ }
+};
+
+
+/**
+ * Load legacy (object data blob) SessionMap format, assuming
+ * that waiting_for_load has already been populated with
+ * the relevant completion. This is the fallback if we do not
+ * find an OMAP header when attempting to load normally.
+ */
+void SessionMap::load_legacy()
+{
+ dout(10) << __func__ << dendl;
+
+ C_IO_SM_LoadLegacy *c = new C_IO_SM_LoadLegacy(this);
+ object_t oid = get_object_name();
+ object_locator_t oloc(mds->mdsmap->get_metadata_pool());
+
mds->objecter->read_full(oid, oloc, CEPH_NOSNAP, &c->bl, 0,
new C_OnFinisher(c, &mds->finisher));
}
-void SessionMap::_load_finish(int r, bufferlist &bl)
+void SessionMap::_load_legacy_finish(int r, bufferlist &bl)
{
bufferlist::iterator blp = bl.begin();
if (r < 0) {
assert(0 == "failed to load sessionmap");
}
dump();
- decode(blp); // note: this sets last_cap_renew = now()
+ decode_legacy(blp); // note: this sets last_cap_renew = now()
dout(10) << "_load_finish v " << version
<< ", " << session_map.size() << " sessions, "
<< bl.length() << " bytes"
<< dendl;
projected = committing = committed = version;
dump();
+
+ // Mark all sessions dirty, so that on next save() we will write
+ // a complete OMAP version of the data loaded from the legacy format
+ for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin();
+ i != session_map.end(); ++i) {
+ // Don't use mark_dirty because on this occasion we want to ignore the
+ // keys_per_op limit and do one big write (upgrade must be atomic)
+ dirty_sessions.insert(i->first);
+ }
+ loaded_legacy = true;
+
finish_contexts(g_ceph_context, waiting_for_load);
}
void SessionMap::save(MDSInternalContextBase *onsave, version_t needv)
{
- dout(10) << "save needv " << needv << ", v " << version << dendl;
+ dout(10) << __func__ << ": needv " << needv << ", v " << version << dendl;
if (needv && committing >= needv) {
assert(committing > committed);
}
commit_waiters[version].push_back(onsave);
-
- bufferlist bl;
-
- encode(bl);
+
committing = version;
SnapContext snapc;
object_t oid = get_object_name();
object_locator_t oloc(mds->mdsmap->get_metadata_pool());
- mds->objecter->write_full(oid, oloc,
- snapc,
- bl, ceph_clock_now(g_ceph_context), 0,
- NULL,
- new C_OnFinisher(new C_IO_SM_Save(this, version),
- &mds->finisher));
+ ObjectOperation op;
+
+ /* Compose OSD OMAP transaction for full write */
+ bufferlist header_bl;
+ encode_header(&header_bl);
+ op.omap_set_header(header_bl);
+
+ /* If we loaded a legacy sessionmap, then erase the old data. If
+ * an old-versioned MDS tries to read it, it'll fail out safely
+ * with an end_of_buffer exception */
+ if (loaded_legacy) {
+ dout(4) << __func__ << " erasing legacy sessionmap" << dendl;
+ op.truncate(0);
+ loaded_legacy = false; // only need to truncate once.
+ }
+
+ dout(20) << " updating keys:" << dendl;
+ map<string, bufferlist> to_set;
+ for(std::set<entity_name_t>::const_iterator i = dirty_sessions.begin();
+ i != dirty_sessions.end(); ++i) {
+ const entity_name_t name = *i;
+ const Session *session = session_map[name];
+
+ if (session->is_open() ||
+ session->is_closing() ||
+ session->is_stale() ||
+ session->is_killing()) {
+ dout(20) << " " << name << dendl;
+ // Serialize K
+ std::ostringstream k;
+ k << name;
+
+ // Serialize V
+ bufferlist bl;
+ session->info.encode(bl);
+
+ // Add to RADOS op
+ to_set[k.str()] = bl;
+ } else {
+ dout(20) << " " << name << " (ignoring)" << dendl;
+ }
+ }
+ if (!to_set.empty()) {
+ op.omap_set(to_set);
+ }
+
+ dout(20) << " removing keys:" << dendl;
+ set<string> to_remove;
+ for(std::set<entity_name_t>::const_iterator i = null_sessions.begin();
+ i != null_sessions.end(); ++i) {
+ dout(20) << " " << *i << dendl;
+ std::ostringstream k;
+ k << *i;
+ to_remove.insert(k.str());
+ }
+ if (!to_remove.empty()) {
+ op.omap_rm_keys(to_remove);
+ }
+
+ dirty_sessions.clear();
+ null_sessions.clear();
+
+ mds->objecter->mutate(oid, oloc, op, snapc, ceph_clock_now(g_ceph_context),
+ 0, NULL, new C_OnFinisher(new C_IO_SM_Save(this, version), &mds->finisher));
}
void SessionMap::_save_finish(version_t v)
}
-// -------------------
-
-void SessionMapStore::encode(bufferlist& bl) const
-{
- uint64_t pre = -1; // for 0.19 compatibility; we forgot an encoding prefix.
- ::encode(pre, bl);
-
- ENCODE_START(3, 3, bl);
- ::encode(version, bl);
-
- for (ceph::unordered_map<entity_name_t,Session*>::const_iterator p = session_map.begin();
- p != session_map.end();
- ++p) {
- if (p->second->is_open() ||
- p->second->is_closing() ||
- p->second->is_stale() ||
- p->second->is_killing()) {
- ::encode(p->first, bl);
- p->second->info.encode(bl);
- }
- }
- ENCODE_FINISH(bl);
-}
-
/**
* Deserialize sessions, and update by_state index
*/
-void SessionMap::decode(bufferlist::iterator &p)
+void SessionMap::decode_legacy(bufferlist::iterator &p)
{
// Populate `sessions`
- SessionMapStore::decode(p);
+ SessionMapStore::decode_legacy(p);
// Update `by_state`
for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin();
return session->get_state_seq();
}
-void SessionMapStore::decode(bufferlist::iterator& p)
+void SessionMapStore::decode_legacy(bufferlist::iterator& p)
{
utime_t now = ceph_clock_now(g_ceph_context);
uint64_t pre;
s->item_session_list.remove_myself();
session_map.erase(s->info.inst.name);
s->put();
+ if (dirty_sessions.count(s->info.inst.name)) {
+ dirty_sessions.erase(s->info.inst.name);
+ }
+ null_sessions.insert(s->info.inst.name);
}
void SessionMap::touch_session(Session *session)
_update_human_name();
}
+void SessionMap::_mark_dirty(Session *s)
+{
+ if (dirty_sessions.size() >= g_conf->mds_sessionmap_keys_per_op) {
+ // Pre-empt the usual save() call from journal segment trim, in
+ // order to avoid building up an oversized OMAP update operation
+ // from too many sessions modified at once
+ save(new C_MDSInternalNoop, version);
+ }
+
+ dirty_sessions.insert(s->info.inst.name);
+}
+
+void SessionMap::mark_dirty(Session *s)
+{
+ dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name
+ << " v=" << version << dendl;
+
+ _mark_dirty(s);
+ version++;
+ s->pop_pv(version);
+}
+
+void SessionMap::replay_dirty_session(Session *s)
+{
+ dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name
+ << " v=" << version << dendl;
+
+ _mark_dirty(s);
+
+ replay_advance_version();
+}
+
+void SessionMap::replay_advance_version()
+{
+ version++;
+ projected = version;
+}
+
+version_t SessionMap::mark_projected(Session *s)
+{
+ dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name
+ << " pv=" << projected << " -> " << projected + 1 << dendl;
+ ++projected;
+ s->push_pv(projected);
+ return projected;
+}
+
void _update_human_name();
std::string human_name;
+ // Versions in this this session was projected: used to verify
+ // that appropriate mark_dirty calls follow.
+ std::deque<version_t> projected;
+
public:
+ void push_pv(version_t pv)
+ {
+ if (!projected.empty()) {
+ assert(projected.back() != pv);
+ }
+ projected.push_back(pv);
+ }
+
+ void pop_pv(version_t v)
+ {
+ assert(!projected.empty());
+ assert(projected.front() == v);
+ projected.pop_front();
+ }
+
inline int get_state() const {return state;}
void set_state(int new_state)
{
int get_state() { return state; }
const char *get_state_name() { return get_state_name(state); }
uint64_t get_state_seq() { return state_seq; }
- bool is_closed() { return state == STATE_CLOSED; }
- bool is_opening() { return state == STATE_OPENING; }
- bool is_open() { return state == STATE_OPEN; }
- bool is_closing() { return state == STATE_CLOSING; }
- bool is_stale() { return state == STATE_STALE; }
- bool is_killing() { return state == STATE_KILLING; }
+ bool is_closed() const { return state == STATE_CLOSED; }
+ bool is_opening() const { return state == STATE_OPENING; }
+ bool is_open() const { return state == STATE_OPEN; }
+ bool is_closing() const { return state == STATE_CLOSING; }
+ bool is_stale() const { return state == STATE_STALE; }
+ bool is_killing() const { return state == STATE_KILLING; }
void inc_importing() {
++importing_count;
last_cap_renew = utime_t();
}
-
};
/*
* encode/decode outside of live MDS instance.
*/
class SessionMapStore {
+protected:
+ version_t version;
public:
ceph::unordered_map<entity_name_t, Session*> session_map;
- version_t version;
mds_rank_t rank;
- virtual void encode(bufferlist& bl) const;
- virtual void decode(bufferlist::iterator& blp);
+ version_t get_version() const {return version;}
+
+ virtual void encode_header(bufferlist *header_bl);
+ virtual void decode_header(bufferlist &header_bl);
+ virtual void decode_values(std::map<std::string, bufferlist> &session_vals);
+ virtual void decode_legacy(bufferlist::iterator& blp);
void dump(Formatter *f) const;
void set_rank(mds_rank_t r)
public:
MDS *mds;
-public: // i am lazy
+protected:
version_t projected, committing, committed;
+public:
map<int,xlist<Session*>* > by_state;
uint64_t set_state(Session *session, int state);
map<version_t, list<MDSInternalContextBase*> > commit_waiters;
SessionMap(MDS *m) : mds(m),
- projected(0), committing(0), committed(0)
+ projected(0), committing(0), committed(0),
+ loaded_legacy(false)
{ }
+ void set_version(const version_t v)
+ {
+ version = projected = v;
+ }
+
+ void set_projected(const version_t v)
+ {
+ projected = v;
+ }
+
+ version_t get_projected() const
+ {
+ return projected;
+ }
+
+ version_t get_committed() const
+ {
+ return committed;
+ }
+
+ version_t get_committing() const
+ {
+ return committed;
+ }
+
// sessions
- void decode(bufferlist::iterator& blp);
+ void decode_legacy(bufferlist::iterator& blp);
bool empty() { return session_map.empty(); }
const ceph::unordered_map<entity_name_t, Session*> &get_sessions() const
{
object_t get_object_name();
void load(MDSInternalContextBase *onload);
- void _load_finish(int r, bufferlist &bl);
+ void _load_finish(
+ int operation_r,
+ int header_r,
+ int values_r,
+ bool first,
+ bufferlist &header_bl,
+ std::map<std::string, bufferlist> &session_vals);
+
+ void load_legacy();
+ void _load_legacy_finish(int r, bufferlist &bl);
+
void save(MDSInternalContextBase *onsave, version_t needv=0);
void _save_finish(version_t v);
-
+
+protected:
+ std::set<entity_name_t> dirty_sessions;
+ std::set<entity_name_t> null_sessions;
+ bool loaded_legacy;
+ void _mark_dirty(Session *session);
+public:
+
+ /**
+ * Advance the version, and mark this session
+ * as dirty within the new version.
+ *
+ * Dirty means journalled but needing writeback
+ * to the backing store. Must have called
+ * mark_projected previously for this session.
+ */
+ void mark_dirty(Session *session);
+
+ /**
+ * Advance the projected version, and mark this
+ * session as projected within the new version
+ *
+ * Projected means the session is updated in memory
+ * but we're waiting for the journal write of the update
+ * to finish. Must subsequently call mark_dirty
+ * for sessions in the same global order as calls
+ * to mark_projected.
+ */
+ version_t mark_projected(Session *session);
+
+ /**
+ * During replay, advance versions to account
+ * for a session modification, and mark the
+ * session dirty.
+ */
+ void replay_dirty_session(Session *session);
+
+ /**
+ * During replay, if a session no longer present
+ * would have consumed a version, advance `version`
+ * and `projected` to account for that.
+ */
+ void replay_advance_version();
};
}
// sessionmap
- if (sessionmapv > mds->sessionmap.committed) {
+ if (sessionmapv > mds->sessionmap.get_committed()) {
dout(10) << "try_to_expire saving sessionmap, need " << sessionmapv
- << ", committed is " << mds->sessionmap.committed
- << " (" << mds->sessionmap.committing << ")"
+ << ", committed is " << mds->sessionmap.get_committed()
+ << " (" << mds->sessionmap.get_committing() << ")"
<< dendl;
mds->sessionmap.save(gather_bld.new_sub(), sessionmapv);
}
}
}
if (sessionmapv) {
- if (mds->sessionmap.version >= sessionmapv) {
+ if (mds->sessionmap.get_version() >= sessionmapv) {
dout(10) << "EMetaBlob.replay sessionmap v " << sessionmapv
- << " <= table " << mds->sessionmap.version << dendl;
- } else if (mds->sessionmap.version + 2 >= sessionmapv) {
+ << " <= table " << mds->sessionmap.get_version() << dendl;
+ } else if (mds->sessionmap.get_version() + 2 >= sessionmapv) {
dout(10) << "EMetaBlob.replay sessionmap v " << sessionmapv
- << " -(1|2) == table " << mds->sessionmap.version
+ << " -(1|2) == table " << mds->sessionmap.get_version()
<< " prealloc " << preallocated_inos
<< " used " << used_preallocated_ino
<< dendl;
assert(i == used_preallocated_ino);
session->info.used_inos.clear();
}
- mds->sessionmap.projected = ++mds->sessionmap.version;
+ mds->sessionmap.replay_dirty_session(session);
}
if (!preallocated_inos.empty()) {
session->info.prealloc_inos.insert(preallocated_inos);
- mds->sessionmap.projected = ++mds->sessionmap.version;
+ mds->sessionmap.replay_dirty_session(session);
}
+
} else {
dout(10) << "EMetaBlob.replay no session for " << client_name << dendl;
- if (used_preallocated_ino)
- mds->sessionmap.projected = ++mds->sessionmap.version;
+ if (used_preallocated_ino) {
+ mds->sessionmap.replay_advance_version();
+ }
if (!preallocated_inos.empty())
- mds->sessionmap.projected = ++mds->sessionmap.version;
+ mds->sessionmap.replay_advance_version();
}
- assert(sessionmapv == mds->sessionmap.version);
+ assert(sessionmapv == mds->sessionmap.get_version());
} else {
mds->clog->error() << "journal replay sessionmap v " << sessionmapv
- << " -(1|2) > table " << mds->sessionmap.version << "\n";
+ << " -(1|2) > table " << mds->sessionmap.get_version() << "\n";
assert(g_conf->mds_wipe_sessions);
mds->sessionmap.wipe();
- mds->sessionmap.version = mds->sessionmap.projected = sessionmapv;
+ mds->sessionmap.set_version(sessionmapv);
}
}
void ESession::replay(MDS *mds)
{
- if (mds->sessionmap.version >= cmapv) {
- dout(10) << "ESession.replay sessionmap " << mds->sessionmap.version
+ if (mds->sessionmap.get_version() >= cmapv) {
+ dout(10) << "ESession.replay sessionmap " << mds->sessionmap.get_version()
<< " >= " << cmapv << ", noop" << dendl;
} else {
- dout(10) << "ESession.replay sessionmap " << mds->sessionmap.version
+ dout(10) << "ESession.replay sessionmap " << mds->sessionmap.get_version()
<< " < " << cmapv << " " << (open ? "open":"close") << " " << client_inst << dendl;
- mds->sessionmap.projected = ++mds->sessionmap.version;
- assert(mds->sessionmap.version == cmapv);
Session *session;
if (open) {
session = mds->sessionmap.get_or_add_session(client_inst);
if (session->connection == NULL) {
dout(10) << " removed session " << session->info.inst << dendl;
mds->sessionmap.remove_session(session);
+ session = NULL;
} else {
session->clear(); // the client has reconnected; keep the Session, but reset
dout(10) << " reset session " << session->info.inst << " (they reconnected)" << dendl;
<< " from time " << stamp << ", ignoring";
}
}
+ if (session) {
+ mds->sessionmap.replay_dirty_session(session);
+ } else {
+ mds->sessionmap.replay_advance_version();
+ }
+ assert(mds->sessionmap.get_version() == cmapv);
}
if (inos.size() && inotablev) {
void ESessions::replay(MDS *mds)
{
- if (mds->sessionmap.version >= cmapv) {
- dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.version
+ if (mds->sessionmap.get_version() >= cmapv) {
+ dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.get_version()
<< " >= " << cmapv << ", noop" << dendl;
} else {
- dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.version
+ dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.get_version()
<< " < " << cmapv << dendl;
mds->sessionmap.open_sessions(client_map);
- assert(mds->sessionmap.version == cmapv);
- mds->sessionmap.projected = mds->sessionmap.version;
+ assert(mds->sessionmap.get_version() == cmapv);
+ mds->sessionmap.set_projected(mds->sessionmap.get_version());
}
update_segment();
}
}
if (client_map.length()) {
- if (mds->sessionmap.version >= cmapv) {
+ if (mds->sessionmap.get_version() >= cmapv) {
dout(10) << "EUpdate.replay sessionmap v " << cmapv
- << " <= table " << mds->sessionmap.version << dendl;
+ << " <= table " << mds->sessionmap.get_version() << dendl;
} else {
- dout(10) << "EUpdate.replay sessionmap " << mds->sessionmap.version
+ dout(10) << "EUpdate.replay sessionmap " << mds->sessionmap.get_version()
<< " < " << cmapv << dendl;
// open client sessions?
map<client_t,entity_inst_t> cm;
mds->server->prepare_force_open_sessions(cm, seqm);
mds->server->finish_force_open_sessions(cm, seqm);
- assert(mds->sessionmap.version == cmapv);
- mds->sessionmap.projected = mds->sessionmap.version;
+ assert(mds->sessionmap.get_version() == cmapv);
+ mds->sessionmap.set_projected(mds->sessionmap.get_version());
}
}
}
mds_authority_t(mds->get_nodeid(), mds->get_nodeid()));
// open client sessions?
- if (mds->sessionmap.version >= cmapv) {
- dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.version
+ if (mds->sessionmap.get_version() >= cmapv) {
+ dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.get_version()
<< " >= " << cmapv << ", noop" << dendl;
} else {
- dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.version
+ dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.get_version()
<< " < " << cmapv << dendl;
map<client_t,entity_inst_t> cm;
bufferlist::iterator blp = client_map.begin();
::decode(cm, blp);
mds->sessionmap.open_sessions(cm);
- assert(mds->sessionmap.version == cmapv);
- mds->sessionmap.projected = mds->sessionmap.version;
+ assert(mds->sessionmap.get_version() == cmapv);
+ mds->sessionmap.set_projected(mds->sessionmap.get_version());
}
update_segment();
}
#include "mds/SnapServer.h"
TYPEWITHSTRAYDATA(SnapServer)
-#include "mds/SessionMap.h"
-TYPE(SessionMapStore)
-
#include "mds/events/ECommitted.h"
TYPE(ECommitted)
#include "mds/events/EExport.h"
}
};
+template <typename A>
+class TableHandlerOmap
+{
+private:
+ // The RADOS object ID for the table
+ std::string object_name;
+
+ // The rank in question (may be NONE)
+ mds_rank_t rank;
+
+ // Whether this is an MDSTable subclass (i.e. has leading version field to decode)
+ bool mds_table;
+
+public:
+ TableHandlerOmap(mds_rank_t r, std::string const &name, bool mds_table_)
+ : rank(r), mds_table(mds_table_)
+ {
+ // Compose object name of the table we will dump
+ std::ostringstream oss;
+ oss << "mds";
+ if (rank != MDS_RANK_NONE) {
+ oss << rank;
+ }
+ oss << "_" << name;
+ object_name = oss.str();
+ }
+
+ int load_and_dump(librados::IoCtx *io, Formatter *f)
+ {
+ assert(io != NULL);
+ assert(f != NULL);
+
+ // Read in the header
+ bufferlist header_bl;
+ int r = io->omap_get_header(object_name, &header_bl);
+ if (r != 0) {
+ derr << "error reading header: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ // Decode the header
+ A table_inst;
+ table_inst.set_rank(rank);
+ try {
+ table_inst.decode_header(header_bl);
+ } catch (buffer::error &e) {
+ derr << "table " << object_name << " is corrupt" << dendl;
+ return -EIO;
+ }
+
+ // Read and decode OMAP values in chunks
+ std::string last_key = "";
+ while(true) {
+ std::map<std::string, bufferlist> values;
+ int r = io->omap_get_vals(object_name, last_key,
+ g_conf->mds_sessionmap_keys_per_op, &values);
+
+ if (r != 0) {
+ derr << "error reading values: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ if (values.empty()) {
+ break;
+ }
+
+ try {
+ table_inst.decode_values(values);
+ } catch (buffer::error &e) {
+ derr << "table " << object_name << " is corrupt" << dendl;
+ return -EIO;
+ }
+ last_key = values.rbegin()->first;
+ }
+
+ table_inst.dump(f);
+
+ return 0;
+ }
+
+ int reset(librados::IoCtx *io)
+ {
+ A table_inst;
+ table_inst.set_rank(rank);
+ table_inst.reset_state();
+
+ bufferlist header_bl;
+ table_inst.encode_header(&header_bl);
+
+ // Compose a transaction to clear and write header
+ librados::ObjectWriteOperation op;
+ op.omap_clear();
+ op.omap_set_header(header_bl);
+
+ return io->operate(object_name, &op);
+ }
+};
+
int TableTool::_show_session_table(mds_rank_t rank, Formatter *f)
{
- return TableHandler<SessionMapStore>(rank, "sessionmap", false).load_and_dump(&io, f);
+ return TableHandlerOmap<SessionMapStore>(rank, "sessionmap", false).load_and_dump(&io, f);
}
int TableTool::_reset_session_table(mds_rank_t rank, Formatter *f)
{
- return TableHandler<SessionMapStore>(rank, "sessionmap", false).reset(&io);
+ return TableHandlerOmap<SessionMapStore>(rank, "sessionmap", false).reset(&io);
}
int TableTool::_show_ino_table(mds_rank_t rank, Formatter *f)