]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: handle fragment notify race
authorYan, Zheng <zyan@redhat.com>
Wed, 17 Oct 2018 03:07:53 +0000 (11:07 +0800)
committerYan, Zheng <zyan@redhat.com>
Wed, 17 Oct 2018 04:54:01 +0000 (12:54 +0800)
In the nornal case, mds does not trim dir inode whose child dirfrags
are likely being fragmented (see trim_inode()). But when fragmenting
subtree roots, following race can happen:

- mds.a (auth mds of dirfrag) sends fragment_notify message to
  mds.c and drops wrlock on dirfragtreelock.
- mds.b (auth mds of dir inode) changes dirfragtreelock state to
  SYNC and send lock message mds.c
- mds.c receives the lock message and changes dirfragtreelock state
  to SYNC
- mds.c trim dirfrag and dir inode from its cache
- mds.c receives the fragment_notify message

The fix is asking replicas to ack fragment_notify message, unlocking
dirfragtreelock after mds gets all acks.

Fixes: http://tracker.ceph.com/issues/36035
Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
src/mds/Locker.cc
src/mds/Locker.h
src/mds/MDCache.cc
src/mds/MDCache.h
src/messages/MMDSFragmentNotify.h
src/messages/MMDSFragmentNotifyAck.h [new file with mode: 0644]
src/msg/Message.cc
src/msg/Message.h

index 2bc6c4175df825105b5a7439b8c239fd3b2c188a..8ea4043db2c45c3741a045bae86dea4c04fcaca2 100644 (file)
@@ -785,6 +785,24 @@ void Locker::drop_rdlocks_for_early_reply(MutationImpl *mut)
   issue_caps_set(need_issue);
 }
 
+void Locker::drop_locks_for_fragment_unfreeze(MutationImpl *mut)
+{
+  set<CInode*> need_issue;
+
+  for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
+    SimpleLock *lock = it->lock;
+    if (lock->get_type() == CEPH_LOCK_IDFT) {
+      ++it;
+      continue;
+    }
+    bool ni = false;
+    wrlock_finish(it++, mut, &ni);
+    if (ni)
+      need_issue.insert(static_cast<CInode*>(lock->get_parent()));
+  }
+  issue_caps_set(need_issue);
+}
+
 // generics
 
 void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, MDSInternalContextBase::vec *pfinishers)
index 2b935c11c3a895e5c0ab6b2524c417e1f686d6ed..24b9d7820d1adf57b992e4bbd91c967b972ddd8f 100644 (file)
@@ -86,6 +86,7 @@ public:
   void set_xlocks_done(MutationImpl *mut, bool skip_dentry=false);
   void drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue=0);
   void drop_rdlocks_for_early_reply(MutationImpl *mut);
+  void drop_locks_for_fragment_unfreeze(MutationImpl *mut);
 
   void eval_gather(SimpleLock *lock, bool first=false, bool *need_issue=0, MDSInternalContextBase::vec *pfinishers=0);
   void eval(SimpleLock *lock, bool *need_issue);
