}
template <typename I>
-int Journal<I>::start_external_replay(journal::Replay<I> **journal_replay) {
+void Journal<I>::start_external_replay(journal::Replay<I> **journal_replay,
+ Context *on_finish) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
assert(m_state == STATE_READY);
assert(m_journal_replay == nullptr);
+ on_finish = util::create_async_context_callback(m_image_ctx, on_finish);
+ on_finish = new FunctionContext(
+ [this, journal_replay, on_finish](int r) {
+ handle_start_external_replay(r, journal_replay, on_finish);
+ });
+
+ // safely flush all in-flight events before starting external replay
+ m_journaler->stop_append(util::create_async_context_callback(m_image_ctx,
+ on_finish));
+}
+
+template <typename I>
+void Journal<I>::handle_start_external_replay(int r,
+ journal::Replay<I> **journal_replay,
+ Context *on_finish) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << dendl;
+
+ Mutex::Locker locker(m_lock);
+ assert(m_state == STATE_READY);
+ assert(m_journal_replay == nullptr);
+
+ if (r < 0) {
+ lderr(cct) << "failed to stop recording: " << cpp_strerror(r) << dendl;
+ *journal_replay = nullptr;
+
+ // get back to a sane-state
+ start_append();
+ on_finish->complete(r);
+ return;
+ }
+
transition_state(STATE_REPLAYING, 0);
m_journal_replay = journal::Replay<I>::create(m_image_ctx);
-
*journal_replay = m_journal_replay;
- return 0;
+ on_finish->complete(0);
}
template <typename I>
delete m_journal_replay;
m_journal_replay = nullptr;
- transition_state(STATE_READY, 0);
+
+ start_append();
}
template <typename I>
}
}
+template <typename I>
+void Journal<I>::start_append() {
+ assert(m_lock.is_locked());
+ m_journaler->start_append(m_image_ctx.journal_object_flush_interval,
+ m_image_ctx.journal_object_flush_bytes,
+ m_image_ctx.journal_object_flush_age);
+ transition_state(STATE_READY, 0);
+}
+
template <typename I>
void Journal<I>::handle_initialized(int r) {
CephContext *cct = m_image_ctx.cct;
m_journal_replay = NULL;
m_error_result = 0;
- m_journaler->start_append(m_image_ctx.journal_object_flush_interval,
- m_image_ctx.journal_object_flush_bytes,
- m_image_ctx.journal_object_flush_age);
- transition_state(STATE_READY, 0);
+ start_append();
}
template <typename I>
return op_tid;
}
- int start_external_replay(journal::Replay<ImageCtxT> **journal_replay);
+ void start_external_replay(journal::Replay<ImageCtxT> **journal_replay,
+ Context *on_finish);
void stop_external_replay();
private:
void complete_event(typename Events::iterator it, int r);
+ void start_append();
+
void handle_initialized(int r);
void handle_get_tags(int r);
void handle_replay_process_ready(int r);
void handle_replay_process_safe(ReplayEntry replay_entry, int r);
+ void handle_start_external_replay(int r,
+ journal::Replay<ImageCtxT> **journal_replay,
+ Context *on_finish);
+
void handle_flushing_restart(int r);
void handle_flushing_replay();
void ImageReplayer<I>::start_replay() {
dout(20) << dendl;
- int r = m_local_image_ctx->journal->start_external_replay(&m_local_replay);
+ Context *ctx = create_context_callback<
+ ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
+ m_local_image_ctx->journal->start_external_replay(&m_local_replay, ctx);
+}
+
+template <typename I>
+void ImageReplayer<I>::handle_start_replay(int r) {
+ dout(20) << "r=" << r << dendl;
+
if (r < 0) {
derr << "error starting external replay on local image "
<< m_local_image_id << ": " << cpp_strerror(r) << dendl;