elist<CInode*>  dirty_dirfrag_nest;
   elist<CInode*>  dirty_dirfrag_dirfragtree;
 
-  elist<MDSlaveUpdate*> slave_updates{0}; // passed to begin() manually
-
   set<CInode*> truncating_inodes;
 
   map<int, ceph::unordered_set<version_t> > pending_commit_tids;  // mdstable
   set<metareqid_t> uncommitted_masters;
+  set<metareqid_t> uncommitted_slaves;
   set<dirfrag_t> uncommitted_fragments;
 
   // client request ids
 
   map<mds_rank_t, MMDSResolve::ref> resolves;
 
   if (mds->is_resolve()) {
-    for (map<mds_rank_t, map<metareqid_t, MDSlaveUpdate*> >::iterator p = uncommitted_slave_updates.begin();
-        p != uncommitted_slave_updates.end();
+    for (map<metareqid_t, uslave>::iterator p = uncommitted_slaves.begin();
+        p != uncommitted_slaves.end();
         ++p) {
-      resolves[p->first] = MMDSResolve::create();
-      for (map<metareqid_t, MDSlaveUpdate*>::iterator q = p->second.begin();
-          q != p->second.end();
-          ++q) {
-       dout(10) << " including uncommitted " << q->first << dendl;
-       resolves[p->first]->add_slave_request(q->first, false);
-      }
+      mds_rank_t master = p->second.master;
+      auto &m = resolves[master];
+      if (!m) m = MMDSResolve::create();
+      m->add_slave_request(p->first, false);
     }
   } else {
     set<mds_rank_t> resolve_set;
 
     if (mds->is_resolve()) {
       // replay
-      MDSlaveUpdate *su = get_uncommitted_slave_update(p.first, from);
+      MDSlaveUpdate *su = get_uncommitted_slave(p.first, from);
       ceph_assert(su);
 
       // log commit
                                     new C_MDC_SlaveCommit(this, from, p.first));
       mds->mdlog->flush();
 
-      finish_uncommitted_slave_update(p.first, from);
+      finish_uncommitted_slave(p.first);
     } else {
       MDRequestRef mdr = request_get(p.first);
       // information about master imported caps
     dout(10) << " abort on slave " << metareq << dendl;
 
     if (mds->is_resolve()) {
-      MDSlaveUpdate *su = get_uncommitted_slave_update(metareq, from);
+      MDSlaveUpdate *su = get_uncommitted_slave(metareq, from);
       ceph_assert(su);
 
       // perform rollback (and journal a rollback entry)
   }
 }
 
-void MDCache::add_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master, MDSlaveUpdate *su)
+void MDCache::add_uncommitted_slave(metareqid_t reqid, LogSegment *ls, mds_rank_t master, MDSlaveUpdate *su)
 {
-  ceph_assert(uncommitted_slave_updates[master].count(reqid) == 0);
-  uncommitted_slave_updates[master][reqid] = su;
+  auto const &ret = uncommitted_slaves.emplace(std::piecewise_construct,
+                                               std::forward_as_tuple(reqid),
+                                               std::forward_as_tuple());
+  ceph_assert(ret.second);
+  ls->uncommitted_slaves.insert(reqid);
+  uslave &u = ret.first->second;
+  u.master = master;
+  u.ls = ls;
+  u.su = su;
+  if (su == nullptr) {
+    return;
+  }
   for(set<CInode*>::iterator p = su->olddirs.begin(); p != su->olddirs.end(); ++p)
     uncommitted_slave_rename_olddir[*p]++;
   for(set<CInode*>::iterator p = su->unlinked.begin(); p != su->unlinked.end(); ++p)
     uncommitted_slave_unlink[*p]++;
 }
 
