]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: implement PurgeQueue throttling
authorJohn Spray <john.spray@redhat.com>
Fri, 23 Dec 2016 18:00:17 +0000 (18:00 +0000)
committerJohn Spray <john.spray@redhat.com>
Wed, 8 Mar 2017 10:20:59 +0000 (10:20 +0000)
Signed-off-by: John Spray <john.spray@redhat.com>
src/mds/MDSDaemon.cc
src/mds/PurgeQueue.cc
src/mds/PurgeQueue.h

index 69b76f3e6f6b9d09f02d7635e9d4113b4e3e9fd4..ff5890b56b308c2bb6f66ec95cb190881a36515c 100644 (file)
@@ -352,9 +352,10 @@ const char** MDSDaemon::get_tracked_conf_keys() const
     "clog_to_syslog",
     "clog_to_syslog_facility",
     "clog_to_syslog_level",
-    // StrayManager
+    // PurgeQueue
     "mds_max_purge_ops",
     "mds_max_purge_ops_per_pg",
+    "mds_max_purge_files",
     "clog_to_graylog",
     "clog_to_graylog_host",
     "clog_to_graylog_port",
index 5c4ba15ab3fb855f820d6fadcadd70863ac78ee5..7e57eabc7fa81bc23c2d75a1f561b5c65af96274 100644 (file)
@@ -56,8 +56,6 @@ void PurgeItem::decode(bufferlist::iterator &p)
 
 // TODO: implement purge queue creation on startup
 // if we are on a filesystem created before purge queues existed
-// TODO: ensure that a deactivating MDS rank blocks
-// on complete drain of this queue before finishing
 // TODO: when we're deactivating, lift all limits on
 // how many OSD ops we're allowed to emit at a time to
 // race through the queue as fast as we can.
@@ -79,7 +77,9 @@ PurgeQueue::PurgeQueue(
     objecter(objecter_),
     journaler("pq", MDS_INO_PURGE_QUEUE + rank, metadata_pool,
       CEPH_FS_ONDISK_MAGIC, objecter_, nullptr, 0, &timer,
-      &finisher)
+      &finisher),
+    ops_in_flight(0),
+    max_purge_ops(0)
 {
 }
 
@@ -94,10 +94,9 @@ void PurgeQueue::create_logger()
 {
   PerfCountersBuilder pcb(g_ceph_context,
           "purge_queue", l_pq_first, l_pq_last);
-    //pcb.add_u64(l_mdc_num_strays_purging, "num_strays_purging", "Stray dentries purging");
-   // pcb.add_u64(l_mdc_num_purge_ops, "num_purge_ops", "Purge operations");
+  pcb.add_u64(l_pq_executing_ops, "pq_executing_ops", "Purge queue ops in flight");
   pcb.add_u64(l_pq_executing, "pq_executing", "Purge queue tasks in flight");
-  pcb.add_u64_counter(l_pq_executed, "pq_executed", "Purge queue tasks executed");
+  pcb.add_u64_counter(l_pq_executed, "pq_executed", "Purge queue tasks executed", "purg");
 
   logger.reset(pcb.create_perf_counters());
   g_ceph_context->get_perfcounters_collection()->add(logger.get());
@@ -168,74 +167,65 @@ void PurgeQueue::push(const PurgeItem &pi, Context *completion)
 
   // Note that flush calls are not 1:1 with IOs, Journaler
   // does its own batching.  So we just call every time.
-  // FIXME: *actually* as soon as we call _consume it will
-  // do a flush via _issue_read, so we really are doing one
-  // write per event.  Avoid this by avoiding doing the journaler
-  // read (see "if we could consume this PurgeItem immediately...")
   journaler.flush(completion);
 
   // Maybe go ahead and do something with it right away
   _consume();
-
-  // TODO: if we could consume this PurgeItem immediately, and
-  // Journaler does not have any outstanding prefetches, then
-  // short circuit its read by advancing read_pos to write_pos
-  // and passing the PurgeItem straight into _execute_item
 }
 
