]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mds: use a persistent queue for purging deleted files
authorJohn Spray <john.spray@redhat.com>
Thu, 1 Dec 2016 20:22:43 +0000 (20:22 +0000)
committerJohn Spray <john.spray@redhat.com>
Wed, 8 Mar 2017 10:20:57 +0000 (10:20 +0000)
To avoid creating stray directories of unbounded size
and all the associated pain, use a more appropriate
datastructure to store a FIFO of inodes that need
purging.

Fixes: http://tracker.ceph.com/issues/11950
Signed-off-by: John Spray <john.spray@redhat.com>
src/mds/CMakeLists.txt
src/mds/MDCache.cc
src/mds/MDCache.h
src/mds/MDSRank.cc
src/mds/PurgeQueue.cc [new file with mode: 0644]
src/mds/PurgeQueue.h [new file with mode: 0644]
src/mds/StrayManager.cc
src/mds/StrayManager.h
src/mds/mdstypes.h

index 72d370992a65587850296302da58b62bddd1cefd..b7f7d0c51154c128f87a546a26539ec432e421e6 100644 (file)
@@ -11,6 +11,7 @@ set(mds_srcs
   MDCache.cc
   RecoveryQueue.cc
   StrayManager.cc
+  PurgeQueue.cc
   Locker.cc
   Migrator.cc
   MDBalancer.cc
index 37c42f40ddbd5c346bd0ed0ecb1f03bd8c0aa573..385b12e2e1925c2aea3bca78415e449afb739706 100644 (file)
@@ -461,6 +461,8 @@ void MDCache::create_mydir_hierarchy(MDSGather *gather)
   mydir->commit(0, gather->new_sub());
 
   myin->store(gather->new_sub());
+
+  stray_manager.purge_queue.create(new C_IO_Wrapper(mds, gather->new_sub()));
 }
 
 struct C_MDC_CreateSystemFile : public MDCacheLogContext {
@@ -584,8 +586,15 @@ void MDCache::open_root_inode(MDSInternalContextBase *c)
 
 void MDCache::open_mydir_inode(MDSInternalContextBase *c)
 {
+  MDSGatherBuilder gather(g_ceph_context);
+
   CInode *in = create_system_inode(MDS_INO_MDSDIR(mds->get_nodeid()), S_IFDIR|0755);  // initially inaccurate!
-  in->fetch(c);
+  in->fetch(gather.new_sub());
+
+  stray_manager.purge_queue.open(new C_IO_Wrapper(mds, gather.new_sub()));
+
+  gather.set_finisher(c);
+  gather.activate();
 }
 
 void MDCache::open_root()
index 3e49b4020e91bdb79c28c3d8e3b6413ce23e8673..a8e174cd78114d2a90d48b06867003d961fcc7d6 100644 (file)
@@ -73,7 +73,7 @@ enum {
   l_mdc_first = 3000,
   // How many inodes currently in stray dentries
   l_mdc_num_strays,
-  // How many stray dentries are currently being purged
+  // How many stray dentries are currently enqueued for purge
   l_mdc_num_strays_purging,
   // How many stray dentries are currently delayed for purge due to refs
   l_mdc_num_strays_delayed,
index e7da0aec4aaaae4634da05be12469185693f9c11..f87cb40ee60b4ccb13188f47082c899284816f4c 100644 (file)
@@ -159,6 +159,8 @@ void MDSRankDispatcher::init()
 
   progress_thread.create("mds_rank_progr");
 
+  mdcache->stray_manager.purge_queue.init();
+
   finisher->start();
 }
 
@@ -239,6 +241,8 @@ void MDSRankDispatcher::shutdown()
   // shut down cache
   mdcache->shutdown();
 
+  mdcache->stray_manager.purge_queue.shutdown();
+
   if (objecter->initialized.read())
     objecter->shutdown();
 
diff --git a/src/mds/PurgeQueue.cc b/src/mds/PurgeQueue.cc
new file mode 100644 (file)
index 0000000..56f5e20
--- /dev/null
@@ -0,0 +1,284 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#include "common/debug.h"
+#include "mds/mdstypes.h"
+#include "mds/CInode.h"
+
+#include "PurgeQueue.h"
+
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_mds
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, rank) << __func__ << ": "
+static ostream& _prefix(std::ostream *_dout, mds_rank_t rank) {
+  return *_dout << "mds." << rank << ".purge_queue ";
+}
+
+void PurgeItem::encode(bufferlist &bl) const
+{
+  ENCODE_START(1, 1, bl);
+  ::encode(ino, bl);
+  ::encode(size, bl);
+  ::encode(layout, bl, CEPH_FEATURE_FS_FILE_LAYOUT_V2);
+  ::encode(old_pools, bl);
+  ::encode(snapc, bl);
+  ENCODE_FINISH(bl);
+}
+
+void PurgeItem::decode(bufferlist::iterator &p)
+{
+  DECODE_START(1, p);
+  ::decode(ino, p);
+  ::decode(size, p);
+  ::decode(layout, p);
+  ::decode(old_pools, p);
+  ::decode(snapc, p);
+  DECODE_FINISH(p);
+}
+
+// TODO: implement purge queue creation on startup
+// if we are on a filesystem created before purge queues existed
+// TODO: ensure that a deactivating MDS rank blocks
+// on complete drain of this queue before finishing
+// TODO: when we're deactivating, lift all limits on
+// how many OSD ops we're allowed to emit at a time to
+// race through the queue as fast as we can.
+// TODO: populate logger here to gather latency stat?
+//       ...and a stat for the size of the queue, if we can
+//       somehow track that?  Could do an initial pass through
+//       the whole queue to count the items at startup?
+// TODO: there is absolutely no reason to consume an inode number
+// for this.  Shoudl just give objects a string name with a rank
+// suffix, like we do for MDSTables.  Requires a little refactor
+// of Journaler.
+PurgeQueue::PurgeQueue(
+      CephContext *cct_,
+      mds_rank_t rank_,
+      const int64_t metadata_pool_,
+      Objecter *objecter_)
+  :
+    cct(cct_),
+    rank(rank_),
+    lock("PurgeQueue"),
+    metadata_pool(metadata_pool_),
+    finisher(cct, "PurgeQueue", "PQ_Finisher"),
+    timer(cct, lock),
+    filer(objecter_, &finisher),
+    objecter(objecter_),
+    journaler("pq", MDS_INO_PURGE_QUEUE + rank, metadata_pool,
+      CEPH_FS_ONDISK_MAGIC, objecter_, nullptr, 0, &timer,
+      &finisher)
+{
+}
+
+void PurgeQueue::init()
+{
+  Mutex::Locker l(lock);
+
+  finisher.start();
+  timer.init();
+}
+
+void PurgeQueue::shutdown()
+{
+  Mutex::Locker l(lock);
+
+  journaler.shutdown();
+  timer.shutdown();
+  finisher.stop();
+}
+
+void PurgeQueue::open(Context *completion)
+{
+  dout(4) << "opening" << dendl;
+
+  Mutex::Locker l(lock);
+
+  journaler.recover(new FunctionContext([this, completion](int r){
+    Mutex::Locker l(lock);
+    dout(4) << "open complete" << dendl;
+    if (r == 0) {
+      journaler.set_writeable();
+    }
+    completion->complete(r);
+  }));
+}
+
+void PurgeQueue::create(Context *fin)
+{
+  dout(4) << "creating" << dendl;
+  Mutex::Locker l(lock);
+
+  file_layout_t layout = file_layout_t::get_default();
+  layout.pool_id = metadata_pool;
+  journaler.set_writeable();
+  journaler.create(&layout, JOURNAL_FORMAT_RESILIENT);
+  journaler.write_head(fin);
+}
+
+void PurgeQueue::push(const PurgeItem &pi, Context *completion)
+{
+  dout(4) << "pushing inode 0x" << std::hex << pi.ino << std::dec << dendl;
+  Mutex::Locker l(lock);
+
+  // Callers should have waited for open() before using us
+  assert(!journaler.is_readonly());
+
+  bufferlist bl;
+
+  ::encode(pi, bl);
+  journaler.append_entry(bl);
+
+  // Note that flush calls are not 1:1 with IOs, Journaler
+  // does its own batching.  So we just call every time.
+  // FIXME: *actually* as soon as we call _consume it will
+  // do a flush via _issue_read, so we really are doing one
+  // write per event.  Avoid this by avoiding doing the journaler
+  // read (see "if we could consume this PurgeItem immediately...")
+  journaler.flush(completion);
+
+  // Maybe go ahead and do something with it right away
+  _consume();
+
+  // TODO: if we could consume this PurgeItem immediately, and
+  // Journaler does not have any outstanding prefetches, then
+  // short circuit its read by advancing read_pos to write_pos
+  // and passing the PurgeItem straight into _execute_item
+}
+
+bool PurgeQueue::can_consume()
+{
+  // TODO: enforce limits (currently just allowing one in flight)
+  if (in_flight.size() > 0) {
+    return false;
+  } else {
+    return true;
+  }
+}
+
+void PurgeQueue::_consume()
+{
+  assert(lock.is_locked_by_me());
+
+  // Because we are the writer and the reader of the journal
+  // via the same Journaler instance, we never need to reread_head
+  
+  if (!can_consume()) {
+    dout(10) << " cannot consume right now" << dendl;
+
+    return;
+  }
+
+  if (!journaler.is_readable()) {
+    dout(10) << " not readable right now" << dendl;
+    if (!journaler.have_waiter()) {
+      journaler.wait_for_readable(new FunctionContext([this](int r) {
+        Mutex::Locker l(lock);
+        if (r == 0) {
+          _consume();
+        }
+      }));
+    }
+
+    return;
+  }
+
+  // The journaler is readable: consume an entry
+  bufferlist bl;
+  bool readable = journaler.try_read_entry(bl);
+  assert(readable);  // we checked earlier
+
+  dout(20) << " decoding entry" << dendl;
+  PurgeItem item;
+  bufferlist::iterator q = bl.begin();
+  ::decode(item, q);
+  dout(20) << " executing item (0x" << std::hex << item.ino
+           << std::dec << ")" << dendl;
+  _execute_item(item, journaler.get_read_pos());
+}
+
+void PurgeQueue::_execute_item(
+    const PurgeItem &item,
+    uint64_t expire_to)
+{
+  assert(lock.is_locked_by_me());
+
+  in_flight[expire_to] = item;
+
+  // TODO: handle things other than normal file purges
+  // (directories, snapshot truncates)
+  C_GatherBuilder gather(cct);
+  if (item.size > 0) {
+    uint64_t num = Striper::get_num_objects(item.layout, item.size);
+    dout(10) << " 0~" << item.size << " objects 0~" << num
+             << " snapc " << item.snapc << " on " << item.ino << dendl;
+    filer.purge_range(item.ino, &item.layout, item.snapc,
+                      0, num, ceph::real_clock::now(), 0,
+                      gather.new_sub());
+  }
+
+  // remove the backtrace object if it was not purged
+  object_t oid = CInode::get_object_name(item.ino, frag_t(), "");
+  if (!gather.has_subs() || !item.layout.pool_ns.empty()) {
+    object_locator_t oloc(item.layout.pool_id);
+    dout(10) << " remove backtrace object " << oid
+            << " pool " << oloc.pool << " snapc " << item.snapc << dendl;
+    objecter->remove(oid, oloc, item.snapc,
+                         ceph::real_clock::now(), 0,
+                         NULL, gather.new_sub());
+  }
+
+  // remove old backtrace objects
+  for (const auto &p : item.old_pools) {
+    object_locator_t oloc(p);
+    dout(10) << " remove backtrace object " << oid
+            << " old pool " << p << " snapc " << item.snapc << dendl;
+    objecter->remove(oid, oloc, item.snapc,
+                         ceph::real_clock::now(), 0,
+                         NULL, gather.new_sub());
+  }
+  assert(gather.has_subs());
+
+  gather.set_finisher(new FunctionContext([this, expire_to](int r){
+    execute_item_complete(expire_to);
+  }));
+  gather.activate();
+}
+
+void PurgeQueue::execute_item_complete(
+    uint64_t expire_to)
+{
+  dout(10) << "complete at 0x" << std::hex << expire_to << std::dec << dendl;
+  Mutex::Locker l(lock);
+  assert(in_flight.count(expire_to) == 1);
+
+  auto iter = in_flight.find(expire_to);
+  assert(iter != in_flight.end());
+  if (iter == in_flight.begin()) {
+    // This was the lowest journal position in flight, so we can now
+    // safely expire the journal up to here.
+    journaler.set_expire_pos(expire_to);
+    journaler.trim();
+  }
+
+  dout(10) << "completed item for ino 0x" << std::hex << iter->second.ino
+           << std::dec << dendl;
+
+  in_flight.erase(iter);
+
+  _consume();
+}
+
diff --git a/src/mds/PurgeQueue.h b/src/mds/PurgeQueue.h
new file mode 100644 (file)
index 0000000..e94759a
--- /dev/null
@@ -0,0 +1,101 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef PURGE_QUEUE_H_
+#define PURGE_QUEUE_H_
+
+#include "include/compact_set.h"
+#include "osdc/Journaler.h"
+
+
+class PurgeItem
+{
+public:
+  inodeno_t ino;
+  uint64_t size;
+  file_layout_t layout;
+  compact_set<int64_t> old_pools;
+  SnapContext snapc;
+
+  PurgeItem()
+   : ino(0), size(0)
+  {}
+
+  void encode(bufferlist &bl) const;
+  void decode(bufferlist::iterator &p);
+};
+WRITE_CLASS_ENCODER(PurgeItem)
+
+/**
+ * Note that this class does not take a reference to MDSRank: we are
+ * independent of all the metadata structures and do not need to
+ * take mds_lock for anything.
+ */
+class PurgeQueue
+{
+protected:
+  CephContext *cct;
+  const mds_rank_t rank;
+  Mutex lock;
+
+  int64_t metadata_pool;
+
+  // Don't use the MDSDaemon's Finisher and Timer, because this class
+  // operates outside of MDSDaemon::mds_lock
+  Finisher finisher;
+  SafeTimer timer;
+  Filer filer;
+  Objecter *objecter;
+  Journaler journaler;
+
+  std::map<uint64_t, PurgeItem> in_flight;
+
+  //PerfCounters *logger;
+
+  bool can_consume();
+
+  void _consume();
+
+  void _execute_item(
+      const PurgeItem &item,
+      uint64_t expire_to);
+  void execute_item_complete(
+      uint64_t expire_to);
+
+public:
+  void init();
+  void shutdown();
+
+  // Write an empty queue, use this during MDS rank creation
+  void create(Context *completion);
+
+  // Read the Journaler header for an existing queue and start consuming
+  void open(Context *completion);
+
+  // Submit one entry to the work queue.  Call back when it is persisted
+  // to the queue (there is no callback for when it is executed)
+  void push(const PurgeItem &pi, Context *completion);
+
+  PurgeQueue(
+      CephContext *cct_,
+      mds_rank_t rank_,
+      const int64_t metadata_pool_,
+      Objecter *objecter_);
+  ~PurgeQueue()
+  {}
+};
+
+
+#endif
+
index de2955a55345bc05c13509e17bc5543de1847ca9..e16d2f9932e50cb4d95d15c2a070ead88d189aa5 100644 (file)
@@ -138,45 +138,27 @@ void StrayManager::purge(CDentry *dn, uint32_t op_allowance)
     assert(in->last == CEPH_NOSNAP);
   }
 
