next_removal_seq(0),
service(this)
{
+ assert(cct->_conf->osd_op_num_sharded_pool_threads >= cct->_conf->osd_op_num_shards);
monc->set_messenger(client_messenger);
op_tracker.set_complaint_and_threshold(cct->_conf->osd_op_complaint_time,
cct->_conf->osd_op_log_threshold);
pg->queue_op(op);
}
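+// Body of each sharded worker thread's iteration: map the thread to a shard,
+// pull one op off that shard's queue, and run it under the PG lock via
+// osd->dequeue_op(). Per-PG ordering is kept through pg_for_processing.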
+void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb, atomic_t& in_process) {
+
+ uint32_t shard_index = thread_index % num_shards;
+
+ ShardData* sdata = shard_list[shard_index];
+ assert(NULL != sdata);
+ sdata->sdata_op_ordering_lock.Lock();
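+ // Nothing queued on this shard: drop the ordering lock, mark the heartbeat
+ // as healthy, and wait (bounded by a 2s timeout) for an enqueue to signal
+ // this shard, then re-check for work.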
+ if (sdata->pqueue.empty()) {
+ sdata->sdata_op_ordering_lock.Unlock();
+ osd->cct->get_heartbeat_map()->reset_timeout(hb, 4, 0);
+ sdata->sdata_lock.Lock();
+ sdata->sdata_cond.WaitInterval(osd->cct, sdata->sdata_lock, utime_t(2, 0));
+ sdata->sdata_lock.Unlock();
+ sdata->sdata_op_ordering_lock.Lock();
+ if (sdata->pqueue.empty()) {
+ sdata->sdata_op_ordering_lock.Unlock();
+ return;
+ }
+ }
+
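+ // Park the dequeued op under its PG in pg_for_processing before dropping the
+ // ordering lock; _enqueue_front() may reorder it and dequeue() may reclaim it
+ // until we re-check the map under the lock below.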
+ pair<PGRef, OpRequestRef> item = sdata->pqueue.dequeue();
+ sdata->pg_for_processing[&*(item.first)].push_back(item.second);
+ sdata->sdata_op_ordering_lock.Unlock();
+ in_process.inc();
+ ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval, suicide_interval);
+
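+ // Take the PG lock; the thread-pool timeout is suspended while we block on it.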
+ (item.first)->lock_suspend_timeout(tp_handle);
+
+ OpRequestRef op;
+ {
+ Mutex::Locker l(sdata->sdata_op_ordering_lock);
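+ // While we waited for the PG lock, dequeue(pg) may have reclaimed every op
+ // parked for this PG; if so there is nothing left to process here.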
+ if (!sdata->pg_for_processing.count(&*(item.first))) {
+ (item.first)->unlock();
+ in_process.dec();  // balance the inc() above; we never reach dequeue_op()
+ return;
+ }
+ assert(sdata->pg_for_processing[&*(item.first)].size());
+ op = sdata->pg_for_processing[&*(item.first)].front();
+ sdata->pg_for_processing[&*(item.first)].pop_front();
+ if (!(sdata->pg_for_processing[&*(item.first)].size()))
+ sdata->pg_for_processing.erase(&*(item.first));
+ }
+
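+ // At debug level 30, dump the shard queues' contents after each dequeue.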
+ lgeneric_subdout(osd->cct, osd, 30) << "dequeue status: ";
+ Formatter *f = new_formatter("json");
+ f->open_object_section("q");
+ dump(f);
+ f->close_section();
+ f->flush(*_dout);
+ delete f;
+ *_dout << dendl;
+
+ osd->dequeue_op(item.first, op, tp_handle);
+ (item.first)->unlock();
+ in_process.dec();
+
+}
+
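+// _enqueue(): pick the shard from the PG's placement seed, queue the op by
+// priority/cost on that shard's PrioritizedQueue, then wake one waiting worker.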
+void OSD::ShardedOpWQ::_enqueue(pair <PGRef, OpRequestRef> item) {
+
+ uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
+
+ ShardData* sdata = shard_list[shard_index];
+ assert(NULL != sdata);
+ unsigned priority = item.second->get_req()->get_priority();
+ unsigned cost = item.second->get_req()->get_cost();
+ sdata->sdata_op_ordering_lock.Lock();
+
+ if (priority >= CEPH_MSG_PRIO_LOW)
+ sdata->pqueue.enqueue_strict(
+ item.second->get_req()->get_source_inst(), priority, item);
+ else
+ sdata->pqueue.enqueue(item.second->get_req()->get_source_inst(),
+ priority, cost, item);
+ sdata->sdata_op_ordering_lock.Unlock();
+
+ sdata->sdata_lock.Lock();
+ sdata->sdata_cond.SignalOne();
+ sdata->sdata_lock.Unlock();
+
+}
+
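+// _enqueue_front(): requeue an op at the head of its shard. If the PG already
+// has ops parked in pg_for_processing, this op goes to the front of that list
+// and the most recently parked op is pushed to the queue front instead, so
+// per-PG ordering is preserved.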
+void OSD::ShardedOpWQ::_enqueue_front(pair <PGRef, OpRequestRef> item) {
+
+ uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
+
+ ShardData* sdata = shard_list[shard_index];
+ assert(NULL != sdata);
+ sdata->sdata_op_ordering_lock.Lock();
+ if (sdata->pg_for_processing.count(&*(item.first))) {
+ sdata->pg_for_processing[&*(item.first)].push_front(item.second);
+ item.second = sdata->pg_for_processing[&*(item.first)].back();
+ sdata->pg_for_processing[&*(item.first)].pop_back();
+ }
+ unsigned priority = item.second->get_req()->get_priority();
+ unsigned cost = item.second->get_req()->get_cost();
+ if (priority >= CEPH_MSG_PRIO_LOW)
+ sdata->pqueue.enqueue_strict_front(
+ item.second->get_req()->get_source_inst(),priority, item);
+ else
+ sdata->pqueue.enqueue_front(item.second->get_req()->get_source_inst(),
+ priority, cost, item);
+
+ sdata->sdata_op_ordering_lock.Unlock();
+ sdata->sdata_lock.Lock();
+ sdata->sdata_cond.SignalOne();
+ sdata->sdata_lock.Unlock();
+
+}
+
+
void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
{
struct ShardData {
Mutex sdata_lock;
Cond sdata_cond;
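+    // sdata_op_ordering_lock protects both pqueue and pg_for_processing;
+    // pg_for_processing parks ops already pulled off pqueue but not yet
+    // handed to dequeue_op(), keyed by PG, so their order survives requeues.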
+ Mutex sdata_op_ordering_lock;
+ map<PG*, list<OpRequestRef> > pg_for_processing;
PrioritizedQueue< pair<PGRef, OpRequestRef>, entity_inst_t> pqueue;
- ShardData(string lock_name, uint64_t max_tok_per_prio, uint64_t min_cost):
+ ShardData(string lock_name, string ordering_lock, uint64_t max_tok_per_prio, uint64_t min_cost):
sdata_lock(lock_name.c_str()),
+ sdata_op_ordering_lock(ordering_lock.c_str()),
pqueue(max_tok_per_prio, min_cost) {}
};
vector<ShardData*> shard_list;
OSD *osd;
uint32_t num_shards;
- Mutex opQ_lock;
- Cond opQ_cond;
public:
ShardedOpWQ(uint32_t pnum_shards, OSD *o, time_t ti, ShardedThreadPool* tp):
ShardedThreadPool::ShardedWQ < pair <PGRef, OpRequestRef> >(ti, ti*10, tp),
- osd(o), num_shards(pnum_shards), opQ_lock("OSD::ShardedOpWQLock") {
+ osd(o), num_shards(pnum_shards) {
for(uint32_t i = 0; i < num_shards; i++) {
char lock_name[32] = {0};
- snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD::ShardedOpWQ::", i);
- ShardData* one_shard = new ShardData(lock_name, osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
+ snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
+ char order_lock[32] = {0};
+ snprintf(order_lock, sizeof(order_lock), "%s.%d", "OSD:ShardedOpWQ:order:", i);
+ ShardData* one_shard = new ShardData(lock_name, order_lock, osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
osd->cct->_conf->osd_op_pq_min_cost);
shard_list.push_back(one_shard);
}
}
}
- void _process(uint32_t thread_index, heartbeat_handle_d *hb ) {
-
- uint32_t shard_index = thread_index % num_shards;
-
- ShardData* sdata = shard_list[shard_index];
-
- if (NULL != sdata) {
-
- sdata->sdata_lock.Lock();
-
- while (true) {
-
- while(!sdata->pqueue.empty()) {
-
- if (pause_threads.read() != 0){
-
- break;
- }
-
- in_process.inc();
- ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval, suicide_interval);
- tp_handle.reset_tp_timeout();
-
- pair<PGRef, OpRequestRef> item = sdata->pqueue.dequeue();
-
- (item.first)->lock_suspend_timeout(tp_handle);
- //unlocking after holding the PG lock as it should maintain the op order
- sdata->sdata_lock.Unlock();
- //Should it be within some config option ?
- lgeneric_subdout(osd->cct, osd, 30) << "dequeue status: ";
- Formatter *f = new_formatter("json");
- f->open_object_section("q");
- dump(f);
- f->close_section();
- f->flush(*_dout);
- delete f;
- *_dout << dendl;
-
- osd->dequeue_op(item.first, item.second, tp_handle);
- (item.first)->unlock();
-
- sdata->sdata_lock.Lock();
- in_process.dec();
- if ((pause_threads.read() != 0) || (drain_threads.read() != 0)) {
- opQ_lock.Lock();
- opQ_cond.Signal();
- opQ_lock.Unlock();
- }
- }
-
- if (stop_threads.read() != 0){
- break;
- }
-
- osd->cct->get_heartbeat_map()->reset_timeout(hb, 4, 0);
- sdata->sdata_cond.WaitInterval(osd->cct, sdata->sdata_lock, utime_t(2, 0));
-
- }
- sdata->sdata_lock.Unlock();
-
- } else {
- assert(0);
- }
-
- }
-
- void stop_threads_on_queue() {
- stop_threads.set(1);
- for(uint32_t i = 0; i < num_shards; i++) {
- ShardData* sdata = shard_list[i];
- if (NULL != sdata) {
- sdata->sdata_lock.Lock();
- sdata->sdata_cond.Signal();
- sdata->sdata_lock.Unlock();
- }
- }
+ void _process(uint32_t thread_index, heartbeat_handle_d *hb, atomic_t& in_process);
+ void _enqueue(pair <PGRef, OpRequestRef> item);
+ void _enqueue_front(pair <PGRef, OpRequestRef> item);
- }
-
- void pause_threads_on_queue() {
- pause_threads.set(1);
- opQ_lock.Lock();
- while (in_process.read()) {
- opQ_cond.Wait(opQ_lock);
- }
- opQ_lock.Unlock();
-
- }
-
- void pause_new_threads_on_queue() {
- pause_threads.set(1);
-
- }
-
- void unpause_threads_on_queue() {
- pause_threads.set(0);
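+ // Signal every shard so workers blocked in _process()'s idle wait return to
+ // the thread pool.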
+ void return_waiting_threads() {
for(uint32_t i = 0; i < num_shards; i++) {
ShardData* sdata = shard_list[i];
- if (NULL != sdata) {
- sdata->sdata_lock.Lock();
- sdata->sdata_cond.Signal();
- sdata->sdata_lock.Unlock();
- }
- }
-
- }
-
- void drain_threads_on_queue() {
- drain_threads.set(1);
- opQ_lock.Lock();
- for(uint32_t i = 0; i < num_shards; i++) {
- if (!_empty(i)) {
- opQ_cond.Wait(opQ_lock);
- }
- }
- while (in_process.read()){
- opQ_cond.Wait(opQ_lock);
- }
- opQ_lock.Unlock();
-
- drain_threads.set(0);
- }
-
- void drain() {
-
- drain_threads_on_queue();
- }
-
- void _enqueue(pair <PGRef, OpRequestRef> item) {
-
- uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
-
- ShardData* sdata = shard_list[shard_index];
- if (NULL != sdata) {
- unsigned priority = item.second->get_req()->get_priority();
- unsigned cost = item.second->get_req()->get_cost();
- sdata->sdata_lock.Lock();
- if (priority >= CEPH_MSG_PRIO_LOW)
- sdata->pqueue.enqueue_strict(
- item.second->get_req()->get_source_inst(), priority, item);
- else
- sdata->pqueue.enqueue(item.second->get_req()->get_source_inst(),
- priority, cost, item);
-
-
- sdata->sdata_cond.SignalOne();
- sdata->sdata_lock.Unlock();
- } else {
- assert(0);
- }
- }
-
- void _enqueue_front(pair <PGRef, OpRequestRef> item) {
-
- uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
-
- ShardData* sdata = shard_list[shard_index];
- if (NULL != sdata) {
- unsigned priority = item.second->get_req()->get_priority();
- unsigned cost = item.second->get_req()->get_cost();
+ assert(NULL != sdata);
sdata->sdata_lock.Lock();
- if (priority >= CEPH_MSG_PRIO_LOW)
- sdata->pqueue.enqueue_strict_front(
- item.second->get_req()->get_source_inst(),priority, item);
- else
- sdata->pqueue.enqueue_front(item.second->get_req()->get_source_inst(),
- priority, cost, item);
-
- sdata->sdata_cond.SignalOne();
+ sdata->sdata_cond.Signal();
sdata->sdata_lock.Unlock();
- } else {
- assert(0);
}
+
}
-
void dump(Formatter *f) {
for(uint32_t i = 0; i < num_shards; i++) {
ShardData* sdata = shard_list[i];
- if (NULL != sdata) {
- sdata->sdata_lock.Lock();
- sdata->pqueue.dump(f);
- sdata->sdata_lock.Unlock();
- }
+ assert(NULL != sdata);
+ sdata->sdata_op_ordering_lock.Lock();
+ sdata->pqueue.dump(f);
+ sdata->sdata_op_ordering_lock.Unlock();
}
}
void dequeue(PG *pg, list<OpRequestRef> *dequeued = 0) {
ShardData* sdata = NULL;
- if (pg) {
- uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
- sdata = shard_list[shard_index];
- if (!sdata) {
- assert(0);
- }
- } else {
- assert(0);
- }
-
+ assert(pg != NULL);
+ uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
+ sdata = shard_list[shard_index];
+ assert(sdata != NULL);
if (!dequeued) {
- sdata->sdata_lock.Lock();
+ sdata->sdata_op_ordering_lock.Lock();
sdata->pqueue.remove_by_filter(Pred(pg));
- sdata->sdata_lock.Unlock();
+ sdata->pg_for_processing.erase(pg);
+ sdata->sdata_op_ordering_lock.Unlock();
} else {
list<pair<PGRef, OpRequestRef> > _dequeued;
- sdata->sdata_lock.Lock();
+ sdata->sdata_op_ordering_lock.Lock();
sdata->pqueue.remove_by_filter(Pred(pg), &_dequeued);
- sdata->sdata_lock.Unlock();
for (list<pair<PGRef, OpRequestRef> >::iterator i = _dequeued.begin();
i != _dequeued.end(); ++i) {
dequeued->push_back(i->second);
}
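+ // Also hand back ops for this PG that were already pulled off the queue but
+ // are still parked in pg_for_processing.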
+ if (sdata->pg_for_processing.count(pg)) {
+ dequeued->splice(
+ dequeued->begin(),
+ sdata->pg_for_processing[pg]);
+ sdata->pg_for_processing.erase(pg);
+ }
+ sdata->sdata_op_ordering_lock.Unlock();
}
}
- bool _empty(uint32_t shard_index) {
- ShardData* sdata = shard_list[shard_index];
- if (NULL != sdata) {
- sdata->sdata_lock.Lock();
- bool is_empty = sdata->pqueue.empty();
- sdata->sdata_lock.Unlock();
- return is_empty;
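+ // True only if every shard's pqueue is empty; ops already parked in
+ // pg_for_processing are not counted.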
+ bool is_all_shard_empty() {
+ bool is_empty = true;
+ for(uint32_t i = 0; i < num_shards; i++) {
+ ShardData* sdata = shard_list[i];
+ assert(NULL != sdata);
+ sdata->sdata_op_ordering_lock.Lock();
+ if (!sdata->pqueue.empty()) {
+ is_empty = false;
+ sdata->sdata_op_ordering_lock.Unlock();
+ break;
+ }
+ sdata->sdata_op_ordering_lock.Unlock();
}
- return true;
+ return is_empty;
}
+
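+ // Empty check for a single shard, taken under its ordering lock.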
+ bool is_shard_empty(uint32_t shard_index) {
+
+ ShardData* sdata = shard_list[shard_index];
+ assert(NULL != sdata);
+ Mutex::Locker l(sdata->sdata_op_ordering_lock);
+ return sdata->pqueue.empty();
+ }
} op_shardedwq;