if (iter == in_flight.begin()) {
// This was the lowest journal position in flight, so we can now
// safely expire the journal up to here.
+ dout(10) << "expiring to 0x" << std::hex << expire_to << std::dec << dendl;
journaler.set_expire_pos(expire_to);
- journaler.trim();
+ } else {
+ // This is completely fine, we're not supposed to purge files in
+ // order when doing them in parallel.
+ dout(10) << "non-sequential completion, not expiring anything" << dendl;
}
- dout(10) << "completed item for ino 0x" << std::hex << iter->second.ino
- << std::dec << dendl;
-
ops_in_flight -= _calculate_ops(iter->second);
logger->set(l_pq_executing_ops, ops_in_flight);
+ dout(10) << "completed item for ino 0x" << std::hex << iter->second.ino
+ << std::dec << dendl;
+
in_flight.erase(iter);
logger->set(l_pq_executing, in_flight.size());
+ dout(10) << "in_flight.size() now " << in_flight.size() << dendl;
logger->inc(l_pq_executed);
_consume();
+
+ // Have we gone idle? If so, do an extra write_head now instead of
+ // waiting for next flush after journaler_write_head_interval.
+ // Also do this periodically even if not idle, so that the persisted
+ // expire_pos doesn't fall too far behind our progress when consuming
+ // a very long queue.
+ if (in_flight.empty() || journaler.write_head_needed()) {
+ journaler.write_head(new FunctionContext([this](int r){
+ journaler.trim();
+ }));
+ }
}
void PurgeQueue::update_op_limit(const MDSMap &mds_map)