]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
OSD:Derived sharded queue implementation is changed
authorSomnath Roy <somnath.roy@sandisk.com>
Tue, 20 May 2014 23:20:54 +0000 (16:20 -0700)
committerSomnath Roy <somnath.roy@sandisk.com>
Sat, 31 May 2014 01:45:42 +0000 (18:45 -0700)
All the threadpool related stuff like stop/pause/drain etc. are not
handled by sharded queue anymore. All it is implementing are related to
processing,enqueue , signaling of waiting threads and shard queue status.
The pg ordering is been taken care of by introducing a map <pg,op> in each
shard.

Signed-off-by: Somnath Roy <somnath.roy@sandisk.com>
src/osd/OSD.cc
src/osd/OSD.h

index 978f302408b8b4a4efdf32a19a3ebe4a15466945..aa947529164260955959ab954fa2aeca2c1f93c0 100644 (file)
@@ -966,6 +966,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   next_removal_seq(0),
   service(this)
 {
+  assert(cct->_conf->osd_op_num_sharded_pool_threads >= cct->_conf->osd_op_num_shards);
   monc->set_messenger(client_messenger);
   op_tracker.set_complaint_and_threshold(cct->_conf->osd_op_complaint_time,
                                          cct->_conf->osd_op_log_threshold);
@@ -7961,6 +7962,116 @@ void OSD::enqueue_op(PG *pg, OpRequestRef op)
   pg->queue_op(op);
 }
 
