]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
PerfMsgr: Make Server worker threads configurable
authorHaomai Wang <haomaiwang@gmail.com>
Thu, 11 Jun 2015 17:00:12 +0000 (01:00 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Thu, 11 Jun 2015 17:00:12 +0000 (01:00 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/test/msgr/perf_msgr_server.cc

index 0a15c2ff0c3b12ff4d14858e4aef6016362f1e2b..146219f42190fa43a2fe35028dcb3221528d0dd3 100644 (file)
@@ -33,53 +33,51 @@ using namespace std;
 
 class ServerDispatcher : public Dispatcher {
   uint64_t think_time;
-  class Worker : public Thread {
-    ServerDispatcher *dispatcher;
+  ThreadPool op_tp;
+  class OpWQ : public ThreadPool::WorkQueue<Message> {
     list<Message*> messages;
-    bool is_stop;
-    Mutex lock;
-    Cond cond;
 
    public:
-    Worker(ServerDispatcher *d): dispatcher(d), is_stop(false), lock("ServerDispatcher::Worker::lock") {}
-    void queue(Message *m) {
-      Mutex::Locker l(lock);
+    OpWQ(time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+      : ThreadPool::WorkQueue<Message>("ServerDispatcher::OpWQ", timeout, suicide_timeout, tp) {}
+
+    bool _enqueue(Message *m) {
       messages.push_back(m);
-      cond.Signal();
+      return true;
+    }
+    void _dequeue(Message *m) {
+      assert(0);
+    }
+    bool _empty() {
+      return messages.empty();
+    }
+    Message *_dequeue() {
+      if (messages.empty())
+       return NULL;
+      Message *m = messages.front();
+      messages.pop_front();
+      return m;
     }
-    void *entry() {
-      Mutex::Locker l(lock);
-      while (!is_stop) {
-        if (!messages.empty()) {
-          Message *m = messages.back();
-          messages.pop_back();
-          lock.Unlock();
-          MOSDOp *osd_op = static_cast<MOSDOp*>(m);
-          MOSDOpReply *reply = new MOSDOpReply(osd_op, 0, 0, 0, false);
-          m->get_connection()->send_message(reply);
-          m->put();
-          lock.Lock();
-        } else {
-          cond.Wait(lock);
-        }
-      }
-      return 0;
+    void _process(Message *m, ThreadPool::TPHandle &handle) {
+      MOSDOp *osd_op = static_cast<MOSDOp*>(m);
+      MOSDOpReply *reply = new MOSDOpReply(osd_op, 0, 0, 0, false);
+      m->get_connection()->send_message(reply);
+      m->put();
     }
-    void stop() {
-      Mutex::Locker l(lock);
-      is_stop = true;
-      cond.Signal();
+    void _process_finish(Message *m) { }
+    void _clear() {
+      assert(messages.empty());
     }
-  } worker;
-  friend class Worker;
+  } op_wq;
 
  public:
-  ServerDispatcher(uint64_t delay): Dispatcher(g_ceph_context), think_time(delay), worker(this) {
-    worker.create();
+  ServerDispatcher(int threads, uint64_t delay): Dispatcher(g_ceph_context), think_time(delay),
+    op_tp(g_ceph_context, "ServerDispatcher::op_tp", threads, "serverdispatcher_op_threads"),
+    op_wq(30, 30, &op_tp) {
+    op_tp.start();
   }
   ~ServerDispatcher() {
-    worker.stop();
-    worker.join();
+    op_tp.stop();
   }
   bool ms_can_fast_dispatch_any() const { return true; }
   bool ms_can_fast_dispatch(Message *m) const {
@@ -99,7 +97,7 @@ class ServerDispatcher : public Dispatcher {
   void ms_fast_dispatch(Message *m) {
     usleep(think_time);
     //cerr << __func__ << " reply message=" << m << std::endl;
-    worker.queue(m);
+    op_wq.queue(m);
   }
   bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
                             bufferlist& authorizer, bufferlist& authorizer_reply,
@@ -116,8 +114,8 @@ class MessengerServer {
   ServerDispatcher dispatcher;
 
  public:
-  MessengerServer(string t, string addr, int delay):
-      msgr(NULL), type(t), bindaddr(addr), dispatcher(delay) {
+  MessengerServer(string t, string addr, int threads, int delay):
+      msgr(NULL), type(t), bindaddr(addr), dispatcher(threads, delay) {
     msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), "server", 0);
     msgr->set_default_policy(Messenger::Policy::stateless_server(0, 0));
   }
@@ -136,7 +134,7 @@ class MessengerServer {
 };
 
 void usage(const string &name) {
-  cerr << "Usage: " << name << " [bind ip:port] [thinktime us]" << std::endl;
+  cerr << "Usage: " << name << " [bind ip:port] [server worker threads] [thinktime us]" << std::endl;
 }
 
 int main(int argc, char **argv)
@@ -148,20 +146,22 @@ int main(int argc, char **argv)
   common_init_finish(g_ceph_context);
   g_ceph_context->_conf->apply_changes(NULL);
 
-  if (args.size() < 2) {
+  if (args.size() < 3) {
     usage(argv[0]);
     return 1;
   }
 
-  int think_time = atoi(args[1]);
+  int worker_threads = atoi(args[1]);
+  int think_time = atoi(args[2]);
   cerr << " This tool won't handle connection error alike things, " << std::endl;
   cerr << "please ensure the proper network environment to test." << std::endl;
   cerr << " Or ctrl+c when meeting error and restart tests" << std::endl;
   cerr << " using ms-type " << g_ceph_context->_conf->ms_type << std::endl;
   cerr << "       bind ip:port " << args[0] << std::endl;
+  cerr << "       worker threads " << worker_threads << std::endl;
   cerr << "       thinktime(us) " << think_time << std::endl;
 
-  MessengerServer server(g_ceph_context->_conf->ms_type, args[0], think_time);
+  MessengerServer server(g_ceph_context->_conf->ms_type, args[0], worker_threads, think_time);
   server.start();
 
   return 0;