]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: preserve ESlaveUpdate::OP_PREPARE logevent before doing commit 34507/head
authorsongxinying <songxinying@sensetime.com>
Sun, 12 Apr 2020 14:01:00 +0000 (22:01 +0800)
committersongxinying <songxinying@sensetime.com>
Thu, 23 Apr 2020 01:32:38 +0000 (09:32 +0800)
Fixes: https://tracker.ceph.com/issues/45024
Signed-off-by: songxinying <songxinying@sensetime.com>
src/mds/LogSegment.h
src/mds/MDCache.cc
src/mds/MDCache.h
src/mds/Mutation.h
src/mds/Server.cc
src/mds/journal.cc

index 095d65d9583ed13ae1f781a36edaff3c32b3f4a7..3aad27bf29ee718cadae0cc4f49bab3892b884dd 100644 (file)
@@ -83,14 +83,13 @@ class LogSegment {
   elist<CInode*>  dirty_dirfrag_nest;
   elist<CInode*>  dirty_dirfrag_dirfragtree;
 
-  elist<MDSlaveUpdate*> slave_updates{0}; // passed to begin() manually
-
   set<CInode*> truncating_inodes;
   interval_set<inodeno_t> purge_inodes;
   MDSContext* purged_cb = nullptr;
 
   map<int, ceph::unordered_set<version_t> > pending_commit_tids;  // mdstable
   set<metareqid_t> uncommitted_masters;
+  set<metareqid_t> uncommitted_slaves;
   set<dirfrag_t> uncommitted_fragments;
 
   // client request ids
index 4dd07f3b7ff8bcd51c3d175aa1429a138baffc01..df873266d42b28f8a324320b814acfc9520168f0 100644 (file)
@@ -2705,16 +2705,13 @@ void MDCache::send_slave_resolves()
   map<mds_rank_t, ref_t<MMDSResolve>> resolves;
 
   if (mds->is_resolve()) {
-    for (map<mds_rank_t, map<metareqid_t, MDSlaveUpdate*> >::iterator p = uncommitted_slave_updates.begin();
-        p != uncommitted_slave_updates.end();
+    for (map<metareqid_t, uslave>::iterator p = uncommitted_slaves.begin();
+        p != uncommitted_slaves.end();
         ++p) {
-      resolves[p->first] = make_message<MMDSResolve>();
-      for (map<metareqid_t, MDSlaveUpdate*>::iterator q = p->second.begin();
-          q != p->second.end();
-          ++q) {
-       dout(10) << " including uncommitted " << q->first << dendl;
-       resolves[p->first]->add_slave_request(q->first, false);
-      }
+      mds_rank_t master = p->second.master;
+      auto &m = resolves[master];
+      if (!m) m = make_message<MMDSResolve>();
+      m->add_slave_request(p->first, false);
     }
   } else {
     set<mds_rank_t> resolve_set;
@@ -3373,7 +3370,7 @@ void MDCache::handle_resolve_ack(const cref_t<MMDSResolveAck> &ack)
 
     if (mds->is_resolve()) {
       // replay
-      MDSlaveUpdate *su = get_uncommitted_slave_update(p.first, from);
+      MDSlaveUpdate *su = get_uncommitted_slave(p.first, from);
       ceph_assert(su);
 
       // log commit
@@ -3382,7 +3379,7 @@ void MDCache::handle_resolve_ack(const cref_t<MMDSResolveAck> &ack)
                                     new C_MDC_SlaveCommit(this, from, p.first));
       mds->mdlog->flush();
 
-      finish_uncommitted_slave_update(p.first, from);
+      finish_uncommitted_slave(p.first);
     } else {
       MDRequestRef mdr = request_get(p.first);
       // information about master imported caps
@@ -3398,7 +3395,7 @@ void MDCache::handle_resolve_ack(const cref_t<MMDSResolveAck> &ack)
     dout(10) << " abort on slave " << metareq << dendl;
 
     if (mds->is_resolve()) {
-      MDSlaveUpdate *su = get_uncommitted_slave_update(metareq, from);
+      MDSlaveUpdate *su = get_uncommitted_slave(metareq, from);
       ceph_assert(su);
 
       // perform rollback (and journal a rollback entry)
@@ -3435,24 +3432,45 @@ void MDCache::handle_resolve_ack(const cref_t<MMDSResolveAck> &ack)
   }
 }
 
-void MDCache::add_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master, MDSlaveUpdate *su)
+void MDCache::add_uncommitted_slave(metareqid_t reqid, LogSegment *ls, mds_rank_t master, MDSlaveUpdate *su)
 {
-  ceph_assert(uncommitted_slave_updates[master].count(reqid) == 0);
-  uncommitted_slave_updates[master][reqid] = su;
+  auto const &ret = uncommitted_slaves.emplace(std::piecewise_construct,
+                                               std::forward_as_tuple(reqid),
+                                               std::forward_as_tuple());
+  ceph_assert(ret.second);
+  ls->uncommitted_slaves.insert(reqid);
+  uslave &u = ret.first->second;
+  u.master = master;
+  u.ls = ls;
+  u.su = su;
+  if (su == nullptr) {
+    return;
+  }
   for(set<CInode*>::iterator p = su->olddirs.begin(); p != su->olddirs.end(); ++p)
     uncommitted_slave_rename_olddir[*p]++;
   for(set<CInode*>::iterator p = su->unlinked.begin(); p != su->unlinked.end(); ++p)
     uncommitted_slave_unlink[*p]++;
 }
 
-void MDCache::finish_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master)
+void MDCache::finish_uncommitted_slave(metareqid_t reqid, bool assert_exist)
 {
-  ceph_assert(uncommitted_slave_updates[master].count(reqid));
-  MDSlaveUpdate* su = uncommitted_slave_updates[master][reqid];
+  auto it = uncommitted_slaves.find(reqid);
+  if (it == uncommitted_slaves.end()) {
+    ceph_assert(!assert_exist);
+    return;
+  }
+  uslave &u = it->second;
+  MDSlaveUpdate* su = u.su;
 
-  uncommitted_slave_updates[master].erase(reqid);
-  if (uncommitted_slave_updates[master].empty())
-    uncommitted_slave_updates.erase(master);
+  if (!u.waiters.empty()) {
+    mds->queue_waiters(u.waiters);
+  }
+  u.ls->uncommitted_slaves.erase(reqid);
+  uncommitted_slaves.erase(it);
+
+  if (su == nullptr) {
+    return;
+  }
   // discard the non-auth subtree we renamed out of
   for(set<CInode*>::iterator p = su->olddirs.begin(); p != su->olddirs.end(); ++p) {
     CInode *diri = *p;
@@ -3489,23 +3507,26 @@ void MDCache::finish_uncommitted_slave_update(metareqid_t reqid, mds_rank_t mast
   delete su;
 }
 
-MDSlaveUpdate* MDCache::get_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master)
+MDSlaveUpdate* MDCache::get_uncommitted_slave(metareqid_t reqid, mds_rank_t master)
 {
 
-  MDSlaveUpdate* su = NULL;
-  if (uncommitted_slave_updates.count(master) &&
-      uncommitted_slave_updates[master].count(reqid)) {
-    su = uncommitted_slave_updates[master][reqid];
-    ceph_assert(su);
+  MDSlaveUpdate* su = nullptr;
+  auto it = uncommitted_slaves.find(reqid);
+  if (it != uncommitted_slaves.end() &&
+      it->second.master == master) {
+    su = it->second.su;
   }
   return su;
 }
 
-void MDCache::finish_rollback(metareqid_t reqid) {
-  auto p = resolve_need_rollback.find(reqid);
+void MDCache::finish_rollback(metareqid_t reqid, MDRequestRef& mdr) {
+  auto p = resolve_need_rollback.find(mdr->reqid);
   ceph_assert(p != resolve_need_rollback.end());
-  if (mds->is_resolve())
-    finish_uncommitted_slave_update(reqid, p->second);
+  if (mds->is_resolve()) {
+    finish_uncommitted_slave(reqid, false);
+  } else if (mdr) {
+    finish_uncommitted_slave(mdr->reqid, mdr->more()->slave_update_journaled);
+  }
   resolve_need_rollback.erase(p);
   maybe_finish_slave_resolve();
 }
index 2c841481e32b0abde4ca337ed53778ed16995490..d6d4d11c9e923cba315839ff60e0f09d840c1659 100644 (file)
@@ -435,6 +435,12 @@ class MDCache {
   void committed_master_slave(metareqid_t r, mds_rank_t from);
   void finish_committed_masters();
 
+  void add_uncommitted_slave(metareqid_t reqid, LogSegment*, mds_rank_t, MDSlaveUpdate *su=nullptr);
+  void wait_for_uncommitted_slave(metareqid_t reqid, MDSContext *c) {
+    uncommitted_slaves.at(reqid).waiters.push_back(c);
+  }
+  void finish_uncommitted_slave(metareqid_t reqid, bool assert_exist=true);
+  MDSlaveUpdate* get_uncommitted_slave(metareqid_t reqid, mds_rank_t master);
   void _logged_slave_commit(mds_rank_t from, metareqid_t reqid);
 
   void set_recovery_set(set<mds_rank_t>& s);
@@ -463,7 +469,7 @@ class MDCache {
   void add_rollback(metareqid_t reqid, mds_rank_t master) {
     resolve_need_rollback[reqid] = master;
   }
-  void finish_rollback(metareqid_t reqid);
+  void finish_rollback(metareqid_t reqid, MDRequestRef& mdr);
 
   // ambiguous imports
   void add_ambiguous_import(dirfrag_t base, const vector<dirfrag_t>& bounds);
@@ -994,6 +1000,14 @@ class MDCache {
     bool recovering = false;
   };
 
+  struct uslave {
+    uslave() {}
+    mds_rank_t master;
+    LogSegment *ls = nullptr;
+    MDSlaveUpdate *su = nullptr;
+    MDSContext::vec waiters;
+  };
+
   struct open_ino_info_t {
     open_ino_info_t() {}
     vector<inode_backpointer_t> ancestors;
@@ -1032,9 +1046,6 @@ class MDCache {
   void disambiguate_my_imports();
   void disambiguate_other_imports();
   void trim_unlinked_inodes();
-  void add_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master, MDSlaveUpdate*);
-  void finish_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master);
-  MDSlaveUpdate* get_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master);
 
   void send_slave_resolves();
   void send_subtree_resolves();
@@ -1138,11 +1149,11 @@ class MDCache {
   // from MMDSResolves
   map<mds_rank_t, map<dirfrag_t, vector<dirfrag_t> > > other_ambiguous_imports;
 
-  map<mds_rank_t, map<metareqid_t, MDSlaveUpdate*> > uncommitted_slave_updates;  // slave: for replay.
   map<CInode*, int> uncommitted_slave_rename_olddir;  // slave: preserve the non-auth dir until seeing commit.
   map<CInode*, int> uncommitted_slave_unlink;  // slave: preserve the unlinked inode until seeing commit.
 
   map<metareqid_t, umaster> uncommitted_masters;         // master: req -> slave set
+  map<metareqid_t, uslave> uncommitted_slaves;  // slave: preserve the slave req until seeing commit.
 
   set<metareqid_t> pending_masters;
   map<int, set<metareqid_t> > ambiguous_slave_updates;
index 9f350a9d4eb658af793239256ca10aebed680866..5ab68fd62b219478869e9f3496c8b693e9c4a265 100644 (file)
@@ -449,20 +449,16 @@ private:
 };
 
 struct MDSlaveUpdate {
-  MDSlaveUpdate(int oo, ceph::buffer::list &rbl, elist<MDSlaveUpdate*> &list) :
-    origop(oo),
-    item(this) {
+  MDSlaveUpdate(int oo, ceph::buffer::list &rbl) :
+    origop(oo) {
     rollback.claim(rbl);
-    list.push_back(&item);
   }
   ~MDSlaveUpdate() {
-    item.remove_myself();
     if (waiter)
       waiter->complete(0);
   }
   int origop;
   ceph::buffer::list rollback;
-  elist<MDSlaveUpdate*>::item item;
   Context *waiter = nullptr;
   std::set<CInode*> olddirs;
   std::set<CInode*> unlinked;
index ae5996e086f35a20c96095e6d322684fcbcd9252..5a89c8bc4074443b011f204f79374fcc03748542 100644 (file)
@@ -6638,6 +6638,7 @@ void Server::handle_slave_link_prep(MDRequestRef& mdr)
   // commit case
   mdcache->predirty_journal_parents(mdr, &le->commit, dnl->get_inode(), 0, PREDIRTY_SHALLOW|PREDIRTY_PRIMARY);
   mdcache->journal_dirty_inode(mdr.get(), &le->commit, targeti);
+  mdcache->add_uncommitted_slave(mdr->reqid, mdr->ls, mdr->slave_to_mds);
 
   // set up commit waiter
   mdr->more()->slave_commit = new C_MDS_SlaveLinkCommit(this, mdr, targeti);
@@ -6718,6 +6719,8 @@ void Server::_committed_slave(MDRequestRef& mdr)
 
   ceph_assert(g_conf()->mds_kill_link_at != 8);
 
+  bool assert_exist = mdr->more()->slave_update_journaled;
+  mdcache->finish_uncommitted_slave(mdr->reqid, assert_exist);
   auto req = make_message<MMDSSlaveRequest>(mdr->reqid, mdr->attempt, MMDSSlaveRequest::OP_COMMITTED);
   mds->send_message_mds(req, mdr->slave_to_mds);
   mdcache->request_finish(mdr);
@@ -6832,7 +6835,7 @@ void Server::_link_rollback_finish(MutationRef& mut, MDRequestRef& mdr,
   if (mdr)
     mdcache->request_finish(mdr);
 
-  mdcache->finish_rollback(mut->reqid);
+  mdcache->finish_rollback(mut->reqid, mdr);
 
   mut->cleanup();
 }
@@ -7303,6 +7306,7 @@ void Server::handle_slave_rmdir_prep(MDRequestRef& mdr)
     return;
   }
 
+  mdr->ls = mdlog->get_current_segment();
   ESlaveUpdate *le =  new ESlaveUpdate(mdlog, "slave_rmdir", mdr->reqid, mdr->slave_to_mds,
                                       ESlaveUpdate::OP_PREPARE, ESlaveUpdate::RMDIR);
   mdlog->start_entry(le);
@@ -7316,6 +7320,7 @@ void Server::handle_slave_rmdir_prep(MDRequestRef& mdr)
   le->commit.renamed_dirino = in->ino();
 
   mdcache->project_subtree_rename(in, dn->get_dir(), straydn->get_dir());
+  mdcache->add_uncommitted_slave(mdr->reqid, mdr->ls, mdr->slave_to_mds);
 
   mdr->more()->slave_update_journaled = true;
   submit_mdlog_entry(le, new C_MDS_SlaveRmdirPrep(this, mdr, dn, straydn),
@@ -7389,7 +7394,7 @@ void Server::handle_slave_rmdir_prep_ack(MDRequestRef& mdr, const cref_t<MMDSSla
 void Server::_commit_slave_rmdir(MDRequestRef& mdr, int r, CDentry *straydn)
 {
   dout(10) << "_commit_slave_rmdir " << *mdr << " r=" << r << dendl;
-  
+
   if (r == 0) {
     if (mdr->more()->slave_update_journaled) {
       CInode *strayin = straydn->get_projected_linkage()->get_inode();
@@ -7518,7 +7523,7 @@ void Server::_rmdir_rollback_finish(MDRequestRef& mdr, metareqid_t reqid, CDentr
   if (mdr)
     mdcache->request_finish(mdr);
 
-  mdcache->finish_rollback(reqid);
+  mdcache->finish_rollback(reqid, mdr);
 }
 
 
@@ -8983,6 +8988,7 @@ void Server::handle_slave_rename_prep(MDRequestRef& mdr)
     mdr->ls = NULL;
     _logged_slave_rename(mdr, srcdn, destdn, straydn);
   } else {
+    mdcache->add_uncommitted_slave(mdr->reqid, mdr->ls, mdr->slave_to_mds);
     mdr->more()->slave_update_journaled = true;
     submit_mdlog_entry(le, new C_MDS_SlaveRenamePrep(this, mdr, srcdn, destdn, straydn),
                       mdr, __func__);
@@ -9605,7 +9611,7 @@ void Server::_rename_rollback_finish(MutationRef& mut, MDRequestRef& mdr, CDentr
       mdr->more()->slave_rolling_back = false;
   }
 
-  mdcache->finish_rollback(mut->reqid);
+  mdcache->finish_rollback(mut->reqid, mdr);
 
   mut->cleanup();
 }
index 14966f14ce153847b78a94da0e2216ea2b18e37b..e7deb600cfb82d4b0d96df7bf968c868852be9d7 100644 (file)
@@ -119,6 +119,14 @@ void LogSegment::try_to_expire(MDSRank *mds, MDSGatherBuilder &gather_bld, int o
     mds->mdcache->wait_for_uncommitted_master(*p, gather_bld.new_sub());
   }
 
+  // slave ops that haven't been committed
+  for (set<metareqid_t>::iterator p = uncommitted_slaves.begin();
+       p != uncommitted_slaves.end();
+       ++p) {
+    dout(10) << "try_to_expire waiting for master to ack OP_FINISH on " << *p << dendl;
+    mds->mdcache->wait_for_uncommitted_slave(*p, gather_bld.new_sub());
+  }
+
   // uncommitted fragments
   for (set<dirfrag_t>::iterator p = uncommitted_fragments.begin();
        p != uncommitted_fragments.end();
@@ -194,16 +202,6 @@ void LogSegment::try_to_expire(MDSRank *mds, MDSGatherBuilder &gather_bld, int o
 
   ceph_assert(g_conf()->mds_kill_journal_expire_at != 4);
 
-  // slave updates
-  for (elist<MDSlaveUpdate*>::iterator p = slave_updates.begin(member_offset(MDSlaveUpdate,
-                                                                            item));
-       !p.end(); ++p) {
-    MDSlaveUpdate *su = *p;
-    dout(10) << "try_to_expire waiting on slave update " << su << dendl;
-    ceph_assert(su->waiter == 0);
-    su->waiter = gather_bld.new_sub();
-  }
-
   // idalloc
   if (inotablev > mds->inotable->get_committed_version()) {
     dout(10) << "try_to_expire saving inotable table, need " << inotablev
@@ -2496,7 +2494,6 @@ void ESlaveUpdate::generate_test_instances(std::list<ESlaveUpdate*>& ls)
   ls.push_back(new ESlaveUpdate());
 }
 
-
 void ESlaveUpdate::replay(MDSRank *mds)
 {
   MDSlaveUpdate *su;
@@ -2505,29 +2502,21 @@ void ESlaveUpdate::replay(MDSRank *mds)
   case ESlaveUpdate::OP_PREPARE:
     dout(10) << "ESlaveUpdate.replay prepare " << reqid << " for mds." << master 
             << ": applying commit, saving rollback info" << dendl;
-    su = new MDSlaveUpdate(origop, rollback, segment->slave_updates);
+    su = new MDSlaveUpdate(origop, rollback);
     commit.replay(mds, segment, su);
-    mds->mdcache->add_uncommitted_slave_update(reqid, master, su);
+    mds->mdcache->add_uncommitted_slave(reqid, segment, master, su);
     break;
 
   case ESlaveUpdate::OP_COMMIT:
-    su = mds->mdcache->get_uncommitted_slave_update(reqid, master);
-    if (su) {
-      dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds." << master << dendl;
-      mds->mdcache->finish_uncommitted_slave_update(reqid, master);
-    } else {
-      dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds." << master 
-              << ": ignoring, no previously saved prepare" << dendl;
-    }
+    dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds." << master << dendl;
+    mds->mdcache->finish_uncommitted_slave(reqid, false);
     break;
 
   case ESlaveUpdate::OP_ROLLBACK:
     dout(10) << "ESlaveUpdate.replay abort " << reqid << " for mds." << master
             << ": applying rollback commit blob" << dendl;
     commit.replay(mds, segment);
-    su = mds->mdcache->get_uncommitted_slave_update(reqid, master);
-    if (su)
-      mds->mdcache->finish_uncommitted_slave_update(reqid, master);
+    mds->mdcache->finish_uncommitted_slave(reqid, false);
     break;
 
   default: