};
template <typename J>
-int open_journaler(CephContext *cct, J *journaler, bool *initialized,
+int open_journaler(CephContext *cct, J *journaler,
cls::journal::Client *client,
journal::ImageClientMeta *client_meta,
journal::TagData *tag_data) {
C_SaferCond init_ctx;
journaler->init(&init_ctx);
int r = init_ctx.wait();
- *initialized = (r >= 0);
if (r < 0) {
return r;
}
C_SaferCond cond;
journaler.init(&cond);
+ BOOST_SCOPE_EXIT_ALL(&journaler) {
+ journaler.shut_down();
+ };
r = cond.wait();
if (r == -ENOENT) {
C_SaferCond cond;
journaler.init(&cond);
+ BOOST_SCOPE_EXIT_ALL(&journaler) {
+ journaler.shut_down();
+ };
int r = cond.wait();
if (r == -ENOENT) {
Journaler journaler(io_ctx, image_id, IMAGE_CLIENT_ID,
cct->_conf->rbd_journal_commit_age);
- bool initialized;
cls::journal::Client client;
journal::ImageClientMeta client_meta;
journal::TagData tag_data;
- int r = open_journaler(cct, &journaler, &initialized, &client,
- &client_meta, &tag_data);
+ int r = open_journaler(cct, &journaler, &client, &client_meta, &tag_data);
if (r >= 0) {
*mirror_uuid = tag_data.mirror_uuid;
}
- if (initialized) {
- journaler.shut_down();
- }
+ journaler.shut_down();
return r;
}
Journaler journaler(image_ctx->md_ctx, image_ctx->id, IMAGE_CLIENT_ID,
image_ctx->cct->_conf->rbd_journal_commit_age);
- bool initialized;
cls::journal::Client client;
journal::ImageClientMeta client_meta;
journal::TagData tag_data;
- int r = open_journaler(image_ctx->cct, &journaler, &initialized, &client,
- &client_meta, &tag_data);
- BOOST_SCOPE_EXIT_ALL(&journaler, &initialized) {
- if (initialized) {
- journaler.shut_down();
- }
+ int r = open_journaler(image_ctx->cct, &journaler, &client, &client_meta,
+ &tag_data);
+ BOOST_SCOPE_EXIT_ALL(&journaler) {
+ journaler.shut_down();
};
if (r < 0) {
Journaler journaler(image_ctx->md_ctx, image_ctx->id, IMAGE_CLIENT_ID,
image_ctx->cct->_conf->rbd_journal_commit_age);
- bool initialized;
cls::journal::Client client;
journal::ImageClientMeta client_meta;
journal::TagData tag_data;
- int r = open_journaler(image_ctx->cct, &journaler, &initialized, &client,
- &client_meta, &tag_data);
- BOOST_SCOPE_EXIT_ALL(&journaler, &initialized) {
- if (initialized) {
- journaler.shut_down();
- }
+ int r = open_journaler(image_ctx->cct, &journaler, &client, &client_meta,
+ &tag_data);
+ BOOST_SCOPE_EXIT_ALL(&journaler) {
+ journaler.shut_down();
};
if (r < 0) {
m_journal_replay = NULL;
transition_state(STATE_CLOSING, r);
- m_image_ctx.op_work_queue->queue(create_context_callback<
- Journal<I>, &Journal<I>::handle_journal_destroyed>(this), 0);
+ m_journaler->shut_down(create_async_context_callback(
+ m_image_ctx, create_context_callback<
+ Journal<I>, &Journal<I>::handle_journal_destroyed>(this)));
}
template <typename I>
m_journal_replay = NULL;
transition_state(STATE_RESTARTING_REPLAY, r);
- m_image_ctx.op_work_queue->queue(create_context_callback<
- Journal<I>, &Journal<I>::handle_journal_destroyed>(this), 0);
+ m_journaler->shut_down(create_async_context_callback(
+ m_image_ctx, create_context_callback<
+ Journal<I>, &Journal<I>::handle_journal_destroyed>(this)));
}
template <typename I>
void Journal<I>::handle_replay_complete(int r) {
CephContext *cct = m_image_ctx.cct;
- m_lock.Lock();
- if (m_state != STATE_REPLAYING) {
- m_lock.Unlock();
- return;
+ bool cancel_ops = false;
+ {
+ Mutex::Locker locker(m_lock);
+ if (m_state != STATE_REPLAYING) {
+ return;
+ }
+
+ ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
+ if (r < 0) {
+ cancel_ops = true;
+ transition_state(STATE_FLUSHING_RESTART, r);
+ } else {
+ // state might change back to FLUSHING_RESTART on flush error
+ transition_state(STATE_FLUSHING_REPLAY, 0);
+ }
}
- ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
- m_journaler->stop_replay();
- if (r < 0) {
- transition_state(STATE_FLUSHING_RESTART, r);
- m_lock.Unlock();
+ Context *ctx = new FunctionContext([this, cct](int r) {
+ ldout(cct, 20) << this << " handle_replay_complete: "
+ << "handle shut down replay" << dendl;
- m_journal_replay->shut_down(true, create_context_callback<
- Journal<I>, &Journal<I>::handle_flushing_restart>(this));
- } else {
- transition_state(STATE_FLUSHING_REPLAY, 0);
- m_lock.Unlock();
+ State state;
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_state == STATE_FLUSHING_RESTART ||
+ m_state == STATE_FLUSHING_REPLAY);
+ state = m_state;
+ }
- m_journal_replay->shut_down(false, create_context_callback<
- Journal<I>, &Journal<I>::handle_flushing_replay>(this));
- }
+ if (state == STATE_FLUSHING_RESTART) {
+ handle_flushing_restart(0);
+ } else {
+ handle_flushing_replay();
+ }
+ });
+ ctx = new FunctionContext([this, cct, cancel_ops, ctx](int r) {
+ ldout(cct, 20) << this << " handle_replay_complete: "
+ << "shut down replay" << dendl;
+ m_journal_replay->shut_down(cancel_ops, ctx);
+ });
+ m_journaler->stop_replay(ctx);
}
template <typename I>
template <typename I>
void Journal<I>::handle_replay_process_safe(ReplayEntry replay_entry, int r) {
- Mutex::Locker locker(m_lock);
-
CephContext *cct = m_image_ctx.cct;
+
+ m_lock.Lock();
ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
if (r < 0) {
lderr(cct) << "failed to commit journal event to disk: " << cpp_strerror(r)
if (m_state == STATE_REPLAYING) {
// abort the replay if we have an error
- m_journaler->stop_replay();
transition_state(STATE_FLUSHING_RESTART, r);
-
- m_journal_replay->shut_down(true, create_context_callback<
- Journal<I>, &Journal<I>::handle_flushing_restart>(this));
+ m_lock.Unlock();
+
+ // stop replay, shut down, and restart
+ Context *ctx = new FunctionContext([this, cct](int r) {
+ ldout(cct, 20) << this << " handle_replay_process_safe: "
+ << "shut down replay" << dendl;
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_state == STATE_FLUSHING_RESTART);
+ }
+
+ m_journal_replay->shut_down(true, create_context_callback<
+ Journal<I>, &Journal<I>::handle_flushing_restart>(this));
+ });
+ m_journaler->stop_replay(ctx);
return;
} else if (m_state == STATE_FLUSHING_REPLAY) {
// end-of-replay flush in-progress -- we need to restart replay
transition_state(STATE_FLUSHING_RESTART, r);
+ m_lock.Unlock();
return;
}
} else {
// only commit the entry if written successfully
m_journaler->committed(replay_entry);
+ m_lock.Unlock();
}
}
}
template <typename I>
-void Journal<I>::handle_flushing_replay(int r) {
+void Journal<I>::handle_flushing_replay() {
Mutex::Locker locker(m_lock);
CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
+ ldout(cct, 20) << this << " " << __func__ << dendl;
- assert(r == 0);
assert(m_state == STATE_FLUSHING_REPLAY || m_state == STATE_FLUSHING_RESTART);
if (m_close_pending) {
- destroy_journaler(r);
+ destroy_journaler(0);
return;
} else if (m_state == STATE_FLUSHING_RESTART) {
// failed to replay one-or-more events -- restart
void handle_replay_process_safe(ReplayEntry replay_entry, int r);
void handle_flushing_restart(int r);
- void handle_flushing_replay(int r);
+ void handle_flushing_replay();
void handle_recording_stopped(int r);
MOCK_METHOD1(init, void(Context *));
MOCK_METHOD0(shut_down, void());
+ MOCK_METHOD1(shut_down, void(Context *));
MOCK_CONST_METHOD0(is_initialized, bool());
MOCK_METHOD3(get_metadata, void(uint8_t *order, uint8_t *splay_width,
MOCK_METHOD1(try_pop_front, bool(MockReplayEntryProxy *));
MOCK_METHOD2(try_pop_front, bool(MockReplayEntryProxy *, uint64_t *));
MOCK_METHOD0(stop_replay, void());
+ MOCK_METHOD1(stop_replay, void(Context *on_finish));
MOCK_METHOD3(start_append, void(int flush_interval, uint64_t flush_bytes,
double flush_age));
void shut_down() {
MockJournaler::get_instance().shut_down();
}
+ void shut_down(Context *on_finish) {
+ MockJournaler::get_instance().shut_down(on_finish);
+ }
bool is_initialized() const {
return MockJournaler::get_instance().is_initialized();
}
void stop_replay() {
MockJournaler::get_instance().stop_replay();
}
+ void stop_replay(Context *on_finish) {
+ MockJournaler::get_instance().stop_replay(on_finish);
+ }
void start_append(int flush_interval, uint64_t flush_bytes, double flush_age) {
MockJournaler::get_instance().start_append(flush_interval, flush_bytes,
it != m_journalers.end(); ++it) {
journal::Journaler *journaler = *it;
journaler->stop_replay();
+ journaler->shut_down();
delete journaler;
}
#include <list>
#include <boost/scope_exit.hpp>
+#define dout_subsys ceph_subsys_rbd
+
namespace librbd {
namespace {
.WillOnce(CompleteContext(r, NULL));
}
+ void expect_shut_down_journaler(::journal::MockJournaler &mock_journaler) {
+ EXPECT_CALL(mock_journaler, shut_down(_))
+ .WillOnce(CompleteContext(0, NULL));
+ }
+
void expect_get_max_append_size(::journal::MockJournaler &mock_journaler,
uint32_t max_size) {
EXPECT_CALL(mock_journaler, get_max_append_size())
}
void expect_stop_replay(::journal::MockJournaler &mock_journaler) {
- EXPECT_CALL(mock_journaler, stop_replay());
+ EXPECT_CALL(mock_journaler, stop_replay(_))
+ .WillOnce(CompleteContext(0, NULL));
}
void expect_shut_down_replay(MockJournalImageCtx &mock_image_ctx,
Contexts commit_contexts;
std::swap(commit_contexts, m_commit_contexts);
+ derr << "SHUT DOWN REPLAY START" << dendl;
for (auto ctx : commit_contexts) {
mock_image_ctx.image_ctx->op_work_queue->queue(ctx, r);
}
+
+ on_flush = new FunctionContext([on_flush](int r) {
+ derr << "FLUSH START" << dendl;
+ on_flush->complete(r);
+ derr << "FLUSH FINISH" << dendl;
+ });
mock_image_ctx.image_ctx->op_work_queue->queue(on_flush, 0);
+ derr << "SHUT DOWN REPLAY FINISH" << dendl;
}
void open_journal(MockJournalImageCtx &mock_image_ctx,
ASSERT_EQ(0, when_open(mock_journal));
expect_stop_append(mock_journaler, 0);
+ expect_shut_down_journaler(mock_journaler);
ASSERT_EQ(0, when_close(mock_journal));
}
::journal::MockJournaler mock_journaler;
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, -EINVAL);
+ expect_shut_down_journaler(mock_journaler);
ASSERT_EQ(-EINVAL, when_open(mock_journal));
}
expect_init_journaler(mock_journaler, 0);
expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, -ENOENT);
+ expect_shut_down_journaler(mock_journaler);
ASSERT_EQ(-ENOENT, when_open(mock_journal));
}
expect_get_max_append_size(mock_journaler, 1 << 16);
expect_get_journaler_cached_client(mock_journaler, 0);
expect_get_journaler_tags(mock_image_ctx, mock_journaler, -EBADMSG);
+ expect_shut_down_journaler(mock_journaler);
ASSERT_EQ(-EBADMSG, when_open(mock_journal));
}
MockJournalReplay mock_journal_replay;
expect_stop_replay(mock_journaler);
expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0, true);
+ expect_shut_down_journaler(mock_journaler);
// replay failure should result in replay-restart
expect_construct_journaler(mock_journaler);
ASSERT_EQ(0, when_open(mock_journal));
expect_stop_append(mock_journaler, 0);
+ expect_shut_down_journaler(mock_journaler);
ASSERT_EQ(0, when_close(mock_journal));
}
expect_try_pop_front(mock_journaler, false, mock_replay_entry);
expect_stop_replay(mock_journaler);
expect_shut_down_replay(mock_image_ctx, mock_journal_replay, -EINVAL);
+ expect_shut_down_journaler(mock_journaler);
// replay flush failure should result in replay-restart
expect_construct_journaler(mock_journaler);
ASSERT_EQ(0, when_open(mock_journal));
expect_stop_append(mock_journaler, 0);
+ expect_shut_down_journaler(mock_journaler);
ASSERT_EQ(0, when_close(mock_journal));
}
ASSERT_EQ(0, when_open(mock_journal));
expect_stop_append(mock_journaler, -EINVAL);
+ expect_shut_down_journaler(mock_journaler);
ASSERT_EQ(-EINVAL, when_close(mock_journal));
}
expect_try_pop_front(mock_journaler, false, mock_replay_entry);
expect_stop_replay(mock_journaler);
expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0, true);
+ expect_shut_down_journaler(mock_journaler);
// replay write-to-disk failure should result in replay-restart
expect_construct_journaler(mock_journaler);
ASSERT_EQ(0, ctx.wait());
expect_stop_append(mock_journaler, 0);
+ expect_shut_down_journaler(mock_journaler);
ASSERT_EQ(0, when_close(mock_journal));
}
InvokeWithoutArgs(this, &TestMockJournal::wake_up)));
// replay write-to-disk failure should result in replay-restart
+ expect_shut_down_journaler(mock_journaler);
expect_construct_journaler(mock_journaler);
expect_init_journaler(mock_journaler, 0);
expect_get_max_append_size(mock_journaler, 1 << 16);
ASSERT_EQ(0, ctx.wait());
expect_stop_append(mock_journaler, 0);
+ expect_shut_down_journaler(mock_journaler);
ASSERT_EQ(0, when_close(mock_journal));
}
on_journal_safe2->complete(0);
ictx->op_work_queue->drain();
ASSERT_EQ(0, event_ctx.wait());
+
+ expect_shut_down_journaler(mock_journaler);
}
TEST_F(TestMockJournal, AppendWriteEvent) {
expect_future_committed(mock_journaler);
mock_journal.commit_io_event(1U, 0);
ictx->op_work_queue->drain();
+
+ expect_shut_down_journaler(mock_journaler);
}
TEST_F(TestMockJournal, EventCommitError) {
C_SaferCond flush_ctx;
mock_journal.flush_event(1U, &flush_ctx);
ASSERT_EQ(-EINVAL, flush_ctx.wait());
+
+ expect_shut_down_journaler(mock_journaler);
}
TEST_F(TestMockJournal, EventCommitErrorWithPendingWriteback) {
// cache should receive the error if waiting
ASSERT_EQ(-EINVAL, flush_ctx.wait());
+
+ expect_shut_down_journaler(mock_journaler);
}
TEST_F(TestMockJournal, IOCommitError) {
on_journal_safe->complete(0);
ictx->op_work_queue->drain();
mock_journal.commit_io_event(1U, -EINVAL);
+
+ expect_shut_down_journaler(mock_journaler);
}
TEST_F(TestMockJournal, FlushCommitPosition) {
C_SaferCond ctx;
mock_journal.flush_commit_position(&ctx);
ASSERT_EQ(0, ctx.wait());
+
+ expect_shut_down_journaler(mock_journaler);
}
} // namespace librbd
}
int shut_down() {
- ::journal::Journaler::shut_down();
-
int r = unregister_client();
if (r < 0) {
std::cerr << "rbd: failed to unregister journal client: "
<< cpp_strerror(r) << std::endl;
}
+ ::journal::Journaler::shut_down();
+
return r;
}
};
m_journaler.start_replay(&replay_handler);
r = m_cond.wait();
-
if (r < 0) {
std::cerr << "rbd: failed to process journal: " << cpp_strerror(r)
<< std::endl;
m_r = r;
}
}
-
- r = m_journaler.shut_down();
- if (r < 0 && m_r == 0) {
- m_r = r;
- }
-
return m_r;
}
+ int shut_down() {
+ return m_journaler.shut_down();
+ }
+
protected:
struct ReplayHandler : public ::journal::ReplayHandler {
JournalPlayer *journal;
uint64_t tag_id) = 0;
void handle_replay_complete(int r) {
- m_journaler.stop_replay();
- m_cond.complete(r);
+ if (m_r == 0 && r < 0) {
+ m_r = r;
+ }
+ m_journaler.stop_replay(&m_cond);
}
Journaler m_journaler;
static int do_inspect_journal(librados::IoCtx& io_ctx,
const std::string& journal_id,
bool verbose) {
- return JournalInspector(io_ctx, journal_id, verbose).exec();
+ JournalInspector inspector(io_ctx, journal_id, verbose);
+ int r = inspector.exec();
+ if (r < 0) {
+ inspector.shut_down();
+ return r;
+ }
+
+ r = inspector.shut_down();
+ if (r < 0) {
+ return r;
+ }
+ return 0;
}
struct ExportEntry {
posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
}
- r = JournalExporter(io_ctx, journal_id, fd, no_error, verbose).exec();
+ JournalExporter exporter(io_ctx, journal_id, fd, no_error, verbose);
+ r = exporter.exec();
if (!to_stdout) {
close(fd);
}
+ int shut_down_r = exporter.shut_down();
+ if (r == 0 && shut_down_r < 0) {
+ r = shut_down_r;
+ }
+
return r;
}
std::cerr << "failed to append journal: " << cpp_strerror(r) << std::endl;
}
- if (r1 < 0 && r == 0) {
- r = r1;
- }
- r1 = m_journaler.shut_down();
if (r1 < 0 && r == 0) {
r = r1;
}
return r;
}
+ int shut_down() {
+ return m_journaler.shut_down();
+ }
+
private:
Journaler m_journaler;
int m_fd;
posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
}
- r = JournalImporter(io_ctx, journal_id, fd, no_error, verbose).exec();
+ JournalImporter importer(io_ctx, journal_id, fd, no_error, verbose);
+ r = importer.exec();
if (!from_stdin) {
close(fd);
}
+ int shut_down_r = importer.shut_down();
+ if (r == 0 && shut_down_r < 0) {
+ r = shut_down_r;
+ }
+
return r;
}