/*
* some handlers for master requests with slaves. we need to make
- * sure slaves journal commits before we forget we mastered them.
+ * sure slaves journal commits before we forget we mastered them and
+ * remove them from the uncommitted_masters map (used during recovery
+ * to commit|abort slaves).
*/
-struct C_MDC_CommittedSlaves : public Context {
+struct C_MDC_CommittedMaster : public Context {
MDCache *cache;
metareqid_t reqid;
LogSegment *ls;
list<Context*> waiters;
- C_MDC_CommittedSlaves(MDCache *s, metareqid_t r, LogSegment *l, list<Context*> &w) :
+ C_MDC_CommittedMaster(MDCache *s, metareqid_t r, LogSegment *l, list<Context*> &w) :
cache(s), reqid(r), ls(l) {
waiters.swap(w);
}
void finish(int r) {
- cache->_logged_committed_slaves(reqid, ls, waiters);
+ cache->_logged_master_commit(reqid, ls, waiters);
}
};
-void MDCache::log_all_uncommitted_slaves()
+void MDCache::log_master_commit(metareqid_t reqid)
{
- while (!uncommitted_slaves.empty())
- log_committed_slaves(uncommitted_slaves.begin()->first);
-}
-
-void MDCache::log_committed_slaves(metareqid_t reqid)
-{
- dout(10) << "log_committed_slaves " << reqid << dendl;
+ dout(10) << "log_master_commit " << reqid << dendl;
mds->mdlog->submit_entry(new ECommitted(reqid),
- new C_MDC_CommittedSlaves(this, reqid,
- uncommitted_slaves[reqid].ls,
- uncommitted_slaves[reqid].waiters));
- mds->mdcache->uncommitted_slaves.erase(reqid);
+ new C_MDC_CommittedMaster(this, reqid,
+ uncommitted_masters[reqid].ls,
+ uncommitted_masters[reqid].waiters));
+ mds->mdcache->uncommitted_masters.erase(reqid);
}
-void MDCache::_logged_committed_slaves(metareqid_t reqid, LogSegment *ls, list<Context*> &waiters)
+void MDCache::_logged_master_commit(metareqid_t reqid, LogSegment *ls, list<Context*> &waiters)
{
- dout(10) << "_logged_committed_slaves " << reqid << dendl;
- ls->uncommitted_slaves.erase(reqid);
+ dout(10) << "_logged_master_commit " << reqid << dendl;
+ ls->uncommitted_masters.erase(reqid);
mds->queue_waiters(waiters);
}
// while active...
-void MDCache::committed_slave(metareqid_t r, int from)
+void MDCache::committed_master_slave(metareqid_t r, int from)
{
- dout(10) << "committed_slave mds" << from << " on " << r << dendl;
- assert(uncommitted_slaves.count(r));
- uncommitted_slaves[r].slaves.erase(from);
- if (uncommitted_slaves[r].slaves.empty())
- log_committed_slaves(r);
+ dout(10) << "committed_master_slave mds" << from << " on " << r << dendl;
+ assert(uncommitted_masters.count(r));
+ uncommitted_masters[r].slaves.erase(from);
+ if (uncommitted_masters[r].slaves.empty())
+ log_master_commit(r);
}
-// at end of resolve...
+
+/*
+ * at end of resolve... we must journal a commit|abort for all slave
+ * updates, before moving on.
+ *
+ * this is so that the master can safely journal ECommitted on ops it
+ * masters when it reaches up:active (all other recovering nodes must
+ * complete resolve before that happens).
+ */
struct C_MDC_SlaveCommit : public Context {
MDCache *cache;
int from;
void MDCache::_logged_slave_commit(int from, metareqid_t reqid)
{
dout(10) << "_logged_slave_commit from mds" << from << " " << reqid << dendl;
- delete uncommitted_slave_updates[from][reqid];
- uncommitted_slave_updates[from].erase(reqid);
- if (uncommitted_slave_updates[from].empty())
- uncommitted_slave_updates.erase(from);
- if (uncommitted_slave_updates.empty() && mds->is_resolve())
- maybe_resolve_finish();
+ // send a message
+ MMDSSlaveRequest *req = new MMDSSlaveRequest(reqid, MMDSSlaveRequest::OP_COMMITTED);
+ mds->send_message_mds(req, from);
}
for (list<metareqid_t>::iterator p = m->slave_requests.begin();
p != m->slave_requests.end();
++p) {
- if (uncommitted_slaves.count(*p)) { //mds->sessionmap.have_completed_request(*p)) {
+ if (uncommitted_masters.count(*p)) { //mds->sessionmap.have_completed_request(*p)) {
// COMMIT
dout(10) << " ambiguous slave request " << *p << " will COMMIT" << dendl;
ack->add_commit(*p);
+ uncommitted_masters[*p].slaves.insert(from); // wait for slave OP_COMMITTED before we log ECommitted
} else {
// ABORT
dout(10) << " ambiguous slave request " << *p << " will ABORT" << dendl;
dout(10) << "maybe_resolve_finish still waiting for rollback to commit on ("
<< need_resolve_rollback << ")" << dendl;
}
- else if (!uncommitted_slave_updates.empty()) {
- dout(10) << "maybe_resolve_finish still waiting for slave commits to commit" << dendl;
- }
else {
dout(10) << "maybe_resolve_finish got all resolves+resolve_acks, done." << dendl;
disambiguate_imports();
// log commit
mds->mdlog->submit_entry(new ESlaveUpdate(mds->mdlog, "unknown", *p, from,
ESlaveUpdate::OP_COMMIT, uncommitted_slave_updates[from][*p]->origop));
+
delete uncommitted_slave_updates[from][*p];
uncommitted_slave_updates[from].erase(*p);
+ if (uncommitted_slave_updates[from].empty())
+ uncommitted_slave_updates.erase(from);
+
+ mds->mdlog->wait_for_sync(new C_MDC_SlaveCommit(this, from, *p));
} else {
MDRequest *mdr = request_get(*p);
assert(mdr->slave_request == 0); // shouldn't be doing anything!
else
assert(0);
- mds->mdlog->wait_for_sync(new C_MDC_SlaveCommit(this, from, *p));
+ delete uncommitted_slave_updates[from][*p];
+ uncommitted_slave_updates[from].erase(*p);
+ if (uncommitted_slave_updates[from].empty())
+ uncommitted_slave_updates.erase(from);
} else {
MDRequest *mdr = request_get(*p);
if (mdr->more()->slave_commit) {
case MMDSSlaveRequest::OP_COMMITTED:
{
metareqid_t r = m->get_reqid();
- mds->mdcache->committed_slave(r, from);
+ mds->mdcache->committed_master_slave(r, from);
}
break;
if (targeti->is_auth())
_link_local(mdr, dn, targeti);
else
- _link_remote(mdr, dn, targeti);
+ _link_remote(mdr, true, dn, targeti);
}
}
-// remote
+// link / unlink remote
class C_MDS_link_remote_finish : public Context {
MDS *mds;
MDRequest *mdr;
+ bool inc;
CDentry *dn;
CInode *targeti;
version_t dpv;
public:
- C_MDS_link_remote_finish(MDS *m, MDRequest *r, CDentry *d, CInode *ti) :
- mds(m), mdr(r), dn(d), targeti(ti),
+ C_MDS_link_remote_finish(MDS *m, MDRequest *r, bool i, CDentry *d, CInode *ti) :
+ mds(m), mdr(r), inc(i), dn(d), targeti(ti),
dpv(d->get_projected_version()) {}
void finish(int r) {
assert(r == 0);
- mds->server->_link_remote_finish(mdr, dn, targeti, dpv);
+ mds->server->_link_remote_finish(mdr, inc, dn, targeti, dpv);
}
};
-void Server::_link_remote(MDRequest *mdr, CDentry *dn, CInode *targeti)
+void Server::_link_remote(MDRequest *mdr, bool inc, CDentry *dn, CInode *targeti)
{
- dout(10) << "_link_remote " << *dn << " to " << *targeti << dendl;
+ dout(10) << "_link_remote "
+ << (inc ? "link ":"unlink ")
+ << *dn << " to " << *targeti << dendl;
// 1. send LinkPrepare to dest (journal nlink++ prepare)
int linkauth = targeti->authority().first;
if (mdr->more()->witnessed.count(linkauth) == 0) {
- dout(10) << " targeti auth must prepare nlink++" << dendl;
+ dout(10) << " targeti auth must prepare nlink++/--" << dendl;
- MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_LINKPREP);
+ int op;
+ if (inc)
+ op = MMDSSlaveRequest::OP_LINKPREP;
+ else
+ op = MMDSSlaveRequest::OP_UNLINKPREP;
+ MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, op);
targeti->set_object_info(req->get_object_info());
req->now = mdr->now;
mds->send_message_mds(req, linkauth);
mdr->more()->waiting_on_slave.insert(linkauth);
return;
}
- dout(10) << " targeti auth has prepared nlink++" << dendl;
+ dout(10) << " targeti auth has prepared nlink++/--" << dendl;
//assert(0); // test hack: verify that remote slave can do a live rollback.
- // go.
- // predirty dentry
- dn->pre_dirty();
-
// add to event
mdr->ls = mdlog->get_current_segment();
- EUpdate *le = new EUpdate(mdlog, "link_remote");
+ EUpdate *le = new EUpdate(mdlog, inc ? "link_remote":"unlink_remote");
le->metablob.add_client_req(mdr->reqid);
if (!mdr->more()->slaves.empty()) {
dout(20) << " noting uncommitted_slaves " << mdr->more()->slaves << dendl;
-
le->reqid = mdr->reqid;
le->had_slaves = true;
-
- mds->mdcache->add_uncommitted_slaves(mdr->reqid, mdr->ls, mdr->more()->slaves);
+ mds->mdcache->add_uncommitted_master(mdr->reqid, mdr->ls, mdr->more()->slaves);
}
- mds->locker->predirty_nested(mdr, &le->metablob, targeti, dn->dir, PREDIRTY_DIR, 1);
- le->metablob.add_remote_dentry(dn, true, targeti->ino(),
- MODE_TO_DT(targeti->inode.mode)); // new remote
+ if (inc) {
+ dn->pre_dirty();
+ mds->locker->predirty_nested(mdr, &le->metablob, targeti, dn->dir, PREDIRTY_DIR, 1);
+ le->metablob.add_remote_dentry(dn, true, targeti->ino(),
+ MODE_TO_DT(targeti->inode.mode)); // new remote
+ } else {
+ dn->pre_dirty();
+ mds->locker->predirty_nested(mdr, &le->metablob, targeti, dn->dir, PREDIRTY_DIR, -1);
+ le->metablob.add_null_dentry(dn, true);
+ }
+
+ if (mdr->more()->dst_reanchor_atid)
+ le->metablob.add_anchor_transaction(mdr->more()->dst_reanchor_atid);
// mark committing (needed for proper recovery)
mdr->committing = true;
// log + wait
- mdlog->submit_entry(le, new C_MDS_link_remote_finish(mds, mdr, dn, targeti));
+ mdlog->submit_entry(le, new C_MDS_link_remote_finish(mds, mdr, inc, dn, targeti));
}
-void Server::_link_remote_finish(MDRequest *mdr, CDentry *dn, CInode *targeti,
+void Server::_link_remote_finish(MDRequest *mdr, bool inc,
+ CDentry *dn, CInode *targeti,
version_t dpv)
{
- dout(10) << "_link_remote_finish " << *dn << " to " << *targeti << dendl;
-
- // link the new dentry
- dn->dir->link_remote_inode(dn, targeti);
- dn->mark_dirty(dpv, mdr->ls);
+ dout(10) << "_link_remote_finish "
+ << (inc ? "link ":"unlink ")
+ << *dn << " to " << *targeti << dendl;
+
+ if (inc) {
+ // link the new dentry
+ dn->dir->link_remote_inode(dn, targeti);
+ dn->mark_dirty(dpv, mdr->ls);
+ } else {
+ // unlink main dentry
+ dn->dir->unlink_inode(dn);
+ dn->mark_dirty(dn->get_projected_version(), mdr->ls); // dirty old dentry
+
+ // share unlink news with replicas
+ for (map<int,int>::iterator it = dn->replicas_begin();
+ it != dn->replicas_end();
+ it++) {
+ dout(7) << "_unlink_remote_finish sending MDentryUnlink to mds" << it->first << dendl;
+ MDentryUnlink *unlink = new MDentryUnlink(dn->dir->dirfrag(), dn->name);
+ mds->send_message_mds(unlink, it->first);
+ }
+ }
mdr->apply();
+ // commit anchor update?
+ if (mdr->more()->dst_reanchor_atid)
+ mds->anchorclient->commit(mdr->more()->dst_reanchor_atid, mdr->ls);
+
// bump target popularity
mds->balancer->hit_inode(mdr->now, targeti, META_POP_IWR);
//mds->balancer->hit_dir(mdr->now, dn->get_dir(), META_POP_DWR);
// reply
MClientReply *reply = new MClientReply(mdr->client_request, 0);
reply_request(mdr, reply, targeti, dn); // FIXME: imprecise ref
+
+ if (!inc)
+ // removing a new dn?
+ dn->dir->try_remove_unlinked_dn(dn);
}
// ok!
if (dn->is_remote() && !dn->inode->is_auth())
- _unlink_remote(mdr, dn);
+ _link_remote(mdr, false, dn, dn->inode);
else
_unlink_local(mdr, dn, straydn);
}
}
-class C_MDS_unlink_remote_finish : public Context {
- MDS *mds;
- MDRequest *mdr;
- CDentry *dn;
- version_t dnpv; // deleted dentry
-public:
- C_MDS_unlink_remote_finish(MDS *m, MDRequest *r, CDentry *d) :
- mds(m), mdr(r), dn(d),
- dnpv(d->get_projected_version()) { }
- void finish(int r) {
- assert(r == 0);
- mds->server->_unlink_remote_finish(mdr, dn, dnpv);
- }
-};
-
-void Server::_unlink_remote(MDRequest *mdr, CDentry *dn)
-{
- dout(10) << "_unlink_remote " << *dn << " " << *dn->inode << dendl;
-
- // 1. send LinkPrepare to dest (journal nlink-- prepare)
- int inauth = dn->inode->authority().first;
- if (mdr->more()->witnessed.count(inauth) == 0) {
- dout(10) << " inode auth must prepare nlink--" << dendl;
-
- MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_UNLINKPREP);
- dn->inode->set_object_info(req->get_object_info());
- req->now = mdr->now;
- mds->send_message_mds(req, inauth);
-
- assert(mdr->more()->waiting_on_slave.count(inauth) == 0);
- mdr->more()->waiting_on_slave.insert(inauth);
- return;
- }
- dout(10) << " inode auth has prepared nlink--" << dendl;
-
- // ok, let's do it.
- // prepare log entry
- mdr->ls = mdlog->get_current_segment();
- EUpdate *le = new EUpdate(mdlog, "unlink_remote");
- le->metablob.add_client_req(mdr->reqid);
-
- // the unlinked dentry
- mds->locker->predirty_nested(mdr, &le->metablob, dn->inode, dn->dir, PREDIRTY_DIR, -1);
- dn->pre_dirty();
- le->metablob.add_null_dentry(dn, true);
-
- if (mdr->more()->dst_reanchor_atid)
- le->metablob.add_anchor_transaction(mdr->more()->dst_reanchor_atid);
-
- // finisher
- C_MDS_unlink_remote_finish *fin = new C_MDS_unlink_remote_finish(mds, mdr, dn);
-
- // mark committing (needed for proper recovery)
- mdr->committing = true;
-
- // log + wait
- mdlog->submit_entry(le, fin);
-}
-
-void Server::_unlink_remote_finish(MDRequest *mdr,
- CDentry *dn,
- version_t dnpv)
-{
- dout(10) << "_unlink_remote_finish " << *dn << dendl;
-
- // unlink main dentry
- dn->dir->unlink_inode(dn);
-
- mdr->apply();
- dn->mark_dirty(dnpv, mdr->ls); // dirty old dentry
-
- // share unlink news with replicas
- for (map<int,int>::iterator it = dn->replicas_begin();
- it != dn->replicas_end();
- it++) {
- dout(7) << "_unlink_remote_finish sending MDentryUnlink to mds" << it->first << dendl;
- MDentryUnlink *unlink = new MDentryUnlink(dn->dir->dirfrag(), dn->name);
- mds->send_message_mds(unlink, it->first);
- }
-
- // commit anchor update?
- if (mdr->more()->dst_reanchor_atid)
- mds->anchorclient->commit(mdr->more()->dst_reanchor_atid, mdr->ls);
-
- //mds->balancer->hit_dir(mdr->now, dn->dir, META_POP_DWR);
-
- // reply
- MClientReply *reply = new MClientReply(mdr->client_request, 0);
- reply_request(mdr, reply, 0, dn); // FIXME: imprecise ref
-
- // removing a new dn?
- dn->dir->try_remove_unlinked_dn(dn);
-}
-
-
-
/** _dir_is_not_empty
*
}
// test hack: bail after slave does prepare, so we can verify it's _live_ rollback.
- if (!mdr->more()->slaves.empty() && !srci->is_dir()) assert(0);
+ //if (!mdr->more()->slaves.empty() && !srci->is_dir()) assert(0);
//if (!mdr->more()->slaves.empty() && srci->is_dir()) assert(0);
// -- prepare anchor updates --
le->reqid = mdr->reqid;
le->had_slaves = true;
- mds->mdcache->add_uncommitted_slaves(mdr->reqid, mdr->ls, mdr->more()->slaves);
+ mds->mdcache->add_uncommitted_master(mdr->reqid, mdr->ls, mdr->more()->slaves);
}
_rename_prepare(mdr, &le->metablob, &le->client_map, srcdn, destdn, straydn);
{
dout(10) << "_rename_finish " << *mdr << dendl;
+ // test hack: test slave commit
+ if (!mdr->more()->slaves.empty() && !destdn->inode->is_dir()) assert(0);
+ //if (!mdr->more()->slaves.empty() && destdn->inode->is_dir()) assert(0);
+
// apply
_rename_apply(mdr, srcdn, destdn, straydn);
}
// master ops with possibly uncommitted slaves
- for (set<metareqid_t>::iterator p = uncommitted_slaves.begin();
- p != uncommitted_slaves.end();
+ for (set<metareqid_t>::iterator p = uncommitted_masters.begin();
+ p != uncommitted_masters.end();
p++) {
dout(10) << "try_to_expire waiting for slaves to ack commit on " << *p << dendl;
if (!gather) gather = new C_Gather;
- mds->mdcache->wait_for_uncommitted_slaves(*p, gather->new_sub());
+ mds->mdcache->wait_for_uncommitted_master(*p, gather->new_sub());
}
// dirty non-auth mtimes
metablob.update_segment(_segment);
if (had_slaves)
- _segment->uncommitted_slaves.insert(reqid);
+ _segment->uncommitted_masters.insert(reqid);
}
void EUpdate::replay(MDS *mds)
if (had_slaves) {
dout(10) << "EUpdate.replay " << reqid << " had slaves, expecting a matching ECommitted" << dendl;
- _segment->uncommitted_slaves.insert(reqid);
+ _segment->uncommitted_masters.insert(reqid);
+ set<int> slaves;
+ mds->mdcache->add_uncommitted_master(reqid, _segment, slaves);
}
}
void ECommitted::replay(MDS *mds)
{
- if (mds->mdcache->uncommitted_slaves.count(reqid)) {
+ if (mds->mdcache->uncommitted_masters.count(reqid)) {
dout(10) << "ECommitted.replay " << reqid << dendl;
- mds->mdcache->uncommitted_slaves[reqid].ls->uncommitted_slaves.erase(reqid);
- mds->mdcache->uncommitted_slaves.erase(reqid);
+ mds->mdcache->uncommitted_masters[reqid].ls->uncommitted_masters.erase(reqid);
+ mds->mdcache->uncommitted_masters.erase(reqid);
} else {
dout(10) << "ECommitted.replay " << reqid << " -- didn't see original op" << dendl;
}
dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds" << master << dendl;
delete mds->mdcache->uncommitted_slave_updates[master][reqid];
mds->mdcache->uncommitted_slave_updates[master].erase(reqid);
+ if (mds->mdcache->uncommitted_slave_updates[master].empty())
+ mds->mdcache->uncommitted_slave_updates.erase(master);
} else {
dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds" << master
<< ": ignoring, no previously saved prepare" << dendl;
commit.replay(mds, _segment);
delete mds->mdcache->uncommitted_slave_updates[master][reqid];
mds->mdcache->uncommitted_slave_updates[master].erase(reqid);
+ if (mds->mdcache->uncommitted_slave_updates[master].empty())
+ mds->mdcache->uncommitted_slave_updates.erase(master);
} else {
dout(10) << "ESlaveUpdate.replay abort " << reqid << " for mds" << master
<< ": ignoring, no previously saved prepare" << dendl;