]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: rework stale import/export message detection
authorYan, Zheng <zheng.z.yan@intel.com>
Thu, 24 Oct 2013 09:10:59 +0000 (17:10 +0800)
committerYan, Zheng <zheng.z.yan@intel.com>
Mon, 16 Dec 2013 04:15:22 +0000 (12:15 +0800)
Current code uses import state to detect obsolete import/export messages.
it does not work for the case: cancel a subtree export, export the same
subtree again, the messages for the first export get dispatched.

This patch introduces "transation ID" for subtree exports. Each subtree
export has a unique TID, the ID is recorded in all import/export related
messages. By comparing the TID, we can reliably detect stale messages.

Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
12 files changed:
src/mds/Migrator.cc
src/mds/Migrator.h
src/messages/MExportDir.h
src/messages/MExportDirAck.h
src/messages/MExportDirCancel.h
src/messages/MExportDirDiscover.h
src/messages/MExportDirDiscoverAck.h
src/messages/MExportDirFinish.h
src/messages/MExportDirNotify.h
src/messages/MExportDirNotifyAck.h
src/messages/MExportDirPrep.h
src/messages/MExportDirPrepAck.h

index 139c15d21ce008c93eb96271d168545efdfc3d9b..f87f9605de37fc3d4dc103be7ed229eb8900ccb3 100644 (file)
@@ -243,7 +243,7 @@ void Migrator::export_try_cancel(CDir *dir)
     export_freeze_finish(dir);
     dir->state_clear(CDir::STATE_EXPORTING);
     if (mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)) // tell them.
-      mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), it->second.peer);
+      mds->send_message_mds(new MExportDirCancel(dir->dirfrag(), it->second.tid), it->second.peer);
     break;
 
   case EXPORT_FREEZING:
@@ -253,7 +253,7 @@ void Migrator::export_try_cancel(CDir *dir)
     export_freeze_finish(dir);
     dir->state_clear(CDir::STATE_EXPORTING);
     if (mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)) // tell them.
-      mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), it->second.peer);
+      mds->send_message_mds(new MExportDirCancel(dir->dirfrag(), it->second.tid), it->second.peer);
     break;
 
     // NOTE: state order reversal, warning comes after prepping
@@ -287,7 +287,7 @@ void Migrator::export_try_cancel(CDir *dir)
     export_unlock(dir);
     dir->state_clear(CDir::STATE_EXPORTING);
     if (mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)) // tell them.
-      mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), it->second.peer);
+      mds->send_message_mds(new MExportDirCancel(dir->dirfrag(), it->second.tid), it->second.peer);
     break;
 
   case EXPORT_EXPORTING:
@@ -717,6 +717,7 @@ void Migrator::export_dir(CDir *dir, int dest)
   export_state_t& stat = export_state[dir];
   stat.state = EXPORT_DISCOVERING;
   stat.peer = dest;
+  stat.tid = ++last_export_tid;
   stat.locks.swap(locks);
 
   dir->state_set(CDir::STATE_EXPORTING);
@@ -725,7 +726,9 @@ void Migrator::export_dir(CDir *dir, int dest)
   // send ExportDirDiscover (ask target)
   filepath path;
   dir->inode->make_path(path);
-  mds->send_message_mds(new MExportDirDiscover(mds->get_nodeid(), path, dir->dirfrag()), dest);
+  MExportDirDiscover *discover = new MExportDirDiscover(dir->dirfrag(), path,
+                                                       mds->get_nodeid(), stat.tid);
+  mds->send_message_mds(discover, dest);
   assert(g_conf->mds_kill_export_at != 2);
 
   // start the freeze, but hold it up with an auth_pin.
@@ -756,10 +759,11 @@ void Migrator::handle_export_discover_ack(MExportDirDiscoverAck *m)
 
   map<CDir*,export_state_t>::iterator it = export_state.find(dir);
   if (it == export_state.end() ||
-      it->second.state != EXPORT_DISCOVERING ||
+      it->second.tid != m->get_tid() ||
       it->second.peer != m->get_source().num()) {
     dout(7) << "must have aborted" << dendl;
   } else {
+    assert(it->second.state == EXPORT_DISCOVERING);
     // release locks to avoid deadlock
     export_unlock(dir);
     // freeze the subtree
@@ -797,7 +801,7 @@ void Migrator::export_frozen(CDir *dir)
     dir->state_clear(CDir::STATE_EXPORTING);
     mds->queue_waiters(it->second.waiting_for_finish);
 
-    mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), it->second.peer);
+    mds->send_message_mds(new MExportDirCancel(dir->dirfrag(), it->second.tid), it->second.peer);
 
     export_state.erase(it);
     return;
