]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: preserve ESlaveUpdate::OP_PREPARE logevent before doing commit 35394/head
authorsongxinying <songxinying@sensetime.com>
Sun, 12 Apr 2020 14:01:00 +0000 (22:01 +0800)
committerNathan Cutler <ncutler@suse.com>
Thu, 4 Jun 2020 14:32:01 +0000 (16:32 +0200)
Fixes: https://tracker.ceph.com/issues/45024
Signed-off-by: songxinying <songxinying@sensetime.com>
(cherry picked from commit 4940ab62e0d19ce36e53bcc67b2a2161c47f6c6d)

Conflicts:
    src/mds/MDCache.cc
- use MMDSResolve::create() in nautilus, instead of make_message<MMDSResolve>()
    src/mds/MDCache.h
    src/mds/Mutation.h
- in nautilus, these two files are structured differently from master (large
  chunks of the master code are missing in nautilus, ordering of code is
  different also)
    src/mds/Server.cc
- use nautilus equivalent instead of "make_message<MMDSSlaveRequest>"

src/mds/LogSegment.h
src/mds/MDCache.cc
src/mds/MDCache.h
src/mds/Mutation.h
src/mds/Server.cc
src/mds/journal.cc

index fa467fe6ebb2747734f27d8c24ee953255b124f5..c1c8e7ea0f5652e25a7f8aa5680be1026a93d419 100644 (file)
@@ -74,12 +74,11 @@ class LogSegment {
   elist<CInode*>  dirty_dirfrag_nest;
   elist<CInode*>  dirty_dirfrag_dirfragtree;
 
-  elist<MDSlaveUpdate*> slave_updates{0}; // passed to begin() manually
-
   set<CInode*> truncating_inodes;
 
   map<int, ceph::unordered_set<version_t> > pending_commit_tids;  // mdstable
   set<metareqid_t> uncommitted_masters;
+  set<metareqid_t> uncommitted_slaves;
   set<dirfrag_t> uncommitted_fragments;
 
   // client request ids
index 68a3299ae53a55fc1640c986ab08c9510757b748..b0872d2a986b57cd2459a14608c6ebd3eaf1dd0e 100644 (file)
@@ -2727,16 +2727,13 @@ void MDCache::send_slave_resolves()
   map<mds_rank_t, MMDSResolve::ref> resolves;
 
   if (mds->is_resolve()) {
-    for (map<mds_rank_t, map<metareqid_t, MDSlaveUpdate*> >::iterator p = uncommitted_slave_updates.begin();
-        p != uncommitted_slave_updates.end();
+    for (map<metareqid_t, uslave>::iterator p = uncommitted_slaves.begin();
+        p != uncommitted_slaves.end();
         ++p) {
-      resolves[p->first] = MMDSResolve::create();
-      for (map<metareqid_t, MDSlaveUpdate*>::iterator q = p->second.begin();
-          q != p->second.end();
-          ++q) {
-       dout(10) << " including uncommitted " << q->first << dendl;
-       resolves[p->first]->add_slave_request(q->first, false);
-      }
+      mds_rank_t master = p->second.master;
+      auto &m = resolves[master];
+      if (!m) m = MMDSResolve::create();
+      m->add_slave_request(p->first, false);
     }
   } else {
     set<mds_rank_t> resolve_set;
@@ -3401,7 +3398,7 @@ void MDCache::handle_resolve_ack(const MMDSResolveAck::const_ref &ack)
 
     if (mds->is_resolve()) {
       // replay
-      MDSlaveUpdate *su = get_uncommitted_slave_update(p.first, from);
+      MDSlaveUpdate *su = get_uncommitted_slave(p.first, from);
       ceph_assert(su);
 
       // log commit
@@ -3410,7 +3407,7 @@ void MDCache::handle_resolve_ack(const MMDSResolveAck::const_ref &ack)
                                     new C_MDC_SlaveCommit(this, from, p.first));
       mds->mdlog->flush();
 
-      finish_uncommitted_slave_update(p.first, from);
+      finish_uncommitted_slave(p.first);
     } else {
       MDRequestRef mdr = request_get(p.first);
       // information about master imported caps
@@ -3426,7 +3423,7 @@ void MDCache::handle_resolve_ack(const MMDSResolveAck::const_ref &ack)
     dout(10) << " abort on slave " << metareq << dendl;
 
     if (mds->is_resolve()) {
-      MDSlaveUpdate *su = get_uncommitted_slave_update(metareq, from);
+      MDSlaveUpdate *su = get_uncommitted_slave(metareq, from);
       ceph_assert(su);
 
       // perform rollback (and journal a rollback entry)
@@ -3463,24 +3460,45 @@ void MDCache::handle_resolve_ack(const MMDSResolveAck::const_ref &ack)
   }
 }
 
-void MDCache::add_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master, MDSlaveUpdate *su)
+void MDCache::add_uncommitted_slave(metareqid_t reqid, LogSegment *ls, mds_rank_t master, MDSlaveUpdate *su)
 {
-  ceph_assert(uncommitted_slave_updates[master].count(reqid) == 0);
-  uncommitted_slave_updates[master][reqid] = su;
+  auto const &ret = uncommitted_slaves.emplace(std::piecewise_construct,
+                                               std::forward_as_tuple(reqid),
+                                               std::forward_as_tuple());
+  ceph_assert(ret.second);
+  ls->uncommitted_slaves.insert(reqid);
+  uslave &u = ret.first->second;
+  u.master = master;
+  u.ls = ls;
+  u.su = su;
+  if (su == nullptr) {
+    return;
+  }
   for(set<CInode*>::iterator p = su->olddirs.begin(); p != su->olddirs.end(); ++p)
     uncommitted_slave_rename_olddir[*p]++;
   for(set<CInode*>::iterator p = su->unlinked.begin(); p != su->unlinked.end(); ++p)
     uncommitted_slave_unlink[*p]++;
 }
 
-void MDCache::finish_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master)
+void MDCache::finish_uncommitted_slave(metareqid_t reqid, bool assert_exist)
 {
-  ceph_assert(uncommitted_slave_updates[master].count(reqid));
-  MDSlaveUpdate* su = uncommitted_slave_updates[master][reqid];
+  auto it = uncommitted_slaves.find(reqid);
+  if (it == uncommitted_slaves.end()) {
+    ceph_assert(!assert_exist);
+    return;
+  }
+  uslave &u = it->second;
+  MDSlaveUpdate* su = u.su;
 
-  uncommitted_slave_updates[master].erase(reqid);
-  if (uncommitted_slave_updates[master].empty())
-    uncommitted_slave_updates.erase(master);
+  if (!u.waiters.empty()) {
+    mds->queue_waiters(u.waiters);
+  }
+  u.ls->uncommitted_slaves.erase(reqid);
+  uncommitted_slaves.erase(it);
+
+  if (su == nullptr) {
+    return;
+  }
   // discard the non-auth subtree we renamed out of
   for(set<CInode*>::iterator p = su->olddirs.begin(); p != su->olddirs.end(); ++p) {
     CInode *diri = *p;
@@ -3518,23 +3536,26 @@ void MDCache::finish_uncommitted_slave_update(metareqid_t reqid, mds_rank_t mast
   delete su;
 }
 
-MDSlaveUpdate* MDCache::get_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master)
+MDSlaveUpdate* MDCache::get_uncommitted_slave(metareqid_t reqid, mds_rank_t master)
 {
 
-  MDSlaveUpdate* su = NULL;
-  if (uncommitted_slave_updates.count(master) &&
-      uncommitted_slave_updates[master].count(reqid)) {
-    su = uncommitted_slave_updates[master][reqid];
-    ceph_assert(su);
+  MDSlaveUpdate* su = nullptr;
+  auto it = uncommitted_slaves.find(reqid);
+  if (it != uncommitted_slaves.end() &&
+      it->second.master == master) {
+    su = it->second.su;
   }
   return su;
 }
 
-void MDCache::finish_rollback(metareqid_t reqid) {
-  auto p = resolve_need_rollback.find(reqid);
+void MDCache::finish_rollback(metareqid_t reqid, MDRequestRef& mdr) {
+  auto p = resolve_need_rollback.find(mdr->reqid);
   ceph_assert(p != resolve_need_rollback.end());
-  if (mds->is_resolve())
-    finish_uncommitted_slave_update(reqid, p->second);
+  if (mds->is_resolve()) {
+    finish_uncommitted_slave(reqid, false);
+  } else if (mdr) {
+    finish_uncommitted_slave(mdr->reqid, mdr->more()->slave_update_journaled);
+  }
   resolve_need_rollback.erase(p);
   maybe_finish_slave_resolve();
 }
index 5b0b361b4ffeacecbdfc4cc3640032f516e9c86f..d2781d28108aeac5f5a321d976aa7cb536ee5519 100644 (file)
@@ -478,6 +478,12 @@ public:
   void committed_master_slave(metareqid_t r, mds_rank_t from);
   void finish_committed_masters();
 
+  void add_uncommitted_slave(metareqid_t reqid, LogSegment*, mds_rank_t, MDSlaveUpdate *su=nullptr);
+  void wait_for_uncommitted_slave(metareqid_t reqid, MDSContext *c) {
+    uncommitted_slaves.at(reqid).waiters.push_back(c);
+  }
+  void finish_uncommitted_slave(metareqid_t reqid, bool assert_exist=true);
+  MDSlaveUpdate* get_uncommitted_slave(metareqid_t reqid, mds_rank_t master);
   void _logged_slave_commit(mds_rank_t from, metareqid_t reqid);
 
   // -- recovery --
@@ -496,7 +502,6 @@ protected:
   // from MMDSResolves
   map<mds_rank_t, map<dirfrag_t, vector<dirfrag_t> > > other_ambiguous_imports;  
 
-  map<mds_rank_t, map<metareqid_t, MDSlaveUpdate*> > uncommitted_slave_updates;  // slave: for replay.
   map<CInode*, int> uncommitted_slave_rename_olddir;  // slave: preserve the non-auth dir until seeing commit.
   map<CInode*, int> uncommitted_slave_unlink;  // slave: preserve the unlinked inode until seeing commit.
 
@@ -512,6 +517,15 @@ protected:
   };
   map<metareqid_t, umaster>                 uncommitted_masters;         // master: req -> slave set
 
+  struct uslave {
+    uslave() {}
+    mds_rank_t master;
+    LogSegment *ls = nullptr;
+    MDSlaveUpdate *su = nullptr;
+    MDSContext::vec waiters;
+  };
+  map<metareqid_t, uslave> uncommitted_slaves;  // slave: preserve the slave req until seeing commit.
+
   set<metareqid_t>             pending_masters;
   map<int, set<metareqid_t> >  ambiguous_slave_updates;
 
@@ -533,9 +547,6 @@ protected:
   void disambiguate_my_imports();
   void disambiguate_other_imports();
   void trim_unlinked_inodes();
-  void add_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master, MDSlaveUpdate*);
-  void finish_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master);
-  MDSlaveUpdate* get_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master);
 
   void send_slave_resolves();
   void send_subtree_resolves();
