From: Jianpeng Ma Date: Thu, 20 Sep 2018 14:10:20 +0000 (+0800) Subject: osd/OSD: choose a fixed thread do oncommits callback function X-Git-Tag: v14.0.1~236^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=6c583fe756c82ed6c196caa0ce783765d5e9ee41;p=ceph.git osd/OSD: choose a fixed thread do oncommits callback function Now bluestore oncommit callback exec by osd op threads. If there are multi threads of shard, it will cause out-of order. For example, threads_per_shard=2 Thread1 Thread2 swap_oncommits(op1_oncommit) swap_oncommits(op2_oncommit) OpQueueItem.run(Op3) op2_oncommit.complete(); op1_oncommit.complete() This make oncommits out of order. To avoiding this, we choose a fixed thread which has the smallest thread_index of shard to do oncommit callback function. Signed-off-by: Jianpeng Ma --- diff --git a/src/common/Finisher.h b/src/common/Finisher.h index 94607ea21a67..1d0a4a3f5cf2 100644 --- a/src/common/Finisher.h +++ b/src/common/Finisher.h @@ -206,7 +206,7 @@ public: if (empty) { mutex.Lock(); - cond.SignalOne(); + cond.Signal(); mutex.Unlock(); } diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 8b9f00dd71bf..d85a7dde1bd1 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -10239,9 +10239,38 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) uint32_t shard_index = thread_index % osd->num_shards; auto& sdata = osd->shards[shard_index]; ceph_assert(sdata); + + // If all threads of shards do oncommits, there is a out-of-order problem. + // So we choose the thread which has the smallest thread_index(thread_index < num_shards) of shard + // to do oncommit callback. + bool is_smallest_thread_index = thread_index < osd->num_shards; + // peek at spg_t sdata->shard_lock.Lock(); - if (sdata->pqueue->empty() && sdata->context_queue.empty()) { + if (is_smallest_thread_index) { + if (sdata->pqueue->empty() && sdata->context_queue.empty()) { + sdata->sdata_wait_lock.Lock(); + if (!sdata->stop_waiting) { + dout(20) << __func__ << " empty q, waiting" << dendl; + osd->cct->get_heartbeat_map()->clear_timeout(hb); + sdata->shard_lock.Unlock(); + sdata->sdata_cond.Wait(sdata->sdata_wait_lock); + sdata->sdata_wait_lock.Unlock(); + sdata->shard_lock.Lock(); + if (sdata->pqueue->empty() && sdata->context_queue.empty()) { + sdata->shard_lock.Unlock(); + return; + } + osd->cct->get_heartbeat_map()->reset_timeout(hb, + osd->cct->_conf->threadpool_default_timeout, 0); + } else { + dout(20) << __func__ << " need return immediately" << dendl; + sdata->sdata_wait_lock.Unlock(); + sdata->shard_lock.Unlock(); + return; + } + } + } else if (sdata->pqueue->empty()) { sdata->sdata_wait_lock.Lock(); if (!sdata->stop_waiting) { dout(20) << __func__ << " empty q, waiting" << dendl; @@ -10250,7 +10279,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) sdata->sdata_cond.Wait(sdata->sdata_wait_lock); sdata->sdata_wait_lock.Unlock(); sdata->shard_lock.Lock(); - if (sdata->pqueue->empty() && sdata->context_queue.empty()) { + if (sdata->pqueue->empty()) { sdata->shard_lock.Unlock(); return; } @@ -10270,7 +10299,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) } list oncommits; - if (!sdata->context_queue.empty()) { + if (is_smallest_thread_index && !sdata->context_queue.empty()) { sdata->context_queue.swap(oncommits); } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 737029eaa662..58de6048da5c 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -1776,7 +1776,11 @@ protected: auto &&sdata = osd->shards[shard_index]; ceph_assert(sdata); Mutex::Locker l(sdata->shard_lock); - return sdata->pqueue->empty() && sdata->context_queue.empty(); + if (thread_index < osd->num_shards) { + return sdata->pqueue->empty() && sdata->context_queue.empty(); + } else { + return sdata->pqueue->empty(); + } } void handle_oncommits(list& oncommits) {