index 8743ec9955a89bad7caf89b27f400711262798bf..f34217d12f7d61d1f3de5f47f240c8e7f9f61c4c 100644 (file)
@@ -3054,9 +3054,19 @@ void MDCache::handle_mds_failure(mds_rank_t who)
        p != fragments.end(); ) {
     dirfrag_t df = p->first;
     fragment_info_t& info = p->second;
-    ++p;
-    if (info.is_fragmenting())
+
+    if (info.is_fragmenting()) {
+      if (info.notify_ack_waiting.erase(who) &&
+         info.notify_ack_waiting.empty()) {
+       fragment_drop_locks(info);
+       fragment_maybe_finish(p++);
+      } else {
+       ++p;
+      }
       continue;
+    }
+
+    ++p;
     dout(10) << "cancelling fragment " << df << " bit " << info.bits << dendl;
     list<CDir*> dirs;
     info.dirs.swap(dirs);
@@ -7934,6 +7944,9 @@ void MDCache::dispatch(const Message::const_ref &m)
   case MSG_MDS_FRAGMENTNOTIFY:
     handle_fragment_notify(MMDSFragmentNotify::msgref_cast(m));
     break;
+  case MSG_MDS_FRAGMENTNOTIFYACK:
+    handle_fragment_notify_ack(MMDSFragmentNotifyAck::msgref_cast(m));
+    break;
 
   case MSG_MDS_FINDINO:
     handle_find_ino(MMDSFindIno::msgref_cast(m));
@@ -11321,29 +11334,29 @@ public:
 
 class C_MDC_FragmentCommit : public MDCacheLogContext {
   dirfrag_t basedirfrag;
-  list<CDir*> resultfrags;
+  MDRequestRef mdr;
 public:
-  C_MDC_FragmentCommit(MDCache *m, dirfrag_t df, list<CDir*>& l) :
-    MDCacheLogContext(m), basedirfrag(df), resultfrags(l) {}
+  C_MDC_FragmentCommit(MDCache *m, dirfrag_t df, const MDRequestRef& r) :
+    MDCacheLogContext(m), basedirfrag(df), mdr(r) {}
   void finish(int r) override {
-    mdcache->_fragment_committed(basedirfrag, resultfrags);
+    mdcache->_fragment_committed(basedirfrag, mdr);
   }
 };
 
-class C_IO_MDC_FragmentFinish : public MDCacheIOContext {
+class C_IO_MDC_FragmentPurgeOld : public MDCacheIOContext {
   dirfrag_t basedirfrag;
-  list<CDir*> resultfrags;
+  int bits;
+  MDRequestRef mdr;
 public:
-  C_IO_MDC_FragmentFinish(MDCache *m, dirfrag_t f, list<CDir*>& l) :
-    MDCacheIOContext(m), basedirfrag(f) {
-    resultfrags.swap(l);
-  }
+  C_IO_MDC_FragmentPurgeOld(MDCache *m, dirfrag_t f, int b,
+                           const MDRequestRef& r) :
+    MDCacheIOContext(m), basedirfrag(f), bits(b), mdr(r) {}
   void finish(int r) override {
     ceph_assert(r == 0 || r == -ENOENT);
-    mdcache->_fragment_finish(basedirfrag, resultfrags);
+    mdcache->_fragment_old_purged(basedirfrag, bits, mdr);
   }
   void print(ostream& out) const override {
-    out << "dirfrags_commit(" << basedirfrag << ")";
+    out << "fragment_purge_old(" << basedirfrag << ")";
   }
 };
 
@@ -11472,13 +11485,12 @@ void MDCache::dispatch_fragment_dir(MDRequestRef& mdr)
 void MDCache::_fragment_logged(MDRequestRef& mdr)
 {
   dirfrag_t basedirfrag = mdr->more()->fragment_base;
-  map<dirfrag_t,fragment_info_t>::iterator it = fragments.find(basedirfrag);
-  ceph_assert(it != fragments.end());
-  fragment_info_t &info = it->second;
+  auto& info = fragments.at(basedirfrag);
   CInode *diri = info.resultfrags.front()->get_inode();
 
   dout(10) << "fragment_logged " << basedirfrag << " bits " << info.bits
           << " on " << *diri << dendl;
+  mdr->mark_event("prepare logged");
 
   if (diri->is_auth())
     diri->pop_and_dirty_projected_inode(mdr->ls);
@@ -11506,23 +11518,46 @@ void MDCache::_fragment_logged(MDRequestRef& mdr)
 void MDCache::_fragment_stored(MDRequestRef& mdr)
 {
   dirfrag_t basedirfrag = mdr->more()->fragment_base;
-  map<dirfrag_t,fragment_info_t>::iterator it = fragments.find(basedirfrag);
-  ceph_assert(it != fragments.end());
-  fragment_info_t &info = it->second;
-  CInode *diri = info.resultfrags.front()->get_inode();
+  fragment_info_t &info = fragments.at(basedirfrag);
+  CDir *first = info.resultfrags.front();
+  CInode *diri = first->get_inode();
 
   dout(10) << "fragment_stored " << basedirfrag << " bits " << info.bits
           << " on " << *diri << dendl;
+  mdr->mark_event("new frags stored");
 
   // tell peers
-  CDir *first = *info.resultfrags.begin();
+  mds_rank_t diri_auth = (first->is_subtree_root() && !diri->is_auth()) ?
+                         diri->authority().first : CDIR_AUTH_UNKNOWN;
   for (const auto &p : first->get_replicas()) {
     if (mds->mdsmap->get_state(p.first) < MDSMap::STATE_REJOIN ||
        (mds->mdsmap->get_state(p.first) == MDSMap::STATE_REJOIN &&
         rejoin_gather.count(p.first)))
       continue;
 
-    auto notify = MMDSFragmentNotify::create(basedirfrag, info.bits);
+    auto notify = MMDSFragmentNotify::create(basedirfrag, info.bits, mdr->reqid.tid);
+    if (diri_auth != CDIR_AUTH_UNKNOWN && // subtree root
+       diri_auth != p.first) { // not auth mds of diri
+      /*
+       * In the nornal case, mds does not trim dir inode whose child dirfrags
+       * are likely being fragmented (see trim_inode()). But when fragmenting
+       * subtree roots, following race can happen:
+       *
+       * - mds.a (auth mds of dirfrag) sends fragment_notify message to
+       *   mds.c and drops wrlock on dirfragtreelock.
+       * - mds.b (auth mds of dir inode) changes dirfragtreelock state to
+       *   SYNC and send lock message mds.c
+       * - mds.c receives the lock message and changes dirfragtreelock state
+       *   to SYNC
+       * - mds.c trim dirfrag and dir inode from its cache
+       * - mds.c receives the fragment_notify message
+       *
+       * So we need to ensure replicas have received the notify, then unlock
+       * the dirfragtreelock.
+       */
+      notify->mark_ack_wanted();
+      info.notify_ack_waiting.insert(p.first);
+    }
 
     // freshly replicate new dirs to peers
     for (list<CDir*>::iterator q = info.resultfrags.begin();
@@ -11535,10 +11570,8 @@ void MDCache::_fragment_stored(MDRequestRef& mdr)
 
   // journal commit
   EFragment *le = new EFragment(mds->mdlog, EFragment::OP_COMMIT, basedirfrag, info.bits);
-  mds->mdlog->start_submit_entry(le, new C_MDC_FragmentCommit(this, basedirfrag,
-                                                             info.resultfrags));
+  mds->mdlog->start_submit_entry(le, new C_MDC_FragmentCommit(this, basedirfrag, mdr));
 
-  mds->locker->drop_locks(mdr.get());
 
   // unfreeze resulting frags
   for (list<CDir*>::iterator p = info.resultfrags.begin();
@@ -11558,22 +11591,26 @@ void MDCache::_fragment_stored(MDRequestRef& mdr)
     dir->unfreeze_dir();
   }
 
-  fragments.erase(it);
-  request_finish(mdr);
+  if (info.notify_ack_waiting.empty()) {
+    fragment_drop_locks(info);
+  } else {
+    mds->locker->drop_locks_for_fragment_unfreeze(mdr.get());
+  }
 }
 
-void MDCache::_fragment_committed(dirfrag_t basedirfrag, list<CDir*>& resultfrags)
+void MDCache::_fragment_committed(dirfrag_t basedirfrag, const MDRequestRef& mdr)
 {
   dout(10) << "fragment_committed " << basedirfrag << dendl;
-  map<dirfrag_t, ufragment>::iterator it = uncommitted_fragments.find(basedirfrag);
-  ceph_assert(it != uncommitted_fragments.end());
-  ufragment &uf = it->second;
+  if (mdr)
+    mdr->mark_event("commit logged");
+
+  ufragment &uf = uncommitted_fragments.at(basedirfrag);
 
   // remove old frags
   C_GatherBuilder gather(
     g_ceph_context,
     new C_OnFinisher(
-      new C_IO_MDC_FragmentFinish(this, basedirfrag, resultfrags),
+      new C_IO_MDC_FragmentPurgeOld(this, basedirfrag, uf.bits, mdr),
       mds->finisher));
 
   SnapContext nullsnapc;
@@ -11601,16 +11638,50 @@ void MDCache::_fragment_committed(dirfrag_t basedirfrag, list<CDir*>& resultfrag
   gather.activate();
 }
 
-void MDCache::_fragment_finish(dirfrag_t basedirfrag, list<CDir*>& resultfrags)
+void MDCache::_fragment_old_purged(dirfrag_t basedirfrag, int bits, const MDRequestRef& mdr)
 {
-  dout(10) << "fragment_finish " << basedirfrag << "resultfrags.size="
-           << resultfrags.size() << dendl;
-  map<dirfrag_t, ufragment>::iterator it = uncommitted_fragments.find(basedirfrag);
-  ceph_assert(it != uncommitted_fragments.end());
-  ufragment &uf = it->second;
+  dout(10) << "fragment_old_purged " << basedirfrag << dendl;
+  if (mdr)
+    mdr->mark_event("old frags purged");
+
+  EFragment *le = new EFragment(mds->mdlog, EFragment::OP_FINISH, basedirfrag, bits);
+  mds->mdlog->start_submit_entry(le);
+
+  finish_uncommitted_fragment(basedirfrag, EFragment::OP_FINISH);
+
+  if (mds->logger) {
+    if (bits > 0) {
+      mds->logger->inc(l_mds_dir_split);
+    } else {
+      mds->logger->inc(l_mds_dir_merge);
+    }
+  }
+
+  if (mdr) {
+    auto it = fragments.find(basedirfrag);
+    ceph_assert(it != fragments.end());
+    it->second.finishing = true;
+    if (it->second.notify_ack_waiting.empty())
+      fragment_maybe_finish(it);
+    else
+      mdr->mark_event("wating for notify acks");
+  }
+}
+
+void MDCache::fragment_drop_locks(fragment_info_t& info)
+{
+  mds->locker->drop_locks(info.mdr.get());
+  request_finish(info.mdr);
+  //info.mdr.reset();
+}
+
+void MDCache::fragment_maybe_finish(const fragment_info_iterator& it)
+{
+  if (!it->second.finishing)
+    return;
 
   // unmark & auth_unpin
-  for (const auto &dir : resultfrags) {
+  for (const auto &dir : it->second.resultfrags) {
     dir->state_clear(CDir::STATE_FRAGMENTING);
     dir->auth_unpin(this);
 
@@ -11621,23 +11692,37 @@ void MDCache::_fragment_finish(dirfrag_t basedirfrag, list<CDir*>& resultfrags)
     mds->balancer->maybe_fragment(dir, false);
   }
 
-  if (mds->logger) {
-    if (resultfrags.size() > 1) {
-      mds->logger->inc(l_mds_dir_split);
-    } else {
-      mds->logger->inc(l_mds_dir_merge);
-    }
+  fragments.erase(it);
+}
+
+
+void MDCache::handle_fragment_notify_ack(const MMDSFragmentNotifyAck::const_ref &ack)
+{
+  dout(10) << "handle_fragment_notify_ack " << *ack << " from " << ack->get_source() << dendl;
+  mds_rank_t from = mds_rank_t(ack->get_source().num());
+
+  if (mds->get_state() < MDSMap::STATE_ACTIVE) {
+    return;
   }
 
-  EFragment *le = new EFragment(mds->mdlog, EFragment::OP_FINISH, basedirfrag, uf.bits);
-  mds->mdlog->start_submit_entry(le);
+  auto it = fragments.find(ack->get_base_dirfrag());
+  if (it == fragments.end() ||
+      it->second.get_tid() != ack->get_tid()) {
+    dout(10) << "handle_fragment_notify_ack obsolete message, dropping" << dendl;
+    return;
+  }
 
-  finish_uncommitted_fragment(basedirfrag, EFragment::OP_FINISH);
+  if (it->second.notify_ack_waiting.erase(from) &&
+      it->second.notify_ack_waiting.empty()) {
+    fragment_drop_locks(it->second);
+    fragment_maybe_finish(it);
+  }
 }
 
 void MDCache::handle_fragment_notify(const MMDSFragmentNotify::const_ref &notify)
 {
   dout(10) << "handle_fragment_notify " << *notify << " from " << notify->get_source() << dendl;
+  mds_rank_t from = mds_rank_t(notify->get_source().num());
 
   if (mds->get_state() < MDSMap::STATE_REJOIN) {
     return;
@@ -11670,12 +11755,18 @@ void MDCache::handle_fragment_notify(const MMDSFragmentNotify::const_ref &notify
     // add new replica dirs values
     auto p = notify->basebl.cbegin();
     while (!p.end())
-      add_replica_dir(p, diri, mds_rank_t(notify->get_source().num()), waiters);
+      add_replica_dir(p, diri, from, waiters);
 
     mds->queue_waiters(waiters);
   } else {
     ceph_abort();
   }
+
+  if (notify->is_ack_wanted()) {
+    auto ack = MMDSFragmentNotifyAck::create(notify->get_base_dirfrag(),
+                                            notify->get_bits(), notify->get_tid());
+    mds->send_message_mds(ack, from);
+  }
 }
 
 void MDCache::add_uncommitted_fragment(dirfrag_t basedirfrag, int bits, list<frag_t>& old_frags,
@@ -11737,14 +11828,7 @@ void MDCache::rollback_uncommitted_fragments()
     ceph_assert(diri);
 
     if (uf.committed) {
-      list<CDir*> frags;
-      diri->get_dirfrags_under(p->first.frag, frags);
-      for (list<CDir*>::iterator q = frags.begin(); q != frags.end(); ++q) {
-       CDir *dir = *q;
-       dir->auth_pin(this);
-       dir->state_set(CDir::STATE_FRAGMENTING);
-      }
-      _fragment_committed(p->first, frags);
+      _fragment_committed(p->first, MDRequestRef());
       continue;
     }
 
@@ -11816,16 +11900,10 @@ void MDCache::rollback_uncommitted_fragments()
     for (list<frag_t>::iterator q = old_frags.begin(); q != old_frags.end(); ++q)
       ceph_assert(!diri->dirfragtree.is_leaf(*q));
 
-    for (list<CDir*>::iterator q = resultfrags.begin(); q != resultfrags.end(); ++q) {
-      CDir *dir = *q;
-      dir->auth_pin(this);
-      dir->state_set(CDir::STATE_FRAGMENTING);
-    }
-
     mds->mdlog->submit_entry(le);
 
     uf.old_frags.swap(old_frags);
-    _fragment_committed(p->first, resultfrags);
+    _fragment_committed(p->first, MDRequestRef());
   }
 }
 
index 1bbefffdcb49c0733504ebf75238b175822fa058..0e929c8ee29e1622236ffc15087df1918b46a696 100644 (file)
@@ -40,6 +40,7 @@
 #include "messages/MMDSFindIno.h"
 #include "messages/MMDSFindInoReply.h"
 #include "messages/MMDSFragmentNotify.h"
+#include "messages/MMDSFragmentNotifyAck.h"
 #include "messages/MMDSOpenIno.h"
 #include "messages/MMDSOpenInoReply.h"
 #include "messages/MMDSResolve.h"
@@ -1125,15 +1126,20 @@ private:
     list<CDir*> dirs;
     list<CDir*> resultfrags;
     MDRequestRef mdr;
+    set<mds_rank_t> notify_ack_waiting;
+    bool finishing = false;
+
     // for deadlock detection
-    bool all_frozen;
+    bool all_frozen = false;
     utime_t last_cum_auth_pins_change;
-    int last_cum_auth_pins;
-    int num_remote_waiters;    // number of remote authpin waiters
-    fragment_info_t() : bits(0), all_frozen(false), last_cum_auth_pins(0), num_remote_waiters(0) {}
+    int last_cum_auth_pins = 0;
+    int num_remote_waiters = 0;        // number of remote authpin waiters
+    fragment_info_t() {}
     bool is_fragmenting() { return !resultfrags.empty(); }
+    uint64_t get_tid() { return mdr ? mdr->reqid.tid : 0; }
   };
   map<dirfrag_t,fragment_info_t> fragments;
+  typedef map<dirfrag_t,fragment_info_t>::iterator fragment_info_iterator;
 
   void adjust_dir_fragments(CInode *diri, frag_t basefrag, int bits,
                            list<CDir*>& frags, MDSInternalContextBase::vec& waiters, bool replay);
@@ -1151,11 +1157,13 @@ private:
   void fragment_mark_and_complete(MDRequestRef& mdr);
   void fragment_frozen(MDRequestRef& mdr, int r);
   void fragment_unmark_unfreeze_dirs(list<CDir*>& dirs);
+  void fragment_drop_locks(fragment_info_t &info);
+  void fragment_maybe_finish(const fragment_info_iterator& it);
   void dispatch_fragment_dir(MDRequestRef& mdr);
   void _fragment_logged(MDRequestRef& mdr);
   void _fragment_stored(MDRequestRef& mdr);
-  void _fragment_committed(dirfrag_t f, list<CDir*>& resultfrags);
-  void _fragment_finish(dirfrag_t f, list<CDir*>& resultfrags);
+  void _fragment_committed(dirfrag_t f, const MDRequestRef& mdr);
+  void _fragment_old_purged(dirfrag_t f, int bits, const MDRequestRef& mdr);
 
   friend class EFragment;
   friend class C_MDC_FragmentFrozen;
@@ -1163,9 +1171,10 @@ private:
   friend class C_MDC_FragmentPrep;
   friend class C_MDC_FragmentStore;
   friend class C_MDC_FragmentCommit;
-  friend class C_IO_MDC_FragmentFinish;
+  friend class C_IO_MDC_FragmentPurgeOld;
 
   void handle_fragment_notify(const MMDSFragmentNotify::const_ref &m);
+  void handle_fragment_notify_ack(const MMDSFragmentNotifyAck::const_ref &m);
 
   void add_uncommitted_fragment(dirfrag_t basedirfrag, int bits, list<frag_t>& old_frag,
                                LogSegment *ls, bufferlist *rollback=NULL);
index 37e78f88e7b99b185be8d34e1e352a2d6af7827c..69dff281a202dfa21a657d3f11a242a4f9277366 100644 (file)
@@ -21,44 +21,53 @@ class MMDSFragmentNotify : public MessageInstance<MMDSFragmentNotify> {
 public:
   friend factory;
 private:
-  inodeno_t ino;
-  frag_t basefrag;
+  static constexpr int HEAD_VERSION = 2;
+  static constexpr int COMPAT_VERSION = 1;
+
+  dirfrag_t base_dirfrag;
   int8_t bits = 0;
+  bool ack_wanted = false;
 
  public:
-  inodeno_t get_ino() const { return ino; }
-  frag_t get_basefrag() const { return basefrag; }
+  inodeno_t get_ino() const { return base_dirfrag.ino; }
+  frag_t get_basefrag() const { return base_dirfrag.frag; }
+  dirfrag_t get_base_dirfrag() const { return base_dirfrag; }
   int get_bits() const { return bits; }
+  bool is_ack_wanted() const { return ack_wanted; }
+  void mark_ack_wanted() { ack_wanted = true; }
 
   bufferlist basebl;
 
 protected:
-  MMDSFragmentNotify() : MessageInstance(MSG_MDS_FRAGMENTNOTIFY) {}
-  MMDSFragmentNotify(dirfrag_t df, int b) :
- MessageInstance(MSG_MDS_FRAGMENTNOTIFY),
-    ino(df.ino), basefrag(df.frag), bits(b) { }
+  MMDSFragmentNotify() :
+    MessageInstance(MSG_MDS_FRAGMENTNOTIFY, HEAD_VERSION, COMPAT_VERSION) {}
+  MMDSFragmentNotify(dirfrag_t df, int b, uint64_t tid) :
+    MessageInstance(MSG_MDS_FRAGMENTNOTIFY, HEAD_VERSION, COMPAT_VERSION),
+    base_dirfrag(df), bits(b) {
+    set_tid(tid);
+  }
   ~MMDSFragmentNotify() override {}
 
 public:  
   const char *get_type_name() const override { return "fragment_notify"; }
   void print(ostream& o) const override {
-    o << "fragment_notify(" << ino << "." << basefrag
-      << " " << (int)bits << ")";
+    o << "fragment_notify(" << base_dirfrag << " " << (int)bits << ")";
   }
 
   void encode_payload(uint64_t features) override {
     using ceph::encode;
-    encode(ino, payload);
-    encode(basefrag, payload);
+    encode(base_dirfrag, payload);
     encode(bits, payload);
     encode(basebl, payload);
+    encode(ack_wanted, payload);
   }
   void decode_payload() override {
     auto p = payload.cbegin();
-    decode(ino, p);
-    decode(basefrag, p);
+    decode(base_dirfrag, p);
     decode(bits, p);
     decode(basebl, p);
+    if (header.version >= 2)
+      decode(ack_wanted, p);
   }
   
 };
diff --git a/src/messages/MMDSFragmentNotifyAck.h b/src/messages/MMDSFragmentNotifyAck.h
new file mode 100644 (file)
index 0000000..52bbf35
--- /dev/null
@@ -0,0 +1,60 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef CEPH_MMDSFRAGMENTNOTIFYAck_H
+#define CEPH_MMDSFRAGMENTNOTIFYAck_H
+
+#include "msg/Message.h"
+
+class MMDSFragmentNotifyAck : public MessageInstance<MMDSFragmentNotifyAck> {
+public:
+  friend factory;
+private:
+  dirfrag_t base_dirfrag;
+  int8_t bits = 0;
+
+ public:
+  dirfrag_t get_base_dirfrag() const { return base_dirfrag; }
+  int get_bits() const { return bits; }
+
+  bufferlist basebl;
+
+protected:
+  MMDSFragmentNotifyAck() : MessageInstance(MSG_MDS_FRAGMENTNOTIFYACK) {}
+  MMDSFragmentNotifyAck(dirfrag_t df, int b, uint64_t tid) :
+    MessageInstance(MSG_MDS_FRAGMENTNOTIFYACK),
+    base_dirfrag(df), bits(b) {
+    set_tid(tid);
+  }
+  ~MMDSFragmentNotifyAck() override {}
+
+public:
+  const char *get_type_name() const override { return "fragment_notify_ack"; }
+  void print(ostream& o) const override {
+    o << "fragment_notify_ack(" << base_dirfrag << " " << (int)bits << ")";
+  }
+
+  void encode_payload(uint64_t features) override {
+    using ceph::encode;
+    encode(base_dirfrag, payload);
+    encode(bits, payload);
+  }
+  void decode_payload() override {
+    auto p = payload.cbegin();
+    decode(base_dirfrag, p);
+    decode(bits, p);
+  }
+};
+
+#endif
index 693ecc94cd26105d3e9cba1e7c753d418b8554b5..caf0f031cdec917241d8732ae271d8c48b891f0e 100644 (file)
 #include "messages/MDiscoverReply.h"
 
 #include "messages/MMDSFragmentNotify.h"
+#include "messages/MMDSFragmentNotifyAck.h"
 
 #include "messages/MExportDirDiscover.h"
 #include "messages/MExportDirDiscoverAck.h"
@@ -704,6 +705,10 @@ Message *decode_message(CephContext *cct, int crcflags,
     m = MMDSFragmentNotify::create();
     break;
 
+  case MSG_MDS_FRAGMENTNOTIFYACK:
+    m = MMDSFragmentNotifyAck::create();
+    break;
+
   case MSG_MDS_EXPORTDIRDISCOVER:
     m = MExportDirDiscover::create();
     break;
index 02d1e95ed966e10356241d8df793cad0d91bad45..c54b20e3df322f6f10609dddd7031fd504739482 100644 (file)
 #define MSG_MDS_OPENINO            0x20f
 #define MSG_MDS_OPENINOREPLY       0x210
 #define MSG_MDS_SNAPUPDATE         0x211
+#define MSG_MDS_FRAGMENTNOTIFYACK  0x212
 #define MSG_MDS_LOCK               0x300
 #define MSG_MDS_INODEFILECAPS      0x301