]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
OSD: Sharded Op worker queue implementation for handling OSD ops
authorSomnath Roy <somnath.roy@sandisk.com>
Wed, 14 May 2014 23:13:05 +0000 (16:13 -0700)
committerSomnath Roy <somnath.roy@sandisk.com>
Sat, 31 May 2014 01:44:51 +0000 (18:44 -0700)
This is the implementation for the client of the
sharded thread pool/sharded workQ. Removed the op_wq class and now OSD
ops are going through sharded workqueue model which is used by the
sharded threadpool. Derived ShardedOpWQ implementation has a data structure
called ShardData which has it's own lock/cond and storage. ShardedOpWQ
holds a vector of that and the size of the vector is a config option.
During enqueue operation on the queue, the ops are sharded across these
ShardData based on pg hash % number of shards.
Similarly, in the _process function the sharded thread pool threads are
divided across ShardData based on thread index % number of shards

Signed-off-by: Somnath Roy <somnath.roy@sandisk.com>
src/common/config_opts.h
src/osd/OSD.cc
src/osd/OSD.h

index d003c4b948d296521d0c9dc1a70c8268789f667b..8d28e8a9a42d15070dc540b78bc6b8c2541f9200 100644 (file)
@@ -466,6 +466,8 @@ OPTION(osd_op_pq_min_cost, OPT_U64, 65536)
 OPTION(osd_disk_threads, OPT_INT, 1)
 OPTION(osd_recovery_threads, OPT_INT, 1)
 OPTION(osd_recover_clone_overlap, OPT_BOOL, true)   // preserve clone_overlap during recovery/migration
+OPTION(osd_op_num_sharded_pool_threads, OPT_INT, 10)
+OPTION(osd_op_num_shards, OPT_INT, 5)
 
 // Only use clone_overlap for recovery if there are fewer than
 // osd_recover_clone_overlap_limit entries in the overlap set
index 91ec596b72109a014c81ae4d83adf954cfd90af9..978f302408b8b4a4efdf32a19a3ebe4a15466945 100644 (file)
@@ -181,7 +181,7 @@ OSDService::OSDService(OSD *osd) :
   logger(osd->logger),
   recoverystate_perf(osd->recoverystate_perf),
   monc(osd->monc),
-  op_wq(osd->op_wq),
+  op_wq(osd->op_shardedwq),
   peering_wq(osd->peering_wq),
   recovery_wq(osd->recovery_wq),
   snap_trim_wq(osd->snap_trim_wq),
@@ -923,6 +923,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   osd_compat(get_osd_compat_set()),
   state_lock(), state(STATE_INITIALIZING),
   op_tp(cct, "OSD::op_tp", cct->_conf->osd_op_threads, "osd_op_threads"),
+  op_sharded_tp(cct, "OSD::op_sharded_tp", cct->_conf->osd_op_num_sharded_pool_threads),
   recovery_tp(cct, "OSD::recovery_tp", cct->_conf->osd_recovery_threads, "osd_recovery_threads"),
   disk_tp(cct, "OSD::disk_tp", cct->_conf->osd_disk_threads, "osd_disk_threads"),
   command_tp(cct, "OSD::command_tp", 1),
@@ -940,7 +941,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   finished_lock("OSD::finished_lock"),
   op_tracker(cct, cct->_conf->osd_enable_op_tracker),
   test_ops_hook(NULL),
-  op_wq(this, cct->_conf->osd_op_thread_timeout, &op_tp),
+  op_shardedwq(cct->_conf->osd_op_num_shards, this, cct->_conf->osd_op_thread_timeout, &op_sharded_tp),
   peering_wq(this, cct->_conf->osd_op_thread_timeout, &op_tp),
   map_lock("OSD::map_lock"),
   pg_map_lock("OSD::pg_map_lock"),
