]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/PrioritizedQueue: add min cost, max tokens per bucket
authorSage Weil <sage@inktank.com>
Mon, 21 Jan 2013 23:29:28 +0000 (15:29 -0800)
committerSage Weil <sage@inktank.com>
Tue, 22 Jan 2013 22:47:41 +0000 (14:47 -0800)
Two problems.

First, we need to cap the tokens per bucket.  Otherwise, a stream of
items at one priority over time will indefinitely inflate the tokens
available at another priority.  The cap should represent how "bursty"
we allow a given bucket to be.  Start with 4MB for now.

Second, set a floor on the item cost.  Otherwise, we can have an
infinite queue of 0 cost items that start over queues.  More
realistically, we need to balance the overhead of processing small items
with the cost of large items.  I.e., a 4 KB item is not 1/1000th as
expensive as a 4MB item.

Signed-off-by: Sage Weil <sage@inktank.com>
src/common/PrioritizedQueue.h
src/common/config_opts.h
src/msg/DispatchQueue.h
src/osd/OSD.h

index 6ada06601246bbf8e9aee3df09fbd3d0f8a90060..072eaebf2ee8b1db08e2530f664f6b63f47bbbe2 100644 (file)
@@ -45,6 +45,8 @@
 template <typename T, typename K>
 class PrioritizedQueue {
   int64_t total_priority;
+  int64_t max_tokens_per_subqueue;
+  int64_t min_cost;
 
   template <class F>
   static unsigned filter_list_pairs(
@@ -76,22 +78,39 @@ class PrioritizedQueue {
   struct SubQueue {
   private:
     map<K, list<pair<unsigned, T> > > q;
-    unsigned tokens;
+    unsigned tokens, max_tokens;
     int64_t size;
     typename map<K, list<pair<unsigned, T> > >::iterator cur;
   public:
     SubQueue(const SubQueue &other)
-      : q(other.q), tokens(other.tokens), size(other.size),
+      : q(other.q),
+       tokens(other.tokens),
+       max_tokens(other.max_tokens),
+       size(other.size),
        cur(q.begin()) {}
-    SubQueue() : tokens(0), size(0), cur(q.begin()) {}
+    SubQueue()
+      : tokens(0),
+       max_tokens(0),
+       size(0) {}
+    void set_max_tokens(unsigned mt) {
+      max_tokens = mt;
+    }
+    unsigned get_max_tokens() const {
+      return max_tokens;
+    }
     unsigned num_tokens() const {
       return tokens;
     }
     void put_tokens(unsigned t) {
       tokens += t;
+      if (tokens > max_tokens)
+       tokens = max_tokens;
     }
     void take_tokens(unsigned t) {
+      if (tokens > t)
        tokens -= t;
+      else
+       tokens = 0;
     }
     void enqueue(K cl, unsigned cost, T item) {
       q[cl].push_back(make_pair(cost, item));
@@ -175,7 +194,9 @@ class PrioritizedQueue {
     if (p != queue.end())
       return &p->second;
     total_priority += priority;
-    return &queue[priority];
+    SubQueue *sq = &queue[priority];
+    sq->set_max_tokens(max_tokens_per_subqueue);
+    return sq;
   }
 
   void remove_queue(unsigned priority) {
@@ -196,7 +217,11 @@ class PrioritizedQueue {
   }
 
 public:
-  PrioritizedQueue() : total_priority(0) {}
+  PrioritizedQueue(unsigned max_per, unsigned min_c)
+    : total_priority(0),
+      max_tokens_per_subqueue(max_per),
+      min_cost(min_c)
+  {}
 
   unsigned length() {
     unsigned total = 0;
@@ -276,10 +301,14 @@ public:
   }
 
   void enqueue(K cl, unsigned priority, unsigned cost, T item) {
+    if (cost < min_cost)
+      cost = min_cost;
     create_queue(priority)->enqueue(cl, cost, item);
   }
 
   void enqueue_front(K cl, unsigned priority, unsigned cost, T item) {
+    if (cost < min_cost)
+      cost = min_cost;
     create_queue(priority)->enqueue_front(cl, cost, item);
   }
 
@@ -300,6 +329,9 @@ public:
       return ret;
     }
 
+    // if there are multiple buckets/subqueues with sufficient tokens,
+    // we behave like a strict priority queue among all subqueues that
+    // are eligible to run.
     for (typename map<unsigned, SubQueue>::iterator i = queue.begin();
         i != queue.end();
         ++i) {
@@ -315,6 +347,9 @@ public:
        return ret;
       }
     }
+
+    // if no subqueues have sufficient tokens, we behave like a strict
+    // priority queue.
     T ret = queue.rbegin()->second.front().second;
     unsigned cost = queue.rbegin()->second.front().first;
     queue.rbegin()->second.pop_front();
index 32bd41c6ce30d9c16ddf3ece22e946f03d4dd7a1..7eb3e94fadcce137639fa4d0d4533dd09704b32f 100644 (file)
@@ -105,6 +105,8 @@ OPTION(ms_bind_port_min, OPT_INT, 6800)
 OPTION(ms_bind_port_max, OPT_INT, 7100)
 OPTION(ms_rwthread_stack_bytes, OPT_U64, 1024 << 10)
 OPTION(ms_tcp_read_timeout, OPT_U64, 900)
+OPTION(ms_pq_max_tokens_per_priority, OPT_U64, 4194304)
+OPTION(ms_pq_min_cost, OPT_U64, 65536)
 OPTION(ms_inject_socket_failures, OPT_U64, 0)
 OPTION(ms_inject_delay_type, OPT_STR, "")          // "osd mds mon client" allowed
 OPTION(ms_inject_delay_max, OPT_DOUBLE, 1)         // seconds
@@ -315,6 +317,8 @@ OPTION(osd_map_dedup, OPT_BOOL, true)
 OPTION(osd_map_cache_size, OPT_INT, 500)
 OPTION(osd_map_message_max, OPT_INT, 100)  // max maps per MOSDMap message
 OPTION(osd_op_threads, OPT_INT, 2)    // 0 == no threading
+OPTION(osd_op_pq_max_tokens_per_priority, OPT_U64, 4194304)
+OPTION(osd_op_pq_min_cost, OPT_U64, 65536)
 OPTION(osd_disk_threads, OPT_INT, 1)
 OPTION(osd_recovery_threads, OPT_INT, 1)
 OPTION(osd_recover_clone_overlap, OPT_BOOL, true)   // preserve clone_overlap during recovery/migration
index ea44c165d56d680be76e0d8a953c3c9fd975d530..884e0269342628981ba8c6fb64bfcca7d531b307 100644 (file)
@@ -151,6 +151,8 @@ class DispatchQueue {
   DispatchQueue(CephContext *cct, SimpleMessenger *msgr)
     : cct(cct), msgr(msgr),
       lock("SimpleMessenger::DispatchQeueu::lock"), 
+      mqueue(cct->_conf->ms_pq_max_tokens_per_priority,
+            cct->_conf->ms_pq_min_cost),
       next_pipe_id(1),
       dispatch_thread(this),
       stop(false)
index 3d5e51523ea01f209b9789fcd8190f2909bf192a..1af32d738838551e5a9e3c7fce8e7ede7b011f17 100644 (file)
@@ -642,7 +642,10 @@ private:
       : ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>, PGRef >(
        "OSD::OpWQ", ti, ti*10, tp),
        qlock("OpWQ::qlock"),
-       osd(o) {}
+       osd(o),
+       pqueue(o->cct->_conf->osd_op_pq_max_tokens_per_priority,
+              o->cct->_conf->osd_op_pq_min_cost)
+    {}
 
     void _enqueue_front(pair<PGRef, OpRequestRef> item);
     void _enqueue(pair<PGRef, OpRequestRef> item);