git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: fix resolution, trimming of master requests with slaves
author Sage Weil <sage@newdream.net>
Mon, 2 Jun 2008 20:40:00 +0000 (13:40 -0700)
committer Sage Weil <sage@newdream.net>
Mon, 2 Jun 2008 20:40:00 +0000 (13:40 -0700)
src/TODO
src/mds/LogEvent.cc
src/mds/LogEvent.h
src/mds/LogSegment.h
src/mds/MDCache.cc
src/mds/MDCache.h
src/mds/Server.cc
src/mds/Server.h
src/mds/events/EUpdate.h
src/mds/journal.cc
src/messages/MMDSSlaveRequest.h

index 762fd5940d74303e7f47ec5cab5005d8fb4bf8e3..7d693cee0408d97bb7d1deb02a7299d3c8f0d4e4 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -76,7 +76,7 @@ mds mustfix
 - make sure locker avoids frozen inodes
 /- make sure predirty_nested stops if it can't wrlock versionlock (acquire_locks normally hides that detail for us)
 - make sure stray inode is always opened on startup
-- make sure inode cache expire for frozen inode behaves
+/- make sure inode cache expire for frozen inode behaves
 
 - look at the client_map session opening code.. versus rollback (of import, or slave request)
 
index e9dafa34109a54291999fd56a0cd60cdcfd9f4ab..24920369d771a56d50962caef52e5ddf1c70305c 100644 (file)
@@ -31,6 +31,7 @@
 #include "events/EUpdate.h"
 #include "events/ESlaveUpdate.h"
 #include "events/EOpen.h"
+#include "events/ECommitted.h"
 
 #include "events/EPurgeFinish.h"
 
@@ -68,6 +69,7 @@ LogEvent *LogEvent::decode(bufferlist& bl)
   case EVENT_UPDATE: le = new EUpdate; break;
   case EVENT_SLAVEUPDATE: le = new ESlaveUpdate; break;
   case EVENT_OPEN: le = new EOpen; break;
+  case EVENT_COMMITTED: le = new ECommitted; break;
 
   case EVENT_PURGEFINISH: le = new EPurgeFinish; break;
 
index ae806854fc1ba443122fb12adfd512b4edebc518..1b04d805d855073861a8728bee2dbdc4407cf84e 100644 (file)
@@ -29,6 +29,7 @@
 #define EVENT_UPDATE       20
 #define EVENT_SLAVEUPDATE  21
 #define EVENT_OPEN         22
+#define EVENT_COMMITTED    23
 
 #define EVENT_PURGEFINISH  30
 
index 344c31e1507c07c3406fbb9501d2421dc38a0fc2..e1ce7dcabc1661b2a3b388529f2541bb3c0cb027 100644 (file)
@@ -18,6 +18,7 @@
 #include "include/xlist.h"
 #include "include/interval_set.h"
 #include "include/Context.h"
+#include "mdstypes.h"
 
 #include <ext/hash_set>
 using __gnu_cxx::hash_set;
