]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/OSD: choose a fixed thread do oncommits callback function 22739/head
authorJianpeng Ma <jianpeng.ma@intel.com>
Thu, 20 Sep 2018 14:10:20 +0000 (22:10 +0800)
committerJianpeng Ma <jianpeng.ma@intel.com>
Thu, 20 Sep 2018 14:10:20 +0000 (22:10 +0800)
Now bluestore oncommit callback exec by osd op threads.
If there are multi threads of shard, it will cause out-of order.
For example, threads_per_shard=2
              Thread1                                 Thread2
    swap_oncommits(op1_oncommit)
                                            swap_oncommits(op2_oncommit)
    OpQueueItem.run(Op3)
                                            op2_oncommit.complete();
    op1_oncommit.complete()

This make oncommits out of order.
To avoiding this, we choose a fixed thread which has the smallest
thread_index of shard to do oncommit callback function.

Signed-off-by: Jianpeng Ma <jianpeng.ma@intel.com>
src/common/Finisher.h
src/osd/OSD.cc
src/osd/OSD.h

index 94607ea21a679aece473fa1a77c83956e251b661..1d0a4a3f5cf226e8c5b2925bca9ab572a06611a2 100644 (file)
@@ -206,7 +206,7 @@ public:
 
     if (empty) {
       mutex.Lock();
-      cond.SignalOne();
+      cond.Signal();
       mutex.Unlock();
     }
 
index 8b9f00dd71bf809a144d98a5f0af43995332778d..d85a7dde1bd17ec042d2644bd76653b3366333f7 100644 (file)
@@ -10239,9 +10239,38 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
   uint32_t shard_index = thread_index % osd->num_shards;
   auto& sdata = osd->shards[shard_index];
   ceph_assert(sdata);
+
+  // If all threads of shards do oncommits, there is a out-of-order problem.
+  // So we choose the thread which has the smallest thread_index(thread_index < num_shards) of shard
+  // to do oncommit callback.
+  bool is_smallest_thread_index = thread_index < osd->num_shards;
+
   // peek at spg_t
   sdata->shard_lock.Lock();
-  if (sdata->pqueue->empty() && sdata->context_queue.empty()) {
+  if (is_smallest_thread_index) {
+    if (sdata->pqueue->empty() && sdata->context_queue.empty()) {
+      sdata->sdata_wait_lock.Lock();
+      if (!sdata->stop_waiting) {
+       dout(20) << __func__ << " empty q, waiting" << dendl;
+       osd->cct->get_heartbeat_map()->clear_timeout(hb);
+       sdata->shard_lock.Unlock();
+       sdata->sdata_cond.Wait(sdata->sdata_wait_lock);
+       sdata->sdata_wait_lock.Unlock();
+       sdata->shard_lock.Lock();
+       if (sdata->pqueue->empty() && sdata->context_queue.empty()) {
+         sdata->shard_lock.Unlock();
+         return;
+       }
+       osd->cct->get_heartbeat_map()->reset_timeout(hb,
+           osd->cct->_conf->threadpool_default_timeout, 0);
+      } else {
+       dout(20) << __func__ << " need return immediately" << dendl;
+       sdata->sdata_wait_lock.Unlock();
+       sdata->shard_lock.Unlock();
+       return;
+      }
+    }
+  } else if (sdata->pqueue->empty()) {
     sdata->sdata_wait_lock.Lock();
     if (!sdata->stop_waiting) {
       dout(20) << __func__ << " empty q, waiting" << dendl;
@@ -10250,7 +10279,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
       sdata->sdata_cond.Wait(sdata->sdata_wait_lock);
       sdata->sdata_wait_lock.Unlock();
       sdata->shard_lock.Lock();
-      if (sdata->pqueue->empty() && sdata->context_queue.empty()) {
+      if (sdata->pqueue->empty()) {
        sdata->shard_lock.Unlock();
        return;
       }
@@ -10270,7 +10299,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
   }
 
   list<Context *> oncommits;
-  if (!sdata->context_queue.empty()) {
+  if (is_smallest_thread_index && !sdata->context_queue.empty()) {
     sdata->context_queue.swap(oncommits);
   }
 
index 737029eaa66241a238509ef4ee2be65c55a6dc06..58de6048da5c1fcfc2f836609a8f9cf3c25896d9 100644 (file)
@@ -1776,7 +1776,11 @@ protected:
       auto &&sdata = osd->shards[shard_index];
       ceph_assert(sdata);
       Mutex::Locker l(sdata->shard_lock);
-      return sdata->pqueue->empty() && sdata->context_queue.empty();
+      if (thread_index < osd->num_shards) {
+       return sdata->pqueue->empty() && sdata->context_queue.empty();
+      } else {
+       return sdata->pqueue->empty();
+      }
     }
 
     void handle_oncommits(list<Context*>& oncommits) {