+  uint64_t to = 0;
   if (in->is_file()) {
-    uint64_t to = in->inode.get_max_size();
+    to = in->inode.get_max_size();
     to = MAX(in->inode.size, to);
     // when truncating a file, the filer does not delete stripe objects that are
     // truncated to zero. so we need to purge stripe objects up to the max size
     // the file has ever been.
     to = MAX(in->inode.max_size_ever, to);
-    if (to > 0) {
-      uint64_t num = Striper::get_num_objects(in->inode.layout, to);
-      dout(10) << __func__ << " 0~" << to << " objects 0~" << num
-              << " snapc " << snapc << " on " << *in << dendl;
-      filer.purge_range(in->inode.ino, &in->inode.layout, *snapc,
-                       0, num, ceph::real_clock::now(), 0,
-                       gather.new_sub());
-    }
   }
 
   inode_t *pi = in->get_projected_inode();
-  object_t oid = CInode::get_object_name(pi->ino, frag_t(), "");
-  // remove the backtrace object if it was not purged
-  if (!gather.has_subs() || !pi->layout.pool_ns.empty()) {
-    object_locator_t oloc(pi->layout.pool_id);
-    dout(10) << __func__ << " remove backtrace object " << oid
-            << " pool " << oloc.pool << " snapc " << snapc << dendl;
-    mds->objecter->remove(oid, oloc, *snapc,
-                         ceph::real_clock::now(), 0,
-                         gather.new_sub());
-  }
-  // remove old backtrace objects
-  for (compact_set<int64_t>::iterator p = pi->old_pools.begin();
-       p != pi->old_pools.end();
-       ++p) {
-    object_locator_t oloc(*p);
-    dout(10) << __func__ << " remove backtrace object " << oid
-            << " old pool " << *p << " snapc " << snapc << dendl;
-    mds->objecter->remove(oid, oloc, *snapc,
-                         ceph::real_clock::now(), 0,
-                         gather.new_sub());
-  }
+
+  PurgeItem item;
+  item.ino = in->inode.ino;
+  item.size = to;
+  item.layout = pi->layout;
+  item.old_pools = pi->old_pools;
+  item.snapc = *snapc;
+
+  purge_queue.push(item, gather.new_sub());
+
   assert(gather.has_subs());
   gather.activate();
 }
