OPTION(ms_inject_internal_delays, OPT_DOUBLE, 0) // seconds
OPTION(ms_dump_on_send, OPT_BOOL, false) // hexdump msg to log on send
OPTION(ms_dump_corrupt_message_level, OPT_INT, 1) // debug level to hexdump undecodeable messages at
-OPTION(ms_async_op_threads, OPT_INT, 3)
+OPTION(ms_async_op_threads, OPT_INT, 3) // number of async messenger worker threads created on init
+OPTION(ms_async_max_op_threads, OPT_INT, 5) // upper bound on async messenger worker threads (grown on demand)
OPTION(ms_async_set_affinity, OPT_BOOL, true)
// example: ms_async_affinity_cores = 0,1
// The number of coresets is expected to equal ms_async_op_threads, otherwise
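For illustration, a ceph.conf fragment exercising these knobs could look like the following (values are examples, not tuning advice):

```ini
[global]
ms_async_op_threads = 3        ; worker threads created up front
ms_async_max_op_threads = 5    ; cap for workers created on demand
ms_async_set_affinity = true
ms_async_affinity_cores = 0,1,2
```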
*******************/
const string WorkerPool::name = "AsyncMessenger::WorkerPool";
-WorkerPool::WorkerPool(CephContext *c): cct(c), seq(0), started(false),
+WorkerPool::WorkerPool(CephContext *c): cct(c), started(false),
barrier_lock("WorkerPool::WorkerPool::barrier_lock"),
barrier_count(0)
{
assert(cct->_conf->ms_async_op_threads > 0);
+ // make sure the user won't try to force a crazy number of worker threads
+ assert(cct->_conf->ms_async_max_op_threads >= cct->_conf->ms_async_op_threads &&
+        cct->_conf->ms_async_op_threads <= 32);
for (int i = 0; i < cct->_conf->ms_async_op_threads; ++i) {
Worker *w = new Worker(cct, this, i);
workers.push_back(w);
}
}
+Worker* WorkerPool::get_worker()
+{
+ ldout(cct, 10) << __func__ << dendl;
+
+ // start above any possible load so the first worker always wins
+ unsigned min_load = std::numeric_limits<unsigned>::max();
+ Worker* current_best = nullptr;
+
+ simple_spin_lock(&pool_spin);
+ // find worker with least references
+ // returning early on references == 0 is tempting, but in practice an
+ // idle worker is rare enough that the special case isn't worth it.
+ for (auto p = workers.begin(); p != workers.end(); ++p) {
+ unsigned worker_load = (*p)->references.load();
+ ldout(cct, 20) << __func__ << " Worker " << *p << " load: " << worker_load << dendl;
+ if (worker_load < min_load) {
+ current_best = *p;
+ min_load = worker_load;
+ }
+ }
+
+ // if the minimum load exceeds the number of workers, make a new worker.
+ // The logic behind this is that we're not going to create a new worker
+ // just because the others have *some* load - we defer worker creation
+ // until they have *plenty* of load. The new worker then gets assigned
+ // to all new connections *unless* one or more existing workers get
+ // their load reduced, in which case the least-loaded worker takes the
+ // new connection instead.
+ // TODO: add more logic and heuristics, so connections known to carry a
+ // light workload (heartbeat service, etc.) won't overshadow those with
+ // a heavy workload (clients, etc.).
+ if (!current_best || ((workers.size() < (unsigned)cct->_conf->ms_async_max_op_threads)
+ && (min_load > workers.size()))) {
+ ldout(cct, 20) << __func__ << " creating worker" << dendl;
+ current_best = new Worker(cct, this, workers.size());
+ workers.push_back(current_best);
+ current_best->create("ms_async_worker");
+ } else {
+ ldout(cct, 20) << __func__ << " picked " << current_best
+ << " as best worker with load " << min_load << dendl;
+ }
+
+ assert(current_best);
+ ++current_best->references;
+ simple_spin_unlock(&pool_spin);
+
+ return current_best;
+}
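Stripped of the Ceph scaffolding, the selection policy reduces to the following self-contained sketch; `SimpleWorker`, `pick_worker`, and `max_workers` are illustrative names, and the spinlock that get_worker() holds around this sequence is omitted:

```cpp
#include <atomic>
#include <limits>
#include <vector>

struct SimpleWorker {
  std::atomic_uint references{0};  // connections currently assigned here
};

// Pick the least-loaded worker; create a new one only when every existing
// worker already carries more connections than there are workers and the
// configured cap has not been reached - the same policy as get_worker().
SimpleWorker* pick_worker(std::vector<SimpleWorker*>& workers,
                          unsigned max_workers) {
  unsigned min_load = std::numeric_limits<unsigned>::max();
  SimpleWorker* best = nullptr;
  for (SimpleWorker* w : workers) {
    unsigned load = w->references.load();
    if (load < min_load) {
      best = w;
      min_load = load;
    }
  }
  if (!best || (workers.size() < max_workers && min_load > workers.size())) {
    best = new SimpleWorker;  // stand-in for spawning a real worker thread
    workers.push_back(best);
  }
  ++best->references;
  return best;
}
```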
+
+void WorkerPool::release_worker(EventCenter* c)
+{
+ ldout(cct, 10) << __func__ << dendl;
+ simple_spin_lock(&pool_spin);
+ for (auto p = workers.begin(); p != workers.end(); ++p) {
+ if (&((*p)->center) == c) {
+ ldout(cct, 10) << __func__ << " found worker, releasing" << dendl;
+ unsigned oldref = (*p)->references.fetch_sub(1);
+ assert(oldref > 0);
+ break;
+ }
+ }
+ simple_spin_unlock(&pool_spin);
+}
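Every get_worker() must eventually be paired with a release_worker() for the same worker's EventCenter, otherwise the reference counts only grow and new connections keep spawning threads until the cap. A hypothetical usage sketch against the pool API above (serve_connection() is a made-up driver, the rest comes from the patch):

```cpp
void serve_connection(WorkerPool* pool) {
  Worker* w = pool->get_worker();   // increments w->references under pool_spin
  EventCenter* c = &w->center;      // the connection runs its events here
  // ... connection is serviced on this worker's event loop ...
  pool->release_worker(c);          // matching decrement; the thread stays up
}
```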
+
void WorkerPool::barrier()
{
ldout(cct, 10) << __func__ << " started." << dendl;
#include "include/assert.h"
#include "AsyncConnection.h"
#include "Event.h"
+#include "common/simple_spin.h"
class AsyncMessenger;
public:
EventCenter center;
+ std::atomic_uint references; // connections currently assigned to this worker
Worker(CephContext *c, WorkerPool *p, int i)
- : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c) {
+ : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) {
center.init(InitEventNumber);
char name[128];
sprintf(name, "AsyncMessenger::Worker-%d", id);
WorkerPool(const WorkerPool &);
WorkerPool& operator=(const WorkerPool &);
CephContext *cct;
- uint64_t seq;
vector<Worker*> workers;
vector<int> coreids;
// Used to indicate whether thread started
Mutex barrier_lock;
Cond barrier_cond;
atomic_t barrier_count;
+ simple_spinlock_t pool_spin = SIMPLE_SPINLOCK_INITIALIZER; // guards workers and the pick/spawn sequence
class C_barrier : public EventCallback {
WorkerPool *pool;
explicit WorkerPool(CephContext *c);
virtual ~WorkerPool();
void start();
- Worker *get_worker() {
- return workers[(seq++)%workers.size()];
- }
+ Worker *get_worker();
+ void release_worker(EventCenter* c);
int get_cpuid(int id) {
if (coreids.empty())
return -1;
*/
void unregister_conn(AsyncConnectionRef conn) {
Mutex::Locker l(deleted_lock);
+ conn->release_worker();
deleted_conns.insert(conn);
if (deleted_conns.size() >= ReapDeadConnectionThreshold) {
* See "deleted_conns"
*/
int reap_dead();
+
+ void release_worker(EventCenter* c) {
+ pool->release_worker(c);
+ }
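The AsyncConnection::release_worker() called from unregister_conn() is not part of this hunk; presumably it just routes the connection's own EventCenter back through the messenger, roughly like this sketch (member names assumed, not taken from the patch):

```cpp
void AsyncConnection::release_worker() {
  // hand our EventCenter back so the pool can decrement the owning
  // worker's reference count
  if (center)
    async_msgr->release_worker(center);
}
```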
/**
* @} // AsyncMessenger Internals