static_cast<uint64_t>(max_in_flight)}
{}
~ObjectRecorderFlusher() {
- for (auto& object_recorder : m_object_recorders) {
+ for (auto& [object_recorder, m] : m_object_recorders) {
C_SaferCond cond;
object_recorder->flush(&cond);
cond.wait();
+ std::scoped_lock l{*m};
+ if (!object_recorder->is_closed()) {
+ object_recorder->close();
+ }
}
}
journal::ObjectRecorderPtr create_object(const std::string& oid,
m_flush_bytes,
m_flush_age);
}
- m_object_recorders.push_back(object);
+ m_object_recorders.emplace_back(object, lock);
m_handler.object_lock = lock;
return object;
}
uint64_t m_flush_bytes = std::numeric_limits<uint64_t>::max();
double m_flush_age = 600;
uint64_t m_max_in_flight_appends = 0;
- using ObjectRecorders = std::list<journal::ObjectRecorderPtr>;
+ using ObjectRecorders =
+ std::list<std::pair<journal::ObjectRecorderPtr, ceph::mutex*>>;
ObjectRecorders m_object_recorders;
Handler m_handler;
};