]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
migrator export now makes second pass over subtree to delay auth change, dirty->clean...
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 2 Oct 2007 00:16:59 +0000 (00:16 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 2 Oct 2007 00:16:59 +0000 (00:16 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1878 29311d96-e01e-0410-9327-a35deaab8ce9

17 files changed:
branches/sage/mds/mds/AnchorClient.cc
branches/sage/mds/mds/AnchorClient.h
branches/sage/mds/mds/CDentry.cc
branches/sage/mds/mds/CDentry.h
branches/sage/mds/mds/CDir.cc
branches/sage/mds/mds/CDir.h
branches/sage/mds/mds/CInode.cc
branches/sage/mds/mds/Locker.cc
branches/sage/mds/mds/LogSegment.h
branches/sage/mds/mds/MDCache.cc
branches/sage/mds/mds/MDCache.h
branches/sage/mds/mds/Migrator.cc
branches/sage/mds/mds/Migrator.h
branches/sage/mds/mds/Server.cc
branches/sage/mds/mds/journal.cc
branches/sage/mds/mds/mdstypes.h
branches/sage/mds/messages/MExportDir.h

index 3ae9db25ffd2e6b8c9c74fb65914baa520334106..b2fb1fb50d7bd45b21f66592e6807d2880282fbc 100644 (file)
@@ -25,6 +25,7 @@ using std::cerr;
 
 #include "MDS.h"
 #include "MDLog.h"
+#include "LogSegment.h"
 
 #include "events/EAnchorClient.h"
 #include "messages/MAnchor.h"
@@ -79,8 +80,6 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m)
       *pending_create_prepare[ino].patid = atid;
       pending_create_prepare.erase(ino);
 
-      pending_commit.insert(atid);
-      
       if (onfinish) {
         onfinish->finish(0);
         delete onfinish;
@@ -115,8 +114,6 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m)
       *pending_destroy_prepare[ino].patid = atid;
       pending_destroy_prepare.erase(ino);
 
-      pending_commit.insert(atid);
-
       if (onfinish) {
         onfinish->finish(0);
         delete onfinish;
@@ -151,8 +148,6 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m)
       *pending_update_prepare[ino].patid = atid;
       pending_update_prepare.erase(ino);
 
-      pending_commit.insert(atid);
-
       if (onfinish) {
         onfinish->finish(0);
         delete onfinish;
@@ -187,17 +182,11 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m)
 
       // remove from committing list
       assert(pending_commit.count(atid));
-      pending_commit.erase(atid);
-
+      assert(pending_commit[atid]->pending_commit_atids.count(atid));
+      
       // log ACK.
-      mds->mdlog->submit_entry(new EAnchorClient(ANCHOR_OP_ACK, atid));
-
-      // kick any waiters
-      if (ack_waiters.count(atid)) {
-       dout(15) << "kicking waiters on atid " << atid << dendl;
-       mds->queue_waiters(ack_waiters[atid]);
-       ack_waiters.erase(atid);
-      }
+      mds->mdlog->submit_entry(new EAnchorClient(ANCHOR_OP_ACK, atid),
+                              new C_LoggedAck(this, atid));
     }
     break;
 
@@ -209,6 +198,24 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m)
 }
 
 
