goto out_coll;
}
- for (auto f : finishers) {
- f->start();
- }
- kv_sync_thread.create("bstore_kv_sync");
- kv_finalize_thread.create("bstore_kv_final");
+ _kv_start();
r = _deferred_replay();
if (r < 0)
out_stop:
_kv_stop();
- for (auto f : finishers) {
- f->wait_for_empty();
- f->stop();
- }
out_coll:
flush_cache();
out_alloc:
dout(20) << __func__ << " stopping kv thread" << dendl;
_kv_stop();
- for (auto f : finishers) {
- dout(20) << __func__ << " draining finisher" << dendl;
- f->wait_for_empty();
- dout(20) << __func__ << " stopping finisher" << dendl;
- f->stop();
- }
_reap_collections();
flush_cache();
dout(20) << __func__ << " closing" << dendl;
mempool_thread.init();
- // we need finishrs and kv_sync_thread *just* for replay.
- for (auto f : finishers) {
- f->start();
- }
- kv_sync_thread.create("bstore_kv_sync");
+ // we need finishrs and kv_{sync,fainlize}_thread *just* for replay
+ _kv_start();
r = _deferred_replay();
_kv_stop();
- for (auto f : finishers) {
- f->wait_for_empty();
- f->stop();
- }
if (r < 0)
goto out_scan;
}
}
+void BlueStore::_kv_start()
+{
+ dout(10) << __func__ << dendl;
+ for (auto f : finishers) {
+ f->start();
+ }
+ kv_sync_thread.create("bstore_kv_sync");
+ kv_finalize_thread.create("bstore_kv_final");
+}
+
+void BlueStore::_kv_stop()
+{
+ dout(10) << __func__ << dendl;
+ {
+ std::unique_lock<std::mutex> l(kv_lock);
+ while (!kv_sync_started) {
+ kv_cond.wait(l);
+ }
+ kv_stop = true;
+ kv_cond.notify_all();
+ }
+ {
+ std::unique_lock<std::mutex> l(kv_finalize_lock);
+ while (!kv_finalize_started) {
+ kv_finalize_cond.wait(l);
+ }
+ kv_finalize_stop = true;
+ kv_finalize_cond.notify_all();
+ }
+ kv_sync_thread.join();
+ kv_finalize_thread.join();
+ {
+ std::lock_guard<std::mutex> l(kv_lock);
+ kv_stop = false;
+ }
+ {
+ std::lock_guard<std::mutex> l(kv_finalize_lock);
+ kv_finalize_stop = false;
+ }
+ dout(10) << __func__ << " stopping finishers" << dendl;
+ for (auto f : finishers) {
+ f->wait_for_empty();
+ f->stop();
+ }
+ dout(10) << __func__ << " stopped" << dendl;
+}
+
void BlueStore::_kv_sync_thread()
{
dout(10) << __func__ << " start" << dendl;
std::unique_lock<std::mutex> l(kv_lock);
+ assert(!kv_sync_started);
+ kv_sync_started = true;
+ kv_cond.notify_all();
while (true) {
assert(kv_committing.empty());
if (kv_queue.empty() &&
}
}
dout(10) << __func__ << " finish" << dendl;
+ kv_sync_started = false;
}
void BlueStore::_kv_finalize_thread()
deque<DeferredBatch*> deferred_stable;
dout(10) << __func__ << " start" << dendl;
std::unique_lock<std::mutex> l(kv_finalize_lock);
+ assert(!kv_finalize_started);
+ kv_finalize_started = true;
+ kv_finalize_cond.notify_all();
while (true) {
assert(kv_committed.empty());
assert(deferred_stable.empty());
if (kv_committing_to_finalize.empty() &&
deferred_stable_to_finalize.empty()) {
- if (kv_stop)
+ if (kv_finalize_stop)
break;
dout(20) << __func__ << " sleep" << dendl;
kv_finalize_cond.wait(l);
}
}
dout(10) << __func__ << " finish" << dendl;
+ kv_finalize_started = false;
}
bluestore_deferred_op_t *BlueStore::_get_deferred_op(
KVSyncThread kv_sync_thread;
std::mutex kv_lock;
std::condition_variable kv_cond;
+ bool kv_sync_started = false;
bool kv_stop = false;
+ bool kv_finalize_started = false;
+ bool kv_finalize_stop = false;
deque<TransContext*> kv_queue; ///< ready, already submitted
deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
deque<TransContext*> kv_committing; ///< currently syncing
void _osr_drain_all();
void _osr_unregister_all();
+ void _kv_start();
+ void _kv_stop();
void _kv_sync_thread();
void _kv_finalize_thread();
- void _kv_stop() {
- {
- std::lock_guard<std::mutex> l(kv_lock);
- kv_stop = true;
- kv_cond.notify_all();
- }
- {
- std::lock_guard<std::mutex> l(kv_finalize_lock);
- kv_finalize_cond.notify_all();
- }
-
- kv_sync_thread.join();
- kv_finalize_thread.join();
- {
- std::lock_guard<std::mutex> l(kv_lock);
- kv_stop = false;
- }
- }
bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, OnodeRef o);
void _deferred_queue(TransContext *txc);