From: Haomai Wang Date: Sun, 7 Dec 2014 15:00:06 +0000 (+0800) Subject: AsyncMessenger: Bind async thread to special cpu core X-Git-Tag: v0.93~247^2~27 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f4fcff16b6f0013ee9d759bb75eaa8140916f8a5;p=ceph.git AsyncMessenger: Bind async thread to special cpu core Now, 2-4 async op threads can fully meet an OSD's network demand with an SSD backend. So we can bind a limited number of threads to specific cores; this can improve async event loop performance because most structures and methods will be processed within the same thread. For example, ms_async_op_threads = 2 ms_async_affinity_cores = 0,3 Signed-off-by: Haomai Wang --- diff --git a/configure.ac b/configure.ac index 1b09d115f817..531bb0f295ba 100644 --- a/configure.ac +++ b/configure.ac @@ -842,6 +842,24 @@ AC_DEFINE([HAVE_FDATASYNC], 1, [Define to 1 if you have fdatasync.]) AC_MSG_RESULT([no]) ]) +AC_MSG_CHECKING([for sched.h]) +AC_COMPILE_IFELSE([AC_LANG_PROGRAM([[ +#define _GNU_SOURCE +#include <sched.h> +]], [[ +cpu_set_t cpuset; +CPU_ZERO(&cpuset); +CPU_SET(sched_getcpu(), &cpuset); +sched_setaffinity(0, sizeof(cpuset), &cpuset); +sched_yield(); +return 0; +]])], [ +AC_MSG_RESULT([yes]) +AC_DEFINE([HAVE_SCHED], 1, [Define to 1 if you have sched.h.]) +], [ +AC_MSG_RESULT([no]) +]) + # # Check for pthread spinlock (depends on ACX_PTHREAD) # diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 08739c417205..4e857b08c0a0 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -145,6 +145,13 @@ OPTION(ms_inject_internal_delays, OPT_DOUBLE, 0) // seconds OPTION(ms_dump_on_send, OPT_BOOL, false) // hexdump msg to log on send OPTION(ms_dump_corrupt_message_level, OPT_INT, 1) // debug level to hexdump undecodeable messages at OPTION(ms_async_op_threads, OPT_INT, 2) +OPTION(ms_async_set_affinity, OPT_BOOL, true) +// example: ms_async_affinity_cores = 0,1 +// The number of coreset is expected to equal to ms_async_op_threads, otherwise +// extra op
threads will loop ms_async_affinity_cores again. +// If ms_async_affinity_cores is empty, all threads will be bind to current running +// core +OPTION(ms_async_affinity_cores, OPT_STR, "") OPTION(inject_early_sigterm, OPT_BOOL, false) diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index ab6320da117a..670b980a2719 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -14,13 +14,20 @@ * */ +#include "acconfig.h" + #include #include #include #include +#ifdef HAVE_SCHED +#include <sched.h> +#endif #include "AsyncMessenger.h" +#include "include/str_list.h" +#include "common/strtol.h" #include "common/config.h" #include "common/Timer.h" #include "common/errno.h" @@ -39,7 +46,11 @@ static ostream& _prefix(std::ostream *_dout, Processor *p) { } static ostream& _prefix(std::ostream *_dout, Worker *w) { - return *_dout << "--"; + return *_dout << " Worker -- "; +} + +static ostream& _prefix(std::ostream *_dout, WorkerPool *p) { + return *_dout << " WorkerPool -- "; } @@ -291,6 +302,29 @@ void Worker::stop() void *Worker::entry() { ldout(cct, 10) << __func__ << " starting" << dendl; + if (cct->_conf->ms_async_set_affinity) { +#ifdef HAVE_SCHED + int cpuid; + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + + cpuid = pool->get_cpuid(id); + if (cpuid < 0) { + cpuid = sched_getcpu(); + } + + if (cpuid < CPU_SETSIZE) { + CPU_SET(cpuid, &cpuset); + + if (sched_setaffinity(0, sizeof(cpuset), &cpuset) < 0) { + ldout(cct, 0) << __func__ << " sched_setaffinity failed: " + << cpp_strerror(errno) << dendl; + } + /* guaranteed to take effect immediately */ + sched_yield(); + } +#endif + } center.set_owner(pthread_self()); while (!done) { @@ -307,7 +341,6 @@ void *Worker::entry() return 0; } - /******************* * WorkerPool *******************/ @@ -315,10 +348,22 @@ const string WorkerPool::name = "AsyncMessenger::WorkerPool"; WorkerPool::WorkerPool(CephContext *c): cct(c), seq(0), started(false) { + assert(cct->_conf->ms_async_op_threads >
0); for (int i = 0; i < cct->_conf->ms_async_op_threads; ++i) { - Worker *w = new Worker(cct); + Worker *w = new Worker(cct, this, i); workers.push_back(w); } + vector<string> corestrs; + get_str_vec(cct->_conf->ms_async_affinity_cores, corestrs); + for (vector<string>::iterator it = corestrs.begin(); + it != corestrs.end(); ++it) { + string err; + int coreid = strict_strtol(it->c_str(), 10, &err); + if (err == "") + coreids.push_back(coreid); + else + lderr(cct) << __func__ << " failed to parse " << *it << " in " << cct->_conf->ms_async_affinity_cores << dendl; + } } WorkerPool::~WorkerPool() diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index d91127710968..91004fca7417 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -61,26 +61,31 @@ class Processor : public Thread { void accept(); }; +class WorkerPool; + class Worker : public Thread { CephContext *cct; + WorkerPool *pool; bool done; + int id; public: EventCenter center; - Worker(CephContext *c): cct(c), done(false), center(c) { + Worker(CephContext *c, WorkerPool *p, int i) + : cct(c), pool(p), done(false), id(i), center(c) { center.init(5000); } void *entry(); void stop(); }; - class WorkerPool: CephContext::AssociatedSingletonObject { WorkerPool(const WorkerPool &); WorkerPool& operator=(const WorkerPool &); CephContext *cct; uint64_t seq; vector<Worker*> workers; + vector<int> coreids; // Used to indicate whether thread started bool started; @@ -91,6 +96,11 @@ class WorkerPool: CephContext::AssociatedSingletonObject { Worker *get_worker() { return workers[(seq++)%workers.size()]; } + int get_cpuid(int id) { + if (coreids.empty()) + return -1; + return coreids[id % coreids.size()]; + } // uniq name for CephContext to distinguish differnt object static const string name; };