// safely commit any remaining AIO modify operations
if ((m_in_flight_aio_flush + m_in_flight_aio_modify) != 0) {
flush_comp = create_aio_flush_completion(nullptr);
+ assert(flush_comp != nullptr);
}
for (auto &op_event_pair : m_op_events) {
}
}
+ assert(!m_shut_down);
+ m_shut_down = true;
+
assert(m_flush_ctx == nullptr);
if (m_in_flight_op_events > 0 || flush_comp != nullptr) {
std::swap(m_flush_ctx, on_finish);
Mutex::Locker locker(m_lock);
aio_comp = create_aio_flush_completion(
util::create_async_context_callback(m_image_ctx, on_finish));
+ if (aio_comp == nullptr) {
+ return;
+ }
}
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
io::AIO_TYPE_DISCARD,
&flush_required);
+ if (aio_comp == nullptr) {
+ return;
+ }
+
io::ImageRequest<I>::aio_discard(&m_image_ctx, aio_comp, event.offset,
event.length, event.skip_partial_discard,
{});
auto flush_comp = create_aio_flush_completion(nullptr);
m_lock.Unlock();
- io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
+ if (flush_comp != nullptr) {
+ io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
+ }
}
}
auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
io::AIO_TYPE_WRITE,
&flush_required);
+ if (aio_comp == nullptr) {
+ return;
+ }
+
io::ImageRequest<I>::aio_write(&m_image_ctx, aio_comp,
{{event.offset, event.length}},
std::move(data), 0, {});
auto flush_comp = create_aio_flush_completion(nullptr);
m_lock.Unlock();
- io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
+ if (flush_comp != nullptr) {
+ io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
+ }
}
}
Mutex::Locker locker(m_lock);
aio_comp = create_aio_flush_completion(on_safe);
}
- io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp, {});
+ if (aio_comp != nullptr) {
+ io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp, {});
+ }
on_ready->complete(0);
}
auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
io::AIO_TYPE_WRITESAME,
&flush_required);
+ if (aio_comp == nullptr) {
+ return;
+ }
+
io::ImageRequest<I>::aio_writesame(&m_image_ctx, aio_comp, event.offset,
event.length, std::move(data), 0, {});
if (flush_required) {
auto flush_comp = create_aio_flush_completion(nullptr);
m_lock.Unlock();
- io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
+ if (flush_comp != nullptr) {
+ io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
+ }
}
}
Context *on_safe,
OpEvent **op_event) {
CephContext *cct = m_image_ctx.cct;
+ if (m_shut_down) {
+ ldout(cct, 5) << ": ignoring event after shut down" << dendl;
+ on_ready->complete(0);
+ m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
+ return nullptr;
+ }
assert(m_lock.is_locked());
if (m_op_events.count(op_tid) != 0) {
op_event = std::move(op_it->second);
m_op_events.erase(op_it);
- shutting_down = (m_flush_ctx != nullptr);
+ if (m_shut_down) {
+ assert(m_flush_ctx != nullptr);
+ shutting_down = true;
+ }
}
assert(op_event.on_start_ready == nullptr || (r < 0 && r != -ERESTART));
CephContext *cct = m_image_ctx.cct;
assert(m_on_aio_ready == nullptr);
+ if (m_shut_down) {
+ ldout(cct, 5) << ": ignoring event after shut down" << dendl;
+ on_ready->complete(0);
+ m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
+ return nullptr;
+ }
+
++m_in_flight_aio_modify;
m_aio_modify_unsafe_contexts.push_back(on_safe);
io::AioCompletion *Replay<I>::create_aio_flush_completion(Context *on_safe) {
assert(m_lock.is_locked());
+ CephContext *cct = m_image_ctx.cct;
+ if (m_shut_down) {
+ ldout(cct, 5) << ": ignoring event after shut down" << dendl;
+ if (on_safe != nullptr) {
+ m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
+ }
+ return nullptr;
+ }
+
++m_in_flight_aio_flush;
// associate all prior write/discard ops to this flush request
OpEvents m_op_events;
uint64_t m_in_flight_op_events = 0;
+ bool m_shut_down = false;
Context *m_flush_ctx = nullptr;
Context *m_on_aio_ready = nullptr;
ASSERT_EQ(0, on_finish_safe.wait());
}
+TEST_F(TestMockJournalReplay, FlushEventAfterShutDown) {
+ REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
+
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockReplayImageCtx mock_image_ctx(*ictx);
+ MockJournalReplay mock_journal_replay(mock_image_ctx);
+ MockIoImageRequest mock_io_image_request;
+ expect_op_work_queue(mock_image_ctx);
+
+ ASSERT_EQ(0, when_shut_down(mock_journal_replay, false));
+
+ C_SaferCond on_ready;
+ C_SaferCond on_safe;
+ when_process(mock_journal_replay, EventEntry{AioFlushEvent()},
+ &on_ready, &on_safe);
+ ASSERT_EQ(0, on_ready.wait());
+ ASSERT_EQ(-ESHUTDOWN, on_safe.wait());
+}
+
+TEST_F(TestMockJournalReplay, ModifyEventAfterShutDown) {
+ REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
+
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockReplayImageCtx mock_image_ctx(*ictx);
+ MockJournalReplay mock_journal_replay(mock_image_ctx);
+ MockIoImageRequest mock_io_image_request;
+ expect_op_work_queue(mock_image_ctx);
+
+ ASSERT_EQ(0, when_shut_down(mock_journal_replay, false));
+
+ C_SaferCond on_ready;
+ C_SaferCond on_safe;
+ when_process(mock_journal_replay,
+ EventEntry{AioWriteEvent(123, 456, to_bl("test"))},
+ &on_ready, &on_safe);
+ ASSERT_EQ(0, on_ready.wait());
+ ASSERT_EQ(-ESHUTDOWN, on_safe.wait());
+}
+
+TEST_F(TestMockJournalReplay, OpEventAfterShutDown) {
+ REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
+
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockReplayImageCtx mock_image_ctx(*ictx);
+ MockJournalReplay mock_journal_replay(mock_image_ctx);
+ MockIoImageRequest mock_io_image_request;
+ expect_op_work_queue(mock_image_ctx);
+
+ ASSERT_EQ(0, when_shut_down(mock_journal_replay, false));
+
+ C_SaferCond on_ready;
+ C_SaferCond on_safe;
+ when_process(mock_journal_replay, EventEntry{RenameEvent(123, "image")},
+ &on_ready, &on_safe);
+ ASSERT_EQ(0, on_ready.wait());
+ ASSERT_EQ(-ESHUTDOWN, on_safe.wait());
+}
+
} // namespace journal
} // namespace librbd
.WillOnce(CompleteContext(r));
}
+ void expect_flush(MockReplay &mock_replay, int r) {
+ EXPECT_CALL(mock_replay, flush(_)).WillOnce(CompleteContext(r));
+ }
+
void expect_shut_down(MockReplay &mock_replay, bool cancel_ops, int r) {
EXPECT_CALL(mock_replay, shut_down(cancel_ops, _))
.WillOnce(WithArg<1>(CompleteContext(r)));
MockCloseImageRequest mock_close_local_image_request;
- expect_stop_replay(mock_remote_journaler, 0);
expect_shut_down(mock_local_replay, true, 0);
-
EXPECT_CALL(mock_local_journal, remove_listener(_));
EXPECT_CALL(mock_local_journal, stop_external_replay());
+ expect_send(mock_close_local_image_request, 0);
+ expect_stop_replay(mock_remote_journaler, 0);
EXPECT_CALL(mock_remote_journaler, remove_listener(_));
expect_shut_down(mock_remote_journaler, 0);
- expect_send(mock_close_local_image_request, 0);
-
C_SaferCond stop_ctx;
m_image_replayer->stop(&stop_ctx);
ASSERT_EQ(0, stop_ctx.wait());
expect_start_external_replay(mock_local_journal, nullptr, -EINVAL);
MockCloseImageRequest mock_close_local_image_request;
-
EXPECT_CALL(mock_local_journal, remove_listener(_));
+ expect_send(mock_close_local_image_request, 0);
EXPECT_CALL(mock_remote_journaler, remove_listener(_));
expect_shut_down(mock_remote_journaler, 0);
- expect_send(mock_close_local_image_request, 0);
-
C_SaferCond start_ctx;
m_image_replayer->start(&start_ctx);
ASSERT_EQ(-EINVAL, start_ctx.wait());
MockCloseImageRequest mock_close_local_image_request;
- expect_stop_replay(mock_remote_journaler, -EINVAL);
expect_shut_down(mock_local_replay, true, -EINVAL);
-
EXPECT_CALL(mock_local_journal, remove_listener(_));
EXPECT_CALL(mock_local_journal, stop_external_replay());
+ expect_send(mock_close_local_image_request, -EINVAL);
+ expect_stop_replay(mock_remote_journaler, -EINVAL);
EXPECT_CALL(mock_remote_journaler, remove_listener(_));
expect_shut_down(mock_remote_journaler, -EINVAL);
- expect_send(mock_close_local_image_request, -EINVAL);
-
C_SaferCond stop_ctx;
m_image_replayer->stop(&stop_ctx);
ASSERT_EQ(0, stop_ctx.wait());
// STOP
MockCloseImageRequest mock_close_local_image_request;
-
- expect_stop_replay(mock_remote_journaler, 0);
expect_shut_down(mock_local_replay, true, 0);
-
EXPECT_CALL(mock_local_journal, remove_listener(_));
EXPECT_CALL(mock_local_journal, stop_external_replay());
+ expect_send(mock_close_local_image_request, 0);
+ expect_stop_replay(mock_remote_journaler, 0);
EXPECT_CALL(mock_remote_journaler, remove_listener(_));
expect_shut_down(mock_remote_journaler, 0);
- expect_send(mock_close_local_image_request, 0);
-
C_SaferCond stop_ctx;
m_image_replayer->stop(&stop_ctx);
ASSERT_EQ(0, stop_ctx.wait());
.WillOnce(Return(-EINVAL));
// stop on error
- expect_stop_replay(mock_remote_journaler, 0);
expect_shut_down(mock_local_replay, true, 0);
-
EXPECT_CALL(mock_local_journal, remove_listener(_));
EXPECT_CALL(mock_local_journal, stop_external_replay());
- EXPECT_CALL(mock_remote_journaler, remove_listener(_));
- expect_shut_down(mock_remote_journaler, 0);
-
MockCloseImageRequest mock_close_local_image_request;
C_SaferCond close_ctx;
EXPECT_CALL(mock_close_local_image_request, send())
close_ctx.complete(0);
}));
+ expect_stop_replay(mock_remote_journaler, 0);
+ EXPECT_CALL(mock_remote_journaler, remove_listener(_));
+ expect_shut_down(mock_remote_journaler, 0);
+
// fire
m_image_replayer->handle_replay_ready();
ASSERT_EQ(0, close_ctx.wait());
MockCloseImageRequest mock_close_local_image_request;
- expect_stop_replay(mock_remote_journaler, 0);
expect_shut_down(mock_local_replay, true, 0);
-
EXPECT_CALL(mock_local_journal, remove_listener(_));
EXPECT_CALL(mock_local_journal, stop_external_replay());
+ expect_send(mock_close_local_image_request, 0);
+ expect_stop_replay(mock_remote_journaler, 0);
EXPECT_CALL(mock_remote_journaler, remove_listener(_));
expect_shut_down(mock_remote_journaler, 0);
- expect_send(mock_close_local_image_request, 0);
-
C_SaferCond stop_ctx;
m_image_replayer->stop(&stop_ctx);
ASSERT_EQ(0, stop_ctx.wait());
}
m_event_replay_tracker.start_op();
+
+ m_lock.Lock();
+ bool stopping = (m_state == STATE_STOPPING);
+ m_lock.Unlock();
+
+ if (stopping) {
+ dout(10) << "stopping event replay" << dendl;
+ m_event_replay_tracker.finish_op();
+ return;
+ }
+
if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
preprocess_entry();
return;
}
}
+ // NOTE: it's important to ensure that the local image is fully
+ // closed before attempting to close the remote journal in
+ // case the remote cluster is unreachable
+
// chain the shut down sequence (reverse order)
Context *ctx = new FunctionContext(
[this, r](int _r) {
update_mirror_image_status(true, STATE_STOPPED);
handle_shut_down(r);
});
- if (m_local_image_ctx) {
- ctx = new FunctionContext([this, ctx](int r) {
- CloseImageRequest<I> *request = CloseImageRequest<I>::create(
- &m_local_image_ctx, ctx);
- request->send();
- });
- }
+
+ // close the remote journal
if (m_remote_journaler != nullptr) {
ctx = new FunctionContext([this, ctx](int r) {
delete m_remote_journaler;
});
}
}
+
+ // stop the replay of remote journal events
+ if (m_replay_handler != nullptr) {
+ ctx = new FunctionContext([this, ctx](int r) {
+ delete m_replay_handler;
+ m_replay_handler = nullptr;
+
+ m_event_replay_tracker.wait_for_ops(ctx);
+ });
+ ctx = new FunctionContext([this, ctx](int r) {
+ m_remote_journaler->stop_replay(ctx);
+ });
+ }
+
+ // close the local image (release exclusive lock)
+ if (m_local_image_ctx) {
+ ctx = new FunctionContext([this, ctx](int r) {
+ CloseImageRequest<I> *request = CloseImageRequest<I>::create(
+ &m_local_image_ctx, ctx);
+ request->send();
+ });
+ }
+
+ // shut down event replay into the local image
if (m_local_journal != nullptr) {
ctx = new FunctionContext([this, ctx](int r) {
m_local_journal = nullptr;
});
}
ctx = new FunctionContext([this, ctx](int r) {
- if (r < 0) {
- derr << "error flushing journal replay: " << cpp_strerror(r) << dendl;
- }
-
// blocks if listener notification is in-progress
m_local_journal->remove_listener(m_journal_listener);
-
- // wait for all in-flight replayed events to complete
- m_event_replay_tracker.wait_for_ops(ctx);
- });
- if (m_local_replay != nullptr) {
- ctx = new FunctionContext([this, ctx](int r) {
- m_local_replay->shut_down(true, ctx);
- });
- }
- }
- if (m_replay_handler != nullptr) {
- ctx = new FunctionContext([this, ctx](int r) {
- delete m_replay_handler;
- m_replay_handler = nullptr;
ctx->complete(0);
});
+ }
+
+ // wait for all local in-flight replay events to complete
+ ctx = new FunctionContext([this, ctx](int r) {
+ if (r < 0) {
+ derr << "error shutting down journal replay: " << cpp_strerror(r)
+ << dendl;
+ }
+
+ m_event_replay_tracker.wait_for_ops(ctx);
+ });
+
+ // flush any local in-flight replay events
+ if (m_local_replay != nullptr) {
ctx = new FunctionContext([this, ctx](int r) {
- m_remote_journaler->stop_replay(ctx);
+ m_local_replay->shut_down(true, ctx);
});
}
+
m_threads->work_queue->queue(ctx, 0);
}