Context *on_ready = create_context_callback<
ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry));
+
+ m_event_replay_tracker.start_op();
m_local_replay->process(m_event_entry, on_ready, on_commit);
}
if (r < 0) {
derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
-
handle_replay_complete(r, "failed to commit journal event");
- return;
- }
-
- if (m_remote_journaler) {
+ } else {
+ assert(m_remote_journaler != nullptr);
m_remote_journaler->committed(replay_entry);
}
+ m_event_replay_tracker.finish_op();
}
template <typename I>
// blocks if listener notification is in-progress
m_local_journal->remove_listener(m_journal_listener);
- ctx->complete(0);
+
+ // wait for all in-flight replayed events to complete
+ m_event_replay_tracker.wait_for_ops(ctx);
});
if (m_local_replay != nullptr) {
ctx = new FunctionContext([this, ctx](int r) {
#include <vector>
#include "include/atomic.h"
+#include "common/AsyncOpTracker.h"
#include "common/Mutex.h"
#include "common/WorkQueue.h"
#include "include/rados/librados.hpp"
cls::journal::Tag m_replay_tag;
librbd::journal::TagData m_replay_tag_data;
librbd::journal::EventEntry m_event_entry;
+ AsyncOpTracker m_event_replay_tracker;
struct RemoteJournalerListener : public ::journal::JournalMetadataListener {
ImageReplayer *replayer;