+void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb, atomic_t& in_process ) {
+
+  uint32_t shard_index = thread_index % num_shards;
+
+  ShardData* sdata = shard_list[shard_index];
+  assert(NULL != sdata);
+  sdata->sdata_op_ordering_lock.Lock();
+  if (sdata->pqueue.empty()) {
+    sdata->sdata_op_ordering_lock.Unlock();
+    osd->cct->get_heartbeat_map()->reset_timeout(hb, 4, 0);
+    sdata->sdata_lock.Lock();
+    sdata->sdata_cond.WaitInterval(osd->cct, sdata->sdata_lock, utime_t(2, 0));
+    sdata->sdata_lock.Unlock();
+    sdata->sdata_op_ordering_lock.Lock();
+    if(sdata->pqueue.empty()) {
+      sdata->sdata_op_ordering_lock.Unlock();
+      return;
+    }
+  }
+
+  pair<PGRef, OpRequestRef> item = sdata->pqueue.dequeue();
+  sdata->pg_for_processing[&*(item.first)].push_back(item.second);
+  sdata->sdata_op_ordering_lock.Unlock();
+  in_process.inc();
+  ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval, suicide_interval);
+
+  (item.first)->lock_suspend_timeout(tp_handle);
+
+  OpRequestRef op;
+  {
+    Mutex::Locker l(sdata->sdata_op_ordering_lock);
+    if (!sdata->pg_for_processing.count(&*(item.first))) {
+      (item.first)->unlock();
+      return;
+    }
+    assert(sdata->pg_for_processing[&*(item.first)].size());
+    op = sdata->pg_for_processing[&*(item.first)].front();
+    sdata->pg_for_processing[&*(item.first)].pop_front();
+    if (!(sdata->pg_for_processing[&*(item.first)].size()))
+      sdata->pg_for_processing.erase(&*(item.first));
+  }  
+
+  lgeneric_subdout(osd->cct, osd, 30) << "dequeue status: ";
+  Formatter *f = new_formatter("json");
+  f->open_object_section("q");
+  dump(f);
+  f->close_section();
+  f->flush(*_dout);
+  delete f;
+  *_dout << dendl;
+
+  osd->dequeue_op(item.first, op, tp_handle);
+  (item.first)->unlock();
+  in_process.dec();
+
+}
+
+void OSD::ShardedOpWQ::_enqueue(pair <PGRef, OpRequestRef> item) {
+
+  uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
+
+  ShardData* sdata = shard_list[shard_index];
+  assert (NULL != sdata);
+  unsigned priority = item.second->get_req()->get_priority();
+  unsigned cost = item.second->get_req()->get_cost();
+  sdata->sdata_op_ordering_lock.Lock();
+  if (priority >= CEPH_MSG_PRIO_LOW)
+    sdata->pqueue.enqueue_strict(
+      item.second->get_req()->get_source_inst(), priority, item);
+  else
+    sdata->pqueue.enqueue(item.second->get_req()->get_source_inst(),
+      priority, cost, item);
+  sdata->sdata_op_ordering_lock.Unlock();
+
+  sdata->sdata_lock.Lock();
+  sdata->sdata_cond.SignalOne();
+  sdata->sdata_lock.Unlock();
+
+}
+
+void OSD::ShardedOpWQ::_enqueue_front(pair <PGRef, OpRequestRef> item) {
+
+  uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
+
+  ShardData* sdata = shard_list[shard_index];
+  assert (NULL != sdata);
+  sdata->sdata_op_ordering_lock.Lock();
+  if (sdata->pg_for_processing.count(&*(item.first))) {
+    sdata->pg_for_processing[&*(item.first)].push_front(item.second);
+    item.second = sdata->pg_for_processing[&*(item.first)].back();
+    sdata->pg_for_processing[&*(item.first)].pop_back();
+  }
+  unsigned priority = item.second->get_req()->get_priority();
+  unsigned cost = item.second->get_req()->get_cost();
+  if (priority >= CEPH_MSG_PRIO_LOW)
+    sdata->pqueue.enqueue_strict_front(
+      item.second->get_req()->get_source_inst(),priority, item);
+  else
+    sdata->pqueue.enqueue_front(item.second->get_req()->get_source_inst(),
+      priority, cost, item);
+
+  sdata->sdata_op_ordering_lock.Unlock();
+  sdata->sdata_lock.Lock();
+  sdata->sdata_cond.SignalOne();
+  sdata->sdata_lock.Unlock();
+
+}
+
+
 
 void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
 {
index 0e27d2161ac93845cc21d5b0536d0f045070d8b6..08c1219c00e01fdc8e5cb163bb9257b8d23f1c05 100644 (file)
@@ -1300,26 +1300,29 @@ private:
     struct ShardData {
       Mutex sdata_lock;
       Cond sdata_cond;
+      Mutex sdata_op_ordering_lock;
+      map<PG*, list<OpRequestRef> > pg_for_processing;
       PrioritizedQueue< pair<PGRef, OpRequestRef>, entity_inst_t> pqueue;
-      ShardData(string lock_name, uint64_t max_tok_per_prio, uint64_t min_cost):
+      ShardData(string lock_name, string ordering_lock, uint64_t max_tok_per_prio, uint64_t min_cost):
           sdata_lock(lock_name.c_str()),
+          sdata_op_ordering_lock(ordering_lock.c_str()),
           pqueue(max_tok_per_prio, min_cost) {}
     };
 
     vector<ShardData*> shard_list;
     OSD *osd;
     uint32_t num_shards;
-    Mutex opQ_lock;
-    Cond  opQ_cond;
 
     public:
       ShardedOpWQ(uint32_t pnum_shards, OSD *o, time_t ti, ShardedThreadPool* tp):
         ShardedThreadPool::ShardedWQ < pair <PGRef, OpRequestRef> >(ti, ti*10, tp),
-        osd(o), num_shards(pnum_shards), opQ_lock("OSD::ShardedOpWQLock") {
+        osd(o), num_shards(pnum_shards) {
         for(uint32_t i = 0; i < num_shards; i++) {
           char lock_name[32] = {0};
-          snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD::ShardedOpWQ::", i);
-          ShardData* one_shard = new ShardData(lock_name, osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
+          snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
+          char order_lock[32] = {0};
+          snprintf(order_lock, sizeof(order_lock), "%s.%d", "OSD:ShardedOpWQ:order:", i);
+          ShardData* one_shard = new ShardData(lock_name, order_lock, osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
                                               osd->cct->_conf->osd_op_pq_min_cost);
           shard_list.push_back(one_shard);
         }
@@ -1333,190 +1336,28 @@ private:
         }
       }
 
-      void _process(uint32_t thread_index, heartbeat_handle_d *hb ) {
-
-       uint32_t shard_index = thread_index % num_shards;
-
-        ShardData* sdata = shard_list[shard_index];
-
-        if (NULL != sdata) {
-
-          sdata->sdata_lock.Lock();
-
-          while (true) {
-
-           while(!sdata->pqueue.empty()) {
-
-             if (pause_threads.read() != 0){
-
-               break;
-             }
-
-             in_process.inc();
-             ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval, suicide_interval);
-             tp_handle.reset_tp_timeout();
-
-             pair<PGRef, OpRequestRef> item = sdata->pqueue.dequeue();
-
-             (item.first)->lock_suspend_timeout(tp_handle);
-            //unlocking after holding the PG lock as it should maintain the op order
-             sdata->sdata_lock.Unlock();
-             //Should it be within some config option ?
-            lgeneric_subdout(osd->cct, osd, 30) << "dequeue status: ";
-             Formatter *f = new_formatter("json");
-             f->open_object_section("q");
-             dump(f);
-             f->close_section();
-             f->flush(*_dout);
-             delete f;
-             *_dout << dendl;
-
-             osd->dequeue_op(item.first, item.second, tp_handle);
-             (item.first)->unlock();
-
-             sdata->sdata_lock.Lock();
-            in_process.dec();
-            if ((pause_threads.read() != 0) || (drain_threads.read() != 0)) {
-               opQ_lock.Lock();               
-               opQ_cond.Signal();
-              opQ_lock.Unlock();
-             }          
-           }
-
-           if (stop_threads.read() != 0){
-             break;
-           }
-
-           osd->cct->get_heartbeat_map()->reset_timeout(hb, 4, 0);
-           sdata->sdata_cond.WaitInterval(osd->cct, sdata->sdata_lock, utime_t(2, 0));
-
-         }
-         sdata->sdata_lock.Unlock();
-
-        } else {
-          assert(0);
-        }
-
-      }
-
-      void stop_threads_on_queue() {
-        stop_threads.set(1);
-        for(uint32_t i = 0; i < num_shards; i++) {
-          ShardData* sdata = shard_list[i];
-          if (NULL != sdata) {
-            sdata->sdata_lock.Lock();
-            sdata->sdata_cond.Signal();
-            sdata->sdata_lock.Unlock();
-          }
-        }
+      void _process(uint32_t thread_index, heartbeat_handle_d *hb, atomic_t& in_process );
+      void _enqueue(pair <PGRef, OpRequestRef> item);
+      void _enqueue_front(pair <PGRef, OpRequestRef> item);
       
-      }
-
-      void pause_threads_on_queue() {
-        pause_threads.set(1);
-        opQ_lock.Lock();
-        while (in_process.read()) {
-         opQ_cond.Wait(opQ_lock);
-        }
-        opQ_lock.Unlock();
-
-      }
-
-      void pause_new_threads_on_queue() {
-        pause_threads.set(1);
-
-      }
-
-      void unpause_threads_on_queue() {
-        pause_threads.set(0);
+      void return_waiting_threads() {
         for(uint32_t i = 0; i < num_shards; i++) {
           ShardData* sdata = shard_list[i];
-          if (NULL != sdata) {
-            sdata->sdata_lock.Lock();
-            sdata->sdata_cond.Signal();
-            sdata->sdata_lock.Unlock();
-          }
-        }
-
-      }
-
-      void drain_threads_on_queue() {
-        drain_threads.set(1);
-       opQ_lock.Lock();
-        for(uint32_t i = 0; i < num_shards; i++) {
-         if (!_empty(i)) {
-            opQ_cond.Wait(opQ_lock);
-          }
-        }
-        while (in_process.read()){
-          opQ_cond.Wait(opQ_lock);
-        }
-        opQ_lock.Unlock();
-
-        drain_threads.set(0);
-      }
-      
-      void drain() {
-
-       drain_threads_on_queue();
-      }
-
-      void _enqueue(pair <PGRef, OpRequestRef> item) {
-
-        uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
-
-        ShardData* sdata = shard_list[shard_index];
-        if (NULL != sdata) {
-          unsigned priority = item.second->get_req()->get_priority();
-          unsigned cost = item.second->get_req()->get_cost();
-          sdata->sdata_lock.Lock();
-          if (priority >= CEPH_MSG_PRIO_LOW)
-            sdata->pqueue.enqueue_strict(
-              item.second->get_req()->get_source_inst(), priority, item);
-          else
-            sdata->pqueue.enqueue(item.second->get_req()->get_source_inst(),
-              priority, cost, item);
-
-
-          sdata->sdata_cond.SignalOne();
-          sdata->sdata_lock.Unlock();
-        } else {
-          assert(0);
-        }
-      }
-
-      void _enqueue_front(pair <PGRef, OpRequestRef> item) {
-
-       uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
-
-        ShardData* sdata = shard_list[shard_index];
-        if (NULL != sdata) {
-          unsigned priority = item.second->get_req()->get_priority();
-          unsigned cost = item.second->get_req()->get_cost();
+          assert (NULL != sdata); 
           sdata->sdata_lock.Lock();
-          if (priority >= CEPH_MSG_PRIO_LOW)
-            sdata->pqueue.enqueue_strict_front(
-              item.second->get_req()->get_source_inst(),priority, item);
-          else
-            sdata->pqueue.enqueue_front(item.second->get_req()->get_source_inst(),
-              priority, cost, item);
-
-          sdata->sdata_cond.SignalOne();
+          sdata->sdata_cond.Signal();
           sdata->sdata_lock.Unlock();
-        } else {
-          assert(0);
         }
+      
       }
 
-
       void dump(Formatter *f) {
         for(uint32_t i = 0; i < num_shards; i++) {
           ShardData* sdata = shard_list[i];
-          if (NULL != sdata) {
-            sdata->sdata_lock.Lock();
-            sdata->pqueue.dump(f);
-            sdata->sdata_lock.Unlock();
-          }
+          assert (NULL != sdata);
+          sdata->sdata_op_ordering_lock.Lock();
+          sdata->pqueue.dump(f);
+          sdata->sdata_op_ordering_lock.Unlock();
         }
       }
 
@@ -1530,44 +1371,58 @@ private:
 
       void dequeue(PG *pg, list<OpRequestRef> *dequeued = 0) {
         ShardData* sdata = NULL;
-        if (pg) {
-          uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
-          sdata = shard_list[shard_index];
-          if (!sdata) {
-            assert(0);
-          }
-        } else {
-          assert(0);
-        }
-
+        assert(pg != NULL);
+        uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
+        sdata = shard_list[shard_index];
+        assert(sdata != NULL);
         if (!dequeued) {
-          sdata->sdata_lock.Lock();
+          sdata->sdata_op_ordering_lock.Lock();
           sdata->pqueue.remove_by_filter(Pred(pg));
-          sdata->sdata_lock.Unlock();
+          sdata->pg_for_processing.erase(pg);
+          sdata->sdata_op_ordering_lock.Unlock();
         } else {
           list<pair<PGRef, OpRequestRef> > _dequeued;
-          sdata->sdata_lock.Lock();
+          sdata->sdata_op_ordering_lock.Lock();
           sdata->pqueue.remove_by_filter(Pred(pg), &_dequeued);
-          sdata->sdata_lock.Unlock();
           for (list<pair<PGRef, OpRequestRef> >::iterator i = _dequeued.begin();
             i != _dequeued.end(); ++i) {
             dequeued->push_back(i->second);
           }
+         if (sdata->pg_for_processing.count(pg)) {
+           dequeued->splice(
+             dequeued->begin(),
+             sdata->pg_for_processing[pg]);
+           sdata->pg_for_processing.erase(pg);
+         }
+          sdata->sdata_op_ordering_lock.Unlock();          
         }
 
       }
 
-      bool _empty(uint32_t shard_index) {
-        ShardData* sdata = shard_list[shard_index];
-        if (NULL != sdata) {
-          sdata->sdata_lock.Lock();
-          bool is_empty = sdata->pqueue.empty();
-          sdata->sdata_lock.Unlock();
-          return is_empty;
+      bool is_all_shard_empty() {
+        bool is_empty = true;
+        for(uint32_t i = 0; i < num_shards; i++) {
+          ShardData* sdata = shard_list[i];
+          assert(NULL != sdata);
+          sdata->sdata_op_ordering_lock.Lock();
+          if (!sdata->pqueue.empty()) {
+           is_empty = false;
+            sdata->sdata_op_ordering_lock.Unlock();
+            break;
+          }
+          sdata->sdata_op_ordering_lock.Unlock();
         }
-        return true;
+        return is_empty;
 
       }
+      bool is_shard_empty(uint32_t shard_index) {
+
+        ShardData* sdata = shard_list[shard_index];
+        assert(NULL != sdata);
+        Mutex::Locker l(sdata->sdata_op_ordering_lock);
+        return sdata->pqueue.empty();
+      }
 
   } op_shardedwq;