assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
assert(m_journaler == NULL);
assert(m_journal_replay == NULL);
- assert(m_on_replay_close_request == nullptr);
assert(m_wait_for_state_contexts.empty());
}
template <typename I>
bool Journal<I>::is_journal_replaying() const {
Mutex::Locker locker(m_lock);
+ return is_journal_replaying(m_lock);
+}
+
+template <typename I>
+bool Journal<I>::is_journal_replaying(const Mutex &) const {
+ assert(m_lock.is_locked());
return (m_state == STATE_REPLAYING ||
m_state == STATE_FLUSHING_REPLAY ||
m_state == STATE_FLUSHING_RESTART ||
on_finish = create_async_context_callback(m_image_ctx, on_finish);
Mutex::Locker locker(m_lock);
+ while (m_listener_notify) {
+ m_listener_cond.Wait(m_lock);
+ }
+
+ Listeners listeners(m_listeners);
+ m_listener_notify = true;
+ m_lock.Unlock();
+ for (auto listener : listeners) {
+ listener->handle_close();
+ }
+
+ m_lock.Lock();
+ m_listener_notify = false;
+ m_listener_cond.Signal();
+
assert(m_state != STATE_UNINITIALIZED);
if (m_state == STATE_CLOSED) {
on_finish->complete(m_error_result);
stop_recording();
}
- // interrupt external replay if active
- if (m_on_replay_close_request != nullptr) {
- m_on_replay_close_request->complete(0);
- m_on_replay_close_request = nullptr;
- }
-
m_close_pending = true;
wait_for_steady_state(on_finish);
}
template <typename I>
void Journal<I>::start_external_replay(journal::Replay<I> **journal_replay,
- Context *on_start,
- Context *on_close_request) {
+ Context *on_start) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
Mutex::Locker locker(m_lock);
assert(m_state == STATE_READY);
assert(m_journal_replay == nullptr);
- assert(m_on_replay_close_request == nullptr);
- m_on_replay_close_request = on_close_request;
on_start = util::create_async_context_callback(m_image_ctx, on_start);
on_start = new FunctionContext(
<< "failed to stop recording: " << cpp_strerror(r) << dendl;
*journal_replay = nullptr;
- if (m_on_replay_close_request != nullptr) {
- m_on_replay_close_request->complete(r);
- m_on_replay_close_request = nullptr;
- }
-
// get back to a sane-state
start_append();
on_finish->complete(r);
template <typename I>
void Journal<I>::stop_external_replay() {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << dendl;
+
Mutex::Locker locker(m_lock);
assert(m_journal_replay != nullptr);
assert(m_state == STATE_REPLAYING);
- if (m_on_replay_close_request != nullptr) {
- m_on_replay_close_request->complete(-ECANCELED);
- m_on_replay_close_request = nullptr;
- }
-
delete m_journal_replay;
m_journal_replay = nullptr;
}
template <typename I>
-int Journal<I>::check_resync_requested(bool *do_resync) {
+int Journal<I>::is_resync_requested(bool *do_resync) {
Mutex::Locker l(m_lock);
- return check_resync_requested_internal(do_resync);
+ return check_resync_requested(do_resync);
}
template <typename I>
-int Journal<I>::check_resync_requested_internal(bool *do_resync) {
+int Journal<I>::check_resync_requested(bool *do_resync) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
- std::list<journal::ResyncListener *> resync_private_list;
+ Mutex::Locker locker(m_lock);
+ if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
+ return;
+ }
- {
- Mutex::Locker l(m_lock);
+ while (m_listener_notify) {
+ m_listener_cond.Wait(m_lock);
+ }
- if (m_state == STATE_CLOSING || m_state == STATE_CLOSED ||
- m_state == STATE_UNINITIALIZED || m_state == STATE_STOPPING) {
- return;
- }
+ bool resync_requested = false;
+ int r = check_resync_requested(&resync_requested);
+ if (r < 0) {
+ lderr(cct) << this << " " << __func__ << ": "
+ << "failed to check if a resync was requested" << dendl;
+ return;
+ }
- bool do_resync = false;
- int r = check_resync_requested_internal(&do_resync);
- if (r < 0) {
- lderr(cct) << this << " " << __func__ << ": "
- << "failed to check if a resync was requested" << dendl;
- return;
- }
+ Listeners listeners(m_listeners);
+ m_listener_notify = true;
+ m_lock.Unlock();
- if (do_resync) {
- for (const auto& listener :
- m_listener_map[journal::ListenerType::RESYNC]) {
- journal::ResyncListener *rsync_listener =
- boost::get<journal::ResyncListener *>(listener);
- resync_private_list.push_back(rsync_listener);
- }
+ if (resync_requested) {
+ for (auto listener : listeners) {
+ listener->handle_resync();
}
}
- for (const auto& listener : resync_private_list) {
- listener->handle_resync();
- }
+ m_lock.Lock();
+ m_listener_notify = false;
+ m_listener_cond.Signal();
}
template <typename I>
-void Journal<I>::add_listener(journal::ListenerType type,
- journal::JournalListenerPtr listener) {
- Mutex::Locker l(m_lock);
- m_listener_map[type].push_back(listener);
+void Journal<I>::add_listener(journal::Listener *listener) {
+ Mutex::Locker locker(m_lock);
+ m_listeners.insert(listener);
}
template <typename I>
-void Journal<I>::remove_listener(journal::ListenerType type,
- journal::JournalListenerPtr listener) {
- Mutex::Locker l(m_lock);
- m_listener_map[type].remove(listener);
+void Journal<I>::remove_listener(journal::Listener *listener) {
+ Mutex::Locker locker(m_lock);
+ while (m_listener_notify) {
+ m_listener_cond.Wait(m_lock);
+ }
+ m_listeners.erase(listener);
}
} // namespace librbd
#include "include/atomic.h"
#include "include/Context.h"
#include "include/interval_set.h"
+#include "common/Cond.h"
#include "common/Mutex.h"
#include "journal/Future.h"
#include "journal/JournalMetadataListener.h"
}
void start_external_replay(journal::Replay<ImageCtxT> **journal_replay,
- Context *on_start, Context *on_close_request);
+ Context *on_start);
void stop_external_replay();
- void add_listener(journal::ListenerType type,
- journal::JournalListenerPtr listener);
- void remove_listener(journal::ListenerType type,
- journal::JournalListenerPtr listener);
+ void add_listener(journal::Listener *listener);
+ void remove_listener(journal::Listener *listener);
- int check_resync_requested(bool *do_resync);
+ int is_resync_requested(bool *do_resync);
private:
ImageCtxT &m_image_ctx;
bool m_blocking_writes;
journal::Replay<ImageCtxT> *m_journal_replay;
- Context *m_on_replay_close_request = nullptr;
struct MetadataListener : public ::journal::JournalMetadataListener {
Journal<ImageCtxT> *journal;
}
} m_metadata_listener;
- typedef std::map<journal::ListenerType,
- std::list<journal::JournalListenerPtr> > ListenerMap;
- ListenerMap m_listener_map;
+ typedef std::set<journal::Listener *> Listeners;
+ Listeners m_listeners;
+ Cond m_listener_cond;
+ bool m_listener_notify = false;
+
+ bool is_journal_replaying(const Mutex &) const;
uint64_t append_io_events(journal::EventType event_type,
const Bufferlists &bufferlists,
bool is_steady_state() const;
void wait_for_steady_state(Context *on_state);
- int check_resync_requested_internal(bool *do_resync);
+ int check_resync_requested(bool *do_resync);
void handle_metadata_updated();
};
std::ostream &operator<<(std::ostream &out, const TagPredecessor &predecessor);
std::ostream &operator<<(std::ostream &out, const TagData &tag_data);
-enum class ListenerType : int8_t {
- RESYNC
-};
+struct Listener {
+ virtual ~Listener() {
+ }
-struct ResyncListener {
- virtual ~ResyncListener() {}
- virtual void handle_resync() = 0;
-};
+ /// invoked when journal close is requested
+ virtual void handle_close() = 0;
-typedef boost::variant<ResyncListener *> JournalListenerPtr;
+ /// invoked when journal is promoted to primary
+ virtual void handle_promoted() = 0;
+ /// invoked when journal resync is requested
+ virtual void handle_resync() = 0;
+};
} // namespace journal
} // namespace librbd
MOCK_METHOD3(commit_op_event, void(uint64_t, int, Context *));
MOCK_METHOD2(replay_op_ready, void(uint64_t, Context *));
- MOCK_METHOD2(add_listener, void(journal::ListenerType,
- journal::JournalListenerPtr));
- MOCK_METHOD2(remove_listener, void(journal::ListenerType,
- journal::JournalListenerPtr));
+ MOCK_METHOD1(add_listener, void(journal::Listener *));
+ MOCK_METHOD1(remove_listener, void(journal::Listener *));
- MOCK_METHOD1(check_resync_requested, int(bool *));
+ MOCK_METHOD1(is_resync_requested, int(bool *));
};
} // namespace librbd
}
void expect_get_journaler_cached_client(::journal::MockJournaler &mock_journaler, int r) {
-
journal::ImageClientMeta image_client_meta;
image_client_meta.tag_class = 0;
+ expect_get_journaler_cached_client(mock_journaler, image_client_meta, r);
+ }
+ void expect_get_journaler_cached_client(::journal::MockJournaler &mock_journaler,
+ const journal::ImageClientMeta &client_meta,
+ int r) {
journal::ClientData client_data;
- client_data.client_meta = image_client_meta;
+ client_data.client_meta = client_meta;
cls::journal::Client client;
::encode(client_data, client.data);
EXPECT_CALL(mock_journaler, get_tags(0, _, _))
.WillOnce(DoAll(SetArgPointee<1>(tags),
WithArg<2>(CompleteContext(r, mock_image_ctx.image_ctx->op_work_queue))));
- EXPECT_CALL(mock_journaler, add_listener(_));
+ EXPECT_CALL(mock_journaler, add_listener(_))
+ .WillOnce(SaveArg<0>(&m_listener));
}
void expect_start_replay(MockJournalImageCtx &mock_image_ctx,
}
::journal::ReplayHandler *m_replay_handler = nullptr;
+ ::journal::JournalMetadataListener *m_listener = nullptr;
};
TEST_F(TestMockJournal, StateTransitions) {
expect_shut_down_journaler(mock_journaler);
C_SaferCond start_ctx;
- C_SaferCond close_request_ctx;
journal::Replay<MockJournalImageCtx> *journal_replay = nullptr;
- mock_journal.start_external_replay(&journal_replay, &start_ctx,
- &close_request_ctx);
+ mock_journal.start_external_replay(&journal_replay, &start_ctx);
ASSERT_EQ(0, start_ctx.wait());
mock_journal.stop_external_replay();
- ASSERT_EQ(-ECANCELED, close_request_ctx.wait());
}
TEST_F(TestMockJournal, ExternalReplayFailure) {
expect_shut_down_journaler(mock_journaler);
C_SaferCond start_ctx;
- C_SaferCond close_request_ctx;
journal::Replay<MockJournalImageCtx> *journal_replay = nullptr;
- mock_journal.start_external_replay(&journal_replay, &start_ctx,
- &close_request_ctx);
+ mock_journal.start_external_replay(&journal_replay, &start_ctx);
ASSERT_EQ(-EINVAL, start_ctx.wait());
- ASSERT_EQ(-EINVAL, close_request_ctx.wait());
}
-TEST_F(TestMockJournal, ExternalReplayCloseRequest) {
+TEST_F(TestMockJournal, AppendDisabled) {
REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
librbd::ImageCtx *ictx;
MockJournalImageCtx mock_image_ctx(*ictx);
MockJournal mock_journal(mock_image_ctx);
+ MockJournalPolicy mock_journal_policy;
+
::journal::MockJournaler mock_journaler;
open_journal(mock_image_ctx, mock_journal, mock_journaler);
+ BOOST_SCOPE_EXIT_ALL(&) {
+ close_journal(mock_journal, mock_journaler);
+ };
InSequence seq;
- expect_stop_append(mock_journaler, 0);
+ RWLock::RLocker snap_locker(mock_image_ctx.snap_lock);
+ EXPECT_CALL(mock_image_ctx, get_journal_policy()).WillOnce(
+ Return(ictx->get_journal_policy()));
+ ASSERT_TRUE(mock_journal.is_journal_appending());
+
+ EXPECT_CALL(mock_image_ctx, get_journal_policy()).WillOnce(
+ Return(&mock_journal_policy));
+ EXPECT_CALL(mock_journal_policy, append_disabled()).WillOnce(Return(true));
+ ASSERT_FALSE(mock_journal.is_journal_appending());
+
expect_shut_down_journaler(mock_journaler);
+}
- C_SaferCond start_ctx;
- C_SaferCond close_request_ctx;
+TEST_F(TestMockJournal, CloseListenerEvent) {
+ REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
- journal::Replay<MockJournalImageCtx> *journal_replay = nullptr;
- mock_journal.start_external_replay(&journal_replay, &start_ctx,
- &close_request_ctx);
- ASSERT_EQ(0, start_ctx.wait());
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
- C_SaferCond close_ctx;
- mock_journal.close(&close_ctx);
+ MockJournalImageCtx mock_image_ctx(*ictx);
+ MockJournal mock_journal(mock_image_ctx);
+ ::journal::MockJournaler mock_journaler;
+ open_journal(mock_image_ctx, mock_journal, mock_journaler);
- ASSERT_EQ(0, close_request_ctx.wait());
- mock_journal.stop_external_replay();
+ struct Listener : public journal::Listener {
+ C_SaferCond ctx;
+ virtual void handle_close() {
+ ctx.complete(0);
+ }
+ virtual void handle_resync() {
+ ADD_FAILURE() << "unexpected resync request";
+ }
+ virtual void handle_promoted() {
+ ADD_FAILURE() << "unexpected promotion event";
+ }
+ } listener;
+ mock_journal.add_listener(&listener);
+
+ expect_shut_down_journaler(mock_journaler);
+ close_journal(mock_journal, mock_journaler);
- ASSERT_EQ(0, close_ctx.wait());
+ ASSERT_EQ(0, listener.ctx.wait());
+ mock_journal.remove_listener(&listener);
}
-TEST_F(TestMockJournal, AppendDisabled) {
+TEST_F(TestMockJournal, ResyncRequested) {
REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
librbd::ImageCtx *ictx;
MockJournalImageCtx mock_image_ctx(*ictx);
MockJournal mock_journal(mock_image_ctx);
- MockJournalPolicy mock_journal_policy;
-
::journal::MockJournaler mock_journaler;
open_journal(mock_image_ctx, mock_journal, mock_journaler);
+
+ struct Listener : public journal::Listener {
+ C_SaferCond ctx;
+ virtual void handle_close() {
+ ADD_FAILURE() << "unexpected close action";
+ }
+ virtual void handle_resync() {
+ ctx.complete(0);
+ }
+ virtual void handle_promoted() {
+ ADD_FAILURE() << "unexpected promotion event";
+ }
+ } listener;
+ mock_journal.add_listener(&listener);
+
BOOST_SCOPE_EXIT_ALL(&) {
+ mock_journal.remove_listener(&listener);
close_journal(mock_journal, mock_journaler);
};
InSequence seq;
- RWLock::RLocker snap_locker(mock_image_ctx.snap_lock);
- EXPECT_CALL(mock_image_ctx, get_journal_policy()).WillOnce(
- Return(ictx->get_journal_policy()));
- ASSERT_TRUE(mock_journal.is_journal_appending());
-
- EXPECT_CALL(mock_image_ctx, get_journal_policy()).WillOnce(
- Return(&mock_journal_policy));
- EXPECT_CALL(mock_journal_policy, append_disabled()).WillOnce(Return(true));
- ASSERT_FALSE(mock_journal.is_journal_appending());
-
+ journal::ImageClientMeta image_client_meta;
+ image_client_meta.tag_class = 0;
+ image_client_meta.resync_requested = true;
+ expect_get_journaler_cached_client(mock_journaler, image_client_meta, 0);
expect_shut_down_journaler(mock_journaler);
+
+ m_listener->handle_update(nullptr);
+ ASSERT_EQ(0, listener.ctx.wait());
}
} // namespace librbd
Commands commands;
};
-template <typename I>
-struct ResyncListener : public librbd::journal::ResyncListener {
- ImageReplayer<I> *img_replayer;
-
- ResyncListener(ImageReplayer<I> *img_replayer)
- : img_replayer(img_replayer) {
- }
-
- virtual void handle_resync() {
- img_replayer->resync_image();
- }
-};
-
} // anonymous namespace
template <typename I>
m_lock("rbd::mirror::ImageReplayer " + stringify(remote_pool_id) + " " +
remote_image_id),
m_progress_cxt(this),
- m_resync_listener(new ResyncListener<I>(this)),
+ m_journal_listener(new JournalListener(this)),
m_remote_listener(this)
{
// Register asok commands using a temporary "remote_pool_name/global_image_id"
assert(m_bootstrap_request == nullptr);
assert(m_in_flight_status_updates == 0);
- delete m_resync_listener;
+ delete m_journal_listener;
delete m_asok_hook;
}
template <typename I>
void ImageReplayer<I>::handle_bootstrap(int r) {
dout(20) << "r=" << r << dendl;
-
{
Mutex::Locker locker(m_lock);
m_bootstrap_request->put();
return;
}
+
+ assert(m_local_journal == nullptr);
{
- Mutex::Locker locker(m_lock);
+ RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
+ if (m_local_image_ctx->journal != nullptr) {
+ m_local_journal = m_local_image_ctx->journal;
+ m_local_journal->add_listener(m_journal_listener);
+ }
+ }
- m_local_image_ctx->journal->add_listener(
- librbd::journal::ListenerType::RESYNC,
- m_resync_listener);
+ if (m_local_journal == nullptr) {
+ on_start_fail(-EINVAL, "error accessing local journal");
+ return;
+ }
+ {
+ Mutex::Locker locker(m_lock);
bool do_resync = false;
- r = m_local_image_ctx->journal->check_resync_requested(&do_resync);
+ r = m_local_image_ctx->journal->is_resync_requested(&do_resync);
if (r < 0) {
derr << "failed to check if a resync was requested" << dendl;
}
}
std::string name = m_local_ioctx.get_pool_name() + "/" +
- m_local_image_ctx->name;
+ m_local_image_ctx->name;
if (m_name != name) {
m_name = name;
if (m_asok_hook) {
void ImageReplayer<I>::start_replay() {
dout(20) << dendl;
- assert(m_local_journal == nullptr);
- {
- RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
- if (m_local_image_ctx->journal != nullptr) {
- m_local_journal = m_local_image_ctx->journal;
-
- Context *start_ctx = create_context_callback<
- ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
- Context *stop_ctx = create_context_callback<
- ImageReplayer, &ImageReplayer<I>::handle_stop_replay_request>(this);
- m_local_journal->start_external_replay(&m_local_replay, start_ctx,
- stop_ctx);
- return;
- }
- }
-
- on_start_fail(-EINVAL, "error starting journal replay");
+ Context *start_ctx = create_context_callback<
+ ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
+ m_local_journal->start_external_replay(&m_local_replay, start_ctx);
}
template <typename I>
dout(20) << "r=" << r << dendl;
if (r < 0) {
- m_local_journal = nullptr;
+ assert(m_local_replay == nullptr);
derr << "error starting external replay on local image "
<< m_local_image_id << ": " << cpp_strerror(r) << dendl;
on_start_fail(r, "error starting replay on local image");
}
-template <typename I>
-void ImageReplayer<I>::handle_stop_replay_request(int r) {
- if (r < 0) {
- // error starting or we requested the stop -- ignore
- return;
- }
-
- // journal close has been requested, stop replay so the journal
- // can be closed (since it will wait on replay to finish)
- dout(20) << dendl;
- on_stop_journal_replay();
-}
-
template <typename I>
void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
{
return;
}
- Context *stop_ctx = create_context_callback<
- ImageReplayer, &ImageReplayer<I>::handle_stop_replay_request>(this);
- m_local_journal->start_external_replay(&m_local_replay, ctx, stop_ctx);
+ m_local_journal->start_external_replay(&m_local_replay, ctx);
});
m_local_replay->shut_down(false, ctx);
}
});
}
}
- if (m_local_replay != nullptr) {
+ if (m_local_journal != nullptr) {
+ ctx = new FunctionContext([this, ctx](int r) {
+ m_local_journal = nullptr;
+ ctx->complete(0);
+ });
+ if (m_local_replay != nullptr) {
+ ctx = new FunctionContext([this, ctx](int r) {
+ m_local_journal->stop_external_replay();
+ m_local_replay = nullptr;
+
+ delete m_event_preprocessor;
+ m_event_preprocessor = nullptr;
+ ctx->complete(0);
+ });
+ }
ctx = new FunctionContext([this, ctx](int r) {
if (r < 0) {
derr << "error flushing journal replay: " << cpp_strerror(r) << dendl;
}
- m_local_journal->stop_external_replay();
- m_local_journal = nullptr;
- m_local_replay = nullptr;
-
- delete m_event_preprocessor;
- m_event_preprocessor = nullptr;
+ // blocks if listener notification is in-progress
+ m_local_journal->remove_listener(m_journal_listener);
ctx->complete(0);
});
- ctx = new FunctionContext([this, ctx](int r) {
- m_local_journal->remove_listener(
- librbd::journal::ListenerType::RESYNC, m_resync_listener);
- m_local_replay->shut_down(true, ctx);
- });
+ if (m_local_replay != nullptr) {
+ ctx = new FunctionContext([this, ctx](int r) {
+ m_local_replay->shut_down(true, ctx);
+ });
+ }
}
if (m_replay_handler != nullptr) {
ctx = new FunctionContext([this, ctx](int r) {
typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
typedef boost::optional<State> OptionalState;
+ struct JournalListener : public librbd::journal::Listener {
+ ImageReplayer *img_replayer;
+
+ JournalListener(ImageReplayer *img_replayer)
+ : img_replayer(img_replayer) {
+ }
+
+ virtual void handle_close() {
+ img_replayer->on_stop_journal_replay();
+ }
+
+ virtual void handle_promoted() {
+ // TODO
+ }
+
+ virtual void handle_resync() {
+ img_replayer->resync_image();
+ }
+ };
+
class BootstrapProgressContext : public ProgressContext {
public:
BootstrapProgressContext(ImageReplayer<ImageCtxT> *replayer) :
librbd::journal::Replay<ImageCtxT> *m_local_replay = nullptr;
Journaler* m_remote_journaler = nullptr;
::journal::ReplayHandler *m_replay_handler = nullptr;
- librbd::journal::ResyncListener *m_resync_listener;
+ librbd::journal::Listener *m_journal_listener;
bool m_stopping_for_resync = false;
Context *m_on_start_finish = nullptr;
void start_replay();
void handle_start_replay(int r);
- void handle_stop_replay_request(int r);
void replay_flush();
void handle_replay_flush(int r);