From: Yanhu Cao
Date: Fri, 11 Sep 2020 01:43:49 +0000 (+0800)
Subject: mds: make purge_queue delete objects asynchronously and keep accepting pushes
X-Git-Tag: v16.1.0~50^2
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=50aafa453709eb95df77ddac28e07083c5f71884;p=ceph.git

mds: make purge_queue delete objects asynchronously and keep accepting pushes

Signed-off-by: Yanhu Cao
---

diff --git a/src/mds/PurgeQueue.cc b/src/mds/PurgeQueue.cc
index 71cc929aecc3..8dda909f73df 100644
--- a/src/mds/PurgeQueue.cc
+++ b/src/mds/PurgeQueue.cc
@@ -98,8 +98,8 @@ void PurgeItem::decode(bufferlist::const_iterator &p)
   DECODE_FINISH(p);
 }
 
-// TODO: if Objecter has any slow requests, take that as a hint and
-// slow down our rate of purging (keep accepting pushes though)
+// if Objecter has any slow requests, take that as a hint and
+// slow down our rate of purging
 PurgeQueue::PurgeQueue(
       CephContext *cct_,
       mds_rank_t rank_,
@@ -495,6 +495,99 @@ bool PurgeQueue::_consume()
   return could_consume;
 }
 
+class C_IO_PurgeItem_Commit : public Context {
+public:
+  C_IO_PurgeItem_Commit(PurgeQueue *pq, std::vector<PurgeItemCommitOp> ops, uint64_t expire_to)
+    : purge_queue(pq), ops_vec(std::move(ops)), expire_to(expire_to) {
+  }
+
+  void finish(int r) override {
+    purge_queue->_commit_ops(r, ops_vec, expire_to);
+  }
+
+private:
+  PurgeQueue *purge_queue;
+  std::vector<PurgeItemCommitOp> ops_vec;
+  uint64_t expire_to;
+};
+
+void PurgeQueue::_commit_ops(int r, const std::vector<PurgeItemCommitOp>& ops_vec, uint64_t expire_to)
+{
+  if (r < 0) {
+    derr << " r = " << r << dendl;
+    return;
+  }
+
+  SnapContext nullsnapc;
+  C_GatherBuilder gather(cct);
+
+  for (auto &op : ops_vec) {
+    dout(10) << op.item.get_type_str() << dendl;
+    if (op.type == PurgeItemCommitOp::PURGE_OP_RANGE) {
+      uint64_t first_obj = 0, num_obj = 0;
+      uint64_t num = Striper::get_num_objects(op.item.layout, op.item.size);
+      num_obj = num;
+
+      if (op.item.action == PurgeItem::TRUNCATE_FILE) {
+        first_obj = 1;
+        if (num > 1)
+          num_obj = num - 1;
+        else
+          continue;
+      }
+
+      filer.purge_range(op.item.ino, &op.item.layout, op.item.snapc,
+                        first_obj, num_obj, ceph::real_clock::now(), op.flags,
+                        gather.new_sub());
+    } else if (op.type == PurgeItemCommitOp::PURGE_OP_REMOVE) {
+      if (op.item.action == PurgeItem::PURGE_DIR) {
+        objecter->remove(op.oid, op.oloc, nullsnapc,
+                         ceph::real_clock::now(), op.flags,
+                         gather.new_sub());
+      } else {
+        objecter->remove(op.oid, op.oloc, op.item.snapc,
+                         ceph::real_clock::now(), op.flags,
+                         gather.new_sub());
+      }
+    } else if (op.type == PurgeItemCommitOp::PURGE_OP_ZERO) {
+      filer.zero(op.item.ino, &op.item.layout, op.item.snapc,
+                 0, op.item.layout.object_size, ceph::real_clock::now(), 0, true,
+                 gather.new_sub());
+    } else {
+      derr << "Invalid purge op: " << op.type << dendl;
+      ceph_abort();
+    }
+  }
+
+  ceph_assert(gather.has_subs());
+
+  gather.set_finisher(new C_OnFinisher(
+    new LambdaContext([this, expire_to](int r) {
+      std::lock_guard l(lock);
+
+      if (r == -EBLOCKLISTED) {
+        finisher.queue(on_error, r);
+        on_error = nullptr;
+        return;
+      }
+
+      _execute_item_complete(expire_to);
+      _consume();
+
+      // Have we gone idle? If so, do an extra write_head now instead of
+      // waiting for next flush after journaler_write_head_interval.
+      // Also do this periodically even if not idle, so that the persisted
+      // expire_pos doesn't fall too far behind our progress when consuming
+      // a very long queue.
+      if (!readonly &&
+          (in_flight.empty() || journaler.write_head_needed())) {
+        journaler.write_head(nullptr);
+      }
+    }), &finisher));
+
+  gather.activate();
+}
+
 void PurgeQueue::_execute_item(
     const PurgeItem &item,
     uint64_t expire_to)