@@ -1052,7 +1053,7 @@ bool OSD::asok_command(string command, cmdmap_t& cmdmap, string format,
     op_tracker.dump_historic_ops(f);
   } else if (command == "dump_op_pq_state") {
     f->open_object_section("pq");
-    op_wq.dump(f);
+    op_shardedwq.dump(f);
     f->close_section();
   } else if (command == "dump_blacklist") {
     list<pair<entity_addr_t,utime_t> > bl;
@@ -1281,6 +1282,7 @@ int OSD::init()
   monc->set_log_client(&clog);
 
   op_tp.start();
+  op_sharded_tp.start();
   recovery_tp.start();
   disk_tp.start();
   command_tp.start();
@@ -1581,6 +1583,7 @@ void OSD::suicide(int exitcode)
 
   derr << " pausing thread pools" << dendl;
   op_tp.pause();
+  op_sharded_tp.pause();
   disk_tp.pause();
   recovery_tp.pause();
   command_tp.pause();
@@ -1630,7 +1633,7 @@ int OSD::shutdown()
   }
   
   // finish ops
-  op_wq.drain(); // should already be empty except for lagard PGs
+  op_shardedwq.drain(); // should already be empty except for lagard PGs
   {
     Mutex::Locker l(finished_lock);
     finished.clear(); // zap waiters (bleh, this is messy)
@@ -1673,6 +1676,10 @@ int OSD::shutdown()
   op_tp.stop();
   dout(10) << "op tp stopped" << dendl;
 
+  op_sharded_tp.drain();
+  op_sharded_tp.stop();
+  dout(10) << "op sharded tp stopped" << dendl;
+
   command_tp.drain();
   command_tp.stop();
   dout(10) << "command tp stopped" << dendl;
@@ -1714,7 +1721,6 @@ int OSD::shutdown()
     Mutex::Locker l(pg_stat_queue_lock);
     assert(pg_stat_queue.empty());
   }
-
   peering_wq.clear();
   // Remove PGs
 #ifdef PG_DEBUG_REFS
@@ -7955,88 +7961,10 @@ void OSD::enqueue_op(PG *pg, OpRequestRef op)
   pg->queue_op(op);
 }
 