@@ -862,7 +844,9 @@ StrayManager::StrayManager(MDSRank *mds)
     ops_in_flight(0), files_purging(0),
     max_purge_ops(0), 
     num_strays(0), num_strays_purging(0), num_strays_delayed(0),
-    filer(mds->objecter, mds->finisher)
+    filer(mds->objecter, mds->finisher),
+    purge_queue(g_ceph_context, mds->get_nodeid(),
+        mds->mdsmap->get_metadata_pool(), mds->objecter)
 {
   assert(mds != NULL);
 }
index d58ebfbbd73b555a59dc5009d95d5f68e935bdc4..65017798e8be0f8d51dfb9423f479163c3b6781e 100644 (file)
@@ -17,6 +17,7 @@
 #include "include/elist.h"
 #include <list>
 #include "osdc/Filer.h"
+#include "mds/PurgeQueue.h"
 
 class MDSRank;
 class PerfCounters;
@@ -72,6 +73,8 @@ class StrayManager
 
   Filer filer;
 
+  PurgeQueue purge_queue;
+
   void truncate(CDentry *dn, uint32_t op_allowance);
 
   /**
@@ -95,6 +98,13 @@ class StrayManager
    */
   void _truncate_stray_logged(CDentry *dn, LogSegment *ls);
 
+  // FIXME: doing this to let MDCache call through into purgequeue
+  // for initialization and teardown
+  // >>
+  friend class MDCache;
+  friend class MDSRankDispatcher;
+  // <<
+
   friend class StrayManagerIOContext;
   friend class StrayManagerLogContext;
   friend class StrayManagerContext;
index 9a6779307d686c1da7f4e6e505d8fa511f1c4346..ed7067fc25bfcbff0b5eb43a594d38279595f32a 100644 (file)
@@ -52,6 +52,7 @@
 #define MDS_INO_LOG_OFFSET        (2*MAX_MDS)
 #define MDS_INO_LOG_BACKUP_OFFSET (3*MAX_MDS)
 #define MDS_INO_LOG_POINTER_OFFSET    (4*MAX_MDS)
+#define MDS_INO_PURGE_QUEUE       (5*MAX_MDS)
 
 #define MDS_INO_SYSTEM_BASE       ((6*MAX_MDS) + (MAX_MDS * NUM_STRAY))