@@ -815,7 +819,7 @@ void Migrator::export_frozen(CDir *dir)
   cache->get_subtree_bounds(dir, bounds);
 
   // generate prep message, log entry.
-  MExportDirPrep *prep = new MExportDirPrep(dir->dirfrag());
+  MExportDirPrep *prep = new MExportDirPrep(dir->dirfrag(), it->second.tid);
 
   // include list of bystanders
   for (map<int,int>::iterator p = dir->replicas_begin();
@@ -843,10 +847,10 @@ void Migrator::export_frozen(CDir *dir)
   set<dirfrag_t> dirfrags_added;
 
   // check bounds
-  for (set<CDir*>::iterator it = bounds.begin();
-       it != bounds.end();
-       ++it) {
-    CDir *bound = *it;
+  for (set<CDir*>::iterator p = bounds.begin();
+       p != bounds.end();
+       ++p) {
+    CDir *bound = *p;
 
     // pin it.
     bound->get(CDir::PIN_EXPORTBOUND);
@@ -918,13 +922,15 @@ void Migrator::handle_export_prep_ack(MExportDirPrepAck *m)
 
   map<CDir*,export_state_t>::iterator it = export_state.find(dir);
   if (it == export_state.end() ||
-      it->second.state != EXPORT_PREPPING) {
+      it->second.tid != m->get_tid() ||
+      it->second.peer != m->get_source().num()) {
     // export must have aborted.  
     dout(7) << "export must have aborted" << dendl;
     m->put();
     return;
   }
 
+  assert(it->second.state == EXPORT_PREPPING);
   assert (g_conf->mds_kill_export_at != 5);
   // send warnings
   set<CDir*> bounds;
@@ -942,7 +948,7 @@ void Migrator::handle_export_prep_ack(MExportDirPrepAck *m)
     it->second.warning_ack_waiting.insert(p->first);
     it->second.notify_ack_waiting.insert(p->first);  // we'll eventually get a notifyack, too!
 
-    MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), true,
+    MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), it->second.tid, true,
                                                    pair<int,int>(mds->get_nodeid(),CDIR_AUTH_UNKNOWN),
                                                    pair<int,int>(mds->get_nodeid(),it->second.peer));
     for (set<CDir*>::iterator q = bounds.begin(); q != bounds.end(); ++q)
@@ -966,10 +972,12 @@ void Migrator::handle_export_prep_ack(MExportDirPrepAck *m)
 class C_M_ExportGo : public Context {
   Migrator *migrator;
   CDir *dir;
+  uint64_t tid;
 public:
-  C_M_ExportGo(Migrator *m, CDir *d) : migrator(m), dir(d) {}
+  C_M_ExportGo(Migrator *m, CDir *d, uint64_t t) :
+    migrator(m), dir(d), tid(t) {}
   void finish(int r) {
-    migrator->export_go_synced(dir);
+    migrator->export_go_synced(dir, tid);
   }
 };
 
@@ -979,20 +987,21 @@ void Migrator::export_go(CDir *dir)
   dout(7) << "export_go " << *dir << " to " << export_state[dir].peer << dendl;
 
   // first sync log to flush out e.g. any cap imports
-  mds->mdlog->wait_for_safe(new C_M_ExportGo(this, dir));
+  mds->mdlog->wait_for_safe(new C_M_ExportGo(this, dir, export_state[dir].tid));
   mds->mdlog->flush();
 }
 
