}
JournalMetadata::~JournalMetadata() {
- if (m_initialized) {
- shut_down();
- }
-}
-
-void JournalMetadata::init(Context *on_init) {
+ Mutex::Locker locker(m_lock);
assert(!m_initialized);
- m_initialized = true;
+}
- int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx);
- if (r < 0) {
- lderr(m_cct) << __func__ << ": failed to watch journal"
- << cpp_strerror(r) << dendl;
- on_init->complete(r);
- return;
+void JournalMetadata::init(Context *on_finish) {
+ {
+ Mutex::Locker locker(m_lock);
+ assert(!m_initialized);
+ m_initialized = true;
}
- C_ImmutableMetadata *ctx = new C_ImmutableMetadata(this, on_init);
- get_immutable_metadata(&m_order, &m_splay_width, &m_pool_id, ctx);
+ // chain the init sequence (reverse order)
+ on_finish = utils::create_async_context_callback(
+ this, on_finish);
+ on_finish = new C_ImmutableMetadata(this, on_finish);
+ on_finish = new FunctionContext([this, on_finish](int r) {
+ if (r < 0) {
+ lderr(m_cct) << __func__ << ": failed to watch journal"
+ << cpp_strerror(r) << dendl;
+ Mutex::Locker locker(m_lock);
+ m_watch_handle = 0;
+ on_finish->complete(r);
+ return;
+ }
+
+ get_immutable_metadata(&m_order, &m_splay_width, &m_pool_id, on_finish);
+ });
+
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ on_finish, nullptr, utils::rados_ctx_callback);
+ int r = m_ioctx.aio_watch(m_oid, comp, &m_watch_handle, &m_watch_ctx);
+ assert(r == 0);
+ comp->release();
}
-void JournalMetadata::shut_down() {
+void JournalMetadata::shut_down(Context *on_finish) {
ldout(m_cct, 20) << __func__ << dendl;
- assert(m_initialized);
+ uint64_t watch_handle = 0;
{
Mutex::Locker locker(m_lock);
m_initialized = false;
-
- if (m_watch_handle != 0) {
- m_ioctx.unwatch2(m_watch_handle);
- m_watch_handle = 0;
- }
+ std::swap(watch_handle, m_watch_handle);
}
- flush_commit_position();
-
- librados::Rados rados(m_ioctx);
- rados.watch_flush();
-
- m_async_op_tracker.wait_for_ops();
+ // chain the shut down sequence (reverse order)
+ on_finish = utils::create_async_context_callback(
+ this, on_finish);
+ on_finish = new FunctionContext([this, on_finish](int r) {
+ ldout(m_cct, 20) << "shut_down: waiting for ops" << dendl;
+ m_async_op_tracker.wait_for_ops(on_finish);
+ });
+ on_finish = new FunctionContext([this, on_finish](int r) {
+ ldout(m_cct, 20) << "shut_down: flushing watch" << dendl;
+ librados::Rados rados(m_ioctx);
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ on_finish, nullptr, utils::rados_ctx_callback);
+ r = rados.aio_watch_flush(comp);
+ assert(r == 0);
+ comp->release();
+ });
+ on_finish = new FunctionContext([this, on_finish](int r) {
+ flush_commit_position(on_finish);
+ });
+ if (watch_handle != 0) {
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ on_finish, nullptr, utils::rados_ctx_callback);
+ int r = m_ioctx.aio_unwatch(watch_handle, comp);
+ assert(r == 0);
+ comp->release();
+ } else {
+ on_finish->complete(0);
+ }
}
void JournalMetadata::get_immutable_metadata(uint8_t *order,
return cond.wait();
}
+ int shut_down_journaler() {
+ m_journaler->shut_down();
+ return 0;
+ }
+
int register_client(const std::string &client_id, const std::string &desc) {
journal::Journaler journaler(m_work_queue, m_timer, &m_timer_lock,
m_ioctx, m_journal_id, client_id, 5);
ASSERT_EQ(0, create_journal(12, 8));
ASSERT_EQ(0, register_client(CLIENT_ID, "foo"));
ASSERT_EQ(0, init_journaler());
+ ASSERT_EQ(0, shut_down_journaler());
}
TEST_F(TestJournaler, InitDNE) {
ASSERT_EQ(-ENOENT, init_journaler());
+ ASSERT_EQ(0, shut_down_journaler());
}
TEST_F(TestJournaler, RegisterClientDuplicate) {