-void MDCache::finish_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master)
+void MDCache::finish_uncommitted_slave(metareqid_t reqid, bool assert_exist)
 {
-  ceph_assert(uncommitted_slave_updates[master].count(reqid));
-  MDSlaveUpdate* su = uncommitted_slave_updates[master][reqid];
+  auto it = uncommitted_slaves.find(reqid);
+  if (it == uncommitted_slaves.end()) {
+    ceph_assert(!assert_exist);
+    return;
+  }
+  uslave &u = it->second;
+  MDSlaveUpdate* su = u.su;
 
-  uncommitted_slave_updates[master].erase(reqid);
-  if (uncommitted_slave_updates[master].empty())
-    uncommitted_slave_updates.erase(master);
+  if (!u.waiters.empty()) {
+    mds->queue_waiters(u.waiters);
+  }
+  u.ls->uncommitted_slaves.erase(reqid);
+  uncommitted_slaves.erase(it);
+
+  if (su == nullptr) {
+    return;
+  }
   // discard the non-auth subtree we renamed out of
   for(set<CInode*>::iterator p = su->olddirs.begin(); p != su->olddirs.end(); ++p) {
     CInode *diri = *p;
   delete su;
 }
 
-MDSlaveUpdate* MDCache::get_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master)
+MDSlaveUpdate* MDCache::get_uncommitted_slave(metareqid_t reqid, mds_rank_t master)
 {
 
-  MDSlaveUpdate* su = NULL;
-  if (uncommitted_slave_updates.count(master) &&
-      uncommitted_slave_updates[master].count(reqid)) {
-    su = uncommitted_slave_updates[master][reqid];
-    ceph_assert(su);
+  MDSlaveUpdate* su = nullptr;
+  auto it = uncommitted_slaves.find(reqid);
+  if (it != uncommitted_slaves.end() &&
+      it->second.master == master) {
+    su = it->second.su;
   }
   return su;
 }
 
-void MDCache::finish_rollback(metareqid_t reqid) {
-  auto p = resolve_need_rollback.find(reqid);
+void MDCache::finish_rollback(metareqid_t reqid, MDRequestRef& mdr) {
+  auto p = resolve_need_rollback.find(mdr->reqid);
   ceph_assert(p != resolve_need_rollback.end());
-  if (mds->is_resolve())
-    finish_uncommitted_slave_update(reqid, p->second);
+  if (mds->is_resolve()) {
+    finish_uncommitted_slave(reqid, false);
+  } else if (mdr) {
+    finish_uncommitted_slave(mdr->reqid, mdr->more()->slave_update_journaled);
+  }
   resolve_need_rollback.erase(p);
   maybe_finish_slave_resolve();
 }
 
   void committed_master_slave(metareqid_t r, mds_rank_t from);
   void finish_committed_masters();
 
+  void add_uncommitted_slave(metareqid_t reqid, LogSegment*, mds_rank_t, MDSlaveUpdate *su=nullptr);
+  void wait_for_uncommitted_slave(metareqid_t reqid, MDSContext *c) {
+    uncommitted_slaves.at(reqid).waiters.push_back(c);
+  }
+  void finish_uncommitted_slave(metareqid_t reqid, bool assert_exist=true);
+  MDSlaveUpdate* get_uncommitted_slave(metareqid_t reqid, mds_rank_t master);
   void _logged_slave_commit(mds_rank_t from, metareqid_t reqid);
 
   // -- recovery --
   // from MMDSResolves
   map<mds_rank_t, map<dirfrag_t, vector<dirfrag_t> > > other_ambiguous_imports;  
 
-  map<mds_rank_t, map<metareqid_t, MDSlaveUpdate*> > uncommitted_slave_updates;  // slave: for replay.
   map<CInode*, int> uncommitted_slave_rename_olddir;  // slave: preserve the non-auth dir until seeing commit.
   map<CInode*, int> uncommitted_slave_unlink;  // slave: preserve the unlinked inode until seeing commit.
 
   };
   map<metareqid_t, umaster>                 uncommitted_masters;         // master: req -> slave set
 
+  struct uslave {
+    uslave() {}
+    mds_rank_t master;
+    LogSegment *ls = nullptr;
+    MDSlaveUpdate *su = nullptr;
+    MDSContext::vec waiters;
+  };
+  map<metareqid_t, uslave> uncommitted_slaves;  // slave: preserve the slave req until seeing commit.
+
   set<metareqid_t>             pending_masters;
   map<int, set<metareqid_t> >  ambiguous_slave_updates;
 
   void disambiguate_my_imports();
   void disambiguate_other_imports();
   void trim_unlinked_inodes();
-  void add_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master, MDSlaveUpdate*);
-  void finish_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master);
-  MDSlaveUpdate* get_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master);
 
   void send_slave_resolves();
   void send_subtree_resolves();
   void add_rollback(metareqid_t reqid, mds_rank_t master) {
     resolve_need_rollback[reqid] = master;
   }
