]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
* fixed link/unlink recovery behavior
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 26 Jun 2007 22:35:23 +0000 (22:35 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 26 Jun 2007 22:35:23 +0000 (22:35 +0000)
* fixed up slave request recovery handling in general
* unlink now reanchors when moving ots tray
* fixed rename replication of straydn

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1445 29311d96-e01e-0410-9327-a35deaab8ce9

18 files changed:
branches/sage/cephmds2/TODO
branches/sage/cephmds2/client/SyntheticClient.cc
branches/sage/cephmds2/mds/CDentry.h
branches/sage/cephmds2/mds/CInode.cc
branches/sage/cephmds2/mds/MDCache.cc
branches/sage/cephmds2/mds/MDCache.h
branches/sage/cephmds2/mds/MDS.cc
branches/sage/cephmds2/mds/Server.cc
branches/sage/cephmds2/mds/Server.h
branches/sage/cephmds2/mds/events/ESlaveUpdate.h
branches/sage/cephmds2/mds/journal.cc
branches/sage/cephmds2/messages/MDiscoverReply.h
branches/sage/cephmds2/messages/MMDSImportMap.h
branches/sage/cephmds2/messages/MMDSResolveAck.h [new file with mode: 0644]
branches/sage/cephmds2/messages/MMDSSlaveRequest.h
branches/sage/cephmds2/msg/Message.cc
branches/sage/cephmds2/msg/Message.h
branches/sage/cephmds2/script/find_auth_pins.pl

index 97561d1bed1da64cf96813938a475b0bca370ae3..44f1c36f58f698f55fc562e581b69461f8ae53c5 100644 (file)
@@ -47,28 +47,32 @@ sage doc
 
 sage mds
 
-- slave request cleanup on failure
-  - flag request, and discard on re-dispatch?  (cuz it'll be waiting on random stuff)
-
- - recovering node needs to know what slave ops committed
-   - either commit or abort
-   - if master op still exists, then ABORT.
-   - if master op dne, then COMMIT.
- - surviving node needs to 
-   - wait for log to flush (commits to finish), then
-   - for uncommitted requests, 
-     - include reqid in resolve message to recovering node
-     - remove failed from witnesses, waiting_on_slave, and
-     - redispatch
-   - somehow wait for needed peers to recover...
+- unlink needs to adjust anchortable
+
+/- rename_prep needs to do a CDentryDiscover on straydn.  and a CDirDiscover while we're at it!
+
+/- slave request cleanup on failure
+/  - flag request, and discard on re-dispatch?  (cuz it'll be waiting on random stuff)
+
+/- fix slave op commit/abort logic:
+/ - recovering node needs to know what stray prepare ops committed
+/   - include with import_map
+/   - wait for explicit commit/abort from peer.
+/ - surviving node needs to 
+/   - wait for log to flush (commits to finish), then
+/   - for uncommitted master requests, 
+/     - remove failed from witnesses, waiting_on_slave, and
+/     - redispatch
+/     - somehow wait for needed peers to recover...
+/   - for uncommitted slave requests,
+/     - include with import_map, wait for explicit commit/abort from peer.
+
+- make unlink/link behave with commit/abort recovery
 
 - new thrashing test with
  - link, unlink, and rename (lots of hard links!)
 
-- check/fix mdr->slaves, wrt slave ops (authpin etc.) that 'fail'
-
 - fix up writeback of dir inode mtime
-
 - revisit wrlocks, dir inode mtime updates.  esp in rename.
   - if auth, pin and be happy.  decide early.
   - make no attempt to dirty inodes until a gather
index 77cf529f49f414663dbcb500ea6d0a2fb37862e5..834013799ef7757164a19748090ebbabd4e4f6ba 100644 (file)
@@ -1496,7 +1496,7 @@ int SyntheticClient::thrash_links(const char *basedir, int dirs, int files, int
       sprintf(t, "/file.%d", a);
       src += t;
     }
-    string dst;
+    string dst = basedir;
     {
       char t[80];
       for (int d=0; d<depth; d++) {
index 807b0b110c24a3ddb770e66194c128010430153a..c2e6c03eae9e7ad0bc58c2181c989d6daa6b90e4 100644 (file)
@@ -246,7 +246,7 @@ class CDentryDiscover {
   int    replica_nonce;
   int    lockstate;
 
-  inodeno_t ino;
+  //inodeno_t ino;
   inodeno_t remote_ino;
 
 public:
@@ -254,7 +254,7 @@ public:
   CDentryDiscover(CDentry *dn, int nonce) :
     dname(dn->get_name()), replica_nonce(nonce),
     lockstate(dn->lock.get_replica_state()),
-    ino(dn->get_ino()),
+    //ino(dn->get_ino()),
     remote_ino(dn->get_remote_ino()) { }
 
   string& get_dname() { return dname; }
@@ -274,16 +274,18 @@ public:
 
   void _encode(bufferlist& bl) {
     ::_encode(dname, bl);
-    bl.append((char*)&replica_nonce, sizeof(replica_nonce));
-    bl.append((char*)&lockstate, sizeof(lockstate));
+    //::_encode(ino, bl);
+    ::_encode(remote_ino, bl);
+    ::_encode(replica_nonce, bl);
+    ::_encode(lockstate, bl);
   }
   
   void _decode(bufferlist& bl, int& off) {
     ::_decode(dname, bl, off);
-    bl.copy(off, sizeof(replica_nonce), (char*)&replica_nonce);
-    off += sizeof(replica_nonce);
-    bl.copy(off, sizeof(lockstate), (char*)&lockstate);
-    off += sizeof(lockstate);
+    //::_decode(ino, bl, off);
+    ::_decode(remote_ino, bl, off);
+    ::_decode(replica_nonce, bl, off);
+    ::_decode(lockstate, bl, off);
   }
 
 };
index 610cafe2fd3afe65d733af85060ec01ed4ce4a03..65a7938670d784744a96351c02e3a87b8f7a84f7 100644 (file)
@@ -273,7 +273,10 @@ void CInode::make_path(string& s)
     s = "";  // root
   } 
   else if (is_stray()) {
-    s = "~";
+    s = "~stray";
+    char n[10];
+    sprintf(n, "%d", (int)(ino()-MDS_INO_STRAY_OFFSET));
+    s += n;
   }
   else {
     s = "(dangling)";  // dangling
@@ -288,7 +291,7 @@ void CInode::make_anchor_trace(vector<Anchor>& trace)
     dout(10) << "make_anchor_trace added " << trace.back() << endl;
   }
   else 
-    assert(is_root());
+    assert(is_root() || is_stray());
 }
 
 void CInode::name_stray_dentry(string& dname)
index 7776372314a73db7b9edc60fe3360e3ca0a745cc..9f00dac3b521afb0f5102bd9b0dd7c7f45b06084 100644 (file)
@@ -47,6 +47,7 @@
 #include "messages/MGenericMessage.h"
 
 #include "messages/MMDSImportMap.h"
+#include "messages/MMDSResolveAck.h"
 #include "messages/MMDSCacheRejoin.h"
 
 #include "messages/MDiscover.h"
@@ -296,6 +297,23 @@ void MDCache::open_foreign_stray(int who, Context *c)
 }
 
 
+CDentry *MDCache::get_or_create_stray_dentry(CInode *in)
+{
+  string straydname;
+  in->name_stray_dentry(straydname);
+  frag_t fg = stray->pick_dirfrag(straydname);
+
+  CDir *straydir = stray->get_or_open_dirfrag(this, fg);
+  
+  CDentry *straydn = straydir->lookup(straydname);
+  if (!straydn) 
+    straydn = straydir->add_dentry(straydname, 0);
+  
+  return straydn;
+}
+
+
+
 MDSCacheObject *MDCache::get_object(MDSCacheObjectInfo &info) 
 {
   // inode?
@@ -961,33 +979,27 @@ void MDCache::send_import_map_now(int who)
     m->add_ambiguous_import(p->first, p->second);
   
 
-  // [survivor] list requests that may have slave PREPARE events journaled
+  // list prepare requests lacking a commit
+  // [active survivor]
   for (hash_map<metareqid_t, MDRequest*>::iterator p = active_requests.begin();
        p != active_requests.end();
        ++p) {
-    // might this slave have a PREPARE journaled?
-    // or, were we just waiting on this slave?
-    if (p->second->is_master() &&
-       (p->second->witnessed.count(who) ||
-        p->second->waiting_on_slave.count(who))) {
-      if (p->second->committing) {
-       dout(10) << " committing " << *p->second << ", waiting for log to flush" << endl;
-       mds->mdlog->wait_for_sync(new C_MDC_SendImportMap(this, who));
-       delete m;
-       return; 
-      }
-
+    if (p->second->is_slave() && p->second->slave_to_mds == who) {
       dout(10) << " including uncommitted " << *p->second << endl;
-      m->add_master_request(p->first);
-
-      // discard this peer's prepare (if any)
-      p->second->witnessed.erase(who);
-      p->second->waiting_on_slave.erase(who);
-      
-      // retry request when peer recovers!
-      mds->wait_for_active_peer(who, new C_MDS_RetryRequest(this, p->second));
+      m->add_slave_request(p->first);
     }
   }
+  // [resolving]
+  if (uncommitted_slave_updates.count(who)) {
+    for (map<metareqid_t, EMetaBlob>::iterator p = uncommitted_slave_updates[who].begin();
+        p != uncommitted_slave_updates[who].end();
+        ++p) {
+      dout(10) << " including uncommitted " << p->first << endl;
+      m->add_slave_request(p->first);
+    }
+    need_resolve_ack.insert(who);
+  }
+
 
   // send
   mds->send_message_mds(m, who, MDS_PORT_CACHE);
@@ -1057,16 +1069,43 @@ void MDCache::handle_mds_failure(int who)
     p = n;
   }
 
-  // clean up any slave requests from this node
+  // clean up any requests slave to/from this node
   list<MDRequest*> finish;
   for (hash_map<metareqid_t, MDRequest*>::iterator p = active_requests.begin();
        p != active_requests.end();
        ++p) {
     // slave to the failed node?
-    if (p->second->slave_to_mds == who) 
-      finish.push_back(p->second);
-
+    if (p->second->slave_to_mds == who) {
+      if (p->second->slave_did_prepare()) {
+       dout(10) << " slave request " << *p->second << " uncommitted, will resolve shortly" << endl;
+      } else {
+       dout(10) << " slave request " << *p->second << " has no prepare, finishing up" << endl;
+       if (p->second->slave_request)
+         p->second->aborted = true;
+       else
+         finish.push_back(p->second);
+      }
+    }
+    
+    // failed node is slave?
+    if (!p->second->committing) {
+      if (p->second->witnessed.count(who)) {
+       dout(10) << " master request " << *p->second << " no longer witnessed by slave mds" << who
+                << endl;
+       // discard this peer's prepare (if any)
+       p->second->witnessed.erase(who);
+      }
+      
+      if (p->second->waiting_on_slave.count(who)) {
+       dout(10) << " master request " << *p->second << " waiting for slave mds" << who
+                << " to recover" << endl;
+       // retry request when peer recovers
+       p->second->waiting_on_slave.erase(who);
+       mds->wait_for_active_peer(who, new C_MDS_RetryRequest(this, p->second));
+      }
+    }
   }
+
   while (!finish.empty()) {
     dout(10) << "cleaning up slave request " << *finish.front() << endl;
     request_finish(finish.front());
@@ -1149,16 +1188,25 @@ void MDCache::handle_import_map(MMDSImportMap *m)
   dout(7) << "handle_import_map from " << m->get_source() << endl;
   int from = m->get_source().num();
 
-  // try to disambiguate any unmatched slave prepares
-  for (list<metareqid_t>::iterator p = m->master_requests.begin();
-       p != m->master_requests.end();
-       ++p) {
-    if (uncommitted_slave_updates.count(*p)) {
-      // master request still exists; ABORT
-      dout(10) << " master request " << *p << " still in-progress, ABORTing our PREPARE" << endl;
-      uncommitted_slave_updates.erase(*p);
-      mds->mdlog->submit_entry(new ESlaveUpdate("unknown/recovered", *p, ESlaveUpdate::OP_ABORT));
+  // ambiguous slave requests?
+  if (!m->slave_requests.empty()) {
+    MMDSResolveAck *ack = new MMDSResolveAck;
+
+    for (list<metareqid_t>::iterator p = m->slave_requests.begin();
+        p != m->slave_requests.end();
+        ++p) {
+      if (mds->clientmap.have_completed_request(*p)) {
+       // COMMIT
+       dout(10) << " ambiguous slave request " << *p << " will COMMIT" << endl;
+       ack->add_commit(*p);
+      } else {
+       // ABORT
+       dout(10) << " ambiguous slave request " << *p << " will ABORT" << endl;
+       ack->add_abort(*p);      
+      }
     }
+
+    mds->send_message_mds(ack, from, MDS_PORT_CACHE);
   }
 
   // update my dir_auth values
@@ -1201,9 +1249,9 @@ void MDCache::handle_import_map(MMDSImportMap *m)
   show_subtrees();
 
 
-  // recovering?
-  if (!mds->is_rejoin() && !mds->is_active() && !mds->is_stopping()) {
-    // note ambiguous imports too.. unless i'm already active
+  // resolving?
+  if (mds->is_resolve()) {
+    // note ambiguous imports too
     for (map<dirfrag_t, list<dirfrag_t> >::iterator pi = m->ambiguous_imap.begin();
         pi != m->ambiguous_imap.end();
         ++pi) {
@@ -1213,39 +1261,90 @@ void MDCache::handle_import_map(MMDSImportMap *m)
 
     // did i get them all?
     got_import_map.insert(from);
-    
-    if (got_import_map == recovery_set) {
-      dout(10) << "got all import maps, done resolving subtrees" << endl;
-      commit_slave_updates();
-      disambiguate_imports();
-      recalc_auth_bits();
-      trim_non_auth(); 
-
-      // reconnect clients
-      mds->set_want_state(MDSMap::STATE_RECONNECT);
 
-    } else {
-      dout(10) << "still waiting for more importmaps, got " << got_import_map 
-              << ", need " << recovery_set << endl;
-    }
+    maybe_resolve_finish();
   }
 
   delete m;
 }
 
+void MDCache::maybe_resolve_finish()
+{
+  if (got_import_map != recovery_set) {
+    dout(10) << "still waiting for more importmaps, got " << got_import_map 
+            << ", need " << recovery_set << endl;
+  } 
+  else if (!need_resolve_ack.empty()) {
+    dout(10) << "still waiting for resolve_ack from " << need_resolve_ack << endl;
+  } 
+  else {
+    dout(10) << "got all import maps, resolve_acks, done resolving subtrees" << endl;
+    disambiguate_imports();
+    recalc_auth_bits();
+    trim_non_auth(); 
+    
+    // reconnect clients
+    mds->set_want_state(MDSMap::STATE_RECONNECT);
+  } 
+}
 
-void MDCache::commit_slave_updates()
+void MDCache::handle_resolve_ack(MMDSResolveAck *ack)
 {
-  dout(10) << "commit_slave_updates" << endl;
-  
-  while (!uncommitted_slave_updates.empty()) {
-    map<metareqid_t, EMetaBlob>::iterator p = uncommitted_slave_updates.begin();
-    dout(10) << " committing slave update " << *p << endl;
-    p->second.replay(mds);
-    mds->mdlog->submit_entry(new ESlaveUpdate("unknown/recovered", p->first, ESlaveUpdate::OP_COMMIT));
+  dout(10) << "handle_resolve_ack " << *ack << " from " << ack->get_source() << endl;
+  int from = ack->get_source().num();
+
+  for (list<metareqid_t>::iterator p = ack->commit.begin();
+       p != ack->commit.end();
+       ++p) {
+    dout(10) << " commit on slave " << *p << endl;
+    
+    if (mds->is_resolve()) {
+      // replay
+      assert(uncommitted_slave_updates[from].count(*p));
+      uncommitted_slave_updates[from][*p].replay(mds);
+      uncommitted_slave_updates[from].erase(*p);
+      // log commit
+      mds->mdlog->submit_entry(new ESlaveUpdate("unknown", *p, from, ESlaveUpdate::OP_COMMIT));
+    } else {
+      MDRequest *mdr = request_get(*p);
+      assert(mdr->slave_request == 0);  // shouldn't be doing anything!
+      request_finish(mdr);
+    }
   }
+
+  for (list<metareqid_t>::iterator p = ack->abort.begin();
+       p != ack->abort.end();
+       ++p) {
+    dout(10) << " abort on slave " << *p << endl;
+
+    if (mds->is_resolve()) {
+      assert(uncommitted_slave_updates[from].count(*p));
+      uncommitted_slave_updates[from].erase(*p);
+      mds->mdlog->submit_entry(new ESlaveUpdate("unknown", *p, from, ESlaveUpdate::OP_ABORT));
+    } else {
+      MDRequest *mdr = request_get(*p);
+      if (mdr->slave_commit) {
+       mdr->slave_commit->finish(-1);
+       delete mdr->slave_commit;
+       mdr->slave_commit = 0;
+      }
+      if (mdr->slave_request) 
+       mdr->aborted = true;
+      else
+       request_finish(mdr);
+    }
+  }
+
+  need_resolve_ack.erase(from);
+
+  if (mds->is_resolve()) 
+    maybe_resolve_finish();
+
+  delete ack;
 }
 
+
+
 void MDCache::disambiguate_imports()
 {
   dout(10) << "disambiguate_imports" << endl;
@@ -1644,7 +1743,7 @@ void MDCache::handle_cache_rejoin_rejoin(MMDSCacheRejoin *m)
        if (dn->is_replica(from) &&
            (m->weak_dentries.count(dn->get_dir()->dirfrag()) == 0 ||
             m->weak_dentries[dn->get_dir()->dirfrag()].count(dn->get_name()) == 0)) {
-         dn->remove_replica(from);
+         dentry_remove_replica(dn, from);
          dout(10) << " rem " << *dn << endl;
        }
       }
@@ -2726,9 +2825,20 @@ void MDCache::inode_remove_replica(CInode *in, int from)
     mds->locker->simple_eval(&in->linklock);
     mds->locker->simple_eval(&in->dirfragtreelock);
     mds->locker->file_eval(&in->filelock);
+    mds->locker->scatter_eval(&in->dirlock);
   }
 }
 
+void MDCache::dentry_remove_replica(CDentry *dn, int from)
+{
+  dn->remove_replica(from);
+
+  // fix lock
+  if (dn->lock.remove_replica(from) ||
+      !dn->is_replicated())
+    mds->locker->simple_eval(&dn->lock);
+}
+
 
 
 // =========================================================================================
@@ -2957,6 +3067,10 @@ void MDCache::dispatch(Message *m)
     handle_import_map((MMDSImportMap*)m);
     break;
 
+  case MSG_MDS_RESOLVEACK:
+    handle_resolve_ack((MMDSResolveAck*)m);
+    break;
+
   case MSG_MDS_CACHEREJOIN:
     handle_cache_rejoin((MMDSCacheRejoin*)m);
     break;
@@ -3389,7 +3503,7 @@ CInode *MDCache::get_dentry_inode(CDentry *dn, MDRequest *mdr)
     dn->link_remote(in);
     return in;
   } else {
-    dout(10) << "get_dentry_ninode on remote dn, opening inode for " << *dn << endl;
+    dout(10) << "get_dentry_inode on remote dn, opening inode for " << *dn << endl;
     open_remote_ino(dn->get_remote_ino(), mdr, new C_MDS_RetryRequest(this, mdr));
     return 0;
   }
@@ -3444,11 +3558,13 @@ void MDCache::open_remote_ino_2(inodeno_t ino,
   CInode *in = 0;
   while (1) {
     // inode?
+    dout(10) << " " << i << ": " << anchortrace[i-1] << endl;
     CInode *in = get_inode(anchortrace[i-1].ino);
     if (in) break;
     i--;
     if (!i) {
-      in = root;
+      CInode *in = get_inode(anchortrace[i].dirfrag.ino);
+      assert(in);
       break;
     }
   }
@@ -3562,6 +3678,13 @@ void MDCache::request_finish(MDRequest *mdr)
 {
   dout(7) << "request_finish " << *mdr << endl;
 
+  // slave finisher?
+  if (mdr->slave_commit) {
+    mdr->slave_commit->finish(0);
+    delete mdr->slave_commit;
+    mdr->slave_commit = 0;
+  }
+
   delete mdr->client_request;
   delete mdr->slave_request;
   request_cleanup(mdr);
@@ -4115,6 +4238,7 @@ void MDCache::handle_discover(MDiscover *dis)
        dout(7) << *cur << " dirfrag not open, not inode auth, setting dir_auth_hint" << endl;
        reply->set_dir_auth_hint(cur->authority().first);
       }
+      reply->set_wanted_xlocks_hint(dis->wants_xlocked());
       
       // set hint (+ dentry, if there is one)
       if (dis->get_want().depth() > i)
@@ -4401,16 +4525,19 @@ void MDCache::handle_discover_reply(MDiscoverReply *m)
     // let's try again.
     int hint = m->get_dir_auth_hint();
 
-    // include any path fragment we were looking for at the time
+
+    // include dentry _and_ dirfrag, just in case
     filepath want;
-    if (m->get_error_dentry().length() > 0)
-      want.push_dentry(m->get_error_dentry());
-    
-    mds->send_message_mds(new MDiscover(mds->get_nodeid(),
-                                       cur->ino(),
-                                       want,
-                                       true),  // being conservative here.
-                         hint, MDS_PORT_CACHE);
+    want.push_dentry(m->get_error_dentry());
+    MDiscover *dis = new MDiscover(mds->get_nodeid(),
+                                  cur->ino(),
+                                  want,
+                                  true,
+                                  m->get_wanted_xlocks_hint());
+    frag_t fg = cur->pick_dirfrag(m->get_error_dentry());
+    dis->set_base_dir_frag(fg);    
+
+    mds->send_message_mds(dis, hint, MDS_PORT_CACHE);
     
     // note the dangling discover
     dir_discovers[cur->ino()].insert(hint);
@@ -4481,7 +4608,47 @@ CDir *MDCache::forge_replica_dir(CInode *diri, frag_t fg, int from)
   return dir;
 }
                                 
+    
+CDentry *MDCache::add_replica_stray(bufferlist &bl, CInode *in, int from)
+{
+  int off = 0;
+  
+  // inode
+  CInodeDiscover indis;
+  indis._decode(bl, off);
+  CInode *strayin = get_inode(indis.get_ino());
+  if (!strayin)
+    strayin = new CInode(this, false);
+  indis.update_inode(strayin);
+  dout(15) << "strayin " << *strayin << endl;
+  
+  // dir
+  CDirDiscover dirdis;
+  dirdis._decode(bl, off);
+  list<Context*> finished;
+  CDir *straydir = add_replica_dir(strayin, dirdis.get_dirfrag().frag, dirdis,
+                                           from, finished);
+  mds->queue_waiters(finished);
+  dout(15) << "straydir " << *straydir << endl;
+  
+  // dentry
+  CDentryDiscover dndis;
+  dndis._decode(bl, off);
+  
+  string straydname;
+  in->name_stray_dentry(straydname);
+  CDentry *straydn = straydir->lookup(straydname);
+  if (straydn) {
+    dout(10) << "had straydn " << *straydn << endl;
+    dndis.update_dentry(straydn);
+  } else {
+    straydn = straydir->add_dentry( dndis.get_dname(), 0 );
+    dndis.update_new_dentry(straydn);
+    dout(10) << "added straydn " << *straydn << endl;
+  }
 
+  return straydn;
+}
 
 
 
index bdef2a95e8b128d9e90c578da2152ec5b829d359..6d08495fe98e9c86aad988a9d38cce9332e86b6c 100644 (file)
@@ -41,6 +41,7 @@ class Logger;
 class Message;
 
 class MMDSImportMap;
+class MMDSResolveAck;
 class MMDSCacheRejoin;
 class MMDSCacheRejoinAck;
 class MDiscover;
@@ -104,6 +105,7 @@ struct MDRequest {
   //  (useful for wrlock, which may be a moving auth target)
   bool done_locking; 
   bool committing;
+  bool aborted;
 
   // for rename/link/unlink
   utime_t now;
@@ -126,25 +128,27 @@ struct MDRequest {
   MDRequest() : 
     client_request(0), ref(0), 
     slave_request(0), slave_to_mds(-1), 
-    done_locking(false), committing(false),
+    done_locking(false), committing(false), aborted(false),
     src_reanchor_atid(0), dst_reanchor_atid(0), inode_import_v(0),
     slave_commit(0) { }
   MDRequest(metareqid_t ri, MClientRequest *req) : 
     reqid(ri), client_request(req), ref(0), 
     slave_request(0), slave_to_mds(-1), 
-    done_locking(false), committing(false),
+    done_locking(false), committing(false), aborted(false),
     src_reanchor_atid(0), dst_reanchor_atid(0), inode_import_v(0),
     slave_commit(0) { }
   MDRequest(metareqid_t ri, int by) : 
     reqid(ri), client_request(0), ref(0),
     slave_request(0), slave_to_mds(by), 
-    done_locking(false), committing(false),
+    done_locking(false), committing(false), aborted(false),
     src_reanchor_atid(0), dst_reanchor_atid(0), inode_import_v(0),
     slave_commit(0) { }
   
   bool is_master() { return slave_to_mds < 0; }
   bool is_slave() { return slave_to_mds >= 0; }
 
+  bool slave_did_prepare() { return slave_commit; }
+  
   // pin items in cache
   void pin(MDSCacheObject *o) {
     if (pins.count(o) == 0) {
@@ -296,21 +300,44 @@ public:
 protected:
   set<int> recovery_set;
 
+public:
+  void set_recovery_set(set<int>& s);
+  void handle_mds_failure(int who);
+  void handle_mds_recovery(int who);
+
+protected:
+  // [resolve]
   // from EImportStart w/o EImportFinish during journal replay
   map<dirfrag_t, list<dirfrag_t> >            my_ambiguous_imports;  
   // from MMDSImportMaps
   map<int, map<dirfrag_t, list<dirfrag_t> > > other_ambiguous_imports;  
 
-  map<metareqid_t, EMetaBlob> uncommitted_slave_updates;
+  map<int, map<metareqid_t, EMetaBlob> > uncommitted_slave_updates;
   friend class ESlaveUpdate;
 
   set<int> wants_import_map;   // nodes i need to send my import map to
   set<int> got_import_map;     // nodes i got import_maps from
+  set<int> need_resolve_ack;   // nodes i need a resolve_ack from
   
   void handle_import_map(MMDSImportMap *m);
-  void commit_slave_updates();
+  void handle_resolve_ack(MMDSResolveAck *m);
+  void maybe_resolve_finish();
   void disambiguate_imports();
+  void recalc_auth_bits();
+public:
+  // ambiguous imports
+  void add_ambiguous_import(dirfrag_t base, list<dirfrag_t>& bounds);
+  void add_ambiguous_import(CDir *base, const set<CDir*>& bounds);
+  void cancel_ambiguous_import(dirfrag_t dirino);
+  void finish_ambiguous_import(dirfrag_t dirino);
+  void send_import_map(int who);
+  void send_import_map_now(int who);
+  void send_import_map_later(int who);
+  void send_pending_import_maps();  // maybe.
+  void log_import_map(Context *onsync=0);
 
+protected:
+  // [rejoin]
   set<int> rejoin_gather;      // nodes from whom i need a rejoin
   set<int> rejoin_ack_gather;  // nodes from whom i need a rejoin ack
   set<int> want_rejoin_ack;    // nodes to whom i need to send a rejoin ack
@@ -322,26 +349,10 @@ protected:
   void handle_cache_rejoin_missing(MMDSCacheRejoin *m);
   void handle_cache_rejoin_full(MMDSCacheRejoin *m);
   void send_cache_rejoin_acks();
-  void recalc_auth_bits();
-
 public:
-  void set_recovery_set(set<int>& s);
-  void handle_mds_failure(int who);
-  void handle_mds_recovery(int who);
-  void send_import_map(int who);
-  void send_import_map_now(int who);
-  void send_import_map_later(int who);
-  void send_pending_import_maps();  // maybe.
   void send_cache_rejoins();
-  void log_import_map(Context *onsync=0);
 
 
-  // ambiguous imports
-  void add_ambiguous_import(dirfrag_t base, list<dirfrag_t>& bounds);
-  void add_ambiguous_import(CDir *base, const set<CDir*>& bounds);
-  void cancel_ambiguous_import(dirfrag_t dirino);
-  void finish_ambiguous_import(dirfrag_t dirino);
-
 
 
   friend class Locker;
@@ -430,6 +441,7 @@ public:
   }
 
   void inode_remove_replica(CInode *in, int rep);
+  void dentry_remove_replica(CDentry *dn, int rep);
 
   void rename_file(CDentry *srcdn, CDentry *destdn);
 
@@ -462,6 +474,7 @@ public:
   CInode *create_stray_inode(int whose=-1);
   void open_local_stray();
   void open_foreign_stray(int who, Context *c);
+  CDentry *get_or_create_stray_dentry(CInode *in);
 
   Context *_get_waiter(MDRequest *mdr, Message *req);
   int path_traverse(MDRequest *mdr, Message *req, 
@@ -518,6 +531,10 @@ protected:
                        int from,
                        list<Context*>& finished);
   CDir* forge_replica_dir(CInode *diri, frag_t fg, int from);
+public:
+  CDentry *add_replica_stray(bufferlist &bl, CInode *strayin, int from);
+protected:
+
     
 
   // -- namespace --
index 5292888baff898dc5cfa59f19bf0debee0ddf252..33d24b30d2cde2c1b1c93a29a989c062d5f29c4d 100644 (file)
@@ -605,9 +605,9 @@ void MDS::handle_mds_map(MMDSMap *m)
     dout(10) << "i am newly resolving, sharing import map" << endl;
     set<int> who;
     mdsmap->get_mds_set(who, MDSMap::STATE_RESOLVE);
+    mdsmap->get_mds_set(who, MDSMap::STATE_REJOIN);
     mdsmap->get_mds_set(who, MDSMap::STATE_ACTIVE);
     mdsmap->get_mds_set(who, MDSMap::STATE_STOPPING);
-    mdsmap->get_mds_set(who, MDSMap::STATE_REJOIN);     // hrm. FIXME.
     for (set<int>::iterator p = who.begin(); p != who.end(); ++p) {
       if (*p == whoami) continue;
       mdcache->send_import_map(*p);  // now.
index 50e930fb304ff9295077afd41d2485a83abaef1b..bc15a2c01ed79801f816af58340dacd7f9c49a25 100644 (file)
@@ -599,6 +599,12 @@ void Server::dispatch_slave_request(MDRequest *mdr)
 {
   dout(7) << "dispatch_slave_request " << *mdr << " " << *mdr->slave_request << endl;
 
+  if (mdr->aborted) {
+    dout(7) << " abort flag set, finishing" << endl;
+    mdcache->request_finish(mdr);
+    return;
+  }
+
   switch (mdr->slave_request->get_op()) {
   case MMDSSlaveRequest::OP_XLOCK:
     {
@@ -628,7 +634,7 @@ void Server::dispatch_slave_request(MDRequest *mdr)
                   << *lock << " on " << *lock->get_parent() << endl;
        } else {
          dout(10) << "don't have object, dropping" << endl;
-         assert(0); // can this happen?  hmm.
+         assert(0); // can this happen, if we auth pinned properly.
        }
       }
 
@@ -669,13 +675,6 @@ void Server::dispatch_slave_request(MDRequest *mdr)
     break;
 
   case MMDSSlaveRequest::OP_FINISH:
-    // slave finisher?
-    if (mdr->slave_commit) {
-      mdr->slave_commit->finish(0);
-      delete mdr->slave_commit;
-      mdr->slave_commit = 0;
-    }
-
     // finish off request.
     mdcache->request_finish(mdr);
     break;
@@ -2089,7 +2088,7 @@ void Server::handle_slave_link_prep(MDRequest *mdr)
   }
 
   // journal it
-  ESlaveUpdate *le = new ESlaveUpdate("slave_link_prep", mdr->reqid, ESlaveUpdate::OP_PREPARE);
+  ESlaveUpdate *le = new ESlaveUpdate("slave_link_prep", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_PREPARE);
 
   version_t tpv = targeti->pre_dirty();
 
@@ -2122,8 +2121,7 @@ public:
   C_MDS_SlaveLinkCommit(Server *s, MDRequest *r, CInode *t, version_t v, bool in) :
     server(s), mdr(r), targeti(t), tpv(v), inc(in) { }
   void finish(int r) {
-    assert(r == 0);
-    server->_commit_slave_link(mdr, targeti, tpv, inc);
+    server->_commit_slave_link(mdr, r, targeti, tpv, inc);
   }
 };
 
@@ -2145,22 +2143,33 @@ void Server::_logged_slave_link(MDRequest *mdr, CInode *targeti, version_t tpv,
   mdr->slave_request = 0;
 }
 
-void Server::_commit_slave_link(MDRequest *mdr, CInode *targeti, version_t tpv, bool inc)
+void Server::_commit_slave_link(MDRequest *mdr, int r, CInode *targeti, version_t tpv, bool inc)
 {  
   dout(10) << "_commit_slave_link " << *mdr
+          << " r=" << r
           << " inc=" << inc
           << " " << *targeti << endl;
 
-  // update the target
-  if (inc)
-    targeti->inode.nlink++;
-  else
-    targeti->inode.nlink--;
-  targeti->inode.ctime = mdr->now;
-  targeti->mark_dirty(tpv);
+  ESlaveUpdate *le;
+  
+  if (r == 0) {
+    // commit.
+
+    // update the target
+    if (inc)
+      targeti->inode.nlink++;
+    else
+      targeti->inode.nlink--;
+    targeti->inode.ctime = mdr->now;
+    targeti->mark_dirty(tpv);
+    
+    // write a commit to the journal
+    le = new ESlaveUpdate("slave_link_commit", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_COMMIT);
+  } else {
+    // abort
+    le = new ESlaveUpdate("slave_link_abort", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_ABORT);
+  }
 
-  // write a commit to the journal
-  ESlaveUpdate *le = new ESlaveUpdate("slave_link_commit", mdr->reqid, ESlaveUpdate::OP_COMMIT);
   mds->mdlog->submit_entry(le);
 }
 
@@ -2280,19 +2289,24 @@ void Server::handle_client_unlink(MDRequest *mdr)
 
   // yay!
   mdr->done_locking = true;  // avoid wrlock racing
-
   if (mdr->now == utime_t())
     mdr->now = g_clock.real_now();
 
   // get stray dn ready?
   CDentry *straydn = 0;
   if (dn->is_primary()) {
-    string straydname;
-    dn->inode->name_stray_dentry(straydname);
-    frag_t fg = mdcache->get_stray()->pick_dirfrag(straydname);
-    CDir *straydir = mdcache->get_stray()->get_or_open_dirfrag(mdcache, fg);
-    straydn = straydir->add_dentry(straydname, 0);
+    straydn = mdcache->get_or_create_stray_dentry(dn->inode);
     dout(10) << " straydn is " << *straydn << endl;
+
+    if (!mdr->dst_reanchor_atid &&
+       dn->inode->is_anchored()) {
+      dout(10) << "reanchoring to stray " << *dn->inode << endl;
+      vector<Anchor> trace;
+      straydn->make_anchor_trace(trace, dn->inode);
+      mds->anchorclient->prepare_update(dn->inode->ino(), trace, &mdr->dst_reanchor_atid, 
+                                       new C_MDS_RetryRequest(mdcache, mdr));
+      return;
+    }
   }
 
   // ok!
@@ -2360,7 +2374,10 @@ void Server::_unlink_local(MDRequest *mdr, CDentry *dn, CDentry *straydn)
   pi->nlink--;
   pi->ctime = mdr->now;
   pi->version = ipv;
-  
+
+  if (mdr->dst_reanchor_atid)
+    le->metablob.add_anchor_transaction(mdr->dst_reanchor_atid);
+
   // finisher
   C_MDS_unlink_local_finish *fin = new C_MDS_unlink_local_finish(mds, mdr, dn, straydn, 
                                                                 ipv, dirpv);
@@ -2412,11 +2429,15 @@ void Server::_unlink_local_finish(MDRequest *mdr,
     }
     mds->send_message_mds(unlink, it->first, MDS_PORT_CACHE);
   }
-
+  
   // reply
   MClientReply *reply = new MClientReply(mdr->client_request, 0);
   reply_request(mdr, reply, dn->dir->get_inode());  // FIXME: imprecise ref
   
+  // commit anchor update?
+  if (mdr->dst_reanchor_atid) 
+    mds->anchorclient->commit(mdr->dst_reanchor_atid);
+
   // clean up?
   if (straydn)
     mdcache->eval_stray(straydn);
@@ -2471,6 +2492,9 @@ void Server::_unlink_remote(MDRequest *mdr, CDentry *dn)
   le->metablob.add_dir_context(dn->get_dir());
   le->metablob.add_null_dentry(dn, true);
 
+  if (mdr->dst_reanchor_atid)
+    le->metablob.add_anchor_transaction(mdr->dst_reanchor_atid);
+
   // finisher
   C_MDS_unlink_remote_finish *fin = new C_MDS_unlink_remote_finish(mds, mdr, dn, dirpv);
   
@@ -2514,6 +2538,10 @@ void Server::_unlink_remote_finish(MDRequest *mdr,
   // reply
   MClientReply *reply = new MClientReply(mdr->client_request, 0);
   reply_request(mdr, reply, dn->dir->get_inode());  // FIXME: imprecise ref
+
+  // commit anchor update?
+  if (mdr->dst_reanchor_atid) 
+    mds->anchorclient->commit(mdr->dst_reanchor_atid);
 }
 
 
@@ -2727,6 +2755,13 @@ void Server::handle_client_rename(MDRequest *mdr)
   if (mdr->now == utime_t())
     mdr->now = g_clock.real_now();
 
+  // -- create stray dentry? --
+  CDentry *straydn = 0;
+  if (destdn->is_primary()) {
+    straydn = mdcache->get_or_create_stray_dentry(destdn->inode);
+    dout(10) << "straydn is " << *straydn << endl;
+  }
+
   // -- prepare witnesses --
   set<int> witnesses = mdr->extra_witnesses;
   if (srcdn->is_auth())
@@ -2746,6 +2781,18 @@ void Server::handle_client_rename(MDRequest *mdr)
       srcdn->make_path(req->srcdnpath);
       destdn->make_path(req->destdnpath);
       req->now = mdr->now;
+
+      if (straydn) {
+       CInodeDiscover *indis = straydn->dir->inode->replicate_to(*p);
+       CDirDiscover *dirdis = straydn->dir->replicate_to(*p);
+       CDentryDiscover *dndis = straydn->replicate_to(*p);
+       indis->_encode(req->stray);
+       dirdis->_encode(req->stray);
+       dndis->_encode(req->stray);
+       delete dirdis;
+       delete dndis;
+      }
+
       mds->send_message_mds(req, *p, MDS_PORT_SERVER);
 
       assert(mdr->waiting_on_slave.count(*p) == 0);
@@ -2773,12 +2820,6 @@ void Server::handle_client_rename(MDRequest *mdr)
     dout(10) << " already (just!) got inode export from srcdn auth" << endl;
   }
   
-  // -- prepare journal entry --
-  EUpdate *le = new EUpdate("rename");
-  le->metablob.add_client_req(mdr->reqid);
-  
-  CDentry *straydn = _rename_prepare(mdr, &le->metablob, srcdn, destdn);
-
   // -- prepare anchor updates -- 
   bool linkmerge = (srcdn->inode == destdn->inode &&
                    (srcdn->is_primary() || destdn->is_primary()));
@@ -2801,6 +2842,8 @@ void Server::handle_client_rename(MDRequest *mdr)
        destdn->inode->is_anchored() &&
        !mdr->dst_reanchor_atid) {
       dout(10) << "reanchoring dst->stray " << *destdn->inode << endl;
+
+      assert(straydn);
       vector<Anchor> trace;
       straydn->make_anchor_trace(trace, destdn->inode);
       
@@ -2814,11 +2857,18 @@ void Server::handle_client_rename(MDRequest *mdr)
       return;  // waiting for anchor prepares
   }
 
+  // -- prepare journal entry --
+  EUpdate *le = new EUpdate("rename");
+  le->metablob.add_client_req(mdr->reqid);
+  
+  _rename_prepare(mdr, &le->metablob, srcdn, destdn, straydn);
 
   // -- commit locally --
   C_MDS_rename_finish *fin = new C_MDS_rename_finish(mds, mdr, srcdn, destdn, straydn);
 
   // and apply!
+  // we do this now because we may also be importing an inode, and the locker is currently
+  // depending on a this happening quickly.
   _rename_apply(mdr, srcdn, destdn, straydn);
 
   journal_opens();  // journal pending opens, just in case
@@ -2856,9 +2906,9 @@ void Server::_rename_finish(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDe
 
 // helpers
 
-CDentry *Server::_rename_prepare(MDRequest *mdr,
-                                EMetaBlob *metablob, 
-                                CDentry *srcdn, CDentry *destdn)
+void Server::_rename_prepare(MDRequest *mdr,
+                            EMetaBlob *metablob, 
+                            CDentry *srcdn, CDentry *destdn, CDentry *straydn)
 {
   dout(10) << "_rename_prepare " << *mdr << " " << *srcdn << " " << *destdn << endl;
 
@@ -2868,7 +2918,6 @@ CDentry *Server::_rename_prepare(MDRequest *mdr,
 
   inode_t *pi = 0; // inode getting nlink--
   version_t ipv;   // it's version
-  CDentry *straydn = 0;
   
   if (linkmerge) {
     dout(10) << "will merge remote+primary links" << endl;
@@ -2886,18 +2935,10 @@ CDentry *Server::_rename_prepare(MDRequest *mdr,
     metablob->add_null_dentry(srcdn, true);
 
   } else {
-
     // move to stray?
     if (destdn->is_primary()) {
-      // primary.
-      // move inode to stray dir.
-      string straydname;
-      destdn->inode->name_stray_dentry(straydname);
-      frag_t fg = mdcache->get_stray()->pick_dirfrag(straydname);
-      CDir *straydir = mdcache->get_stray()->get_or_open_dirfrag(mdcache, fg);
-      straydn = straydir->add_dentry(straydname, 0);
-      dout(10) << "straydn is " << *straydn << endl;
-      mdr->pin(straydn);
+      // primary.  we'll move inode to stray dir.
+      assert(straydn);
 
       // link-- inode, move to stray dir.
       metablob->add_dir_context(straydn->dir);
@@ -2909,7 +2950,7 @@ CDentry *Server::_rename_prepare(MDRequest *mdr,
       // remote.
       // nlink-- targeti
       metablob->add_dir_context(destdn->inode->get_parent_dir());
-      if (destdn->is_auth())
+      if (destdn->inode->is_auth())
        ipv = mdr->pvmap[destdn->inode] = destdn->inode->pre_dirty();
       pi = metablob->add_primary_dentry(destdn->inode->parent, true, destdn->inode);  // update primary
       dout(10) << "remote targeti (nlink--) is " << *destdn->inode << endl;
@@ -2950,7 +2991,11 @@ CDentry *Server::_rename_prepare(MDRequest *mdr,
     pi->version = ipv;
   }
 
-  return straydn;
+  // anchor updates?
+  if (mdr->src_reanchor_atid)
+    metablob->add_anchor_transaction(mdr->src_reanchor_atid);
+  if (mdr->dst_reanchor_atid)
+    metablob->add_anchor_transaction(mdr->dst_reanchor_atid);
 }
 
 
@@ -3048,7 +3093,7 @@ void Server::_rename_apply(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDen
       srcdn->mark_dirty(mdr->pvmap[srcdn]);
 
     // srcdn inode import?
-    if (!srcdn->is_auth() && destdn->is_auth()) {
+    if (!srcdn->is_auth() && destdn->is_primary() && destdn->is_auth()) {
       assert(mdr->inode_import.length() > 0);
       int off = 0;
       mdcache->migrator->decode_import_inode(destdn, mdr->inode_import, off, 
@@ -3057,7 +3102,7 @@ void Server::_rename_apply(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDen
   }
 
   // update subtree map?
-  if (destdn->inode->is_dir()) 
+  if (destdn->is_primary() && destdn->inode->is_dir()) 
     mdcache->adjust_subtree_after_rename(destdn->inode, srcdn->dir);
 }
 
@@ -3088,7 +3133,7 @@ public:
   C_MDS_SlaveRenameCommit(Server *s, MDRequest *m, CDentry *sr, CDentry *de, CDentry *st) :
     server(s), mdr(m), srcdn(sr), destdn(de), straydn(st) {}
   void finish(int r) {
-    server->_commit_slave_rename(mdr, srcdn, destdn, straydn);
+    server->_commit_slave_rename(mdr, r, srcdn, destdn, straydn);
   }
 };
 
@@ -3126,30 +3171,21 @@ void Server::handle_slave_rename_prep(MDRequest *mdr)
   dout(10) << " srcdn " << *srcdn << endl;
   mdr->pin(srcdn);
 
-  // open destdn stray dirfrag?
+  // stray?
+  CDentry *straydn = 0;
   if (destdn->is_primary()) {
-    CInode *dstray = mdcache->get_inode(MDS_INO_STRAY(mdr->slave_to_mds));
-    if (!dstray) {
-      mdcache->open_foreign_stray(mdr->slave_to_mds, new C_MDS_RetryRequest(mdcache, mdr));
-      return;
-    }
-    
-    string straydname;
-    destdn->inode->name_stray_dentry(straydname);
-    frag_t fg = dstray->pick_dirfrag(straydname);
-    CDir *straydir = dstray->get_dirfrag(fg);
-    if (!straydir) {
-      mdcache->open_remote_dir(dstray, fg, new C_MDS_RetryRequest(mdcache, mdr));
-      return;
-    }
-    dout(10) << " straydir is " << *straydir << endl;
+    assert(mdr->slave_request->stray.length() > 0);
+    straydn = mdcache->add_replica_stray(mdr->slave_request->stray, 
+                                        destdn->inode, mdr->slave_to_mds);
+    assert(straydn);
+    mdr->pin(straydn);
   }
 
   // journal it
-  ESlaveUpdate *le = new ESlaveUpdate("slave_rename_prep", mdr->reqid, ESlaveUpdate::OP_PREPARE);
+  ESlaveUpdate *le = new ESlaveUpdate("slave_rename_prep", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_PREPARE);
   
   mdr->now = mdr->slave_request->now;
-  CDentry *straydn = _rename_prepare(mdr, &le->metablob, srcdn, destdn);
+  _rename_prepare(mdr, &le->metablob, srcdn, destdn, straydn);
 
   mds->mdlog->submit_entry(le, new C_MDS_SlaveRenamePrep(this, mdr, srcdn, destdn, straydn));
 }
@@ -3179,14 +3215,22 @@ void Server::_logged_slave_rename(MDRequest *mdr,
   mdr->slave_request = 0;
 }
 
-void Server::_commit_slave_rename(MDRequest *mdr, 
+void Server::_commit_slave_rename(MDRequest *mdr, int r,
                                  CDentry *srcdn, CDentry *destdn, CDentry *straydn)
 {
-  dout(10) << "_commit_slave_rename " << *mdr << endl;
-  _rename_apply(mdr, srcdn, destdn, straydn);
+  dout(10) << "_commit_slave_rename " << *mdr << " r=" << r << endl;
 
-  // write a commit to the journal
-  ESlaveUpdate *le = new ESlaveUpdate("slave_rename_commit", mdr->reqid, ESlaveUpdate::OP_COMMIT);
+  ESlaveUpdate *le;
+  if (r == 0) {
+    // commit
+    _rename_apply(mdr, srcdn, destdn, straydn);
+    
+    // write a commit to the journal
+    le = new ESlaveUpdate("slave_rename_commit", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_COMMIT);
+  } else {
+    // abort
+    le = new ESlaveUpdate("slave_rename_abort", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_ABORT);
+  }
   mds->mdlog->submit_entry(le);
 }
 
index d14bf069e1d976c671dc7cc3d9257873d4f36792..b7e2197c522e5463e8ceca33d470b8929e400f2c 100644 (file)
@@ -129,7 +129,7 @@ public:
 
   void handle_slave_link_prep(MDRequest *mdr);
   void _logged_slave_link(MDRequest *mdr, CInode *targeti, version_t tpv, bool inc);
-  void _commit_slave_link(MDRequest *mdr, CInode *targeti, version_t tpv, bool inc);
+  void _commit_slave_link(MDRequest *mdr, int r, CInode *targeti, version_t tpv, bool inc);
   void handle_slave_link_prep_ack(MDRequest *mdr, MMDSSlaveRequest *m);
 
   // unlink
@@ -151,16 +151,16 @@ public:
                      CDentry *srcdn, CDentry *destdn, CDentry *straydn);
 
   // helpers
-  CDentry *_rename_prepare(MDRequest *mdr,
-                          EMetaBlob *metablob, 
-                          CDentry *srcdn, CDentry *destdn);
+  void _rename_prepare(MDRequest *mdr,
+                      EMetaBlob *metablob, 
+                      CDentry *srcdn, CDentry *destdn, CDentry *straydn);
   void _rename_apply(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDentry *straydn); 
 
   // slaving
   void handle_slave_rename_prep(MDRequest *mdr);
   void handle_slave_rename_prep_ack(MDRequest *mdr, MMDSSlaveRequest *m);
   void _logged_slave_rename(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDentry *straydn);
-  void _commit_slave_rename(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDentry *straydn);
+  void _commit_slave_rename(MDRequest *mdr, int r, CDentry *srcdn, CDentry *destdn, CDentry *straydn);
   void handle_slave_rename_get_inode(MDRequest *mdr);
   void handle_slave_rename_get_inode_ack(MDRequest *mdr, MMDSSlaveRequest *m);
 
index 6550d2838297fd5c2ecd5788dac4ff21853e9fa9..f83dc87778a0f4efc9ff0f7a43a8e7575bdf7d4c 100644 (file)
@@ -26,14 +26,16 @@ public:
   
   string type;
   metareqid_t reqid;
+  int master;
   int op;  // prepare, commit, abort
   EMetaBlob metablob;
 
   ESlaveUpdate() : LogEvent(EVENT_SLAVEUPDATE) { }
-  ESlaveUpdate(const char *s, metareqid_t ri, int o) : 
+  ESlaveUpdate(const char *s, metareqid_t ri, int mastermds, int o) : 
     LogEvent(EVENT_SLAVEUPDATE),
     type(s),
     reqid(ri),
+    master(mastermds),
     op(o) { }
   
   void print(ostream& out) {
@@ -41,18 +43,21 @@ public:
       out << type << " ";
     out << " " << op;
     out << " " << reqid;
+    out << " for mds" << master;
     out << metablob;
   }
 
   void encode_payload(bufferlist& bl) {
     ::_encode(type, bl);
     ::_encode(reqid, bl);
+    ::_encode(master, bl);
     ::_encode(op, bl);
     metablob._encode(bl);
   } 
   void decode_payload(bufferlist& bl, int& off) {
     ::_decode(type, bl, off);
     ::_decode(reqid, bl, off);
+    ::_decode(master, bl, off);
     ::_decode(op, bl, off);
     metablob._decode(bl, off);
   }
index 5c522d2fa2850bf627e634858ab3f6165837426d..da57c55e471e2ddda3935eb3a17d0350a3383b7e 100644 (file)
@@ -731,29 +731,34 @@ void ESlaveUpdate::replay(MDS *mds)
 {
   switch (op) {
   case ESlaveUpdate::OP_PREPARE:
-    // FIXME: horribly inefficient
-    dout(10) << "ESlaveUpdate.replay prepare " << reqid << ": saving blob for later commit" << endl;
-    assert(mds->mdcache->uncommitted_slave_updates.count(reqid) == 0);
-    mds->mdcache->uncommitted_slave_updates[reqid] = metablob;
+    // FIXME: horribly inefficient copy; EMetaBlob needs a swap() or something
+    dout(10) << "ESlaveUpdate.replay prepare " << reqid << " for mds" << master 
+            << ": saving blob for later commit" << endl;
+    assert(mds->mdcache->uncommitted_slave_updates[master].count(reqid) == 0);
+    mds->mdcache->uncommitted_slave_updates[master][reqid] = metablob;
     break;
 
   case ESlaveUpdate::OP_COMMIT:
-    if (mds->mdcache->uncommitted_slave_updates.count(reqid)) {
-      dout(10) << "ESlaveUpdate.replay commit " << reqid << ": applying previously saved blob" << endl;
-      mds->mdcache->uncommitted_slave_updates[reqid].replay(mds);
-      mds->mdcache->uncommitted_slave_updates.erase(reqid);
+    if (mds->mdcache->uncommitted_slave_updates[master].count(reqid)) {
+      dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds" << master
+              << ": applying previously saved blob" << endl;
+      mds->mdcache->uncommitted_slave_updates[master][reqid].replay(mds);
+      mds->mdcache->uncommitted_slave_updates[master].erase(reqid);
     } else {
-      dout(10) << "ESlaveUpdate.replay commit " << reqid << ": ignoring, no previously saved blob" << endl;
+      dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds" << master 
+              << ": ignoring, no previously saved blob" << endl;
     }
     break;
 
   case ESlaveUpdate::OP_ABORT:
-    if (mds->mdcache->uncommitted_slave_updates.count(reqid)) {
-      dout(10) << "ESlaveUpdate.replay abort " << reqid << ": discarding previously saved blob" << endl;
-      assert(mds->mdcache->uncommitted_slave_updates.count(reqid));
-      mds->mdcache->uncommitted_slave_updates.erase(reqid);
+    if (mds->mdcache->uncommitted_slave_updates[master].count(reqid)) {
+      dout(10) << "ESlaveUpdate.replay abort " << reqid << " for mds" << master
+              << ": discarding previously saved blob" << endl;
+      assert(mds->mdcache->uncommitted_slave_updates[master].count(reqid));
+      mds->mdcache->uncommitted_slave_updates[master].erase(reqid);
     } else {
-      dout(10) << "ESlaveUpdate.replay abort " << reqid << ": ignoring, no previously saved blob" << endl;
+      dout(10) << "ESlaveUpdate.replay abort " << reqid << " for mds" << master 
+              << ": ignoring, no previously saved blob" << endl;
     }
     break;
 
index 4367e8177a0522bf45e46acad964bf68bddf98a6..5821bc85db38e26b8866f8928478327b72552a37 100644 (file)
@@ -76,7 +76,8 @@ class MDiscoverReply : public Message {
   bool flag_error_ino;
   bool        flag_error_dir;
   string      error_dentry;   // dentry that was not found (to trigger waiters on asker)
-  int         dir_auth_hint;
+  int dir_auth_hint;
+  bool wanted_xlocks_hint;
   
   vector<CDirDiscover*>    dirs;      // not inode-aligned if no_base_dir = true.
   vector<CDentryDiscover*> dentries;  // not inode-aligned if no_base_dentry = true
@@ -113,6 +114,9 @@ class MDiscoverReply : public Message {
   bool is_flag_error_dir() { return flag_error_dir; }
   string& get_error_dentry() { return error_dentry; }
   int get_dir_auth_hint() { return dir_auth_hint; }
+  bool get_wanted_xlocks_hint() { return wanted_xlocks_hint; }
+
+  void set_wanted_xlocks_hint(bool w) { wanted_xlocks_hint = w; }
 
   // these index _arguments_ are aligned to each ([[dir, ] dentry, ] inode) set.
   CInodeDiscover& get_inode(int n) { return *(inodes[n]); }
@@ -199,6 +203,7 @@ class MDiscoverReply : public Message {
     ::_decode(flag_error_dir, payload, off);
     ::_decode(error_dentry, payload, off);
     ::_decode(dir_auth_hint, payload, off);
+    ::_decode(wanted_xlocks_hint, payload, off);
     
     // dirs
     int n;
@@ -236,6 +241,7 @@ class MDiscoverReply : public Message {
     ::_encode(flag_error_dir, payload);
     ::_encode(error_dentry, payload);
     ::_encode(dir_auth_hint, payload);
+    ::_encode(wanted_xlocks_hint, payload);
 
     // dirs
     int n = dirs.size();
index 49d4a9b35190ec9b4ad63b8e4d9447864c03afdd..d83d5681ad71e9d98eaf705b534ef3aad7e3f211 100644 (file)
@@ -24,7 +24,7 @@ class MMDSImportMap : public Message {
  public:
   map<dirfrag_t, list<dirfrag_t> > imap;
   map<dirfrag_t, list<dirfrag_t> > ambiguous_imap;
-  list<metareqid_t> master_requests;
+  list<metareqid_t> slave_requests;
 
   MMDSImportMap() : Message(MSG_MDS_IMPORTMAP) {}
 
@@ -33,7 +33,7 @@ class MMDSImportMap : public Message {
   void print(ostream& out) {
     out << "mdsimportmap(" << imap.size()
        << "+" << ambiguous_imap.size()
-       << " imports +" << master_requests.size() << " requests)";
+       << " imports +" << slave_requests.size() << " slave requests)";
   }
   
   void add_import(dirfrag_t im) {
@@ -47,20 +47,20 @@ class MMDSImportMap : public Message {
     ambiguous_imap[im] = m;
   }
 
-  void add_master_request(metareqid_t reqid) {
-    master_requests.push_back(reqid);
+  void add_slave_request(metareqid_t reqid) {
+    slave_requests.push_back(reqid);
   }
 
   void encode_payload() {
     ::_encode(imap, payload);
     ::_encode(ambiguous_imap, payload);
-    ::_encode(master_requests, payload);
+    ::_encode(slave_requests, payload);
   }
   void decode_payload() {
     int off = 0;
     ::_decode(imap, payload, off);
     ::_decode(ambiguous_imap, payload, off);
-    ::_decode(master_requests, payload, off);
+    ::_decode(slave_requests, payload, off);
   }
 };
 
diff --git a/branches/sage/cephmds2/messages/MMDSResolveAck.h b/branches/sage/cephmds2/messages/MMDSResolveAck.h
new file mode 100644 (file)
index 0000000..1870e22
--- /dev/null
@@ -0,0 +1,56 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __MMDSRESOLVEACK_H
+#define __MMDSRESOLVEACK_H
+
+#include "msg/Message.h"
+
+#include "include/types.h"
+
+
+class MMDSResolveAck : public Message {
+ public:
+  list<metareqid_t> commit;
+  list<metareqid_t> abort;
+
+  MMDSResolveAck() : Message(MSG_MDS_RESOLVEACK) {}
+
+  char *get_type_name() { return "resolve_ack"; }
+  /*void print(ostream& out) {
+    out << "resolve_ack.size()
+       << "+" << ambiguous_imap.size()
+       << " imports +" << slave_requests.size() << " slave requests)";
+  }
+  */
+  
+  void add_commit(metareqid_t r) {
+    commit.push_back(r);
+  }
+  void add_abort(metareqid_t r) {
+    abort.push_back(r);
+  }
+
+  void encode_payload() {
+    ::_encode(commit, payload);
+    ::_encode(abort, payload);
+  }
+  void decode_payload() {
+    int off = 0;
+    ::_decode(commit, payload, off);
+    ::_decode(abort, payload, off);
+  }
+};
+
+#endif
index 1e540a4b0101879fc228162c913ac6e226340762..e2dbbd8f7298aeae45873203d1a8ef2e38e0201b 100644 (file)
@@ -38,7 +38,11 @@ class MMDSSlaveRequest : public Message {
   static const int OP_RENAMEGETINODE =     8;
   static const int OP_RENAMEGETINODEACK = -8;
 
-  static const int OP_FINISH =     17;
+  static const int OP_FINISH = 17;  
+
+  static const int OP_ABORT =  20;  // used for recovery only
+  //static const int OP_COMMIT = 21;  // used for recovery only
+
 
   const static char *get_opname(int o) {
     switch (o) { 
@@ -58,6 +62,9 @@ class MMDSSlaveRequest : public Message {
     case OP_RENAMEGETINODEACK: return "rename_get_inode_ack";
 
     case OP_FINISH: return "finish"; // commit
+    case OP_ABORT: return "abort";
+      //case OP_COMMIT: return "commit";
+
     default: assert(0); return 0;
     }
   }
@@ -82,6 +89,8 @@ class MMDSSlaveRequest : public Message {
   version_t inode_export_v;
   utime_t now;
 
+  bufferlist stray;  // stray dir + dentry
+
 public:
   metareqid_t get_reqid() { return reqid; }
   int get_op() { return op; }
@@ -111,6 +120,7 @@ public:
     ::_encode(now, payload);
     ::_encode(inode_export, payload);
     ::_encode(inode_export_v, payload);
+    ::_encode(stray, payload);
   }
   void decode_payload() {
     int off = 0;
@@ -125,6 +135,7 @@ public:
     ::_decode(now, payload, off);
     ::_decode(inode_export, payload, off);
     ::_decode(inode_export_v, payload, off);
+    ::_decode(stray, payload, off);
   }
 
   char *get_type_name() { return "slave_request"; }
index 74b5c8a22b559f7729382062300481ae424ca6c4..716fafb49171991f31e888e7a243085ef36318aa 100644 (file)
@@ -53,6 +53,7 @@ using namespace std;
 #include "messages/MMDSMap.h"
 #include "messages/MMDSBeacon.h"
 #include "messages/MMDSImportMap.h"
+#include "messages/MMDSResolveAck.h"
 #include "messages/MMDSCacheRejoin.h"
 //#include "messages/MMDSCacheRejoinAck.h"
 
@@ -225,6 +226,9 @@ decode_message(msg_envelope_t& env, bufferlist& payload)
   case MSG_MDS_IMPORTMAP:
        m = new MMDSImportMap;
        break;
+  case MSG_MDS_RESOLVEACK:
+       m = new MMDSResolveAck;
+       break;
   case MSG_MDS_CACHEREJOIN:
        m = new MMDSCacheRejoin;
        break;
index e3f1786133e69665b992ecc6771cf9f2316f44e5..6d66e713cb1fe9fb74152b216dff3688db78253c 100644 (file)
@@ -86,7 +86,9 @@
 #define MSG_MDS_BEACON             105  // to monitor
 
 #define MSG_MDS_IMPORTMAP          106
-#define MSG_MDS_CACHEREJOIN        107
+#define MSG_MDS_RESOLVEACK         107
+
+#define MSG_MDS_CACHEREJOIN        108
 
 #define MSG_MDS_DISCOVER           110
 #define MSG_MDS_DISCOVERREPLY      111
index c02c12922ed7be6d9e8056ae603d17e27f856e56..d37fb109a48daeb91ef5c223f2dd8b9935c5d52d 100755 (executable)
@@ -9,16 +9,19 @@ while (<>) {
        #cdir:adjust_nested_auth_pins on [dir 163 /foo/ rep@13 | child] count now 0 + 1
 
        if (/adjust_nested_auth_pins/) {
-               my ($what) = /\[(\w+ \d+) /;
+               my ($what) = / (\w+)\]/;
+               $what =~ s/ 0x/ /;
                $hist{$what} .= "$l: $_"
                        if defined $pin{$what};
        }
 
        # cinode:auth_pin on inode [1000000002625 /gnu/blah_client_created. 0x89b7700] count now 1 + 0
 
-       if (/auth_pin /) {
-               my ($what) = /\[(\w+ \d+) /;
-#              print "add_waiter $c $what\n";
+       elsif (/auth_pin / && !/waiting/) {
+               #my ($what) = /\[(\w+ \w+) /;
+               my ($what) = / (\w+)\]/;
+               $what =~ s/ 0x/ /;
+               #print "$_ add_waiter $c $what\n";
                $pin{$what}++;
                $hist{$what} .= "$l: $_";
                push( @pins, $what ) unless grep {$_ eq $what} @pins;
@@ -26,8 +29,10 @@ while (<>) {
 
        # cinode:auth_unpin on inode [1000000002625 (dangling) 0x89b7700] count now 0 + 0
 
-       if (/auth_unpin/) {
-               my ($what) = /\[(\w+ \d+) /;# / on (.*\])/;
+       elsif (/auth_unpin/) {
+               #my ($what) = /\[(\w+ \w+) /;# / on (.*\])/;
+               my ($what) = / (\w+)\]/;
+               $what =~ s/ 0x/ /;
                $pin{$what}--;
                $hist{$what} .= "$l: $_";
                unless ($pin{$what}) {