}
template <typename I>
-void Replay<I>::handle_aio_modify_complete(Context *on_safe, int r) {
+void Replay<I>::handle_aio_modify_complete(Context *on_ready, Context *on_safe,
+ int r) {
Mutex::Locker locker(m_lock);
CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << this << " " << __func__ << ": on_safe=" << on_safe << ", "
- << "r=" << r << dendl;
+ ldout(cct, 20) << this << " " << __func__ << ": on_ready=" << on_ready << ", "
+ << "on_safe=" << on_safe << ", r=" << r << dendl;
+ if (on_ready != nullptr) {
+ on_ready->complete(0);
+ }
if (r < 0) {
lderr(cct) << "AIO modify op failed: " << cpp_strerror(r) << dendl;
on_safe->complete(r);
// completed by flushes-only so that we don't move the journal
// commit position until safely on-disk
- // when safe, the completion of the next flush will fire the on_safe
- // callback
- AioCompletion *aio_comp = AioCompletion::create<Context>(
- new C_AioModifyComplete(this, on_safe));
-
*flush_required = (m_aio_modify_unsafe_contexts.size() ==
IN_FLIGHT_IO_LOW_WATER_MARK);
if (*flush_required) {
if (m_in_flight_aio_modify == IN_FLIGHT_IO_HIGH_WATER_MARK) {
ldout(cct, 10) << "hit AIO replay high-water mark: pausing replay"
<< dendl;
- m_on_aio_ready = on_ready;
- } else {
- on_ready->complete(0);
+ assert(m_on_aio_ready == nullptr);
+ std::swap(m_on_aio_ready, on_ready);
}
+ // when the modification is ACKed by librbd, we can process the next
+ // event. when flushed, the completion of the next flush will fire the
+ // on_safe callback
+ AioCompletion *aio_comp = AioCompletion::create<Context>(
+ new C_AioModifyComplete(this, on_ready, on_safe));
return aio_comp;
}
struct C_AioModifyComplete : public Context {
Replay *replay;
+ Context *on_ready;
Context *on_safe;
- C_AioModifyComplete(Replay *replay, Context *on_safe)
- : replay(replay), on_safe(on_safe) {
+ C_AioModifyComplete(Replay *replay, Context *on_ready, Context *on_safe)
+ : replay(replay), on_ready(on_ready), on_safe(on_safe) {
}
virtual void finish(int r) {
- replay->handle_aio_modify_complete(on_safe, r);
+ replay->handle_aio_modify_complete(on_ready, on_safe, r);
}
};
void handle_event(const UnknownEvent &event, Context *on_ready,
Context *on_safe);
- void handle_aio_modify_complete(Context *on_safe, int r);
+ void handle_aio_modify_complete(Context *on_ready, Context *on_safe, int r);
void handle_aio_flush_complete(Context *on_flush_safe, Contexts &on_safe_ctxs,
int r);
when_process(mock_journal_replay,
EventEntry{AioDiscardEvent(123, 456)},
&on_ready, &on_safes[i]);
+ when_complete(mock_image_ctx, aio_comp, 0);
ASSERT_EQ(0, on_ready.wait());
- when_complete(mock_image_ctx, aio_comp, 0);
if (flush_comp != nullptr) {
when_complete(mock_image_ctx, flush_comp, 0);
}
when_process(mock_journal_replay,
EventEntry{AioWriteEvent(123, 456, to_bl("test"))},
&on_ready, &on_safes[i]);
+ when_complete(mock_image_ctx, aio_comp, 0);
if (i < io_count - 1) {
ASSERT_EQ(0, on_ready.wait());
- }
-
- when_complete(mock_image_ctx, aio_comp, 0);
- if (i == io_count - 1) {
+ } else {
for (auto flush_comp : flush_comps) {
when_complete(mock_image_ctx, flush_comp, 0);
- ASSERT_EQ(0, on_ready.wait());
}
+ ASSERT_EQ(0, on_ready.wait());
}
}
for (auto &on_safe : on_safes) {