]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: make purge_queue delete objects asynchronously and keep accepting pushes 37095/head
authorYanhu Cao <gmayyyha@gmail.com>
Fri, 11 Sep 2020 01:43:49 +0000 (09:43 +0800)
committerroot <root@cld-unknown397880.i.nease.net>
Wed, 23 Dec 2020 02:57:16 +0000 (10:57 +0800)
Signed-off-by: Yanhu Cao <gmayyyha@gmail.com>
src/mds/PurgeQueue.cc
src/mds/PurgeQueue.h

index 71cc929aecc31560ea4d0075c21e0756f60c1016..8dda909f73dfe20c3f98cac2f9baafba054277ac 100644 (file)
@@ -98,8 +98,8 @@ void PurgeItem::decode(bufferlist::const_iterator &p)
   DECODE_FINISH(p);
 }
 
-// TODO: if Objecter has any slow requests, take that as a hint and
-// slow down our rate of purging (keep accepting pushes though)
+// if Objecter has any slow requests, take that as a hint and
+// slow down our rate of purging
 PurgeQueue::PurgeQueue(
       CephContext *cct_,
       mds_rank_t rank_,
@@ -495,6 +495,99 @@ bool PurgeQueue::_consume()
   return could_consume;
 }
 
+class C_IO_PurgeItem_Commit : public Context {
+public:
+  C_IO_PurgeItem_Commit(PurgeQueue *pq, std::vector<PurgeItemCommitOp> ops, uint64_t expire_to)
+    : purge_queue(pq), ops_vec(std::move(ops)), expire_to(expire_to) {
+  }
+
+  void finish(int r) override {
+    purge_queue->_commit_ops(r, ops_vec, expire_to);
+  }
+
+private:
+  PurgeQueue *purge_queue;
+  std::vector<PurgeItemCommitOp> ops_vec;
+  uint64_t expire_to;
+};
+
+void PurgeQueue::_commit_ops(int r, const std::vector<PurgeItemCommitOp>& ops_vec, uint64_t expire_to)
+{
+  if (r < 0) {
+    derr << " r = " << r << dendl;
+    return;
+  }
+
+  SnapContext nullsnapc;
+  C_GatherBuilder gather(cct);
+
+  for (auto &op : ops_vec) {
+    dout(10) << op.item.get_type_str() << dendl;
+    if (op.type == PurgeItemCommitOp::PURGE_OP_RANGE) {
+      uint64_t first_obj = 0, num_obj = 0;
+      uint64_t num = Striper::get_num_objects(op.item.layout, op.item.size);
+      num_obj = num;
+
+      if (op.item.action == PurgeItem::TRUNCATE_FILE) {
+        first_obj = 1;
+        if (num > 1)
+          num_obj = num - 1;
+        else
+          continue;
+      }
+
+      filer.purge_range(op.item.ino, &op.item.layout, op.item.snapc,
+                        first_obj, num_obj, ceph::real_clock::now(), op.flags,
+                        gather.new_sub());
+    } else if (op.type == PurgeItemCommitOp::PURGE_OP_REMOVE) {
+      if (op.item.action == PurgeItem::PURGE_DIR) {
+        objecter->remove(op.oid, op.oloc, nullsnapc,
+                         ceph::real_clock::now(), op.flags,
+                         gather.new_sub());
+      } else {
+        objecter->remove(op.oid, op.oloc, op.item.snapc,
+                         ceph::real_clock::now(), op.flags,
+                         gather.new_sub());
+      }
+    } else if (op.type == PurgeItemCommitOp::PURGE_OP_ZERO) {
+      filer.zero(op.item.ino, &op.item.layout, op.item.snapc,
+                 0, op.item.layout.object_size, ceph::real_clock::now(), 0, true,
+                 gather.new_sub());
+    } else {
+      derr << "Invalid purge op: " << op.type << dendl;
+      ceph_abort();
+    }
+  }
+
+  ceph_assert(gather.has_subs());
+
+  gather.set_finisher(new C_OnFinisher(
+                     new LambdaContext([this, expire_to](int r) {
+    std::lock_guard l(lock);
+
+    if (r == -EBLOCKLISTED) {
+      finisher.queue(on_error, r);
+      on_error = nullptr;
+      return;
+    }
+
+    _execute_item_complete(expire_to);
+    _consume();
+
+    // Have we gone idle?  If so, do an extra write_head now instead of
+    // waiting for next flush after journaler_write_head_interval.
+    // Also do this periodically even if not idle, so that the persisted
+    // expire_pos doesn't fall too far behind our progress when consuming
+    // a very long queue.
+    if (!readonly &&
+        (in_flight.empty() || journaler.write_head_needed())) {
+      journaler.write_head(nullptr);
+    }
+  }), &finisher));
+
+  gather.activate();
+}
+
 void PurgeQueue::_execute_item(
     const PurgeItem &item,
     uint64_t expire_to)
