- make sure locker avoids frozen inodes
/- make sure predirty_nested stops if it can't wrlock versionlock (acquire_locks normally hides that detail for us)
- make sure stray inode is always opened on startup
-- make sure inode cache expire for frozen inode behaves
+/- make sure inode cache expire for frozen inode behaves
- look at the client_map session opening code.. versus rollback (of import, or slave request)
#include "events/EUpdate.h"
#include "events/ESlaveUpdate.h"
#include "events/EOpen.h"
+#include "events/ECommitted.h"
#include "events/EPurgeFinish.h"
case EVENT_UPDATE: le = new EUpdate; break;
case EVENT_SLAVEUPDATE: le = new ESlaveUpdate; break;
case EVENT_OPEN: le = new EOpen; break;
+ case EVENT_COMMITTED: le = new ECommitted; break;
case EVENT_PURGEFINISH: le = new EPurgeFinish; break;
#define EVENT_UPDATE 20
#define EVENT_SLAVEUPDATE 21
#define EVENT_OPEN 22
+#define EVENT_COMMITTED 23
#define EVENT_PURGEFINISH 30
#include "include/xlist.h"
#include "include/interval_set.h"
#include "include/Context.h"
+#include "mdstypes.h"
#include <ext/hash_set>
using __gnu_cxx::hash_set;
// committed anchor transactions
hash_set<version_t> pending_commit_atids;
+ set<metareqid_t> uncommitted_slaves;
// client request ids
map<int, tid_t> last_client_tids;
#include "events/EPurgeFinish.h"
#include "events/EImportFinish.h"
#include "events/EFragment.h"
+#include "events/ECommitted.h"
#include "messages/MGenericMessage.h"
+// ===================================
+// slave requests
+
+
+/*
+ * some handlers for master requests with slaves. we need to make
+ * sure slaves journal commits before we forget we mastered them.
+ */
+struct C_MDC_CommittedSlaves : public Context {
+ MDCache *cache;
+ metareqid_t reqid;
+ LogSegment *ls;
+ list<Context*> waiters;
+ C_MDC_CommittedSlaves(MDCache *s, metareqid_t r, LogSegment *l, list<Context*> &w) :
+ cache(s), reqid(r), ls(l) {
+ waiters.swap(w);
+ }
+ void finish(int r) {
+ cache->_logged_committed_slaves(reqid, ls, waiters);
+ }
+};
+
+void MDCache::log_all_uncommitted_slaves()
+{
+ while (!uncommitted_slaves.empty())
+ log_committed_slaves(uncommitted_slaves.begin()->first);
+}
+
+void MDCache::log_committed_slaves(metareqid_t reqid)
+{
+ dout(10) << "log_committed_slaves " << reqid << dendl;
+ mds->mdlog->submit_entry(new ECommitted(reqid),
+ new C_MDC_CommittedSlaves(this, reqid,
+ uncommitted_slaves[reqid].ls,
+ uncommitted_slaves[reqid].waiters));
+ mds->mdcache->uncommitted_slaves.erase(reqid);
+}
+
+void MDCache::_logged_committed_slaves(metareqid_t reqid, LogSegment *ls, list<Context*> &waiters)
+{
+ dout(10) << "_logged_committed_slaves " << reqid << dendl;
+ ls->uncommitted_slaves.erase(reqid);
+ mds->queue_waiters(waiters);
+}
+
+// while active...
+
+void MDCache::committed_slave(metareqid_t r, int from)
+{
+ dout(10) << "committed_slave mds" << from << " on " << r << dendl;
+ assert(uncommitted_slaves.count(r));
+ uncommitted_slaves[r].slaves.erase(from);
+ if (uncommitted_slaves[r].slaves.empty())
+ log_committed_slaves(r);
+}
+
+// at end of resolve...
+
+struct C_MDC_SlaveCommit : public Context {
+ MDCache *cache;
+ int from;
+ metareqid_t reqid;
+ C_MDC_SlaveCommit(MDCache *c, int f, metareqid_t r) : cache(c), from(f), reqid(r) {}
+ void finish(int r) {
+ cache->_logged_slave_commit(from, reqid);
+ }
+};
+
+void MDCache::_logged_slave_commit(int from, metareqid_t reqid)
+{
+ dout(10) << "_logged_slave_commit from mds" << from << " " << reqid << dendl;
+ delete uncommitted_slave_updates[from][reqid];
+ uncommitted_slave_updates[from].erase(reqid);
+ if (uncommitted_slave_updates[from].empty())
+ uncommitted_slave_updates.erase(from);
+
+ if (uncommitted_slave_updates.empty() && mds->is_resolve())
+ maybe_resolve_finish();
+}
+
+
+
for (list<metareqid_t>::iterator p = m->slave_requests.begin();
p != m->slave_requests.end();
++p) {
- if (mds->sessionmap.have_completed_request(*p)) {
+ if (uncommitted_slaves.count(*p)) { //mds->sessionmap.have_completed_request(*p)) {
// COMMIT
dout(10) << " ambiguous slave request " << *p << " will COMMIT" << dendl;
ack->add_commit(*p);
void MDCache::maybe_resolve_finish()
{
if (got_resolve != recovery_set) {
- dout(10) << "maybe_resolve_finish still waiting for more resolves, got (" << got_resolve
- << "), need (" << recovery_set << ")" << dendl;
+ dout(10) << "maybe_resolve_finish still waiting for more resolves, got ("
+ << got_resolve << "), need (" << recovery_set << ")" << dendl;
}
else if (!need_resolve_ack.empty()) {
- dout(10) << "maybe_resolve_finish still waiting for resolve_ack from (" << need_resolve_ack << ")" << dendl;
+ dout(10) << "maybe_resolve_finish still waiting for resolve_ack from ("
+ << need_resolve_ack << ")" << dendl;
}
else if (!need_resolve_rollback.empty()) {
- dout(10) << "maybe_resolve_finish still waiting for rollback to commit on (" << need_resolve_rollback << ")" << dendl;
- } else {
+ dout(10) << "maybe_resolve_finish still waiting for rollback to commit on ("
+ << need_resolve_rollback << ")" << dendl;
+ }
+ else if (!uncommitted_slave_updates.empty()) {
+ dout(10) << "maybe_resolve_finish still waiting for slave commits to commit" << dendl;
+ }
+ else {
dout(10) << "maybe_resolve_finish got all resolves+resolve_acks, done." << dendl;
disambiguate_imports();
if (mds->is_resolve()) {
}
}
+
void MDCache::handle_resolve_ack(MMDSResolveAck *ack)
{
dout(10) << "handle_resolve_ack " << *ack << " from " << ack->get_source() << dendl;
else
assert(0);
- delete uncommitted_slave_updates[from][*p];
- uncommitted_slave_updates[from].erase(*p);
+ mds->mdlog->wait_for_sync(new C_MDC_SlaveCommit(this, from, *p));
} else {
MDRequest *mdr = request_get(*p);
if (mdr->more()->slave_commit) {
}
+
+
// --------------------------------------------------------------------
// ANCHORS
void request_drop_locks(MDRequest *r);
void request_cleanup(MDRequest *r);
+ // slaves
+ void add_uncommitted_slaves(metareqid_t reqid, LogSegment *ls, set<int> &slaves) {
+ uncommitted_slaves[reqid].ls = ls;
+ uncommitted_slaves[reqid].slaves = slaves;
+ }
+ void wait_for_uncommitted_slaves(metareqid_t reqid, Context *c) {
+ uncommitted_slaves[reqid].waiters.push_back(c);
+ }
+ void log_all_uncommitted_slaves();
+ void log_committed_slaves(metareqid_t reqid);
+ void _logged_committed_slaves(metareqid_t reqid, LogSegment *ls, list<Context*> &waiters);
+ void committed_slave(metareqid_t r, int from);
+ void _logged_slave_commit(int from, metareqid_t reqid);
+
// inode purging
map<CInode*, map<off_t, off_t> > purging; // inode -> newsize -> oldsize
// from MMDSResolves
map<int, map<dirfrag_t, list<dirfrag_t> > > other_ambiguous_imports;
- map<int, map<metareqid_t, MDSlaveUpdate*> > uncommitted_slave_updates; // for replay.
- map<metareqid_t, bool> ambiguous_slave_updates; // for log trimming.
- map<metareqid_t, Context*> waiting_for_slave_update_commit;
+ map<int, map<metareqid_t, MDSlaveUpdate*> > uncommitted_slave_updates; // slave: for replay.
+
+ // track master requests whose slaves haven't acknowledged commit
+ struct uslave {
+ set<int> slaves;
+ LogSegment *ls;
+ list<Context*> waiters;
+ };
+ map<metareqid_t, uslave> uncommitted_slaves; // master: req -> slave set
+
+ //map<metareqid_t, bool> ambiguous_slave_updates; // for log trimming.
+ //map<metareqid_t, Context*> waiting_for_slave_update_commit;
friend class ESlaveUpdate;
+ friend class ECommitted;
set<int> wants_resolve; // nodes i need to send my resolve to
set<int> got_resolve; // nodes i got resolves from
#include "events/ESlaveUpdate.h"
#include "events/ESession.h"
#include "events/EOpen.h"
+#include "events/ECommitted.h"
#include "include/filepath.h"
#include "common/Timer.h"
}
break;
+ case MMDSSlaveRequest::OP_COMMITTED:
+ {
+ metareqid_t r = m->get_reqid();
+ mds->mdcache->committed_slave(r, from);
+ }
+ break;
+
default:
assert(0);
}
void Server::_link_remote(MDRequest *mdr, CDentry *dn, CInode *targeti)
{
dout(10) << "_link_remote " << *dn << " to " << *targeti << dendl;
-
+
// 1. send LinkPrepare to dest (journal nlink++ prepare)
int linkauth = targeti->authority().first;
if (mdr->more()->witnessed.count(linkauth) == 0) {
mdr->ls = mdlog->get_current_segment();
EUpdate *le = new EUpdate(mdlog, "link_remote");
le->metablob.add_client_req(mdr->reqid);
+ if (!mdr->more()->slaves.empty()) {
+ dout(20) << " noting uncommitted_slaves " << mdr->more()->slaves << dendl;
+
+ le->reqid = mdr->reqid;
+ le->had_slaves = true;
+
+ mds->mdcache->add_uncommitted_slaves(mdr->reqid, mdr->ls, mdr->more()->slaves);
+ }
+
mds->locker->predirty_nested(mdr, &le->metablob, targeti, dn->dir, PREDIRTY_DIR, 1);
le->metablob.add_remote_dentry(dn, true, targeti->ino(),
MODE_TO_DT(targeti->inode.mode)); // new remote
}
+struct C_MDS_CommittedSlave : public Context {
+ Server *server;
+ MDRequest *mdr;
+ C_MDS_CommittedSlave(Server *s, MDRequest *m) : server(s), mdr(m) {}
+ void finish(int r) {
+ server->_committed_slave(mdr);
+ }
+};
+
void Server::_commit_slave_link(MDRequest *mdr, int r, CInode *targeti)
{
dout(10) << "_commit_slave_link " << *mdr
// write a commit to the journal
ESlaveUpdate *le = new ESlaveUpdate(mdlog, "slave_link_commit", mdr->reqid, mdr->slave_to_mds,
ESlaveUpdate::OP_COMMIT, ESlaveUpdate::LINK);
- mdlog->submit_entry(le);
- mds->mdcache->request_finish(mdr);
+ mdlog->submit_entry(le, new C_MDS_CommittedSlave(this, mdr));
} else {
do_link_rollback(mdr->more()->rollback_bl, mdr->slave_to_mds, mdr);
}
}
+void Server::_committed_slave(MDRequest *mdr)
+{
+ dout(10) << "_committed_slave " << *mdr << dendl;
+ MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_COMMITTED);
+ mds->send_message_mds(req, mdr->slave_to_mds);
+ mds->mdcache->request_finish(mdr);
+}
+
struct C_MDS_LoggedLinkRollback : public Context {
Server *server;
Mutation *mut;
mdr->ls = mdlog->get_current_segment();
EUpdate *le = new EUpdate(mdlog, "rename");
le->metablob.add_client_req(mdr->reqid);
+ if (!mdr->more()->slaves.empty()) {
+ dout(20) << " noting uncommitted_slaves " << mdr->more()->slaves << dendl;
+
+ le->reqid = mdr->reqid;
+ le->had_slaves = true;
+
+ mds->mdcache->add_uncommitted_slaves(mdr->reqid, mdr->ls, mdr->more()->slaves);
+ }
_rename_prepare(mdr, &le->metablob, &le->client_map, srcdn, destdn, straydn);
+
if (!srcdn->is_auth() && srcdn->is_primary()) {
// importing inode; also journal imported client map
if (srcdn->is_primary() &&
!srcdn->inode->is_freezing_inode() &&
!srcdn->inode->is_frozen_inode()) {
- // srci auth.
- // set ambiguous auth.
+ // set ambiguous auth for srci
+ /*
+ * NOTE: we don't worry about ambiguous cache expire as we do
+ * with subtree migrations because all slaves will pin
+ * srcdn->inode for duration of this rename.
+ */
srcdn->inode->state_set(CInode::STATE_AMBIGUOUSAUTH);
// freeze?
else {
assert(srcdn->is_remote());
rollback.orig_src.remote_ino = srcdn->get_remote_ino();
- rollback.orig_src.remote_ino = srcdn->get_remote_d_type();
+ rollback.orig_src.remote_d_type = srcdn->get_remote_d_type();
}
rollback.orig_dest.dirfrag = destdn->dir->dirfrag();
rollback.orig_dest.ino = destdn->inode->ino();
else if (destdn->is_remote()) {
rollback.orig_dest.remote_ino = destdn->get_remote_ino();
- rollback.orig_dest.remote_ino = destdn->get_remote_d_type();
+ rollback.orig_dest.remote_d_type = destdn->get_remote_d_type();
}
if (straydn) {
mdlog->submit_entry(le, new C_MDS_SlaveRenamePrep(this, mdr, srcdn, destdn, straydn));
} else {
// don't journal.
- dout(10) << "not journaling, i'm not auth for anything, and srci isn't open" << dendl;
+ dout(10) << "not journaling: i'm not auth for anything, and srci has no caps" << dendl;
// prepare anyway; this may twiddle dir_auth
EMetaBlob blob;
mds->queue_waiters(finished);
}
- mdlog->submit_entry(le);
- mds->mdcache->request_finish(mdr);
+ mdlog->submit_entry(le, new C_MDS_CommittedSlave(this, mdr));
} else {
if (srcdn->is_auth() && destdn->is_primary() &&
destdn->inode->state_test(CInode::STATE_AMBIGUOUSAUTH)) {
void handle_slave_link_prep(MDRequest *mdr);
void _logged_slave_link(MDRequest *mdr, CInode *targeti);
void _commit_slave_link(MDRequest *mdr, int r, CInode *targeti);
+ void _committed_slave(MDRequest *mdr); // use for rename, too
void handle_slave_link_prep_ack(MDRequest *mdr, MMDSSlaveRequest *m);
void do_link_rollback(bufferlist &rbl, int master, MDRequest *mdr);
void _link_rollback_finish(Mutation *mut, MDRequest *mdr);
EMetaBlob metablob;
string type;
bufferlist client_map;
+ metareqid_t reqid;
+ bool had_slaves;
EUpdate() : LogEvent(EVENT_UPDATE) { }
EUpdate(MDLog *mdlog, const char *s) :
LogEvent(EVENT_UPDATE), metablob(mdlog),
- type(s) { }
+ type(s), had_slaves(false) { }
void print(ostream& out) {
if (type.length())
::encode(type, bl);
::encode(metablob, bl);
::encode(client_map, bl);
+ ::encode(reqid, bl);
+ ::encode(had_slaves, bl);
}
void decode(bufferlist::iterator &bl) {
::decode(type, bl);
::decode(metablob, bl);
::decode(client_map, bl);
+ ::decode(reqid, bl);
+ ::decode(had_slaves, bl);
}
void update_segment();
#include "events/EUpdate.h"
#include "events/ESlaveUpdate.h"
#include "events/EOpen.h"
+#include "events/ECommitted.h"
#include "events/EPurgeFinish.h"
}
}
+ // master ops with possibly uncommitted slaves
+ for (set<metareqid_t>::iterator p = uncommitted_slaves.begin();
+ p != uncommitted_slaves.end();
+ p++) {
+ dout(10) << "try_to_expire waiting for slaves to ack commit on " << *p << dendl;
+ if (!gather) gather = new C_Gather;
+ mds->mdcache->wait_for_uncommitted_slaves(*p, gather->new_sub());
+ }
+
// dirty non-auth mtimes
for (xlist<CInode*>::iterator p = dirty_dirfrag_dir.begin(); !p.end(); ++p) {
CInode *in = *p;
void EUpdate::update_segment()
{
metablob.update_segment(_segment);
+
+ if (had_slaves)
+ _segment->uncommitted_slaves.insert(reqid);
}
void EUpdate::replay(MDS *mds)
{
metablob.replay(mds, _segment);
+
+ if (had_slaves) {
+ dout(10) << "EUpdate.replay " << reqid << " had slaves, expecting a matching ECommitted" << dendl;
+ _segment->uncommitted_slaves.insert(reqid);
+ }
}
}
+// -----------------------
+// ECommitted
+
+void ECommitted::replay(MDS *mds)
+{
+ if (mds->mdcache->uncommitted_slaves.count(reqid)) {
+ dout(10) << "ECommitted.replay " << reqid << dendl;
+ mds->mdcache->uncommitted_slaves[reqid].ls->uncommitted_slaves.erase(reqid);
+ mds->mdcache->uncommitted_slaves.erase(reqid);
+ } else {
+ dout(10) << "ECommitted.replay " << reqid << " -- didn't see original op" << dendl;
+ }
+}
+
+
// -----------------------
// ESlaveUpdate
static const int OP_RENAMEPREPACK = -7;
static const int OP_FINISH = 17;
+ static const int OP_COMMITTED = -18;
static const int OP_ABORT = 20; // used for recovery only
//static const int OP_COMMIT = 21; // used for recovery only
case OP_RENAMEPREPACK: return "rename_prep_ack";
case OP_FINISH: return "finish"; // commit
+ case OP_COMMITTED: return "committed";
+
case OP_ABORT: return "abort";
//case OP_COMMIT: return "commit";