}
};
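+/**
+ * ContextQueue is a thread-safe queue of completion Contexts shared between
+ * an ObjectStore and an OSD shard: the store queues finished oncommit
+ * callbacks and signals the shard's condition variable, and the shard thread
+ * drains them with swap() and completes them.
+ */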
+class ContextQueue {
+ list<Context *> q;
+ std::mutex q_mutex;
+ Mutex& mutex;
+ Cond& cond;
+public:
+ ContextQueue(Mutex& mut, Cond& con) : mutex(mut), cond(con) {}
+
+ void queue(list<Context *>& ls) {
+ {
+ std::scoped_lock l(q_mutex);
+ if (q.empty()) {
+ q.swap(ls);
+ } else {
+ q.insert(q.end(), ls.begin(), ls.end());
+ }
+ }
+
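+ // wake a shard thread that may be waiting for work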
+ mutex.Lock();
+ cond.SignalOne();
+ mutex.Unlock();
+
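+ // the queued contexts are now owned by q; clear the caller's list so
+ // they are not completed twice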
+ ls.clear();
+ }
+
+ void swap(list<Context *>& ls) {
+ ls.clear();
+ std::scoped_lock l(q_mutex);
+ if (!q.empty()) {
+ q.swap(ls);
+ }
+ }
+
+ bool empty() {
+ std::scoped_lock l(q_mutex);
+ return q.empty();
+ }
+};
+
#endif
OPTION(bluestore_debug_fsck_abort, OPT_BOOL)
OPTION(bluestore_debug_omit_kv_commit, OPT_BOOL)
OPTION(bluestore_debug_permit_any_bdev_label, OPT_BOOL)
-OPTION(bluestore_shard_finishers, OPT_BOOL)
OPTION(bluestore_debug_random_read_err, OPT_DOUBLE)
OPTION(bluestore_debug_inject_bug21040, OPT_BOOL)
OPTION(bluestore_debug_inject_csum_err_probability, OPT_FLOAT)
.set_default(false)
.set_description(""),
- Option("bluestore_shard_finishers", Option::TYPE_BOOL, Option::LEVEL_DEV)
- .set_default(false)
- .set_description(""),
-
Option("bluestore_debug_random_read_err", Option::TYPE_FLOAT, Option::LEVEL_DEV)
.set_default(0)
.set_description(""),
*/
class Logger;
-
+class ContextQueue;
static inline void encode(const map<string,bufferptr> *attrset, bufferlist &bl) {
encode(*attrset, bl);
*/
virtual CollectionHandle create_new_collection(const coll_t &cid) = 0;
+ /**
+ * Set the ContextQueue for a collection.
+ *
+ * Once set, the oncommit callbacks of Transactions submitted to this
+ * collection are queued into commit_queue and invoked by the OSD shard
+ * thread instead of the store's finisher.
+ */
+ virtual void set_collection_commit_queue(const coll_t &cid, ContextQueue *commit_queue) = 0;
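+ /*
+ * A minimal, illustrative sketch of the intended wiring (the names
+ * "shard_lock", "shard_cond" and "cq" are placeholders, not part of
+ * this change):
+ *
+ *   ContextQueue cq(shard_lock, shard_cond);      // owned by an OSD shard
+ *   store->set_collection_commit_queue(cid, &cq); // route oncommits to it
+ *   // ... later, on the shard thread:
+ *   list<Context *> done;
+ *   cq.swap(done);
+ *   for (auto c : done) c->complete(0);
+ */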
/**
* Synchronous read operations
cache(c),
lock("BlueStore::Collection::lock", true, false),
exists(true),
- onode_map(c)
+ onode_map(c),
+ commit_queue(nullptr)
{
}
cct->_conf->bluestore_throttle_bytes +
cct->_conf->bluestore_throttle_deferred_bytes),
deferred_finisher(cct, "defered_finisher", "dfin"),
+ finisher(cct, "commit_finisher", "cfin"),
kv_sync_thread(this),
kv_finalize_thread(this),
mempool_thread(this)
cct->_conf->bluestore_throttle_bytes +
cct->_conf->bluestore_throttle_deferred_bytes),
deferred_finisher(cct, "defered_finisher", "dfin"),
+ finisher(cct, "commit_finisher", "cfin"),
kv_sync_thread(this),
kv_finalize_thread(this),
min_alloc_size(_min_alloc_size),
BlueStore::~BlueStore()
{
- for (auto f : finishers) {
- delete f;
- }
- finishers.clear();
-
cct->_conf.remove_observer(this);
_shutdown_logger();
ceph_assert(!mounted);
<< std::dec << dendl;
}
-void BlueStore::_set_finisher_num()
-{
- if (cct->_conf->bluestore_shard_finishers) {
- if (cct->_conf->osd_op_num_shards) {
- m_finisher_num = cct->_conf->osd_op_num_shards;
- } else {
- ceph_assert(bdev);
- if (bdev->is_rotational()) {
- m_finisher_num = cct->_conf->osd_op_num_shards_hdd;
- } else {
- m_finisher_num = cct->_conf->osd_op_num_shards_ssd;
- }
- }
- }
- ceph_assert(m_finisher_num != 0);
-}
-
int BlueStore::_set_cache_sizes()
{
ceph_assert(bdev);
mempool_thread.init();
- // we need finishers and kv_{sync,finalize}_thread *just* for replay
+ // we need finisher and kv_{sync,finalize}_thread *just* for replay
_kv_start();
r = _deferred_replay();
_kv_stop();
return c;
}
+void BlueStore::set_collection_commit_queue(
+ const coll_t& cid,
+ ContextQueue *commit_queue)
+{
+ if (commit_queue) {
+ RWLock::RLocker l(coll_lock);
+ if (coll_map.count(cid)) {
+ coll_map[cid]->commit_queue = commit_queue;
+ } else if (new_coll_map.count(cid)) {
+ new_coll_map[cid]->commit_queue = commit_queue;
+ }
+ }
+}
+
+
bool BlueStore::exists(CollectionHandle &c_, const ghobject_t& oid)
{
Collection *c = static_cast<Collection *>(c_.get());
_set_compression();
_set_blob_size();
- _set_finisher_num();
-
_validate_bdev();
return 0;
}
{
std::lock_guard<std::mutex> l(txc->osr->qlock);
txc->state = TransContext::STATE_KV_DONE;
- finishers[txc->osr->shard]->queue(txc->oncommits);
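+ // prefer the collection's ContextQueue (drained by the OSD shard thread);
+ // fall back to the store-wide finisher otherwise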
+ if (txc->ch->commit_queue) {
+ txc->ch->commit_queue->queue(txc->oncommits);
+ } else {
+ finisher.queue(txc->oncommits);
+ }
}
txc->log_state_latency(logger, l_bluestore_state_kv_committing_lat);
logger->tinc(l_bluestore_commit_lat, ceph_clock_now() - txc->start);
auto q = coll_map.find(c->cid);
if (q != coll_map.end()) {
c->osr = q->second->osr;
- ceph_assert(c->osr->shard == c->cid.hash_to_shard(m_finisher_num));
ldout(cct, 10) << __func__ << " " << c->cid
<< " reusing osr " << c->osr << " from existing coll "
<< q->second << dendl;
auto p = zombie_osr_set.find(c->cid);
if (p == zombie_osr_set.end()) {
c->osr = new OpSequencer(this, c->cid);
- c->osr->shard = c->cid.hash_to_shard(m_finisher_num);
ldout(cct, 10) << __func__ << " " << c->cid
<< " fresh osr " << c->osr << dendl;
} else {
ldout(cct, 10) << __func__ << " " << c->cid
<< " resurrecting zombie osr " << c->osr << dendl;
c->osr->zombie = false;
- ceph_assert(c->osr->shard == c->cid.hash_to_shard(m_finisher_num));
}
}
}
{
dout(10) << __func__ << dendl;
- for (int i = 0; i < m_finisher_num; ++i) {
- ostringstream oss;
- oss << "finisher-" << i;
- Finisher *f = new Finisher(cct, oss.str(), "finisher");
- finishers.push_back(f);
- }
-
deferred_finisher.start();
- for (auto f : finishers) {
- f->start();
- }
+ finisher.start();
kv_sync_thread.create("bstore_kv_sync");
kv_finalize_thread.create("bstore_kv_final");
}
dout(10) << __func__ << " stopping finishers" << dendl;
deferred_finisher.wait_for_empty();
deferred_finisher.stop();
- for (auto f : finishers) {
- f->wait_for_empty();
- f->stop();
- }
+ finisher.wait_for_empty();
+ finisher.stop();
dout(10) << __func__ << " stopped" << dendl;
}
for (auto c : on_applied_sync) {
c->complete(0);
}
- for (auto c : on_applied) {
- finishers[osr->shard]->queue(c);
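+ // on_applied callbacks are routed the same way as oncommits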
+ if (!on_applied.empty()) {
+ if (c->commit_queue) {
+ c->commit_queue->queue(on_applied);
+ } else {
+ finisher.queue(on_applied);
+ }
}
logger->tinc(l_bluestore_submit_lat, mono_clock::now() - start);
//pool options
pool_opts_t pool_opts;
+ ContextQueue *commit_queue;  ///< if set, oncommits are queued here instead of the store's finisher
OnodeRef get_onode(const ghobject_t& oid, bool create);
BlueStore *store;
coll_t cid;
- size_t shard;
-
uint64_t last_seq = 0;
std::atomic_int txc_with_unstable_io = {0}; ///< num txcs with unstable io
deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
int deferred_queue_size = 0; ///< num txc's queued across all osrs
atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
- Finisher deferred_finisher;
-
- int m_finisher_num = 1;
- vector<Finisher*> finishers;
+ Finisher deferred_finisher;  ///< drives deferred-io submission callbacks
+ Finisher finisher;           ///< fallback for oncommits/on_applied when no ContextQueue is set
KVSyncThread kv_sync_thread;
std::mutex kv_lock;
CollectionHandle open_collection(const coll_t &c) override;
CollectionHandle create_new_collection(const coll_t& cid) override;
+ void set_collection_commit_queue(const coll_t& cid,
+ ContextQueue *commit_queue) override;
bool collection_exists(const coll_t& c) override;
int collection_empty(CollectionHandle& c, bool *empty) override;
CollectionHandle open_collection(const coll_t& c) override;
CollectionHandle create_new_collection(const coll_t& c) override;
+ void set_collection_commit_queue(const coll_t& cid,
+ ContextQueue *commit_queue) override {
+ }
int queue_transactions(CollectionHandle& ch, vector<Transaction>& tls,
TrackedOpRef op = TrackedOpRef(),
CollectionHandle open_collection(const coll_t& c) override;
CollectionHandle create_new_collection(const coll_t& c) override;
+ void set_collection_commit_queue(const coll_t& cid,
+ ContextQueue *commit_queue) override {
+ }
using ObjectStore::exists;
bool exists(CollectionHandle& c, const ghobject_t& oid) override;
return get_collection(c);
}
CollectionHandle create_new_collection(const coll_t& c) override;
+
+ void set_collection_commit_queue(const coll_t& cid,
+ ContextQueue *commit_queue) override {
+ }
+
bool collection_exists(const coll_t& c) override;
int collection_empty(CollectionHandle& c, bool *empty) override;
int collection_bits(CollectionHandle& c) override;
recursive_remove_collection(cct, store, pgid, *it);
continue;
}
+ {
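+ // point this PG's collection at its shard's context_queue so oncommits run on the shard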
+ uint32_t shard_index = pgid.hash_to_shard(shards.size());
+ ceph_assert(shards[shard_index]);
+ store->set_collection_commit_queue(pg->coll, &(shards[shard_index]->context_queue));
+ }
pg->reg_next_scrub();
PGRef pg = _make_pg(startmap, pgid);
pg->ch = store->create_new_collection(pg->coll);
+ {
+ uint32_t shard_index = pgid.hash_to_shard(shards.size());
+ ceph_assert(shards[shard_index]);
+ store->set_collection_commit_queue(pg->coll, &(shards[shard_index]->context_queue));
+ }
+
pg->lock(true);
// we are holding the shard lock
out_pgs->insert(child);
child->ch = store->create_new_collection(child->coll);
+ {
+ uint32_t shard_index = i->hash_to_shard(shards.size());
+ ceph_assert(shards[shard_index]);
+ store->set_collection_commit_queue(child->coll, &(shards[shard_index]->context_queue));
+ }
+
unsigned split_bits = i->get_split_bits(pg_num);
dout(10) << " pg_num is " << pg_num
<< ", m_seed " << i->ps()
ceph_assert(sdata);
// peek at spg_t
sdata->shard_lock.Lock();
- if (sdata->pqueue->empty()) {
+ if (sdata->pqueue->empty() && sdata->context_queue.empty()) {
sdata->sdata_wait_lock.Lock();
if (!sdata->stop_waiting) {
dout(20) << __func__ << " empty q, waiting" << dendl;
sdata->sdata_cond.Wait(sdata->sdata_wait_lock);
sdata->sdata_wait_lock.Unlock();
sdata->shard_lock.Lock();
- if (sdata->pqueue->empty()) {
+ if (sdata->pqueue->empty() && sdata->context_queue.empty()) {
sdata->shard_lock.Unlock();
return;
}
osd->cct->get_heartbeat_map()->reset_timeout(hb,
osd->cct->_conf->threadpool_default_timeout, 0);
} else {
- dout(0) << __func__ << " need return immediately" << dendl;
+ dout(20) << __func__ << " need to return immediately" << dendl;
sdata->sdata_wait_lock.Unlock();
sdata->shard_lock.Unlock();
return;
}
}
- OpQueueItem item = sdata->pqueue->dequeue();
+
if (osd->is_stopping()) {
sdata->shard_lock.Unlock();
return; // OSD shutdown, discard.
}
+
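+ // collect any oncommit contexts the objectstore queued for this shard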
+ list<Context *> oncommits;
+ if (!sdata->context_queue.empty()) {
+ sdata->context_queue.swap(oncommits);
+ }
+
+ if (sdata->pqueue->empty()) {
+ sdata->shard_lock.Unlock();
+ handle_oncommits(oncommits);
+ return;
+ }
+
+ OpQueueItem item = sdata->pqueue->dequeue();
+
const auto token = item.get_ordering_token();
auto r = sdata->pg_slots.emplace(token, nullptr);
if (r.second) {
dout(20) << __func__ << " slot " << token << " no longer there" << dendl;
pg->unlock();
sdata->shard_lock.Unlock();
+ handle_oncommits(oncommits);
return;
}
slot = q->second.get();
<< " nothing queued" << dendl;
pg->unlock();
sdata->shard_lock.Unlock();
+ handle_oncommits(oncommits);
return;
}
if (requeue_seq != slot->requeue_seq) {
<< dendl;
pg->unlock();
sdata->shard_lock.Unlock();
+ handle_oncommits(oncommits);
return;
}
if (slot->pg != pg) {
if (pushes_to_free > 0) {
sdata->shard_lock.Unlock();
osd->service.release_reserved_pushes(pushes_to_free);
+ handle_oncommits(oncommits);
return;
}
}
sdata->shard_lock.Unlock();
+ handle_oncommits(oncommits);
return;
}
if (qi.is_peering()) {
_add_slot_waiter(token, slot, std::move(qi));
sdata->shard_lock.Unlock();
pg->unlock();
+ handle_oncommits(oncommits);
return;
}
}
tracepoint(osd, opwq_process_finish, reqid.name._type,
reqid.name._num, reqid.tid, reqid.inc);
}
+
+ handle_oncommits(oncommits);
}
void OSD::ShardedOpWQ::_enqueue(OpQueueItem&& item) {
bool stop_waiting = false;
+ ContextQueue context_queue;  ///< oncommit contexts queued here for this shard by the objectstore
+
void _enqueue_front(OpQueueItem&& item, unsigned cutoff) {
unsigned priority = item.get_priority();
unsigned cost = item.get_cost();
osdmap_lock_name(shard_name + "::osdmap_lock"),
osdmap_lock(osdmap_lock_name.c_str(), false, false),
shard_lock_name(shard_name + "::shard_lock"),
- shard_lock(shard_lock_name.c_str(), false, true,
- false, cct) {
+ shard_lock(shard_lock_name.c_str(), false, true, false, cct),
+ context_queue(sdata_wait_lock, sdata_cond) {
if (opqueue == io_queue::weightedpriority) {
pqueue = std::make_unique<
WeightedPriorityQueue<OpQueueItem,uint64_t>>(
auto &&sdata = osd->shards[shard_index];
ceph_assert(sdata);
Mutex::Locker l(sdata->shard_lock);
- return sdata->pqueue->empty();
+ return sdata->pqueue->empty() && sdata->context_queue.empty();
+ }
+
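+ // complete oncommit contexts harvested from a shard's context_queue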
+ void handle_oncommits(list<Context*>& oncommits) {
+ for (auto p : oncommits) {
+ p->complete(0);
+ }
}
} op_shardedwq;