class ServerDispatcher : public Dispatcher {
uint64_t think_time;
- class Worker : public Thread {
- ServerDispatcher *dispatcher;
+ ThreadPool op_tp;
+ class OpWQ : public ThreadPool::WorkQueue<Message> {
list<Message*> messages;
- bool is_stop;
- Mutex lock;
- Cond cond;
public:
- Worker(ServerDispatcher *d): dispatcher(d), is_stop(false), lock("ServerDispatcher::Worker::lock") {}
- void queue(Message *m) {
- Mutex::Locker l(lock);
+ OpWQ(time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+ : ThreadPool::WorkQueue<Message>("ServerDispatcher::OpWQ", timeout, suicide_timeout, tp) {}
+
+ bool _enqueue(Message *m) {
messages.push_back(m);
- cond.Signal();
+ return true;
+ }
+ void _dequeue(Message *m) {
+ assert(0);
+ }
+ bool _empty() {
+ return messages.empty();
+ }
+ Message *_dequeue() {
+ if (messages.empty())
+ return NULL;
+ Message *m = messages.front();
+ messages.pop_front();
+ return m;
}
- void *entry() {
- Mutex::Locker l(lock);
- while (!is_stop) {
- if (!messages.empty()) {
- Message *m = messages.back();
- messages.pop_back();
- lock.Unlock();
- MOSDOp *osd_op = static_cast<MOSDOp*>(m);
- MOSDOpReply *reply = new MOSDOpReply(osd_op, 0, 0, 0, false);
- m->get_connection()->send_message(reply);
- m->put();
- lock.Lock();
- } else {
- cond.Wait(lock);
- }
- }
- return 0;
+ void _process(Message *m, ThreadPool::TPHandle &handle) {
+ MOSDOp *osd_op = static_cast<MOSDOp*>(m);
+ MOSDOpReply *reply = new MOSDOpReply(osd_op, 0, 0, 0, false);
+ m->get_connection()->send_message(reply);
+ m->put();
}
- void stop() {
- Mutex::Locker l(lock);
- is_stop = true;
- cond.Signal();
+ void _process_finish(Message *m) { }
+ void _clear() {
+ assert(messages.empty());
}
- } worker;
- friend class Worker;
+ } op_wq;
public:
- ServerDispatcher(uint64_t delay): Dispatcher(g_ceph_context), think_time(delay), worker(this) {
- worker.create();
+ ServerDispatcher(int threads, uint64_t delay): Dispatcher(g_ceph_context), think_time(delay),
+ op_tp(g_ceph_context, "ServerDispatcher::op_tp", threads, "serverdispatcher_op_threads"),
+ op_wq(30, 30, &op_tp) {
+ op_tp.start();
}
~ServerDispatcher() {
- worker.stop();
- worker.join();
+ op_tp.stop();
}
bool ms_can_fast_dispatch_any() const { return true; }
bool ms_can_fast_dispatch(Message *m) const {
void ms_fast_dispatch(Message *m) {
usleep(think_time);
//cerr << __func__ << " reply message=" << m << std::endl;
- worker.queue(m);
+ op_wq.queue(m);
}
bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
bufferlist& authorizer, bufferlist& authorizer_reply,
ServerDispatcher dispatcher;
public:
- MessengerServer(string t, string addr, int delay):
- msgr(NULL), type(t), bindaddr(addr), dispatcher(delay) {
+ MessengerServer(string t, string addr, int threads, int delay):
+ msgr(NULL), type(t), bindaddr(addr), dispatcher(threads, delay) {
msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), "server", 0);
msgr->set_default_policy(Messenger::Policy::stateless_server(0, 0));
}
};
void usage(const string &name) {
- cerr << "Usage: " << name << " [bind ip:port] [thinktime us]" << std::endl;
+ cerr << "Usage: " << name << " [bind ip:port] [server worker threads] [thinktime us]" << std::endl;
}
int main(int argc, char **argv)
common_init_finish(g_ceph_context);
g_ceph_context->_conf->apply_changes(NULL);
- if (args.size() < 2) {
+ if (args.size() < 3) {
usage(argv[0]);
return 1;
}
- int think_time = atoi(args[1]);
+ int worker_threads = atoi(args[1]);
+ int think_time = atoi(args[2]);
cerr << " This tool won't handle connection error alike things, " << std::endl;
cerr << "please ensure the proper network environment to test." << std::endl;
cerr << " Or ctrl+c when meeting error and restart tests" << std::endl;
cerr << " using ms-type " << g_ceph_context->_conf->ms_type << std::endl;
cerr << " bind ip:port " << args[0] << std::endl;
+ cerr << " worker threads " << worker_threads << std::endl;
cerr << " thinktime(us) " << think_time << std::endl;
- MessengerServer server(g_ceph_context->_conf->ms_type, args[0], think_time);
+ MessengerServer server(g_ceph_context->_conf->ms_type, args[0], worker_threads, think_time);
server.start();
return 0;