osd: Add runtime config option to select which queue to use and the
priority of the cutoff between the strict queue and the normal queue.

author     Robert LeBlanc <robert.leblanc@endurance.com>
           Tue, 26 Jan 2016 00:45:24 +0000 (00:45 +0000)
committer  Robert LeBlanc <robert@leblancnet.us>
           Thu, 28 Jan 2016 21:46:46 +0000 (21:46 +0000)

osd_op_queue takes prio, wpq, or debug_random to select PrioritizedQueue,
WeightedPriorityQueue, or to pick one of the two at random for testing.
Default is prio (PrioritizedQueue).
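
For illustration, a minimal ceph.conf sketch selecting the weighted queue
(the option name comes from this commit; as added here the value is read
when the OSD is constructed, so it takes effect when the OSD starts):

  [osd]
      # wpq -> WeightedPriorityQueue, prio -> PrioritizedQueue (default),
      # debug_random -> pick one of the two at random (testing only)
      osd_op_queue = wpq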

osd_op_queue_cut_off takes low, high, or debug_random to select whether ops
with priority greater than or equal to 64 (low) or 196 (high) are queued in
the strict queue rather than the normal queue. It is advantageous to use the
high option so that repops are queued proportionally with regular ops, which
helps prevent I/O starvation when an OSD has many non-primary PGs and keeps
clients accessing a primary PG on such a busy OSD from being blocked
indefinitely while the OSD pushes all repops to the front of the line.
Default is low.
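
As a sketch, the companion setting (the thresholds are CEPH_MSG_PRIO_LOW = 64
and CEPH_MSG_PRIO_HIGH = 196, the constants the OSD compares op priorities
against, as noted above):

  [osd]
      # high -> only ops with priority >= 196 go to the strict queue;
      # repops then land in the normal queue alongside client ops
      osd_op_queue_cut_off = high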

The defaults preserve the original behavior.

Signed-off-by: Robert LeBlanc <robert.leblanc@endurance.com>
src/common/config_opts.h
src/osd/OSD.cc
src/osd/OSD.h

src/common/config_opts.h
index f7d6fd0d2873eb622d8ab0347649d14b6642eacc..7e1bf405d54850b231f1ab42bd462a6ffd7c86de 100644 (file)
@@ -624,6 +624,8 @@ OPTION(osd_recovery_threads, OPT_INT, 1)
 OPTION(osd_recover_clone_overlap, OPT_BOOL, true)   // preserve clone_overlap during recovery/migration
 OPTION(osd_op_num_threads_per_shard, OPT_INT, 2)
 OPTION(osd_op_num_shards, OPT_INT, 5)
+OPTION(osd_op_queue, OPT_STR, "prio") // PrioritizedQueue (prio), Weighted Priority Queue (wpq), or debug_random
+OPTION(osd_op_queue_cut_off, OPT_STR, "low") // Min priority to go to strict queue. (low, high, debug_random)
 
 // Set to true for testing.  Users should NOT set this.
 // If set to true even after reading enough shards to
src/osd/OSD.cc
index 1d0ca9c57f21e0dcaa17f6742506227b359ef3f7..db8386f5baa609e59f5e68a59a5797432414e98e 100644 (file)
@@ -1583,6 +1583,8 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   op_tracker(cct, cct->_conf->osd_enable_op_tracker, 
                   cct->_conf->osd_num_op_tracker_shard),
   test_ops_hook(NULL),
+  op_queue(get_io_queue()),
+  op_prio_cutoff(get_io_prio_cut()),
   op_shardedwq(
     cct->_conf->osd_op_num_shards,
     this,
@@ -1939,6 +1941,8 @@ int OSD::init()
   load_pgs();
 
   dout(2) << "superblock: i am osd." << superblock.whoami << dendl;
+  dout(0) << "using " << op_queue << " op queue with priority op cut off at " <<
+    op_prio_cutoff << "." << dendl;
 
   create_logger();
 
@@ -8277,19 +8281,19 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb )
   ShardData* sdata = shard_list[shard_index];
   assert(NULL != sdata);
   sdata->sdata_op_ordering_lock.Lock();
-  if (sdata->pqueue.empty()) {
+  if (sdata->pqueue->empty()) {
     sdata->sdata_op_ordering_lock.Unlock();
     osd->cct->get_heartbeat_map()->reset_timeout(hb, 4, 0);
     sdata->sdata_lock.Lock();
     sdata->sdata_cond.WaitInterval(osd->cct, sdata->sdata_lock, utime_t(2, 0));
     sdata->sdata_lock.Unlock();
     sdata->sdata_op_ordering_lock.Lock();
-    if(sdata->pqueue.empty()) {
+    if(sdata->pqueue->empty()) {
       sdata->sdata_op_ordering_lock.Unlock();
       return;
     }
   }
-  pair<PGRef, PGQueueable> item = sdata->pqueue.dequeue();
+  pair<PGRef, PGQueueable> item = sdata->pqueue->dequeue();
   sdata->pg_for_processing[&*(item.first)].push_back(item.second);
   sdata->sdata_op_ordering_lock.Unlock();
   ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval, 
@@ -8359,11 +8363,11 @@ void OSD::ShardedOpWQ::_enqueue(pair<PGRef, PGQueueable> item) {
   unsigned cost = item.second.get_cost();
   sdata->sdata_op_ordering_lock.Lock();
  
-  if (priority >= CEPH_MSG_PRIO_LOW)
-    sdata->pqueue.enqueue_strict(
+  if (priority >= osd->op_prio_cutoff)
+    sdata->pqueue->enqueue_strict(
       item.second.get_owner(), priority, item);
   else
-    sdata->pqueue.enqueue(
+    sdata->pqueue->enqueue(
       item.second.get_owner(),
       priority, cost, item);
   sdata->sdata_op_ordering_lock.Unlock();
@@ -8388,12 +8392,12 @@ void OSD::ShardedOpWQ::_enqueue_front(pair<PGRef, PGQueueable> item) {
   }
   unsigned priority = item.second.get_priority();
   unsigned cost = item.second.get_cost();
-  if (priority >= CEPH_MSG_PRIO_LOW)
-    sdata->pqueue.enqueue_strict_front(
+  if (priority >= osd->op_prio_cutoff)
+    sdata->pqueue->enqueue_strict_front(
       item.second.get_owner(),
       priority, item);
   else
-    sdata->pqueue.enqueue_front(
+    sdata->pqueue->enqueue_front(
       item.second.get_owner(),
       priority, cost, item);
 
src/osd/OSD.h
index 367d236c059b4c47cd50b2a8b36f7657385d3e64..556c931c526fdf025ccfde8f7d274397176be068 100644 (file)
@@ -52,7 +52,9 @@ using namespace std;
 #include "common/shared_cache.hpp"
 #include "common/simple_cache.hpp"
 #include "common/sharedptr_registry.hpp"
+#include "common/WeightedPriorityQueue.h"
 #include "common/PrioritizedQueue.h"
+#include "common/OpQueue.h"
 #include "messages/MOSDOp.h"
 #include "include/Spinlock.h"
 
@@ -1617,6 +1619,11 @@ private:
   friend struct C_CompleteSplits;
 
   // -- op queue --
+  enum io_queue {
+    prioritized,
+    weightedpriority};
+  const io_queue op_queue;
+  const unsigned int op_prio_cutoff;
 
   friend class PGQueueable;
   class ShardedOpWQ: public ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> > {
@@ -1626,13 +1633,25 @@ private:
       Cond sdata_cond;
       Mutex sdata_op_ordering_lock;
       map<PG*, list<PGQueueable> > pg_for_processing;
-      PrioritizedQueue< pair<PGRef, PGQueueable>, entity_inst_t> pqueue;
+      std::unique_ptr<OpQueue< pair<PGRef, PGQueueable>, entity_inst_t>> pqueue;
       ShardData(
        string lock_name, string ordering_lock,
-       uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct)
+       uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct,
+       io_queue opqueue)
        : sdata_lock(lock_name.c_str(), false, true, false, cct),
-         sdata_op_ordering_lock(ordering_lock.c_str(), false, true, false, cct),
-         pqueue(max_tok_per_prio, min_cost) {}
+         sdata_op_ordering_lock(ordering_lock.c_str(), false, true, false, cct) {
+           if (opqueue == weightedpriority) {
+             pqueue = std::unique_ptr
+               <WeightedPriorityQueue< pair<PGRef, PGQueueable>, entity_inst_t>>(
+                 new WeightedPriorityQueue< pair<PGRef, PGQueueable>, entity_inst_t>(
+                   max_tok_per_prio, min_cost));
+           } else if (opqueue == prioritized) {
+             pqueue = std::unique_ptr
+               <PrioritizedQueue< pair<PGRef, PGQueueable>, entity_inst_t>>(
+                 new PrioritizedQueue< pair<PGRef, PGQueueable>, entity_inst_t>(
+                   max_tok_per_prio, min_cost));
+           }
+         }
     };
     
     vector<ShardData*> shard_list;
@@ -1653,7 +1672,7 @@ private:
        ShardData* one_shard = new ShardData(
          lock_name, order_lock,
          osd->cct->_conf->osd_op_pq_max_tokens_per_priority, 
-         osd->cct->_conf->osd_op_pq_min_cost, osd->cct);
+         osd->cct->_conf->osd_op_pq_min_cost, osd->cct, osd->op_queue);
        shard_list.push_back(one_shard);
       }
     }
@@ -1687,7 +1706,7 @@ private:
        assert (NULL != sdata);
        sdata->sdata_op_ordering_lock.Lock();
        f->open_object_section(lock_name);
-       sdata->pqueue.dump(f);
+       sdata->pqueue->dump(f);
        f->close_section();
        sdata->sdata_op_ordering_lock.Unlock();
       }
@@ -1708,7 +1727,7 @@ private:
       sdata = shard_list[shard_index];
       assert(sdata != NULL);
       sdata->sdata_op_ordering_lock.Lock();
-      sdata->pqueue.remove_by_filter(Pred(pg));
+      sdata->pqueue->remove_by_filter(Pred(pg), 0);
       sdata->pg_for_processing.erase(pg);
       sdata->sdata_op_ordering_lock.Unlock();
     }
@@ -1722,7 +1741,7 @@ private:
       assert(dequeued);
       list<pair<PGRef, PGQueueable> > _dequeued;
       sdata->sdata_op_ordering_lock.Lock();
-      sdata->pqueue.remove_by_filter(Pred(pg), &_dequeued);
+      sdata->pqueue->remove_by_filter(Pred(pg), &_dequeued);
       for (list<pair<PGRef, PGQueueable> >::iterator i = _dequeued.begin();
           i != _dequeued.end(); ++i) {
        boost::optional<OpRequestRef> mop = i->second.maybe_get_op();
@@ -1749,7 +1768,7 @@ private:
       ShardData* sdata = shard_list[shard_index];
       assert(NULL != sdata);
       Mutex::Locker l(sdata->sdata_op_ordering_lock);
-      return sdata->pqueue.empty();
+      return sdata->pqueue->empty();
     }
   } op_shardedwq;
 
@@ -2310,6 +2329,28 @@ protected:
   bool ms_handle_reset(Connection *con);
   void ms_handle_remote_reset(Connection *con) {}
 
+  io_queue get_io_queue() const {
+    if (cct->_conf->osd_op_queue == "debug_random") {
+      srand(time(NULL));
+      return (rand() % 2 < 1) ? prioritized : weightedpriority;
+    } else if (cct->_conf->osd_op_queue == "wpq") {
+      return weightedpriority;
+    } else {
+      return prioritized;
+    }
+  }
+
+  unsigned int get_io_prio_cut() const {
+    if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
+      srand(time(NULL));
+      return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
+    } else if (cct->_conf->osd_op_queue_cut_off == "low") {
+      return CEPH_MSG_PRIO_LOW;
+    } else {
+      return CEPH_MSG_PRIO_HIGH;
+    }
+  }
+
  public:
   /* internal and external can point to the same messenger, they will still
    * be cleaned up properly*/