-void OSD::OpWQ::_enqueue(pair<PGRef, OpRequestRef> item)
-{
-  unsigned priority = item.second->get_req()->get_priority();
-  unsigned cost = item.second->get_req()->get_cost();
-  if (priority >= CEPH_MSG_PRIO_LOW)
-    pqueue.enqueue_strict(
-      item.second->get_req()->get_source_inst(),
-      priority, item);
-  else
-    pqueue.enqueue(item.second->get_req()->get_source_inst(),
-      priority, cost, item);
-  osd->logger->set(l_osd_opq, pqueue.length());
-}
-
-void OSD::OpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item)
-{
-  Mutex::Locker l(qlock);
-  if (pg_for_processing.count(&*(item.first))) {
-    pg_for_processing[&*(item.first)].push_front(item.second);
-    item.second = pg_for_processing[&*(item.first)].back();
-    pg_for_processing[&*(item.first)].pop_back();
-  }
-  unsigned priority = item.second->get_req()->get_priority();
-  unsigned cost = item.second->get_req()->get_cost();
-  if (priority >= CEPH_MSG_PRIO_LOW)
-    pqueue.enqueue_strict_front(
-      item.second->get_req()->get_source_inst(),
-      priority, item);
-  else
-    pqueue.enqueue_front(item.second->get_req()->get_source_inst(),
-      priority, cost, item);
-  osd->logger->set(l_osd_opq, pqueue.length());
-}
-
-PGRef OSD::OpWQ::_dequeue()
-{
-  assert(!pqueue.empty());
-  PGRef pg;
-  {
-    Mutex::Locker l(qlock);
-    pair<PGRef, OpRequestRef> ret = pqueue.dequeue();
-    pg = ret.first;
-    pg_for_processing[&*pg].push_back(ret.second);
-  }
-  osd->logger->set(l_osd_opq, pqueue.length());
-  return pg;
-}
-
-void OSD::OpWQ::_process(PGRef pg, ThreadPool::TPHandle &handle)
-{
-  pg->lock_suspend_timeout(handle);
-  OpRequestRef op;
-  {
-    Mutex::Locker l(qlock);
-    if (!pg_for_processing.count(&*pg)) {
-      pg->unlock();
-      return;
-    }
-    assert(pg_for_processing[&*pg].size());
-    op = pg_for_processing[&*pg].front();
-    pg_for_processing[&*pg].pop_front();
-    if (!(pg_for_processing[&*pg].size()))
-      pg_for_processing.erase(&*pg);
-  }
-
-  lgeneric_subdout(osd->cct, osd, 30) << "dequeue status: ";
-  Formatter *f = new_formatter("json");
-  f->open_object_section("q");
-  dump(f);
-  f->close_section();
-  f->flush(*_dout);
-  delete f;
-  *_dout << dendl;
-
-  osd->dequeue_op(pg, op, handle);
-  pg->unlock();
-}
-
 
 void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
 {
-  osd->op_wq.dequeue(pg, dequeued);
+  osd->op_shardedwq.dequeue(pg, dequeued);
 }
 
 /*
index 0f5f05560c144ab396e93c31e14d6c1d6f21268c..0e27d2161ac93845cc21d5b0536d0f045070d8b6 100644 (file)
@@ -321,7 +321,7 @@ public:
   PerfCounters *&logger;
   PerfCounters *&recoverystate_perf;
   MonClient   *&monc;
-  ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>, PGRef> &op_wq;
+  ShardedThreadPool::ShardedWQ < pair <PGRef, OpRequestRef> > &op_wq;
   ThreadPool::BatchWorkQueue<PG> &peering_wq;
   ThreadPool::WorkQueue<PG> &recovery_wq;
   ThreadPool::WorkQueue<PG> &snap_trim_wq;
@@ -1087,6 +1087,7 @@ public:
 private:
 
   ThreadPool op_tp;
+  ShardedThreadPool op_sharded_tp;
   ThreadPool recovery_tp;
   ThreadPool disk_tp;
   ThreadPool command_tp;
@@ -1293,65 +1294,283 @@ private:
 
   // -- op queue --
 
-  struct OpWQ: public ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>,
-                                              PGRef > {
-    Mutex qlock;
-    map<PG*, list<OpRequestRef> > pg_for_processing;
+  class ShardedOpWQ: public ShardedThreadPool::ShardedWQ < pair <PGRef, OpRequestRef> > {
+
+    struct ShardData {
+      Mutex sdata_lock;
+      Cond sdata_cond;
+      PrioritizedQueue< pair<PGRef, OpRequestRef>, entity_inst_t> pqueue;
+      ShardData(string lock_name, uint64_t max_tok_per_prio, uint64_t min_cost):
+          sdata_lock(lock_name.c_str()),
+          pqueue(max_tok_per_prio, min_cost) {}
+    };
+
+    vector<ShardData*> shard_list;
     OSD *osd;
-    PrioritizedQueue<pair<PGRef, OpRequestRef>, entity_inst_t > pqueue;
-    OpWQ(OSD *o, time_t ti, ThreadPool *tp)
-      : ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>, PGRef >(
-       "OSD::OpWQ", ti, ti*10, tp),
-       qlock("OpWQ::qlock"),
-       osd(o),
-       pqueue(o->cct->_conf->osd_op_pq_max_tokens_per_priority,
-              o->cct->_conf->osd_op_pq_min_cost)
-    {}
+    uint32_t num_shards;
+    Mutex opQ_lock;
+    Cond  opQ_cond;
+
+    public:
+      ShardedOpWQ(uint32_t pnum_shards, OSD *o, time_t ti, ShardedThreadPool* tp):
+        ShardedThreadPool::ShardedWQ < pair <PGRef, OpRequestRef> >(ti, ti*10, tp),
+        osd(o), num_shards(pnum_shards), opQ_lock("OSD::ShardedOpWQLock") {
+        for(uint32_t i = 0; i < num_shards; i++) {
+          char lock_name[32] = {0};
+          snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD::ShardedOpWQ::", i);
+          ShardData* one_shard = new ShardData(lock_name, osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
+                                              osd->cct->_conf->osd_op_pq_min_cost);
+          shard_list.push_back(one_shard);
+        }
+      }
 
-    void dump(Formatter *f) {
-      lock();
-      pqueue.dump(f);
-      unlock();
-    }
+      ~ShardedOpWQ() {
+
+        while(!shard_list.empty()) {
+          delete shard_list.back();
+          shard_list.pop_back();
+        }
+      }
+
+      void _process(uint32_t thread_index, heartbeat_handle_d *hb ) {
+
+       uint32_t shard_index = thread_index % num_shards;
+
+        ShardData* sdata = shard_list[shard_index];
+
+        if (NULL != sdata) {
+
+          sdata->sdata_lock.Lock();
+
+          while (true) {
+
+           while(!sdata->pqueue.empty()) {
+
+             if (pause_threads.read() != 0){
+
+               break;
+             }
 
-    void _enqueue_front(pair<PGRef, OpRequestRef> item);
-    void _enqueue(pair<PGRef, OpRequestRef> item);
-    PGRef _dequeue();
+             in_process.inc();
+             ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval, suicide_interval);
+             tp_handle.reset_tp_timeout();
+
+             pair<PGRef, OpRequestRef> item = sdata->pqueue.dequeue();
+
+             (item.first)->lock_suspend_timeout(tp_handle);
+            //unlocking after holding the PG lock as it should maintain the op order
+             sdata->sdata_lock.Unlock();
+             //Should it be within some config option ?
+            lgeneric_subdout(osd->cct, osd, 30) << "dequeue status: ";
+             Formatter *f = new_formatter("json");
+             f->open_object_section("q");
+             dump(f);
+             f->close_section();
+             f->flush(*_dout);
+             delete f;
+             *_dout << dendl;
+
+             osd->dequeue_op(item.first, item.second, tp_handle);
+             (item.first)->unlock();
+
+             sdata->sdata_lock.Lock();
+            in_process.dec();
+            if ((pause_threads.read() != 0) || (drain_threads.read() != 0)) {
+               opQ_lock.Lock();               
+               opQ_cond.Signal();
+              opQ_lock.Unlock();
+             }          
+           }
+
+           if (stop_threads.read() != 0){
+             break;
+           }
+
+           osd->cct->get_heartbeat_map()->reset_timeout(hb, 4, 0);
+           sdata->sdata_cond.WaitInterval(osd->cct, sdata->sdata_lock, utime_t(2, 0));
+
+         }
+         sdata->sdata_lock.Unlock();
+
+        } else {
+          assert(0);
+        }
 
-    struct Pred {
-      PG *pg;
-      Pred(PG *pg) : pg(pg) {}
-      bool operator()(const pair<PGRef, OpRequestRef> &op) {
-       return op.first == pg;
       }
-    };
-    void dequeue(PG *pg, list<OpRequestRef> *dequeued = 0) {
-      lock();
-      if (!dequeued) {
-       pqueue.remove_by_filter(Pred(pg));
-       pg_for_processing.erase(pg);
-      } else {
-       list<pair<PGRef, OpRequestRef> > _dequeued;
-       pqueue.remove_by_filter(Pred(pg), &_dequeued);
-       for (list<pair<PGRef, OpRequestRef> >::iterator i = _dequeued.begin();
-            i != _dequeued.end();
-            ++i) {
-         dequeued->push_back(i->second);
-       }
-       if (pg_for_processing.count(pg)) {
-         dequeued->splice(
-           dequeued->begin(),
-           pg_for_processing[pg]);
-         pg_for_processing.erase(pg);
-       }
+
+      void stop_threads_on_queue() {
+        stop_threads.set(1);
+        for(uint32_t i = 0; i < num_shards; i++) {
+          ShardData* sdata = shard_list[i];
+          if (NULL != sdata) {
+            sdata->sdata_lock.Lock();
+            sdata->sdata_cond.Signal();
+            sdata->sdata_lock.Unlock();
+          }
+        }
+      
       }
-      unlock();
-    }
-    bool _empty() {
-      return pqueue.empty();
-    }
-    void _process(PGRef pg, ThreadPool::TPHandle &handle);
-  } op_wq;
+
+      void pause_threads_on_queue() {
+        pause_threads.set(1);
+        opQ_lock.Lock();
+        while (in_process.read()) {
+         opQ_cond.Wait(opQ_lock);
+        }
+        opQ_lock.Unlock();
+
+      }
+
+      void pause_new_threads_on_queue() {
+        pause_threads.set(1);
+
+      }
+
+      void unpause_threads_on_queue() {
+        pause_threads.set(0);
+        for(uint32_t i = 0; i < num_shards; i++) {
+          ShardData* sdata = shard_list[i];
+          if (NULL != sdata) {
+            sdata->sdata_lock.Lock();
+            sdata->sdata_cond.Signal();
+            sdata->sdata_lock.Unlock();
+          }
+        }
+
+      }
+
+      void drain_threads_on_queue() {
+        drain_threads.set(1);
+       opQ_lock.Lock();
+        for(uint32_t i = 0; i < num_shards; i++) {
+         if (!_empty(i)) {
+            opQ_cond.Wait(opQ_lock);
+          }
+        }
+        while (in_process.read()){
+          opQ_cond.Wait(opQ_lock);
+        }
+        opQ_lock.Unlock();
+
+        drain_threads.set(0);
+      }
+      
+      void drain() {
+
+       drain_threads_on_queue();
+      }
+
+      void _enqueue(pair <PGRef, OpRequestRef> item) {
+
+        uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
+
+        ShardData* sdata = shard_list[shard_index];
+        if (NULL != sdata) {
+          unsigned priority = item.second->get_req()->get_priority();
+          unsigned cost = item.second->get_req()->get_cost();
+          sdata->sdata_lock.Lock();
+          if (priority >= CEPH_MSG_PRIO_LOW)
+            sdata->pqueue.enqueue_strict(
+              item.second->get_req()->get_source_inst(), priority, item);
+          else
+            sdata->pqueue.enqueue(item.second->get_req()->get_source_inst(),
+              priority, cost, item);
+
+
+          sdata->sdata_cond.SignalOne();
+          sdata->sdata_lock.Unlock();
+        } else {
+          assert(0);
+        }
+      }
+
+      void _enqueue_front(pair <PGRef, OpRequestRef> item) {
+
+       uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
+
+        ShardData* sdata = shard_list[shard_index];
+        if (NULL != sdata) {
+          unsigned priority = item.second->get_req()->get_priority();
+          unsigned cost = item.second->get_req()->get_cost();
+          sdata->sdata_lock.Lock();
+          if (priority >= CEPH_MSG_PRIO_LOW)
+            sdata->pqueue.enqueue_strict_front(
+              item.second->get_req()->get_source_inst(),priority, item);
+          else
+            sdata->pqueue.enqueue_front(item.second->get_req()->get_source_inst(),
+              priority, cost, item);
+
+          sdata->sdata_cond.SignalOne();
+          sdata->sdata_lock.Unlock();
+        } else {
+          assert(0);
+        }
+      }
+
+
+      void dump(Formatter *f) {
+        for(uint32_t i = 0; i < num_shards; i++) {
+          ShardData* sdata = shard_list[i];
+          if (NULL != sdata) {
+            sdata->sdata_lock.Lock();
+            sdata->pqueue.dump(f);
+            sdata->sdata_lock.Unlock();
+          }
+        }
+      }
+
+      struct Pred {
+        PG *pg;
+        Pred(PG *pg) : pg(pg) {}
+        bool operator()(const pair<PGRef, OpRequestRef> &op) {
+          return op.first == pg;
+        }
+      };
+
+      void dequeue(PG *pg, list<OpRequestRef> *dequeued = 0) {
+        ShardData* sdata = NULL;
+        if (pg) {
+          uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
+          sdata = shard_list[shard_index];
+          if (!sdata) {
+            assert(0);
+          }
+        } else {
+          assert(0);
+        }
+
+        if (!dequeued) {
+          sdata->sdata_lock.Lock();
+          sdata->pqueue.remove_by_filter(Pred(pg));
+          sdata->sdata_lock.Unlock();
+        } else {
+          list<pair<PGRef, OpRequestRef> > _dequeued;
+          sdata->sdata_lock.Lock();
+          sdata->pqueue.remove_by_filter(Pred(pg), &_dequeued);
+          sdata->sdata_lock.Unlock();
+          for (list<pair<PGRef, OpRequestRef> >::iterator i = _dequeued.begin();
+            i != _dequeued.end(); ++i) {
+            dequeued->push_back(i->second);
+          }
+        }
+
+      }
+
+      bool _empty(uint32_t shard_index) {
+        ShardData* sdata = shard_list[shard_index];
+        if (NULL != sdata) {
+          sdata->sdata_lock.Lock();
+          bool is_empty = sdata->pqueue.empty();
+          sdata->sdata_lock.Unlock();
+          return is_empty;
+        }
+        return true;
+
+      }
+
+  } op_shardedwq;
+
 
   void enqueue_op(PG *pg, OpRequestRef op);
   void dequeue_op(