logger(osd->logger),
recoverystate_perf(osd->recoverystate_perf),
monc(osd->monc),
- op_wq(osd->op_wq),
+ op_wq(osd->op_shardedwq),
peering_wq(osd->peering_wq),
recovery_wq(osd->recovery_wq),
snap_trim_wq(osd->snap_trim_wq),
osd_compat(get_osd_compat_set()),
state_lock(), state(STATE_INITIALIZING),
op_tp(cct, "OSD::op_tp", cct->_conf->osd_op_threads, "osd_op_threads"),
+ op_sharded_tp(cct, "OSD::op_sharded_tp", cct->_conf->osd_op_num_sharded_pool_threads),
recovery_tp(cct, "OSD::recovery_tp", cct->_conf->osd_recovery_threads, "osd_recovery_threads"),
disk_tp(cct, "OSD::disk_tp", cct->_conf->osd_disk_threads, "osd_disk_threads"),
command_tp(cct, "OSD::command_tp", 1),
finished_lock("OSD::finished_lock"),
op_tracker(cct, cct->_conf->osd_enable_op_tracker),
test_ops_hook(NULL),
- op_wq(this, cct->_conf->osd_op_thread_timeout, &op_tp),
+ op_shardedwq(cct->_conf->osd_op_num_shards, this, cct->_conf->osd_op_thread_timeout, &op_sharded_tp),
peering_wq(this, cct->_conf->osd_op_thread_timeout, &op_tp),
map_lock("OSD::map_lock"),
pg_map_lock("OSD::pg_map_lock"),
op_tracker.dump_historic_ops(f);
} else if (command == "dump_op_pq_state") {
f->open_object_section("pq");
- op_wq.dump(f);
+ op_shardedwq.dump(f);
f->close_section();
} else if (command == "dump_blacklist") {
list<pair<entity_addr_t,utime_t> > bl;
monc->set_log_client(&clog);
op_tp.start();
+ op_sharded_tp.start();
recovery_tp.start();
disk_tp.start();
command_tp.start();
derr << " pausing thread pools" << dendl;
op_tp.pause();
+ op_sharded_tp.pause();
disk_tp.pause();
recovery_tp.pause();
command_tp.pause();
}
// finish ops
- op_wq.drain(); // should already be empty except for lagard PGs
+ op_shardedwq.drain(); // should already be empty except for lagard PGs
{
Mutex::Locker l(finished_lock);
finished.clear(); // zap waiters (bleh, this is messy)
op_tp.stop();
dout(10) << "op tp stopped" << dendl;
+ op_sharded_tp.drain();
+ op_sharded_tp.stop();
+ dout(10) << "op sharded tp stopped" << dendl;
+
command_tp.drain();
command_tp.stop();
dout(10) << "command tp stopped" << dendl;
Mutex::Locker l(pg_stat_queue_lock);
assert(pg_stat_queue.empty());
}
-
peering_wq.clear();
// Remove PGs
#ifdef PG_DEBUG_REFS
pg->queue_op(op);
}
-void OSD::OpWQ::_enqueue(pair<PGRef, OpRequestRef> item)
-{
- unsigned priority = item.second->get_req()->get_priority();
- unsigned cost = item.second->get_req()->get_cost();
- if (priority >= CEPH_MSG_PRIO_LOW)
- pqueue.enqueue_strict(
- item.second->get_req()->get_source_inst(),
- priority, item);
- else
- pqueue.enqueue(item.second->get_req()->get_source_inst(),
- priority, cost, item);
- osd->logger->set(l_osd_opq, pqueue.length());
-}
-
-void OSD::OpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item)
-{
- Mutex::Locker l(qlock);
- if (pg_for_processing.count(&*(item.first))) {
- pg_for_processing[&*(item.first)].push_front(item.second);
- item.second = pg_for_processing[&*(item.first)].back();
- pg_for_processing[&*(item.first)].pop_back();
- }
- unsigned priority = item.second->get_req()->get_priority();
- unsigned cost = item.second->get_req()->get_cost();
- if (priority >= CEPH_MSG_PRIO_LOW)
- pqueue.enqueue_strict_front(
- item.second->get_req()->get_source_inst(),
- priority, item);
- else
- pqueue.enqueue_front(item.second->get_req()->get_source_inst(),
- priority, cost, item);
- osd->logger->set(l_osd_opq, pqueue.length());
-}
-
-PGRef OSD::OpWQ::_dequeue()
-{
- assert(!pqueue.empty());
- PGRef pg;
- {
- Mutex::Locker l(qlock);
- pair<PGRef, OpRequestRef> ret = pqueue.dequeue();
- pg = ret.first;
- pg_for_processing[&*pg].push_back(ret.second);
- }
- osd->logger->set(l_osd_opq, pqueue.length());
- return pg;
-}
-
-void OSD::OpWQ::_process(PGRef pg, ThreadPool::TPHandle &handle)
-{
- pg->lock_suspend_timeout(handle);
- OpRequestRef op;
- {
- Mutex::Locker l(qlock);
- if (!pg_for_processing.count(&*pg)) {
- pg->unlock();
- return;
- }
- assert(pg_for_processing[&*pg].size());
- op = pg_for_processing[&*pg].front();
- pg_for_processing[&*pg].pop_front();
- if (!(pg_for_processing[&*pg].size()))
- pg_for_processing.erase(&*pg);
- }
-
- lgeneric_subdout(osd->cct, osd, 30) << "dequeue status: ";
- Formatter *f = new_formatter("json");
- f->open_object_section("q");
- dump(f);
- f->close_section();
- f->flush(*_dout);
- delete f;
- *_dout << dendl;
-
- osd->dequeue_op(pg, op, handle);
- pg->unlock();
-}
-
void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
{
- osd->op_wq.dequeue(pg, dequeued);
+ osd->op_shardedwq.dequeue(pg, dequeued);
}
/*
PerfCounters *&logger;
PerfCounters *&recoverystate_perf;
MonClient *&monc;
- ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>, PGRef> &op_wq;
+ ShardedThreadPool::ShardedWQ < pair <PGRef, OpRequestRef> > &op_wq;
ThreadPool::BatchWorkQueue<PG> &peering_wq;
ThreadPool::WorkQueue<PG> &recovery_wq;
ThreadPool::WorkQueue<PG> &snap_trim_wq;
private:
ThreadPool op_tp;
+ ShardedThreadPool op_sharded_tp;
ThreadPool recovery_tp;
ThreadPool disk_tp;
ThreadPool command_tp;
// -- op queue --
- struct OpWQ: public ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>,
- PGRef > {
- Mutex qlock;
- map<PG*, list<OpRequestRef> > pg_for_processing;
+
+ class ShardedOpWQ: public ShardedThreadPool::ShardedWQ < pair <PGRef, OpRequestRef> > {
+
+ struct ShardData {
+ Mutex sdata_lock;
+ Cond sdata_cond;
+ PrioritizedQueue< pair<PGRef, OpRequestRef>, entity_inst_t> pqueue;
+ ShardData(string lock_name, uint64_t max_tok_per_prio, uint64_t min_cost):
+ sdata_lock(lock_name.c_str()),
+ pqueue(max_tok_per_prio, min_cost) {}
+ };
+
+ vector<ShardData*> shard_list;
OSD *osd;
- PrioritizedQueue<pair<PGRef, OpRequestRef>, entity_inst_t > pqueue;
- OpWQ(OSD *o, time_t ti, ThreadPool *tp)
- : ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>, PGRef >(
- "OSD::OpWQ", ti, ti*10, tp),
- qlock("OpWQ::qlock"),
- osd(o),
- pqueue(o->cct->_conf->osd_op_pq_max_tokens_per_priority,
- o->cct->_conf->osd_op_pq_min_cost)
- {}
+ uint32_t num_shards;
+ Mutex opQ_lock;
+ Cond opQ_cond;
+
+ public:
+ ShardedOpWQ(uint32_t pnum_shards, OSD *o, time_t ti, ShardedThreadPool* tp):
+ ShardedThreadPool::ShardedWQ < pair <PGRef, OpRequestRef> >(ti, ti*10, tp),
+ osd(o), num_shards(pnum_shards), opQ_lock("OSD::ShardedOpWQLock") {
+ for(uint32_t i = 0; i < num_shards; i++) {
+ char lock_name[32] = {0};
+ snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD::ShardedOpWQ::", i);
+ ShardData* one_shard = new ShardData(lock_name, osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
+ osd->cct->_conf->osd_op_pq_min_cost);
+ shard_list.push_back(one_shard);
+ }
+ }
- void dump(Formatter *f) {
- lock();
- pqueue.dump(f);
- unlock();
- }
+ ~ShardedOpWQ() {
+
+ while(!shard_list.empty()) {
+ delete shard_list.back();
+ shard_list.pop_back();
+ }
+ }
+
+ void _process(uint32_t thread_index, heartbeat_handle_d *hb ) {
+
+ uint32_t shard_index = thread_index % num_shards;
+
+ ShardData* sdata = shard_list[shard_index];
+
+ if (NULL != sdata) {
+
+ sdata->sdata_lock.Lock();
+
+ while (true) {
+
+ while(!sdata->pqueue.empty()) {
+
+ if (pause_threads.read() != 0){
+
+ break;
+ }
- void _enqueue_front(pair<PGRef, OpRequestRef> item);
- void _enqueue(pair<PGRef, OpRequestRef> item);
- PGRef _dequeue();
+ in_process.inc();
+ ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval, suicide_interval);
+ tp_handle.reset_tp_timeout();
+
+ pair<PGRef, OpRequestRef> item = sdata->pqueue.dequeue();
+
+ (item.first)->lock_suspend_timeout(tp_handle);
+ //unlocking after holding the PG lock as it should maintain the op order
+ sdata->sdata_lock.Unlock();
+ //Should it be within some config option ?
+ lgeneric_subdout(osd->cct, osd, 30) << "dequeue status: ";
+ Formatter *f = new_formatter("json");
+ f->open_object_section("q");
+ dump(f);
+ f->close_section();
+ f->flush(*_dout);
+ delete f;
+ *_dout << dendl;
+
+ osd->dequeue_op(item.first, item.second, tp_handle);
+ (item.first)->unlock();
+
+ sdata->sdata_lock.Lock();
+ in_process.dec();
+ if ((pause_threads.read() != 0) || (drain_threads.read() != 0)) {
+ opQ_lock.Lock();
+ opQ_cond.Signal();
+ opQ_lock.Unlock();
+ }
+ }
+
+ if (stop_threads.read() != 0){
+ break;
+ }
+
+ osd->cct->get_heartbeat_map()->reset_timeout(hb, 4, 0);
+ sdata->sdata_cond.WaitInterval(osd->cct, sdata->sdata_lock, utime_t(2, 0));
+
+ }
+ sdata->sdata_lock.Unlock();
+
+ } else {
+ assert(0);
+ }
- struct Pred {
- PG *pg;
- Pred(PG *pg) : pg(pg) {}
- bool operator()(const pair<PGRef, OpRequestRef> &op) {
- return op.first == pg;
}
- };
- void dequeue(PG *pg, list<OpRequestRef> *dequeued = 0) {
- lock();
- if (!dequeued) {
- pqueue.remove_by_filter(Pred(pg));
- pg_for_processing.erase(pg);
- } else {
- list<pair<PGRef, OpRequestRef> > _dequeued;
- pqueue.remove_by_filter(Pred(pg), &_dequeued);
- for (list<pair<PGRef, OpRequestRef> >::iterator i = _dequeued.begin();
- i != _dequeued.end();
- ++i) {
- dequeued->push_back(i->second);
- }
- if (pg_for_processing.count(pg)) {
- dequeued->splice(
- dequeued->begin(),
- pg_for_processing[pg]);
- pg_for_processing.erase(pg);
- }
+
+ void stop_threads_on_queue() {
+ stop_threads.set(1);
+ for(uint32_t i = 0; i < num_shards; i++) {
+ ShardData* sdata = shard_list[i];
+ if (NULL != sdata) {
+ sdata->sdata_lock.Lock();
+ sdata->sdata_cond.Signal();
+ sdata->sdata_lock.Unlock();
+ }
+ }
+
}
- unlock();
- }
- bool _empty() {
- return pqueue.empty();
- }
- void _process(PGRef pg, ThreadPool::TPHandle &handle);
- } op_wq;
+
+ void pause_threads_on_queue() {
+ pause_threads.set(1);
+ opQ_lock.Lock();
+ while (in_process.read()) {
+ opQ_cond.Wait(opQ_lock);
+ }
+ opQ_lock.Unlock();
+
+ }
+
+ void pause_new_threads_on_queue() {
+ pause_threads.set(1);
+
+ }
+
+ void unpause_threads_on_queue() {
+ pause_threads.set(0);
+ for(uint32_t i = 0; i < num_shards; i++) {
+ ShardData* sdata = shard_list[i];
+ if (NULL != sdata) {
+ sdata->sdata_lock.Lock();
+ sdata->sdata_cond.Signal();
+ sdata->sdata_lock.Unlock();
+ }
+ }
+
+ }
+
+ void drain_threads_on_queue() {
+ drain_threads.set(1);
+ opQ_lock.Lock();
+ for(uint32_t i = 0; i < num_shards; i++) {
+ if (!_empty(i)) {
+ opQ_cond.Wait(opQ_lock);
+ }
+ }
+ while (in_process.read()){
+ opQ_cond.Wait(opQ_lock);
+ }
+ opQ_lock.Unlock();
+
+ drain_threads.set(0);
+ }
+
+ void drain() {
+
+ drain_threads_on_queue();
+ }
+
+ void _enqueue(pair <PGRef, OpRequestRef> item) {
+
+ uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
+
+ ShardData* sdata = shard_list[shard_index];
+ if (NULL != sdata) {
+ unsigned priority = item.second->get_req()->get_priority();
+ unsigned cost = item.second->get_req()->get_cost();
+ sdata->sdata_lock.Lock();
+ if (priority >= CEPH_MSG_PRIO_LOW)
+ sdata->pqueue.enqueue_strict(
+ item.second->get_req()->get_source_inst(), priority, item);
+ else
+ sdata->pqueue.enqueue(item.second->get_req()->get_source_inst(),
+ priority, cost, item);
+
+
+ sdata->sdata_cond.SignalOne();
+ sdata->sdata_lock.Unlock();
+ } else {
+ assert(0);
+ }
+ }
+
+ void _enqueue_front(pair <PGRef, OpRequestRef> item) {
+
+ uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
+
+ ShardData* sdata = shard_list[shard_index];
+ if (NULL != sdata) {
+ unsigned priority = item.second->get_req()->get_priority();
+ unsigned cost = item.second->get_req()->get_cost();
+ sdata->sdata_lock.Lock();
+ if (priority >= CEPH_MSG_PRIO_LOW)
+ sdata->pqueue.enqueue_strict_front(
+ item.second->get_req()->get_source_inst(),priority, item);
+ else
+ sdata->pqueue.enqueue_front(item.second->get_req()->get_source_inst(),
+ priority, cost, item);
+
+ sdata->sdata_cond.SignalOne();
+ sdata->sdata_lock.Unlock();
+ } else {
+ assert(0);
+ }
+ }
+
+
+ void dump(Formatter *f) {
+ for(uint32_t i = 0; i < num_shards; i++) {
+ ShardData* sdata = shard_list[i];
+ if (NULL != sdata) {
+ sdata->sdata_lock.Lock();
+ sdata->pqueue.dump(f);
+ sdata->sdata_lock.Unlock();
+ }
+ }
+ }
+
+ struct Pred {
+ PG *pg;
+ Pred(PG *pg) : pg(pg) {}
+ bool operator()(const pair<PGRef, OpRequestRef> &op) {
+ return op.first == pg;
+ }
+ };
+
+ void dequeue(PG *pg, list<OpRequestRef> *dequeued = 0) {
+ ShardData* sdata = NULL;
+ if (pg) {
+ uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
+ sdata = shard_list[shard_index];
+ if (!sdata) {
+ assert(0);
+ }
+ } else {
+ assert(0);
+ }
+
+ if (!dequeued) {
+ sdata->sdata_lock.Lock();
+ sdata->pqueue.remove_by_filter(Pred(pg));
+ sdata->sdata_lock.Unlock();
+ } else {
+ list<pair<PGRef, OpRequestRef> > _dequeued;
+ sdata->sdata_lock.Lock();
+ sdata->pqueue.remove_by_filter(Pred(pg), &_dequeued);
+ sdata->sdata_lock.Unlock();
+ for (list<pair<PGRef, OpRequestRef> >::iterator i = _dequeued.begin();
+ i != _dequeued.end(); ++i) {
+ dequeued->push_back(i->second);
+ }
+ }
+
+ }
+
+ bool _empty(uint32_t shard_index) {
+ ShardData* sdata = shard_list[shard_index];
+ if (NULL != sdata) {
+ sdata->sdata_lock.Lock();
+ bool is_empty = sdata->pqueue.empty();
+ sdata->sdata_lock.Unlock();
+ return is_empty;
+ }
+ return true;
+
+ }
+
+ } op_shardedwq;
+
void enqueue_op(PG *pg, OpRequestRef op);
void dequeue_op(