]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncMessenger: Bind async thread to special cpu core
authorHaomai Wang <haomaiwang@gmail.com>
Sun, 7 Dec 2014 15:00:06 +0000 (23:00 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Thu, 15 Jan 2015 19:07:09 +0000 (03:07 +0800)
Now, 2-4 async op thread can fully meet a OSD's network demand with SSD
backend. So we can bind limited thread to special cores, it can improve
async event loop performance because most of structure and method will
processed within thread.

For example,

ms_async_op_threads = 2
ms_async_affinity_cores = 0,3

Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
configure.ac
src/common/config_opts.h
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h

index 1b09d115f8170045d4ec2adfa6d3997896164319..531bb0f295ba305cdb295146af7ce4b1772555e5 100644 (file)
@@ -842,6 +842,24 @@ AC_DEFINE([HAVE_FDATASYNC], 1, [Define to 1 if you have fdatasync.])
 AC_MSG_RESULT([no])
 ])
 
+AC_MSG_CHECKING([for sched.h])
+AC_COMPILE_IFELSE([AC_LANG_PROGRAM([[
+#define _GNU_SOURCE
+#include <sched.h>
+]], [[
+cpu_set_t cpuset;
+CPU_ZERO(&cpuset);
+CPU_SET(sched_getcpu(), &cpuset);
+sched_setaffinity(0, sizeof(cpuset), &cpuset);
+sched_yield();
+return 0;
+]])], [
+AC_MSG_RESULT([yes])
+AC_DEFINE([HAVE_SCHED], 1, [Define to 1 if you have sched.h.])
+], [
+AC_MSG_RESULT([no])
+])
+
 #
 # Check for pthread spinlock (depends on ACX_PTHREAD)
 #
index 08739c417205ed546478ce9183a321b229da59ed..4e857b08c0a07f757124ea91c4db5637a7623176 100644 (file)
@@ -145,6 +145,13 @@ OPTION(ms_inject_internal_delays, OPT_DOUBLE, 0)   // seconds
 OPTION(ms_dump_on_send, OPT_BOOL, false)           // hexdump msg to log on send
 OPTION(ms_dump_corrupt_message_level, OPT_INT, 1)  // debug level to hexdump undecodeable messages at
 OPTION(ms_async_op_threads, OPT_INT, 2)
+OPTION(ms_async_set_affinity, OPT_BOOL, true)
+// example: ms_async_affinity_cores = 0,1
+// The number of coreset is expected to equal to ms_async_op_threads, otherwise
+// extra op threads will loop ms_async_affinity_cores again.
+// If ms_async_affinity_cores is empty, all threads will be bind to current running
+// core
+OPTION(ms_async_affinity_cores, OPT_STR, "")
 
 OPTION(inject_early_sigterm, OPT_BOOL, false)
 
index ab6320da117a46654bc8af37bcffd64a5bd66769..670b980a271905bff9abacc7c6646ed15c71a5fe 100644 (file)
  *
  */
 
+#include "acconfig.h"
+
 #include <errno.h>
 #include <iostream>
 #include <fstream>
 #include <poll.h>
+#ifdef HAVE_SCHED
+#include <sched.h>
+#endif
 
 #include "AsyncMessenger.h"
 
+#include "include/str_list.h"
+#include "common/strtol.h"
 #include "common/config.h"
 #include "common/Timer.h"
 #include "common/errno.h"
@@ -39,7 +46,11 @@ static ostream& _prefix(std::ostream *_dout, Processor *p) {
 }
 
 static ostream& _prefix(std::ostream *_dout, Worker *w) {
-  return *_dout << "--";
+  return *_dout << " Worker -- ";
+}
+
+static ostream& _prefix(std::ostream *_dout, WorkerPool *p) {
+  return *_dout << " WorkerPool -- ";
 }
 
 
@@ -291,6 +302,29 @@ void Worker::stop()
 void *Worker::entry()
 {
   ldout(cct, 10) << __func__ << " starting" << dendl;
+  if (cct->_conf->ms_async_set_affinity) {
+#ifdef HAVE_SCHED
+    int cpuid;
+    cpu_set_t cpuset;
+    CPU_ZERO(&cpuset);
+
+    cpuid = pool->get_cpuid(id);
+    if (cpuid < 0) {
+      cpuid = sched_getcpu();
+    }
+
+    if (cpuid < CPU_SETSIZE) {
+      CPU_SET(cpuid, &cpuset);
+
+      if (sched_setaffinity(0, sizeof(cpuset), &cpuset) < 0) {
+        ldout(cct, 0) << __func__ << " sched_setaffinity failed: "
+                      << cpp_strerror(errno) << dendl;
+      }
+      /* guaranteed to take effect immediately */
+      sched_yield();
+    }
+#endif
+  }
 
   center.set_owner(pthread_self());
   while (!done) {
@@ -307,7 +341,6 @@ void *Worker::entry()
   return 0;
 }
 
-
 /*******************
  * WorkerPool
  *******************/
@@ -315,10 +348,22 @@ const string WorkerPool::name = "AsyncMessenger::WorkerPool";
 
 WorkerPool::WorkerPool(CephContext *c): cct(c), seq(0), started(false)
 {
+  assert(cct->_conf->ms_async_op_threads > 0);
   for (int i = 0; i < cct->_conf->ms_async_op_threads; ++i) {
-    Worker *w = new Worker(cct);
+    Worker *w = new Worker(cct, this, i);
     workers.push_back(w);
   }
+  vector<string> corestrs;
+  get_str_vec(cct->_conf->ms_async_affinity_cores, corestrs);
+  for (vector<string>::iterator it = corestrs.begin();
+       it != corestrs.end(); ++it) {
+    string err;
+    int coreid = strict_strtol(it->c_str(), 10, &err);
+    if (err == "")
+      coreids.push_back(coreid);
+    else
+      lderr(cct) << __func__ << " failed to parse " << *it << " in " << cct->_conf->ms_async_affinity_cores << dendl;
+  }
 }
 
 WorkerPool::~WorkerPool()
index d9112771096822d486d739e827db7f9acb77874a..91004fca74175a01970c6f24f601bc4616c7e7c3 100644 (file)
@@ -61,26 +61,31 @@ class Processor : public Thread {
   void accept();
 };
 
+class WorkerPool;
+
 class Worker : public Thread {
   CephContext *cct;
+  WorkerPool *pool;
   bool done;
+  int id;
 
  public:
   EventCenter center;
-  Worker(CephContext *c): cct(c), done(false), center(c) {
+  Worker(CephContext *c, WorkerPool *p, int i)
+    : cct(c), pool(p), done(false), id(i), center(c) {
     center.init(5000);
   }
   void *entry();
   void stop();
 };
 
-
 class WorkerPool: CephContext::AssociatedSingletonObject {
   WorkerPool(const WorkerPool &);
   WorkerPool& operator=(const WorkerPool &);
   CephContext *cct;
   uint64_t seq;
   vector<Worker*> workers;
+  vector<int> coreids;
   // Used to indicate whether thread started
   bool started;
 
@@ -91,6 +96,11 @@ class WorkerPool: CephContext::AssociatedSingletonObject {
   Worker *get_worker() {
     return workers[(seq++)%workers.size()];
   }
+  int get_cpuid(int id) {
+    if (coreids.empty())
+      return -1;
+    return coreids[id % coreids.size()];
+  }
   // uniq name for CephContext to distinguish differnt object
   static const string name;
 };