]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: throttle purge stray operations
authorJohn Spray <john.spray@redhat.com>
Tue, 20 Jan 2015 12:17:56 +0000 (12:17 +0000)
committerJohn Spray <john.spray@redhat.com>
Fri, 20 Mar 2015 12:32:47 +0000 (12:32 +0000)
Pull out the stray-handling code into a separate StrayManager
class.

There is an additional improvement to stray reintegration here,
by passing the DN that triggered the reintegration through,
we should make a better choice about *which* hardlink to
reintegrate the inode into, rather than just picking the first.

Fixes: #10390
Signed-off-by: John Spray <john.spray@redhat.com>
src/mds/CDentry.h
src/mds/CDir.cc
src/mds/CInode.h
src/mds/MDCache.cc
src/mds/MDCache.h
src/mds/Makefile.am
src/mds/Server.cc
src/mds/StrayManager.cc [new file with mode: 0644]
src/mds/StrayManager.h [new file with mode: 0644]

index 01f706a116d11b1d112bc0b25e20072829d705e3..f56f2a208baa8ae297db5e0b8f1238d4995dcf9a 100644 (file)
@@ -152,6 +152,7 @@ protected:
   friend class Migrator;
   friend class Locker;
   friend class MDCache;
+  friend class StrayManager;
   friend class CInode;
   friend class C_MDC_XlockRequest;
 
index a99498daafea11c933881d1055268a4c609402e9..f505d2c8213cc80e5c278bbd8c1c629860f307be 100644 (file)
@@ -670,8 +670,13 @@ void CDir::remove_null_dentries() {
   assert(get_num_any() == items.size());
 }
 
