]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: metadata init and shutdown now asynchronous
authorJason Dillaman <dillaman@redhat.com>
Tue, 24 May 2016 19:36:17 +0000 (15:36 -0400)
committerJason Dillaman <dillaman@redhat.com>
Wed, 25 May 2016 12:19:17 +0000 (08:19 -0400)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/journal/JournalMetadata.cc
src/journal/JournalMetadata.h
src/journal/Journaler.cc
src/test/journal/RadosTestFixture.cc
src/test/journal/RadosTestFixture.h
src/test/journal/test_Journaler.cc

index 6c9ffd61cb45ee60757ad4d27549eb203b519401..18bef13b0f998d7e1b44907211b4a5e4247af171 100644 (file)
@@ -416,48 +416,80 @@ JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer,
 }
 
 JournalMetadata::~JournalMetadata() {
-  if (m_initialized) {
-    shut_down();
-  }
-}
-
-void JournalMetadata::init(Context *on_init) {
+  Mutex::Locker locker(m_lock);
   assert(!m_initialized);
-  m_initialized = true;
+}
 
-  int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx);
-  if (r < 0) {
-    lderr(m_cct) << __func__ << ": failed to watch journal"
-                 << cpp_strerror(r) << dendl;
-    on_init->complete(r);
-    return;
+void JournalMetadata::init(Context *on_finish) {
+  {
+    Mutex::Locker locker(m_lock);
+    assert(!m_initialized);
+    m_initialized = true;
   }
 
-  C_ImmutableMetadata *ctx = new C_ImmutableMetadata(this, on_init);
-  get_immutable_metadata(&m_order, &m_splay_width, &m_pool_id, ctx);
+  // chain the init sequence (reverse order)
+  on_finish = utils::create_async_context_callback(
+    this, on_finish);
+  on_finish = new C_ImmutableMetadata(this, on_finish);
+  on_finish = new FunctionContext([this, on_finish](int r) {
+      if (r < 0) {
+        lderr(m_cct) << __func__ << ": failed to watch journal"
+                     << cpp_strerror(r) << dendl;
+        Mutex::Locker locker(m_lock);
+        m_watch_handle = 0;
+        on_finish->complete(r);
+        return;
+      }
+
+      get_immutable_metadata(&m_order, &m_splay_width, &m_pool_id, on_finish);
+    });
+
+  librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+    on_finish, nullptr, utils::rados_ctx_callback);
+  int r = m_ioctx.aio_watch(m_oid, comp, &m_watch_handle, &m_watch_ctx);
+  assert(r == 0);
+  comp->release();
 }
 
