]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/bluestore: wait for kv thread to start before stopping it
authorSage Weil <sage@redhat.com>
Fri, 26 May 2017 22:18:06 +0000 (18:18 -0400)
committerSage Weil <sage@redhat.com>
Tue, 30 May 2017 01:56:17 +0000 (21:56 -0400)
Otherwise we can assert out when we try to join a thread that
hasn't started.

- move everything into _kv_start() and _kv_stop()
- separate stop bools for each thread
- wait until thread starts before signalling stop (and potentially calling
join()).

Signed-off-by: Sage Weil <sage@redhat.com>
src/os/bluestore/BlueStore.cc
src/os/bluestore/BlueStore.h

index bbff5d607f29c62fb77ce98aabec970a7ecfbe3d..647819486f22acccff9dc0ce13c7d0f456fb4784 100644 (file)
@@ -5012,11 +5012,7 @@ int BlueStore::_mount(bool kv_only)
       goto out_coll;
   }
 
-  for (auto f : finishers) {
-    f->start();
-  }
-  kv_sync_thread.create("bstore_kv_sync");
-  kv_finalize_thread.create("bstore_kv_final");
+  _kv_start();
 
   r = _deferred_replay();
   if (r < 0)
@@ -5030,10 +5026,6 @@ int BlueStore::_mount(bool kv_only)
 
  out_stop:
   _kv_stop();
-  for (auto f : finishers) {
-    f->wait_for_empty();
-    f->stop();
-  }
  out_coll:
   flush_cache();
  out_alloc:
@@ -5063,12 +5055,6 @@ int BlueStore::umount()
 
   dout(20) << __func__ << " stopping kv thread" << dendl;
   _kv_stop();
-  for (auto f : finishers) {
-    dout(20) << __func__ << " draining finisher" << dendl;
-    f->wait_for_empty();
-    dout(20) << __func__ << " stopping finisher" << dendl;
-    f->stop();
-  }
   _reap_collections();
   flush_cache();
   dout(20) << __func__ << " closing" << dendl;
@@ -5216,17 +5202,10 @@ int BlueStore::fsck(bool deep)
 
   mempool_thread.init();
 
-  // we need finishrs and kv_sync_thread *just* for replay.
-  for (auto f : finishers) {
-    f->start();
-  }
-  kv_sync_thread.create("bstore_kv_sync");
+  // we need finishrs and kv_{sync,fainlize}_thread *just* for replay
+  _kv_start();
   r = _deferred_replay();
   _kv_stop();
-  for (auto f : finishers) {
-    f->wait_for_empty();
-    f->stop();
-  }
   if (r < 0)
     goto out_scan;
 
@@ -7958,10 +7937,60 @@ void BlueStore::_osr_unregister_all()
   }
 }
 
+void BlueStore::_kv_start()
+{
+  dout(10) << __func__ << dendl;
+  for (auto f : finishers) {
+    f->start();
+  }
+  kv_sync_thread.create("bstore_kv_sync");
+  kv_finalize_thread.create("bstore_kv_final");
+}
+
+void BlueStore::_kv_stop()
+{
+  dout(10) << __func__ << dendl;
+  {
+    std::unique_lock<std::mutex> l(kv_lock);
+    while (!kv_sync_started) {
+      kv_cond.wait(l);
+    }
+    kv_stop = true;
+    kv_cond.notify_all();
+  }
+  {
+    std::unique_lock<std::mutex> l(kv_finalize_lock);
+    while (!kv_finalize_started) {
+      kv_finalize_cond.wait(l);
+    }
+    kv_finalize_stop = true;
+    kv_finalize_cond.notify_all();
+  }
+  kv_sync_thread.join();
+  kv_finalize_thread.join();
+  {
+    std::lock_guard<std::mutex> l(kv_lock);
+    kv_stop = false;
+  }
+  {
+    std::lock_guard<std::mutex> l(kv_finalize_lock);
+    kv_finalize_stop = false;
+  }
+  dout(10) << __func__ << " stopping finishers" << dendl;
+  for (auto f : finishers) {
+    f->wait_for_empty();
+    f->stop();
+  }
+  dout(10) << __func__ << " stopped" << dendl;
+}
+
 void BlueStore::_kv_sync_thread()
 {
   dout(10) << __func__ << " start" << dendl;
   std::unique_lock<std::mutex> l(kv_lock);
+  assert(!kv_sync_started);
+  kv_sync_started = true;
+  kv_cond.notify_all();
   while (true) {
     assert(kv_committing.empty());
     if (kv_queue.empty() &&
@@ -8196,6 +8225,7 @@ void BlueStore::_kv_sync_thread()
     }
   }
   dout(10) << __func__ << " finish" << dendl;
+  kv_sync_started = false;
 }
 
 void BlueStore::_kv_finalize_thread()
@@ -8204,12 +8234,15 @@ void BlueStore::_kv_finalize_thread()
   deque<DeferredBatch*> deferred_stable;
   dout(10) << __func__ << " start" << dendl;
   std::unique_lock<std::mutex> l(kv_finalize_lock);
+  assert(!kv_finalize_started);
+  kv_finalize_started = true;
+  kv_finalize_cond.notify_all();
   while (true) {
     assert(kv_committed.empty());
     assert(deferred_stable.empty());
     if (kv_committing_to_finalize.empty() &&
        deferred_stable_to_finalize.empty()) {
-      if (kv_stop)
+      if (kv_finalize_stop)
        break;
       dout(20) << __func__ << " sleep" << dendl;
       kv_finalize_cond.wait(l);
@@ -8253,6 +8286,7 @@ void BlueStore::_kv_finalize_thread()
     }
   }
   dout(10) << __func__ << " finish" << dendl;
+  kv_finalize_started = false;
 }
 
 bluestore_deferred_op_t *BlueStore::_get_deferred_op(
index f37c0b1a6756f224272a246e749bb81a1a50f364..5063395f9508607f34840753f90a60da7230fe1b 100644 (file)
@@ -1810,7 +1810,10 @@ private:
   KVSyncThread kv_sync_thread;
   std::mutex kv_lock;
   std::condition_variable kv_cond;
+  bool kv_sync_started = false;
   bool kv_stop = false;
+  bool kv_finalize_started = false;
+  bool kv_finalize_stop = false;
   deque<TransContext*> kv_queue;             ///< ready, already submitted
   deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
   deque<TransContext*> kv_committing;        ///< currently syncing
@@ -1984,26 +1987,10 @@ private:
   void _osr_drain_all();
   void _osr_unregister_all();
 
+  void _kv_start();
+  void _kv_stop();
   void _kv_sync_thread();
   void _kv_finalize_thread();
-  void _kv_stop() {
-    {
-      std::lock_guard<std::mutex> l(kv_lock);
-      kv_stop = true;
-      kv_cond.notify_all();
-    }
-    {
-      std::lock_guard<std::mutex> l(kv_finalize_lock);
-      kv_finalize_cond.notify_all();
-    }
-
-    kv_sync_thread.join();
-    kv_finalize_thread.join();
-    {
-      std::lock_guard<std::mutex> l(kv_lock);
-      kv_stop = false;
-    }
-  }
 
   bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, OnodeRef o);
   void _deferred_queue(TransContext *txc);