cct->_conf->bluestore_throttle_bytes +
cct->_conf->bluestore_throttle_deferred_bytes),
kv_sync_thread(this),
+ kv_finalize_thread(this),
mempool_thread(this)
{
_init_logger();
cct->_conf->bluestore_throttle_bytes +
cct->_conf->bluestore_throttle_deferred_bytes),
kv_sync_thread(this),
+ kv_finalize_thread(this),
min_alloc_size(_min_alloc_size),
min_alloc_size_order(ctz(_min_alloc_size)),
mempool_thread(this)
f->start();
}
kv_sync_thread.create("bstore_kv_sync");
+ kv_finalize_thread.create("bstore_kv_final");
r = _deferred_replay();
if (r < 0)
std::lock_guard<std::mutex> l(kv_lock);
kv_cond.notify_one();
}
+ {
+ std::lock_guard<std::mutex> l(kv_finalize_lock);
+ kv_finalize_cond.notify_one();
+ }
for (auto osr : s) {
dout(20) << __func__ << " drain " << osr << dendl;
osr->drain();
logger->tinc(l_bluestore_kv_commit_lat, dur_kv);
logger->tinc(l_bluestore_kv_lat, dur);
}
- while (!kv_committing.empty()) {
- TransContext *txc = kv_committing.front();
+
+ if (bluefs) {
+ if (!bluefs_gift_extents.empty()) {
+ _commit_bluefs_freespace(bluefs_gift_extents);
+ }
+ for (auto p = bluefs_extents_reclaiming.begin();
+ p != bluefs_extents_reclaiming.end();
+ ++p) {
+ dout(20) << __func__ << " releasing old bluefs 0x" << std::hex
+ << p.get_start() << "~" << p.get_len() << std::dec
+ << dendl;
+ alloc->release(p.get_start(), p.get_len());
+ }
+ bluefs_extents_reclaiming.clear();
+ }
+
+ {
+ std::unique_lock<std::mutex> m(kv_finalize_lock);
+ if (kv_committing_to_finalize.empty()) {
+ kv_committing_to_finalize.swap(kv_committing);
+ } else {
+ kv_committing_to_finalize.insert(
+ kv_committing_to_finalize.end(),
+ kv_committing.begin(),
+ kv_committing.end());
+ kv_committing.clear();
+ }
+ if (deferred_stable_to_finalize.empty()) {
+ deferred_stable_to_finalize.swap(deferred_stable);
+ } else {
+ deferred_stable_to_finalize.insert(
+ deferred_stable_to_finalize.end(),
+ deferred_stable.begin(),
+ deferred_stable.end());
+ deferred_stable.clear();
+ }
+ kv_finalize_cond.notify_one();
+ }
+
+ l.lock();
+ // previously deferred "done" are now "stable" by virtue of this
+ // commit cycle.
+ deferred_stable_queue.swap(deferred_done);
+ }
+ }
+ dout(10) << __func__ << " finish" << dendl;
+}
+
+void BlueStore::_kv_finalize_thread()
+{
+ deque<TransContext*> kv_committed;
+ deque<DeferredBatch*> deferred_stable;
+ dout(10) << __func__ << " start" << dendl;
+ std::unique_lock<std::mutex> l(kv_finalize_lock);
+ while (true) {
+ assert(kv_committed.empty());
+ assert(deferred_stable.empty());
+ if (kv_committing_to_finalize.empty() &&
+ deferred_stable_to_finalize.empty()) {
+ if (kv_stop)
+ break;
+ dout(20) << __func__ << " sleep" << dendl;
+ kv_finalize_cond.wait(l);
+ dout(20) << __func__ << " wake" << dendl;
+ } else {
+ kv_committed.swap(kv_committing_to_finalize);
+ deferred_stable.swap(deferred_stable_to_finalize);
+ l.unlock();
+ dout(20) << __func__ << " kv_committed " << kv_committed << dendl;
+ dout(20) << __func__ << " deferred_stable " << deferred_stable << dendl;
+
+ while (!kv_committed.empty()) {
+ TransContext *txc = kv_committed.front();
assert(txc->state == TransContext::STATE_KV_SUBMITTED);
_txc_state_proc(txc);
- kv_committing.pop_front();
+ kv_committed.pop_front();
}
for (auto b : deferred_stable) {
auto p = b->txcs.begin();
}
delete b;
}
+ deferred_stable.clear();
if (!deferred_aggressive) {
std::lock_guard<std::mutex> l(deferred_lock);
// this is as good a place as any ...
_reap_collections();
- if (bluefs) {
- if (!bluefs_gift_extents.empty()) {
- _commit_bluefs_freespace(bluefs_gift_extents);
- }
- for (auto p = bluefs_extents_reclaiming.begin();
- p != bluefs_extents_reclaiming.end();
- ++p) {
- dout(20) << __func__ << " releasing old bluefs 0x" << std::hex
- << p.get_start() << "~" << p.get_len() << std::dec
- << dendl;
- alloc->release(p.get_start(), p.get_len());
- }
- bluefs_extents_reclaiming.clear();
- }
-
l.lock();
- // previously deferred "done" are now "stable" by virtue of this
- // commit cycle.
- deferred_stable_queue.swap(deferred_done);
}
}
dout(10) << __func__ << " finish" << dendl;
return NULL;
}
};
+ struct KVFinalizeThread : public Thread {
+ BlueStore *store;
+ explicit KVFinalizeThread(BlueStore *s) : store(s) {}
+ void *entry() {
+ store->_kv_finalize_thread();
+ return NULL;
+ }
+ };
struct DBHistogram {
struct value_dist {
deque<DeferredBatch*> deferred_done_queue; ///< deferred ios done
deque<DeferredBatch*> deferred_stable_queue; ///< deferred ios done + stable
+ KVFinalizeThread kv_finalize_thread;
+ std::mutex kv_finalize_lock;
+ std::condition_variable kv_finalize_cond;
+ deque<TransContext*> kv_committing_to_finalize; ///< pending finalization
+ deque<DeferredBatch*> deferred_stable_to_finalize; ///< pending finalization
+
PerfCounters *logger = nullptr;
std::mutex reap_lock;
void _osr_unregister_all();
void _kv_sync_thread();
+ void _kv_finalize_thread();
void _kv_stop() {
{
std::lock_guard<std::mutex> l(kv_lock);
kv_stop = true;
kv_cond.notify_all();
}
+ {
+ std::lock_guard<std::mutex> l(kv_finalize_lock);
+ kv_finalize_cond.notify_all();
+ }
+
kv_sync_thread.join();
+ kv_finalize_thread.join();
{
std::lock_guard<std::mutex> l(kv_lock);
kv_stop = false;