+void AnchorClient::_logged_ack(version_t atid)
+{
+  dout(10) << "_logged_ack" << dendl;
+
+  assert(pending_commit.count(atid));
+  assert(pending_commit[atid]->pending_commit_atids.count(atid));
+  
+  pending_commit[atid]->pending_commit_atids.erase(atid);
+  pending_commit.erase(atid);
+  
+  // kick any waiters (LogSegment trim)
+  if (ack_waiters.count(atid)) {
+    dout(15) << "kicking ack waiters on atid " << atid << dendl;
+    mds->queue_waiters(ack_waiters[atid]);
+    ack_waiters.erase(atid);
+  }
+}
+
 
 /*
  * public async interface
@@ -291,12 +298,13 @@ void AnchorClient::prepare_update(inodeno_t ino, vector<Anchor>& trace,
 
 // COMMIT
 
-void AnchorClient::commit(version_t atid)
+void AnchorClient::commit(version_t atid, LogSegment *ls)
 {
   dout(10) << "commit " << atid << dendl;
 
-  assert(pending_commit.count(atid));
-  pending_commit.insert(atid);
+  assert(pending_commit.count(atid) == 0);
+  pending_commit[atid] = ls;
+  ls->pending_commit_atids.insert(atid);
 
   // send message
   MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
@@ -318,11 +326,11 @@ void AnchorClient::finish_recovery()
 
 void AnchorClient::resend_commits()
 {
-  for (set<version_t>::iterator p = pending_commit.begin();
+  for (map<version_t,LogSegment*>::iterator p = pending_commit.begin();
        p != pending_commit.end();
        ++p) {
-    dout(10) << "resending commit on " << *p << dendl;
-    MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, *p);
+    dout(10) << "resending commit on " << p->first << dendl;
+    MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, p->first);
     mds->send_message_mds(req, 
                          mds->mdsmap->get_anchortable(),
                          MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
index 6ec5603b0bc7e508e8e06955255b36d24d3851e8..fd790f39c399d783c5230e2f8f2fea627d738ccd 100644 (file)
@@ -27,6 +27,7 @@ using __gnu_cxx::hash_map;
 
 class Context;
 class MDS;
+class LogSegment;
 
 class AnchorClient : public Dispatcher {
   MDS *mds;
@@ -49,11 +50,22 @@ class AnchorClient : public Dispatcher {
   hash_map<inodeno_t, _pending_prepare> pending_update_prepare;
 
   // pending commits
-  set<version_t> pending_commit;
+  map<version_t, LogSegment*> pending_commit;
   map<version_t, list<Context*> > ack_waiters;
 
   void handle_anchor_reply(class MAnchor *m);  
 
+  class C_LoggedAck : public Context {
+    AnchorClient *ac;
+    version_t atid;
+  public:
+    C_LoggedAck(AnchorClient *a, version_t t) : ac(a), atid(t) {}
+    void finish(int r) {
+      ac->_logged_ack(atid);
+    }
+  };
+  void _logged_ack(version_t atid);
+
 public:
   AnchorClient(MDS *m) : mds(m) {}
   
@@ -66,7 +78,7 @@ public:
   void prepare_destroy(inodeno_t ino, version_t *atid, Context *onfinish);
   void prepare_update(inodeno_t ino, vector<Anchor>& trace, version_t *atid, Context *onfinish);
 
-  void commit(version_t atid);
+  void commit(version_t atid, LogSegment *ls);
 
   // for recovery (by other nodes)
   void handle_mds_recovery(int mds); // called when someone else recovers
@@ -75,8 +87,8 @@ public:
   void resend_prepares(hash_map<inodeno_t, _pending_prepare>& prepares, int op);
 
   // for recovery (by me)
-  void got_journaled_agree(version_t atid) {
-    pending_commit.insert(atid);
+  void got_journaled_agree(version_t atid, LogSegment *ls) {
+    pending_commit[atid] = ls;
   }
   void got_journaled_ack(version_t atid) {
     pending_commit.erase(atid);
index 1297acb869573ef490a1887b9a7c3388b60199df..2b6bb3470e8a8d15b76a925cce1ce5b1ead47d98 100644 (file)
@@ -120,7 +120,7 @@ pair<int,int> CDentry::authority()
 void CDentry::add_waiter(int tag, Context *c)
 {
   // wait on the directory?
-  if (tag & (WAIT_AUTHPINNABLE|WAIT_SINGLEAUTH)) {
+  if (tag & (WAIT_UNFREEZE|WAIT_SINGLEAUTH)) {
     dir->add_waiter(tag, c);
     return;
   }
index a96ff54d590e78600098bdd2502f1b9396f523b8..7991ab9447d7efdca3ff4abbd601330644a9b1fe 100644 (file)
@@ -211,13 +211,14 @@ public:
   // -- exporting
   // note: this assumes the dentry already exists.  
   // i.e., the name is already extracted... so we just need the other state.
-  void encode_export_state(bufferlist& bl) {
-    bl.append((char*)&state, sizeof(state));
-    bl.append((char*)&version, sizeof(version));
-    bl.append((char*)&projected_version, sizeof(projected_version));
+  void encode_export(bufferlist& bl) {
+    ::_encode_simple(state, bl);
+    ::_encode_simple(version, bl);
+    ::_encode_simple(projected_version, bl);
     lock._encode(bl);
-    ::_encode(replica_map, bl);
-
+    ::_encode_simple(replica_map, bl);
+  }
+  void finish_export() {
     // twiddle
     clear_replica_map();
     replica_nonce = EXPORT_NONCE;
@@ -225,6 +226,7 @@ public:
     if (is_dirty())
       mark_clean();
   }
+
   void decode_import_state(bufferlist& bl, int& off, int from, int to, LogSegment *ls) {
     int nstate;
     bl.copy(off, sizeof(nstate), (char*)&nstate);
index 6a3505148e5040c641d4f2341735d2661d3cd2a3..d01a0b89889e4bbab141692a64fe77b76dbf7eae 100644 (file)
@@ -794,7 +794,7 @@ void CDir::fetch(Context *c, bool ignore_authpinnability)
 
   if (!can_auth_pin() && !ignore_authpinnability) {
     dout(7) << "fetch waiting for authpinnable" << dendl;
-    add_waiter(WAIT_AUTHPINNABLE, c);
+    add_waiter(WAIT_UNFREEZE, c);
     return;
   }
 
index 1f14590741cea7ac6f8bebb64de0aaf230c4a565..f8ba1b29ce9d752631b4b80e9aab18af6e287a92 100644 (file)
@@ -143,7 +143,7 @@ class CDir : public MDSCacheObject {
   static const int WAIT_DNLOCK_OFFSET = 4;
 
   static const int WAIT_ANY  = (0xffffffff);
-  static const int WAIT_ATFREEZEROOT = (WAIT_AUTHPINNABLE|WAIT_UNFREEZE);
+  static const int WAIT_ATFREEZEROOT = (WAIT_UNFREEZE);
   static const int WAIT_ATSUBTREEROOT = (WAIT_SINGLEAUTH);
 
 
@@ -550,10 +550,11 @@ class CDirExport {
     
     st.pop_me = dir->pop_me;
     st.pop_auth_subtree = dir->pop_auth_subtree;
+    /*
     dir->pop_auth_subtree_nested -= dir->pop_auth_subtree;
     dir->pop_me.zero(now);
     dir->pop_auth_subtree.zero(now);
-
+    */
     rep_by = dir->dir_rep_by;
     replicas = dir->replica_map;
   }