-  void finish_rollback(metareqid_t reqid);
+  void finish_rollback(metareqid_t reqid, MDRequestRef& mdr);
 
   // ambiguous imports
   void add_ambiguous_import(dirfrag_t base, const vector<dirfrag_t>& bounds);
 
 struct MDSlaveUpdate {
   int origop;
   bufferlist rollback;
-  elist<MDSlaveUpdate*>::item item;
   Context *waiter;
   set<CInode*> olddirs;
   set<CInode*> unlinked;
-  MDSlaveUpdate(int oo, bufferlist &rbl, elist<MDSlaveUpdate*> &list) :
-    origop(oo),
-    item(this),
-    waiter(0) {
+  MDSlaveUpdate(int oo, bufferlist &rbl) :
+    origop(oo) {
     rollback.claim(rbl);
-    list.push_back(&item);
   }
   ~MDSlaveUpdate() {
-    item.remove_myself();
     if (waiter)
       waiter->complete(0);
   }
 
   // commit case
   mdcache->predirty_journal_parents(mdr, &le->commit, dnl->get_inode(), 0, PREDIRTY_SHALLOW|PREDIRTY_PRIMARY);
   mdcache->journal_dirty_inode(mdr.get(), &le->commit, targeti);
+  mdcache->add_uncommitted_slave(mdr->reqid, mdr->ls, mdr->slave_to_mds);
 
   // set up commit waiter
   mdr->more()->slave_commit = new C_MDS_SlaveLinkCommit(this, mdr, targeti);
 
   ceph_assert(g_conf()->mds_kill_link_at != 8);
 
+  bool assert_exist = mdr->more()->slave_update_journaled;
+  mdcache->finish_uncommitted_slave(mdr->reqid, assert_exist);
   auto req = MMDSSlaveRequest::create(mdr->reqid, mdr->attempt, MMDSSlaveRequest::OP_COMMITTED);
   mds->send_message_mds(req, mdr->slave_to_mds);
   mdcache->request_finish(mdr);
   if (mdr)
     mdcache->request_finish(mdr);
 
-  mdcache->finish_rollback(mut->reqid);
+  mdcache->finish_rollback(mut->reqid, mdr);
 
   mut->cleanup();
 }
     return;
   }
 
+  mdr->ls = mdlog->get_current_segment();
   ESlaveUpdate *le =  new ESlaveUpdate(mdlog, "slave_rmdir", mdr->reqid, mdr->slave_to_mds,
                                       ESlaveUpdate::OP_PREPARE, ESlaveUpdate::RMDIR);
   mdlog->start_entry(le);
   le->commit.renamed_dirino = in->ino();
 
   mdcache->project_subtree_rename(in, dn->get_dir(), straydn->get_dir());
