AC_MSG_RESULT([no])
])
+dnl Probe for Linux CPU-affinity support: compile a program using cpu_set_t,
+dnl CPU_ZERO/CPU_SET, sched_getcpu(), sched_setaffinity() and sched_yield().
+dnl _GNU_SOURCE is required because these are GNU extensions in <sched.h>;
+dnl on success HAVE_SCHED is defined and guards the affinity code in
+dnl Worker::entry().
+AC_MSG_CHECKING([for sched.h])
+AC_COMPILE_IFELSE([AC_LANG_PROGRAM([[
+#define _GNU_SOURCE
+#include <sched.h>
+]], [[
+cpu_set_t cpuset;
+CPU_ZERO(&cpuset);
+CPU_SET(sched_getcpu(), &cpuset);
+sched_setaffinity(0, sizeof(cpuset), &cpuset);
+sched_yield();
+return 0;
+]])], [
+AC_MSG_RESULT([yes])
+AC_DEFINE([HAVE_SCHED], 1, [Define to 1 if you have sched.h.])
+], [
+AC_MSG_RESULT([no])
+])
+
#
# Check for pthread spinlock (depends on ACX_PTHREAD)
#
OPTION(ms_dump_on_send, OPT_BOOL, false) // hexdump msg to log on send
OPTION(ms_dump_corrupt_message_level, OPT_INT, 1) // debug level to hexdump undecodeable messages at
OPTION(ms_async_op_threads, OPT_INT, 2)
+OPTION(ms_async_set_affinity, OPT_BOOL, true)
+// example: ms_async_affinity_cores = 0,1
+// The number of listed cores is expected to equal ms_async_op_threads;
+// otherwise the extra op threads wrap around and reuse
+// ms_async_affinity_cores from the beginning.
+// If ms_async_affinity_cores is empty, each thread will be bound to the
+// core it is currently running on.
+OPTION(ms_async_affinity_cores, OPT_STR, "")
OPTION(inject_early_sigterm, OPT_BOOL, false)
*
*/
+#include "acconfig.h"
+
#include <errno.h>
#include <iostream>
#include <fstream>
#include <poll.h>
+#ifdef HAVE_SCHED
+#include <sched.h>
+#endif
#include "AsyncMessenger.h"
+#include "include/str_list.h"
+#include "common/strtol.h"
#include "common/config.h"
#include "common/Timer.h"
#include "common/errno.h"
}
static ostream& _prefix(std::ostream *_dout, Worker *w) {
- return *_dout << "--";
+ return *_dout << " Worker -- ";
+}
+
+// dout/lderr prefix for WorkerPool-scoped log lines (used e.g. by the
+// core-list parsing in the WorkerPool constructor).
+static ostream& _prefix(std::ostream *_dout, WorkerPool *p) {
+ return *_dout << " WorkerPool -- ";
}
void *Worker::entry()
{
ldout(cct, 10) << __func__ << " starting" << dendl;
+ // Optionally pin this worker thread to one CPU core. Linux-only: guarded
+ // by HAVE_SCHED from the configure probe.
+ if (cct->_conf->ms_async_set_affinity) {
+#ifdef HAVE_SCHED
+ int cpuid;
+ cpu_set_t cpuset;
+ CPU_ZERO(&cpuset);
+
+ // Prefer the core configured for this worker's index; fall back to the
+ // core the thread happens to be running on right now.
+ cpuid = pool->get_cpuid(id);
+ if (cpuid < 0) {
+ cpuid = sched_getcpu();
+ }
+
+ // NOTE(review): sched_getcpu() can return -1 on error; -1 passes the
+ // "< CPU_SETSIZE" check below and CPU_SET(-1, ...) is undefined
+ // behavior — consider also requiring cpuid >= 0.
+ if (cpuid < CPU_SETSIZE) {
+ CPU_SET(cpuid, &cpuset);
+
+ // Best-effort: on failure just log at level 0 and keep running
+ // unpinned.
+ if (sched_setaffinity(0, sizeof(cpuset), &cpuset) < 0) {
+ ldout(cct, 0) << __func__ << " sched_setaffinity failed: "
+ << cpp_strerror(errno) << dendl;
+ }
+ /* guaranteed to take effect immediately */
+ sched_yield();
+ }
+#endif
+ }
center.set_owner(pthread_self());
while (!done) {
return 0;
}
-
/*******************
* WorkerPool
*******************/
WorkerPool::WorkerPool(CephContext *c): cct(c), seq(0), started(false)
{
+ assert(cct->_conf->ms_async_op_threads > 0);
for (int i = 0; i < cct->_conf->ms_async_op_threads; ++i) {
- Worker *w = new Worker(cct);
+ Worker *w = new Worker(cct, this, i);
workers.push_back(w);
}
+ // Parse the comma-separated ms_async_affinity_cores option into coreids;
+ // Worker::entry() consults this list via get_cpuid().
+ // NOTE(review): entries that fail strict_strtol are reported and skipped,
+ // which shifts the index->core mapping of the remaining threads — confirm
+ // this is intended rather than aborting on a malformed list.
+ vector<string> corestrs;
+ get_str_vec(cct->_conf->ms_async_affinity_cores, corestrs);
+ for (vector<string>::iterator it = corestrs.begin();
+ it != corestrs.end(); ++it) {
+ string err;
+ int coreid = strict_strtol(it->c_str(), 10, &err);
+ if (err == "")
+ coreids.push_back(coreid);
+ else
+ lderr(cct) << __func__ << " failed to parse " << *it << " in " << cct->_conf->ms_async_affinity_cores << dendl;
+ }
}
WorkerPool::~WorkerPool()
void accept();
};
+// Forward declaration: each Worker keeps a back-pointer to the pool that
+// owns it so entry() can look up its configured CPU core.
+class WorkerPool;
+
class Worker : public Thread {
CephContext *cct;
+ WorkerPool *pool; // owning pool; entry() calls pool->get_cpuid(id)
bool done;
+ int id; // this worker's index within the pool
public:
EventCenter center;
- Worker(CephContext *c): cct(c), done(false), center(c) {
+ Worker(CephContext *c, WorkerPool *p, int i)
+ : cct(c), pool(p), done(false), id(i), center(c) {
center.init(5000);
}
void *entry();
void stop();
};
-
class WorkerPool: CephContext::AssociatedSingletonObject {
WorkerPool(const WorkerPool &);
WorkerPool& operator=(const WorkerPool &);
CephContext *cct;
uint64_t seq;
vector<Worker*> workers;
+ vector<int> coreids; // parsed ms_async_affinity_cores; empty = no explicit affinity
// Used to indicate whether thread started
bool started;
Worker *get_worker() {
return workers[(seq++)%workers.size()];
}
+ // Core id for worker `id`, wrapping around the configured list when there
+ // are more op threads than cores. Returns -1 when no cores were
+ // configured (the worker then pins to its current core, see
+ // Worker::entry()).
+ int get_cpuid(int id) {
+ if (coreids.empty())
+ return -1;
+ return coreids[id % coreids.size()];
+ }
// unique name for CephContext to distinguish different singleton objects
static const string name;
};