#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/journal/Replay.h"
-#include "librbd/Utils.h"
#include "cls/journal/cls_journal_types.h"
#include "journal/Journaler.h"
#include "journal/Policy.h"
template <typename I>
bool Journal<I>::is_tag_owner() const {
+ Mutex::Locker locker(m_lock);
+ return is_tag_owner(m_lock);
+}
+
+template <typename I>
+bool Journal<I>::is_tag_owner(const Mutex &) const {
+ assert(m_lock.is_locked());
return (m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
}
template <typename I>
uint64_t Journal<I>::get_tag_tid() const {
+ Mutex::Locker locker(m_lock);
return m_tag_tid;
}
template <typename I>
journal::TagData Journal<I>::get_tag_data() const {
+ Mutex::Locker locker(m_lock);
return m_tag_data;
}
ldout(cct, 20) << __func__ << dendl;
Mutex::Locker locker(m_lock);
- assert(m_journaler != nullptr && is_tag_owner());
+ assert(m_journaler != nullptr && is_tag_owner(m_lock));
cls::journal::Client client;
int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
predecessor.mirror_uuid = LOCAL_MIRROR_UUID;
{
Mutex::Locker locker(m_lock);
- assert(m_journaler != nullptr && is_tag_owner());
+ assert(m_journaler != nullptr && is_tag_owner(m_lock));
cls::journal::Client client;
int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
m_journaler->remove_listener(&m_metadata_listener);
transition_state(STATE_CLOSING, r);
- m_journaler->shut_down(create_async_context_callback(
+
+ Context *ctx = create_async_context_callback(
m_image_ctx, create_context_callback<
- Journal<I>, &Journal<I>::handle_journal_destroyed>(this)));
+ Journal<I>, &Journal<I>::handle_journal_destroyed>(this));
+ ctx = new FunctionContext(
+ [this, ctx](int r) {
+ Mutex::Locker locker(m_lock);
+ m_journaler->shut_down(ctx);
+ });
+ m_async_journal_op_tracker.wait(m_image_ctx, ctx);
}
template <typename I>
return 0;
}
+struct C_RefreshTags : public Context {
+ util::AsyncOpTracker &async_op_tracker;
+ Context *on_finish = nullptr;
+
+ Mutex lock;
+ uint64_t tag_tid;
+ journal::TagData tag_data;
+
+ C_RefreshTags(util::AsyncOpTracker &async_op_tracker)
+ : async_op_tracker(async_op_tracker),
+ lock("librbd::Journal::C_RefreshTags::lock") {
+ async_op_tracker.start_op();
+ }
+ virtual ~C_RefreshTags() {
+ async_op_tracker.finish_op();
+ }
+
+ virtual void finish(int r) {
+ on_finish->complete(r);
+ }
+};
+
template <typename I>
void Journal<I>::handle_metadata_updated() {
CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << this << " " << __func__ << dendl;
-
Mutex::Locker locker(m_lock);
+
if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
return;
+ } else if (is_tag_owner(m_lock)) {
+ ldout(cct, 20) << this << " " << __func__ << ": primary image" << dendl;
+ return;
+ } else if (m_listeners.empty()) {
+ ldout(cct, 20) << this << " " << __func__ << ": no listeners" << dendl;
+ return;
+ }
+
+ uint64_t refresh_sequence = ++m_refresh_sequence;
+ ldout(cct, 20) << this << " " << __func__ << ": "
+ << "refresh_sequence=" << refresh_sequence << dendl;
+
+ // pull the most recent tags from the journal, decode, and
+ // update the internal tag state
+ C_RefreshTags *refresh_ctx = new C_RefreshTags(m_async_journal_op_tracker);
+ refresh_ctx->on_finish = new FunctionContext(
+ [this, refresh_sequence, refresh_ctx](int r) {
+ handle_refresh_metadata(refresh_sequence, refresh_ctx->tag_tid,
+ refresh_ctx->tag_data, r);
+ });
+ C_DecodeTags *decode_tags_ctx = new C_DecodeTags(
+ cct, &refresh_ctx->lock, &refresh_ctx->tag_tid,
+ &refresh_ctx->tag_data, refresh_ctx);
+ m_journaler->get_tags(m_tag_tid == 0 ? 0 : m_tag_tid - 1, m_tag_class,
+ &decode_tags_ctx->tags, decode_tags_ctx);
+}
+
+template <typename I>
+void Journal<I>::handle_refresh_metadata(uint64_t refresh_sequence,
+ uint64_t tag_tid,
+ journal::TagData tag_data, int r) {
+ CephContext *cct = m_image_ctx.cct;
+ Mutex::Locker locker(m_lock);
+
+ if (r < 0) {
+ lderr(cct) << this << " " << __func__ << ": failed to refresh metadata: "
+ << cpp_strerror(r) << dendl;
+ return;
+ } else if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
+ return;
+ } else if (refresh_sequence != m_refresh_sequence) {
+ // another, more up-to-date refresh is in-flight
+ return;
}
+ ldout(cct, 20) << this << " " << __func__ << ": "
+ << "refresh_sequence=" << refresh_sequence << ", "
+ << "tag_tid=" << tag_tid << ", "
+ << "tag_data=" << tag_data << dendl;
while (m_listener_notify) {
m_listener_cond.Wait(m_lock);
}
+ bool was_tag_owner = is_tag_owner(m_lock);
+ if (m_tag_tid < tag_tid) {
+ m_tag_tid = tag_tid;
+ m_tag_data = tag_data;
+ }
+ bool promoted_to_primary = (!was_tag_owner && is_tag_owner(m_lock));
+
bool resync_requested = false;
- int r = check_resync_requested(&resync_requested);
+ r = check_resync_requested(&resync_requested);
if (r < 0) {
lderr(cct) << this << " " << __func__ << ": "
<< "failed to check if a resync was requested" << dendl;
m_listener_notify = true;
m_lock.Unlock();
- if (resync_requested) {
+ if (promoted_to_primary) {
+ for (auto listener : listeners) {
+ listener->handle_promoted();
+ }
+ } else if (resync_requested) {
for (auto listener : listeners) {
listener->handle_resync();
}
void expect_get_journaler_tags(MockImageCtx &mock_image_ctx,
::journal::MockJournaler &mock_journaler,
- int r) {
+ bool primary, int r) {
journal::TagData tag_data;
+ if (!primary) {
+ tag_data.mirror_uuid = "remote mirror uuid";
+ }
bufferlist tag_data_bl;
::encode(tag_data, tag_data_bl);
.WillOnce(SaveArg<0>(&m_listener));
}
+ void expect_get_journaler_tags(MockImageCtx &mock_image_ctx,
+ ::journal::MockJournaler &mock_journaler,
+ uint64_t start_after_tag_tid,
+ ::journal::Journaler::Tags &&tags, int r) {
+ EXPECT_CALL(mock_journaler, get_tags(start_after_tag_tid, 0, _, _))
+ .WillOnce(DoAll(SetArgPointee<2>(tags),
+ WithArg<3>(CompleteContext(r, mock_image_ctx.image_ctx->op_work_queue))));
+ }
+
void expect_start_replay(MockJournalImageCtx &mock_image_ctx,
::journal::MockJournaler &mock_journaler,
const ReplayAction &action) {
void open_journal(MockJournalImageCtx &mock_image_ctx,
MockJournal &mock_journal,
- ::journal::MockJournaler &mock_journaler) {
+ ::journal::MockJournaler &mock_journaler,
+ bool primary = true) {
expect_op_work_queue(mock_image_ctx);
InSequence seq;
expect_init_journaler(mock_journaler, 0);
expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
- expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+ expect_get_journaler_tags(mock_image_ctx, mock_journaler, primary, 0);
expect_start_replay(
mock_image_ctx, mock_journaler,
std::bind(&invoke_replay_complete, _1, 0));
expect_init_journaler(mock_journaler, 0);
expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
- expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+ expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
expect_start_replay(
mock_image_ctx, mock_journaler,
std::bind(&invoke_replay_ready, _1));
expect_init_journaler(mock_journaler, 0);
expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
- expect_get_journaler_tags(mock_image_ctx, mock_journaler, -EBADMSG);
+ expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, -EBADMSG);
expect_shut_down_journaler(mock_journaler);
ASSERT_EQ(-EBADMSG, when_open(mock_journal));
}
expect_init_journaler(mock_journaler, 0);
expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
- expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+ expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
expect_start_replay(
mock_image_ctx, mock_journaler,
std::bind(&invoke_replay_complete, _1, -EINVAL));
expect_init_journaler(mock_journaler, 0);
expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
- expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+ expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
expect_start_replay(
mock_image_ctx, mock_journaler,
std::bind(&invoke_replay_complete, _1, 0));
expect_init_journaler(mock_journaler, 0);
expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
- expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+ expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
expect_start_replay(
mock_image_ctx, mock_journaler,
std::bind(&invoke_replay_ready, _1));
expect_init_journaler(mock_journaler, 0);
expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
- expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+ expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
expect_start_replay(
mock_image_ctx, mock_journaler,
std::bind(&invoke_replay_complete, _1, 0));
expect_init_journaler(mock_journaler, 0);
expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
- expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+ expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
expect_start_replay(
mock_image_ctx, mock_journaler,
std::bind(&invoke_replay_ready, _1));
expect_init_journaler(mock_journaler, 0);
expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
- expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+ expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
expect_start_replay(
mock_image_ctx, mock_journaler,
std::bind(&invoke_replay_complete, _1, 0));
expect_init_journaler(mock_journaler, 0);
expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
- expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+ expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
expect_start_replay(
mock_image_ctx, mock_journaler,
std::bind(&invoke_replay_complete, _1, 0));
expect_init_journaler(mock_journaler, 0);
expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
- expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+ expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
expect_start_replay(
mock_image_ctx, mock_journaler,
expect_init_journaler(mock_journaler, 0);
expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
- expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+ expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
expect_start_replay(
mock_image_ctx, mock_journaler, {
std::bind(&invoke_replay_complete, _1, 0)
expect_init_journaler(mock_journaler, 0);
expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
- expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+ expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
expect_start_replay(
mock_image_ctx, mock_journaler,
std::bind(&invoke_replay_ready, _1));
expect_init_journaler(mock_journaler, 0);
expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
- expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
+ expect_get_journaler_tags(mock_image_ctx, mock_journaler, true, 0);
expect_start_replay(
mock_image_ctx, mock_journaler,
std::bind(&invoke_replay_complete, _1, 0));
MockJournalImageCtx mock_image_ctx(*ictx);
MockJournal mock_journal(mock_image_ctx);
::journal::MockJournaler mock_journaler;
- open_journal(mock_image_ctx, mock_journal, mock_journaler);
+ open_journal(mock_image_ctx, mock_journal, mock_journaler, false);
struct Listener : public journal::Listener {
C_SaferCond ctx;
};
InSequence seq;
+
+ journal::TagData tag_data;
+ tag_data.mirror_uuid == Journal<>::LOCAL_MIRROR_UUID;
+
+ bufferlist tag_data_bl;
+ ::encode(tag_data, tag_data_bl);
+ expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0,
+ {{0, 0, tag_data_bl}}, 0);
+
journal::ImageClientMeta image_client_meta;
image_client_meta.tag_class = 0;
image_client_meta.resync_requested = true;
ASSERT_EQ(0, listener.ctx.wait());
}
+TEST_F(TestMockJournal, ForcePromoted) {
+ REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
+
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockJournalImageCtx mock_image_ctx(*ictx);
+ MockJournal mock_journal(mock_image_ctx);
+ ::journal::MockJournaler mock_journaler;
+ open_journal(mock_image_ctx, mock_journal, mock_journaler, false);
+
+ struct Listener : public journal::Listener {
+ C_SaferCond ctx;
+ virtual void handle_close() {
+ ADD_FAILURE() << "unexpected close action";
+ }
+ virtual void handle_resync() {
+ ADD_FAILURE() << "unexpected resync event";
+ }
+ virtual void handle_promoted() {
+ ctx.complete(0);
+ }
+ } listener;
+ mock_journal.add_listener(&listener);
+
+ BOOST_SCOPE_EXIT_ALL(&) {
+ mock_journal.remove_listener(&listener);
+ close_journal(mock_journal, mock_journaler);
+ };
+
+ InSequence seq;
+
+ journal::TagData tag_data;
+ tag_data.mirror_uuid == Journal<>::LOCAL_MIRROR_UUID;
+
+ bufferlist tag_data_bl;
+ ::encode(tag_data, tag_data_bl);
+ expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0,
+ {{100, 0, tag_data_bl}}, 0);
+
+ journal::ImageClientMeta image_client_meta;
+ image_client_meta.tag_class = 0;
+ expect_get_journaler_cached_client(mock_journaler, image_client_meta, 0);
+ expect_shut_down_journaler(mock_journaler);
+
+ m_listener->handle_update(nullptr);
+ ASSERT_EQ(0, listener.ctx.wait());
+}
+
} // namespace librbd