+  mdcache->add_uncommitted_slave(mdr->reqid, mdr->ls, mdr->slave_to_mds);
 
   mdr->more()->slave_update_journaled = true;
   submit_mdlog_entry(le, new C_MDS_SlaveRmdirPrep(this, mdr, dn, straydn),
 void Server::_commit_slave_rmdir(MDRequestRef& mdr, int r, CDentry *straydn)
 {
   dout(10) << "_commit_slave_rmdir " << *mdr << " r=" << r << dendl;
-  
+
   if (r == 0) {
     if (mdr->more()->slave_update_journaled) {
       CInode *strayin = straydn->get_projected_linkage()->get_inode();
   if (mdr)
     mdcache->request_finish(mdr);
 
-  mdcache->finish_rollback(reqid);
+  mdcache->finish_rollback(reqid, mdr);
 }
 
 
     mdr->ls = NULL;
     _logged_slave_rename(mdr, srcdn, destdn, straydn);
   } else {
+    mdcache->add_uncommitted_slave(mdr->reqid, mdr->ls, mdr->slave_to_mds);
     mdr->more()->slave_update_journaled = true;
     submit_mdlog_entry(le, new C_MDS_SlaveRenamePrep(this, mdr, srcdn, destdn, straydn),
                       mdr, __func__);
       mdr->more()->slave_rolling_back = false;
   }
 
-  mdcache->finish_rollback(mut->reqid);
+  mdcache->finish_rollback(mut->reqid, mdr);
 
   mut->cleanup();
 }
 
     mds->mdcache->wait_for_uncommitted_master(*p, gather_bld.new_sub());
   }
 
+  // slave ops that haven't been committed
+  for (set<metareqid_t>::iterator p = uncommitted_slaves.begin();
+       p != uncommitted_slaves.end();
+       ++p) {
+    dout(10) << "try_to_expire waiting for master to ack OP_FINISH on " << *p << dendl;
+    mds->mdcache->wait_for_uncommitted_slave(*p, gather_bld.new_sub());
+  }
+
   // uncommitted fragments
   for (set<dirfrag_t>::iterator p = uncommitted_fragments.begin();
        p != uncommitted_fragments.end();
 
   ceph_assert(g_conf()->mds_kill_journal_expire_at != 4);
 
-  // slave updates
-  for (elist<MDSlaveUpdate*>::iterator p = slave_updates.begin(member_offset(MDSlaveUpdate,
-                                                                            item));
-       !p.end(); ++p) {
-    MDSlaveUpdate *su = *p;
-    dout(10) << "try_to_expire waiting on slave update " << su << dendl;
-    ceph_assert(su->waiter == 0);
-    su->waiter = gather_bld.new_sub();
-  }
-
   // idalloc
   if (inotablev > mds->inotable->get_committed_version()) {
     dout(10) << "try_to_expire saving inotable table, need " << inotablev
   ls.push_back(new ESlaveUpdate());
 }
 
-
 void ESlaveUpdate::replay(MDSRank *mds)
 {
   MDSlaveUpdate *su;
   case ESlaveUpdate::OP_PREPARE:
     dout(10) << "ESlaveUpdate.replay prepare " << reqid << " for mds." << master 
             << ": applying commit, saving rollback info" << dendl;
-    su = new MDSlaveUpdate(origop, rollback, segment->slave_updates);
+    su = new MDSlaveUpdate(origop, rollback);
     commit.replay(mds, segment, su);
-    mds->mdcache->add_uncommitted_slave_update(reqid, master, su);
+    mds->mdcache->add_uncommitted_slave(reqid, segment, master, su);
     break;
 
   case ESlaveUpdate::OP_COMMIT:
-    su = mds->mdcache->get_uncommitted_slave_update(reqid, master);
-    if (su) {
-      dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds." << master << dendl;
-      mds->mdcache->finish_uncommitted_slave_update(reqid, master);
-    } else {
-      dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds." << master 
-              << ": ignoring, no previously saved prepare" << dendl;
-    }
+    dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds." << master << dendl;
+    mds->mdcache->finish_uncommitted_slave(reqid, false);
     break;
 
   case ESlaveUpdate::OP_ROLLBACK:
     dout(10) << "ESlaveUpdate.replay abort " << reqid << " for mds." << master
             << ": applying rollback commit blob" << dendl;
     commit.replay(mds, segment);
-    su = mds->mdcache->get_uncommitted_slave_update(reqid, master);
-    if (su)
-      mds->mdcache->finish_uncommitted_slave_update(reqid, master);
+    mds->mdcache->finish_uncommitted_slave(reqid, false);
     break;
 
   default: