]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: Implement smarter worker thread selection 7843/head
authorPiotr Dałek <piotr.dalek@ts.fujitsu.com>
Fri, 26 Feb 2016 12:54:20 +0000 (13:54 +0100)
committerPiotr Dałek <git@predictor.org.pl>
Sun, 15 May 2016 19:42:32 +0000 (21:42 +0200)
This changeset makes AsyncMessenger a bit smarter when it comes
to assigning worker threads to AsyncConnections. Each time a worker
is assigned, its reference count is increased. Next time when Async
Messenger needs to assign another worker to new AsyncConnection, it
picks the one with the lowest reference count. If it cannot find an
idle one, and number of currently instantiated workers is less than
specified with "ms async op max threads", the new worker is created
and returned.
Once AsyncConnection goes away, the reference count on assigned
worker is decreased.
This does not prevent, but greatly reduces chances of having a single
async worker thread doing most (or even all) of the ops, and also
removes the need to manually tune the "ms async op threads" option.

Signed-off-by: Piotr Dałek <git@predictor.org.pl>
src/common/config_opts.h
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h

index e018f1e3ebc34697655d15893ee82a2a14ecc757..2f67509f9ee0a9a7a18dac3937b43adf5cf2a3ce 100644 (file)
@@ -196,7 +196,8 @@ OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1]
 OPTION(ms_inject_internal_delays, OPT_DOUBLE, 0)   // seconds
 OPTION(ms_dump_on_send, OPT_BOOL, false)           // hexdump msg to log on send
 OPTION(ms_dump_corrupt_message_level, OPT_INT, 1)  // debug level to hexdump undecodeable messages at
-OPTION(ms_async_op_threads, OPT_INT, 3)
+OPTION(ms_async_op_threads, OPT_INT, 3)            // number of worker processing threads for async messenger created on init
+OPTION(ms_async_max_op_threads, OPT_INT, 5)        // max number of worker processing threads for async messenger
 OPTION(ms_async_set_affinity, OPT_BOOL, true)
 // example: ms_async_affinity_cores = 0,1
 // The number of coreset is expected to equal to ms_async_op_threads, otherwise
index 0d7ca05d5d29be4031beac201550ba25b36fc023..18430066d3ff31ed0a01f851bdb6a569e854bd20 100644 (file)
@@ -2493,6 +2493,12 @@ void AsyncConnection::mark_down()
   _stop();
 }
 
