DECODE_FINISH(p);
}
-// TODO: if Objecter has any slow requests, take that as a hint and
-// slow down our rate of purging (keep accepting pushes though)
+// if Objecter has any slow requests, take that as a hint and
+// slow down our rate of purging
PurgeQueue::PurgeQueue(
CephContext *cct_,
mds_rank_t rank_,
return could_consume;
}
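+// Completion context queued on the PurgeQueue finisher by _execute_item();
+// it hands the batched purge ops back to PurgeQueue::_commit_ops().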
+class C_IO_PurgeItem_Commit : public Context {
+public:
+ C_IO_PurgeItem_Commit(PurgeQueue *pq, std::vector<PurgeItemCommitOp> ops, uint64_t expire_to)
+ : purge_queue(pq), ops_vec(std::move(ops)), expire_to(expire_to) {
+ }
+
+ void finish(int r) override {
+ purge_queue->_commit_ops(r, ops_vec, expire_to);
+ }
+
+private:
+ PurgeQueue *purge_queue;
+ std::vector<PurgeItemCommitOp> ops_vec;
+ uint64_t expire_to;
+};
+
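+// Issue the RADOS operations collected by _execute_item() and gather their
+// completions; this runs on the finisher thread via C_IO_PurgeItem_Commit.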
+void PurgeQueue::_commit_ops(int r, const std::vector<PurgeItemCommitOp>& ops_vec, uint64_t expire_to)
+{
+ if (r < 0) {
+    derr << __func__ << ": failed to commit purge ops, r = " << r << dendl;
+ return;
+ }
+
+ SnapContext nullsnapc;
+ C_GatherBuilder gather(cct);
+
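+  // translate each batched op into the corresponding Filer/Objecter request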
+ for (auto &op : ops_vec) {
+    dout(10) << op.item.get_type_str() << " on " << op.item.ino << dendl;
+ if (op.type == PurgeItemCommitOp::PURGE_OP_RANGE) {
+      uint64_t num = Striper::get_num_objects(op.item.layout, op.item.size);
+      uint64_t first_obj = 0, num_obj = num;
+
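+      // TRUNCATE_FILE keeps the first object (it stores the backtrace), so purge from object 1 onwards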
+ if (op.item.action == PurgeItem::TRUNCATE_FILE) {
+ first_obj = 1;
+ if (num > 1)
+ num_obj = num - 1;
+ else
+ continue;
+ }
+
+ filer.purge_range(op.item.ino, &op.item.layout, op.item.snapc,
+ first_obj, num_obj, ceph::real_clock::now(), op.flags,
+ gather.new_sub());
+ } else if (op.type == PurgeItemCommitOp::PURGE_OP_REMOVE) {
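+      // dirfrags are removed with an empty snap context; file objects use the item's snapc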
+ if (op.item.action == PurgeItem::PURGE_DIR) {
+ objecter->remove(op.oid, op.oloc, nullsnapc,
+ ceph::real_clock::now(), op.flags,
+ gather.new_sub());
+ } else {
+ objecter->remove(op.oid, op.oloc, op.item.snapc,
+ ceph::real_clock::now(), op.flags,
+ gather.new_sub());
+ }
+ } else if (op.type == PurgeItemCommitOp::PURGE_OP_ZERO) {
+ filer.zero(op.item.ino, &op.item.layout, op.item.snapc,
+ 0, op.item.layout.object_size, ceph::real_clock::now(), 0, true,
+ gather.new_sub());
+ } else {
+ derr << "Invalid purge op: " << op.type << dendl;
+ ceph_abort();
+ }
+ }
+
+ ceph_assert(gather.has_subs());
+
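+  // once all RADOS ops complete, retire the item and continue consuming the queue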
+ gather.set_finisher(new C_OnFinisher(
+ new LambdaContext([this, expire_to](int r) {
+ std::lock_guard l(lock);
+
+ if (r == -EBLOCKLISTED) {
+ finisher.queue(on_error, r);
+ on_error = nullptr;
+ return;
+ }
+
+ _execute_item_complete(expire_to);
+ _consume();
+
+ // Have we gone idle? If so, do an extra write_head now instead of
+ // waiting for next flush after journaler_write_head_interval.
+ // Also do this periodically even if not idle, so that the persisted
+ // expire_pos doesn't fall too far behind our progress when consuming
+ // a very long queue.
+ if (!readonly &&
+ (in_flight.empty() || journaler.write_head_needed())) {
+ journaler.write_head(nullptr);
+ }
+ }), &finisher));
+
+ gather.activate();
+}
+
void PurgeQueue::_execute_item(
const PurgeItem &item,
uint64_t expire_to)
ops_high_water = std::max(ops_high_water, ops_in_flight);
logger->set(l_pq_executing_ops_high_water, ops_high_water);
- SnapContext nullsnapc;
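+  // collect the RADOS ops for this item; submit_ops() hands them to the
+  // finisher, where C_IO_PurgeItem_Commit passes them on to _commit_ops()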
+ std::vector<PurgeItemCommitOp> ops_vec;
+ auto submit_ops = [&]() {
+ finisher.queue(new C_IO_PurgeItem_Commit(this, std::move(ops_vec), expire_to));
+ };
- C_GatherBuilder gather(cct);
if (item.action == PurgeItem::PURGE_FILE) {
if (item.size > 0) {
uint64_t num = Striper::get_num_objects(item.layout, item.size);
dout(10) << " 0~" << item.size << " objects 0~" << num
<< " snapc " << item.snapc << " on " << item.ino << dendl;
- filer.purge_range(item.ino, &item.layout, item.snapc,
- 0, num, ceph::real_clock::now(), 0,
- gather.new_sub());
+ ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_RANGE, 0);
}
// remove the backtrace object if it was not purged
object_t oid = CInode::get_object_name(item.ino, frag_t(), "");
- if (!gather.has_subs() || !item.layout.pool_ns.empty()) {
+ if (ops_vec.empty() || !item.layout.pool_ns.empty()) {
object_locator_t oloc(item.layout.pool_id);
dout(10) << " remove backtrace object " << oid
<< " pool " << oloc.pool << " snapc " << item.snapc << dendl;
- objecter->remove(oid, oloc, item.snapc,
- ceph::real_clock::now(), 0,
- gather.new_sub());
+ ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_REMOVE, 0, oid, oloc);
}
// remove old backtrace objects
object_locator_t oloc(p);
dout(10) << " remove backtrace object " << oid
<< " old pool " << p << " snapc " << item.snapc << dendl;
- objecter->remove(oid, oloc, item.snapc,
- ceph::real_clock::now(), 0,
- gather.new_sub());
+ ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_REMOVE, 0, oid, oloc);
}
} else if (item.action == PurgeItem::PURGE_DIR) {
object_locator_t oloc(metadata_pool);
for (const auto &leaf : leaves) {
object_t oid = CInode::get_object_name(item.ino, leaf, "");
dout(10) << " remove dirfrag " << oid << dendl;
- objecter->remove(oid, oloc, nullsnapc,
- ceph::real_clock::now(),
- 0, gather.new_sub());
+ ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_REMOVE, 0, oid, oloc);
}
} else if (item.action == PurgeItem::TRUNCATE_FILE) {
const uint64_t num = Striper::get_num_objects(item.layout, item.size);
// keep backtrace object
if (num > 1) {
- filer.purge_range(item.ino, &item.layout, item.snapc,
- 1, num - 1, ceph::real_clock::now(),
- 0, gather.new_sub());
+ ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_RANGE, 0);
}
- filer.zero(item.ino, &item.layout, item.snapc,
- 0, item.layout.object_size,
- ceph::real_clock::now(),
- 0, true, gather.new_sub());
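+    // the first object is kept for the backtrace, so zero its contents instead of removing it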
+ ops_vec.emplace_back(item, PurgeItemCommitOp::PURGE_OP_ZERO, 0);
} else {
derr << "Invalid item (action=" << item.action << ") in purge queue, "
"dropping it" << dendl;
logger->set(l_pq_executing_high_water, files_high_water);
return;
}
- ceph_assert(gather.has_subs());
-
- gather.set_finisher(new C_OnFinisher(
- new LambdaContext([this, expire_to](int r){
- std::lock_guard l(lock);
-
- if (r == -EBLOCKLISTED) {
- finisher.queue(on_error, r);
- on_error = nullptr;
- return;
- }
- _execute_item_complete(expire_to);
- _consume();
-
- // Have we gone idle? If so, do an extra write_head now instead of
- // waiting for next flush after journaler_write_head_interval.
- // Also do this periodically even if not idle, so that the persisted
- // expire_pos doesn't fall too far behind our progress when consuming
- // a very long queue.
- if (!readonly &&
- (in_flight.empty() || journaler.write_head_needed())) {
- journaler.write_head(nullptr);
- }
- }), &finisher));
-
- gather.activate();
+ submit_ops();
}
void PurgeQueue::_execute_item_complete(