]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
fixed slaveupdate trimming; journal rollback metablob for safety
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 10 Oct 2007 21:23:21 +0000 (21:23 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 10 Oct 2007 21:23:21 +0000 (21:23 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1917 29311d96-e01e-0410-9327-a35deaab8ce9

branches/sage/mds/mds/LogSegment.h
branches/sage/mds/mds/MDCache.cc
branches/sage/mds/mds/MDCache.h
branches/sage/mds/mds/Server.cc
branches/sage/mds/mds/events/ESlaveUpdate.h
branches/sage/mds/mds/journal.cc

index 7579add569d506eef7cb6d8079db7f8b3512ca69..e73f5f8b61b9ccf1b0bff51da4117f4ee31623cb 100644 (file)
@@ -26,6 +26,7 @@ class CDir;
 class CInode;
 class CDentry;
 class MDS;
+class MDSlaveUpdate;
 
 class LogSegment {
  public:
@@ -40,6 +41,8 @@ class LogSegment {
   xlist<CInode*>  open_files;
   xlist<CInode*>  dirty_inode_mtimes;
 
+  xlist<MDSlaveUpdate*> slave_updates;
+
   //xlist<CInode*>  purging_inodes;
   map<CInode*, map<off_t,off_t> > purging_inodes;
 
index 9dbbbb0a011355244a7591e25ff4faa4fd17c13e..38c95be1e18c283aaad8df2b3a3af36686f53e4b 100644 (file)
@@ -1115,7 +1115,7 @@ void MDCache::send_resolve_now(int who)
   }
   // [resolving]
   if (uncommitted_slave_updates.count(who)) {
-    for (map<metareqid_t, EMetaBlob>::iterator p = uncommitted_slave_updates[who].begin();
+    for (map<metareqid_t, MDSlaveUpdate>::iterator p = uncommitted_slave_updates[who].begin();
         p != uncommitted_slave_updates[who].end();
         ++p) {
       dout(10) << " including uncommitted " << p->first << dendl;
@@ -1415,7 +1415,7 @@ void MDCache::handle_resolve_ack(MMDSResolveAck *ack)
     if (mds->is_resolve()) {
       // replay
       assert(uncommitted_slave_updates[from].count(*p));
-      uncommitted_slave_updates[from][*p].replay(mds);
+      uncommitted_slave_updates[from][*p].commit.replay(mds);
       uncommitted_slave_updates[from].erase(*p);
       // log commit
       mds->mdlog->submit_entry(new ESlaveUpdate(mds->mdlog, "unknown", *p, from, ESlaveUpdate::OP_COMMIT));
@@ -1433,6 +1433,7 @@ void MDCache::handle_resolve_ack(MMDSResolveAck *ack)
 
     if (mds->is_resolve()) {
       assert(uncommitted_slave_updates[from].count(*p));
+      uncommitted_slave_updates[from][*p].rollback.replay(mds);
       uncommitted_slave_updates[from].erase(*p);
       mds->mdlog->submit_entry(new ESlaveUpdate(mds->mdlog, "unknown", *p, from, ESlaveUpdate::OP_ROLLBACK));
     } else {
index a56dcbc9d55cee9522aa2b3af0448205571352cc..f85e8fdd6153d80e4707e3f1e5e836d997cd5ac1 100644 (file)
@@ -216,6 +216,25 @@ inline ostream& operator<<(ostream& out, MDRequest &mdr)
   return out;
 }
 
+struct MDSlaveUpdate {
+  EMetaBlob commit;
+  EMetaBlob rollback;
+  xlist<MDSlaveUpdate*>::item xlistitem;
+  Context *waiter;
+  MDSlaveUpdate() : xlistitem(this), waiter(0) {}
+  MDSlaveUpdate(EMetaBlob c, EMetaBlob r, xlist<MDSlaveUpdate*> &list) :
+    commit(c), rollback(r),
+    xlistitem(this),
+    waiter(0) {
+    list.push_back(&xlistitem);
+  }
+  ~MDSlaveUpdate() {
+    if (waiter) waiter->finish(0);
+    delete waiter;
+  }
+};
+
+
 class MDCache {
  public:
   // my master
@@ -349,7 +368,7 @@ protected:
   // from MMDSResolves
   map<int, map<dirfrag_t, list<dirfrag_t> > > other_ambiguous_imports;  
 
-  map<int, map<metareqid_t, EMetaBlob> > uncommitted_slave_updates;  // for replay.
+  map<int, map<metareqid_t, MDSlaveUpdate> > uncommitted_slave_updates;  // for replay.
   map<metareqid_t, bool>     ambiguous_slave_updates;         // for log trimming.
   map<metareqid_t, Context*> waiting_for_slave_update_commit;
   friend class ESlaveUpdate;
index 7f538fc29bded80cab6e35de69a81183092585f7..2d855898eaf9559a920e46a76869aeddd1dc5727 100644 (file)
@@ -2088,8 +2088,17 @@ void Server::handle_slave_link_prep(MDRequest *mdr)
     }
   }
 
-  // update journaled target inode
+  // journal it
+  mdr->ls = mdlog->get_current_segment();
+  ESlaveUpdate *le = new ESlaveUpdate(mdlog, "slave_link_prep", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_PREPARE);
+
   inode_t *pi = dn->inode->project_inode();
+
+  // rollback case
+  le->rollback.add_dir_context(targeti->get_parent_dir());
+  le->rollback.add_primary_dentry(dn, true, targeti, pi);  // update old primary
+
+  // update journaled target inode
   bool inc;
   if (mdr->slave_request->get_op() == MMDSSlaveRequest::OP_LINKPREP) {
     inc = true;
@@ -2104,11 +2113,10 @@ void Server::handle_slave_link_prep(MDRequest *mdr)
 
   dout(10) << " projected inode " << pi << " v " << pi->version << dendl;
 
-  // journal it
-  mdr->ls = mdlog->get_current_segment();
-  ESlaveUpdate *le = new ESlaveUpdate(mdlog, "slave_link_prep", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_PREPARE);
-  le->metablob.add_dir_context(targeti->get_parent_dir());
-  le->metablob.add_primary_dentry(dn, true, targeti, pi);  // update old primary
+  // commit case
+  le->commit.add_dir_context(targeti->get_parent_dir());
+  le->commit.add_primary_dentry(dn, true, targeti, pi);  // update old primary
+
   mdlog->submit_entry(le, new C_MDS_SlaveLinkPrep(this, mdr, targeti, old_ctime, inc));
 }
 
@@ -3339,7 +3347,22 @@ void Server::handle_slave_rename_prep(MDRequest *mdr)
     // journal.
     mdr->ls = mdlog->get_current_segment();
     ESlaveUpdate *le = new ESlaveUpdate(mdlog, "slave_rename_prep", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_PREPARE);
-    _rename_prepare(mdr, &le->metablob, srcdn, destdn, straydn);
+
+    // rollback case
+    if (destdn->inode && destdn->inode->is_auth()) {
+      assert(destdn->is_remote());
+      le->rollback.add_dir_context(destdn->dir);
+      le->rollback.add_dentry(destdn, true);
+    }
+    if (srcdn->is_auth() ||
+       (srcdn->inode && srcdn->inode->is_auth())) {
+      le->rollback.add_dir_context(srcdn->dir);
+      le->rollback.add_dentry(srcdn, true);
+    }
+
+    // commit case
+    _rename_prepare(mdr, &le->commit, srcdn, destdn, straydn);
+
     mdlog->submit_entry(le, new C_MDS_SlaveRenamePrep(this, mdr, srcdn, destdn, straydn));
   } else {
     // don't journal.
index 23d280a7b831cd2f48cf25d538d68f76b339889f..54eaef9c6a29641553bebfe527ebfbd86fdc586d 100644 (file)
@@ -24,7 +24,14 @@ public:
   const static int OP_COMMIT = 2;
   const static int OP_ROLLBACK = 3;
   
-  EMetaBlob metablob;
+  /*
+   * we journal a rollback metablob that contains the unmodified metadata
+   * too, because we may be updating previously dirty metadata, which 
+   * will allow old log segments to be trimmed.  if we end of rolling back,
+   * those updates could be lost.. so we re-journal the unmodified metadata,
+   * and replay will apply _either_ commit or rollback.
+   */
+  EMetaBlob commit, rollback;
   string type;
   metareqid_t reqid;
   int master;
@@ -32,7 +39,7 @@ public:
 
   ESlaveUpdate() : LogEvent(EVENT_SLAVEUPDATE) { }
   ESlaveUpdate(MDLog *mdlog, const char *s, metareqid_t ri, int mastermds, int o) : 
-    LogEvent(EVENT_SLAVEUPDATE), metablob(mdlog),
+    LogEvent(EVENT_SLAVEUPDATE), commit(mdlog), rollback(mdlog),
     type(s),
     reqid(ri),
     master(mastermds),
@@ -44,7 +51,7 @@ public:
     out << " " << op;
     out << " " << reqid;
     out << " for mds" << master;
-    out << metablob;
+    out << commit << " " << rollback;
   }
 
   void encode_payload(bufferlist& bl) {
@@ -52,14 +59,16 @@ public:
     ::_encode(reqid, bl);
     ::_encode(master, bl);
     ::_encode(op, bl);
-    metablob._encode(bl);
+    commit._encode(bl);
+    rollback._encode(bl);
   } 
   void decode_payload(bufferlist& bl, int& off) {
     ::_decode(type, bl, off);
     ::_decode(reqid, bl, off);
     ::_decode(master, bl, off);
     ::_decode(op, bl, off);
-    metablob._decode(bl, off);
+    commit._decode(bl, off);
+    rollback._decode(bl, off);
   }
 
   bool has_expired(MDS *mds);
index eacf7f290057918af76db31fc8fe6f0b0cbf07d8..b78f516546aec09dee6f8e82de17a9b59db8a68b 100644 (file)
@@ -97,8 +97,7 @@ C_Gather *LogSegment::try_to_expire(MDS *mds)
   }
 
   // dirty non-auth mtimes
-  if(0) //fuckfuck
-    for (xlist<CInode*>::iterator p = dirty_inode_mtimes.begin(); !p.end(); ++p) {
+  for (xlist<CInode*>::iterator p = dirty_inode_mtimes.begin(); !p.end(); ++p) {
     dout(10) << "try_to_expire waiting for dirlock mtime flush on " << **p << dendl;
     if (!gather) gather = new C_Gather;
     (*p)->dirlock.add_waiter(SimpleLock::WAIT_STABLE, gather->new_sub());
@@ -117,6 +116,15 @@ C_Gather *LogSegment::try_to_expire(MDS *mds)
     dout(10) << "try_to_expire waiting for open files to rejournal" << dendl;
   }
 
+  // slave updates
+  for (xlist<MDSlaveUpdate*>::iterator p = slave_updates.begin(); !p.end(); ++p) {
+    MDSlaveUpdate *su = *p;
+    dout(10) << "try_to_expire waiting on slave update " << su << dendl;
+    assert(su->waiter == 0);
+    if (!gather) gather = new C_Gather;
+    su->waiter = gather->new_sub();
+  }
+
   // idalloc
   if (allocv > mds->idalloc->get_committed_version()) {
     dout(10) << "try_to_expire saving idalloc table, need " << allocv
@@ -842,106 +850,42 @@ void EOpen::replay(MDS *mds)
 // -----------------------
 // ESlaveUpdate
 
-bool ESlaveUpdate::has_expired(MDS *mds)
-{
-  switch (op) {
-  case ESlaveUpdate::OP_PREPARE:
-    if (mds->mdcache->ambiguous_slave_updates.count(reqid) == 0) {
-      dout(10) << "ESlaveUpdate.has_expired prepare " << reqid << " for mds" << master 
-              << ": haven't yet seen commit|rollback" << dendl;
-      return false;
-    } 
-    else if (mds->mdcache->ambiguous_slave_updates[reqid]) {
-      dout(10) << "ESlaveUpdate.has_expired prepare " << reqid << " for mds" << master 
-              << ": committed, checking metablob" << dendl;
-      bool exp = metablob.has_expired(mds);
-      if (exp) 
-       mds->mdcache->ambiguous_slave_updates.erase(reqid);
-      return exp;      
-    }
-    else {
-      dout(10) << "ESlaveUpdate.has_expired prepare " << reqid << " for mds" << master 
-              << ": aborted" << dendl;
-      mds->mdcache->ambiguous_slave_updates.erase(reqid);
-      return true;
-    }
-    
-  case ESlaveUpdate::OP_COMMIT:
-  case ESlaveUpdate::OP_ROLLBACK:
-    if (mds->mdcache->waiting_for_slave_update_commit.count(reqid)) {
-      dout(10) << "ESlaveUpdate.has_expired "
-              << ((op == ESlaveUpdate::OP_COMMIT) ? "commit ":"rollback ")
-              << reqid << " for mds" << master 
-              << ": noting commit, kicking prepare waiter" << dendl;
-      mds->mdcache->ambiguous_slave_updates[reqid] = (op == ESlaveUpdate::OP_COMMIT);
-      mds->mdcache->waiting_for_slave_update_commit[reqid]->finish(0);
-      delete mds->mdcache->waiting_for_slave_update_commit[reqid];
-      mds->mdcache->waiting_for_slave_update_commit.erase(reqid);
-    } else {
-      dout(10) << "ESlaveUpdate.has_expired "
-              << ((op == ESlaveUpdate::OP_COMMIT) ? "commit ":"rollback ")
-              << reqid << " for mds" << master 
-              << ": no prepare waiter, ignoring" << dendl;
-    }
-    return true;
-
-  default:
-    assert(0);
-    return false;
-  }
-}
-
-void ESlaveUpdate::expire(MDS *mds, Context *c)
-{
-  assert(op == ESlaveUpdate::OP_PREPARE);
-
-  if (mds->mdcache->ambiguous_slave_updates.count(reqid) == 0) {
-    // wait
-    dout(10) << "ESlaveUpdate.expire prepare " << reqid << " for mds" << master
-            << ": waiting for commit|rollback" << dendl;
-    mds->mdcache->waiting_for_slave_update_commit[reqid] = c;
-  } else {
-    // we committed.. expire the metablob
-    assert(mds->mdcache->ambiguous_slave_updates[reqid] == true); 
-    dout(10) << "ESlaveUpdate.expire prepare " << reqid << " for mds" << master
-            << ": waiting for metablob to expire" << dendl;
-    //metablob.expire(mds, c);
-  }
-}
-
 void ESlaveUpdate::replay(MDS *mds)
 {
   switch (op) {
   case ESlaveUpdate::OP_PREPARE:
     // FIXME: horribly inefficient copy; EMetaBlob needs a swap() or something
     dout(10) << "ESlaveUpdate.replay prepare " << reqid << " for mds" << master 
-            << ": saving blob for later commit" << dendl;
+            << ": saving blobs for later commit" << dendl;
     assert(mds->mdcache->uncommitted_slave_updates[master].count(reqid) == 0);
-    metablob._segment = _segment;  // may need this later
-    mds->mdcache->uncommitted_slave_updates[master][reqid] = metablob;
+    commit._segment = _segment;  // may need this later
+    rollback._segment = _segment;  // may need this later
+    mds->mdcache->uncommitted_slave_updates[master][reqid] = 
+      MDSlaveUpdate(commit, rollback, _segment->slave_updates);
     break;
 
   case ESlaveUpdate::OP_COMMIT:
     if (mds->mdcache->uncommitted_slave_updates[master].count(reqid)) {
       dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds" << master
-              << ": applying previously saved blob" << dendl;
-      mds->mdcache->uncommitted_slave_updates[master][reqid].replay(mds, _segment);
+              << ": applying commit blob" << dendl;
+      mds->mdcache->uncommitted_slave_updates[master][reqid].commit.replay(mds, _segment);
       mds->mdcache->uncommitted_slave_updates[master].erase(reqid);
     } else {
       dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds" << master 
-              << ": ignoring, no previously saved blob" << dendl;
+              << ": ignoring, no previously saved blobs" << dendl;
     }
     break;
 
   case ESlaveUpdate::OP_ROLLBACK:
     if (mds->mdcache->uncommitted_slave_updates[master].count(reqid)) {
       dout(10) << "ESlaveUpdate.replay abort " << reqid << " for mds" << master
-              << ": discarding previously saved blob" << dendl;
+              << ": applying rollback blob" << dendl;
       assert(mds->mdcache->uncommitted_slave_updates[master].count(reqid));
+      mds->mdcache->uncommitted_slave_updates[master][reqid].rollback.replay(mds, _segment);
       mds->mdcache->uncommitted_slave_updates[master].erase(reqid);
     } else {
       dout(10) << "ESlaveUpdate.replay abort " << reqid << " for mds" << master 
-              << ": ignoring, no previously saved blob" << dendl;
+              << ": ignoring, no previously saved blobs" << dendl;
     }
     break;