@@ -48,6 +49,7 @@ class LogSegment {
 
   // committed anchor transactions
   hash_set<version_t> pending_commit_atids;
+  set<metareqid_t> uncommitted_slaves;
 
   // client request ids
   map<int, tid_t> last_client_tids;
index bd16bb94802dd751655e782f5151e4daaf27baac..1740fc3c22686c267a27e267478735a3b45a0f35 100644 (file)
@@ -44,6 +44,7 @@
 #include "events/EPurgeFinish.h"
 #include "events/EImportFinish.h"
 #include "events/EFragment.h"
+#include "events/ECommitted.h"
 
 #include "messages/MGenericMessage.h"
 
@@ -927,6 +928,88 @@ int MDCache::num_subtrees_fullnonauth()
 
 
 
+// ===================================
+// slave requests
+
+
+/*
+ * some handlers for master requests with slaves.  we need to make 
+ * sure slaves journal commits before we forget we mastered them.
+ */
+struct C_MDC_CommittedSlaves : public Context {  // completion handed to submit_entry() together with the ECommitted event
+  MDCache *cache;
+  metareqid_t reqid;
+  LogSegment *ls;  // segment whose uncommitted_slaves set drops reqid when finish() runs
+  list<Context*> waiters;  // contexts passed on to _logged_committed_slaves()
+  C_MDC_CommittedSlaves(MDCache *s, metareqid_t r, LogSegment *l, list<Context*> &w) :
+    cache(s), reqid(r), ls(l) {
+    waiters.swap(w);  // take over the caller's list; w is left empty
+  }
+  void finish(int r) {  // r is not used
+    cache->_logged_committed_slaves(reqid, ls, waiters);
+  }
+};
+
+void MDCache::log_all_uncommitted_slaves()  // journal an ECommitted for every entry in uncommitted_slaves
+{
+  while (!uncommitted_slaves.empty())  // terminates: log_committed_slaves() erases the entry it logs
+    log_committed_slaves(uncommitted_slaves.begin()->first);
+}
+
+void MDCache::log_committed_slaves(metareqid_t reqid)  // submit ECommitted(reqid) to the log and stop tracking reqid
+{
+  dout(10) << "log_committed_slaves " << reqid << dendl;
+  mds->mdlog->submit_entry(new ECommitted(reqid), 
+                          new C_MDC_CommittedSlaves(this, reqid, 
+                                                    uncommitted_slaves[reqid].ls,  // note: operator[] default-inserts if reqid is not tracked
+                                                    uncommitted_slaves[reqid].waiters));  // the context's ctor swaps these waiters out of the map entry
+  mds->mdcache->uncommitted_slaves.erase(reqid);  // ls and waiters were already handed to the context above
+}
+
+void MDCache::_logged_committed_slaves(metareqid_t reqid, LogSegment *ls, list<Context*> &waiters)  // invoked from C_MDC_CommittedSlaves::finish()
+{
+  dout(10) << "_logged_committed_slaves " << reqid << dendl;
+  ls->uncommitted_slaves.erase(reqid);  // try_to_expire() on this segment no longer waits on reqid
+  mds->queue_waiters(waiters);  // hand the collected waiter contexts to the mds
+}
+
+// while active...
+
+void MDCache::committed_slave(metareqid_t r, int from)  // called when mds `from` sends OP_COMMITTED for request r
+{
+  dout(10) << "committed_slave mds" << from << " on " << r << dendl;
+  assert(uncommitted_slaves.count(r));  // r must already be tracked; also guarantees operator[] below does not insert
+  uncommitted_slaves[r].slaves.erase(from);
+  if (uncommitted_slaves[r].slaves.empty())  // last outstanding slave has acknowledged
+    log_committed_slaves(r);
+}
+
+// at end of resolve...
+
+struct C_MDC_SlaveCommit : public Context {  // passed to mdlog->wait_for_sync() from handle_resolve_ack()
+  MDCache *cache;
+  int from;  // first key into uncommitted_slave_updates
+  metareqid_t reqid;  // second key into uncommitted_slave_updates
+  C_MDC_SlaveCommit(MDCache *c, int f, metareqid_t r) : cache(c), from(f), reqid(r) {}
+  void finish(int r) {  // r is not used
+    cache->_logged_slave_commit(from, reqid);
+  }
+};
+
+void MDCache::_logged_slave_commit(int from, metareqid_t reqid)  // invoked from C_MDC_SlaveCommit::finish()
+{
+  dout(10) << "_logged_slave_commit from mds" << from << " " << reqid << dendl;
+  delete uncommitted_slave_updates[from][reqid];  // free the MDSlaveUpdate held for this (from, reqid)
+  uncommitted_slave_updates[from].erase(reqid);
+  if (uncommitted_slave_updates[from].empty())
+    uncommitted_slave_updates.erase(from);  // drop the now-empty inner map so the outer empty() test below is meaningful
+  
+  if (uncommitted_slave_updates.empty() && mds->is_resolve())  // maybe_resolve_finish() also checks this map is empty
+    maybe_resolve_finish();    
+}
+
+
+
 
 
 
@@ -1253,7 +1336,7 @@ void MDCache::handle_resolve(MMDSResolve *m)
     for (list<metareqid_t>::iterator p = m->slave_requests.begin();
         p != m->slave_requests.end();
         ++p) {
-      if (mds->sessionmap.have_completed_request(*p)) {
+      if (uncommitted_slaves.count(*p)) {  //mds->sessionmap.have_completed_request(*p)) {
        // COMMIT
        dout(10) << " ambiguous slave request " << *p << " will COMMIT" << dendl;
        ack->add_commit(*p);
@@ -1358,15 +1441,21 @@ void MDCache::handle_resolve(MMDSResolve *m)
 void MDCache::maybe_resolve_finish()
 {
   if (got_resolve != recovery_set) {
-    dout(10) << "maybe_resolve_finish still waiting for more resolves, got (" << got_resolve 
-            << "), need (" << recovery_set << ")" << dendl;
+    dout(10) << "maybe_resolve_finish still waiting for more resolves, got (" 
+            << got_resolve << "), need (" << recovery_set << ")" << dendl;
   } 
   else if (!need_resolve_ack.empty()) {
-    dout(10) << "maybe_resolve_finish still waiting for resolve_ack from (" << need_resolve_ack << ")" << dendl;
+    dout(10) << "maybe_resolve_finish still waiting for resolve_ack from (" 
+            << need_resolve_ack << ")" << dendl;
   }
   else if (!need_resolve_rollback.empty()) {
-    dout(10) << "maybe_resolve_finish still waiting for rollback to commit on (" << need_resolve_rollback << ")" << dendl;
-  } else {
+    dout(10) << "maybe_resolve_finish still waiting for rollback to commit on (" 
+            << need_resolve_rollback << ")" << dendl;
+  } 
+  else if (!uncommitted_slave_updates.empty()) {
+    dout(10) << "maybe_resolve_finish still waiting for slave commits to commit" << dendl;
+  } 
+  else {
     dout(10) << "maybe_resolve_finish got all resolves+resolve_acks, done." << dendl;
     disambiguate_imports();
     if (mds->is_resolve()) {
@@ -1377,6 +1466,7 @@ void MDCache::maybe_resolve_finish()
   } 
 }
 
+
 void MDCache::handle_resolve_ack(MMDSResolveAck *ack)
 {
   dout(10) << "handle_resolve_ack " << *ack << " from " << ack->get_source() << dendl;
@@ -1419,8 +1509,7 @@ void MDCache::handle_resolve_ack(MMDSResolveAck *ack)
       else
        assert(0);
 
-      delete uncommitted_slave_updates[from][*p];
-      uncommitted_slave_updates[from].erase(*p);
+      mds->mdlog->wait_for_sync(new C_MDC_SlaveCommit(this, from, *p));
     } else {
       MDRequest *mdr = request_get(*p);
       if (mdr->more()->slave_commit) {
@@ -4781,6 +4870,8 @@ for (int i=0; i<CInode::NUM_PINS; i++) {
 }
 
 
+
+
 // --------------------------------------------------------------------
 // ANCHORS
 
index 901f75bed1ec5d09598866f269891725f296e978..0e9b96e7bb9b0fcef3a3fc85a0e359fff9bcccfe 100644 (file)
@@ -442,6 +442,20 @@ public:
   void request_drop_locks(MDRequest *r);
   void request_cleanup(MDRequest *r);
 
+  // slaves
+  void add_uncommitted_slaves(metareqid_t reqid, LogSegment *ls, set<int> &slaves) {  // start tracking a master request and its slave set
+    uncommitted_slaves[reqid].ls = ls;  // operator[] creates the entry on first use
+    uncommitted_slaves[reqid].slaves = slaves;  // copies the caller's set
+  }
+  void wait_for_uncommitted_slaves(metareqid_t reqid, Context *c) {  // register c to be queued after reqid's ECommitted is logged
+    uncommitted_slaves[reqid].waiters.push_back(c);  // note: operator[] creates an entry if reqid is not tracked yet
+  }
+  void log_all_uncommitted_slaves();
+  void log_committed_slaves(metareqid_t reqid);
+  void _logged_committed_slaves(metareqid_t reqid, LogSegment *ls, list<Context*> &waiters);
+  void committed_slave(metareqid_t r, int from);
+  void _logged_slave_commit(int from, metareqid_t reqid);
+
 
   // inode purging
   map<CInode*, map<off_t, off_t> > purging;  // inode -> newsize -> oldsize
@@ -464,10 +478,20 @@ protected:
   // from MMDSResolves
   map<int, map<dirfrag_t, list<dirfrag_t> > > other_ambiguous_imports;  
 
-  map<int, map<metareqid_t, MDSlaveUpdate*> > uncommitted_slave_updates;  // for replay.
-  map<metareqid_t, bool>     ambiguous_slave_updates;         // for log trimming.
-  map<metareqid_t, Context*> waiting_for_slave_update_commit;
+  map<int, map<metareqid_t, MDSlaveUpdate*> > uncommitted_slave_updates;  // slave: for replay.
+
+  // track master requests whose slaves haven't acknowledged commit
+  struct uslave {
+    set<int> slaves;  // mds ids still outstanding; committed_slave() erases each as it reports in
+    LogSegment *ls;  // segment recorded by add_uncommitted_slaves()
+    list<Context*> waiters;  // added by wait_for_uncommitted_slaves(); queued once ECommitted is logged
+  };
+  map<metareqid_t, uslave>                 uncommitted_slaves;         // master: req -> slave set
+
+  //map<metareqid_t, bool>     ambiguous_slave_updates;         // for log trimming.
+  //map<metareqid_t, Context*> waiting_for_slave_update_commit;
   friend class ESlaveUpdate;
+  friend class ECommitted;
 
   set<int> wants_resolve;   // nodes i need to send my resolve to
   set<int> got_resolve;     // nodes i got resolves from
index c79e6ebc573a599575f58571a4edb93be45be596..8e01c8f2b545c904941966251b9ed7d48a6fe652 100644 (file)
@@ -41,6 +41,7 @@
 #include "events/ESlaveUpdate.h"
 #include "events/ESession.h"
 #include "events/EOpen.h"
+#include "events/ECommitted.h"
 
 #include "include/filepath.h"
 #include "common/Timer.h"
@@ -865,6 +866,13 @@ void Server::handle_slave_request(MMDSSlaveRequest *m)
       }
       break;
 
+    case MMDSSlaveRequest::OP_COMMITTED:
+      {
+       metareqid_t r = m->get_reqid();
+       mds->mdcache->committed_slave(r, from);
+      }
+      break;
+
     default:
       assert(0);
     }
@@ -2286,7 +2294,7 @@ public:
 void Server::_link_remote(MDRequest *mdr, CDentry *dn, CInode *targeti)
 {
   dout(10) << "_link_remote " << *dn << " to " << *targeti << dendl;
-    
+
   // 1. send LinkPrepare to dest (journal nlink++ prepare)
   int linkauth = targeti->authority().first;
   if (mdr->more()->witnessed.count(linkauth) == 0) {
@@ -2313,6 +2321,15 @@ void Server::_link_remote(MDRequest *mdr, CDentry *dn, CInode *targeti)
   mdr->ls = mdlog->get_current_segment();
   EUpdate *le = new EUpdate(mdlog, "link_remote");
   le->metablob.add_client_req(mdr->reqid);
+  if (!mdr->more()->slaves.empty()) {
+    dout(20) << " noting uncommitted_slaves " << mdr->more()->slaves << dendl;
+    
+    le->reqid = mdr->reqid;
+    le->had_slaves = true;
+    
+    mds->mdcache->add_uncommitted_slaves(mdr->reqid, mdr->ls, mdr->more()->slaves);
+  }
+
   mds->locker->predirty_nested(mdr, &le->metablob, targeti, dn->dir, PREDIRTY_DIR, 1);
   le->metablob.add_remote_dentry(dn, true, targeti->ino(), 
                                 MODE_TO_DT(targeti->inode.mode));  // new remote
@@ -2468,6 +2485,15 @@ void Server::_logged_slave_link(MDRequest *mdr, CInode *targeti)
 }
 
 
+struct C_MDS_CommittedSlave : public Context {  // completion for the slave-side commit log entry (submit_entry callback)
+  Server *server;
+  MDRequest *mdr;
+  C_MDS_CommittedSlave(Server *s, MDRequest *m) : server(s), mdr(m) {}
+  void finish(int r) {  // r is not used
+    server->_committed_slave(mdr);
+  }
+};
+
 void Server::_commit_slave_link(MDRequest *mdr, int r, CInode *targeti)
 {  
   dout(10) << "_commit_slave_link " << *mdr
@@ -2478,13 +2504,20 @@ void Server::_commit_slave_link(MDRequest *mdr, int r, CInode *targeti)
     // write a commit to the journal
     ESlaveUpdate *le = new ESlaveUpdate(mdlog, "slave_link_commit", mdr->reqid, mdr->slave_to_mds,
                                        ESlaveUpdate::OP_COMMIT, ESlaveUpdate::LINK);
-    mdlog->submit_entry(le);
-    mds->mdcache->request_finish(mdr);
+    mdlog->submit_entry(le, new C_MDS_CommittedSlave(this, mdr));
   } else {
     do_link_rollback(mdr->more()->rollback_bl, mdr->slave_to_mds, mdr);
   }
 }
 
+void Server::_committed_slave(MDRequest *mdr)  // invoked from C_MDS_CommittedSlave::finish()
+{
+  dout(10) << "_committed_slave " << *mdr << dendl;
+  MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_COMMITTED);
+  mds->send_message_mds(req, mdr->slave_to_mds);  // send OP_COMMITTED to mdr->slave_to_mds
+  mds->mdcache->request_finish(mdr);  // previously called right after submit_entry(); now deferred to this callback
+}
+
 struct C_MDS_LoggedLinkRollback : public Context {
   Server *server;
   Mutation *mut;
@@ -3300,9 +3333,18 @@ void Server::handle_client_rename(MDRequest *mdr)
   mdr->ls = mdlog->get_current_segment();
   EUpdate *le = new EUpdate(mdlog, "rename");
   le->metablob.add_client_req(mdr->reqid);
+  if (!mdr->more()->slaves.empty()) {
+    dout(20) << " noting uncommitted_slaves " << mdr->more()->slaves << dendl;
+    
+    le->reqid = mdr->reqid;
+    le->had_slaves = true;
+    
+    mds->mdcache->add_uncommitted_slaves(mdr->reqid, mdr->ls, mdr->more()->slaves);
+  }
   
   _rename_prepare(mdr, &le->metablob, &le->client_map, srcdn, destdn, straydn);
 
+
   if (!srcdn->is_auth() && srcdn->is_primary()) {
     // importing inode; also journal imported client map
     
@@ -3748,8 +3790,12 @@ void Server::handle_slave_rename_prep(MDRequest *mdr)
     if (srcdn->is_primary() && 
        !srcdn->inode->is_freezing_inode() &&
        !srcdn->inode->is_frozen_inode()) {
-      // srci auth.  
-      // set ambiguous auth.
+      // set ambiguous auth for srci
+      /*
+       * NOTE: we don't worry about ambiguous cache expire as we do
+       * with subtree migrations because all slaves will pin
+       * srcdn->inode for duration of this rename.
+       */
       srcdn->inode->state_set(CInode::STATE_AMBIGUOUSAUTH);
 
       // freeze?
@@ -3798,7 +3844,7 @@ void Server::handle_slave_rename_prep(MDRequest *mdr)
   else {
     assert(srcdn->is_remote());
     rollback.orig_src.remote_ino = srcdn->get_remote_ino();
-    rollback.orig_src.remote_ino = srcdn->get_remote_d_type();
+    rollback.orig_src.remote_d_type = srcdn->get_remote_d_type();
   }
   
   rollback.orig_dest.dirfrag = destdn->dir->dirfrag();
@@ -3809,7 +3855,7 @@ void Server::handle_slave_rename_prep(MDRequest *mdr)
     rollback.orig_dest.ino = destdn->inode->ino();
   else if (destdn->is_remote()) {
     rollback.orig_dest.remote_ino = destdn->get_remote_ino();
-    rollback.orig_dest.remote_ino = destdn->get_remote_d_type();
+    rollback.orig_dest.remote_d_type = destdn->get_remote_d_type();
   }
   
   if (straydn) {
@@ -3837,7 +3883,7 @@ void Server::handle_slave_rename_prep(MDRequest *mdr)
     mdlog->submit_entry(le, new C_MDS_SlaveRenamePrep(this, mdr, srcdn, destdn, straydn));
   } else {
     // don't journal.
-    dout(10) << "not journaling, i'm not auth for anything, and srci isn't open" << dendl;
+    dout(10) << "not journaling: i'm not auth for anything, and srci has no caps" << dendl;
 
     // prepare anyway; this may twiddle dir_auth
     EMetaBlob blob;
@@ -3922,8 +3968,7 @@ void Server::_commit_slave_rename(MDRequest *mdr, int r,
       mds->queue_waiters(finished);
     }
 
-    mdlog->submit_entry(le);
-    mds->mdcache->request_finish(mdr);
+    mdlog->submit_entry(le, new C_MDS_CommittedSlave(this, mdr));
   } else {
     if (srcdn->is_auth() && destdn->is_primary() &&
        destdn->inode->state_test(CInode::STATE_AMBIGUOUSAUTH)) {
index ceb743328c55a651ea5535b88e63d8b5981ae7f7..dd78d0ce734a36504435a327c80f7ffe2c794c33 100644 (file)
@@ -142,6 +142,7 @@ public:
   void handle_slave_link_prep(MDRequest *mdr);
   void _logged_slave_link(MDRequest *mdr, CInode *targeti);
   void _commit_slave_link(MDRequest *mdr, int r, CInode *targeti);
+  void _committed_slave(MDRequest *mdr);  // use for rename, too
   void handle_slave_link_prep_ack(MDRequest *mdr, MMDSSlaveRequest *m);
   void do_link_rollback(bufferlist &rbl, int master, MDRequest *mdr);
   void _link_rollback_finish(Mutation *mut, MDRequest *mdr);
index d76bde19a0bc45b41d38b85a637c1f13017ef9d9..f13d8446670c44ec85bfb581e4c21d3ff91633d7 100644 (file)
@@ -23,11 +23,13 @@ public:
   EMetaBlob metablob;
   string type;
   bufferlist client_map;
+  metareqid_t reqid;
+  bool had_slaves;
 
   EUpdate() : LogEvent(EVENT_UPDATE) { }
   EUpdate(MDLog *mdlog, const char *s) : 
     LogEvent(EVENT_UPDATE), metablob(mdlog),
-    type(s) { }
+    type(s), had_slaves(false) { }
   
   void print(ostream& out) {
     if (type.length())
@@ -39,11 +41,15 @@ public:
     ::encode(type, bl);
     ::encode(metablob, bl);
     ::encode(client_map, bl);
+    ::encode(reqid, bl);
+    ::encode(had_slaves, bl);
   } 
   void decode(bufferlist::iterator &bl) {
     ::decode(type, bl);
     ::decode(metablob, bl);
     ::decode(client_map, bl);
+    ::decode(reqid, bl);
+    ::decode(had_slaves, bl);
   }
 
   void update_segment();
index 72359895dd617519e9a21dd6c6c0f694e4e38ea9..33b24125f8e4be694295f3b8d42d976dd78d9d37 100644 (file)
@@ -22,6 +22,7 @@
 #include "events/EUpdate.h"
 #include "events/ESlaveUpdate.h"
 #include "events/EOpen.h"
+#include "events/ECommitted.h"
 
 #include "events/EPurgeFinish.h"
 
@@ -105,6 +106,15 @@ C_Gather *LogSegment::try_to_expire(MDS *mds)
     }
   }
 
+  // master ops with possibly uncommitted slaves
+  for (set<metareqid_t>::iterator p = uncommitted_slaves.begin();
+       p != uncommitted_slaves.end();
+       p++) {
+    dout(10) << "try_to_expire waiting for slaves to ack commit on " << *p << dendl;
+    if (!gather) gather = new C_Gather;
+    mds->mdcache->wait_for_uncommitted_slaves(*p, gather->new_sub());
+  }
+
   // dirty non-auth mtimes
   for (xlist<CInode*>::iterator p = dirty_dirfrag_dir.begin(); !p.end(); ++p) {
     CInode *in = *p;
@@ -594,11 +604,19 @@ void EAnchorClient::replay(MDS *mds)
 void EUpdate::update_segment()
 {
   metablob.update_segment(_segment);
+
+  if (had_slaves)  // set by the master when the request involved slaves
+    _segment->uncommitted_slaves.insert(reqid);  // try_to_expire() on this segment will wait on reqid
 }
 
 void EUpdate::replay(MDS *mds)
 {
   metablob.replay(mds, _segment);
+
+  if (had_slaves) {  // same bookkeeping as update_segment(), plus a debug line
+    dout(10) << "EUpdate.replay " << reqid << " had slaves, expecting a matching ECommitted" << dendl;
+    _segment->uncommitted_slaves.insert(reqid);
+  }
 }
 
 
@@ -626,6 +644,21 @@ void EOpen::replay(MDS *mds)
 }
 
 
+// -----------------------
+// ECommitted
+
+void ECommitted::replay(MDS *mds)  // on replay, clear the tracking state for reqid if it is known
+{
+  if (mds->mdcache->uncommitted_slaves.count(reqid)) {  // NOTE(review): nothing visible in this diff fills this map during replay - confirm this branch is reachable
+    dout(10) << "ECommitted.replay " << reqid << dendl;
+    mds->mdcache->uncommitted_slaves[reqid].ls->uncommitted_slaves.erase(reqid);  // clear the segment's record first; it needs the entry's ls
+    mds->mdcache->uncommitted_slaves.erase(reqid);
+  } else {
+    dout(10) << "ECommitted.replay " << reqid << " -- didn't see original op" << dendl;
+  }
+}
+
+
 
 // -----------------------
 // ESlaveUpdate
index 622f0acd5cae662ad3637c716675b350bf4e0827..6e1e84e0e737bbfe642f4b4cb070c80117d26296 100644 (file)
@@ -35,6 +35,7 @@ class MMDSSlaveRequest : public Message {
   static const int OP_RENAMEPREPACK = -7;
 
   static const int OP_FINISH = 17;  
+  static const int OP_COMMITTED = -18;  
 
   static const int OP_ABORT =  20;  // used for recovery only
   //static const int OP_COMMIT = 21;  // used for recovery only
@@ -56,6 +57,8 @@ class MMDSSlaveRequest : public Message {
     case OP_RENAMEPREPACK: return "rename_prep_ack";
 
     case OP_FINISH: return "finish"; // commit
+    case OP_COMMITTED: return "committed";
+
     case OP_ABORT: return "abort";
       //case OP_COMMIT: return "commit";