uint32_t shard_index = thread_index % osd->num_shards;
auto& sdata = osd->shards[shard_index];
ceph_assert(sdata);
+
+ // If all threads of shards do oncommits, there is a out-of-order problem.
+ // So we choose the thread which has the smallest thread_index(thread_index < num_shards) of shard
+ // to do oncommit callback.
+ bool is_smallest_thread_index = thread_index < osd->num_shards;
+
// peek at spg_t
sdata->shard_lock.Lock();
- if (sdata->pqueue->empty() && sdata->context_queue.empty()) {
+ if (is_smallest_thread_index) {
+ if (sdata->pqueue->empty() && sdata->context_queue.empty()) {
+ sdata->sdata_wait_lock.Lock();
+ if (!sdata->stop_waiting) {
+ dout(20) << __func__ << " empty q, waiting" << dendl;
+ osd->cct->get_heartbeat_map()->clear_timeout(hb);
+ sdata->shard_lock.Unlock();
+ sdata->sdata_cond.Wait(sdata->sdata_wait_lock);
+ sdata->sdata_wait_lock.Unlock();
+ sdata->shard_lock.Lock();
+ if (sdata->pqueue->empty() && sdata->context_queue.empty()) {
+ sdata->shard_lock.Unlock();
+ return;
+ }
+ osd->cct->get_heartbeat_map()->reset_timeout(hb,
+ osd->cct->_conf->threadpool_default_timeout, 0);
+ } else {
+ dout(20) << __func__ << " need return immediately" << dendl;
+ sdata->sdata_wait_lock.Unlock();
+ sdata->shard_lock.Unlock();
+ return;
+ }
+ }
+ } else if (sdata->pqueue->empty()) {
sdata->sdata_wait_lock.Lock();
if (!sdata->stop_waiting) {
dout(20) << __func__ << " empty q, waiting" << dendl;
sdata->sdata_cond.Wait(sdata->sdata_wait_lock);
sdata->sdata_wait_lock.Unlock();
sdata->shard_lock.Lock();
- if (sdata->pqueue->empty() && sdata->context_queue.empty()) {
+ if (sdata->pqueue->empty()) {
sdata->shard_lock.Unlock();
return;
}
}
list<Context *> oncommits;
- if (!sdata->context_queue.empty()) {
+ if (is_smallest_thread_index && !sdata->context_queue.empty()) {
sdata->context_queue.swap(oncommits);
}
auto &&sdata = osd->shards[shard_index];
ceph_assert(sdata);
Mutex::Locker l(sdata->shard_lock);
- return sdata->pqueue->empty() && sdata->context_queue.empty();
+ if (thread_index < osd->num_shards) {
+ return sdata->pqueue->empty() && sdata->context_queue.empty();
+ } else {
+ return sdata->pqueue->empty();
+ }
}
void handle_oncommits(list<Context*>& oncommits) {