From 8d2795cef58e38cf0ec098bbf6a0fcb7ce6a7e2c Mon Sep 17 00:00:00 2001 From: sageweil Date: Wed, 10 Oct 2007 21:23:21 +0000 Subject: [PATCH] fixed slaveupdate trimming; journal rollback metablob for safety git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1917 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/mds/mds/LogSegment.h | 3 + branches/sage/mds/mds/MDCache.cc | 5 +- branches/sage/mds/mds/MDCache.h | 21 ++++- branches/sage/mds/mds/Server.cc | 37 ++++++-- branches/sage/mds/mds/events/ESlaveUpdate.h | 19 ++-- branches/sage/mds/mds/journal.cc | 98 +++++---------------- 6 files changed, 91 insertions(+), 92 deletions(-) diff --git a/branches/sage/mds/mds/LogSegment.h b/branches/sage/mds/mds/LogSegment.h index 7579add569d50..e73f5f8b61b9c 100644 --- a/branches/sage/mds/mds/LogSegment.h +++ b/branches/sage/mds/mds/LogSegment.h @@ -26,6 +26,7 @@ class CDir; class CInode; class CDentry; class MDS; +class MDSlaveUpdate; class LogSegment { public: @@ -40,6 +41,8 @@ class LogSegment { xlist open_files; xlist dirty_inode_mtimes; + xlist slave_updates; + //xlist purging_inodes; map > purging_inodes; diff --git a/branches/sage/mds/mds/MDCache.cc b/branches/sage/mds/mds/MDCache.cc index 9dbbbb0a01135..38c95be1e18c2 100644 --- a/branches/sage/mds/mds/MDCache.cc +++ b/branches/sage/mds/mds/MDCache.cc @@ -1115,7 +1115,7 @@ void MDCache::send_resolve_now(int who) } // [resolving] if (uncommitted_slave_updates.count(who)) { - for (map::iterator p = uncommitted_slave_updates[who].begin(); + for (map::iterator p = uncommitted_slave_updates[who].begin(); p != uncommitted_slave_updates[who].end(); ++p) { dout(10) << " including uncommitted " << p->first << dendl; @@ -1415,7 +1415,7 @@ void MDCache::handle_resolve_ack(MMDSResolveAck *ack) if (mds->is_resolve()) { // replay assert(uncommitted_slave_updates[from].count(*p)); - uncommitted_slave_updates[from][*p].replay(mds); + uncommitted_slave_updates[from][*p].commit.replay(mds); uncommitted_slave_updates[from].erase(*p); // 
log commit mds->mdlog->submit_entry(new ESlaveUpdate(mds->mdlog, "unknown", *p, from, ESlaveUpdate::OP_COMMIT)); @@ -1433,6 +1433,7 @@ void MDCache::handle_resolve_ack(MMDSResolveAck *ack) if (mds->is_resolve()) { assert(uncommitted_slave_updates[from].count(*p)); + uncommitted_slave_updates[from][*p].rollback.replay(mds); uncommitted_slave_updates[from].erase(*p); mds->mdlog->submit_entry(new ESlaveUpdate(mds->mdlog, "unknown", *p, from, ESlaveUpdate::OP_ROLLBACK)); } else { diff --git a/branches/sage/mds/mds/MDCache.h b/branches/sage/mds/mds/MDCache.h index a56dcbc9d55ce..f85e8fdd6153d 100644 --- a/branches/sage/mds/mds/MDCache.h +++ b/branches/sage/mds/mds/MDCache.h @@ -216,6 +216,25 @@ inline ostream& operator<<(ostream& out, MDRequest &mdr) return out; } +struct MDSlaveUpdate { + EMetaBlob commit; + EMetaBlob rollback; + xlist::item xlistitem; + Context *waiter; + MDSlaveUpdate() : xlistitem(this), waiter(0) {} + MDSlaveUpdate(EMetaBlob c, EMetaBlob r, xlist &list) : + commit(c), rollback(r), + xlistitem(this), + waiter(0) { + list.push_back(&xlistitem); + } + ~MDSlaveUpdate() { + if (waiter) waiter->finish(0); + delete waiter; + } +}; + + class MDCache { public: // my master @@ -349,7 +368,7 @@ protected: // from MMDSResolves map > > other_ambiguous_imports; - map > uncommitted_slave_updates; // for replay. + map > uncommitted_slave_updates; // for replay. map ambiguous_slave_updates; // for log trimming. 
map waiting_for_slave_update_commit; friend class ESlaveUpdate; diff --git a/branches/sage/mds/mds/Server.cc b/branches/sage/mds/mds/Server.cc index 7f538fc29bded..2d855898eaf95 100644 --- a/branches/sage/mds/mds/Server.cc +++ b/branches/sage/mds/mds/Server.cc @@ -2088,8 +2088,17 @@ void Server::handle_slave_link_prep(MDRequest *mdr) } } - // update journaled target inode + // journal it + mdr->ls = mdlog->get_current_segment(); + ESlaveUpdate *le = new ESlaveUpdate(mdlog, "slave_link_prep", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_PREPARE); + inode_t *pi = dn->inode->project_inode(); + + // rollback case + le->rollback.add_dir_context(targeti->get_parent_dir()); + le->rollback.add_primary_dentry(dn, true, targeti, pi); // update old primary + + // update journaled target inode bool inc; if (mdr->slave_request->get_op() == MMDSSlaveRequest::OP_LINKPREP) { inc = true; @@ -2104,11 +2113,10 @@ void Server::handle_slave_link_prep(MDRequest *mdr) dout(10) << " projected inode " << pi << " v " << pi->version << dendl; - // journal it - mdr->ls = mdlog->get_current_segment(); - ESlaveUpdate *le = new ESlaveUpdate(mdlog, "slave_link_prep", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_PREPARE); - le->metablob.add_dir_context(targeti->get_parent_dir()); - le->metablob.add_primary_dentry(dn, true, targeti, pi); // update old primary + // commit case + le->commit.add_dir_context(targeti->get_parent_dir()); + le->commit.add_primary_dentry(dn, true, targeti, pi); // update old primary + mdlog->submit_entry(le, new C_MDS_SlaveLinkPrep(this, mdr, targeti, old_ctime, inc)); } @@ -3339,7 +3347,22 @@ void Server::handle_slave_rename_prep(MDRequest *mdr) // journal. 
mdr->ls = mdlog->get_current_segment(); ESlaveUpdate *le = new ESlaveUpdate(mdlog, "slave_rename_prep", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_PREPARE); - _rename_prepare(mdr, &le->metablob, srcdn, destdn, straydn); + + // rollback case + if (destdn->inode && destdn->inode->is_auth()) { + assert(destdn->is_remote()); + le->rollback.add_dir_context(destdn->dir); + le->rollback.add_dentry(destdn, true); + } + if (srcdn->is_auth() || + (srcdn->inode && srcdn->inode->is_auth())) { + le->rollback.add_dir_context(srcdn->dir); + le->rollback.add_dentry(srcdn, true); + } + + // commit case + _rename_prepare(mdr, &le->commit, srcdn, destdn, straydn); + mdlog->submit_entry(le, new C_MDS_SlaveRenamePrep(this, mdr, srcdn, destdn, straydn)); } else { // don't journal. diff --git a/branches/sage/mds/mds/events/ESlaveUpdate.h b/branches/sage/mds/mds/events/ESlaveUpdate.h index 23d280a7b831c..54eaef9c6a296 100644 --- a/branches/sage/mds/mds/events/ESlaveUpdate.h +++ b/branches/sage/mds/mds/events/ESlaveUpdate.h @@ -24,7 +24,14 @@ public: const static int OP_COMMIT = 2; const static int OP_ROLLBACK = 3; - EMetaBlob metablob; + /* + * we journal a rollback metablob that contains the unmodified metadata + * too, because we may be updating previously dirty metadata, which + * will allow old log segments to be trimmed. if we end up rolling back, + * those updates could be lost.. so we re-journal the unmodified metadata, + * and replay will apply _either_ commit or rollback. 
+ */ + EMetaBlob commit, rollback; string type; metareqid_t reqid; int master; @@ -32,7 +39,7 @@ public: ESlaveUpdate() : LogEvent(EVENT_SLAVEUPDATE) { } ESlaveUpdate(MDLog *mdlog, const char *s, metareqid_t ri, int mastermds, int o) : - LogEvent(EVENT_SLAVEUPDATE), metablob(mdlog), + LogEvent(EVENT_SLAVEUPDATE), commit(mdlog), rollback(mdlog), type(s), reqid(ri), master(mastermds), @@ -44,7 +51,7 @@ public: out << " " << op; out << " " << reqid; out << " for mds" << master; - out << metablob; + out << commit << " " << rollback; } void encode_payload(bufferlist& bl) { @@ -52,14 +59,16 @@ public: ::_encode(reqid, bl); ::_encode(master, bl); ::_encode(op, bl); - metablob._encode(bl); + commit._encode(bl); + rollback._encode(bl); } void decode_payload(bufferlist& bl, int& off) { ::_decode(type, bl, off); ::_decode(reqid, bl, off); ::_decode(master, bl, off); ::_decode(op, bl, off); - metablob._decode(bl, off); + commit._decode(bl, off); + rollback._decode(bl, off); } bool has_expired(MDS *mds); diff --git a/branches/sage/mds/mds/journal.cc b/branches/sage/mds/mds/journal.cc index eacf7f2900579..b78f516546aec 100644 --- a/branches/sage/mds/mds/journal.cc +++ b/branches/sage/mds/mds/journal.cc @@ -97,8 +97,7 @@ C_Gather *LogSegment::try_to_expire(MDS *mds) } // dirty non-auth mtimes - if(0) //fuckfuck - for (xlist::iterator p = dirty_inode_mtimes.begin(); !p.end(); ++p) { + for (xlist::iterator p = dirty_inode_mtimes.begin(); !p.end(); ++p) { dout(10) << "try_to_expire waiting for dirlock mtime flush on " << **p << dendl; if (!gather) gather = new C_Gather; (*p)->dirlock.add_waiter(SimpleLock::WAIT_STABLE, gather->new_sub()); @@ -117,6 +116,15 @@ C_Gather *LogSegment::try_to_expire(MDS *mds) dout(10) << "try_to_expire waiting for open files to rejournal" << dendl; } + // slave updates + for (xlist::iterator p = slave_updates.begin(); !p.end(); ++p) { + MDSlaveUpdate *su = *p; + dout(10) << "try_to_expire waiting on slave update " << su << dendl; + assert(su->waiter == 
0); + if (!gather) gather = new C_Gather; + su->waiter = gather->new_sub(); + } + // idalloc if (allocv > mds->idalloc->get_committed_version()) { dout(10) << "try_to_expire saving idalloc table, need " << allocv @@ -842,106 +850,42 @@ void EOpen::replay(MDS *mds) // ----------------------- // ESlaveUpdate -bool ESlaveUpdate::has_expired(MDS *mds) -{ - switch (op) { - case ESlaveUpdate::OP_PREPARE: - if (mds->mdcache->ambiguous_slave_updates.count(reqid) == 0) { - dout(10) << "ESlaveUpdate.has_expired prepare " << reqid << " for mds" << master - << ": haven't yet seen commit|rollback" << dendl; - return false; - } - else if (mds->mdcache->ambiguous_slave_updates[reqid]) { - dout(10) << "ESlaveUpdate.has_expired prepare " << reqid << " for mds" << master - << ": committed, checking metablob" << dendl; - bool exp = metablob.has_expired(mds); - if (exp) - mds->mdcache->ambiguous_slave_updates.erase(reqid); - return exp; - } - else { - dout(10) << "ESlaveUpdate.has_expired prepare " << reqid << " for mds" << master - << ": aborted" << dendl; - mds->mdcache->ambiguous_slave_updates.erase(reqid); - return true; - } - - case ESlaveUpdate::OP_COMMIT: - case ESlaveUpdate::OP_ROLLBACK: - if (mds->mdcache->waiting_for_slave_update_commit.count(reqid)) { - dout(10) << "ESlaveUpdate.has_expired " - << ((op == ESlaveUpdate::OP_COMMIT) ? "commit ":"rollback ") - << reqid << " for mds" << master - << ": noting commit, kicking prepare waiter" << dendl; - mds->mdcache->ambiguous_slave_updates[reqid] = (op == ESlaveUpdate::OP_COMMIT); - mds->mdcache->waiting_for_slave_update_commit[reqid]->finish(0); - delete mds->mdcache->waiting_for_slave_update_commit[reqid]; - mds->mdcache->waiting_for_slave_update_commit.erase(reqid); - } else { - dout(10) << "ESlaveUpdate.has_expired " - << ((op == ESlaveUpdate::OP_COMMIT) ? 
"commit ":"rollback ") - << reqid << " for mds" << master - << ": no prepare waiter, ignoring" << dendl; - } - return true; - - default: - assert(0); - return false; - } -} - -void ESlaveUpdate::expire(MDS *mds, Context *c) -{ - assert(op == ESlaveUpdate::OP_PREPARE); - - if (mds->mdcache->ambiguous_slave_updates.count(reqid) == 0) { - // wait - dout(10) << "ESlaveUpdate.expire prepare " << reqid << " for mds" << master - << ": waiting for commit|rollback" << dendl; - mds->mdcache->waiting_for_slave_update_commit[reqid] = c; - } else { - // we committed.. expire the metablob - assert(mds->mdcache->ambiguous_slave_updates[reqid] == true); - dout(10) << "ESlaveUpdate.expire prepare " << reqid << " for mds" << master - << ": waiting for metablob to expire" << dendl; - //metablob.expire(mds, c); - } -} - void ESlaveUpdate::replay(MDS *mds) { switch (op) { case ESlaveUpdate::OP_PREPARE: // FIXME: horribly inefficient copy; EMetaBlob needs a swap() or something dout(10) << "ESlaveUpdate.replay prepare " << reqid << " for mds" << master - << ": saving blob for later commit" << dendl; + << ": saving blobs for later commit" << dendl; assert(mds->mdcache->uncommitted_slave_updates[master].count(reqid) == 0); - metablob._segment = _segment; // may need this later - mds->mdcache->uncommitted_slave_updates[master][reqid] = metablob; + commit._segment = _segment; // may need this later + rollback._segment = _segment; // may need this later + mds->mdcache->uncommitted_slave_updates[master][reqid] = + MDSlaveUpdate(commit, rollback, _segment->slave_updates); break; case ESlaveUpdate::OP_COMMIT: if (mds->mdcache->uncommitted_slave_updates[master].count(reqid)) { dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds" << master - << ": applying previously saved blob" << dendl; - mds->mdcache->uncommitted_slave_updates[master][reqid].replay(mds, _segment); + << ": applying commit blob" << dendl; + mds->mdcache->uncommitted_slave_updates[master][reqid].commit.replay(mds, 
_segment); mds->mdcache->uncommitted_slave_updates[master].erase(reqid); } else { dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds" << master - << ": ignoring, no previously saved blob" << dendl; + << ": ignoring, no previously saved blobs" << dendl; } break; case ESlaveUpdate::OP_ROLLBACK: if (mds->mdcache->uncommitted_slave_updates[master].count(reqid)) { dout(10) << "ESlaveUpdate.replay abort " << reqid << " for mds" << master - << ": discarding previously saved blob" << dendl; + << ": applying rollback blob" << dendl; assert(mds->mdcache->uncommitted_slave_updates[master].count(reqid)); + mds->mdcache->uncommitted_slave_updates[master][reqid].rollback.replay(mds, _segment); mds->mdcache->uncommitted_slave_updates[master].erase(reqid); } else { dout(10) << "ESlaveUpdate.replay abort " << reqid << " for mds" << master - << ": ignoring, no previously saved blob" << dendl; + << ": ignoring, no previously saved blobs" << dendl; } break; -- 2.39.5