-void Migrator::export_go_synced(CDir *dir)
+void Migrator::export_go_synced(CDir *dir, uint64_t tid)
 {
 
   map<CDir*,export_state_t>::iterator it = export_state.find(dir);
   if (it == export_state.end() ||
-      it->second.state != EXPORT_WARNING) {
+      it->second.tid != tid) {
     // export must have aborted.  
     dout(7) << "export must have aborted on " << dir << dendl;
     return;
   }
+  assert(it->second.state == EXPORT_WARNING);
 
   dout(7) << "export_go_synced " << *dir << " to " << it->second.peer << dendl;
 
@@ -1011,7 +1020,7 @@ void Migrator::export_go_synced(CDir *dir)
   mds->balancer->subtract_export(dir, now);
   
   // fill export message with cache data
-  MExportDir *req = new MExportDir(dir->dirfrag());
+  MExportDir *req = new MExportDir(dir->dirfrag(), it->second.tid);
   map<client_t,entity_inst_t> exported_client_map;
   int num_exported_inodes = encode_export_dir(req->export_data,
                                              dir,   // recur start point
@@ -1331,6 +1340,7 @@ void Migrator::handle_export_ack(MExportDirAck *m)
   map<CDir*,export_state_t>::iterator it = export_state.find(dir);
   assert(it != export_state.end());
   assert(it->second.state == EXPORT_EXPORTING);
+  assert(it->second.tid == m->get_tid());
 
   it->second.state = EXPORT_LOGGINGFINISH;
   assert (g_conf->mds_kill_export_at != 9);
@@ -1375,7 +1385,7 @@ void Migrator::export_notify_abort(CDir *dir, set<CDir*>& bounds)
   for (set<int>::iterator p = stat.notify_ack_waiting.begin();
        p != stat.notify_ack_waiting.end();
        ++p) {
-    MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), false,
+    MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(),stat.tid,false,
                                                    pair<int,int>(mds->get_nodeid(),stat.peer),
                                                    pair<int,int>(mds->get_nodeid(),CDIR_AUTH_UNKNOWN));
     for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
@@ -1460,7 +1470,7 @@ void Migrator::export_logged_finish(CDir *dir)
   for (set<int>::iterator p = stat.notify_ack_waiting.begin();
        p != stat.notify_ack_waiting.end();
        ++p) {
-    MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), true,
+    MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), stat.tid, true,
                                                    pair<int,int>(mds->get_nodeid(), stat.peer),
                                                    pair<int,int>(stat.peer, CDIR_AUTH_UNKNOWN));
 
@@ -1555,7 +1565,7 @@ void Migrator::export_finish(CDir *dir)
 
   // send finish/commit to new auth
   if (mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)) {
-    mds->send_message_mds(new MExportDirFinish(dir->dirfrag()), it->second.peer);
+    mds->send_message_mds(new MExportDirFinish(dir->dirfrag(), it->second.tid), it->second.peer);
   } else {
     dout(7) << "not sending MExportDirFinish, dest has failed" << dendl;
   }
@@ -1634,18 +1644,23 @@ void Migrator::handle_export_discover(MExportDirDiscover *m)
   // note import state
   dirfrag_t df = m->get_dirfrag();
   // only start discovering on this message once.
