From e683fed714b86990783fde29fbd49518ce08b94b Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 4 Jan 2008 12:43:09 -0800 Subject: [PATCH] more caps work; fixes to avoid xlist::item copying, which is bad --- src/client/Client.cc | 11 +++- src/config.cc | 4 +- src/include/ceph_fs.h | 5 +- src/include/xlist.h | 4 ++ src/mds/CInode.cc | 2 +- src/mds/CInode.h | 51 +++++------------ src/mds/Capability.h | 11 ---- src/mds/Locker.cc | 42 ++++++++++---- src/mds/Locker.h | 1 + src/mds/MDCache.cc | 16 ++++-- src/mds/MDCache.h | 2 +- src/mds/Migrator.cc | 39 +++++++------ src/mds/Server.cc | 102 ++++++++++++++++++++++------------ src/mds/Server.h | 2 +- src/mds/SessionMap.h | 12 ++-- src/mds/journal.cc | 8 ++- src/messages/MClientSession.h | 19 +++++-- src/start.sh | 2 +- 18 files changed, 192 insertions(+), 141 deletions(-) diff --git a/src/client/Client.cc b/src/client/Client.cc index 654f061ac0733..ed022126acd96 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -787,6 +787,12 @@ void Client::handle_client_session(MClientSession *m) last_cap_renew = g_clock.now(); break; + case CEPH_SESSION_STALE: + // hmm, verify caps have been revoked? + messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_RESUME, g_clock.now()), + m->get_source_inst()); + break; + default: assert(0); } @@ -1586,8 +1592,7 @@ int Client::unmount() p != mds_sessions.end(); ++p) { dout(2) << "sending client_session close to mds" << p->first << " seq " << p->second << dendl; - messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_CLOSE, - p->second), + messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_CLOSE, p->second), mdsmap->get_inst(p->first)); } @@ -1654,7 +1659,7 @@ void Client::renew_caps() p++) { dout(15) << "renew_caps requesting from mds" << p->first << dendl; messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_RENEWCAPS), - mdsmap->get_inst(p->first)); + mdsmap->get_inst(p->first)); } } diff --git a/src/config.cc b/src/config.cc index 88a2592db3e05..aaf1c85fcf2aa 100644 --- a/src/config.cc +++ b/src/config.cc @@ -224,8 +224,8 @@ md_config_t g_conf = { mds_beacon_interval: 4, //30.0, mds_beacon_grace: 15, //60*60.0, - mds_cap_timeout: 60, // cap bits time out if client idle - mds_session_autoclose: 300, // autoclose idle session + mds_cap_timeout: 10, // cap bits time out if client idle + mds_session_autoclose: 30, // autoclose idle session mds_log: true, mds_log_max_events: -1, //MDS_CACHE_SIZE / 3, diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index dde40124c9c3b..498660c828068 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -285,7 +285,10 @@ enum { CEPH_SESSION_REQUEST_CLOSE, CEPH_SESSION_CLOSE, CEPH_SESSION_REQUEST_RENEWCAPS, - CEPH_SESSION_RENEWCAPS + CEPH_SESSION_RENEWCAPS, + CEPH_SESSION_STALE, // caps not renewed. + CEPH_SESSION_REQUEST_RESUME, + CEPH_SESSION_RESUME }; /* client_request */ diff --git a/src/include/xlist.h b/src/include/xlist.h index 319aa8647610b..350d34d15e9ce 100644 --- a/src/include/xlist.h +++ b/src/include/xlist.h @@ -27,6 +27,10 @@ public: ~item() { remove_myself(); } + // no copying! + item(const item& other); + const item& operator= (const item& right); + xlist* get_xlist() { return _list; } void remove_myself() { diff --git a/src/mds/CInode.cc b/src/mds/CInode.cc index 4320750c50eb8..a1184a16b7556 100644 --- a/src/mds/CInode.cc +++ b/src/mds/CInode.cc @@ -78,7 +78,7 @@ ostream& operator<<(ostream& out, CInode& in) // hack: spit out crap on which clients have caps if (!in.get_client_caps().empty()) { out << " caps={"; - for (map::iterator it = in.get_client_caps().begin(); + for (map::iterator it = in.get_client_caps().begin(); it != in.get_client_caps().end(); it++) { if (it != in.get_client_caps().begin()) out << ","; diff --git a/src/mds/CInode.h b/src/mds/CInode.h index c9ce121b05ab6..a88a5db1f6007 100644 --- a/src/mds/CInode.h +++ b/src/mds/CInode.h @@ -191,7 +191,7 @@ public: // -- distributed state -- protected: // file capabilities - map client_caps; // client -> caps + map client_caps; // client -> caps map mds_caps_wanted; // [auth] mds -> caps wanted int replica_caps_wanted; // [replica] what i've requested from auth utime_t replica_caps_wanted_keep_until; @@ -342,23 +342,24 @@ public: // -- caps -- (new) // client caps bool is_any_caps() { return !client_caps.empty(); } - map& get_client_caps() { return client_caps; } + map& get_client_caps() { return client_caps; } Capability *get_client_cap(int client) { if (client_caps.count(client)) - return &client_caps[client]; + return client_caps[client]; return 0; } Capability *add_client_cap(int client, CInode *in, xlist& cls) { if (client_caps.empty()) get(PIN_CAPS); assert(client_caps.count(client) == 0); - Capability *cap = &client_caps[client]; + Capability *cap = client_caps[client] = new Capability; cap->set_inode(in); cap->add_to_cap_list(cls); return cap; } void remove_client_cap(int client) { assert(client_caps.count(client) == 1); + delete client_caps[client]; client_caps.erase(client); if (client_caps.empty()) put(PIN_CAPS); @@ -376,59 +377,33 @@ public: inode.mtime = MAX(inode.mtime, icr.mtime); inode.atime = MAX(inode.atime, icr.atime); } - /* - void set_client_caps(map& cl) { - if (client_caps.empty() && !cl.empty()) - get(PIN_CAPS); - client_caps.clear(); - client_caps = cl; - } - */ void clear_client_caps() { - if (!client_caps.empty()) - put(PIN_CAPS); - client_caps.clear(); + while (!client_caps.empty()) + remove_client_cap(client_caps.begin()->first); } void export_client_caps(map& cl) { - for (map::iterator it = client_caps.begin(); + for (map::iterator it = client_caps.begin(); it != client_caps.end(); it++) { - cl[it->first] = it->second.make_export(); + cl[it->first] = it->second->make_export(); } } - void merge_client_caps(map& cl, set& new_client_caps) { - if (client_caps.empty() && !cl.empty()) - get(PIN_CAPS); - - for (map::iterator it = cl.begin(); - it != cl.end(); - it++) { - new_client_caps.insert(it->first); - if (client_caps.count(it->first)) { - // merge - client_caps[it->first].merge(it->second); - } else { - // new - client_caps[it->first] = Capability(this, it->second); - } - } - } // caps issued, wanted int get_caps_issued() { int c = 0; - for (map::iterator it = client_caps.begin(); + for (map::iterator it = client_caps.begin(); it != client_caps.end(); it++) - c |= it->second.issued(); + c |= it->second->issued(); return c; } int get_caps_wanted() { int w = 0; - for (map::iterator it = client_caps.begin(); + for (map::iterator it = client_caps.begin(); it != client_caps.end(); it++) { - w |= it->second.wanted(); + w |= it->second->wanted(); //cout << " get_caps_wanted client " << it->first << " " << cap_string(it->second.wanted()) << endl; } if (is_auth()) diff --git a/src/mds/Capability.h b/src/mds/Capability.h index 6cb7ef520c34c..a0ca6667d5844 100644 --- a/src/mds/Capability.h +++ b/src/mds/Capability.h @@ -76,17 +76,6 @@ public: suppress(false), stale(false), session_caps_item(this) { } - Capability(CInode *i, Export& other) : - inode(i), - wanted_caps(other.wanted), - last_sent(0), last_recv(0), - suppress(false), stale(false), - session_caps_item(this) { - // issued vs pending - if (other.issued & ~other.pending) - issue(other.issued); - issue(other.pending); - } bool is_suppress() { return suppress; } void set_suppress(bool b) { suppress = b; } diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc index e4a8792008aef..02ba34a56352c 100644 --- a/src/mds/Locker.cc +++ b/src/mds/Locker.cc @@ -524,16 +524,17 @@ bool Locker::issue_caps(CInode *in) int nissued = 0; // client caps - for (map::iterator it = in->client_caps.begin(); + for (map::iterator it = in->client_caps.begin(); it != in->client_caps.end(); it++) { - if (it->second.pending() != (it->second.wanted() & allowed)) { + Capability *cap = it->second; + if (cap->pending() != (cap->wanted() & allowed)) { // issue nissued++; - int before = it->second.pending(); - long seq = it->second.issue(it->second.wanted() & allowed); - int after = it->second.pending(); + int before = cap->pending(); + long seq = cap->issue(cap->wanted() & allowed); + int after = cap->pending(); // twiddle file_data_version? if (!(before & CEPH_CAP_WRBUFFER) && @@ -543,13 +544,15 @@ bool Locker::issue_caps(CInode *in) } if (seq > 0 && - !it->second.is_suppress()) { - dout(7) << " sending MClientFileCaps to client" << it->first << " seq " << it->second.get_last_seq() << " new pending " << cap_string(it->second.pending()) << " was " << cap_string(before) << dendl; + !cap->is_suppress()) { + dout(7) << " sending MClientFileCaps to client" << it->first << " seq " << cap->get_last_seq() + << " new pending " << cap_string(cap->pending()) << " was " << cap_string(before) + << dendl; mds->send_message_client(new MClientFileCaps(MClientFileCaps::OP_GRANT, in->inode, - it->second.get_last_seq(), - it->second.pending(), - it->second.wanted()), + cap->get_last_seq(), + cap->pending(), + cap->wanted()), it->first); } } @@ -560,7 +563,7 @@ bool Locker::issue_caps(CInode *in) void Locker::revoke_stale_caps(Session *session) { - dout(10) << "revoke_stale_caps for client " << session->inst.name << dendl; + dout(10) << "revoke_stale_caps for " << session->inst.name << dendl; for (xlist::iterator p = session->caps.begin(); !p.end(); ++p) { Capability *cap = *p; @@ -574,7 +577,22 @@ void Locker::revoke_stale_caps(Session *session) dout(10) << " nothing issued on " << *in << dendl; } cap->set_stale(true); - } + } +} + +void Locker::resume_stale_caps(Session *session) +{ + dout(10) << "resume_stale_caps for " << session->inst.name << dendl; + + for (xlist::iterator p = session->caps.begin(); !p.end(); ++p) { + Capability *cap = *p; + CInode *in = cap->get_inode(); + if (cap->is_stale()) { + dout(10) << " clearing stale flag on " << *in << dendl; + cap->set_stale(false); + file_eval(&in->filelock); + } + } } class C_MDL_RequestInodeFileCaps : public Context { diff --git a/src/mds/Locker.h b/src/mds/Locker.h index 7fb2ec97039b5..5619245e5cf6e 100644 --- a/src/mds/Locker.h +++ b/src/mds/Locker.h @@ -182,6 +182,7 @@ protected: Capability* issue_new_caps(CInode *in, int mode, Session *session); bool issue_caps(CInode *in); void revoke_stale_caps(Session *session); + void resume_stale_caps(Session *session); protected: void handle_client_file_caps(class MClientFileCaps *m); diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc index 4f54c7c65cca9..98883875694ba 100644 --- a/src/mds/MDCache.cc +++ b/src/mds/MDCache.cc @@ -1082,7 +1082,7 @@ void MDCache::send_resolve_now(int who) } // [resolving] if (uncommitted_slave_updates.count(who)) { - for (map::iterator p = uncommitted_slave_updates[who].begin(); + for (map::iterator p = uncommitted_slave_updates[who].begin(); p != uncommitted_slave_updates[who].end(); ++p) { dout(10) << " including uncommitted " << p->first << dendl; @@ -1388,7 +1388,8 @@ void MDCache::handle_resolve_ack(MMDSResolveAck *ack) if (mds->is_resolve()) { // replay assert(uncommitted_slave_updates[from].count(*p)); - uncommitted_slave_updates[from][*p].commit.replay(mds); + uncommitted_slave_updates[from][*p]->commit.replay(mds); + delete uncommitted_slave_updates[from][*p]; uncommitted_slave_updates[from].erase(*p); // log commit mds->mdlog->submit_entry(new ESlaveUpdate(mds->mdlog, "unknown", *p, from, ESlaveUpdate::OP_COMMIT)); @@ -1406,7 +1407,8 @@ void MDCache::handle_resolve_ack(MMDSResolveAck *ack) if (mds->is_resolve()) { assert(uncommitted_slave_updates[from].count(*p)); - uncommitted_slave_updates[from][*p].rollback.replay(mds); + uncommitted_slave_updates[from][*p]->rollback.replay(mds); + delete uncommitted_slave_updates[from][*p]; uncommitted_slave_updates[from].erase(*p); mds->mdlog->submit_entry(new ESlaveUpdate(mds->mdlog, "unknown", *p, from, ESlaveUpdate::OP_ROLLBACK)); } else { @@ -2659,11 +2661,13 @@ void MDCache::rejoin_import_cap(CInode *in, int client, inode_caps_reconnect_t& in->reconnect_cap(client, icr, session->caps); // send REAP + Capability *cap = in->get_client_cap(client); + assert(cap); // ? MClientFileCaps *reap = new MClientFileCaps(MClientFileCaps::OP_IMPORT, in->inode, - in->client_caps[client].get_last_seq(), - in->client_caps[client].pending(), - in->client_caps[client].wanted()); + cap->get_last_seq(), + cap->pending(), + cap->wanted()); reap->set_mds(frommds); // reap from whom? mds->messenger->send_message(reap, session->inst); } diff --git a/src/mds/MDCache.h b/src/mds/MDCache.h index 59996c9733c6f..e7f590146bea9 100644 --- a/src/mds/MDCache.h +++ b/src/mds/MDCache.h @@ -386,7 +386,7 @@ protected: // from MMDSResolves map > > other_ambiguous_imports; - map > uncommitted_slave_updates; // for replay. + map > uncommitted_slave_updates; // for replay. map ambiguous_slave_updates; // for log trimming. map waiting_for_slave_update_commit; friend class ESlaveUpdate; diff --git a/src/mds/Migrator.cc b/src/mds/Migrator.cc index 5565d541bdf03..517644da49b0d 100644 --- a/src/mds/Migrator.cc +++ b/src/mds/Migrator.cc @@ -882,7 +882,7 @@ void Migrator::encode_export_inode_caps(CInode *in, bufferlist& bl, in->state_set(CInode::STATE_EXPORTINGCAPS); // make note of clients named by exported capabilities - for (map::iterator it = in->client_caps.begin(); + for (map::iterator it = in->client_caps.begin(); it != in->client_caps.end(); it++) exported_client_map[it->first] = mds->sessionmap.get_inst(entity_name_t::CLIENT(it->first)); @@ -893,16 +893,17 @@ void Migrator::finish_export_inode_caps(CInode *in) in->state_clear(CInode::STATE_EXPORTINGCAPS); // tell (all) clients about migrating caps.. - for (map::iterator it = in->client_caps.begin(); + for (map::iterator it = in->client_caps.begin(); it != in->client_caps.end(); it++) { + Capability *cap = it->second; dout(7) << "finish_export_inode telling client" << it->first << " exported caps on " << *in << dendl; MClientFileCaps *m = new MClientFileCaps(MClientFileCaps::OP_EXPORT, in->inode, - it->second.get_last_seq(), - it->second.pending(), - it->second.wanted()); + cap->get_last_seq(), + cap->pending(), + cap->wanted()); mds->send_message_client(m, it->first); } in->clear_client_caps(); @@ -2034,23 +2035,29 @@ void Migrator::finish_import_inode_caps(CInode *in, int from, map &cap_map) { assert(!cap_map.empty()); - - set new_caps; - in->merge_client_caps(cap_map, new_caps); - in->put(CInode::PIN_IMPORTINGCAPS); - for (set::iterator it = new_caps.begin(); - it != new_caps.end(); + for (map::iterator it = cap_map.begin(); + it != cap_map.end(); it++) { - dout(0) << "finish_import_inode_caps for client" << *it << " on " << *in << dendl; + dout(0) << "finish_import_inode_caps for client" << it->first << " on " << *in << dendl; + Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(it->first)); + assert(session); + + Capability *cap = in->get_client_cap(it->first); + if (!cap) + cap = in->add_client_cap(it->first, in, session->caps); + cap->merge(it->second); + MClientFileCaps *caps = new MClientFileCaps(MClientFileCaps::OP_IMPORT, in->inode, - in->client_caps[*it].get_last_seq(), - in->client_caps[*it].pending(), - in->client_caps[*it].wanted()); + cap->get_last_seq(), + cap->pending(), + cap->wanted()); caps->set_mds(from); // from whom? - mds->send_message_client(caps, *it); + mds->send_message_client(caps, session->inst); } + + in->put(CInode::PIN_IMPORTINGCAPS); } int Migrator::decode_import_dir(bufferlist::iterator& blp, diff --git a/src/mds/Server.cc b/src/mds/Server.cc index 7f4494898af32..133d31b81d00c 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -102,6 +102,7 @@ void Server::dispatch(Message *m) switch (m->get_type()) { case CEPH_MSG_CLIENT_SESSION: handle_client_session((MClientSession*)m); + delete m; return; case CEPH_MSG_CLIENT_REQUEST: handle_client_request((MClientRequest*)m); @@ -137,72 +138,79 @@ public: void Server::handle_client_session(MClientSession *m) { - dout(3) << "handle_client_session " << *m << " from " << m->get_source() << dendl; - bool open = false; + version_t pv; Session *session = mds->sessionmap.get_session(m->get_source()); + + dout(3) << "handle_client_session " << *m << " from " << m->get_source() << dendl; assert(m->get_source().is_client()); // should _not_ come from an mds! switch (m->op) { + case CEPH_SESSION_REQUEST_OPEN: + if (session && (session->is_opening() || session->is_open())) { + dout(10) << "already open|opening, dropping this req" << dendl; + return; + } + assert(!session); // ? + session = mds->sessionmap.get_or_add_session(m->get_source_inst()); + session->state = Session::STATE_OPENING; + mds->sessionmap.touch_session(session); + pv = ++mds->sessionmap.projected; + mdlog->submit_entry(new ESession(m->get_source_inst(), true, pv), + new C_MDS_session_finish(mds, session, true, pv)); + break; case CEPH_SESSION_REQUEST_RENEWCAPS: if (!session) { dout(10) << "dne, dropping this req" << dendl; - delete m; return; } mds->sessionmap.touch_session(session); - mds->messenger->send_message(new MClientSession(CEPH_SESSION_RENEWCAPS), session->inst); - delete m; - return; + mds->messenger->send_message(new MClientSession(CEPH_SESSION_RENEWCAPS, m->stamp), session->inst); + break; - case CEPH_SESSION_REQUEST_OPEN: - open = true; - if (session && (session->is_opening() || session->is_open())) { - dout(10) << "already open|opening, dropping this req" << dendl; - delete m; + case CEPH_SESSION_REQUEST_RESUME: + if (!session) { + dout(10) << "dne, replying with close" << dendl; + mds->messenger->send_message(new MClientSession(CEPH_SESSION_CLOSE), m->get_source_inst()); return; } - assert(!session); // ? - session = mds->sessionmap.get_or_add_session(m->get_source_inst()); - session->state = Session::STATE_OPENING; + if (!session->is_stale()) { + dout(10) << "hmm, got request_resume on non-stale session for " << session->inst << dendl; + assert(0); + return; + } + session->state = Session::STATE_OPEN; mds->sessionmap.touch_session(session); + mds->locker->resume_stale_caps(session); + mds->messenger->send_message(new MClientSession(CEPH_SESSION_RESUME, m->stamp), session->inst); break; case CEPH_SESSION_REQUEST_CLOSE: if (!session || session->is_closing()) { dout(10) << "already closing|dne, dropping this req" << dendl; - delete m; return; } if (m->seq < session->get_push_seq()) { dout(10) << "old push seq " << m->seq << " < " << session->get_push_seq() << ", dropping" << dendl; - delete m; return; } assert(m->seq == session->get_push_seq()); session->state = Session::STATE_CLOSING; + pv = ++mds->sessionmap.projected; + mdlog->submit_entry(new ESession(m->get_source_inst(), false, pv), + new C_MDS_session_finish(mds, session, false, pv)); break; default: assert(0); } - - // journal it - version_t pv = ++mds->sessionmap.projected; - dout(10) << " sessionmap v " << mds->sessionmap.version << " pv " << pv << dendl; - mdlog->submit_entry(new ESession(m->get_source_inst(), open, pv), - new C_MDS_session_finish(mds, session, open, pv)); - delete m; - - if (logger) logger->inc("hcsess"); } void Server::_session_logged(Session *session, bool open, version_t pv) { dout(10) << "_session_logged " << session->inst << " " << (open ? "open":"close") - << " " << pv - << dendl; + << " " << pv << dendl; // apply if (open) { @@ -217,11 +225,8 @@ void Server::_session_logged(Session *session, bool open, version_t pv) assert(!open); mds->sessionmap.version++; // noop } - - assert(pv == mds->sessionmap.version); - - // reply - if (open) + + if (open) mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), session->inst); else mds->messenger->send_message(new MClientSession(CEPH_SESSION_CLOSE), session->inst); @@ -287,14 +292,16 @@ void Server::find_idle_sessions() { dout(10) << "find_idle_sessions" << dendl; - utime_t cutoff = g_clock.now(); + // stale + utime_t now = g_clock.now(); + utime_t cutoff = now; cutoff -= g_conf.mds_cap_timeout; while (1) { Session *session = mds->sessionmap.get_oldest_active_session(); if (!session) break; - dout(20) << "laggiest session is " << session->inst << dendl; + dout(20) << "laggiest active session is " << session->inst << dendl; if (session->last_cap_renew >= cutoff) { - dout(20) << "laggiest session is " << session->inst << " and sufficiently new (" + dout(20) << "laggiest active session is " << session->inst << " and sufficiently new (" << session->last_cap_renew << ")" << dendl; break; } @@ -302,7 +309,32 @@ void Server::find_idle_sessions() dout(10) << "new stale session " << session->inst << " last " << session->last_cap_renew << dendl; mds->sessionmap.mark_session_stale(session); mds->locker->revoke_stale_caps(session); + mds->messenger->send_message(new MClientSession(CEPH_SESSION_STALE, g_clock.now()), + session->inst); } + + // dead + cutoff = now; + cutoff -= g_conf.mds_session_autoclose; + while (1) { + Session *session = mds->sessionmap.get_oldest_stale_session(); + if (!session) break; + dout(20) << "oldest stale session is " << session->inst << dendl; + if (session->last_cap_renew >= cutoff) { + dout(20) << "oldest stale session is " << session->inst << " and sufficiently new (" + << session->last_cap_renew << ")" << dendl; + break; + } + + dout(10) << "autoclosing stale session " << session->inst << " last " << session->last_cap_renew << dendl; + + mds->sessionmap.mark_session_stale(session); + mds->locker->revoke_stale_caps(session); + mds->messenger->send_message(new MClientSession(CEPH_SESSION_CLOSE), + session->inst); + } + + } diff --git a/src/mds/Server.h b/src/mds/Server.h index 5fcd3c155fd98..1550bf23fe457 100644 --- a/src/mds/Server.h +++ b/src/mds/Server.h @@ -56,7 +56,7 @@ public: set reconnected_caps; void handle_client_session(class MClientSession *m); - void _session_logged(Session *session, bool open, version_t cmapv); + void _session_logged(Session *session, bool open, version_t pv); void prepare_force_open_sessions(map &cm); void finish_force_open_sessions(map &cm); void terminate_sessions(); diff --git a/src/mds/SessionMap.h b/src/mds/SessionMap.h index 1dc6cd1ea63ae..bc9a2402dd9dc 100644 --- a/src/mds/SessionMap.h +++ b/src/mds/SessionMap.h @@ -36,11 +36,11 @@ class Session { // -- state etc -- public: static const int STATE_UNDEF = 0; - static const int STATE_OPENING = 1; + static const int STATE_OPENING = 1; // journaling open static const int STATE_OPEN = 2; - static const int STATE_CLOSING = 3; - static const int STATE_STALE = 4; // ? - static const int STATE_RECONNECTING = 5; + static const int STATE_CLOSING = 3; // journaling close + static const int STATE_STALE = 4; + static const int STATE_RECONNECTING = 5; // ? int state; entity_inst_t inst; @@ -165,6 +165,10 @@ public: void mark_session_stale(Session *s) { stale_sessions.push_back(&s->session_list_item); } + Session *get_oldest_stale_session() { + if (stale_sessions.empty()) return 0; + return stale_sessions.front(); + } void get_client_set(set& s) { for (hash_map::iterator p = session_map.begin(); diff --git a/src/mds/journal.cc b/src/mds/journal.cc index 036f1cdc9f710..3f25262fdff6b 100644 --- a/src/mds/journal.cc +++ b/src/mds/journal.cc @@ -613,14 +613,15 @@ void ESlaveUpdate::replay(MDS *mds) commit._segment = _segment; // may need this later rollback._segment = _segment; // may need this later mds->mdcache->uncommitted_slave_updates[master][reqid] = - MDSlaveUpdate(commit, rollback, _segment->slave_updates); + new MDSlaveUpdate(commit, rollback, _segment->slave_updates); break; case ESlaveUpdate::OP_COMMIT: if (mds->mdcache->uncommitted_slave_updates[master].count(reqid)) { dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds" << master << ": applying commit blob" << dendl; - mds->mdcache->uncommitted_slave_updates[master][reqid].commit.replay(mds, _segment); + mds->mdcache->uncommitted_slave_updates[master][reqid]->commit.replay(mds, _segment); + delete mds->mdcache->uncommitted_slave_updates[master][reqid]; mds->mdcache->uncommitted_slave_updates[master].erase(reqid); } else { dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds" << master @@ -633,7 +634,8 @@ void ESlaveUpdate::replay(MDS *mds) dout(10) << "ESlaveUpdate.replay abort " << reqid << " for mds" << master << ": applying rollback blob" << dendl; assert(mds->mdcache->uncommitted_slave_updates[master].count(reqid)); - mds->mdcache->uncommitted_slave_updates[master][reqid].rollback.replay(mds, _segment); + mds->mdcache->uncommitted_slave_updates[master][reqid]->rollback.replay(mds, _segment); + delete mds->mdcache->uncommitted_slave_updates[master][reqid]; mds->mdcache->uncommitted_slave_updates[master].erase(reqid); } else { dout(10) << "ESlaveUpdate.replay abort " << reqid << " for mds" << master diff --git a/src/messages/MClientSession.h b/src/messages/MClientSession.h index 73aa364827723..15b27ac3efdf3 100644 --- a/src/messages/MClientSession.h +++ b/src/messages/MClientSession.h @@ -27,17 +27,22 @@ public: case CEPH_SESSION_CLOSE: return "close"; case CEPH_SESSION_REQUEST_RENEWCAPS: return "request_renewcaps"; case CEPH_SESSION_RENEWCAPS: return "renewcaps"; + case CEPH_SESSION_STALE: return "stale"; default: assert(0); return 0; } } int32_t op; - version_t seq; + version_t seq; // used when requesting close only + utime_t stamp; MClientSession() : Message(CEPH_MSG_CLIENT_SESSION) { } MClientSession(int o, version_t s=0) : Message(CEPH_MSG_CLIENT_SESSION), op(o), seq(s) { } + MClientSession(int o, utime_t st) : + Message(CEPH_MSG_CLIENT_SESSION), + op(o), seq(0), stamp(st) { } const char *get_type_name() { return "client_session"; } void print(ostream& out) { @@ -47,13 +52,15 @@ public: } void decode_payload() { - int off = 0; - ::_decode(op, payload, off); - ::_decode(seq, payload, off); + bufferlist::iterator p = payload.begin(); + ::_decode_simple(op, p); + ::_decode_simple(seq, p); + ::_decode_simple(stamp, p); } void encode_payload() { - ::_encode(op, payload); - ::_encode(seq, payload); + ::_encode_simple(op, payload); + ::_encode_simple(seq, payload); + ::_encode_simple(stamp, payload); } }; diff --git a/src/start.sh b/src/start.sh index 9f2410c4a1008..6794fc8662f09 100755 --- a/src/start.sh +++ b/src/start.sh @@ -6,4 +6,4 @@ ./cosd --mkfs --osd 1 & ./cosd --mkfs --osd 2 & ./cosd --mkfs --osd 3 & -./cmds & +./cmds --debug_ms 1 --debug_mds 20 & -- 2.39.5