-void JournalMetadata::shut_down() {
+void JournalMetadata::shut_down(Context *on_finish) {
 
   ldout(m_cct, 20) << __func__ << dendl;
 
-  assert(m_initialized);
+  uint64_t watch_handle = 0;
   {
     Mutex::Locker locker(m_lock);
     m_initialized = false;
-
-    if (m_watch_handle != 0) {
-      m_ioctx.unwatch2(m_watch_handle);
-      m_watch_handle = 0;
-    }
+    std::swap(watch_handle, m_watch_handle);
   }
 
-  flush_commit_position();
-
-  librados::Rados rados(m_ioctx);
-  rados.watch_flush();
-
-  m_async_op_tracker.wait_for_ops();
+  // chain the shut down sequence (reverse order)
+  on_finish = utils::create_async_context_callback(
+    this, on_finish);
+  on_finish = new FunctionContext([this, on_finish](int r) {
+      ldout(m_cct, 20) << "shut_down: waiting for ops" << dendl;
+      m_async_op_tracker.wait_for_ops(on_finish);
+    });
+  on_finish = new FunctionContext([this, on_finish](int r) {
+      ldout(m_cct, 20) << "shut_down: flushing watch" << dendl;
+      librados::Rados rados(m_ioctx);
+      librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+        on_finish, nullptr, utils::rados_ctx_callback);
+      r = rados.aio_watch_flush(comp);
+      assert(r == 0);
+      comp->release();
+    });
+  on_finish = new FunctionContext([this, on_finish](int r) {
+      flush_commit_position(on_finish);
+    });
+  if (watch_handle != 0) {
+    librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+      on_finish, nullptr, utils::rados_ctx_callback);
+    int r = m_ioctx.aio_unwatch(watch_handle, comp);
+    assert(r == 0);
+    comp->release();
+  } else {
+    on_finish->complete(0);
+  }
 }
 
 void JournalMetadata::get_immutable_metadata(uint8_t *order,
index 3ded24bb565dd90ffeacc73f39f74a16ceb17db2..1c084a4c5a5dc15b8c733ed99fec971a60696681 100644 (file)
@@ -52,7 +52,7 @@ public:
   ~JournalMetadata();
 
   void init(Context *on_init);
-  void shut_down();
+  void shut_down(Context *on_finish);
 
   bool is_initialized() const { return m_initialized; }
 
index d1e80ea8b6173c0dd8db954c2016f907ad2834e1..d7da30f226fe81aff387eb4de08feebc2a2d39b5 100644 (file)
@@ -162,7 +162,9 @@ int Journaler::init_complete() {
 }
 
 void Journaler::shut_down() {
-  m_metadata->shut_down();
+  C_SaferCond ctx;
+  m_metadata->shut_down(&ctx);
+  ctx.wait();
 }
 
 bool Journaler::is_initialized() const {
@@ -201,7 +203,9 @@ int Journaler::create(uint8_t order, uint8_t splay_width, int64_t pool_id) {
 }
 
 int Journaler::remove(bool force) {
-  m_metadata->shut_down();
+  C_SaferCond ctx;
+  m_metadata->shut_down(&ctx);
+  ctx.wait();
 
   ldout(m_cct, 5) << "removing journal: " << m_header_oid << dendl;
   int r = m_trimmer->remove_objects(force);
index 6fa8759545961d0d00e3db11d7951221b96cf3fb..f57e5aed8c42cb2f5b4615a6b504ebafaa8de3f7 100644 (file)
@@ -44,6 +44,12 @@ void RadosTestFixture::SetUp() {
 }
 
 void RadosTestFixture::TearDown() {
+  for (auto metadata : m_metadatas) {
+    C_SaferCond ctx;
+    metadata->shut_down(&ctx);
+    ASSERT_EQ(0, ctx.wait());
+  }
+
   {
     Mutex::Locker locker(m_timer_lock);
     m_timer->shutdown();
@@ -65,6 +71,7 @@ journal::JournalMetadataPtr RadosTestFixture::create_metadata(
   journal::JournalMetadataPtr metadata(new journal::JournalMetadata(
     m_work_queue, m_timer, &m_timer_lock, m_ioctx, oid, client_id,
     commit_internal));
+  m_metadatas.push_back(metadata);
   return metadata;
 }
 
index 3415b0b6f02e0441d46cad474d940e73bca9f750..d7cd1a845d6734f64fca505b8e649c669048f700 100644 (file)
@@ -68,4 +68,6 @@ public:
   SafeTimer *m_timer;
 
   Listener m_listener;
+
+  std::list<journal::JournalMetadataPtr> m_metadatas;
 };
index df029a5a4773a79c86620a7a48591a32d1a11845..bdc70fb855d084c27b6c0f9e4e77c936bb316330 100644 (file)
@@ -39,6 +39,11 @@ public:
     return cond.wait();
   }
 
+  int shut_down_journaler() {
+    m_journaler->shut_down();
+    return 0;
+  }
+
   int register_client(const std::string &client_id, const std::string &desc) {
     journal::Journaler journaler(m_work_queue, m_timer, &m_timer_lock,
                                  m_ioctx, m_journal_id, client_id, 5);
@@ -95,10 +100,12 @@ TEST_F(TestJournaler, Init) {
   ASSERT_EQ(0, create_journal(12, 8));
   ASSERT_EQ(0, register_client(CLIENT_ID, "foo"));
   ASSERT_EQ(0, init_journaler());
+  ASSERT_EQ(0, shut_down_journaler());
 }
 
 TEST_F(TestJournaler, InitDNE) {
   ASSERT_EQ(-ENOENT, init_journaler());
+  ASSERT_EQ(0, shut_down_journaler());
 }
 
 TEST_F(TestJournaler, RegisterClientDuplicate) {