From d6f6ad03b6753bb3200340056b4a981ac9b08b7e Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Fri, 12 Jun 2015 01:00:12 +0800 Subject: [PATCH] PerfMsgr: Make Server worker threads configurable Signed-off-by: Haomai Wang --- src/test/msgr/perf_msgr_server.cc | 86 +++++++++++++++---------------- 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/src/test/msgr/perf_msgr_server.cc b/src/test/msgr/perf_msgr_server.cc index 0a15c2ff0c3b1..146219f42190f 100644 --- a/src/test/msgr/perf_msgr_server.cc +++ b/src/test/msgr/perf_msgr_server.cc @@ -33,53 +33,51 @@ using namespace std; class ServerDispatcher : public Dispatcher { uint64_t think_time; - class Worker : public Thread { - ServerDispatcher *dispatcher; + ThreadPool op_tp; + class OpWQ : public ThreadPool::WorkQueue { list messages; - bool is_stop; - Mutex lock; - Cond cond; public: - Worker(ServerDispatcher *d): dispatcher(d), is_stop(false), lock("ServerDispatcher::Worker::lock") {} - void queue(Message *m) { - Mutex::Locker l(lock); + OpWQ(time_t timeout, time_t suicide_timeout, ThreadPool *tp) + : ThreadPool::WorkQueue("ServerDispatcher::OpWQ", timeout, suicide_timeout, tp) {} + + bool _enqueue(Message *m) { messages.push_back(m); - cond.Signal(); + return true; + } + void _dequeue(Message *m) { + assert(0); + } + bool _empty() { + return messages.empty(); + } + Message *_dequeue() { + if (messages.empty()) + return NULL; + Message *m = messages.front(); + messages.pop_front(); + return m; } - void *entry() { - Mutex::Locker l(lock); - while (!is_stop) { - if (!messages.empty()) { - Message *m = messages.back(); - messages.pop_back(); - lock.Unlock(); - MOSDOp *osd_op = static_cast(m); - MOSDOpReply *reply = new MOSDOpReply(osd_op, 0, 0, 0, false); - m->get_connection()->send_message(reply); - m->put(); - lock.Lock(); - } else { - cond.Wait(lock); - } - } - return 0; + void _process(Message *m, ThreadPool::TPHandle &handle) { + MOSDOp *osd_op = static_cast(m); + MOSDOpReply *reply = new MOSDOpReply(osd_op, 0, 0, 0, false); + m->get_connection()->send_message(reply); + m->put(); } - void stop() { - Mutex::Locker l(lock); - is_stop = true; - cond.Signal(); + void _process_finish(Message *m) { } + void _clear() { + assert(messages.empty()); } - } worker; - friend class Worker; + } op_wq; public: - ServerDispatcher(uint64_t delay): Dispatcher(g_ceph_context), think_time(delay), worker(this) { - worker.create(); + ServerDispatcher(int threads, uint64_t delay): Dispatcher(g_ceph_context), think_time(delay), + op_tp(g_ceph_context, "ServerDispatcher::op_tp", threads, "serverdispatcher_op_threads"), + op_wq(30, 30, &op_tp) { + op_tp.start(); } ~ServerDispatcher() { - worker.stop(); - worker.join(); + op_tp.stop(); } bool ms_can_fast_dispatch_any() const { return true; } bool ms_can_fast_dispatch(Message *m) const { @@ -99,7 +97,7 @@ class ServerDispatcher : public Dispatcher { void ms_fast_dispatch(Message *m) { usleep(think_time); //cerr << __func__ << " reply message=" << m << std::endl; - worker.queue(m); + op_wq.queue(m); } bool ms_verify_authorizer(Connection *con, int peer_type, int protocol, bufferlist& authorizer, bufferlist& authorizer_reply, @@ -116,8 +114,8 @@ class MessengerServer { ServerDispatcher dispatcher; public: - MessengerServer(string t, string addr, int delay): - msgr(NULL), type(t), bindaddr(addr), dispatcher(delay) { + MessengerServer(string t, string addr, int threads, int delay): + msgr(NULL), type(t), bindaddr(addr), dispatcher(threads, delay) { msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), "server", 0); msgr->set_default_policy(Messenger::Policy::stateless_server(0, 0)); } @@ -136,7 +134,7 @@ class MessengerServer { }; void usage(const string &name) { - cerr << "Usage: " << name << " [bind ip:port] [thinktime us]" << std::endl; + cerr << "Usage: " << name << " [bind ip:port] [server worker threads] [thinktime us]" << std::endl; } int main(int argc, char **argv) @@ -148,20 +146,22 @@ int main(int argc, char **argv) common_init_finish(g_ceph_context); g_ceph_context->_conf->apply_changes(NULL); - if (args.size() < 2) { + if (args.size() < 3) { usage(argv[0]); return 1; } - int think_time = atoi(args[1]); + int worker_threads = atoi(args[1]); + int think_time = atoi(args[2]); cerr << " This tool won't handle connection error alike things, " << std::endl; cerr << "please ensure the proper network environment to test." << std::endl; cerr << " Or ctrl+c when meeting error and restart tests" << std::endl; cerr << " using ms-type " << g_ceph_context->_conf->ms_type << std::endl; cerr << " bind ip:port " << args[0] << std::endl; + cerr << " worker threads " << worker_threads << std::endl; cerr << " thinktime(us) " << think_time << std::endl; - MessengerServer server(g_ceph_context->_conf->ms_type, args[0], think_time); + MessengerServer server(g_ceph_context->_conf->ms_type, args[0], worker_threads, think_time); server.start(); return 0; -- 2.39.5