OPTION(bluestore_debug_prefill, OPT_FLOAT, 0)
OPTION(bluestore_debug_prefragment_max, OPT_INT, 1048576)
OPTION(bluestore_inject_wal_apply_delay, OPT_FLOAT, 0)
+OPTION(bluestore_shard_finishers, OPT_BOOL, false)
OPTION(kstore_max_ops, OPT_U64, 512)
OPTION(kstore_max_bytes, OPT_U64, 64*1024*1024)
cct->_conf->bluestore_wal_thread_timeout,
cct->_conf->bluestore_wal_thread_suicide_timeout,
&wal_tp),
- finisher(cct),
+ m_finisher_num(1),
kv_sync_thread(this),
kv_stop(false),
logger(NULL),
_init_logger();
g_ceph_context->_conf->add_observer(this);
set_cache_shards(1);
+
+ if (cct->_conf->bluestore_shard_finishers) {
+ m_finisher_num = cct->_conf->osd_op_num_shards;
+ }
+
+ for (int i = 0; i < m_finisher_num; ++i) {
+ ostringstream oss;
+ oss << "finisher-" << i;
+ Finisher *f = new Finisher(cct, oss.str(), "finisher");
+ finishers.push_back(f);
+ }
}
BlueStore::~BlueStore()
{
+ for (auto f : finishers) {
+ delete f;
+ f = NULL;
+ }
+
g_ceph_context->_conf->remove_observer(this);
_shutdown_logger();
assert(!mounted);
goto out_coll;
}
- finisher.start();
+ for (auto f : finishers) {
+ f->start();
+ }
wal_tp.start();
kv_sync_thread.create("bstore_kv_sync");
_kv_stop();
wal_wq.drain();
wal_tp.stop();
- finisher.wait_for_empty();
- finisher.stop();
+ for (auto f : finishers) {
+ f->wait_for_empty();
+ f->stop();
+ }
out_coll:
coll_map.clear();
out_alloc:
wal_wq.drain();
dout(20) << __func__ << " stopping wal_tp" << dendl;
wal_tp.stop();
- dout(20) << __func__ << " draining finisher" << dendl;
- finisher.wait_for_empty();
- dout(20) << __func__ << " stopping finisher" << dendl;
- finisher.stop();
+ for (auto f : finishers) {
+ dout(20) << __func__ << " draining finisher" << dendl;
+ f->wait_for_empty();
+ dout(20) << __func__ << " stopping finisher" << dendl;
+ f->stop();
+ }
dout(20) << __func__ << " closing" << dendl;
mounted = false;
txc->onreadable_sync->complete(0);
txc->onreadable_sync = NULL;
}
+ unsigned n = txc->osr->parent->shard_hint.hash_to_shard(m_finisher_num);
if (txc->onreadable) {
- finisher.queue(txc->onreadable);
+ finishers[n]->queue(txc->onreadable);
txc->onreadable = NULL;
}
if (txc->oncommit) {
- finisher.queue(txc->oncommit);
+ finishers[n]->queue(txc->oncommit);
txc->oncommit = NULL;
}
while (!txc->oncommits.empty()) {
- finisher.queue(txc->oncommits.front());
+ auto f = txc->oncommits.front();
+ finishers[n]->queue(f);
txc->oncommits.pop_front();
}