-// remove dirty null dentries for deleted directory. the dirfrag will be
-// deleted soon, so it's safe to not commit dirty dentries.
+/** remove dirty null dentries for deleted directory. the dirfrag will be
+ *  deleted soon, so it's safe to not commit dirty dentries.
+ *
+ *  This is called when a directory is being deleted, a prerequisite
+ *  of which is that its children have been unlinked: we expect to only see
+ *  null, unprojected dentries here.
+ */
 void CDir::try_remove_dentries_for_stray()
 {
   dout(10) << __func__ << dendl;
@@ -685,8 +690,8 @@ void CDir::try_remove_dentries_for_stray()
     CDentry *dn = p->second;
     ++p;
     if (dn->last == CEPH_NOSNAP) {
-      if (!dn->get_linkage()->is_null() || dn->is_projected())
-       continue; // shouldn't happen
+      assert(!dn->is_projected());
+      assert(dn->get_linkage()->is_null());
       if (clear_dirty && dn->is_dirty())
        dn->mark_clean();
       // It's OK to remove lease prematurely because we will never link
@@ -696,8 +701,7 @@ void CDir::try_remove_dentries_for_stray()
       if (dn->get_num_ref() == 0)
        remove_dentry(dn);
     } else {
-      if (dn->is_projected())
-       continue; // shouldn't happen
+      assert(!dn->is_projected());
       CDentry::linkage_t *dnl= dn->get_linkage();
       CInode *in = NULL;
       if (dnl->is_primary()) {
index 7b3ced3b65b4570f461a3f0bed1337ffe93c6781..321ef0bc8ac1b926ea800f5b19a4534567823cf6 100644 (file)
@@ -523,6 +523,7 @@ public:
   friend class Locker;
   friend class Migrator;
   friend class MDCache;
+  friend class StrayManager;
   friend class CDir;
   friend class CInodeExport;
 
index 4898af301d6499cf3682432224cf08865c4c9a75..76bbf2ad42a3d64801b0c52993c08737ecc90adc 100644 (file)
@@ -168,11 +168,8 @@ public:
 
 MDCache::MDCache(MDS *m) :
   logger(0),
-  num_strays(0),
-  num_strays_purging(0),
-  num_strays_delayed(0),
   recovery_queue(m),
-  delayed_eval_stray(member_offset(CDentry, item_stray))
+  stray_manager(m)
 {
   mds = m;
   migrator = new Migrator(mds, this);
@@ -711,14 +708,15 @@ CDentry *MDCache::get_or_create_stray_dentry(CInode *in)
   if (!straydn) {
     straydn = straydir->add_null_dentry(straydname);
     straydn->mark_new();
-
-    num_strays++;
-    logger->set(l_mdc_num_strays, num_strays);
-    logger->inc(l_mdc_strays_created);
   } else {
     assert(straydn->get_projected_linkage()->is_null());
   }
 
+  // Notify even if a null dentry already existed, because
+  // StrayManager counts the number of stray inodes, not the
+  // number of dentries in the directory.
+  stray_manager.notify_stray_created();
+
   straydn->state_set(CDentry::STATE_STRAY);
   return straydn;
 }
@@ -6187,14 +6185,7 @@ bool MDCache::trim(int max, int count)
   dout(7) << "trim max=" << max << "  cur=" << lru.lru_get_size() << dendl;
 
   // process delayed eval_stray()
-  for (elist<CDentry*>::iterator p = delayed_eval_stray.begin(); !p.end(); ) {
-    CDentry *dn = *p;
-    ++p;
-    dn->item_stray.remove_myself();
-    num_strays_delayed--;
-    eval_stray(dn);
-  }
-  logger->set(l_mdc_num_strays_delayed, num_strays_delayed);
+  stray_manager.advance_delayed();
 
   map<mds_rank_t, MCacheExpire*> expiremap;
   bool is_standby_replay = mds->is_standby_replay();
@@ -6456,6 +6447,11 @@ void MDCache::trim_dirfrag(CDir *dir, CDir *con, map<mds_rank_t, MCacheExpire*>&
   in->close_dirfrag(dir->dirfrag().frag);
 }
 
+/**
+ * Try trimming an inode from the cache
+ *
+ * @return true if the inode is still in cache, else false if it was trimmed
+ */
 bool MDCache::trim_inode(CDentry *dn, CInode *in, CDir *con, map<mds_rank_t, MCacheExpire*>& expiremap)
 {
   dout(15) << "trim_inode " << *in << dendl;
@@ -6485,10 +6481,13 @@ bool MDCache::trim_inode(CDentry *dn, CInode *in, CDir *con, map<mds_rank_t, MCa
   // INODE
   if (in->is_auth()) {
     // eval stray after closing dirfrags
-    if (dn && !mds->is_standby_replay()) {
+    if (dn) {
       maybe_eval_stray(in);
-      if (dn->get_num_ref() > 0)
+      if (dn->get_num_ref() > 0) {
+        // Independent of whether we passed this on to the purge queue,
+        // if it still has refs then don't count it as trimmed
        return true;
+      }
     }
   } else {
     mds_authority_t auth = in->authority();
@@ -7401,10 +7400,13 @@ bool MDCache::shutdown_export_strays()
     }
     strays[i]->get_dirfrags(dfs);
   }
+
+  stray_manager.abort_queue();
   
-  while (!dfs.empty()) {
-    CDir *dir = dfs.front();
-    dfs.pop_front();
+  for (std::list<CDir*>::iterator dfs_i = dfs.begin();
+       dfs_i != dfs.end(); ++dfs_i)
+  {
+    CDir *dir = *dfs_i;
 
     if (!dir->is_complete()) {
       dir->fetch(0);
@@ -7419,10 +7421,16 @@ bool MDCache::shutdown_export_strays()
       if (dnl->is_null()) continue;
       done = false;
       
+      if (dn->state_test(CDentry::STATE_PURGING)) {
+        // Don't try to migrate anything that is actually
+        // being purged right now
+        continue;
+      }
+
       // FIXME: we'll deadlock if a rename fails.
       if (exported_strays.count(dnl->get_inode()->ino()) == 0) {
        exported_strays.insert(dnl->get_inode()->ino());
-       migrate_stray(dn, mds_rank_t(0));  // send to root!
+       stray_manager.migrate_stray(dn, mds_rank_t(0));  // send to root!
       } else {
        dout(10) << "already exporting " << *dn << dendl;
       }
@@ -9086,491 +9094,68 @@ void MDCache::scan_stray_dir(dirfrag_t next)
     for (CDir::map_t::iterator q = dir->items.begin(); q != dir->items.end(); ++q) {
       CDentry *dn = q->second;
       CDentry::linkage_t *dnl = dn->get_projected_linkage();
-      num_strays++;
-      if (dnl->is_primary())
+      stray_manager.notify_stray_created();
+      if (dnl->is_primary()) {
        maybe_eval_stray(dnl->get_inode());
+      }
     }
   }
 }
 
-struct C_MDC_EvalStray : public MDCacheContext {
-  CDentry *dn;
-  C_MDC_EvalStray(MDCache *c, CDentry *d) : MDCacheContext(c), dn(d) {}
-  void finish(int r) {
-    mdcache->eval_stray(dn);
-  }
-};
-
-void MDCache::eval_stray(CDentry *dn, bool delay)
+/**
+ * If a remote dentry refers to an inode whose primary
+ * dentry is a stray, then evaluate the inode for purging if
+ * we have the auth copy, or migrate the stray to use if we
+ * do not.
+ */
+void MDCache::eval_remote(CDentry *remote_dn)
 {
-  dout(10) << "eval_stray " << *dn << dendl;
-  CDentry::linkage_t *dnl = dn->get_projected_linkage();
-  dout(10) << " inode is " << *dnl->get_inode() << dendl;
-  assert(dnl->is_primary());
-  CInode *in = dnl->get_inode();
-  assert(in);
+  assert(remote_dn);
+  dout(10) << "eval_remote " << *remote_dn << dendl;
 
-  assert(dn->get_dir()->get_inode()->is_stray());
+  CDentry::linkage_t *dnl = remote_dn->get_projected_linkage();
+  assert(dnl->is_remote());
+  CInode *in = dnl->get_inode();
 
-  if (!dn->is_auth()) {
-    // has to be mine
-    // move to bottom of lru so that we trim quickly!
-    touch_dentry_bottom(dn);
+  if (!in) {
+    dout(20) << __func__ << ": no inode, cannot evaluate" << dendl;
     return;
   }
 
-  // purge?
-  if (in->inode.nlink == 0) {
-      // past snaprealm parents imply snapped dentry remote links.
-      // only important for directories.  normal file data snaps are handled
-      // by the object store.
-    if (in->snaprealm && in->snaprealm->has_past_parents()) {
-      if (!in->snaprealm->have_past_parents_open() &&
-         !in->snaprealm->open_parents(new C_MDC_EvalStray(this, dn)))
-       return;
-      in->snaprealm->prune_past_parents();
-      in->purge_stale_snap_data(in->snaprealm->get_snaps());
-    }
-    if (in->is_dir()) {
-      if (in->snaprealm && in->snaprealm->has_past_parents()) {
-       dout(20) << "  directory has past parents " << in->snaprealm->srnode.past_parents << dendl;
-       return;  // not until some snaps are deleted.
-      }
-      if (in->has_dirfrags()) {
-       list<CDir*> ls;
-       in->get_nested_dirfrags(ls);
-       for (list<CDir*>::iterator p = ls.begin(); p != ls.end(); ++p)
-         (*p)->try_remove_dentries_for_stray();
-      }
-    }
-    if (dn->is_replicated()) {
-      dout(20) << " replicated" << dendl;
-      return;
-    }
-    if (dn->is_any_leases() || in->is_any_caps()) {
-      dout(20) << " caps | leases" << dendl;
-      return;  // wait
-    }
-    if (dn->state_test(CDentry::STATE_PURGING)) {
-      dout(20) << " already purging" << dendl;
-      return;  // already purging
-    }
-    if (in->state_test(CInode::STATE_NEEDSRECOVER) ||
-       in->state_test(CInode::STATE_RECOVERING)) {
-      dout(20) << " pending recovery" << dendl;
-      return;  // don't mess with file size probing
-    }
-    if (in->get_num_ref() > (int)in->is_dirty() + (int)in->is_dirty_parent()) {
-      dout(20) << " too many inode refs" << dendl;
-      return;
-    }
-    if (dn->get_num_ref() > (int)dn->is_dirty() + !!in->get_num_ref()) {
-      dout(20) << " too many dn refs" << dendl;
-      return;
-    }
-    if (delay) {
-      if (!dn->item_stray.is_on_list()) {
-       delayed_eval_stray.push_back(&dn->item_stray);
-       num_strays_delayed++;
-       logger->set(l_mdc_num_strays_delayed, num_strays_delayed);
-      }
-    // don't purge multiversion inode with snap data
-    } else if (in->snaprealm && in->snaprealm->has_past_parents() &&
-              !in->old_inodes.empty()) {
-      assert(!in->is_dir());
-      dout(20) << " file has past parents " << in->snaprealm->srnode.past_parents << dendl;
-      if (in->is_file() && in->get_projected_inode()->size > 0)
-       truncate_stray(dn); // truncate head objects
-    } else {
-      if (in->is_dir())
-       in->close_dirfrags();
-      purge_stray(dn);
-    }
-  }
-  else if (in->inode.nlink >= 1) {
-    // trivial reintegrate?
-    if (!in->remote_parents.empty()) {
-      CDentry *rlink = *in->remote_parents.begin();
-      
-      // don't do anything if the remote parent is projected, or we may
-      // break user-visible semantics!
-      // NOTE: we repeat this check in _rename(), since our submission path is racey.
-      if (!rlink->is_projected()) {
-       if (rlink->is_auth() && rlink->dir->can_auth_pin())
-         reintegrate_stray(dn, rlink);
-       
-       if (!rlink->is_auth() && dn->is_auth())
-         migrate_stray(dn, rlink->authority().first);
-      }
-    }
-  } else {
-    // wait for next use.
-  }
-}
-
-void MDCache::eval_remote(CDentry *dn)
-{
-  dout(10) << "eval_remote " << *dn << dendl;
-  CDentry::linkage_t *dnl = dn->get_projected_linkage();
-  assert(dnl->is_remote());
-  CInode *in = dnl->get_inode();
-  if (!in) return;
-
   // refers to stray?
   if (in->get_parent_dn()->get_dir()->get_inode()->is_stray()) {
-    if (in->is_auth())
-      eval_stray(in->get_parent_dn());
-    else
-      migrate_stray(in->get_parent_dn(), mds->get_nodeid());
-  }
-}
-
-void MDCache::fetch_backtrace(inodeno_t ino, int64_t pool, bufferlist& bl, Context *fin)
-{
-  object_t oid = CInode::get_object_name(ino, frag_t(), "");
-  mds->objecter->getxattr(oid, object_locator_t(pool), "parent", CEPH_NOSNAP, &bl, 0, fin);
-}
-
-class C_IO_MDC_PurgeStrayPurged : public MDCacheIOContext {
-  CDentry *dn;
-  bool only_head;
-public:
-  C_IO_MDC_PurgeStrayPurged(MDCache *c, CDentry *d, bool oh) :
-    MDCacheIOContext(c), dn(d), only_head(oh) { }
-  void finish(int r) {
-    assert(r == 0 || r == -ENOENT);
-    mdcache->_purge_stray_purged(dn, only_head);
-  }
-};
-
-void MDCache::truncate_stray(CDentry *dn)
-{
-  CDentry::linkage_t *dnl = dn->get_projected_linkage();
-  CInode *in = dnl->get_inode();
-  dout(10) << "truncate_stray " << *dn << " " << *in << dendl;
-  assert(!dn->is_replicated());
-
-  dn->state_set(CDentry::STATE_PURGING);
-  dn->get(CDentry::PIN_PURGING);
-  in->state_set(CInode::STATE_PURGING);
-
-  if (dn->item_stray.is_on_list())
-    dn->item_stray.remove_myself();
-
-  C_GatherBuilder gather(
-    g_ceph_context,
-    new C_OnFinisher(new C_IO_MDC_PurgeStrayPurged(this, dn, true),
-                    &mds->finisher));
-
-  SnapRealm *realm = in->find_snaprealm();
-  assert(realm);
-  dout(10) << " realm " << *realm << dendl;
-  const SnapContext *snapc = &realm->get_snap_context();
-
-  uint64_t period = (uint64_t)in->inode.layout.fl_object_size *
-                   (uint64_t)in->inode.layout.fl_stripe_count;
-  uint64_t to = in->inode.get_max_size();
-  to = MAX(in->inode.size, to);
-  // when truncating a file, the filer does not delete stripe objects that are
-  // truncated to zero. so we need to purge stripe objects up to the max size
-  // the file has ever been.
-  to = MAX(in->inode.max_size_ever, to);
-  if (period && to > period) {
-    uint64_t num = (to - 1) / period;
-    dout(10) << "purge_stray 0~" << to << " objects 0~" << num
-      << " snapc " << snapc << " on " << *in << dendl;
-    mds->filer->purge_range(in->ino(), &in->inode.layout, *snapc,
-                           1, num, ceph_clock_now(g_ceph_context),
-                           0, gather.new_sub());
-  }
-
-  // keep backtrace object
-  if (period && to > 0) {
-    mds->filer->zero(in->ino(), &in->inode.layout, *snapc,
-                    0, period, ceph_clock_now(g_ceph_context),
-                    0, true, NULL, gather.new_sub());
-  }
-
-  assert(gather.has_subs());
-  gather.activate();
-}
-
-void MDCache::purge_stray(CDentry *dn)
-{
-  CDentry::linkage_t *dnl = dn->get_projected_linkage();
-  CInode *in = dnl->get_inode();
-  dout(10) << "purge_stray " << *dn << " " << *in << dendl;
-  assert(!dn->is_replicated());
-
-  dn->state_set(CDentry::STATE_PURGING);
-  dn->get(CDentry::PIN_PURGING);
-  in->state_set(CInode::STATE_PURGING);
-
-  num_strays_purging++;
-  logger->set(l_mdc_num_strays_purging, num_strays_purging);
-
-  if (dn->item_stray.is_on_list()) {
-    dn->item_stray.remove_myself();
-    num_strays_delayed--;
-    logger->set(l_mdc_num_strays_delayed, num_strays_delayed);
-  }
-
-  if (in->is_dirty_parent())
-    in->clear_dirty_parent();
-
-  // CHEAT.  there's no real need to journal our intent to purge, since
-  // that is implicit in the dentry's presence and non-use in the stray
-  // dir.  on recovery, we'll need to re-eval all strays anyway.
-  
-  SnapContext nullsnapc;
-  C_GatherBuilder gather(
-    g_ceph_context,
-    new C_OnFinisher(new C_IO_MDC_PurgeStrayPurged(this, dn, false),
-                    &mds->finisher));
-
-  if (in->is_dir()) {
-    object_locator_t oloc(mds->mdsmap->get_metadata_pool());
-    list<frag_t> ls;
-    if (!in->dirfragtree.is_leaf(frag_t()))
-      in->dirfragtree.get_leaves(ls);
-    ls.push_back(frag_t());
-    for (list<frag_t>::iterator p = ls.begin();
-         p != ls.end();
-         ++p) {
-      object_t oid = CInode::get_object_name(in->inode.ino, *p, "");
-      dout(10) << "purge_stray remove dirfrag " << oid << dendl;
-      mds->objecter->remove(oid, oloc, nullsnapc, ceph_clock_now(g_ceph_context),
-                            0, NULL, gather.new_sub());
-    }
-    assert(gather.has_subs());
-    gather.activate();
-    return;
-  }
+    CDentry *stray_dn = in->get_parent_dn();
 
-  const SnapContext *snapc;
-  SnapRealm *realm = in->find_snaprealm();
-  if (realm) {
-    dout(10) << " realm " << *realm << dendl;
-    snapc = &realm->get_snap_context();
-  } else {
-    dout(10) << " NO realm, using null context" << dendl;
-    snapc = &nullsnapc;
-    assert(in->last == CEPH_NOSNAP);
-  }
+    if (in->is_auth()) {
+      dout(20) << __func__ << ": have auth for inode, evaluating" << dendl;
 
-  if (in->is_file()) {
-    uint64_t period = (uint64_t)in->inode.layout.fl_object_size *
-                     (uint64_t)in->inode.layout.fl_stripe_count;
-    uint64_t to = in->inode.get_max_size();
-    to = MAX(in->inode.size, to);
-    // when truncating a file, the filer does not delete stripe objects that are
-    // truncated to zero. so we need to purge stripe objects up to the max size
-    // the file has ever been.
-    to = MAX(in->inode.max_size_ever, to);
-    if (to && period) {
-      uint64_t num = (to + period - 1) / period;
-      dout(10) << "purge_stray 0~" << to << " objects 0~" << num
-              << " snapc " << snapc << " on " << *in << dendl;
-      mds->filer->purge_range(in->inode.ino, &in->inode.layout, *snapc,
-                             0, num, ceph_clock_now(g_ceph_context), 0,
-                             gather.new_sub());
+      stray_manager.eval_remote_stray(stray_dn, remote_dn);
+    } else {
+      dout(20) << __func__ << ": do not have auth for inode, migrating " << dendl;
+      /*
+       * Inodes get filed into a stray dentry when a client unlinks
+       * the primary DN for them.  However, that doesn't mean there
+       * isn't a remote DN still in the world.  The remote DN just
+       * ends up pointing at a stray.  Strays can pretty much live
+       * forever in this scenario.
+       *
+       * Therefore, we have a special behaviour here: migrate a stray
+       * to <me> when <I> handle a client request with a trace referring
+       * to a stray inode on another MDS.
+       */
+      stray_manager.migrate_stray(stray_dn, mds->get_nodeid());
     }
-  }
-
-  inode_t *pi = in->get_projected_inode();
-  object_t oid = CInode::get_object_name(pi->ino, frag_t(), "");
-  // remove the backtrace object if it was not purged
-  if (!gather.has_subs()) {
-    object_locator_t oloc(pi->layout.fl_pg_pool);
-    dout(10) << "purge_stray remove backtrace object " << oid
-            << " pool " << oloc.pool << " snapc " << snapc << dendl;
-    mds->objecter->remove(oid, oloc, *snapc, ceph_clock_now(g_ceph_context), 0,
-                         NULL, gather.new_sub());
-  }
-  // remove old backtrace objects
-  for (compact_set<int64_t>::iterator p = pi->old_pools.begin();
-       p != pi->old_pools.end();
-       ++p) {
-    object_locator_t oloc(*p);
-    dout(10) << "purge_stray remove backtrace object " << oid
-            << " old pool " << *p << " snapc " << snapc << dendl;
-    mds->objecter->remove(oid, oloc, *snapc, ceph_clock_now(g_ceph_context), 0,
-                         NULL, gather.new_sub());
-  }
-  assert(gather.has_subs());
-  gather.activate();
-}
-
-class C_MDC_PurgeStrayLogged : public MDCacheContext {
-  CDentry *dn;
-  version_t pdv;
-  LogSegment *ls;
-public:
-  C_MDC_PurgeStrayLogged(MDCache *c, CDentry *d, version_t v, LogSegment *s) : 
-    MDCacheContext(c), dn(d), pdv(v), ls(s) { }
-  void finish(int r) {
-    mdcache->_purge_stray_logged(dn, pdv, ls);
-  }
-};
-class C_MDC_PurgeStrayLoggedTruncate : public MDCacheContext {
-  CDentry *dn;
-  LogSegment *ls;
-public:
-  C_MDC_PurgeStrayLoggedTruncate(MDCache *c, CDentry *d, LogSegment *s) : 
-    MDCacheContext(c), dn(d), ls(s) { }
-  void finish(int r) {
-    mdcache->_purge_stray_logged_truncate(dn, ls);
-  }
-};
-
-void MDCache::_purge_stray_purged(CDentry *dn, bool only_head)
-{
-  CInode *in = dn->get_projected_linkage()->get_inode();
-  dout(10) << "_purge_stray_purged " << *dn << " " << *in << dendl;
-
-  if (!only_head &&
-      in->get_num_ref() == (int)in->is_dirty() &&
-      dn->get_num_ref() == (int)dn->is_dirty() + !!in->get_num_ref() + 1/*PIN_PURGING*/) {
-    // kill dentry.
-    version_t pdv = dn->pre_dirty();
-    dn->push_projected_linkage(); // NULL
-
-    EUpdate *le = new EUpdate(mds->mdlog, "purge_stray");
-    mds->mdlog->start_entry(le);
-
-    // update dirfrag fragstat, rstat
-    CDir *dir = dn->get_dir();
-    fnode_t *pf = dir->project_fnode();
-    pf->version = dir->pre_dirty();
-    if (in->is_dir())
-      pf->fragstat.nsubdirs--;
-    else
-      pf->fragstat.nfiles--;
-    pf->rstat.sub(in->inode.accounted_rstat);
-
-    le->metablob.add_dir_context(dn->dir);
-    EMetaBlob::dirlump& dl = le->metablob.add_dir(dn->dir, true);
-    le->metablob.add_null_dentry(dl, dn, true);
-    le->metablob.add_destroyed_inode(in->ino());
-
-    mds->mdlog->submit_entry(le, new C_MDC_PurgeStrayLogged(this, dn, pdv, mds->mdlog->get_current_segment()));
-
-    num_strays_purging--;
-    num_strays--;
-    logger->set(l_mdc_num_strays, num_strays);
-    logger->set(l_mdc_num_strays_purging, num_strays_purging);
-    logger->inc(l_mdc_strays_purged);
   } else {
-    // new refs.. just truncate to 0
-    EUpdate *le = new EUpdate(mds->mdlog, "purge_stray truncate");
-    mds->mdlog->start_entry(le);
-    
-    inode_t *pi = in->project_inode();
-    pi->size = 0;
-    pi->max_size_ever = 0;
-    pi->client_ranges.clear();
-    pi->truncate_size = 0;
-    pi->truncate_from = 0;
-    pi->version = in->pre_dirty();
-
-    le->metablob.add_dir_context(dn->dir);
-    le->metablob.add_primary_dentry(dn, in, true);
-
-    mds->mdlog->submit_entry(le, new C_MDC_PurgeStrayLoggedTruncate(this, dn, mds->mdlog->get_current_segment()));
+    dout(20) << __func__ << ": inode's primary dn not stray" << dendl;
   }
 }
 
-void MDCache::_purge_stray_logged(CDentry *dn, version_t pdv, LogSegment *ls)
-{
-  CInode *in = dn->get_linkage()->get_inode();
-  dout(10) << "_purge_stray_logged " << *dn << " " << *in << dendl;
-
-  assert(!in->state_test(CInode::STATE_RECOVERING));
-
-  // unlink
-  assert(dn->get_projected_linkage()->is_null());
-  dn->dir->unlink_inode(dn);
-  dn->pop_projected_linkage();
-  dn->mark_dirty(pdv, ls);
-
-  dn->dir->pop_and_dirty_projected_fnode(ls);
-
-  in->state_clear(CInode::STATE_ORPHAN);
-  dn->state_clear(CDentry::STATE_PURGING);
-  dn->put(CDentry::PIN_PURGING);
-
-  // drop inode
-  if (in->is_dirty())
-    in->mark_clean();
-
-  remove_inode(in);
-
-  // drop dentry?
-  if (dn->is_new()) {
-    dout(20) << " dn is new, removing" << dendl;
-    dn->mark_clean();
-    dn->dir->remove_dentry(dn);
-  } else
-    touch_dentry_bottom(dn);  // drop dn as quickly as possible.
-}
-
-void MDCache::_purge_stray_logged_truncate(CDentry *dn, LogSegment *ls)
-{
-  CInode *in = dn->get_projected_linkage()->get_inode();
-  dout(10) << "_purge_stray_logged_truncate " << *dn << " " << *in << dendl;
-
-  dn->state_clear(CDentry::STATE_PURGING);
-  dn->put(CDentry::PIN_PURGING);
-
-  in->pop_and_dirty_projected_inode(ls);
-
-  eval_stray(dn);
-}
-
-void MDCache::reintegrate_stray(CDentry *straydn, CDentry *rdn)
+void MDCache::fetch_backtrace(inodeno_t ino, int64_t pool, bufferlist& bl, Context *fin)
 {
-  dout(10) << "reintegrate_stray " << *straydn << " into " << *rdn << dendl;
-  
-  // rename it to another mds.
-  filepath src;
-  straydn->make_path(src);
-  filepath dst;
-  rdn->make_path(dst);
-
-  MClientRequest *req = new MClientRequest(CEPH_MDS_OP_RENAME);
-  req->set_filepath(dst);
-  req->set_filepath2(src);
-  req->set_tid(mds->issue_tid());
-
-  mds->send_message_mds(req, rdn->authority().first);
+  object_t oid = CInode::get_object_name(ino, frag_t(), "");
+  mds->objecter->getxattr(oid, object_locator_t(pool), "parent", CEPH_NOSNAP, &bl, 0, fin);
 }
 
-void MDCache::migrate_stray(CDentry *dn, mds_rank_t to)
-{
-  CInode *in = dn->get_linkage()->get_inode();
-  assert(in);
-  CInode *diri = dn->dir->get_inode();
-  assert(diri->is_stray());
-  dout(10) << "migrate_stray from mds." << MDS_INO_STRAY_OWNER(diri->inode.ino)
-          << " to mds." << to
-          << " " << *dn << " " << *in << dendl;
-
-  // rename it to another mds.
-  filepath src;
-  dn->make_path(src);
-
-  string dname;
-  in->name_stray_dentry(dname);
-  filepath dst(dname, MDS_INO_STRAY(to, 0));
-
-  MClientRequest *req = new MClientRequest(CEPH_MDS_OP_RENAME);
-  req->set_filepath(dst);
-  req->set_filepath2(src);
-  req->set_tid(mds->issue_tid());
-
-  mds->send_message_mds(req, to);
-}
 
 
 
@@ -12068,9 +11653,12 @@ void MDCache::register_perfcounters()
         "Stray dentries", "stry");
     pcb.add_u64(l_mdc_num_strays_purging, "num_strays_purging");
     pcb.add_u64(l_mdc_num_strays_delayed, "num_strays_delayed");
+    pcb.add_u64(l_mdc_num_purge_ops, "num_purge_ops");
     pcb.add_u64_counter(l_mdc_strays_created, "strays_created");
     pcb.add_u64_counter(l_mdc_strays_purged, "strays_purged",
         "Stray dentries purged", "purg");
+    pcb.add_u64_counter(l_mdc_strays_reintegrated, "strays_reintegrated");
+    pcb.add_u64_counter(l_mdc_strays_migrated, "strays_migrated");
 
     /* Recovery queue statistics */
     pcb.add_u64(l_mdc_num_recovering_processing, "num_recovering_processing");
@@ -12084,5 +11672,37 @@ void MDCache::register_perfcounters()
     logger = pcb.create_perf_counters();
     g_ceph_context->get_perfcounters_collection()->add(logger);
     recovery_queue.set_logger(logger);
+    stray_manager.set_logger(logger);
+}
+
+/**
+ * Call this when putting references to an inode/dentry or
+ * when attempting to trim it.
+ *
+ * If this inode is no longer linked by anyone, and this MDS
+ * rank holds the primary dentry, and that dentry is in a stray
+ * directory, then give up the dentry to the StrayManager, never
+ * to be seen again by MDCache.
+ *
+ * @param delay if true, then purgeable inodes are stashed til
+ *              the next trim(), rather than being purged right
+ *              away.
+ */
+void MDCache::maybe_eval_stray(CInode *in, bool delay) {
+  if (in->inode.nlink > 0 || in->is_base() || is_readonly() || mds->is_standby_replay())
+    return;
+  CDentry *dn = in->get_projected_parent_dn();
+
+  if (dn->state_test(CDentry::STATE_PURGING)) {
+    /* We have already entered the purging process, no need
+     * to re-evaluate me ! */
+    return;
+  }
+
+  if (dn->get_projected_linkage()->is_primary() &&
+      dn->get_dir()->get_inode()->is_stray()) {
+    stray_manager.eval_stray(dn, delay);
+  }
 }
 
+
index 39b1f66f143eb0285badf1ca7d3ec4e0561fbe74..24873295ba5cc5d8e7d44df626a92b689b2e0cc6 100644 (file)
@@ -27,6 +27,7 @@
 #include "include/Context.h"
 #include "events/EMetaBlob.h"
 #include "RecoveryQueue.h"
+#include "StrayManager.h"
 #include "MDSContext.h"
 
 #include "messages/MClientRequest.h"
@@ -74,16 +75,22 @@ struct MDSlaveUpdate;
 
 enum {
   l_mdc_first = 3000,
-  // How many dentries are currently in stray dirs
+  // How many inodes currently in stray dentries
   l_mdc_num_strays,
   // How many stray dentries are currently being purged
   l_mdc_num_strays_purging,
   // How many stray dentries are currently delayed for purge due to refs
   l_mdc_num_strays_delayed,
+  // How many purge RADOS ops might currently be in flight?
+  l_mdc_num_purge_ops,
   // How many dentries have ever been added to stray dir
   l_mdc_strays_created,
   // How many dentries have ever finished purging from stray dir
   l_mdc_strays_purged,
+  // How many strays have been reintegrated?
+  l_mdc_strays_reintegrated,
+  // How many strays have been migrated?
+  l_mdc_strays_migrated,
 
   // How many inode sizes currently being recovered
   l_mdc_num_recovering_processing,
@@ -135,6 +142,17 @@ public:
   void advance_stray() {
     stray_index = (stray_index+1)%NUM_STRAY;
   }
+
+  /**
+   * Call this when you know that a CDentry is ready to be passed
+   * on to StrayManager (i.e. this is a stray you've just created)
+   */
+  void notify_stray(CDentry *dn) {
+    assert(dn->get_dir()->get_inode()->is_stray());
+    stray_manager.eval_stray(dn);
+  }
+
+  void maybe_eval_stray(CInode *in, bool delay=false);
   bool is_readonly() { return readonly; }
   void force_readonly();
 
@@ -143,10 +161,6 @@ public:
   int num_inodes_with_caps;
   int num_caps;
 
-  uint64_t num_strays;
-  uint64_t num_strays_purging;
-  uint64_t num_strays_delayed;
-
   unsigned max_dir_commit_size;
 
   ceph_file_layout default_file_layout;
@@ -166,6 +180,16 @@ public:
     r->ttl = ttl;
   }
 
+  void notify_stray_removed()
+  {
+    stray_manager.notify_stray_removed();
+  }
+
+  void notify_stray_created()
+  {
+    stray_manager.notify_stray_created();
+  }
+
   // -- client caps --
   uint64_t              last_cap_id;
   
@@ -569,6 +593,9 @@ public:
   friend class Migrator;
   friend class MDBalancer;
 
+  // StrayManager needs to be able to remove_inode() from us
+  // when it is done purging
+  friend class StrayManager;
 
   // File size recovery
 private:
@@ -913,38 +940,14 @@ public:
 
   // -- stray --
 public:
-  elist<CDentry*> delayed_eval_stray;
-
-  void eval_stray(CDentry *dn, bool delay=false);
   void eval_remote(CDentry *dn);
-
-  void maybe_eval_stray(CInode *in, bool delay=false) {
-    if (in->inode.nlink > 0 || in->is_base() || is_readonly())
-      return;
-    CDentry *dn = in->get_projected_parent_dn();
-    if (!dn->state_test(CDentry::STATE_PURGING) &&
-       dn->get_projected_linkage()->is_primary() &&
-       dn->get_dir()->get_inode()->is_stray())
-      eval_stray(dn, delay);
-  }
-
   void fetch_backtrace(inodeno_t ino, int64_t pool, bufferlist& bl, Context *fin);
 
 protected:
   void scan_stray_dir(dirfrag_t next=dirfrag_t());
-  void truncate_stray(CDentry *dn);
-  void purge_stray(CDentry *dn);
-  void _purge_stray_purged(CDentry *dn, bool only_head);
-  void _purge_stray_logged(CDentry *dn, version_t pdv, LogSegment *ls);
-  void _purge_stray_logged_truncate(CDentry *dn, LogSegment *ls);
+  StrayManager stray_manager;
   friend struct C_MDC_RetryScanStray;
   friend class C_IO_MDC_FetchedBacktrace;
-  friend class C_MDC_PurgeStrayLogged;
-  friend class C_MDC_PurgeStrayLoggedTruncate;
-  friend class C_IO_MDC_PurgeStrayPurged;
-  void reintegrate_stray(CDentry *dn, CDentry *rlink);
-  void migrate_stray(CDentry *dn, mds_rank_t dest);
-
 
   // == messages ==
  public:
index 93b0175519217614837432f6539587c8ef22fba9..e62ab3f92dd5fa0be1e01ac78ec06b7afccfd062 100644 (file)
@@ -11,6 +11,7 @@ libmds_la_SOURCES = \
        mds/Mutation.cc \
        mds/MDCache.cc \
        mds/RecoveryQueue.cc \
+       mds/StrayManager.cc \
        mds/Locker.cc \
        mds/Migrator.cc \
        mds/MDBalancer.cc \
@@ -53,6 +54,7 @@ noinst_HEADERS += \
        mds/MDBalancer.h \
        mds/MDCache.h \
        mds/RecoveryQueue.h \
+       mds/StrayManager.h \
        mds/MDLog.h \
        mds/MDS.h \
        mds/Beacon.h \
index 61ca3cc314a7ed0d7e00c886d301f464b7ebdd8d..c279400d5c4cec1071292ccb71b668ae6362a697 100644 (file)
@@ -5348,8 +5348,11 @@ void Server::_unlink_local_finish(MDRequestRef& mdr,
   dn->get_dir()->try_remove_unlinked_dn(dn);
 
   // clean up?
-  if (straydn)
-    mdcache->eval_stray(straydn);
+  if (straydn) {
+    // Tip off the MDCache that this dentry is a stray that
+    // might be elegible for purge.
+    mdcache->notify_stray(straydn);
+  }
 }
 
 bool Server::_rmdir_prepare_witness(MDRequestRef& mdr, mds_rank_t who, CDentry *dn, CDentry *straydn)
@@ -6183,8 +6186,9 @@ void Server::_rename_finish(MDRequestRef& mdr, CDentry *srcdn, CDentry *destdn,
     mds->locker->eval(in, CEPH_CAP_LOCKS, true);
 
   // clean up?
-  if (straydn) 
-    mdcache->eval_stray(straydn);
+  if (straydn) {
+    mdcache->notify_stray(straydn);
+  }
 }
 
 
@@ -6694,6 +6698,20 @@ void Server::_rename_apply(MDRequestRef& mdr, CDentry *srcdn, CDentry *destdn, C
     }
   }
 
+  if (srcdn->get_dir()->inode->is_stray() &&
+      srcdn->get_dir()->inode->get_stray_owner() == mds->whoami) {
+    // A reintegration event or a migration away from me
+    dout(20) << __func__ << ": src dentry was a stray, updating stats" << dendl;
+    mdcache->notify_stray_removed();
+  }
+
+  if (destdn->get_dir()->inode->is_stray() &&
+      destdn->get_dir()->inode->get_stray_owner() == mds->whoami) {
+    // A stray migration (to me)
+    dout(20) << __func__ << ": dst dentry was a stray, updating stats" << dendl;
+    mdcache->notify_stray_created();
+  }
+
   // src
   if (srcdn->is_auth())
     srcdn->mark_dirty(mdr->more()->pvmap[srcdn], mdr->ls);
diff --git a/src/mds/StrayManager.cc b/src/mds/StrayManager.cc
new file mode 100644 (file)
index 0000000..294f5cd
--- /dev/null
@@ -0,0 +1,911 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#include "common/perf_counters.h"
+
+#include "osdc/Objecter.h"
+#include "osdc/Filer.h"
+#include "mds/MDS.h"
+#include "mds/MDCache.h"
+#include "mds/MDLog.h"
+#include "mds/CDir.h"
+#include "mds/CDentry.h"
+#include "events/EUpdate.h"
+#include "messages/MClientRequest.h"
+
+#include "StrayManager.h"
+
+#define dout_subsys ceph_subsys_mds
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, mds)
+static ostream& _prefix(std::ostream *_dout, MDS *mds) {
+  return *_dout << "mds." << mds->get_nodeid() << ".cache.strays ";
+}
+
+class StrayManagerIOContext : public virtual MDSIOContextBase {
+protected:
+  StrayManager *sm;
+  virtual MDS *get_mds()
+  {
+    return sm->mds;
+  }
+public:
+  StrayManagerIOContext(StrayManager *sm_) : sm(sm_) {}
+};
+
+
+class StrayManagerContext : public virtual MDSInternalContextBase {
+protected:
+  StrayManager *sm;
+  virtual MDS *get_mds()
+  {
+    return sm->mds;
+  }
+public:
+  StrayManagerContext(StrayManager *sm_) : sm(sm_) {}
+};
+
+
+/**
+ * Context wrapper for _purge_stray_purged completion
+ */
+class C_IO_PurgeStrayPurged : public StrayManagerIOContext {
+  CDentry *dn;
+  bool only_head;
+  // How many ops_in_flight were allocated to this purge?
+  uint32_t ops_allowance;
+public:
+  C_IO_PurgeStrayPurged(StrayManager *sm_, CDentry *d, bool oh, uint32_t ops) : 
+    StrayManagerIOContext(sm_), dn(d), only_head(oh), ops_allowance(ops) { }
+  void finish(int r) {
+    assert(r == 0 || r == -ENOENT);
+    sm->_purge_stray_purged(dn, ops_allowance, only_head);
+  }
+};
+
+/**
+ * Purge a dentry from a stray directory.  This function
+ * is called once eval_stray is satisfied and StrayManager
+ * throttling is also satisfied.  There is no going back
+ * at this stage!
+ */
+void StrayManager::purge(CDentry *dn, uint32_t op_allowance)
+{
+  CDentry::linkage_t *dnl = dn->get_projected_linkage();
+  CInode *in = dnl->get_inode();
+  dout(10) << __func__ << " " << *dn << " " << *in << dendl;
+  assert(!dn->is_replicated());
+
+  num_strays_purging++;
+  logger->set(l_mdc_num_strays_purging, num_strays_purging);
+
+  if (dn->item_stray.is_on_list()) {
+    dn->item_stray.remove_myself();
+    num_strays_delayed--;
+    logger->set(l_mdc_num_strays_delayed, num_strays_delayed);
+  }
+
+  // CHEAT.  there's no real need to journal our intent to purge, since
+  // that is implicit in the dentry's presence and non-use in the stray
+  // dir.  on recovery, we'll need to re-eval all strays anyway.
+  
+  SnapContext nullsnapc;
+  C_GatherBuilder gather(
+    g_ceph_context,
+    new C_OnFinisher(new C_IO_PurgeStrayPurged(
+        this, dn, false, op_allowance), &mds->finisher));
+
+  if (in->is_dir()) {
+    object_locator_t oloc(mds->mdsmap->get_metadata_pool());
+    std::list<frag_t> ls;
+    if (!in->dirfragtree.is_leaf(frag_t()))
+      in->dirfragtree.get_leaves(ls);
+    ls.push_back(frag_t());
+    for (std::list<frag_t>::iterator p = ls.begin();
+         p != ls.end();
+         ++p) {
+      object_t oid = CInode::get_object_name(in->inode.ino, *p, "");
+      dout(10) << __func__ << " remove dirfrag " << oid << dendl;
+      mds->objecter->remove(oid, oloc, nullsnapc, ceph_clock_now(g_ceph_context),
+                            0, NULL, gather.new_sub());
+    }
+    assert(gather.has_subs());
+    gather.activate();
+    return;
+  }
+
+  const SnapContext *snapc;
+  SnapRealm *realm = in->find_snaprealm();
+  if (realm) {
+    dout(10) << " realm " << *realm << dendl;
+    snapc = &realm->get_snap_context();
+  } else {
+    dout(10) << " NO realm, using null context" << dendl;
+    snapc = &nullsnapc;
+    assert(in->last == CEPH_NOSNAP);
+  }
+
+  if (in->is_file()) {
+    uint64_t period = (uint64_t)in->inode.layout.fl_object_size *
+                     (uint64_t)in->inode.layout.fl_stripe_count;
+    uint64_t to = in->inode.get_max_size();
+    to = MAX(in->inode.size, to);
+    // when truncating a file, the filer does not delete stripe objects that are
+    // truncated to zero. so we need to purge stripe objects up to the max size
+    // the file has ever been.
+    to = MAX(in->inode.max_size_ever, to);
+    if (to && period) {
+      uint64_t num = (to + period - 1) / period;
+      dout(10) << __func__ << " 0~" << to << " objects 0~" << num
+              << " snapc " << snapc << " on " << *in << dendl;
+      mds->filer->purge_range(in->inode.ino, &in->inode.layout, *snapc,
+                             0, num, ceph_clock_now(g_ceph_context), 0,
+                             gather.new_sub());
+    }
+  }
+
+  inode_t *pi = in->get_projected_inode();
+  object_t oid = CInode::get_object_name(pi->ino, frag_t(), "");
+  // remove the backtrace object if it was not purged
+  if (!gather.has_subs()) {
+    object_locator_t oloc(pi->layout.fl_pg_pool);
+    dout(10) << __func__ << " remove backtrace object " << oid
+            << " pool " << oloc.pool << " snapc " << snapc << dendl;
+    mds->objecter->remove(oid, oloc, *snapc, ceph_clock_now(g_ceph_context), 0,
+                         NULL, gather.new_sub());
+  }
+  // remove old backtrace objects
+  for (compact_set<int64_t>::iterator p = pi->old_pools.begin();
+       p != pi->old_pools.end();
+       ++p) {
+    object_locator_t oloc(*p);
+    dout(10) << __func__ << " remove backtrace object " << oid
+            << " old pool " << *p << " snapc " << snapc << dendl;
+    mds->objecter->remove(oid, oloc, *snapc, ceph_clock_now(g_ceph_context), 0,
+                         NULL, gather.new_sub());
+  }
+  assert(gather.has_subs());
+  gather.activate();
+}
+
+class C_PurgeStrayLogged : public StrayManagerContext {
+  CDentry *dn;
+  version_t pdv;
+  LogSegment *ls;
+public:
+  C_PurgeStrayLogged(StrayManager *sm_, CDentry *d, version_t v, LogSegment *s) : 
+    StrayManagerContext(sm_), dn(d), pdv(v), ls(s) { }
+  void finish(int r) {
+    sm->_purge_stray_logged(dn, pdv, ls);
+  }
+};
+
+class C_TruncateStrayLogged : public StrayManagerContext {
+  CDentry *dn;
+  LogSegment *ls;
+public:
+  C_TruncateStrayLogged(StrayManager *sm, CDentry *d, LogSegment *s) :
+    StrayManagerContext(sm), dn(d), ls(s) { }
+  void finish(int r) {
+    sm->_truncate_stray_logged(dn, ls);
+  }
+};
+
+/**
+ * Completion handler for a Filer::purge on a stray inode.
+ *
+ *
+ */
+void StrayManager::_purge_stray_purged(
+    CDentry *dn, uint32_t ops_allowance, bool only_head)
+{
+  CInode *in = dn->get_projected_linkage()->get_inode();
+  dout(10) << "_purge_stray_purged " << *dn << " " << *in << dendl;
+
+  if (only_head) {
+    /* This was a ::truncate */
+    EUpdate *le = new EUpdate(mds->mdlog, "purge_stray truncate");
+    mds->mdlog->start_entry(le);
+    
+    inode_t *pi = in->project_inode();
+    pi->size = 0;
+    pi->max_size_ever = 0;
+    pi->client_ranges.clear();
+    pi->truncate_size = 0;
+    pi->truncate_from = 0;
+    pi->version = in->pre_dirty();
+
+    le->metablob.add_dir_context(dn->dir);
+    le->metablob.add_primary_dentry(dn, in, true);
+
+    mds->mdlog->submit_entry(le,
+        new C_TruncateStrayLogged(
+          this, dn, mds->mdlog->get_current_segment()));
+  } else {
+    if (in->get_num_ref() != (int)in->is_dirty() ||
+        dn->get_num_ref() != (int)dn->is_dirty() + !!in->get_num_ref() + 1/*PIN_PURGING*/) {
+      // Nobody should be taking new references to an inode when it
+      // is being purged (aside from it it were 
+
+      derr << "Rogue reference after purge to " << *dn << dendl;
+      assert(0 == "rogue reference to purging inode");
+    }
+
+    // kill dentry.
+    version_t pdv = dn->pre_dirty();
+    dn->push_projected_linkage(); // NULL
+
+    EUpdate *le = new EUpdate(mds->mdlog, "purge_stray");
+    mds->mdlog->start_entry(le);
+
+    // update dirfrag fragstat, rstat
+    CDir *dir = dn->get_dir();
+    fnode_t *pf = dir->project_fnode();
+    pf->version = dir->pre_dirty();
+    if (in->is_dir())
+      pf->fragstat.nsubdirs--;
+    else
+      pf->fragstat.nfiles--;
+    pf->rstat.sub(in->inode.accounted_rstat);
+
+    le->metablob.add_dir_context(dn->dir);
+    EMetaBlob::dirlump& dl = le->metablob.add_dir(dn->dir, true);
+    le->metablob.add_null_dentry(dl, dn, true);
+    le->metablob.add_destroyed_inode(in->ino());
+
+    mds->mdlog->submit_entry(le, new C_PurgeStrayLogged(this, dn, pdv,
+          mds->mdlog->get_current_segment()));
+
+    num_strays--;
+    logger->set(l_mdc_num_strays, num_strays);
+    logger->inc(l_mdc_strays_purged);
+  }
+
+  num_strays_purging--;
+  logger->set(l_mdc_num_strays_purging, num_strays_purging);
+
+  // Release resources
+  dout(10) << __func__ << ": decrementing op allowance "
+    << ops_allowance << " from " << ops_in_flight << " in flight" << dendl;
+  assert(ops_in_flight >= ops_allowance);
+  ops_in_flight -= ops_allowance;
+  logger->set(l_mdc_num_purge_ops, ops_in_flight);
+  files_purging -= 1;
+  _advance();
+}
+
+void StrayManager::_purge_stray_logged(CDentry *dn, version_t pdv, LogSegment *ls)
+{
+  CInode *in = dn->get_linkage()->get_inode();
+  dout(10) << "_purge_stray_logged " << *dn << " " << *in << dendl;
+
+  assert(!in->state_test(CInode::STATE_RECOVERING));
+
+  // unlink
+  assert(dn->get_projected_linkage()->is_null());
+  dn->dir->unlink_inode(dn);
+  dn->pop_projected_linkage();
+  dn->mark_dirty(pdv, ls);
+
+  dn->dir->pop_and_dirty_projected_fnode(ls);
+
+  in->state_clear(CInode::STATE_ORPHAN);
+  dn->state_clear(CDentry::STATE_PURGING);
+  dn->put(CDentry::PIN_PURGING);
+
+  // drop inode
+  if (in->is_dirty())
+    in->mark_clean();
+
+  // drop dentry?
+  if (dn->is_new()) {
+    dout(20) << " dn is new, removing" << dendl;
+    dn->mark_clean();
+    dn->dir->remove_dentry(dn);
+    in->mdcache->remove_inode(in);
+  } else {
+    in->mdcache->touch_dentry_bottom(dn);  // drop dn as quickly as possible.
+  }
+}
+
+
+/**
+ * Enqueue a purge operation on a dentry that has passed the tests
+ * in eval_stray.  This may start the operation inline if the throttle
+ * allowances are already available.
+ *
+ * @param trunc false to purge dentry (normal), true to just truncate
+ *                 inode (snapshots)
+ */
+void StrayManager::enqueue(CDentry *dn, bool trunc)
+{
+  CDentry::linkage_t *dnl = dn->get_projected_linkage();
+  assert(dnl);
+  CInode *in = dnl->get_inode();
+  assert(in);
+
+  /* We consider a stray to be purging as soon as it is enqueued, to avoid
+   * enqueing it twice */
+  dn->state_set(CDentry::STATE_PURGING);
+  dn->get(CDentry::PIN_PURGING);
+  in->state_set(CInode::STATE_PURGING);
+
+  /* We must clear this as soon as enqueuing it, to prevent the journal
+   * expiry code from seeing a dirty parent and trying to write a backtrace */
+  if (!trunc) {
+    if (in->is_dirty_parent()) {
+      in->clear_dirty_parent();
+    }
+  }
+
+
+  // Try to purge immediately if possible, else enqueue
+  const uint32_t ops_required = _calculate_ops_required(in, trunc);
+
+  bool consumed = _consume(dn, trunc, ops_required);
+  if (consumed) {
+    dout(10) << __func__ << ": purging this dentry immediately: "
+      << *dn << dendl;
+  } else {
+    dout(10) << __func__ << ": enqueuing this dentry for later purge: "
+      << *dn << dendl;
+    ready_for_purge.push_back(QueuedStray(dn, trunc, ops_required));
+  }
+}
+
+
+/**
+ * Iteratively call _consume on items from the ready_for_purge
+ * list until it returns false (throttle limit reached)
+ */
+void StrayManager::_advance()
+{
+  std::list<QueuedStray>::iterator i;
+  for (i = ready_for_purge.begin();
+       i != ready_for_purge.end(); ++i) {
+    const QueuedStray &qs = *i;
+    const bool consumed = _consume(qs.dn, qs.trunc, qs.ops_required);
+    if (!consumed) {
+      break;
+    }
+  }
+
+  // Erase all the ones that returned true from _consume
+  ready_for_purge.erase(ready_for_purge.begin(), i);
+}
+
+/**
+ * Attempt to purge an inode, if throttling permits
+ * it.  Note that there are compromises to how
+ * the throttling works, in interests of simplicity:
+ *  * If insufficient ops are available to execute
+ *    the next item on the queue, we block even if
+ *    there are items further down the queue requiring
+ *    fewer ops which might be executable
+ *  * The ops considered "in use" by a purge will be
+ *    an overestimate over the period of execution, as
+ *    we count filer_max_purge_ops and ops for old backtraces
+ *    as in use throughout, even though towards the end
+ *    of the purge the actual ops in flight will be
+ *    lower.
+ *  * The ops limit may be exceeded if the number of ops
+ *    required by a single inode is greater than the
+ *    limit, for example directories with very many
+ *    fragments.
+ *
+ * Return true if we successfully consumed resource,
+ * false if insufficient resource was available.
+ */
+bool StrayManager::_consume(CDentry *dn, bool trunc, uint32_t ops_required)
+{
+  const int files_avail = g_conf->mds_max_purge_files - files_purging;
+
+  if (files_avail <= 0) {
+    dout(20) << __func__ << ": throttling on max files" << dendl;
+    return false;
+  } else {
+    dout(20) << __func__ << ": purging dn: " << *dn << dendl;
+  }
+
+  // Calculate how much of the ops allowance is available, allowing
+  // for the case where the limit is currently being exceeded.
+  uint32_t ops_avail;
+  if (ops_in_flight <= g_conf->mds_max_purge_ops) {
+    ops_avail = g_conf->mds_max_purge_ops - ops_in_flight;
+  } else {
+    ops_avail = 0;
+  }
+
+  /* The ops_in_flight > 0 condition here handles the case where the
+   * ops required by this inode would never fit in the limit: we wait
+   * instead until nothing else is running */
+  if (ops_in_flight > 0 && ops_avail < ops_required) {
+    dout(20) << __func__ << ": throttling on max ops (require "
+             << ops_required << ", " << ops_in_flight << " in flight" << dendl;
+    return false;
+  }
+
+  // Resources are available, acquire them and execute the purge
+  files_purging += 1;
+  dout(10) << __func__ << ": allocating allowance "
+    << ops_required << " to " << ops_in_flight << " in flight" << dendl;
+  ops_in_flight += ops_required;
+  logger->set(l_mdc_num_purge_ops, ops_in_flight);
+  if (trunc) {
+    truncate(dn, ops_required);
+  } else {
+    purge(dn, ops_required);
+  }
+  return true;
+}
+
+
+/**
+ * Return the maximum number of concurrent RADOS ops that
+ * may be executed while purging this inode.
+ *
+ * @param trunc true if it's a truncate, false if it's a purge
+ */
+uint32_t StrayManager::_calculate_ops_required(CInode *in, bool trunc)
+{
+  uint32_t ops_required = 0;
+  if (in->is_dir()) {
+    // Directory, count dirfrags to be deleted
+    std::list<frag_t> ls;
+    if (!in->dirfragtree.is_leaf(frag_t())) {
+      in->dirfragtree.get_leaves(ls);
+    }
+    // One for the root, plus any leaves
+    ops_required = 1 + ls.size();
+  } else {
+    // File, work out concurrent Filer::purge deletes
+    const uint64_t period = (uint64_t)in->inode.layout.fl_object_size *
+                     (uint64_t)in->inode.layout.fl_stripe_count;
+    const uint64_t to = MAX(in->inode.max_size_ever,
+            MAX(in->inode.size, in->inode.get_max_size()));
+
+    const uint64_t num = MAX(1, (to + period - 1) / period);
+    ops_required = MIN(num, g_conf->filer_max_purge_ops);
+
+    // Account for removing (or zeroing) backtrace
+    ops_required += 1;
+
+    // Account for deletions for old pools
+    if (!trunc) {
+      ops_required += in->get_projected_inode()->old_pools.size();
+    }
+  }
+
+  return ops_required;
+}
+
+void StrayManager::advance_delayed()
+{
+  for (elist<CDentry*>::iterator p = delayed_eval_stray.begin(); !p.end(); ) {
+    CDentry *dn = *p;
+    ++p;
+    dn->item_stray.remove_myself();
+    num_strays_delayed--;
+
+    if (dn->get_projected_linkage()->is_null()) {
+      /* A special case: a stray dentry can go null if its inode is being
+       * re-linked into another MDS's stray dir during a shutdown migration. */
+      dout(4) << __func__ << ": delayed dentry is now null: " << *dn << dendl;
+      continue;
+    }
+
+    const bool purging = eval_stray(dn);
+    if (!purging) {
+      derr << "Dentry " << *dn << " was purgeable but no longer is!" << dendl;
+      /*
+       * This can happen if a stray is purgeable, but has gained an extra
+       * reference by virtue of having its backtrace updated.
+       * FIXME perhaps we could simplify this further by
+       * avoiding writing the backtrace of purge-ready strays, so
+       * that this code could be more rigid?
+       */
+    }
+  }
+  logger->set(l_mdc_num_strays_delayed, num_strays_delayed);
+}
+
+void StrayManager::notify_stray_created()
+{
+  num_strays++;
+  logger->set(l_mdc_num_strays, num_strays);
+  logger->inc(l_mdc_strays_created);
+}
+
+void StrayManager::notify_stray_removed()
+{
+  num_strays--;
+  logger->set(l_mdc_num_strays, num_strays);
+}
+
+struct C_EvalStray : public StrayManagerContext {
+  CDentry *dn;
+  C_EvalStray(StrayManager *sm_, CDentry *d) : StrayManagerContext(sm_), dn(d) {}
+  void finish(int r) {
+    sm->eval_stray(dn);
+  }
+};
+
+struct C_MDC_EvalStray : public StrayManagerContext {
+  CDentry *dn;
+  C_MDC_EvalStray(StrayManager *sm_, CDentry *d) : StrayManagerContext(sm_), dn(d) {}
+  void finish(int r) {
+    sm->eval_stray(dn);
+  }
+};
+
+/**
+ * Evaluate a stray dentry for purging or reintegration.
+ *
+ * If the inode has no linkage, and no more references, then
+ * we may decide to purge it.
+ *
+ * If the inode still has linkage, then it means someone else
+ * (a hard link) is still referring to it, and we should
+ * think about reintegrating that inode into the remote dentry.
+ *
+ * @returns true if the dentry will be purged (caller should never
+ *          take more refs after this happens), else false.
+ */
+bool StrayManager::eval_stray(CDentry *dn, bool delay)
+{
+  dout(10) << "eval_stray " << *dn << dendl;
+  CDentry::linkage_t *dnl = dn->get_projected_linkage();
+  assert(dnl->is_primary());
+  dout(10) << " inode is " << *dnl->get_inode() << dendl;
+  CInode *in = dnl->get_inode();
+  assert(in);
+
+  // The only dentries elegible for purging are those
+  // in the stray directories
+  assert(dn->get_dir()->get_inode()->is_stray());
+
+  // Inode may not pass through this function if it
+  // was already identified for purging (i.e. cannot
+  // call eval_stray() after purge()
+  assert(!dn->state_test(CDentry::STATE_PURGING));
+
+  if (!dn->is_auth()) {
+    // has to be mine
+    // move to bottom of lru so that we trim quickly!
+
+    in->mdcache->touch_dentry_bottom(dn);
+    return false;
+  }
+
+  // purge?
+  if (in->inode.nlink == 0) {
+    // past snaprealm parents imply snapped dentry remote links.
+    // only important for directories.  normal file data snaps are handled
+    // by the object store.
+    if (in->snaprealm && in->snaprealm->has_past_parents()) {
+      if (!in->snaprealm->have_past_parents_open() &&
+          !in->snaprealm->open_parents(new C_MDC_EvalStray(this, dn))) {
+        return false;
+      }
+      in->snaprealm->prune_past_parents();
+      in->purge_stale_snap_data(in->snaprealm->get_snaps());
+    }
+    if (in->is_dir()) {
+      if (in->snaprealm && in->snaprealm->has_past_parents()) {
+       dout(20) << "  directory has past parents "
+          << in->snaprealm->srnode.past_parents << dendl;
+       return false;  // not until some snaps are deleted.
+      }
+
+      if (in->has_dirfrags()) {
+        list<CDir*> ls;
+        in->get_nested_dirfrags(ls);
+        for (list<CDir*>::iterator p = ls.begin(); p != ls.end(); ++p) {
+          (*p)->try_remove_dentries_for_stray();
+        }
+      }
+    }
+    if (dn->is_replicated()) {
+      dout(20) << " replicated" << dendl;
+      return false;
+    }
+    if (dn->is_any_leases() || in->is_any_caps()) {
+      dout(20) << " caps | leases" << dendl;
+      return false;  // wait
+    }
+    if (dn->state_test(CDentry::STATE_PURGING)) {
+      dout(20) << " already purging" << dendl;
+      return false;  // already purging
+    }
+    if (in->state_test(CInode::STATE_NEEDSRECOVER) ||
+       in->state_test(CInode::STATE_RECOVERING)) {
+      dout(20) << " pending recovery" << dendl;
+      return false;  // don't mess with file size probing
+    }
+    if (in->get_num_ref() > (int)in->is_dirty() + (int)in->is_dirty_parent()) {
+      dout(20) << " too many inode refs" << dendl;
+      return false;
+    }
+    if (dn->get_num_ref() > (int)dn->is_dirty() + !!in->get_num_ref()) {
+      dout(20) << " too many dn refs" << dendl;
+      return false;
+    }
+    if (delay) {
+      if (!dn->item_stray.is_on_list()) {
+       delayed_eval_stray.push_back(&dn->item_stray);
+       num_strays_delayed++;
+       logger->set(l_mdc_num_strays_delayed, num_strays_delayed);
+      }
+    // don't purge multiversion inode with snap data
+    } else if (in->snaprealm && in->snaprealm->has_past_parents() &&
+              !in->old_inodes.empty()) {
+      // A file with snapshots: we will truncate the HEAD revision
+      // but leave the metadata intact.
+      assert(!in->is_dir());
+      dout(20) << " file has past parents "
+        << in->snaprealm->srnode.past_parents << dendl;
+      if (in->is_file() && in->get_projected_inode()->size > 0) {
+       enqueue(dn, true); // truncate head objects    
+      }
+    } else {
+      // A straightforward file, ready to be purged.  Enqueue it.
+      if (in->is_dir()) {
+       in->close_dirfrags();
+      }
+
+      enqueue(dn, false);
+    }
+
+    return true;
+  } else {
+    /*
+     * Where a stray has some links, they should be remotes, check
+     * if we can do anything with them if we happen to have them in
+     * cache.
+     */
+    eval_remote_stray(dn, NULL);
+    return false;
+  }
+}
+
+void StrayManager::eval_remote_stray(CDentry *stray_dn, CDentry *remote_dn)
+{
+  assert(stray_dn != NULL);
+  assert(stray_dn->get_dir()->get_inode()->is_stray());
+
+  /* If no remote_dn hinted, pick one arbitrarily */
+  if (remote_dn == NULL) {
+    CDentry::linkage_t *stray_dnl = stray_dn->get_projected_linkage();
+    assert(stray_dnl->is_primary());
+    CInode *stray_in = stray_dnl->get_inode();
+    assert(stray_in != NULL);
+    assert(stray_in->inode.nlink >= 1);
+
+    if (!stray_in->remote_parents.empty()) {
+      remote_dn = *stray_in->remote_parents.begin();
+    } else {
+      dout(20) << __func__
+        << ": not reintegrating (no remote parents in cache)" << dendl;
+      return;
+    }
+  }
+    // NOTE: we repeat this check in _rename(), since our submission path is racey.
+    if (!remote_dn->is_projected()) {
+      if (remote_dn->is_auth() && remote_dn->dir->can_auth_pin()) {
+        reintegrate_stray(stray_dn, remote_dn);
+      } else if (!remote_dn->is_auth() && stray_dn->is_auth()) {
+        migrate_stray(stray_dn, remote_dn->authority().first);
+      } else {
+        dout(20) << __func__ << ": not reintegrating" << dendl;
+      }
+    } else {
+      // don't do anything if the remote parent is projected, or we may
+      // break user-visible semantics!
+      dout(20) << __func__ << ": not reintegrating (projected)" << dendl;
+    }
+}
+
+
+
+/**
+ * When hard links exist to an inode whose primary dentry
+ * is unlinked, the inode gets a stray primary dentry.
+ *
+ * We may later "reintegrate" the inode into a remaining
+ * non-stray dentry (one of what was previously a remote
+ * dentry) by issuing a rename from the stray to the other
+ * dentry.
+ */
+void StrayManager::reintegrate_stray(CDentry *straydn, CDentry *rdn)
+{
+  dout(10) << __func__ << " " << *straydn << " into " << *rdn << dendl;
+
+  logger->inc(l_mdc_strays_reintegrated);
+  
+  // rename it to another mds.
+  filepath src;
+  straydn->make_path(src);
+  filepath dst;
+  rdn->make_path(dst);
+
+  MClientRequest *req = new MClientRequest(CEPH_MDS_OP_RENAME);
+  req->set_filepath(dst);
+  req->set_filepath2(src);
+  req->set_tid(mds->issue_tid());
+
+  mds->send_message_mds(req, rdn->authority().first);
+}
+
+/**
+ * Given a dentry within one of my stray directories,
+ * send it off to a stray directory in another MDS.
+ *
+ * This is for use:
+ *  * Case A: when shutting down a rank, we migrate strays
+ *    away from ourselves rather than waiting for purge
+ *  * Case B: when a client request has a trace that refers to
+ *    a stray inode on another MDS, we migrate that inode from
+ *    there to here, in order that we can later re-integrate it
+ *    here.
+ *
+ * In case B, the receiver should be calling into eval_stray
+ * on completion of mv (i.e. inode put), resulting in a subsequent
+ * reintegration.
+ */
+void StrayManager::migrate_stray(CDentry *dn, mds_rank_t to)
+{
+  CInode *in = dn->get_linkage()->get_inode();
+  assert(in);
+  CInode *diri = dn->dir->get_inode();
+  assert(diri->is_stray());
+  dout(10) << "migrate_stray from mds." << MDS_INO_STRAY_OWNER(diri->inode.ino)
+          << " to mds." << to
+          << " " << *dn << " " << *in << dendl;
+
+  logger->inc(l_mdc_strays_migrated);
+
+  // rename it to another mds.
+  filepath src;
+  dn->make_path(src);
+
+  string dname;
+  in->name_stray_dentry(dname);
+  filepath dst(dname, MDS_INO_STRAY(to, 0));
+
+  MClientRequest *req = new MClientRequest(CEPH_MDS_OP_RENAME);
+  req->set_filepath(dst);
+  req->set_filepath2(src);
+  req->set_tid(mds->issue_tid());
+
+  mds->send_message_mds(req, to);
+}
+
+  StrayManager::StrayManager(MDS *mds)
+  : delayed_eval_stray(member_offset(CDentry, item_stray)),
+    mds(mds), logger(NULL),
+    ops_in_flight(0), files_purging(0),
+    num_strays(0), num_strays_purging(0), num_strays_delayed(0)
+{
+  assert(mds != NULL);
+
+  assert(g_conf->mds_max_purge_ops >= g_conf->filer_max_purge_ops);
+}
+
+
+/**
+ * For any strays that are enqueued for purge, but
+ * currently blocked on throttling, clear their
+ * purging status.  Used during MDS rank shutdown
+ * so that it can migrate these strays instead
+ * of waiting for them to trickle through the
+ * queue.
+ */
+void StrayManager::abort_queue()
+{
+  for (std::list<QueuedStray>::iterator i = ready_for_purge.begin();
+       i != ready_for_purge.end(); ++i)
+  {
+    const QueuedStray &qs = *i;
+    CDentry *dn = qs.dn;
+    dout(10) << __func__ << ": aborting enqueued purge " << *dn << dendl;
+
+    CDentry::linkage_t *dnl = dn->get_projected_linkage();
+    assert(dnl);
+    CInode *in = dnl->get_inode();
+    assert(in);
+
+    // Clear flags set in enqueue
+    dn->state_clear(CDentry::STATE_PURGING);
+    dn->put(CDentry::PIN_PURGING);
+    in->state_clear(CInode::STATE_PURGING);
+  }
+  ready_for_purge.clear();
+}
+
+void StrayManager::truncate(CDentry *dn, uint32_t op_allowance)
+{
+  CDentry::linkage_t *dnl = dn->get_projected_linkage();
+  CInode *in = dnl->get_inode();
+  assert(in);
+  dout(10) << __func__ << ": " << *dn << " " << *in << dendl;
+  assert(!dn->is_replicated());
+
+  if (dn->item_stray.is_on_list()) {
+    dn->item_stray.remove_myself();
+    num_strays_delayed--;
+    logger->set(l_mdc_num_strays_delayed, num_strays_delayed);
+  }
+
+  num_strays_purging++;
+  logger->set(l_mdc_num_strays_purging, num_strays_purging);
+
+  C_GatherBuilder gather(
+    g_ceph_context,
+    new C_OnFinisher(new C_IO_PurgeStrayPurged(this, dn, true, 0),
+                    &mds->finisher));
+
+  SnapRealm *realm = in->find_snaprealm();
+  assert(realm);
+  dout(10) << " realm " << *realm << dendl;
+  const SnapContext *snapc = &realm->get_snap_context();
+
+  uint64_t period = (uint64_t)in->inode.layout.fl_object_size *
+                   (uint64_t)in->inode.layout.fl_stripe_count;
+  uint64_t to = in->inode.get_max_size();
+  to = MAX(in->inode.size, to);
+  // when truncating a file, the filer does not delete stripe objects that are
+  // truncated to zero. so we need to purge stripe objects up to the max size
+  // the file has ever been.
+  to = MAX(in->inode.max_size_ever, to);
+  if (period && to > period) {
+    uint64_t num = (to - 1) / period;
+    dout(10) << __func__ << " 0~" << to << " objects 0~" << num
+      << " snapc " << snapc << " on " << *in << dendl;
+    mds->filer->purge_range(in->ino(), &in->inode.layout, *snapc,
+                           1, num, ceph_clock_now(g_ceph_context),
+                           0, gather.new_sub());
+  }
+
+  // keep backtrace object
+  if (period && to > 0) {
+    mds->filer->zero(in->ino(), &in->inode.layout, *snapc,
+                    0, period, ceph_clock_now(g_ceph_context),
+                    0, true, NULL, gather.new_sub());
+  }
+
+  assert(gather.has_subs());
+  gather.activate();
+}
+
+
+/**
+ * Callback: we have logged the update to an inode's metadata
+ * reflecting it's newly-zeroed length.
+ */
+void StrayManager::_truncate_stray_logged(CDentry *dn, LogSegment *ls)
+{
+  CInode *in = dn->get_projected_linkage()->get_inode();
+
+  dout(10) << __func__ << ": " << *dn << " " << *in << dendl;
+
+  dn->state_clear(CDentry::STATE_PURGING);
+  dn->put(CDentry::PIN_PURGING);
+
+  in->pop_and_dirty_projected_inode(ls);
+
+  eval_stray(dn);
+}
+
diff --git a/src/mds/StrayManager.h b/src/mds/StrayManager.h
new file mode 100644 (file)
index 0000000..f10e424
--- /dev/null
@@ -0,0 +1,92 @@
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef STRAY_MANAGER_H
+#define STRAY_MANAGER_H
+
+#include "include/elist.h"
+#include <list>
+
+class MDS;
+class PerfCounters;
+class CInode;
+class CDentry;
+
+class StrayManager
+{
+  protected:
+  class QueuedStray {
+    public:
+    CDentry *dn;
+    bool trunc;
+    uint32_t ops_required;
+    QueuedStray(CDentry *dn_, bool t, uint32_t ops)
+      : dn(dn_), trunc(t), ops_required(ops) {}
+  };
+
+  // Has passed through eval_stray and still has refs
+  elist<CDentry*> delayed_eval_stray;
+
+  // No more refs, can purge these
+  std::list<QueuedStray> ready_for_purge;
+
+  // Global references for doing I/O
+  MDS *mds;
+  PerfCounters *logger;
+
+  // Throttled allowances
+  uint64_t ops_in_flight;
+  uint64_t files_purging;
+
+  // Statistics
+  uint64_t num_strays;
+  uint64_t num_strays_purging;
+  uint64_t num_strays_delayed;
+
+  void truncate(CDentry *dn, uint32_t op_allowance);
+  void purge(CDentry *dn, uint32_t op_allowance);
+  void _purge_stray_purged(CDentry *dn, uint32_t ops, bool only_head);
+  void _purge_stray_logged(CDentry *dn, version_t pdv, LogSegment *ls);
+  void _truncate_stray_logged(CDentry *dn, LogSegment *ls);
+
+  friend class StrayManagerIOContext;
+  friend class StrayManagerContext;
+
+  friend class C_PurgeStrayLogged;
+  friend class C_TruncateStrayLogged;
+  friend class C_IO_PurgeStrayPurged;
+
+  void _advance();
+  bool _consume(CDentry *dn, bool trunc, uint32_t ops_required);
+  uint32_t _calculate_ops_required(CInode *in, bool trunc);
+
+  void reintegrate_stray(CDentry *dn, CDentry *rlink);
+
+
+  // My public interface is for consumption by MDCache
+  public:
+
+  void enqueue(CDentry *dn, bool trunc);
+  void advance_delayed();
+  bool eval_stray(CDentry *dn, bool delay=false);
+  void eval_remote_stray(CDentry *stray_dn, CDentry *remote_dn=NULL);
+  void migrate_stray(CDentry *dn, mds_rank_t dest);
+
+  StrayManager(MDS *mds);
+  void set_logger(PerfCounters *l) {logger = l;}
+  void notify_stray_created();
+  void notify_stray_removed();
+  void abort_queue();
+};
+
+#endif  // STRAY_MANAGER_H