@@ -564,7 +575,7 @@ public:
   void add_rollback(metareqid_t reqid, mds_rank_t master) {
     resolve_need_rollback[reqid] = master;
   }
-  void finish_rollback(metareqid_t reqid);
+  void finish_rollback(metareqid_t reqid, MDRequestRef& mdr);
 
   // ambiguous imports
   void add_ambiguous_import(dirfrag_t base, const vector<dirfrag_t>& bounds);
index 96bdf97f4cfdefbbc98a3665d479bf26108d807f..61a33c5ee6314942821f6532a4c04e196de34786 100644 (file)
@@ -415,19 +415,14 @@ typedef boost::intrusive_ptr<MDRequestImpl> MDRequestRef;
 struct MDSlaveUpdate {
   int origop;
   bufferlist rollback;
-  elist<MDSlaveUpdate*>::item item;
   Context *waiter;
   set<CInode*> olddirs;
   set<CInode*> unlinked;
-  MDSlaveUpdate(int oo, bufferlist &rbl, elist<MDSlaveUpdate*> &list) :
-    origop(oo),
-    item(this),
-    waiter(0) {
+  MDSlaveUpdate(int oo, bufferlist &rbl) :
+    origop(oo) {
     rollback.claim(rbl);
-    list.push_back(&item);
   }
   ~MDSlaveUpdate() {
-    item.remove_myself();
     if (waiter)
       waiter->complete(0);
   }
index 016784afc59ff107db281bb58e1001db1ee6ba6f..96f48a2eaa9bdee9f1afa91685f7b69b141851d4 100644 (file)
@@ -6385,6 +6385,7 @@ void Server::handle_slave_link_prep(MDRequestRef& mdr)
   // commit case
   mdcache->predirty_journal_parents(mdr, &le->commit, dnl->get_inode(), 0, PREDIRTY_SHALLOW|PREDIRTY_PRIMARY);
   mdcache->journal_dirty_inode(mdr.get(), &le->commit, targeti);
+  mdcache->add_uncommitted_slave(mdr->reqid, mdr->ls, mdr->slave_to_mds);
 
   // set up commit waiter
   mdr->more()->slave_commit = new C_MDS_SlaveLinkCommit(this, mdr, targeti);
@@ -6465,6 +6466,8 @@ void Server::_committed_slave(MDRequestRef& mdr)
 
   ceph_assert(g_conf()->mds_kill_link_at != 8);
 
+  bool assert_exist = mdr->more()->slave_update_journaled;
+  mdcache->finish_uncommitted_slave(mdr->reqid, assert_exist);
   auto req = MMDSSlaveRequest::create(mdr->reqid, mdr->attempt, MMDSSlaveRequest::OP_COMMITTED);
   mds->send_message_mds(req, mdr->slave_to_mds);
   mdcache->request_finish(mdr);
@@ -6579,7 +6582,7 @@ void Server::_link_rollback_finish(MutationRef& mut, MDRequestRef& mdr,
   if (mdr)
     mdcache->request_finish(mdr);
 
-  mdcache->finish_rollback(mut->reqid);
+  mdcache->finish_rollback(mut->reqid, mdr);
 
   mut->cleanup();
 }
@@ -7079,6 +7082,7 @@ void Server::handle_slave_rmdir_prep(MDRequestRef& mdr)
     return;
   }
 
+  mdr->ls = mdlog->get_current_segment();
   ESlaveUpdate *le =  new ESlaveUpdate(mdlog, "slave_rmdir", mdr->reqid, mdr->slave_to_mds,
                                       ESlaveUpdate::OP_PREPARE, ESlaveUpdate::RMDIR);
   mdlog->start_entry(le);
@@ -7092,6 +7096,7 @@ void Server::handle_slave_rmdir_prep(MDRequestRef& mdr)
   le->commit.renamed_dirino = in->ino();
 
   mdcache->project_subtree_rename(in, dn->get_dir(), straydn->get_dir());
+  mdcache->add_uncommitted_slave(mdr->reqid, mdr->ls, mdr->slave_to_mds);
 
   mdr->more()->slave_update_journaled = true;
   submit_mdlog_entry(le, new C_MDS_SlaveRmdirPrep(this, mdr, dn, straydn),
@@ -7165,7 +7170,7 @@ void Server::handle_slave_rmdir_prep_ack(MDRequestRef& mdr, const MMDSSlaveReque
 void Server::_commit_slave_rmdir(MDRequestRef& mdr, int r, CDentry *straydn)
 {
   dout(10) << "_commit_slave_rmdir " << *mdr << " r=" << r << dendl;
-  
+
   if (r == 0) {
     if (mdr->more()->slave_update_journaled) {
       CInode *strayin = straydn->get_projected_linkage()->get_inode();
@@ -7294,7 +7299,7 @@ void Server::_rmdir_rollback_finish(MDRequestRef& mdr, metareqid_t reqid, CDentr
   if (mdr)
     mdcache->request_finish(mdr);
 
-  mdcache->finish_rollback(reqid);
+  mdcache->finish_rollback(reqid, mdr);
 }
 
 
@@ -8800,6 +8805,7 @@ void Server::handle_slave_rename_prep(MDRequestRef& mdr)
     mdr->ls = NULL;
     _logged_slave_rename(mdr, srcdn, destdn, straydn);
   } else {
+    mdcache->add_uncommitted_slave(mdr->reqid, mdr->ls, mdr->slave_to_mds);
     mdr->more()->slave_update_journaled = true;
     submit_mdlog_entry(le, new C_MDS_SlaveRenamePrep(this, mdr, srcdn, destdn, straydn),
                       mdr, __func__);
@@ -9422,7 +9428,7 @@ void Server::_rename_rollback_finish(MutationRef& mut, MDRequestRef& mdr, CDentr
       mdr->more()->slave_rolling_back = false;
   }
 
-  mdcache->finish_rollback(mut->reqid);
+  mdcache->finish_rollback(mut->reqid, mdr);
 
   mut->cleanup();
 }
index 991b91cc13f94fc7d12949984664792cd42d45b1..b9ef6acab8990b4a831b3e4d55f99ee30f701e73 100644 (file)
@@ -118,6 +118,14 @@ void LogSegment::try_to_expire(MDSRank *mds, MDSGatherBuilder &gather_bld, int o
     mds->mdcache->wait_for_uncommitted_master(*p, gather_bld.new_sub());
   }
 
+  // slave ops that haven't been committed
+  for (set<metareqid_t>::iterator p = uncommitted_slaves.begin();
+       p != uncommitted_slaves.end();
+       ++p) {
+    dout(10) << "try_to_expire waiting for master to ack OP_FINISH on " << *p << dendl;
+    mds->mdcache->wait_for_uncommitted_slave(*p, gather_bld.new_sub());
+  }
+
   // uncommitted fragments
   for (set<dirfrag_t>::iterator p = uncommitted_fragments.begin();
        p != uncommitted_fragments.end();
@@ -193,16 +201,6 @@ void LogSegment::try_to_expire(MDSRank *mds, MDSGatherBuilder &gather_bld, int o
 
   ceph_assert(g_conf()->mds_kill_journal_expire_at != 4);
 
-  // slave updates
-  for (elist<MDSlaveUpdate*>::iterator p = slave_updates.begin(member_offset(MDSlaveUpdate,
-                                                                            item));
-       !p.end(); ++p) {
-    MDSlaveUpdate *su = *p;
-    dout(10) << "try_to_expire waiting on slave update " << su << dendl;
-    ceph_assert(su->waiter == 0);
-    su->waiter = gather_bld.new_sub();
-  }
-
   // idalloc
   if (inotablev > mds->inotable->get_committed_version()) {
     dout(10) << "try_to_expire saving inotable table, need " << inotablev
@@ -2436,7 +2434,6 @@ void ESlaveUpdate::generate_test_instances(list<ESlaveUpdate*>& ls)
   ls.push_back(new ESlaveUpdate());
 }
 
-
 void ESlaveUpdate::replay(MDSRank *mds)
 {
   MDSlaveUpdate *su;
@@ -2445,29 +2442,21 @@ void ESlaveUpdate::replay(MDSRank *mds)
   case ESlaveUpdate::OP_PREPARE:
     dout(10) << "ESlaveUpdate.replay prepare " << reqid << " for mds." << master 
             << ": applying commit, saving rollback info" << dendl;
-    su = new MDSlaveUpdate(origop, rollback, segment->slave_updates);
+    su = new MDSlaveUpdate(origop, rollback);
     commit.replay(mds, segment, su);
-    mds->mdcache->add_uncommitted_slave_update(reqid, master, su);
+    mds->mdcache->add_uncommitted_slave(reqid, segment, master, su);
     break;
 
   case ESlaveUpdate::OP_COMMIT:
-    su = mds->mdcache->get_uncommitted_slave_update(reqid, master);
-    if (su) {
-      dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds." << master << dendl;
-      mds->mdcache->finish_uncommitted_slave_update(reqid, master);
-    } else {
-      dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds." << master 
-              << ": ignoring, no previously saved prepare" << dendl;
-    }
+    dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds." << master << dendl;
+    mds->mdcache->finish_uncommitted_slave(reqid, false);
     break;
 
   case ESlaveUpdate::OP_ROLLBACK:
     dout(10) << "ESlaveUpdate.replay abort " << reqid << " for mds." << master
             << ": applying rollback commit blob" << dendl;
     commit.replay(mds, segment);
-    su = mds->mdcache->get_uncommitted_slave_update(reqid, master);
-    if (su)
-      mds->mdcache->finish_uncommitted_slave_update(reqid, master);
+    mds->mdcache->finish_uncommitted_slave(reqid, false);
     break;
 
   default: