journal_policy = policy;
}
+ bool ImageCtx::is_writeback_cache_enabled() const {
+ return (cache && cache_max_dirty > 0);
+ }
+
void ImageCtx::get_thread_pool_instance(CephContext *cct,
ThreadPool **thread_pool,
ContextWQ **op_work_queue) {
journal::Policy *get_journal_policy() const;
void set_journal_policy(journal::Policy *policy);
+ bool is_writeback_cache_enabled() const;
+
static void get_thread_pool_instance(CephContext *cct,
ThreadPool **thread_pool,
ContextWQ **op_work_queue);
template <typename I>
void Replay<I>::handle_aio_modify_complete(Context *on_ready, Context *on_safe,
- int r, std::set<int> &filters) {
+ int r, std::set<int> &filters,
+ bool writeback_cache_enabled) {
Mutex::Locker locker(m_lock);
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << ": on_ready=" << on_ready << ", "
return;
}
- // will be completed after next flush operation completes
- m_aio_modify_safe_contexts.insert(on_safe);
+ if (writeback_cache_enabled) {
+ // will be completed after next flush operation completes
+ m_aio_modify_safe_contexts.insert(on_safe);
+ } else {
+ // IO is safely stored on disk
+ assert(m_in_flight_aio_modify > 0);
+ --m_in_flight_aio_modify;
+
+ if (m_on_aio_ready != nullptr) {
+ ldout(cct, 10) << ": resuming paused AIO" << dendl;
+ m_on_aio_ready->complete(0);
+ m_on_aio_ready = nullptr;
+ }
+
+ ldout(cct, 20) << ": completing safe context: " << on_safe << dendl;
+ m_image_ctx.op_work_queue->queue(on_safe, 0);
+ }
}
template <typename I>
}
++m_in_flight_aio_modify;
- m_aio_modify_unsafe_contexts.push_back(on_safe);
+
+ bool writeback_cache_enabled = m_image_ctx.is_writeback_cache_enabled();
+ if (writeback_cache_enabled) {
+ m_aio_modify_unsafe_contexts.push_back(on_safe);
+ }
// FLUSH if we hit the low-water mark -- on_safe contexts are
// completed by flushes-only so that we don't move the journal
// commit position until safely on-disk
- *flush_required = (m_aio_modify_unsafe_contexts.size() ==
+ *flush_required = (writeback_cache_enabled &&
+ m_aio_modify_unsafe_contexts.size() ==
IN_FLIGHT_IO_LOW_WATER_MARK);
if (*flush_required) {
ldout(cct, 10) << ": hit AIO replay low-water mark: scheduling flush"
// event. when flushed, the completion of the next flush will fire the
// on_safe callback
auto aio_comp = io::AioCompletion::create_and_start<Context>(
- new C_AioModifyComplete(this, on_ready, on_safe, std::move(filters)),
+ new C_AioModifyComplete(this, on_ready, on_safe, std::move(filters),
+ writeback_cache_enabled),
util::get_image_ctx(&m_image_ctx), aio_type);
return aio_comp;
}
Context *on_ready;
Context *on_safe;
std::set<int> filters;
+ bool writeback_cache_enabled;
C_AioModifyComplete(Replay *replay, Context *on_ready,
- Context *on_safe, std::set<int> &&filters)
+ Context *on_safe, std::set<int> &&filters,
+ bool writeback_cache_enabled)
: replay(replay), on_ready(on_ready), on_safe(on_safe),
- filters(std::move(filters)) {
+ filters(std::move(filters)),
+ writeback_cache_enabled(writeback_cache_enabled) {
}
void finish(int r) override {
- replay->handle_aio_modify_complete(on_ready, on_safe, r, filters);
+ replay->handle_aio_modify_complete(on_ready, on_safe, r, filters,
+ writeback_cache_enabled);
}
};
Context *on_safe);
void handle_aio_modify_complete(Context *on_ready, Context *on_safe,
- int r, std::set<int> &filters);
+ int r, std::set<int> &filters,
+ bool writeback_cache_enabled);
void handle_aio_flush_complete(Context *on_flush_safe, Contexts &on_safe_ctxs,
int r);
}
}
+ void expect_writeback_cache_enabled(MockReplayImageCtx &mock_image_ctx,
+ bool enabled) {
+ EXPECT_CALL(mock_image_ctx, is_writeback_cache_enabled())
+ .WillRepeatedly(Return(enabled));
+ }
+
void when_process(MockJournalReplay &mock_journal_replay,
EventEntry &&event_entry, Context *on_ready,
Context *on_safe) {
MockJournalReplay mock_journal_replay(mock_image_ctx);
MockIoImageRequest mock_io_image_request;
+ expect_writeback_cache_enabled(mock_image_ctx, true);
expect_op_work_queue(mock_image_ctx);
InSequence seq;
MockJournalReplay mock_journal_replay(mock_image_ctx);
MockIoImageRequest mock_io_image_request;
+ expect_writeback_cache_enabled(mock_image_ctx, true);
expect_op_work_queue(mock_image_ctx);
InSequence seq;
MockJournalReplay mock_journal_replay(mock_image_ctx);
MockIoImageRequest mock_io_image_request;
+ expect_writeback_cache_enabled(mock_image_ctx, true);
expect_op_work_queue(mock_image_ctx);
InSequence seq;
MockJournalReplay mock_compare_and_write_journal_replay(mock_image_ctx);
MockJournalReplay mock_mis_compare_and_write_journal_replay(mock_image_ctx);
MockIoImageRequest mock_io_image_request;
+ expect_writeback_cache_enabled(mock_image_ctx, true);
expect_op_work_queue(mock_image_ctx);
InSequence seq;
MockJournalReplay mock_journal_replay(mock_image_ctx);
MockIoImageRequest mock_io_image_request;
+ expect_writeback_cache_enabled(mock_image_ctx, true);
expect_op_work_queue(mock_image_ctx);
InSequence seq;
MockJournalReplay mock_journal_replay(mock_image_ctx);
MockIoImageRequest mock_io_image_request;
+ expect_writeback_cache_enabled(mock_image_ctx, true);
expect_op_work_queue(mock_image_ctx);
InSequence seq;
MockJournalReplay mock_journal_replay(mock_image_ctx);
MockIoImageRequest mock_io_image_request;
+ expect_writeback_cache_enabled(mock_image_ctx, true);
expect_op_work_queue(mock_image_ctx);
InSequence seq;
}
TEST_F(TestMockJournalReplay, Flush) {
+ REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
+
librbd::ImageCtx *ictx;
ASSERT_EQ(0, open_image(m_image_name, &ictx));
MockJournalReplay mock_journal_replay(mock_image_ctx);
MockIoImageRequest mock_io_image_request;
+ expect_writeback_cache_enabled(mock_image_ctx, true);
expect_op_work_queue(mock_image_ctx);
InSequence seq;
}
TEST_F(TestMockJournalReplay, MissingOpFinishEvent) {
+ REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
+
librbd::ImageCtx *ictx;
ASSERT_EQ(0, open_image(m_image_name, &ictx));
}
TEST_F(TestMockJournalReplay, MissingOpFinishEventCancelOps) {
+ REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
+
librbd::ImageCtx *ictx;
ASSERT_EQ(0, open_image(m_image_name, &ictx));
}
TEST_F(TestMockJournalReplay, UnknownOpFinishEvent) {
+ REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
+
librbd::ImageCtx *ictx;
ASSERT_EQ(0, open_image(m_image_name, &ictx));
ASSERT_EQ(-ECANCELED, on_finish_safe.wait());
}
+TEST_F(TestMockJournalReplay, WritebackCacheDisabled) {
+ REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
+
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockReplayImageCtx mock_image_ctx(*ictx);
+
+ MockExclusiveLock mock_exclusive_lock;
+ mock_image_ctx.exclusive_lock = &mock_exclusive_lock;
+ expect_accept_ops(mock_exclusive_lock, true);
+
+ MockJournalReplay mock_journal_replay(mock_image_ctx);
+ MockIoImageRequest mock_io_image_request;
+ expect_writeback_cache_enabled(mock_image_ctx, false);
+ expect_op_work_queue(mock_image_ctx);
+
+ InSequence seq;
+ io::AioCompletion *aio_comp;
+ C_SaferCond on_ready;
+ C_SaferCond on_safe;
+ expect_aio_discard(mock_io_image_request, &aio_comp, 123, 456, false);
+ when_process(mock_journal_replay,
+ EventEntry{AioDiscardEvent(123, 456, false)},
+ &on_ready, &on_safe);
+
+ when_complete(mock_image_ctx, aio_comp, 0);
+ ASSERT_EQ(0, on_ready.wait());
+ ASSERT_EQ(0, on_safe.wait());
+ ASSERT_EQ(0, when_shut_down(mock_journal_replay, false));
+}
+
} // namespace journal
} // namespace librbd
MOCK_CONST_METHOD0(get_stripe_count, uint64_t());
MOCK_CONST_METHOD0(get_stripe_period, uint64_t());
+ MOCK_CONST_METHOD0(is_writeback_cache_enabled, bool());
+
ImageCtx *image_ctx;
CephContext *cct;
PerfCounters *perfcounter;
TestImageReplayer()
: m_local_cluster(new librados::Rados()), m_watch_handle(0)
{
+ EXPECT_EQ(0, g_ceph_context->_conf->get_val("rbd_mirror_journal_commit_age",
+ &m_journal_commit_age));
+ EXPECT_EQ(0, g_ceph_context->_conf->set_val("rbd_mirror_journal_commit_age",
+ "0.1"));
+
EXPECT_EQ("", connect_cluster_pp(*m_local_cluster.get()));
EXPECT_EQ(0, m_local_cluster->conf_set("rbd_cache", "false"));
EXPECT_EQ(0, m_local_cluster->conf_set("rbd_mirror_journal_poll_age", "1"));
EXPECT_EQ(0, m_remote_cluster.pool_delete(m_remote_pool_name.c_str()));
EXPECT_EQ(0, m_local_cluster->pool_delete(m_local_pool_name.c_str()));
+ EXPECT_EQ(0, g_ceph_context->_conf->set_val("rbd_mirror_journal_commit_age",
+ m_journal_commit_age));
}
template <typename ImageReplayerT = rbd::mirror::ImageReplayer<> >
cls::journal::ObjectPosition mirror_position;
for (int i = 0; i < 100; i++) {
- printf("m_replayer->flush()\n");
- C_SaferCond cond;
- m_replayer->flush(&cond);
- ASSERT_EQ(0, cond.wait());
get_commit_positions(&master_position, &mirror_position);
if (master_position == mirror_position) {
break;
C_WatchCtx *m_watch_ctx;
uint64_t m_watch_handle;
char m_test_data[TEST_IO_SIZE + 1];
+ std::string m_journal_commit_age;
};
int TestImageReplayer::_image_number;