@@ -512,28 +605,26 @@ void PurgeQueue::_execute_item(
   ops_high_water = std::max(ops_high_water, ops_in_flight);
   logger->set(l_pq_executing_ops_high_water, ops_high_water);
 
-  SnapContext nullsnapc;
+  std::vector<PurgeItemCommitOp> ops_vec;
+  auto submit_ops = [&]() {
+    finisher.queue(new C_IO_PurgeItem_Commit(this, std::move(ops_vec), expire_to));
+  };
 
-  C_GatherBuilder gather(cct);
   if (item.action == PurgeItem::PURGE_FILE) {
     if (item.size > 0) {
       uint64_t num = Striper::get_num_objects(item.layout, item.size);
       dout(10) << " 0~" << item.size << " objects 0~" << num
                << " snapc " << item.snapc << " on " << item.ino << dendl;
-      filer.purge_range(item.ino, &item.layout, item.snapc,
-                        0, num, ceph::real_clock::now(), 0,
-                        gather.new_sub());
+      ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_RANGE, 0);
     }
 
     // remove the backtrace object if it was not purged
     object_t oid = CInode::get_object_name(item.ino, frag_t(), "");
-    if (!gather.has_subs() || !item.layout.pool_ns.empty()) {
+    if (ops_vec.empty() || !item.layout.pool_ns.empty()) {
       object_locator_t oloc(item.layout.pool_id);
       dout(10) << " remove backtrace object " << oid
                << " pool " << oloc.pool << " snapc " << item.snapc << dendl;
-      objecter->remove(oid, oloc, item.snapc,
-                            ceph::real_clock::now(), 0,
-                            gather.new_sub());
+      ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_REMOVE, 0, oid, oloc);
     }
 
     // remove old backtrace objects
@@ -541,9 +632,7 @@ void PurgeQueue::_execute_item(
       object_locator_t oloc(p);
       dout(10) << " remove backtrace object " << oid
                << " old pool " << p << " snapc " << item.snapc << dendl;
-      objecter->remove(oid, oloc, item.snapc,
-                            ceph::real_clock::now(), 0,
-                            gather.new_sub());
+      ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_REMOVE, 0, oid, oloc);
     }
   } else if (item.action == PurgeItem::PURGE_DIR) {
     object_locator_t oloc(metadata_pool);
@@ -554,9 +643,7 @@ void PurgeQueue::_execute_item(
     for (const auto &leaf : leaves) {
       object_t oid = CInode::get_object_name(item.ino, leaf, "");
       dout(10) << " remove dirfrag " << oid << dendl;
-      objecter->remove(oid, oloc, nullsnapc,
-                       ceph::real_clock::now(),
-                       0, gather.new_sub());
+      ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_REMOVE, 0, oid, oloc);
     }
   } else if (item.action == PurgeItem::TRUNCATE_FILE) {
     const uint64_t num = Striper::get_num_objects(item.layout, item.size);
@@ -565,14 +652,9 @@ void PurgeQueue::_execute_item(
 
     // keep backtrace object
     if (num > 1) {
-      filer.purge_range(item.ino, &item.layout, item.snapc,
-                       1, num - 1, ceph::real_clock::now(),
-                       0, gather.new_sub());
+      ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_RANGE, 0);
     }
-    filer.zero(item.ino, &item.layout, item.snapc,
-              0, item.layout.object_size,
-              ceph::real_clock::now(),
-              0, true, gather.new_sub());
+    ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_ZERO, 0);
   } else {
     derr << "Invalid item (action=" << item.action << ") in purge queue, "
             "dropping it" << dendl;
@@ -587,33 +669,8 @@ void PurgeQueue::_execute_item(
     logger->set(l_pq_executing_high_water, files_high_water);
     return;
   }
-  ceph_assert(gather.has_subs());
-
-  gather.set_finisher(new C_OnFinisher(
-                      new LambdaContext([this, expire_to](int r){
-    std::lock_guard l(lock);
-
-    if (r == -EBLOCKLISTED) {
-      finisher.queue(on_error, r);
-      on_error = nullptr;
-      return;
-    }
 
-    _execute_item_complete(expire_to);
-    _consume();
-
-    // Have we gone idle?  If so, do an extra write_head now instead of
-    // waiting for next flush after journaler_write_head_interval.
-    // Also do this periodically even if not idle, so that the persisted
-    // expire_pos doesn't fall too far behind our progress when consuming
-    // a very long queue.
-    if (!readonly &&
-       (in_flight.empty() || journaler.write_head_needed())) {
-      journaler.write_head(nullptr);
-    }
-  }), &finisher));
-
-  gather.activate();
+  submit_ops();
 }
 
 void PurgeQueue::_execute_item_complete(
index 2bbcfeb49598689b09c71e23ba71cf2ed9061c0e..270c99ac5166cfbaad486c257a53b7d7b7951b94 100644 (file)
@@ -94,6 +94,28 @@ enum {
   l_pq_last
 };
 
+struct PurgeItemCommitOp {
+public:
+  enum PurgeType : uint8_t {
+    PURGE_OP_RANGE = 0,
+    PURGE_OP_REMOVE = 1,
+    PURGE_OP_ZERO
+  };
+
+  PurgeItemCommitOp(PurgeItem _item, PurgeType _type, int _flags)
+    : item(_item), type(_type), flags(_flags) {}
+
+  PurgeItemCommitOp(PurgeItem _item, PurgeType _type, int _flags,
+                    object_t _oid, object_locator_t _oloc)
+    : item(_item), type(_type), flags(_flags), oid(_oid), oloc(_oloc) {}
+
+  PurgeItem item;
+  PurgeType type;
+  int flags;
+  object_t oid;
+  object_locator_t oloc;
+};
+
 /**
  * A persistent queue of PurgeItems.  This class both writes and reads
  * to the queue.  There is one of these per MDS rank.
@@ -131,6 +153,8 @@ public:
   // to the queue (there is no callback for when it is executed)
   void push(const PurgeItem &pi, Context *completion);
 
+  void _commit_ops(int r, const std::vector<PurgeItemCommitOp>& ops_vec, uint64_t expire_to);
+
   // If the on-disk queue is empty and we are not currently processing
   // anything.
   bool is_idle() const;