index 3d4c219cc4be72b7e79cbf7a1130fdf3c69a6fbf..26f0cb3ea570d12298472788db12005c6b02771d 100644 (file)
@@ -604,7 +604,7 @@ bool CInode::is_freezing()
 void CInode::add_waiter(int tag, Context *c) 
 {
   // wait on the directory?
-  if (tag & (WAIT_AUTHPINNABLE|WAIT_SINGLEAUTH)) {
+  if (tag & (WAIT_UNFREEZE|WAIT_SINGLEAUTH)) {
     parent->dir->add_waiter(tag, c);
     return;
   }
index 25195443a94b7f52f7b1c11e3eeda879ee22d2ef..f0a29ec51e1c01143883f1e73e3e1c4cb9c860d0 100644 (file)
@@ -202,7 +202,7 @@ bool Locker::acquire_locks(MDRequest *mdr,
     if (!object->can_auth_pin()) {
       // wait
       dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
-      object->add_waiter(MDSCacheObject::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr));
+      object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
       mds->locker->drop_locks(mdr);
       mdr->drop_local_auth_pins();
       return false;
@@ -979,7 +979,7 @@ void Locker::try_simple_eval(SimpleLock *lock)
   if (!lock->get_parent()->can_auth_pin()) {
     dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
     //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
-    lock->get_parent()->add_waiter(MDSCacheObject::WAIT_AUTHPINNABLE, new C_Locker_SimpleEval(this, lock));
+    lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
     return;
   }
 
@@ -1437,7 +1437,7 @@ void Locker::try_scatter_eval(ScatterLock *lock)
   if (!lock->get_parent()->can_auth_pin()) {
     dout(7) << "try_scatter_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
     //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
-    lock->get_parent()->add_waiter(MDSCacheObject::WAIT_AUTHPINNABLE, new C_Locker_ScatterEval(this, lock));
+    lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_ScatterEval(this, lock));
     return;
   }
 
@@ -2162,7 +2162,7 @@ void Locker::try_file_eval(FileLock *lock)
   if (!lock->get_parent()->can_auth_pin()) {
     dout(7) << "try_file_eval can't auth_pin, waiting on " << *in << dendl;
     //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
-    in->add_waiter(CInode::WAIT_AUTHPINNABLE, new C_Locker_FileEval(this, lock));
+    in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_FileEval(this, lock));
     return;
   }
 
index 832ac236af1fa82d985d5dd197788c6db9604851..58511c66db9049d35462e3863c5a2f89ee54a0dd 100644 (file)
@@ -19,6 +19,9 @@
 #include "include/interval_set.h"
 #include "include/Context.h"
 
+#include <ext/hash_set>
+using __gnu_cxx::hash_set;
+
 class CDir;
 class CInode;
 class CDentry;
@@ -41,7 +44,7 @@ class LogSegment {
   map<CInode*, map<off_t,off_t> > purging_inodes;
 
   // committed anchor transactions
-  interval_set<version_t> atids;
+  hash_set<version_t> pending_commit_atids;
 
   // client request ids
   map<int, tid_t> last_client_tids;
index a075fa7b3be3ea5971d28b13f47a7410d9c0afa1..ac7fc868df305a4c290ad994659cb894fab2d44c 100644 (file)
@@ -4477,7 +4477,7 @@ void MDCache::anchor_create(MDRequest *mdr, CInode *in, Context *onfinish)
   if (!in->can_auth_pin() &&
       !mdr->is_auth_pinned(in)) {
     dout(7) << "anchor_create not authpinnable, waiting on " << *in << dendl;
-    in->add_waiter(CInode::WAIT_AUTHPINNABLE, onfinish);
+    in->add_waiter(CInode::WAIT_UNFREEZE, onfinish);
     return;
   }
 
@@ -4510,13 +4510,12 @@ class C_MDC_AnchorCreateLogged : public Context {
   MDCache *cache;
   CInode *in;
   version_t atid;
-  version_t pdv;
   LogSegment *ls;
 public:
-  C_MDC_AnchorCreateLogged(MDCache *c, CInode *i, version_t t, version_t v, LogSegment *s) : 
-    cache(c), in(i), atid(t), pdv(v), ls(s) {}
+  C_MDC_AnchorCreateLogged(MDCache *c, CInode *i, version_t t, LogSegment *s) : 
+    cache(c), in(i), atid(t), ls(s) {}
   void finish(int r) {
-    cache->_anchor_create_logged(in, atid, pdv, ls);
+    cache->_anchor_create_logged(in, atid, ls);
   }
 };
 
@@ -4525,29 +4524,24 @@ void MDCache::_anchor_create_prepared(CInode *in, version_t atid)
   dout(10) << "_anchor_create_prepared " << *in << " atid " << atid << dendl;
   assert(in->inode.anchored == false);
 
-  // predirty, prepare log entry
-  version_t pdv = in->pre_dirty();
-
-  EUpdate *le = new EUpdate(mds->mdlog, "anchor_create");
-  le->metablob.add_dir_context(in->get_parent_dir());
-
   // update the logged inode copy
-  inode_t *pi = le->metablob.add_dentry(in->parent, true);
+  inode_t *pi = in->project_inode();
   pi->anchored = true;
-  pi->version = pdv;
+  pi->version = in->pre_dirty();
 
   // note anchor transaction
+  EUpdate *le = new EUpdate(mds->mdlog, "anchor_create");
+  le->metablob.add_dir_context(in->get_parent_dir());
+  le->metablob.add_primary_dentry(in->parent, true, 0, pi);
   le->metablob.add_anchor_transaction(atid);
-
-  // log + wait
-  mds->mdlog->submit_entry(le, new C_MDC_AnchorCreateLogged(this, in, atid, pdv, 
+  mds->mdlog->submit_entry(le, new C_MDC_AnchorCreateLogged(this, in, atid,
                                                            mds->mdlog->get_current_segment()));
 }
 
 
-void MDCache::_anchor_create_logged(CInode *in, version_t atid, version_t pdv, LogSegment *ls)
+void MDCache::_anchor_create_logged(CInode *in, version_t atid, LogSegment *ls)
 {
-  dout(10) << "_anchor_create_logged pdv " << pdv << " on " << *in << dendl;
+  dout(10) << "_anchor_create_logged on " << *in << dendl;
 
   // unpin
   assert(in->state_test(CInode::STATE_ANCHORING));
@@ -4556,11 +4550,10 @@ void MDCache::_anchor_create_logged(CInode *in, version_t atid, version_t pdv, L
   in->auth_unpin();
   
   // apply update to cache
-  in->inode.anchored = true;
-  in->mark_dirty(pdv, ls);
+  in->pop_and_dirty_projected_inode(ls);
   
   // tell the anchortable we've committed
-  mds->anchorclient->commit(atid);
+  mds->anchorclient->commit(atid, ls);
 
   // trigger waiters
   in->finish_waiting(CInode::WAIT_ANCHORED, 0);
@@ -4588,7 +4581,7 @@ void MDCache::anchor_destroy(CInode *in, Context *onfinish)
   if (!in->can_auth_pin()/* &&
                            !mdr->is_auth_pinned(in)*/) {
     dout(7) << "anchor_destroy not authpinnable, waiting on " << *in << dendl;
-    in->add_waiter(CInode::WAIT_AUTHPINNABLE, onfinish);
+    in->add_waiter(CInode::WAIT_UNFREEZE, onfinish);
     return;
   }
 
@@ -4618,12 +4611,12 @@ class C_MDC_AnchorDestroyLogged : public Context {
   MDCache *cache;
   CInode *in;
   version_t atid;
-  version_t pdv;
+  LogSegment *ls;
 public:
-  C_MDC_AnchorDestroyLogged(MDCache *c, CInode *i, version_t t, version_t v) :
-    cache(c), in(i), atid(t), pdv(v) {}
+  C_MDC_AnchorDestroyLogged(MDCache *c, CInode *i, version_t t, LogSegment *l) :
+    cache(c), in(i), atid(t), ls(l) {}
   void finish(int r) {
-    cache->_anchor_destroy_logged(in, atid, pdv);
+    cache->_anchor_destroy_logged(in, atid, ls);
   }
 };
 
@@ -4633,28 +4626,23 @@ void MDCache::_anchor_destroy_prepared(CInode *in, version_t atid)
 
   assert(in->inode.anchored == true);
 
-  // predirty, prepare log entry
-  version_t pdv = in->pre_dirty();
-
-  EUpdate *le = new EUpdate(mds->mdlog, "anchor_destroy");
-  le->metablob.add_dir_context(in->get_parent_dir());
-
   // update the logged inode copy
-  inode_t *pi = le->metablob.add_dentry(in->parent, true);
+  inode_t *pi = in->project_inode();
   pi->anchored = true;
-  pi->version = pdv;
-
-  // note anchor transaction
-  le->metablob.add_anchor_transaction(atid);
+  pi->version = in->pre_dirty();
 
   // log + wait
-  mds->mdlog->submit_entry(le, new C_MDC_AnchorDestroyLogged(this, in, atid, pdv));
+  EUpdate *le = new EUpdate(mds->mdlog, "anchor_destroy");
+  le->metablob.add_dir_context(in->get_parent_dir());
+  le->metablob.add_primary_dentry(in->parent, true, 0, pi);
+  le->metablob.add_anchor_transaction(atid);
+  mds->mdlog->submit_entry(le, new C_MDC_AnchorDestroyLogged(this, in, atid, mds->mdlog->get_current_segment()));
 }
 
 
-void MDCache::_anchor_destroy_logged(CInode *in, version_t atid, version_t pdv)
+void MDCache::_anchor_destroy_logged(CInode *in, version_t atid, LogSegment *ls)
 {
-  dout(10) << "_anchor_destroy_logged pdv " << pdv << " on " << *in << dendl;
+  dout(10) << "_anchor_destroy_logged on " << *in << dendl;
   
   // unpin
   assert(in->state_test(CInode::STATE_UNANCHORING));
@@ -4663,11 +4651,10 @@ void MDCache::_anchor_destroy_logged(CInode *in, version_t atid, version_t pdv)
   in->auth_unpin();
   
   // apply update to cache
-  in->inode.anchored = false;
-  in->inode.version = pdv;
-  
+  in->pop_and_dirty_projected_inode(ls);
+
   // tell the anchortable we've committed
-  mds->anchorclient->commit(atid);
+  mds->anchorclient->commit(atid, ls);
 
   // trigger waiters
   in->finish_waiting(CInode::WAIT_UNANCHORED, 0);
index 3db01bb6a0ea3dd21ec53d546d018f32d4a9db97..8f9b0e9ac2e7d2eb098dc15717fb83ce431ed108 100644 (file)
@@ -126,6 +126,7 @@ struct MDRequest {
   version_t dst_reanchor_atid;  // dst->stray
   bufferlist inode_import;
   version_t inode_import_v;
+  CInode *inode_export;         // inode we're exporting, if any
   CDentry *srcdn; // srcdn, if auth, on slave
   
   // called when slave commits
@@ -139,6 +140,7 @@ struct MDRequest {
     ls(0),
     done_locking(false), committing(false), aborted(false),
     src_reanchor_atid(0), dst_reanchor_atid(0), inode_import_v(0),
+    inode_export(0), srcdn(0),
     slave_commit(0) { }
   MDRequest(metareqid_t ri, MClientRequest *req) : 
     reqid(ri), client_request(req), ref(0), 
@@ -146,6 +148,7 @@ struct MDRequest {
     ls(0),
     done_locking(false), committing(false), aborted(false),
     src_reanchor_atid(0), dst_reanchor_atid(0), inode_import_v(0),
+    inode_export(0), srcdn(0),
     slave_commit(0) { }
   MDRequest(metareqid_t ri, int by) : 
     reqid(ri), client_request(0), ref(0),
@@ -153,6 +156,7 @@ struct MDRequest {
     ls(0),
     done_locking(false), committing(false), aborted(false),
     src_reanchor_atid(0), dst_reanchor_atid(0), inode_import_v(0),
+    inode_export(0), srcdn(0),
     slave_commit(0) { }
   
   bool is_master() { return slave_to_mds < 0; }
@@ -558,9 +562,9 @@ public:
   void anchor_destroy(CInode *in, Context *onfinish);
 protected:
   void _anchor_create_prepared(CInode *in, version_t atid);
-  void _anchor_create_logged(CInode *in, version_t atid, version_t pdv, LogSegment *ls);
+  void _anchor_create_logged(CInode *in, version_t atid, LogSegment *ls);
   void _anchor_destroy_prepared(CInode *in, version_t atid);
-  void _anchor_destroy_logged(CInode *in, version_t atid, version_t pdv);
+  void _anchor_destroy_logged(CInode *in, version_t atid, LogSegment *ls);
 
   friend class C_MDC_AnchorCreatePrepared;
   friend class C_MDC_AnchorCreateLogged;
index 3852784974a25505c72c32665e5872367aa5c2cf..7819bec5f3a673c7e810af1fd278f1510ef40f83 100644 (file)
@@ -775,7 +775,6 @@ void Migrator::export_go(CDir *dir)
   export_warning_ack_waiting.erase(dir);
   export_state[dir] = EXPORT_EXPORTING;
 
-  assert(export_data.count(dir) == 0);
   assert(dir->get_cum_auth_pins() == 0);
 
   // set ambiguous auth
@@ -786,22 +785,19 @@ void Migrator::export_go(CDir *dir)
   
   // fill export message with cache data
   utime_t now = g_clock.now();
-  C_Contexts *fin = new C_Contexts;       // collect all the waiters
   map<int,entity_inst_t> exported_client_map;
-  int num_exported_inodes = encode_export_dir( export_data[dir], 
-                                              fin, 
-                                              dir,   // base
+  list<bufferlist> export_data;
+  int num_exported_inodes = encode_export_dir( export_data,
                                               dir,   // recur start point
-                                              dest,
                                               exported_client_map,
                                               now );
   bufferlist bl;
   ::_encode(exported_client_map, bl);
-  export_data[dir].push_front(bl);
+  export_data.push_front(bl);
 
   // send the export data!
   MExportDir *req = new MExportDir(dir->dirfrag());
-  req->set_dirstate(export_data[dir]);
+  req->take_dirstate(export_data);
 
   // add bounds to message
   set<CDir*> bounds;
@@ -811,12 +807,9 @@ void Migrator::export_go(CDir *dir)
        ++p)
     req->add_export((*p)->dirfrag());
 
-  //end
+  // send
   mds->send_message_mds(req, dest, MDS_PORT_MIGRATOR);
 
-  // queue up the finisher
-  dir->add_waiter( CDir::WAIT_UNFREEZE, fin );
-
   // stats
   if (mds->logger) mds->logger->inc("ex");
   if (mds->logger) mds->logger->inc("iex", num_exported_inodes);
@@ -830,22 +823,39 @@ void Migrator::export_go(CDir *dir)
  * encode relevant state to be sent over the wire.
  * used by: encode_export_dir, file_rename (if foreign)
  */
-void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state, int new_auth, 
+void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state, 
                                   map<int,entity_inst_t>& exported_client_map,
                                   utime_t now)
 {
+  dout(7) << "encode_export_inode " << *in << dendl;
+  assert(!in->is_replica(mds->get_nodeid()));
+
+  CInodeExport istate(in, now);
+  istate._encode(enc_state);
+
+  // make note of clients named by exported capabilities
+  for (map<int, Capability>::iterator it = in->client_caps.begin();
+       it != in->client_caps.end();
+       it++) 
+    exported_client_map[it->first] = mds->clientmap.get_inst(it->first);
+}
+
+void Migrator::finish_export_inode(CInode *in, C_Contexts *fin)
+{
+  dout(12) << "finish_export_inode " << *in << dendl;
+
   // tell (all) clients about migrating caps.. mark STALE
   for (map<int, Capability>::iterator it = in->client_caps.begin();
        it != in->client_caps.end();
        it++) {
-    dout(7) << "encode_export_inode " << *in << " telling client" << it->first << " stale caps" << dendl;
+    dout(7) << "finish_export_inode telling client" << it->first
+           << " stale caps on " << *in << dendl;
     MClientFileCaps *m = new MClientFileCaps(MClientFileCaps::OP_STALE,
                                             in->inode, 
                                              it->second.get_last_seq(), 
                                              it->second.pending(),
                                              it->second.wanted());
     entity_inst_t inst = mds->clientmap.get_inst(it->first);
-    exported_client_map[it->first] = inst; 
     mds->send_message_client_maybe_open(m, inst);
   }
 
@@ -853,14 +863,7 @@ void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state, int new_au
   if (!in->is_replicated())
     in->replicate_relax_locks();
 
-  // add inode
-  assert(!in->is_replica(mds->get_nodeid()));
-  CInodeExport istate(in, now);
-  istate._encode( enc_state );
-
-  // we're export this inode; fix inode state
-  dout(7) << "encode_export_inode " << *in << dendl;
-  
+  // clean
   if (in->is_dirty()) in->mark_clean();
   
   // clear/unpin cached_by (we're no longer the authority)
@@ -878,19 +881,21 @@ void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state, int new_au
   in->state_clear(CInode::STATE_AUTH);
   in->replica_nonce = CInode::EXPORT_NONCE;
   
+  // waiters
+  list<Context*> waiters;
+  in->take_waiting(CInode::WAIT_ANY, waiters);
+  fin->take(waiters);
+  
   // *** other state too?
 
   // move to end of LRU so we drop out of cache quickly!
   if (in->get_parent_dn()) 
     cache->lru.lru_bottouch(in->get_parent_dn());
-}
 
+}
 
 int Migrator::encode_export_dir(list<bufferlist>& dirstatelist,
-                               C_Contexts *fin,
-                               CDir *basedir,
                                CDir *dir,
-                               int newauth, 
                                map<int,entity_inst_t>& exported_client_map,
                                utime_t now)
 {
@@ -902,32 +907,11 @@ int Migrator::encode_export_dir(list<bufferlist>& dirstatelist,
 
   // dir 
   bufferlist enc_dir;
-  
   CDirExport dstate(dir, now);
   dstate._encode( enc_dir );
   
-  // release open_by 
-  dir->clear_replica_map();
-
-  // mark
-  assert(dir->is_auth());
-  dir->state_clear(CDir::STATE_AUTH);
-  dir->replica_nonce = CDir::NONCE_EXPORT;
-
-  list<CDir*> subdirs;
-
-  if (dir->is_dirty())
-    dir->mark_clean();
-  
-  // discard most dir state
-  dir->state &= CDir::MASK_STATE_EXPORT_KEPT;  // i only retain a few things.
-  
-  // suck up all waiters
-  list<Context*> waiting;
-  dir->take_waiting(CDir::WAIT_ANY, waiting);    // all dir waiters
-  fin->take(waiting);
-  
   // dentries
+  list<CDir*> subdirs;
   CDir::map_t::iterator it;
   for (it = dir->begin(); it != dir->end(); it++) {
     CDentry *dn = it->second;
@@ -938,11 +922,11 @@ int Migrator::encode_export_dir(list<bufferlist>& dirstatelist,
     // -- dentry
     dout(7) << "encode_export_dir exporting " << *dn << dendl;
     
-    // name
+    // dn name
     ::_encode(it->first, enc_dir);
     
     // state
-    it->second->encode_export_state(enc_dir);
+    dn->encode_export(enc_dir);
     
     // points to...
     
@@ -967,7 +951,7 @@ int Migrator::encode_export_dir(list<bufferlist>& dirstatelist,
     // -- inode
     enc_dir.append("I", 1);    // inode dentry
     
-    encode_export_inode(in, enc_dir, newauth, exported_client_map, now);  // encode, and (update state for) export
+    encode_export_inode(in, enc_dir, exported_client_map, now);  // encode, and (update state for) export
     
     // directory?
     list<CDir*> dfs;
@@ -980,11 +964,6 @@ int Migrator::encode_export_dir(list<bufferlist>& dirstatelist,
        subdirs.push_back(dir);  // it's ours, recurse (later)
       }
     }
-    
-    // waiters
-    list<Context*> waiters;
-    in->take_waiting(CInode::WAIT_ANY, waiters);
-    fin->take(waiters);
   }
 
   // add to dirstatelist
@@ -994,12 +973,62 @@ int Migrator::encode_export_dir(list<bufferlist>& dirstatelist,
 
   // subdirs
   for (list<CDir*>::iterator it = subdirs.begin(); it != subdirs.end(); it++)
-    num_exported += encode_export_dir(dirstatelist, fin, basedir, *it, newauth, 
-                                     exported_client_map, now);
+    num_exported += encode_export_dir(dirstatelist, *it, exported_client_map, now);
 
   return num_exported;
 }
 
+void Migrator::finish_export_dir(CDir *dir, C_Contexts *fin, utime_t now)
+{
+  dout(10) << "finish_export_dir " << *dir << dendl;
+
+  // release open_by 
+  dir->clear_replica_map();
+
+  // mark
+  assert(dir->is_auth());
+  dir->state_clear(CDir::STATE_AUTH);
+  dir->replica_nonce = CDir::NONCE_EXPORT;
+
+  if (dir->is_dirty())
+    dir->mark_clean();
+  
+  // discard most dir state
+  dir->state &= CDir::MASK_STATE_EXPORT_KEPT;  // i only retain a few things.
+
+  // suck up all waiters
+  list<Context*> waiting;
+  dir->take_waiting(CDir::WAIT_ANY, waiting);    // all dir waiters
+  fin->take(waiting);
+  
+  // pop
+  dir->pop_auth_subtree_nested -= dir->pop_auth_subtree;
+  dir->pop_me.zero(now);
+  dir->pop_auth_subtree.zero(now);
+
+  // dentries
+  list<CDir*> subdirs;
+  CDir::map_t::iterator it;
+  for (it = dir->begin(); it != dir->end(); it++) {
+    CDentry *dn = it->second;
+    CInode *in = dn->get_inode();
+
+    // dentry
+    dn->finish_export();
+
+    // inode?
+    if (dn->is_primary()) {
+      finish_export_inode(in, fin);
+
+      // subdirs?
+      in->get_nested_dirfrags(subdirs);
+    }
+  }
+
+  // subdirs
+  for (list<CDir*>::iterator it = subdirs.begin(); it != subdirs.end(); it++) 
+    finish_export_dir(*it, fin, now);
+}
 
 class C_MDS_ExportFinishLogged : public Context {
   Migrator *migrator;
@@ -1027,7 +1056,6 @@ void Migrator::handle_export_ack(MExportDirAck *m)
   export_warning_ack_waiting.erase(dir);
   
   export_state[dir] = EXPORT_LOGGINGFINISH;
-  export_data.erase(dir);
   
   set<CDir*> bounds;
   cache->get_subtree_bounds(dir, bounds);
@@ -1055,6 +1083,8 @@ void Migrator::handle_export_ack(MExportDirAck *m)
 
 
 
+
+
 /*
  * this happens if hte dest failes after i send teh export data but before it is acked
  * that is, we don't know they safely received and logged it, so we reverse our changes
@@ -1065,7 +1095,6 @@ void Migrator::export_reverse(CDir *dir)
   dout(7) << "export_reverse " << *dir << dendl;
   
   assert(export_state[dir] == EXPORT_EXPORTING);
-  assert(export_data.count(dir));
   
   set<CDir*> bounds;
   cache->get_subtree_bounds(dir, bounds);
@@ -1083,27 +1112,10 @@ void Migrator::export_reverse(CDir *dir)
     bd->state_clear(CDir::STATE_EXPORTBOUND);
   }
 
-  // re-import the metadata
-  map<int,entity_inst_t> imported_client_map;
-  int off = 0;
-  ::_decode(imported_client_map, export_data[dir].front(), off);
-  export_data[dir].pop_front();
-
-  while (!export_data[dir].empty()) {
-    decode_import_dir(export_data[dir].front(), 
-                     export_peer[dir], 
-                     dir,                 // import root
-                     0,
-                     imported_client_map,
-                     0);
-    export_data[dir].pop_front();
-  }
-
   // process delayed expires
   cache->process_delayed_expire(dir);
   
   // some clean up
-  export_data.erase(dir);
   export_warning_ack_waiting.erase(dir);
   export_notify_ack_waiting.erase(dir);
 
@@ -1227,6 +1239,11 @@ void Migrator::export_finish(CDir *dir)
     dout(7) << "not sending MExportDirFinish, dest has failed" << dendl;
   }
   
+  // finish export (adjust local cache state)
+  C_Contexts *fin = new C_Contexts;
+  finish_export_dir(dir, fin, g_clock.now());
+  dir->add_waiter(CDir::WAIT_UNFREEZE, fin);
+
   // unfreeze
   dout(7) << "export_finish unfreezing" << dendl;
   dir->unfreeze_tree();
index f9336668dd7f736376a3eca5ad9ab482558e5028..ccfe2666d66abd05d25db83ef0eb202c321f34cd 100644 (file)
@@ -78,7 +78,7 @@ protected:
   // export fun
   map<CDir*,int>               export_state;
   map<CDir*,int>               export_peer;
-  map<CDir*,list<bufferlist> > export_data;   // only during EXPORTING state
+  //map<CDir*,list<bufferlist> > export_data;   // only during EXPORTING state
   map<CDir*,set<int> >         export_warning_ack_waiting;
   map<CDir*,set<int> >         export_notify_ack_waiting;
 
@@ -183,16 +183,15 @@ public:
   void export_dir_nicely(CDir *dir, int dest);
   void maybe_do_queued_export();
 
-  void encode_export_inode(CInode *in, bufferlist& enc_state, int newauth, 
+  void encode_export_inode(CInode *in, bufferlist& enc_state, 
                           map<int,entity_inst_t>& exported_client_map,
                           utime_t now);
+  void finish_export_inode(CInode *in, C_Contexts *fin);
   int encode_export_dir(list<bufferlist>& dirstatelist,
-                       class C_Contexts *fin,
-                       CDir *basedir,
                        CDir *dir,
-                       int newauth, 
                        map<int,entity_inst_t>& exported_client_map,
                        utime_t now);
+  void finish_export_dir(CDir *dir, class C_Contexts *fin, utime_t now);
 
   void add_export_finish_waiter(CDir *dir, Context *c) {
     export_finish_waiters[dir].push_back(c);
index 0e062e3ac37646ab69868cad539021b49888b04e..066502b5880966fbf96592437fa9f8670fffc0f1 100644 (file)
@@ -821,7 +821,7 @@ void Server::handle_slave_auth_pin(MDRequest *mdr)
          !(*p)->can_auth_pin()) {
        // wait
        dout(10) << " waiting for authpinnable on " << **p << dendl;
-       (*p)->add_waiter(CDir::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr));
+       (*p)->add_waiter(CDir::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
        mdr->drop_local_auth_pins();
        return;
       }
@@ -1112,7 +1112,7 @@ CInode* Server::rdlock_path_pin_ref(MDRequest *mdr, bool want_auth)
   if (want_auth) {
     if (ref->is_frozen()) {
       dout(7) << "waiting for !frozen/authpinnable on " << *ref << dendl;
-      ref->add_waiter(CInode::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr));
+      ref->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
       return 0;
     }
     mdr->auth_pin(ref);
@@ -1156,7 +1156,7 @@ CDentry* Server::rdlock_path_xlock_dentry(MDRequest *mdr, bool okexist, bool mus
   // make sure we can auth_pin (or have already authpinned) dir
   if (dir->is_frozen()) {
     dout(7) << "waiting for !frozen/authpinnable on " << *dir << dendl;
-    dir->add_waiter(CInode::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr));
+    dir->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
     return 0;
   }
 
@@ -2449,7 +2449,7 @@ void Server::_unlink_local_finish(MDRequest *mdr,
   
   // commit anchor update?
   if (mdr->dst_reanchor_atid) 
-    mds->anchorclient->commit(mdr->dst_reanchor_atid);
+    mds->anchorclient->commit(mdr->dst_reanchor_atid, mdr->ls);
 
   // bump pop
   //mds->balancer->hit_dir(mdr->now, dn->dir, META_POP_DWR);
@@ -2555,7 +2555,7 @@ void Server::_unlink_remote_finish(MDRequest *mdr,
 
   // commit anchor update?
   if (mdr->dst_reanchor_atid) 
-    mds->anchorclient->commit(mdr->dst_reanchor_atid);
+    mds->anchorclient->commit(mdr->dst_reanchor_atid, mdr->ls);
 
   //mds->balancer->hit_dir(mdr->now, dn->dir, META_POP_DWR);
 
@@ -2946,8 +2946,8 @@ void Server::_rename_finish(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDe
   _rename_apply(mdr, srcdn, destdn, straydn);
   
   // commit anchor updates?
-  if (mdr->src_reanchor_atid) mds->anchorclient->commit(mdr->src_reanchor_atid);
-  if (mdr->dst_reanchor_atid) mds->anchorclient->commit(mdr->dst_reanchor_atid);
+  if (mdr->src_reanchor_atid) mds->anchorclient->commit(mdr->src_reanchor_atid, mdr->ls);
+  if (mdr->dst_reanchor_atid) mds->anchorclient->commit(mdr->dst_reanchor_atid, mdr->ls);
 
   // bump popularity
   //if (srcdn->is_auth())
@@ -3325,7 +3325,7 @@ void Server::_logged_slave_rename(MDRequest *mdr,
   // bump popularity
   //if (srcdn->is_auth())
     //mds->balancer->hit_dir(mdr->now, srcdn->get_dir(), META_POP_DWR);
-  if (destdn->inode->is_auth())
+  if (destdn->inode && destdn->inode->is_auth())
     mds->balancer->hit_inode(mdr->now, destdn->inode, META_POP_IWR);
 
   // done.
@@ -3344,6 +3344,12 @@ void Server::_commit_slave_rename(MDRequest *mdr, int r,
     // commit
     _rename_apply(mdr, srcdn, destdn, straydn);
     
+    if (mdr->inode_export) {
+      C_Contexts *fin = new C_Contexts;
+      mdcache->migrator->finish_export_inode(mdr->inode_export, fin);
+      mds->queue_waiter(fin);
+    }
+
     // write a commit to the journal
     le = new ESlaveUpdate(mdlog, "slave_rename_commit", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_COMMIT);
   } else {
@@ -3401,7 +3407,7 @@ void Server::handle_slave_rename_get_inode(MDRequest *mdr)
 
   map<int,entity_inst_t> exported_client_map;
   bufferlist inodebl;
-  mdcache->migrator->encode_export_inode(mdr->srcdn->inode, inodebl, mdr->slave_to_mds, 
+  mdcache->migrator->encode_export_inode(mdr->srcdn->inode, inodebl, 
                                         exported_client_map, 
                                         mdr->now);
   ::_encode(exported_client_map, reply->inode_export);
@@ -3409,8 +3415,6 @@ void Server::handle_slave_rename_get_inode(MDRequest *mdr)
   
   reply->inode_export_v = mdr->srcdn->inode->inode.version;
 
-  mdr->inode_import = reply->inode_export;   // keep a copy locally, in case we have to rollback
-  
   mds->send_message_mds(reply, mdr->slave_to_mds, MDS_PORT_SERVER);
 
   // clean up.
index a43a8c179d7bbdd94e13244ba78cb1582ea0b9c9..7e0b9f0f9474c10a63bb3c8a40849c22c8120a6f 100644 (file)
@@ -91,7 +91,7 @@ C_Gather *LogSegment::try_to_expire(MDS *mds)
        dir->commit(0, gather->new_sub());
       } else {
        dout(10) << " waiting for unfreeze on " << *dir << dendl;
-       dir->add_waiter(CDir::WAIT_AUTHPINNABLE, gather->new_sub());
+       dir->add_waiter(CDir::WAIT_UNFREEZE, gather->new_sub());
       }
     }
   }
@@ -103,7 +103,16 @@ C_Gather *LogSegment::try_to_expire(MDS *mds)
     (*p)->dirlock.add_waiter(SimpleLock::WAIT_STABLE, gather->new_sub());
   }
   
-  // 
+  // pending commit atids
+  for (hash_set<version_t>::iterator p = pending_commit_atids.begin();
+       p != pending_commit_atids.end();
+       ++p) {
+    if (!gather) gather = new C_Gather;
+    assert(!mds->anchorclient->has_committed(*p));
+    dout(10) << " anchor transaction " << *p 
+            << " pending commit (not yet acked), waiting" << dendl;
+    mds->anchorclient->wait_for_ack(*p, gather->new_sub());
+  }
 
   return gather;
 }
@@ -345,7 +354,7 @@ void EMetaBlob::expire(MDS *mds, Context *c)
     else
       // pbly about to export|split|merge. 
       // just wait for it to unfreeze, then retry
-      p->first->add_waiter(CDir::WAIT_AUTHPINNABLE, gather->new_sub());  
+      p->first->add_waiter(CDir::WAIT_UNFREEZE, gather->new_sub());  
   }
   for (list<CDir*>::iterator p = waitfor_export.begin();
        p != waitfor_export.end();
@@ -423,8 +432,9 @@ void EMetaBlob::expire(MDS *mds, Context *c)
 void EMetaBlob::update_segment(LogSegment *ls)
 {
   // atids?
-  for (list<version_t>::iterator p = atids.begin(); p != atids.end(); ++p)
-    ls->atids.insert(*p);
+  //for (list<version_t>::iterator p = atids.begin(); p != atids.end(); ++p)
+  //  ls->pending_commit_atids[*p] = ls;
+  // -> handled directly by AnchorClient
 
   // dirty inode mtimes
   // -> handled directly by Server.cc, replay()
@@ -434,7 +444,7 @@ void EMetaBlob::update_segment(LogSegment *ls)
     ls->allocv = alloc_tablev;
 
   // truncated inodes
-  // -> handled directory by Server.cc
+  // -> handled directly by Server.cc
 
   // client requests
   //  note the newest request per client
@@ -593,7 +603,7 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg)
        p != atids.end();
        ++p) {
     dout(10) << "EMetaBlob.replay noting anchor transaction " << *p << dendl;
-    mds->anchorclient->got_journaled_agree(*p);
+    mds->anchorclient->got_journaled_agree(*p, logseg);
   }
 
   // dirtied inode mtimes
index a0f70ff421e84ad002f2d44a4bbd6a34b788c593..7b02ac38d8618f2d4e6bda69b95b62de449bd68d 100644 (file)
@@ -396,8 +396,7 @@ class MDSCacheObject {
 
   // -- wait --
   const static int WAIT_SINGLEAUTH  = (1<<30);
-  const static int WAIT_AUTHPINNABLE = (1<<29);
-  const static int WAIT_UNFREEZE = WAIT_AUTHPINNABLE;
+  const static int WAIT_UNFREEZE    = (1<<29); // pka AUTHPINNABLE
 
 
   // ============================================
index 8fafbe0312636258774537d71d3a6bae489bb71b..f00a7fa2507d1858e00cb90ebd5178680e82e9dc 100644 (file)
@@ -46,6 +46,9 @@ class MExportDir : public Message {
   void set_dirstate(const list<bufferlist>& ls) {
     dirstate = ls;
   }
+  void take_dirstate(list<bufferlist>& ls) {
+    dirstate.swap(ls);
+  }
   void add_export(dirfrag_t df) { 
     bounds.push_back(df); 
   }