// TODO: implement purge queue creation on startup
// if we are on a filesystem created before purge queues existed
-// TODO: ensure that a deactivating MDS rank blocks
-// on complete drain of this queue before finishing
// TODO: when we're deactivating, lift all limits on
// how many OSD ops we're allowed to emit at a time to
// race through the queue as fast as we can.
objecter(objecter_),
journaler("pq", MDS_INO_PURGE_QUEUE + rank, metadata_pool,
CEPH_FS_ONDISK_MAGIC, objecter_, nullptr, 0, &timer,
- &finisher)
+ &finisher),
+ ops_in_flight(0),
+ max_purge_ops(0)
{
}
{
PerfCountersBuilder pcb(g_ceph_context,
"purge_queue", l_pq_first, l_pq_last);
- //pcb.add_u64(l_mdc_num_strays_purging, "num_strays_purging", "Stray dentries purging");
- // pcb.add_u64(l_mdc_num_purge_ops, "num_purge_ops", "Purge operations");
+ pcb.add_u64(l_pq_executing_ops, "pq_executing_ops", "Purge queue ops in flight");
pcb.add_u64(l_pq_executing, "pq_executing", "Purge queue tasks in flight");
- pcb.add_u64_counter(l_pq_executed, "pq_executed", "Purge queue tasks executed");
+ pcb.add_u64_counter(l_pq_executed, "pq_executed", "Purge queue tasks executed", "purg");
logger.reset(pcb.create_perf_counters());
g_ceph_context->get_perfcounters_collection()->add(logger.get());
// Note that flush calls are not 1:1 with IOs, Journaler
// does its own batching. So we just call every time.
- // FIXME: *actually* as soon as we call _consume it will
- // do a flush via _issue_read, so we really are doing one
- // write per event. Avoid this by avoiding doing the journaler
- // read (see "if we could consume this PurgeItem immediately...")
journaler.flush(completion);
// Maybe go ahead and do something with it right away
_consume();
-
- // TODO: if we could consume this PurgeItem immediately, and
- // Journaler does not have any outstanding prefetches, then
- // short circuit its read by advancing read_pos to write_pos
- // and passing the PurgeItem straight into _execute_item
}
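+// Estimate how many RADOS operations purging this item will require,
+// so that can_consume() can throttle us against max_purge_ops.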
-#if 0
-uint32_t StrayManager::_calculate_ops_required(CInode *in, bool trunc)
+uint32_t PurgeQueue::_calculate_ops(const PurgeItem &item) const
{
uint32_t ops_required = 0;
- if (in->is_dir()) {
+ if (item.action == PurgeItem::PURGE_DIR) {
// Directory, count dirfrags to be deleted
std::list<frag_t> ls;
- if (!in->dirfragtree.is_leaf(frag_t())) {
- in->dirfragtree.get_leaves(ls);
+ if (!item.fragtree.is_leaf(frag_t())) {
+ item.fragtree.get_leaves(ls);
}
// One for the root, plus any leaves
ops_required = 1 + ls.size();
} else {
// File, work out concurrent Filer::purge deletes
- const uint64_t to = MAX(in->inode.max_size_ever,
- MAX(in->inode.size, in->inode.get_max_size()));
- const uint64_t num = (to > 0) ? Striper::get_num_objects(in->inode.layout, to) : 1;
+ const uint64_t num = (item.size > 0) ?
+ Striper::get_num_objects(item.layout, item.size) : 1;
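+ // The Filer keeps at most filer_max_purge_ops deletes in flight per
+ // purge, so don't budget more than that for a single file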
ops_required = MIN(num, g_conf->filer_max_purge_ops);
// Account for removing (or zeroing) backtrace
ops_required += 1;
// Account for deletions for old pools
- if (!trunc) {
- ops_required += in->get_projected_inode()->old_pools.size();
+ if (item.action != PurgeItem::TRUNCATE_FILE) {
+ ops_required += item.old_pools.size();
}
}
return ops_required;
}
-#endif
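+// Whether we may consume and execute another item right now, respecting
+// both the op throttle (max_purge_ops) and the file count throttle
+// (mds_max_purge_files).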
bool PurgeQueue::can_consume()
{
-#if 0
- // Calculate how much of the ops allowance is available, allowing
- // for the case where the limit is currently being exceeded.
- uint32_t ops_avail;
- if (ops_in_flight <= max_purge_ops) {
- ops_avail = max_purge_ops - ops_in_flight;
- } else {
- ops_avail = 0;
+ dout(20) << ops_in_flight << "/" << max_purge_ops << " ops, "
+ << in_flight.size() << "/" << g_conf->mds_max_purge_files
+ << " files" << dendl;
+
+ if (in_flight.size() == 0 && cct->_conf->mds_max_purge_files > 0) {
+ // Always permit consumption if nothing is in flight, so that the ops
+ // limit can never be so low as to forbid all progress (unless the
+ // administrator has deliberately paused purging by setting
+ // mds_max_purge_files to zero).
+ return true;
}
- dout(10) << __func__ << ": allocating allowance "
- << ops_required << " to " << ops_in_flight << " in flight" << dendl;
-
- logger->set(l_mdc_num_purge_ops, ops_in_flight);
-#endif
+ if (ops_in_flight >= max_purge_ops) {
+ dout(20) << "Throttling on op limit " << ops_in_flight << "/"
+ << max_purge_ops << dendl;
+ return false;
+ }
- // TODO: enforce limits (currently just allowing one in flight)
- if (in_flight.size() > cct->_conf->mds_max_purge_files) {
+ if (in_flight.size() >= cct->_conf->mds_max_purge_files) {
+ dout(20) << "Throttling on item limit " << in_flight.size()
+ << "/" << cct->_conf->mds_max_purge_files << dendl;
return false;
} else {
return true;
{
assert(lock.is_locked_by_me());
- // Because we are the writer and the reader of the journal
- // via the same Journaler instance, we never need to reread_head
-
- if (!can_consume()) {
- dout(10) << " cannot consume right now" << dendl;
-
- return;
- }
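+ // Consume and execute items until the journal runs dry or a throttle
+ // (see can_consume) tells us to stop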
+ while(can_consume()) {
+ if (!journaler.is_readable()) {
+ dout(10) << " not readable right now" << dendl;
+ // Because we are the writer and the reader of the journal
+ // via the same Journaler instance, we never need to reread_head
+ if (!journaler.have_waiter()) {
+ journaler.wait_for_readable(new FunctionContext([this](int r) {
+ Mutex::Locker l(lock);
+ if (r == 0) {
+ _consume();
+ }
+ }));
+ }
- if (!journaler.is_readable()) {
- dout(10) << " not readable right now" << dendl;
- if (!journaler.have_waiter()) {
- journaler.wait_for_readable(new FunctionContext([this](int r) {
- Mutex::Locker l(lock);
- if (r == 0) {
- _consume();
- }
- }));
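+ // Stop here; the readability waiter will re-enter _consume()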
+ return;
}
- return;
+ // The journaler is readable: consume an entry
+ bufferlist bl;
+ bool readable = journaler.try_read_entry(bl);
+ assert(readable); // we checked earlier
+
+ dout(20) << " decoding entry" << dendl;
+ PurgeItem item;
+ bufferlist::iterator q = bl.begin();
+ ::decode(item, q);
+ dout(20) << " executing item (0x" << std::hex << item.ino
+ << std::dec << ")" << dendl;
+ _execute_item(item, journaler.get_read_pos());
}
- // The journaler is readable: consume an entry
- bufferlist bl;
- bool readable = journaler.try_read_entry(bl);
- assert(readable); // we checked earlier
-
- dout(20) << " decoding entry" << dendl;
- PurgeItem item;
- bufferlist::iterator q = bl.begin();
- ::decode(item, q);
- dout(20) << " executing item (0x" << std::hex << item.ino
- << std::dec << ")" << dendl;
- _execute_item(item, journaler.get_read_pos());
+ dout(10) << " cannot consume right now" << dendl;
}
void PurgeQueue::_execute_item(
in_flight[expire_to] = item;
logger->set(l_pq_executing, in_flight.size());
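+ // Budget the RADOS ops this item is expected to issue against the throttle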
+ ops_in_flight += _calculate_ops(item);
+ logger->set(l_pq_executing_ops, ops_in_flight);
SnapContext nullsnapc;
dout(10) << "completed item for ino 0x" << std::hex << iter->second.ino
<< std::dec << dendl;
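+ // Return this item's op budget to the throttle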
+ ops_in_flight -= _calculate_ops(iter->second);
+ logger->set(l_pq_executing_ops, ops_in_flight);
+
in_flight.erase(iter);
logger->set(l_pq_executing, in_flight.size());
-#if 0
- // Release resources
- dout(10) << __func__ << ": decrementing op allowance "
- << ops_allowance << " from " << ops_in_flight << " in flight" << dendl;
- assert(ops_in_flight >= ops_allowance);
- ops_in_flight -= ops_allowance;
- logger->set(l_mdc_num_purge_ops, ops_in_flight);
- files_purging -= 1;
-#endif
-
logger->inc(l_pq_executed);
_consume();
if (changed.count("mds_max_purge_ops")
|| changed.count("mds_max_purge_ops_per_pg")) {
update_op_limit(mds_map);
+ } else if (changed.count("mds_max_purge_files")) {
+ Mutex::Locker l(lock);
+
+ if (in_flight.empty()) {
+ // We might have gone from zero to a finite limit, so we
+ // might need to kick off consumption again.
+ dout(4) << "maybe start work again (max_purge_files="
+ << conf->mds_max_purge_files << dendl;
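+ // Re-run _consume() from the finisher thread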
+ finisher.queue(new FunctionContext([this](int r){
+ Mutex::Locker l(lock);
+ _consume();
+ }));
+ }
}
}
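+// Idle means nothing is executing and every journaled item has been
+// consumed (the read position has caught up with the write position).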
+bool PurgeQueue::is_idle() const
+{
+ return in_flight.empty() && (
+ journaler.get_read_pos() == journaler.get_write_pos());
+}
+