}
template <typename I>
-void ImageReplayer<I>::flush(Context *on_finish)
+void ImageReplayer<I>::flush()
{
dout(10) << dendl;
+ C_SaferCond ctx;
+ flush_local_replay(&ctx);
+ ctx.wait();
- {
- Mutex::Locker locker(m_lock);
- if (m_state == STATE_REPLAYING) {
- Context *ctx = new FunctionContext(
- [on_finish](int r) {
- if (on_finish != nullptr) {
- on_finish->complete(r);
- }
- });
- on_flush_local_replay_flush_start(ctx);
- return;
- }
- }
-
- if (on_finish) {
- on_finish->complete(0);
- }
+ update_mirror_image_status(false, boost::none);
}
template <typename I>
-void ImageReplayer<I>::on_flush_local_replay_flush_start(Context *on_flush)
+void ImageReplayer<I>::flush_local_replay(Context* on_flush)
{
- dout(10) << dendl;
- FunctionContext *ctx = new FunctionContext(
+ m_lock.Lock();
+ if (m_state != STATE_REPLAYING) {
+ m_lock.Unlock();
+ on_flush->complete(0);
+ return;
+ }
+
+ dout(15) << dendl;
+ auto ctx = new FunctionContext(
[this, on_flush](int r) {
- on_flush_local_replay_flush_finish(on_flush, r);
+ handle_flush_local_replay(on_flush, r);
});
-
- ceph_assert(m_lock.is_locked());
- ceph_assert(m_state == STATE_REPLAYING);
m_local_replay->flush(ctx);
+ m_lock.Unlock();
}
template <typename I>
-void ImageReplayer<I>::on_flush_local_replay_flush_finish(Context *on_flush,
- int r)
+void ImageReplayer<I>::handle_flush_local_replay(Context* on_flush, int r)
{
- dout(10) << "r=" << r << dendl;
+ dout(15) << "r=" << r << dendl;
if (r < 0) {
derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
on_flush->complete(r);
return;
}
- on_flush_flush_commit_position_start(on_flush);
+ flush_commit_position(on_flush);
}
template <typename I>
-void ImageReplayer<I>::on_flush_flush_commit_position_start(Context *on_flush)
+void ImageReplayer<I>::flush_commit_position(Context* on_flush)
{
- FunctionContext *ctx = new FunctionContext(
+ m_lock.Lock();
+ if (m_state != STATE_REPLAYING) {
+ m_lock.Unlock();
+ on_flush->complete(0);
+ return;
+ }
+
+ dout(15) << dendl;
+ auto ctx = new FunctionContext(
[this, on_flush](int r) {
- on_flush_flush_commit_position_finish(on_flush, r);
+ handle_flush_commit_position(on_flush, r);
});
-
m_remote_journaler->flush_commit_position(ctx);
+ m_lock.Unlock();
}
template <typename I>
-void ImageReplayer<I>::on_flush_flush_commit_position_finish(Context *on_flush,
- int r)
+void ImageReplayer<I>::handle_flush_commit_position(Context* on_flush, int r)
{
+ dout(15) << "r=" << r << dendl;
if (r < 0) {
derr << "error flushing remote journal commit position: "
<< cpp_strerror(r) << dendl;
}
- update_mirror_image_status(false, boost::none);
-
- dout(20) << "flush complete, r=" << r << dendl;
on_flush->complete(r);
}
void stop(Context *on_finish = nullptr, bool manual = false,
int r = 0, const std::string& desc = "");
void restart(Context *on_finish = nullptr);
- void flush(Context *on_finish = nullptr);
+ void flush();
void resync_image(Context *on_finish=nullptr);
virtual void on_stop_journal_replay(int r = 0, const std::string &desc = "");
- virtual void on_flush_local_replay_flush_start(Context *on_flush);
- virtual void on_flush_local_replay_flush_finish(Context *on_flush, int r);
- virtual void on_flush_flush_commit_position_start(Context *on_flush);
- virtual void on_flush_flush_commit_position_finish(Context *on_flush, int r);
-
bool on_replay_interrupted();
private:
m_state == STATE_REPLAY_FLUSHING);
}
+ void flush_local_replay(Context* on_flush);
+ void handle_flush_local_replay(Context* on_flush, int r);
+
+ void flush_commit_position(Context* on_flush);
+ void handle_flush_commit_position(Context* on_flush, int r);
+
bool update_mirror_image_status(bool force, const OptionalState &state);
bool start_mirror_image_status_update(bool force, bool restarting);
void finish_mirror_image_status_update();