pcb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
pcb.add_u64(l_pq_executing_ops, "pq_executing_ops", "Purge queue ops in flight");
pcb.add_u64(l_pq_executing, "pq_executing", "Purge queue tasks in flight");
+ pcb.add_u64(l_pq_item_in_journal, "pq_item_in_journal", "Purge item left in journal");
logger.reset(pcb.create_perf_counters());
g_ceph_context->get_perfcounters_collection()->add(logger.get());
{
std::lock_guard l(lock);
+ {
+ PurgeItem item;
+ bufferlist bl;
+
+ // calculate purge item serialized size stored in journal
+ // used to count how many items still left in journal later
+ ::encode(item, bl);
+ purge_item_journal_size = bl.length() + journaler.get_journal_envelope_size();
+ }
+
if (readonly) {
dout(10) << "skipping activate: PurgeQueue is readonly" << dendl;
return;
logger->set(l_pq_executing, in_flight.size());
dout(10) << "in_flight.size() now " << in_flight.size() << dendl;
+ uint64_t write_pos = journaler.get_write_pos();
+ uint64_t read_pos = journaler.get_read_pos();
+ uint64_t expire_pos = journaler.get_expire_pos();
+ uint64_t item_num = (write_pos - (in_flight.size() ? expire_pos : read_pos))
+ / purge_item_journal_size;
+ dout(10) << "left purge items in journal: " << item_num
+ << " (purge_item_journal_size/write_pos/read_pos/expire_pos) now at "
+ << "(" << purge_item_journal_size << "/" << write_pos << "/" << read_pos
+ << "/" << expire_pos << ")" << dendl;
+
+ logger->set(l_pq_item_in_journal, item_num);
logger->inc(l_pq_executed);
}
uint64_t get_read_pos() const { return read_pos; }
uint64_t get_expire_pos() const { return expire_pos; }
uint64_t get_trimmed_pos() const { return trimmed_pos; }
+ size_t get_journal_envelope_size() const {
+ return journal_stream.get_envelope_size();
+ }
};
WRITE_CLASS_ENCODER(Journaler::Header)