explicit FlushCommand(ImageReplayer *replayer) : replayer(replayer) {}
bool call(Formatter *f, stringstream *ss) {
- int r = replayer->flush();
+ C_SaferCond cond;
+ replayer->flush(&cond);
+ int r = cond.wait();
if (r < 0) {
*ss << "flush: " << cpp_strerror(r);
return false;
on_finish->complete(0);
});
+ m_on_finish = ctx;
+ }
+ } else if (m_state == STATE_FLUSHING_REPLAY) {
+ dout(20) << "interrupting flush" << dendl;
+
+ if (on_finish) {
+ Context *on_flush_finish = m_on_finish;
+ FunctionContext *ctx = new FunctionContext(
+ [this, on_flush_finish, on_finish](int r) {
+ if (on_flush_finish) {
+ on_flush_finish->complete(r);
+ }
+ on_finish->complete(0);
+ });
+
m_on_finish = ctx;
}
} else {
m_local_replay->process(&it, on_ready, on_commit);
}
-int ImageReplayer::flush()
+void ImageReplayer::flush(Context *on_finish)
{
- // TODO: provide async method
-
dout(20) << "enter" << dendl;
+ bool start_flush = false;
+
{
Mutex::Locker locker(m_lock);
- if (m_state != STATE_REPLAYING) {
- return 0;
+ if (m_state == STATE_REPLAYING) {
+ assert(m_on_finish == nullptr);
+ m_on_finish = on_finish;
+
+ m_state = STATE_FLUSHING_REPLAY;
+
+ start_flush = true;
}
+ }
- m_state = STATE_FLUSHING_REPLAY;
+ if (start_flush) {
+ on_flush_local_replay_flush_start();
+ } else if (on_finish) {
+ on_finish->complete(0);
}
+}
+
+void ImageReplayer::on_flush_local_replay_flush_start()
+{
+ dout(20) << "enter" << dendl;
+
+ FunctionContext *ctx = new FunctionContext(
+ [this](int r) {
+ on_flush_local_replay_flush_finish(r);
+ });
+
+ m_local_replay->flush(ctx);
+}
+
+void ImageReplayer::on_flush_local_replay_flush_finish(int r)
+{
+ dout(20) << "r=" << r << dendl;
- C_SaferCond replay_flush_ctx;
- m_local_replay->flush(&replay_flush_ctx);
- int r = replay_flush_ctx.wait();
if (r < 0) {
derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
}
- C_SaferCond journaler_flush_ctx;
- m_remote_journaler->flush_commit_position(&journaler_flush_ctx);
- int r1 = journaler_flush_ctx.wait();
- if (r1 < 0) {
+ if (on_flush_interrupted()) {
+ return;
+ }
+
+ on_flush_flush_commit_position_start(r);
+}
+
+void ImageReplayer::on_flush_flush_commit_position_start(int last_r)
+{
+
+ FunctionContext *ctx = new FunctionContext(
+ [this, last_r](int r) {
+ on_flush_flush_commit_position_finish(last_r, r);
+ });
+
+ m_remote_journaler->flush_commit_position(ctx);
+}
+
+void ImageReplayer::on_flush_flush_commit_position_finish(int last_r, int r)
+{
+ if (r < 0) {
derr << "error flushing remote journal commit position: "
- << cpp_strerror(r1) << dendl;
+ << cpp_strerror(r) << dendl;
+ } else {
+ r = last_r;
}
+ Context *on_finish(nullptr);
+
{
Mutex::Locker locker(m_lock);
- assert(m_state == STATE_FLUSHING_REPLAY);
+ if (m_state == STATE_STOPPING) {
+ r = -EINTR;
+ } else {
+ assert(m_state == STATE_FLUSHING_REPLAY);
- m_state = STATE_REPLAYING;
+ m_state = STATE_REPLAYING;
+ }
+ std::swap(m_on_finish, on_finish);
+ }
+
+ dout(20) << "flush complete, r=" << r << dendl;
+
+ if (on_finish) {
+ dout(20) << "on finish complete, r=" << r << dendl;
+ on_finish->complete(r);
+ }
+}
+
+bool ImageReplayer::on_flush_interrupted()
+{
+ Context *on_finish(nullptr);
+
+ {
+ Mutex::Locker locker(m_lock);
+
+ if (m_state == STATE_FLUSHING_REPLAY) {
+ return false;
+ }
+
+ assert(m_state == STATE_STOPPING);
+
+ std::swap(m_on_finish, on_finish);
}
- dout(20) << "done" << dendl;
+ dout(20) << "flush interrupted" << dendl;
- return r < 0 ? r : r1;
+ if (on_finish) {
+ int r = -EINTR;
+ dout(20) << "on finish complete, r=" << r << dendl;
+ on_finish->complete(r);
+ }
+
+ return true;
}
void ImageReplayer::handle_replay_process_ready(int r)
void start(Context *on_finish = nullptr,
const BootstrapParams *bootstrap_params = nullptr);
void stop(Context *on_finish = nullptr);
- int flush();
+ void flush(Context *on_finish = nullptr);
virtual void handle_replay_ready();
virtual void handle_replay_process_ready(int r);
* | (sync required) *
* |\-----\ *
* | | *
- * | v *
+ * | v (error) *
* | BOOTSTRAP_IMAGE * * * * * * * * * * *
* | | *
* | v *
* REMOTE_JOURNALER_INIT * * * * * * * * * * *
* | *
* v (error) *
- * LOCAL_IMAGE_OPEN (skip if not *
+ * LOCAL_IMAGE_OPEN (skip if not * * * * * * *
* | needed *
* v (error) *
* WAIT_FOR_LOCAL_JOURNAL_READY * * * * * * * *
* |
- * v
- * <replaying>
- * |
- * v
- * <stopping>
- * |
- * v
- * JOURNAL_REPLAY_SHUT_DOWN
- * |
- * v
+ * v-----------------------------------------------\
+ * <replaying> --------------> <flushing_replay> |
+ * | | |
+ * v v |
+ * <stopping> LOCAL_REPLAY_FLUSH |
+ * | | |
+ * v v |
+ * JOURNAL_REPLAY_SHUT_DOWN FLUSH_COMMIT_POSITION |
+ * | | |
+ * v \-------------------/
* LOCAL_IMAGE_CLOSE
* |
* v
virtual void on_stop_local_image_close_start();
virtual void on_stop_local_image_close_finish(int r);
+ virtual void on_flush_local_replay_flush_start();
+ virtual void on_flush_local_replay_flush_finish(int r);
+ virtual void on_flush_flush_commit_position_start(int last_r);
+ virtual void on_flush_flush_commit_position_finish(int last_r, int r);
+ virtual bool on_flush_interrupted();
+
void close_local_image(Context *on_finish); // for tests
private: