ceph_assert(m_in_flight_object_closes == 0);
}
+void JournalRecorder::shut_down(Context *on_safe) {
+ on_safe = new FunctionContext(
+ [this, on_safe](int r) {
+ Context *ctx = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+ if (m_in_flight_advance_sets != 0) {
+ ceph_assert(m_on_object_set_advanced == nullptr);
+ m_on_object_set_advanced = new FunctionContext(
+ [on_safe, r](int) {
+ on_safe->complete(r);
+ });
+ } else {
+ ctx = on_safe;
+ }
+ }
+ if (ctx != nullptr) {
+ ctx->complete(r);
+ }
+ });
+ flush(on_safe);
+}
+
Future JournalRecorder::append(uint64_t tag_tid,
const bufferlist &payload_bl) {
}
void JournalRecorder::handle_advance_object_set(int r) {
- Mutex::Locker locker(m_lock);
- ldout(m_cct, 20) << __func__ << ": r=" << r << dendl;
+ Context *on_object_set_advanced = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+ ldout(m_cct, 20) << __func__ << ": r=" << r << dendl;
- ceph_assert(m_in_flight_advance_sets > 0);
- --m_in_flight_advance_sets;
+ ceph_assert(m_in_flight_advance_sets > 0);
+ --m_in_flight_advance_sets;
- if (r < 0 && r != -ESTALE) {
- lderr(m_cct) << __func__ << ": failed to advance object set: "
- << cpp_strerror(r) << dendl;
- }
+ if (r < 0 && r != -ESTALE) {
+ lderr(m_cct) << __func__ << ": failed to advance object set: "
+ << cpp_strerror(r) << dendl;
+ }
- if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
- open_object_set();
+ if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
+ open_object_set();
+ std::swap(on_object_set_advanced, m_on_object_set_advanced);
+ }
+ }
+ if (on_object_set_advanced != nullptr) {
+ on_object_set_advanced->complete(0);
}
}
double flush_age, uint64_t max_in_flight_appends);
~JournalRecorder();
+ void shut_down(Context *on_safe);
Future append(uint64_t tag_tid, const bufferlist &bl);
void flush(Context *on_safe);
FutureImplPtr m_prev_future;
+ Context *m_on_object_set_advanced = nullptr;
+
void open_object_set();
bool close_object_set(uint64_t active_set);