-#if 0
-uint32_t StrayManager::_calculate_ops_required(CInode *in, bool trunc)
+uint32_t PurgeQueue::_calculate_ops(const PurgeItem &item) const
 {
   uint32_t ops_required = 0;
-  if (in->is_dir()) {
+  if (item.action == PurgeItem::PURGE_DIR) {
     // Directory, count dirfrags to be deleted
     std::list<frag_t> ls;
-    if (!in->dirfragtree.is_leaf(frag_t())) {
-      in->dirfragtree.get_leaves(ls);
+    if (!item.fragtree.is_leaf(frag_t())) {
+      item.fragtree.get_leaves(ls);
     }
     // One for the root, plus any leaves
     ops_required = 1 + ls.size();
   } else {
     // File, work out concurrent Filer::purge deletes
-    const uint64_t to = MAX(in->inode.max_size_ever,
-            MAX(in->inode.size, in->inode.get_max_size()));
+    const uint64_t num = (item.size > 0) ?
+      Striper::get_num_objects(item.layout, item.size) : 1;
 
-    const uint64_t num = (to > 0) ? Striper::get_num_objects(in->inode.layout, to) : 1;
     ops_required = MIN(num, g_conf->filer_max_purge_ops);
 
     // Account for removing (or zeroing) backtrace
     ops_required += 1;
 
     // Account for deletions for old pools
-    if (!trunc) {
-      ops_required += in->get_projected_inode()->old_pools.size();
+    if (item.action != PurgeItem::TRUNCATE_FILE) {
+      ops_required += item.old_pools.size();
     }
   }
 
   return ops_required;
 }
-#endif
 
 bool PurgeQueue::can_consume()
 {
-#if 0
-  // Calculate how much of the ops allowance is available, allowing
-  // for the case where the limit is currently being exceeded.
-  uint32_t ops_avail;
-  if (ops_in_flight <= max_purge_ops) {
-    ops_avail = max_purge_ops - ops_in_flight;
-  } else {
-    ops_avail = 0;
+  dout(20) << ops_in_flight << "/" << max_purge_ops << " ops, "
+           << in_flight.size() << "/" << g_conf->mds_max_purge_files
+           << " files" << dendl;
+
+  if (in_flight.size() == 0 && cct->_conf->mds_max_purge_files > 0) {
+    // Always permit consumption if nothing is in flight, so that the ops
+    // limit can never be so low as to forbid all progress (unless
+    // administrator has deliberately paused purging by setting max
+    // purge files to zero).
+    return true;
   }
 
-  dout(10) << __func__ << ": allocating allowance "
-    << ops_required << " to " << ops_in_flight << " in flight" << dendl;
-
-  logger->set(l_mdc_num_purge_ops, ops_in_flight);
-#endif
+  if (ops_in_flight >= max_purge_ops) {
+    dout(20) << "Throttling on op limit " << ops_in_flight << "/"
+             << max_purge_ops << dendl;
+    return false;
+  }
 
-  // TODO: enforce limits (currently just allowing one in flight)
-  if (in_flight.size() > cct->_conf->mds_max_purge_files) {
+  if (in_flight.size() >= cct->_conf->mds_max_purge_files) {
+    dout(20) << "Throttling on item limit " << in_flight.size()
+             << "/" << cct->_conf->mds_max_purge_files << dendl;
     return false;
   } else {
     return true;
@@ -246,41 +236,38 @@ void PurgeQueue::_consume()
 {
   assert(lock.is_locked_by_me());
 
-  // Because we are the writer and the reader of the journal
-  // via the same Journaler instance, we never need to reread_head
-  
-  if (!can_consume()) {
-    dout(10) << " cannot consume right now" << dendl;
-
-    return;
-  }
+  while(can_consume()) {
+    if (!journaler.is_readable()) {
+      dout(10) << " not readable right now" << dendl;
+      // Because we are the writer and the reader of the journal
+      // via the same Journaler instance, we never need to reread_head
+      if (!journaler.have_waiter()) {
+        journaler.wait_for_readable(new FunctionContext([this](int r) {
+          Mutex::Locker l(lock);
+          if (r == 0) {
+            _consume();
+          }
+        }));
+      }
 
-  if (!journaler.is_readable()) {
-    dout(10) << " not readable right now" << dendl;
-    if (!journaler.have_waiter()) {
-      journaler.wait_for_readable(new FunctionContext([this](int r) {
-        Mutex::Locker l(lock);
-        if (r == 0) {
-          _consume();
-        }
-      }));
+      return;
     }
 
-    return;
+    // The journaler is readable: consume an entry
+    bufferlist bl;
+    bool readable = journaler.try_read_entry(bl);
+    assert(readable);  // we checked earlier
+
+    dout(20) << " decoding entry" << dendl;
+    PurgeItem item;
+    bufferlist::iterator q = bl.begin();
+    ::decode(item, q);
+    dout(20) << " executing item (0x" << std::hex << item.ino
+             << std::dec << ")" << dendl;
+    _execute_item(item, journaler.get_read_pos());
   }
 
-  // The journaler is readable: consume an entry
-  bufferlist bl;
-  bool readable = journaler.try_read_entry(bl);
-  assert(readable);  // we checked earlier
-
-  dout(20) << " decoding entry" << dendl;
-  PurgeItem item;
-  bufferlist::iterator q = bl.begin();
-  ::decode(item, q);
-  dout(20) << " executing item (0x" << std::hex << item.ino
-           << std::dec << ")" << dendl;
-  _execute_item(item, journaler.get_read_pos());
+  dout(10) << " cannot consume right now" << dendl;
 }
 
 void PurgeQueue::_execute_item(
@@ -291,6 +278,8 @@ void PurgeQueue::_execute_item(
 
   in_flight[expire_to] = item;
   logger->set(l_pq_executing, in_flight.size());
+  ops_in_flight += _calculate_ops(item);
+  logger->set(l_pq_executing_ops, ops_in_flight);
 
   SnapContext nullsnapc;
 
@@ -387,19 +376,12 @@ void PurgeQueue::execute_item_complete(
   dout(10) << "completed item for ino 0x" << std::hex << iter->second.ino
            << std::dec << dendl;
 
+  ops_in_flight -= _calculate_ops(iter->second);
+  logger->set(l_pq_executing_ops, ops_in_flight);
+
   in_flight.erase(iter);
   logger->set(l_pq_executing, in_flight.size());
 
-#if 0
-  // Release resources
-  dout(10) << __func__ << ": decrementing op allowance "
-    << ops_allowance << " from " << ops_in_flight << " in flight" << dendl;
-  assert(ops_in_flight >= ops_allowance);
-  ops_in_flight -= ops_allowance;
-  logger->set(l_mdc_num_purge_ops, ops_in_flight);
-  files_purging -= 1;
-#endif
-
   logger->inc(l_pq_executed);
 
   _consume();
@@ -443,6 +425,25 @@ void PurgeQueue::handle_conf_change(const struct md_config_t *conf,
   if (changed.count("mds_max_purge_ops")
       || changed.count("mds_max_purge_ops_per_pg")) {
     update_op_limit(mds_map);
+  } else if (changed.count("mds_max_purge_files")) {
+    Mutex::Locker l(lock);
+
+    if (in_flight.empty()) {
+      // We might have gone from zero to a finite limit, so
+      // might need to kick off consume.
+      dout(4) << "maybe start work again (max_purge_files="
+              << conf->mds_max_purge_files << dendl;
+      finisher.queue(new FunctionContext([this](int r){
+        Mutex::Locker l(lock);
+        _consume();
+      }));
+    }
   }
 }
 
+bool PurgeQueue::is_idle() const
+{
+  return in_flight.empty() && (
+      journaler.get_read_pos() == journaler.get_write_pos());
+}
+
index c66a3b0a3766b6b93faf347f92b888c2b61f3f04..b235536745ba701660b0dad967257439414ac63e 100644 (file)
@@ -55,14 +55,9 @@ WRITE_CLASS_ENCODER(PurgeItem)
 
 enum {
   l_pq_first = 3500,
-  // >> TODO populate from PurgeQueue
-  // How many stray dentries are currently enqueued for purge
-  //l_mdc_num_strays_purging,
-  // How many purge RADOS ops might currently be in flight?
-  //l_mdc_num_purge_ops,
-  // << TODO
 
   // How many items have been finished by PurgeQueue
+  l_pq_executing_ops,
   l_pq_executing,
   l_pq_executed,
   l_pq_last
@@ -100,11 +95,11 @@ protected:
 
   // Throttled allowances
   uint64_t ops_in_flight;
-  uint64_t files_purging;
 
   // Dynamic op limit per MDS based on PG count
   uint64_t max_purge_ops;
 
+  uint32_t _calculate_ops(const PurgeItem &item) const;
 
   bool can_consume();
 
@@ -132,6 +127,10 @@ public:
   // to the queue (there is no callback for when it is executed)
   void push(const PurgeItem &pi, Context *completion);
 
+  // If the on-disk queue is empty and we are not currently processing
+  // anything.
+  bool is_idle() const;
+
   void update_op_limit(const MDSMap &mds_map);
 
   void handle_conf_change(const struct md_config_t *conf,