@@ -512,28 +605,26 @@ void PurgeQueue::_execute_item(
   ops_high_water = std::max(ops_high_water, ops_in_flight);
   logger->set(l_pq_executing_ops_high_water, ops_high_water);
 
-  SnapContext nullsnapc;
+  std::vector<PurgeItemCommitOp> ops_vec;
+  auto submit_ops = [&]() {
+    finisher.queue(new C_IO_PurgeItem_Commit(this, std::move(ops_vec), expire_to));
+  };
 
-  C_GatherBuilder gather(cct);
   if (item.action == PurgeItem::PURGE_FILE) {
     if (item.size > 0) {
       uint64_t num = Striper::get_num_objects(item.layout, item.size);
       dout(10) << " 0~" << item.size << " objects 0~" << num
                << " snapc " << item.snapc << " on " << item.ino << dendl;
-      filer.purge_range(item.ino, &item.layout, item.snapc,
-                        0, num, ceph::real_clock::now(), 0,
-                        gather.new_sub());
+      ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_RANGE, 0);
     }
 
     // remove the backtrace object if it was not purged
     object_t oid = CInode::get_object_name(item.ino, frag_t(), "");
-    if (!gather.has_subs() || !item.layout.pool_ns.empty()) {
+    if (ops_vec.empty() || !item.layout.pool_ns.empty()) {
       object_locator_t oloc(item.layout.pool_id);
       dout(10) << " remove backtrace object " << oid
                << " pool " << oloc.pool << " snapc " << item.snapc << dendl;
-      objecter->remove(oid, oloc, item.snapc,
-                       ceph::real_clock::now(), 0,
-                       gather.new_sub());
+      ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_REMOVE, 0, oid, oloc);
     }
 
     // remove old backtrace objects
@@ -541,9 +632,7 @@ void PurgeQueue::_execute_item(
       object_locator_t oloc(p);
       dout(10) << " remove backtrace object " << oid
                << " old pool " << p << " snapc " << item.snapc << dendl;
-      objecter->remove(oid, oloc, item.snapc,
-                       ceph::real_clock::now(), 0,
-                       gather.new_sub());
+      ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_REMOVE, 0, oid, oloc);
     }
   } else if (item.action == PurgeItem::PURGE_DIR) {
     object_locator_t oloc(metadata_pool);
@@ -554,9 +643,7 @@ void PurgeQueue::_execute_item(
     for (const auto &leaf : leaves) {
       object_t oid = CInode::get_object_name(item.ino, leaf, "");
       dout(10) << " remove dirfrag " << oid << dendl;
-      objecter->remove(oid, oloc, nullsnapc,
-                       ceph::real_clock::now(),
-                       0, gather.new_sub());
+      ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_REMOVE, 0, oid, oloc);
     }
   } else if (item.action == PurgeItem::TRUNCATE_FILE) {
     const uint64_t num = Striper::get_num_objects(item.layout, item.size);
@@ -565,14 +652,9 @@ void PurgeQueue::_execute_item(
 
     // keep backtrace object
     if (num > 1) {
-      filer.purge_range(item.ino, &item.layout, item.snapc,
-                        1, num - 1, ceph::real_clock::now(),
-                        0, gather.new_sub());
+      ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_RANGE, 0);
     }
-    filer.zero(item.ino, &item.layout, item.snapc,
-               0, item.layout.object_size,
-               ceph::real_clock::now(),
-               0, true, gather.new_sub());
+    ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_ZERO, 0);
   } else {
     derr << "Invalid item (action=" << item.action << ") in purge queue, "
             "dropping it" << dendl;
@@ -587,33 +669,8 @@ void PurgeQueue::_execute_item(
     logger->set(l_pq_executing_high_water, files_high_water);
     return;
   }
-  ceph_assert(gather.has_subs());
-
-  gather.set_finisher(new C_OnFinisher(
-    new LambdaContext([this, expire_to](int r){
-      std::lock_guard l(lock);
-
-      if (r == -EBLOCKLISTED) {
-        finisher.queue(on_error, r);
-        on_error = nullptr;
-        return;
-      }
-
-      _execute_item_complete(expire_to);
-      _consume();
-
-      // Have we gone idle? If so, do an extra write_head now instead of
-      // waiting for next flush after journaler_write_head_interval.
-      // Also do this periodically even if not idle, so that the persisted
-      // expire_pos doesn't fall too far behind our progress when consuming
-      // a very long queue.
-      if (!readonly &&
-          (in_flight.empty() || journaler.write_head_needed())) {
-        journaler.write_head(nullptr);
-      }
-    }), &finisher));
-
-  gather.activate();
+  submit_ops();
 }
 
 void PurgeQueue::_execute_item_complete(
diff --git a/src/mds/PurgeQueue.h b/src/mds/PurgeQueue.h
index 2bbcfeb49598..270c99ac5166 100644
--- a/src/mds/PurgeQueue.h
+++ b/src/mds/PurgeQueue.h
@@ -94,6 +94,28 @@ enum {
   l_pq_last
 };
 
+struct PurgeItemCommitOp {
+public:
+  enum PurgeType : uint8_t {
+    PURGE_OP_RANGE = 0,
+    PURGE_OP_REMOVE = 1,
+    PURGE_OP_ZERO
+  };
+
+  PurgeItemCommitOp(PurgeItem _item, PurgeType _type, int _flags)
+    : item(_item), type(_type), flags(_flags) {}
+
+  PurgeItemCommitOp(PurgeItem _item, PurgeType _type, int _flags,
+                    object_t _oid, object_locator_t _oloc)
+    : item(_item), type(_type), flags(_flags), oid(_oid), oloc(_oloc) {}
+
+  PurgeItem item;
+  PurgeType type;
+  int flags;
+  object_t oid;
+  object_locator_t oloc;
+};
+
 /**
  * A persistent queue of PurgeItems. This class both writes and reads
  * to the queue. There is one of these per MDS rank.
@@ -131,6 +153,8 @@ public:
   // to the queue (there is no callback for when it is executed)
   void push(const PurgeItem &pi, Context *completion);
 
+  void _commit_ops(int r, const std::vector<PurgeItemCommitOp>& ops_vec, uint64_t expire_to);
+
   // If the on-disk queue is empty and we are not currently processing
   // anything.
   bool is_idle() const;