Mutex sdata_op_ordering_lock;
map<PG*, list<OpRequestRef> > pg_for_processing;
PrioritizedQueue< pair<PGRef, OpRequestRef>, entity_inst_t> pqueue;
- ShardData(string lock_name, string ordering_lock, uint64_t max_tok_per_prio, uint64_t min_cost):
- sdata_lock(lock_name.c_str()),
- sdata_op_ordering_lock(ordering_lock.c_str()),
- pqueue(max_tok_per_prio, min_cost) {}
+ ShardData(
+ string lock_name, string ordering_lock,
+ uint64_t max_tok_per_prio, uint64_t min_cost)
+ : sdata_lock(lock_name.c_str()),
+ sdata_op_ordering_lock(ordering_lock.c_str()),
+ pqueue(max_tok_per_prio, min_cost) {}
};
-
+
vector<ShardData*> shard_list;
OSD *osd;
uint32_t num_shards;
- public:
- ShardedOpWQ(uint32_t pnum_shards, OSD *o, time_t ti, time_t si, ShardedThreadPool* tp):
- ShardedThreadPool::ShardedWQ < pair <PGRef, OpRequestRef> >(ti, si, tp),
- osd(o), num_shards(pnum_shards) {
- for(uint32_t i = 0; i < num_shards; i++) {
- char lock_name[32] = {0};
- snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
- char order_lock[32] = {0};
- snprintf(order_lock, sizeof(order_lock), "%s.%d", "OSD:ShardedOpWQ:order:", i);
- ShardData* one_shard = new ShardData(lock_name, order_lock,
- osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
- osd->cct->_conf->osd_op_pq_min_cost);
- shard_list.push_back(one_shard);
- }
+ public:
+ ShardedOpWQ(uint32_t pnum_shards, OSD *o, time_t ti, time_t si, ShardedThreadPool* tp):
+ ShardedThreadPool::ShardedWQ < pair <PGRef, OpRequestRef> >(ti, si, tp),
+ osd(o), num_shards(pnum_shards) {
+ for(uint32_t i = 0; i < num_shards; i++) {
+ char lock_name[32] = {0};
+ snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
+ char order_lock[32] = {0};
+ snprintf(
+ order_lock, sizeof(order_lock), "%s.%d",
+ "OSD:ShardedOpWQ:order:", i);
+ ShardData* one_shard = new ShardData(
+ lock_name, order_lock,
+ osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
+ osd->cct->_conf->osd_op_pq_min_cost);
+ shard_list.push_back(one_shard);
}
-
- ~ShardedOpWQ() {
-
- while(!shard_list.empty()) {
- delete shard_list.back();
- shard_list.pop_back();
- }
+ }
+
+ ~ShardedOpWQ() {
+ while(!shard_list.empty()) {
+ delete shard_list.back();
+ shard_list.pop_back();
}
+ }
- void _process(uint32_t thread_index, heartbeat_handle_d *hb);
- void _enqueue(pair <PGRef, OpRequestRef> item);
- void _enqueue_front(pair <PGRef, OpRequestRef> item);
-
- void return_waiting_threads() {
- for(uint32_t i = 0; i < num_shards; i++) {
- ShardData* sdata = shard_list[i];
- assert (NULL != sdata);
- sdata->sdata_lock.Lock();
- sdata->sdata_cond.Signal();
- sdata->sdata_lock.Unlock();
- }
+ void _process(uint32_t thread_index, heartbeat_handle_d *hb);
+ void _enqueue(pair <PGRef, OpRequestRef> item);
+ void _enqueue_front(pair <PGRef, OpRequestRef> item);
+ void return_waiting_threads() {
+ for(uint32_t i = 0; i < num_shards; i++) {
+ ShardData* sdata = shard_list[i];
+ assert (NULL != sdata);
+ sdata->sdata_lock.Lock();
+ sdata->sdata_cond.Signal();
+ sdata->sdata_lock.Unlock();
}
+ }
- void dump(Formatter *f) {
- for(uint32_t i = 0; i < num_shards; i++) {
- ShardData* sdata = shard_list[i];
- char lock_name[32] = {0};
- snprintf(lock_name, sizeof(lock_name), "%s%d", "OSD:ShardedOpWQ:", i);
- assert (NULL != sdata);
- sdata->sdata_op_ordering_lock.Lock();
- f->open_object_section(lock_name);
- sdata->pqueue.dump(f);
- f->close_section();
- sdata->sdata_op_ordering_lock.Unlock();
- }
+ void dump(Formatter *f) {
+ for(uint32_t i = 0; i < num_shards; i++) {
+ ShardData* sdata = shard_list[i];
+ char lock_name[32] = {0};
+ snprintf(lock_name, sizeof(lock_name), "%s%d", "OSD:ShardedOpWQ:", i);
+ assert (NULL != sdata);
+ sdata->sdata_op_ordering_lock.Lock();
+ f->open_object_section(lock_name);
+ sdata->pqueue.dump(f);
+ f->close_section();
+ sdata->sdata_op_ordering_lock.Unlock();
}
+ }
- struct Pred {
- PG *pg;
- Pred(PG *pg) : pg(pg) {}
- bool operator()(const pair<PGRef, OpRequestRef> &op) {
- return op.first == pg;
- }
- };
-
- void dequeue(PG *pg, list<OpRequestRef> *dequeued = 0) {
- ShardData* sdata = NULL;
- assert(pg != NULL);
- uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
- sdata = shard_list[shard_index];
- assert(sdata != NULL);
- if (!dequeued) {
- sdata->sdata_op_ordering_lock.Lock();
- sdata->pqueue.remove_by_filter(Pred(pg));
- sdata->pg_for_processing.erase(pg);
- sdata->sdata_op_ordering_lock.Unlock();
- } else {
- list<pair<PGRef, OpRequestRef> > _dequeued;
- sdata->sdata_op_ordering_lock.Lock();
- sdata->pqueue.remove_by_filter(Pred(pg), &_dequeued);
- for (list<pair<PGRef, OpRequestRef> >::iterator i = _dequeued.begin();
- i != _dequeued.end(); ++i) {
- dequeued->push_back(i->second);
- }
- if (sdata->pg_for_processing.count(pg)) {
- dequeued->splice(
- dequeued->begin(),
- sdata->pg_for_processing[pg]);
- sdata->pg_for_processing.erase(pg);
- }
- sdata->sdata_op_ordering_lock.Unlock();
- }
+ struct Pred {
+ PG *pg;
+ Pred(PG *pg) : pg(pg) {}
+ bool operator()(const pair<PGRef, OpRequestRef> &op) {
+ return op.first == pg;
+ }
+ };
+ void dequeue(PG *pg, list<OpRequestRef> *dequeued = 0) {
+ ShardData* sdata = NULL;
+ assert(pg != NULL);
+ uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
+ sdata = shard_list[shard_index];
+ assert(sdata != NULL);
+ if (!dequeued) {
+ sdata->sdata_op_ordering_lock.Lock();
+ sdata->pqueue.remove_by_filter(Pred(pg));
+ sdata->pg_for_processing.erase(pg);
+ sdata->sdata_op_ordering_lock.Unlock();
+ } else {
+ list<pair<PGRef, OpRequestRef> > _dequeued;
+ sdata->sdata_op_ordering_lock.Lock();
+ sdata->pqueue.remove_by_filter(Pred(pg), &_dequeued);
+ for (list<pair<PGRef, OpRequestRef> >::iterator i = _dequeued.begin();
+ i != _dequeued.end(); ++i) {
+ dequeued->push_back(i->second);
+ }
+ if (sdata->pg_for_processing.count(pg)) {
+ dequeued->splice(
+ dequeued->begin(),
+ sdata->pg_for_processing[pg]);
+ sdata->pg_for_processing.erase(pg);
+ }
+ sdata->sdata_op_ordering_lock.Unlock();
}
+ }
- bool is_shard_empty(uint32_t thread_index) {
- uint32_t shard_index = thread_index % num_shards;
- ShardData* sdata = shard_list[shard_index];
- assert(NULL != sdata);
- Mutex::Locker l(sdata->sdata_op_ordering_lock);
- return sdata->pqueue.empty();
- }
-
+ bool is_shard_empty(uint32_t thread_index) {
+ uint32_t shard_index = thread_index % num_shards;
+ ShardData* sdata = shard_list[shard_index];
+ assert(NULL != sdata);
+ Mutex::Locker l(sdata->sdata_op_ordering_lock);
+ return sdata->pqueue.empty();
+ }
} op_shardedwq;