]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/WorkQueue: fix a race that could make drain wait endlessly. 17659/head
author: Jianpeng Ma <jianpeng.ma@intel.com>
Mon, 6 Nov 2017 13:40:12 +0000 (21:40 +0800)
committer: Jianpeng Ma <jianpeng.ma@intel.com>
Mon, 6 Nov 2017 13:40:12 +0000 (21:40 +0800)
In ShardedThreadPool::shardedthreadpool_worker, _process is called
without holding shardedpool_lock, so there is a race between
return_waiting_threads and _process. If return_waiting_threads runs
first, _process misses the signal and then waits indefinitely.
This may prevent ShardedThreadPool::drain from ever completing.

Signed-off-by: Jianpeng Ma <jianpeng.ma@intel.com>
src/common/WorkQueue.cc
src/common/WorkQueue.h
src/osd/OSD.cc
src/osd/OSD.h

index 7da8c85d37448e5fb11fd03bd56c0bdf1ba121fd..5c8c772ab496f30e436fe13fc2f5bd3757dea045 100644 (file)
@@ -416,6 +416,7 @@ void ShardedThreadPool::unpause()
   ldout(cct,10) << "unpause" << dendl;
   shardedpool_lock.Lock();
   pause_threads = false;
+  wq->stop_return_waiting_threads();
   shardedpool_cond.Signal();
   shardedpool_lock.Unlock();
   ldout(cct,10) << "unpaused" << dendl;
@@ -432,6 +433,7 @@ void ShardedThreadPool::drain()
     wait_cond.Wait(shardedpool_lock);
   }
   drain_threads = false;
+  wq->stop_return_waiting_threads();
   shardedpool_cond.Signal();
   shardedpool_lock.Unlock();
   ldout(cct,10) << "drained" << dendl;
index 9d055050ad792dcd1b956692be604fe9c7f32292..acf1787cd4ae3483eb12d22943f24d5f78e308b1 100644 (file)
@@ -644,6 +644,7 @@ public:
 
     virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb ) = 0;
     virtual void return_waiting_threads() = 0;
+    virtual void stop_return_waiting_threads() = 0;
     virtual bool is_shard_empty(uint32_t thread_index) = 0;
   };      
 
index 43c13f1e074c911736b58fbf77b86adb07aabb58..298b95b73d2d5681ee21c46a361dc3d5d4faaf60 100644 (file)
@@ -9754,19 +9754,26 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
   // peek at spg_t
   sdata->sdata_op_ordering_lock.Lock();
   if (sdata->pqueue->empty()) {
-    dout(20) << __func__ << " empty q, waiting" << dendl;
-    osd->cct->get_heartbeat_map()->clear_timeout(hb);
     sdata->sdata_lock.Lock();
-    sdata->sdata_op_ordering_lock.Unlock();
-    sdata->sdata_cond.Wait(sdata->sdata_lock);
-    sdata->sdata_lock.Unlock();
-    sdata->sdata_op_ordering_lock.Lock();
-    if (sdata->pqueue->empty()) {
+    if (!sdata->stop_waiting) {
+      dout(20) << __func__ << " empty q, waiting" << dendl;
+      osd->cct->get_heartbeat_map()->clear_timeout(hb);
+      sdata->sdata_op_ordering_lock.Unlock();
+      sdata->sdata_cond.Wait(sdata->sdata_lock);
+      sdata->sdata_lock.Unlock();
+      sdata->sdata_op_ordering_lock.Lock();
+      if (sdata->pqueue->empty()) {
+       sdata->sdata_op_ordering_lock.Unlock();
+       return;
+      }
+      osd->cct->get_heartbeat_map()->reset_timeout(hb,
+         osd->cct->_conf->threadpool_default_timeout, 0);
+    } else {
+      dout(0) << __func__ << " need return immediately" << dendl;
+      sdata->sdata_lock.Unlock();
       sdata->sdata_op_ordering_lock.Unlock();
       return;
     }
-    osd->cct->get_heartbeat_map()->reset_timeout(hb,
-      osd->cct->_conf->threadpool_default_timeout, 0);
   }
   OpQueueItem item = sdata->pqueue->dequeue();
   if (osd->is_stopping()) {
index eb59f9095c4c43ca5798c1acb4cdd59b63982792..9e3d4a0c84e1b6d95eebc5d039ff2ce79d06ea45 100644 (file)
@@ -1628,6 +1628,8 @@ private:
       /// priority queue
       std::unique_ptr<OpQueue<OpQueueItem, uint64_t>> pqueue;
 
+      bool stop_waiting = false;
+
       void _enqueue_front(OpQueueItem&& item, unsigned cutoff) {
        unsigned priority = item.get_priority();
        unsigned cost = item.get_cost();
@@ -1723,11 +1725,22 @@ private:
        ShardData* sdata = shard_list[i];
        assert (NULL != sdata); 
        sdata->sdata_lock.Lock();
+       sdata->stop_waiting = true;
        sdata->sdata_cond.Signal();
        sdata->sdata_lock.Unlock();
       }
     }
 
+    void stop_return_waiting_threads() override {
+      for(uint32_t i = 0; i < num_shards; i++) {
+       ShardData* sdata = shard_list[i];
+       assert (NULL != sdata);
+       sdata->sdata_lock.Lock();
+       sdata->stop_waiting = false;
+       sdata->sdata_lock.Unlock();
+      }
+    }
+
     void dump(Formatter *f) {
       for(uint32_t i = 0; i < num_shards; i++) {
        auto &&sdata = shard_list[i];