+void AsyncConnection::release_worker()
+{
+  if (msgr)
+    reinterpret_cast<AsyncMessenger*>(msgr)->release_worker(center);
+}
+
 void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp)
 {
   assert(write_lock.is_locked());
index 23577db36b4b9aaec51545261f0f919b741695a5..416bccba42a6318a74fcf6746eb07c7e6bbbbcd1 100644 (file)
@@ -196,6 +196,8 @@ class AsyncConnection : public Connection {
     Mutex::Locker l(lock);
     policy.lossy = true;
   }
+  
+  void release_worker();
 
  private:
   enum {
index c12ee8e21bb27bac9061ced49a305a67cfaa487d..8ae2cffb81a6a43974a62d1098fbf0a3164f3b49 100644 (file)
@@ -307,11 +307,14 @@ void *Worker::entry()
  *******************/
 const string WorkerPool::name = "AsyncMessenger::WorkerPool";
 
-WorkerPool::WorkerPool(CephContext *c): cct(c), seq(0), started(false),
+WorkerPool::WorkerPool(CephContext *c): cct(c), started(false),
                                         barrier_lock("WorkerPool::WorkerPool::barrier_lock"),
                                         barrier_count(0)
 {
   assert(cct->_conf->ms_async_op_threads > 0);
+  // make sure user won't try to force some crazy number of worker threads
+  assert(cct->_conf->ms_async_max_op_threads >= cct->_conf->ms_async_op_threads && 
+         cct->_conf->ms_async_op_threads <= 32);
   for (int i = 0; i < cct->_conf->ms_async_op_threads; ++i) {
     Worker *w = new Worker(cct, this, i);
     workers.push_back(w);
@@ -351,6 +354,70 @@ void WorkerPool::start()
   }
 }
 
+Worker* WorkerPool::get_worker()
+{
+  ldout(cct, 10) << __func__ << dendl;
+
+   // start with some reasonably large number
+  unsigned min_load = std::numeric_limits<int>::max();
+  Worker* current_best = nullptr;
+
+  simple_spin_lock(&pool_spin);
+  // find worker with least references
+  // tempting case is returning on references == 0, but in reality
+  // this will happen so rarely that there's no need for special case.
+  for (auto p = workers.begin(); p != workers.end(); ++p) {
+    unsigned worker_load = (*p)->references.load();
+    ldout(cct, 20) << __func__ << " Worker " << *p << " load: " << worker_load << dendl;
+    if (worker_load < min_load) {
+      current_best = *p;
+      min_load = worker_load;
+    }
+  }
+
+  // if minimum load exceeds amount of workers, make a new worker
+  // logic behind this is that we're not going to create new worker
+  // just because others have *some* load, we'll defer worker creation
+  // until others have *plenty* of load. This will cause new worker
+  // to get assigned to all new connections *unless* one or more
+  // of workers get their load reduced - in that case, this worker
+  // will be assigned to new connection.
+  // TODO: add more logic and heuristics, so connections known to be
+  // of light workload (heartbeat service, etc.) won't overshadow
+  // heavy workload (clients, etc).
+  if (!current_best || ((workers.size() < (unsigned)cct->_conf->ms_async_max_op_threads)
+      && (min_load > workers.size()))) {
+     ldout(cct, 20) << __func__ << " creating worker" << dendl;
+     current_best = new Worker(cct, this, workers.size());
+     workers.push_back(current_best);
+     current_best->create("ms_async_worker");
+  } else {
+    ldout(cct, 20) << __func__ << " picked " << current_best 
+                   << " as best worker with load " << min_load << dendl;
+  }
+
+  ++current_best->references;
+  simple_spin_unlock(&pool_spin);
+
+  assert(current_best);
+  return current_best;
+}
+
+void WorkerPool::release_worker(EventCenter* c)
+{
+  ldout(cct, 10) << __func__ << dendl;
+  simple_spin_lock(&pool_spin);
+  for (auto p = workers.begin(); p != workers.end(); ++p) {
+    if (&((*p)->center) == c) {
+      ldout(cct, 10) << __func__ << " found worker, releasing" << dendl;
+      int oldref = (*p)->references.fetch_sub(1);
+      assert(oldref > 0);
+      break;
+    }
+  }
+  simple_spin_unlock(&pool_spin);
+}
+
 void WorkerPool::barrier()
 {
   ldout(cct, 10) << __func__ << " started." << dendl;
index 3c7aa0aa25e781287a23c326404b548465183ff2..52d93d7cd1e8f87b0ca85268e9ae6c1861ea43c4 100644 (file)
@@ -36,6 +36,7 @@ using namespace std;
 #include "include/assert.h"
 #include "AsyncConnection.h"
 #include "Event.h"
+#include "common/simple_spin.h"
 
 
 class AsyncMessenger;
@@ -65,8 +66,9 @@ class Worker : public Thread {
 
  public:
   EventCenter center;
+  std::atomic_uint references;
   Worker(CephContext *c, WorkerPool *p, int i)
-    : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c) {
+    : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) {
     center.init(InitEventNumber);
     char name[128];
     sprintf(name, "AsyncMessenger::Worker-%d", id);
@@ -133,7 +135,6 @@ class WorkerPool {
   WorkerPool(const WorkerPool &);
   WorkerPool& operator=(const WorkerPool &);
   CephContext *cct;
-  uint64_t seq;
   vector<Worker*> workers;
   vector<int> coreids;
   // Used to indicate whether thread started
@@ -141,6 +142,7 @@ class WorkerPool {
   Mutex barrier_lock;
   Cond barrier_cond;
   atomic_t barrier_count;
+  simple_spinlock_t pool_spin = SIMPLE_SPINLOCK_INITIALIZER;
 
   class C_barrier : public EventCallback {
     WorkerPool *pool;
@@ -158,9 +160,8 @@ class WorkerPool {
   explicit WorkerPool(CephContext *c);
   virtual ~WorkerPool();
   void start();
-  Worker *get_worker() {
-    return workers[(seq++)%workers.size()];
-  }
+  Worker *get_worker();
+  void release_worker(EventCenter* c);
   int get_cpuid(int id) {
     if (coreids.empty())
       return -1;
@@ -525,6 +526,7 @@ public:
    */
   void unregister_conn(AsyncConnectionRef conn) {
     Mutex::Locker l(deleted_lock);
+    conn->release_worker();
     deleted_conns.insert(conn);
 
     if (deleted_conns.size() >= ReapDeadConnectionThreshold) {
@@ -540,6 +542,10 @@ public:
    * See "deleted_conns"
    */
   int reap_dead();
+  
+  void release_worker(EventCenter* c) {
+    pool->release_worker(c);
+  }
 
   /**
    * @} // AsyncMessenger Internals