// Maximum number of concurrent RADOS ops to issue in purging, scaled by PG count
OPTION(mds_max_purge_ops_per_pg, OPT_FLOAT, 0.5)
+OPTION(mds_purge_queue_busy_flush_period, OPT_FLOAT, 1.0)
+
OPTION(mds_root_ino_uid, OPT_INT, 0) // The UID of / on new filesystems
OPTION(mds_root_ino_gid, OPT_INT, 0) // The GID of / on new filesystems
ops_in_flight(0),
max_purge_ops(0),
drain_initial(0),
- draining(false)
+ draining(false),
+ delayed_flush(nullptr)
{
assert(cct != nullptr);
assert(on_error != nullptr);
journaler.append_entry(bl);
journaler.wait_for_flush(completion);
- // It is not necessary to explicitly flush here, because the reader
- // will get flushes generated inside Journaler::is_readable
-
// Maybe go ahead and do something with it right away
- _consume();
+ bool could_consume = _consume();
+ if (!could_consume) {
+ // Usually, it is not necessary to explicitly flush here, because the reader
+ // will get flushes generated inside Journaler::is_readable. However,
+ // if we remain in a can_consume()==false state for a long period then
+ // we should flush in order to allow MDCache to drop its strays rather
+ // than having them wait for purgequeue to progress.
+ if (!delayed_flush) {
+ delayed_flush = new FunctionContext([this](int r){
+ delayed_flush = nullptr;
+ journaler.flush();
+ });
+
+ timer.add_event_after(
+ g_conf->mds_purge_queue_busy_flush_period,
+ delayed_flush);
+ }
+ }
}
uint32_t PurgeQueue::_calculate_ops(const PurgeItem &item) const
}
}
-void PurgeQueue::_consume()
+bool PurgeQueue::_consume()
{
assert(lock.is_locked_by_me());
+ bool could_consume = false;
while(can_consume()) {
+ could_consume = true;
+
+ if (delayed_flush) {
+ // We are now going to read from the journal, so any proactive
+ // flush is no longer necessary. This is not functionally necessary
+ // but it can avoid generating extra fragmented flush IOs.
+ timer.cancel_event(delayed_flush);
+ delayed_flush = nullptr;
+ }
+
if (!journaler.is_readable()) {
dout(10) << " not readable right now" << dendl;
// Because we are the writer and the reader of the journal
}));
}
- return;
+ return could_consume;
}
// The journaler is readable: consume an entry
}
dout(10) << " cannot consume right now" << dendl;
+
+ return could_consume;
}
void PurgeQueue::_execute_item(