+  map<dirfrag_t,import_state_t>::iterator it = import_state.find(df);
   if (!m->started) {
+    assert(it == import_state.end());
     m->started = true;
-    import_pending_msg[df] = m;
     import_state[df].state = IMPORT_DISCOVERING;
     import_state[df].peer = from;
+    import_state[df].tid = m->get_tid();
   } else {
     // am i retrying after ancient path_traverse results?
-    if (import_pending_msg.count(df) == 0 || import_pending_msg[df] != m) {
+    if (it == import_state.end() ||
+       it->second.peer != from ||
+       it->second.tid != m->get_tid()) {
       dout(7) << " dropping obsolete message" << dendl;
       m->put();
       return;
     }
+    assert(it->second.state == IMPORT_DISCOVERING);
   }
 
   if (!mds->mdcache->is_open()) {
@@ -1675,8 +1690,7 @@ void Migrator::handle_export_discover(MExportDirDiscover *m)
   // yay
   dout(7) << "handle_export_discover have " << df << " inode " << *in << dendl;
   
-  import_state[m->get_dirfrag()].state = IMPORT_DISCOVERED;
-  import_pending_msg.erase(m->get_dirfrag());
+  import_state[df].state = IMPORT_DISCOVERED;
 
   // pin inode in the cache (for now)
   assert(in->is_dir());
@@ -1684,14 +1698,13 @@ void Migrator::handle_export_discover(MExportDirDiscover *m)
 
   // reply
   dout(7) << " sending export_discover_ack on " << *in << dendl;
-  mds->send_message_mds(new MExportDirDiscoverAck(df), import_state[df].peer);
+  mds->send_message_mds(new MExportDirDiscoverAck(df, m->get_tid()), import_state[df].peer);
   m->put();
   assert (g_conf->mds_kill_import_at != 2);  
 }
 
 void Migrator::import_reverse_discovering(dirfrag_t df)
 {
-  import_pending_msg.erase(df);
   import_state.erase(df);
 }
 
@@ -1704,7 +1717,6 @@ void Migrator::import_reverse_discovered(dirfrag_t df, CInode *diri)
 
 void Migrator::import_reverse_prepping(CDir *dir)
 {
-  import_pending_msg.erase(dir->dirfrag());
   set<CDir*> bounds;
   cache->map_dirfrag_set(import_state[dir->dirfrag()].bound_ls, bounds);
   import_remove_pins(dir, bounds);
@@ -1759,18 +1771,21 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
   map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->get_dirfrag());
   if (!m->did_assim()) {
     assert(it != import_state.end());
+    assert(it->second.state == IMPORT_DISCOVERED);
     diri = cache->get_inode(m->get_dirfrag().ino);
     assert(diri);
     bufferlist::iterator p = m->basedir.begin();
     dir = cache->add_replica_dir(p, diri, oldauth, finished);
     dout(7) << "handle_export_prep on " << *dir << " (first pass)" << dendl;
   } else {
-    if (import_pending_msg.count(m->get_dirfrag()) == 0 ||
-       import_pending_msg[m->get_dirfrag()] != m) {
+    if (it == import_state.end() ||
+       it->second.peer != oldauth ||
+       it->second.tid != m->get_tid()) {
       dout(7) << "handle_export_prep obsolete message, dropping" << dendl;
       m->put();
       return;
     }
+    assert(it->second.state == IMPORT_PREPPING);
 
     dir = cache->get_dirfrag(m->get_dirfrag());
     assert(dir);
@@ -1794,7 +1809,6 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
   if (!m->did_assim()) {
     dout(7) << "doing assim on " << *dir << dendl;
     m->mark_assim();  // only do this the first time!
-    import_pending_msg[dir->dirfrag()] = m;
 
     // change import state
     it->second.state = IMPORT_PREPPING;
@@ -1917,11 +1931,11 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
 
   // ok!
   dout(7) << " sending export_prep_ack on " << *dir << dendl;
-  mds->send_message(new MExportDirPrepAck(dir->dirfrag()), m->get_connection());
+  mds->send_message(new MExportDirPrepAck(dir->dirfrag(), m->get_tid()), m->get_connection());
   
   // note new state
   it->second.state = IMPORT_PREPPED;
-  import_pending_msg.erase(dir->dirfrag());
+
   assert(g_conf->mds_kill_import_at != 4);
   // done 
   m->put();
@@ -1955,6 +1969,11 @@ void Migrator::handle_export_dir(MExportDir *m)
   CDir *dir = cache->get_dirfrag(m->dirfrag);
   assert(dir);
   
+  map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->dirfrag);
+  assert(it != import_state.end());
+  assert(it->second.state == IMPORT_PREPPED);
+  assert(it->second.tid == m->get_tid());
+
   utime_t now = ceph_clock_now(g_ceph_context);
   int oldauth = m->get_source().num();
   dout(7) << "handle_export_dir importing " << *dir << " from " << oldauth << dendl;
@@ -1990,8 +2009,8 @@ void Migrator::handle_export_dir(MExportDir *m)
                        dir,                 // import root
                        le,
                        mds->mdlog->get_current_segment(),
-                       import_state[dir->dirfrag()].peer_exports,
-                       import_state[dir->dirfrag()].updated_scatterlocks,
+                       it->second.peer_exports,
+                       it->second.updated_scatterlocks,
                        now);
   }
   dout(10) << " " << m->bounds.size() << " imported bounds" << dendl;
@@ -2010,7 +2029,7 @@ void Migrator::handle_export_dir(MExportDir *m)
   dout(7) << "handle_export_dir did " << *dir << dendl;
 
   // note state
-  import_state[dir->dirfrag()].state = IMPORT_LOGGINGSTART;
+  it->second.state = IMPORT_LOGGINGSTART;
   assert (g_conf->mds_kill_import_at != 6);
 
   // log it
@@ -2190,7 +2209,7 @@ void Migrator::import_notify_finish(CDir *dir, set<CDir*>& bounds)
        p != stat.bystanders.end();
        ++p) {
     MExportDirNotify *notify =
-      new MExportDirNotify(dir->dirfrag(), false,
+      new MExportDirNotify(dir->dirfrag(), stat.tid, false,
                           pair<int,int>(stat.peer, mds->get_nodeid()),
                           pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN));
     for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
@@ -2208,7 +2227,7 @@ void Migrator::import_notify_abort(CDir *dir, set<CDir*>& bounds)
        p != stat.bystanders.end();
        ++p) {
     MExportDirNotify *notify =
-      new MExportDirNotify(dir->dirfrag(), true,
+      new MExportDirNotify(dir->dirfrag(), stat.tid, true,
                           pair<int,int>(stat.peer, mds->get_nodeid()),
                           pair<int,int>(stat.peer, CDIR_AUTH_UNKNOWN));
     for (set<CDir*>::iterator i = bounds.begin(); i != bounds.end(); ++i)
@@ -2259,7 +2278,7 @@ void Migrator::import_logged_start(dirfrag_t df, CDir *dir, int from,
   dout(7) << "import_logged " << *dir << dendl;
 
   // note state
-  import_state[dir->dirfrag()].state = IMPORT_ACKING;
+  it->second.state = IMPORT_ACKING;
 
   assert (g_conf->mds_kill_import_at != 7);
 
@@ -2278,7 +2297,7 @@ void Migrator::import_logged_start(dirfrag_t df, CDir *dir, int from,
   // test surviving observer of a failed migration that did not complete
   //assert(dir->replica_map.size() < 2 || mds->whoami != 0);
 
-  mds->send_message_mds(new MExportDirAck(dir->dirfrag()), from);
+  mds->send_message_mds(new MExportDirAck(dir->dirfrag(), it->second.tid), from);
   assert (g_conf->mds_kill_import_at != 8);
 
   cache->show_subtrees();
@@ -2290,6 +2309,12 @@ void Migrator::handle_export_finish(MExportDirFinish *m)
   CDir *dir = cache->get_dirfrag(m->get_dirfrag());
   assert(dir);
   dout(7) << "handle_export_finish on " << *dir << dendl;
+
+  map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->get_dirfrag());
+  assert(it != import_state.end());
+  assert(it->second.state == IMPORT_ACKING);
+  assert(it->second.tid == m->get_tid());
+
   import_finish(dir, false);
   m->put();
 }
@@ -2616,7 +2641,7 @@ void Migrator::handle_export_notify(MExportDirNotify *m)
   
   // send ack
   if (m->wants_ack()) {
-    mds->send_message_mds(new MExportDirNotifyAck(m->get_dirfrag()), from);
+    mds->send_message_mds(new MExportDirNotifyAck(m->get_dirfrag(), m->get_tid()), from);
   } else {
     // aborted.  no ack.
     dout(7) << "handle_export_notify no ack requested" << dendl;
index dff772c2276ffc018f5aeaaf336c6d912e96a3d4..c616543a5e12184214ec613cdff9e2b1994dfdfd 100644 (file)
@@ -54,6 +54,7 @@ private:
   MDS *mds;
   MDCache *cache;
 
+  uint64_t last_export_tid;
   // -- exports --
 public:
   // export stages.  used to clean up intelligently if there's a failure.
@@ -83,6 +84,7 @@ protected:
   struct export_state_t {
     int state;
     int peer;
+    uint64_t tid;
     set<SimpleLock*> locks;
     set<int> warning_ack_waiting;
     set<int> notify_ack_waiting;
@@ -128,6 +130,7 @@ protected:
   struct import_state_t {
     int state;
     int peer;
+    uint64_t tid;
     set<int> bystanders;
     list<dirfrag_t> bound_ls;
     list<ScatterLock*> updated_scatterlocks;
@@ -135,12 +138,10 @@ protected:
   };
 
   map<dirfrag_t, import_state_t>  import_state;
-  map<dirfrag_t,Message*>         import_pending_msg;
-
 
 public:
   // -- cons --
-  Migrator(MDS *m, MDCache *c) : mds(m), cache(c) {}
+  Migrator(MDS *m, MDCache *c) : mds(m), cache(c), last_export_tid(0) {}
 
   void dispatch(Message*);
 
@@ -257,7 +258,7 @@ public:
   void export_frozen(CDir *dir);
   void handle_export_prep_ack(MExportDirPrepAck *m);
   void export_go(CDir *dir);
-  void export_go_synced(CDir *dir);
+  void export_go_synced(CDir *dir, uint64_t tid);
   void export_try_cancel(CDir *dir);
   void export_reverse(CDir *dir);
   void export_notify_abort(CDir *dir, set<CDir*>& bounds);
index 688526d8dcdcbb9cb44982dbb33444440d0bd73c..2ff85d4f212afade6adff0882af887b1183a110e 100644 (file)
@@ -27,9 +27,9 @@ class MExportDir : public Message {
   bufferlist client_map;
 
   MExportDir() : Message(MSG_MDS_EXPORTDIR) {}
-  MExportDir(dirfrag_t df) : 
-    Message(MSG_MDS_EXPORTDIR),
-    dirfrag(df) {
+  MExportDir(dirfrag_t df, uint64_t tid) :
+    Message(MSG_MDS_EXPORTDIR), dirfrag(df) {
+    set_tid(tid);
   }
 private:
   ~MExportDir() {}
index 42c5c5d48cea9c8db61597f36892702f20b6fe17..ae159610d0b8d13ab16eac6c14397343df243104 100644 (file)
@@ -24,8 +24,10 @@ class MExportDirAck : public Message {
   dirfrag_t get_dirfrag() { return dirfrag; }
   
   MExportDirAck() : Message(MSG_MDS_EXPORTDIRACK) {}
-  MExportDirAck(dirfrag_t i) :
-    Message(MSG_MDS_EXPORTDIRACK), dirfrag(i) { }
+  MExportDirAck(dirfrag_t df, uint64_t tid) :
+    Message(MSG_MDS_EXPORTDIRACK), dirfrag(df) {
+    set_tid(tid);
+  }
 private:
   ~MExportDirAck() {}
 
index 9cd06021bd49205da3db19426245dce8bbd7a9da..b3b2c5825ef3601ed7b26732b80df4ac8b1bd63b 100644 (file)
@@ -25,9 +25,10 @@ class MExportDirCancel : public Message {
   dirfrag_t get_dirfrag() { return dirfrag; }
 
   MExportDirCancel() : Message(MSG_MDS_EXPORTDIRCANCEL) {}
-  MExportDirCancel(dirfrag_t df) : 
-    Message(MSG_MDS_EXPORTDIRCANCEL),
-       dirfrag(df) { }
+  MExportDirCancel(dirfrag_t df, uint64_t tid) :
+    Message(MSG_MDS_EXPORTDIRCANCEL), dirfrag(df) {
+    set_tid(tid);
+  }
 private:
   ~MExportDirCancel() {}
 
index 7602bd91ee970aca290d4d7de13b65084eeec01e..0b29daf14f54c50f6368a2b15cdb4b6022b76695 100644 (file)
@@ -34,13 +34,11 @@ class MExportDirDiscover : public Message {
   MExportDirDiscover() :     
     Message(MSG_MDS_EXPORTDIRDISCOVER),
     started(false) { }
-  MExportDirDiscover(int f, filepath& p, dirfrag_t df) : 
+  MExportDirDiscover(dirfrag_t df, filepath& p, int f, uint64_t tid) :
     Message(MSG_MDS_EXPORTDIRDISCOVER),
-    from(f), 
-    dirfrag(df),
-    path(p),
-    started(false)
-  { }
+    from(f), dirfrag(df), path(p), started(false) {
+    set_tid(tid);
+  }
 private:
   ~MExportDirDiscover() {}
 
index 82eeb5ea82c3d0d021f4e4610f1457689e482d4a..8e20be604bed8dd63e82c7cd9276e5fb646b53ed 100644 (file)
@@ -28,10 +28,11 @@ class MExportDirDiscoverAck : public Message {
   bool is_success() { return success; }
 
   MExportDirDiscoverAck() : Message(MSG_MDS_EXPORTDIRDISCOVERACK) {}
-  MExportDirDiscoverAck(dirfrag_t df, bool s=true) : 
+  MExportDirDiscoverAck(dirfrag_t df, uint64_t tid, bool s=true) :
     Message(MSG_MDS_EXPORTDIRDISCOVERACK),
-    dirfrag(df),
-    success(s) { }
+    dirfrag(df), success(s) {
+    set_tid(tid);
+  }
 private:
   ~MExportDirDiscoverAck() {}
 
index 6d05885f3e0bd50eeaeef7e5b8672e32991523d4..66d6f27eae26c1b8e5505b0a10fc60ce0a0828ed 100644 (file)
@@ -24,9 +24,9 @@ class MExportDirFinish : public Message {
   dirfrag_t get_dirfrag() { return dirfrag; }
   
   MExportDirFinish() {}
-  MExportDirFinish(dirfrag_t dirfrag) :
-    Message(MSG_MDS_EXPORTDIRFINISH) {
-    this->dirfrag = dirfrag;
+  MExportDirFinish(dirfrag_t df, uint64_t tid) :
+    Message(MSG_MDS_EXPORTDIRFINISH), dirfrag(df) {
+    set_tid(tid);
   }
 private:
   ~MExportDirFinish() {}
index a209901d48b4256dd6a282874b333ef4dc02ad05..745cf95f709db4f8b9fef8307066dcef8ce28e8c 100644 (file)
@@ -33,9 +33,11 @@ class MExportDirNotify : public Message {
   list<dirfrag_t>& get_bounds() { return bounds; }
 
   MExportDirNotify() {}
-  MExportDirNotify(dirfrag_t i, bool a, pair<__s32,__s32> oa, pair<__s32,__s32> na) :
+  MExportDirNotify(dirfrag_t i, uint64_t tid, bool a, pair<__s32,__s32> oa, pair<__s32,__s32> na) :
     Message(MSG_MDS_EXPORTDIRNOTIFY),
-    base(i), ack(a), old_auth(oa), new_auth(na) { }
+    base(i), ack(a), old_auth(oa), new_auth(na) {
+    set_tid(tid);
+  }
 private:
   ~MExportDirNotify() {}
 
index 35a6956185be85187de01683fb27bf5f9d5a1bbd..7b52c4f607c9fed499755f576fab3130f8a067a5 100644 (file)
@@ -26,9 +26,9 @@ class MExportDirNotifyAck : public Message {
   dirfrag_t get_dirfrag() { return dirfrag; }
   
   MExportDirNotifyAck() {}
-  MExportDirNotifyAck(dirfrag_t dirfrag) :
-    Message(MSG_MDS_EXPORTDIRNOTIFYACK) {
-    this->dirfrag = dirfrag;
+  MExportDirNotifyAck(dirfrag_t df, uint64_t tid) :
+    Message(MSG_MDS_EXPORTDIRNOTIFYACK), dirfrag(df) {
+    set_tid(tid);
   }
 private:
   ~MExportDirNotifyAck() {}
index 12f358ece037c6346d835d230288778b38f831b1..791a98abc18f63015e1f864a23d09658f1727c9a 100644 (file)
@@ -40,10 +40,11 @@ public:
   MExportDirPrep() {
     b_did_assim = false;
   }
-  MExportDirPrep(dirfrag_t df) : 
+  MExportDirPrep(dirfrag_t df, uint64_t tid) :
     Message(MSG_MDS_EXPORTDIRPREP),
-    dirfrag(df),
-    b_did_assim(false) { }
+    dirfrag(df), b_did_assim(false) {
+    set_tid(tid);
+  }
 private:
   ~MExportDirPrep() {}
 
index b5d180d5765bf4c436857ea3c9a7a5802bcdfa2b..c3d7b498701969bea3e9bbcb88b732c8ae211f40 100644 (file)
@@ -25,9 +25,10 @@ class MExportDirPrepAck : public Message {
   dirfrag_t get_dirfrag() { return dirfrag; }
   
   MExportDirPrepAck() {}
-  MExportDirPrepAck(dirfrag_t df) :
-    Message(MSG_MDS_EXPORTDIRPREPACK),
-    dirfrag(df) { }
+  MExportDirPrepAck(dirfrag_t df, uint64_t tid) :
+    Message(MSG_MDS_EXPORTDIRPREPACK), dirfrag(df) {
+    set_tid(tid);
+  }
 private:
   ~MExportDirPrepAck() {}