class CInode;
class CDentry;
class MDS;
+class MDSlaveUpdate;
class LogSegment {
public:
xlist<CInode*> open_files;
xlist<CInode*> dirty_inode_mtimes;
+ xlist<MDSlaveUpdate*> slave_updates;  // MDSlaveUpdate's list-taking constructor appends itself here; walked when the segment tries to expire
+
//xlist<CInode*> purging_inodes;
map<CInode*, map<off_t,off_t> > purging_inodes;
}
// [resolving]
if (uncommitted_slave_updates.count(who)) {
- for (map<metareqid_t, EMetaBlob>::iterator p = uncommitted_slave_updates[who].begin();
+ for (map<metareqid_t, MDSlaveUpdate>::iterator p = uncommitted_slave_updates[who].begin();
p != uncommitted_slave_updates[who].end();
++p) {
dout(10) << " including uncommitted " << p->first << dendl;
if (mds->is_resolve()) {
// replay
assert(uncommitted_slave_updates[from].count(*p));
- uncommitted_slave_updates[from][*p].replay(mds);
+ uncommitted_slave_updates[from][*p].commit.replay(mds);
uncommitted_slave_updates[from].erase(*p);
// log commit
mds->mdlog->submit_entry(new ESlaveUpdate(mds->mdlog, "unknown", *p, from, ESlaveUpdate::OP_COMMIT));
if (mds->is_resolve()) {
assert(uncommitted_slave_updates[from].count(*p));
+ uncommitted_slave_updates[from][*p].rollback.replay(mds);
uncommitted_slave_updates[from].erase(*p);
mds->mdlog->submit_entry(new ESlaveUpdate(mds->mdlog, "unknown", *p, from, ESlaveUpdate::OP_ROLLBACK));
} else {
return out;
}
+/*
+ * MDSlaveUpdate
+ *
+ * In-memory record of a slave update that has been prepared but whose
+ * outcome is not yet known.  It keeps both metablobs so that either
+ * outcome can be applied later: 'commit' when the update commits,
+ * 'rollback' when it is rolled back.
+ *
+ * NOTE(review): this type has a user-defined destructor and an xlist item
+ * bound to 'this', yet it is stored by value in a map and copy-assigned
+ * from a temporary.  The copy semantics of xlist<>::item are not visible
+ * here; confirm that a copied instance stays linked on the list and that
+ * 'waiter' can never be shared between two copies.
+ */
+struct MDSlaveUpdate {
+  EMetaBlob commit;                       // replayed if the update commits
+  EMetaBlob rollback;                     // replayed if the update is rolled back
+  xlist<MDSlaveUpdate*>::item xlistitem;  // hook into the list passed to the constructor
+  Context *waiter;                        // optional; finished and freed by the destructor
+
+  // Empty record that is not linked into any list.
+  MDSlaveUpdate() : xlistitem(this), waiter(0) {}
+
+  // Copy both blobs and append this record to 'list'.  The blobs are taken
+  // by const reference so each one is copied once (into the member) rather
+  // than twice (into the parameter, then into the member).
+  MDSlaveUpdate(const EMetaBlob &c, const EMetaBlob &r, xlist<MDSlaveUpdate*> &list) :
+    commit(c), rollback(r),
+    xlistitem(this),
+    waiter(0) {
+    list.push_back(&xlistitem);
+  }
+
+  // Wake whoever is waiting on this update, then release the context.
+  ~MDSlaveUpdate() {
+    if (waiter) {
+      waiter->finish(0);
+      delete waiter;
+    }
+  }
+};
+
+
class MDCache {
public:
// my master
// from MMDSResolves
map<int, map<dirfrag_t, list<dirfrag_t> > > other_ambiguous_imports;
- map<int, map<metareqid_t, EMetaBlob> > uncommitted_slave_updates; // for replay.
+ map<int, map<metareqid_t, MDSlaveUpdate> > uncommitted_slave_updates; // for replay.
map<metareqid_t, bool> ambiguous_slave_updates; // for log trimming.
map<metareqid_t, Context*> waiting_for_slave_update_commit;
friend class ESlaveUpdate;
}
}
- // update journaled target inode
+ // journal it
+ mdr->ls = mdlog->get_current_segment();
+ ESlaveUpdate *le = new ESlaveUpdate(mdlog, "slave_link_prep", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_PREPARE);
+
inode_t *pi = dn->inode->project_inode();
+
+ // rollback case
+ le->rollback.add_dir_context(targeti->get_parent_dir());
+ le->rollback.add_primary_dentry(dn, true, targeti, pi); // update old primary
+
+ // update journaled target inode
bool inc;
if (mdr->slave_request->get_op() == MMDSSlaveRequest::OP_LINKPREP) {
inc = true;
dout(10) << " projected inode " << pi << " v " << pi->version << dendl;
- // journal it
- mdr->ls = mdlog->get_current_segment();
- ESlaveUpdate *le = new ESlaveUpdate(mdlog, "slave_link_prep", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_PREPARE);
- le->metablob.add_dir_context(targeti->get_parent_dir());
- le->metablob.add_primary_dentry(dn, true, targeti, pi); // update old primary
+ // commit case
+ le->commit.add_dir_context(targeti->get_parent_dir());
+ le->commit.add_primary_dentry(dn, true, targeti, pi); // update old primary
+
mdlog->submit_entry(le, new C_MDS_SlaveLinkPrep(this, mdr, targeti, old_ctime, inc));
}
// journal.
mdr->ls = mdlog->get_current_segment();
ESlaveUpdate *le = new ESlaveUpdate(mdlog, "slave_rename_prep", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_PREPARE);
- _rename_prepare(mdr, &le->metablob, srcdn, destdn, straydn);
+
+ // rollback case
+ if (destdn->inode && destdn->inode->is_auth()) {
+ assert(destdn->is_remote());
+ le->rollback.add_dir_context(destdn->dir);
+ le->rollback.add_dentry(destdn, true);
+ }
+ if (srcdn->is_auth() ||
+ (srcdn->inode && srcdn->inode->is_auth())) {
+ le->rollback.add_dir_context(srcdn->dir);
+ le->rollback.add_dentry(srcdn, true);
+ }
+
+ // commit case
+ _rename_prepare(mdr, &le->commit, srcdn, destdn, straydn);
+
mdlog->submit_entry(le, new C_MDS_SlaveRenamePrep(this, mdr, srcdn, destdn, straydn));
} else {
// don't journal.
const static int OP_COMMIT = 2;
const static int OP_ROLLBACK = 3;
- EMetaBlob metablob;
+ /*
+ * we journal a rollback metablob that contains the unmodified metadata
+ * too, because we may be updating previously dirty metadata, which
+ * will allow old log segments to be trimmed. if we end up rolling back,
+ * those updates could be lost, so we re-journal the unmodified metadata,
+ * and replay will apply _either_ commit or rollback.
+ */
+ EMetaBlob commit, rollback;
string type;
metareqid_t reqid;
int master;
ESlaveUpdate() : LogEvent(EVENT_SLAVEUPDATE) { }
ESlaveUpdate(MDLog *mdlog, const char *s, metareqid_t ri, int mastermds, int o) :
- LogEvent(EVENT_SLAVEUPDATE), metablob(mdlog),
+ LogEvent(EVENT_SLAVEUPDATE), commit(mdlog), rollback(mdlog),
type(s),
reqid(ri),
master(mastermds),
out << " " << op;
out << " " << reqid;
out << " for mds" << master;
- out << metablob;
+ out << commit << " " << rollback;
}
void encode_payload(bufferlist& bl) {
::_encode(reqid, bl);
::_encode(master, bl);
::_encode(op, bl);
- metablob._encode(bl);
+ commit._encode(bl);    // commit blob is written first...
+ rollback._encode(bl);  // ...then the rollback blob; decode_payload reads them back in this order
}
void decode_payload(bufferlist& bl, int& off) {
::_decode(type, bl, off);
::_decode(reqid, bl, off);
::_decode(master, bl, off);
::_decode(op, bl, off);
- metablob._decode(bl, off);
+ commit._decode(bl, off);    // must stay in the same order as encode_payload: commit first
+ rollback._decode(bl, off);  // rollback blob follows the commit blob in the payload
}
bool has_expired(MDS *mds);
}
// dirty non-auth mtimes
- if(0) //fuckfuck
- for (xlist<CInode*>::iterator p = dirty_inode_mtimes.begin(); !p.end(); ++p) {
+ for (xlist<CInode*>::iterator p = dirty_inode_mtimes.begin(); !p.end(); ++p) {
dout(10) << "try_to_expire waiting for dirlock mtime flush on " << **p << dendl;
if (!gather) gather = new C_Gather;
(*p)->dirlock.add_waiter(SimpleLock::WAIT_STABLE, gather->new_sub());
dout(10) << "try_to_expire waiting for open files to rejournal" << dendl;
}
+ // slave updates
+ for (xlist<MDSlaveUpdate*>::iterator p = slave_updates.begin(); !p.end(); ++p) {
+ MDSlaveUpdate *su = *p;
+ dout(10) << "try_to_expire waiting on slave update " << su << dendl;
+ assert(su->waiter == 0);
+ if (!gather) gather = new C_Gather;
+ su->waiter = gather->new_sub();
+ }
+
// idalloc
if (allocv > mds->idalloc->get_committed_version()) {
dout(10) << "try_to_expire saving idalloc table, need " << allocv
// -----------------------
// ESlaveUpdate
-bool ESlaveUpdate::has_expired(MDS *mds)
-{
- switch (op) {
- case ESlaveUpdate::OP_PREPARE:
- if (mds->mdcache->ambiguous_slave_updates.count(reqid) == 0) {
- dout(10) << "ESlaveUpdate.has_expired prepare " << reqid << " for mds" << master
- << ": haven't yet seen commit|rollback" << dendl;
- return false;
- }
- else if (mds->mdcache->ambiguous_slave_updates[reqid]) {
- dout(10) << "ESlaveUpdate.has_expired prepare " << reqid << " for mds" << master
- << ": committed, checking metablob" << dendl;
- bool exp = metablob.has_expired(mds);
- if (exp)
- mds->mdcache->ambiguous_slave_updates.erase(reqid);
- return exp;
- }
- else {
- dout(10) << "ESlaveUpdate.has_expired prepare " << reqid << " for mds" << master
- << ": aborted" << dendl;
- mds->mdcache->ambiguous_slave_updates.erase(reqid);
- return true;
- }
-
- case ESlaveUpdate::OP_COMMIT:
- case ESlaveUpdate::OP_ROLLBACK:
- if (mds->mdcache->waiting_for_slave_update_commit.count(reqid)) {
- dout(10) << "ESlaveUpdate.has_expired "
- << ((op == ESlaveUpdate::OP_COMMIT) ? "commit ":"rollback ")
- << reqid << " for mds" << master
- << ": noting commit, kicking prepare waiter" << dendl;
- mds->mdcache->ambiguous_slave_updates[reqid] = (op == ESlaveUpdate::OP_COMMIT);
- mds->mdcache->waiting_for_slave_update_commit[reqid]->finish(0);
- delete mds->mdcache->waiting_for_slave_update_commit[reqid];
- mds->mdcache->waiting_for_slave_update_commit.erase(reqid);
- } else {
- dout(10) << "ESlaveUpdate.has_expired "
- << ((op == ESlaveUpdate::OP_COMMIT) ? "commit ":"rollback ")
- << reqid << " for mds" << master
- << ": no prepare waiter, ignoring" << dendl;
- }
- return true;
-
- default:
- assert(0);
- return false;
- }
-}
-
-void ESlaveUpdate::expire(MDS *mds, Context *c)
-{
- assert(op == ESlaveUpdate::OP_PREPARE);
-
- if (mds->mdcache->ambiguous_slave_updates.count(reqid) == 0) {
- // wait
- dout(10) << "ESlaveUpdate.expire prepare " << reqid << " for mds" << master
- << ": waiting for commit|rollback" << dendl;
- mds->mdcache->waiting_for_slave_update_commit[reqid] = c;
- } else {
- // we committed.. expire the metablob
- assert(mds->mdcache->ambiguous_slave_updates[reqid] == true);
- dout(10) << "ESlaveUpdate.expire prepare " << reqid << " for mds" << master
- << ": waiting for metablob to expire" << dendl;
- //metablob.expire(mds, c);
- }
-}
-
void ESlaveUpdate::replay(MDS *mds)
{
switch (op) {
case ESlaveUpdate::OP_PREPARE:
// FIXME: horribly inefficient copy; EMetaBlob needs a swap() or something
dout(10) << "ESlaveUpdate.replay prepare " << reqid << " for mds" << master
- << ": saving blob for later commit" << dendl;
+ << ": saving blobs for later commit" << dendl;
assert(mds->mdcache->uncommitted_slave_updates[master].count(reqid) == 0);
- metablob._segment = _segment; // may need this later
- mds->mdcache->uncommitted_slave_updates[master][reqid] = metablob;
+ commit._segment = _segment; // may need this later
+ rollback._segment = _segment; // may need this later
+ mds->mdcache->uncommitted_slave_updates[master][reqid] =
+ MDSlaveUpdate(commit, rollback, _segment->slave_updates);
break;
case ESlaveUpdate::OP_COMMIT:
if (mds->mdcache->uncommitted_slave_updates[master].count(reqid)) {
dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds" << master
- << ": applying previously saved blob" << dendl;
- mds->mdcache->uncommitted_slave_updates[master][reqid].replay(mds, _segment);
+ << ": applying commit blob" << dendl;
+ mds->mdcache->uncommitted_slave_updates[master][reqid].commit.replay(mds, _segment);
mds->mdcache->uncommitted_slave_updates[master].erase(reqid);
} else {
dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds" << master
- << ": ignoring, no previously saved blob" << dendl;
+ << ": ignoring, no previously saved blobs" << dendl;
}
break;
case ESlaveUpdate::OP_ROLLBACK:
if (mds->mdcache->uncommitted_slave_updates[master].count(reqid)) {
dout(10) << "ESlaveUpdate.replay abort " << reqid << " for mds" << master
- << ": discarding previously saved blob" << dendl;
+ << ": applying rollback blob" << dendl;
assert(mds->mdcache->uncommitted_slave_updates[master].count(reqid));
+ mds->mdcache->uncommitted_slave_updates[master][reqid].rollback.replay(mds, _segment);
mds->mdcache->uncommitted_slave_updates[master].erase(reqid);
} else {
dout(10) << "ESlaveUpdate.replay abort " << reqid << " for mds" << master
- << ": ignoring, no previously saved blob" << dendl;
+ << ": ignoring, no previously saved blobs" << dendl;
}
break;