From 6e043808b748eb4b82d459c91dda42618591a27d Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Wed, 25 Apr 2012 00:30:48 -0700 Subject: [PATCH] rest-bench: create workqueue for requests dispatching Signed-off-by: Yehuda Sadeh --- src/tools/rest_bench.cc | 83 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 82 insertions(+), 1 deletion(-) diff --git a/src/tools/rest_bench.cc b/src/tools/rest_bench.cc index 65dc0d181b479..b5396bf7f2c77 100644 --- a/src/tools/rest_bench.cc +++ b/src/tools/rest_bench.cc @@ -17,11 +17,15 @@ #include "common/obj_bencher.h" #include "common/config.h" +#include "common/debug.h" #include "common/ceph_argparse.h" +#include "common/WorkQueue.h" #include "global/global_init.h" #include "libs3.h" +#include + #include #define DEFAULT_USER_AGENT "rest-bench" @@ -50,8 +54,11 @@ struct req_context { bufferlist *in_bl; bufferlist out_bl; uint64_t off; + Mutex lock; + Cond cond; - req_context() : status(S3StatusOK), ctx(NULL), cb(NULL), arg(NULL), in_bl(NULL), off(0) {} + req_context() : status(S3StatusOK), ctx(NULL), cb(NULL), arg(NULL), in_bl(NULL), off(0), + lock("req_context") {} ~req_context() { if (ctx) { S3_destroy_request_context(ctx); @@ -76,6 +83,73 @@ struct req_context { } }; +class RESTDispatcher { + deque m_req_queue; + ThreadPool m_tp; + + struct DispatcherWQ : public ThreadPool::WorkQueue { + RESTDispatcher *dispatcher; + DispatcherWQ(RESTDispatcher *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp) + : ThreadPool::WorkQueue("REST", timeout, suicide_timeout, tp), dispatcher(p) {} + + bool _enqueue(req_context *req) { + dispatcher->m_req_queue.push_back(req); + generic_dout(20) << "enqueued request req=" << hex << req << dec << dendl; + _dump_queue(); + return true; + } + void _dequeue(req_context *req) { + assert(0); + } + bool _empty() { + return dispatcher->m_req_queue.empty(); + } + req_context *_dequeue() { + if (dispatcher->m_req_queue.empty()) + return NULL; + req_context *req = dispatcher->m_req_queue.front(); + dispatcher->m_req_queue.pop_front(); + generic_dout(20) << "dequeued request req=" << hex << req << dec << dendl; + _dump_queue(); + return req; + } + void _process(req_context *req) { + dispatcher->process_context(req); + } + void _dump_queue() { + deque::iterator iter; + if (dispatcher->m_req_queue.size() == 0) { + generic_dout(20) << "DispatcherWQ: empty" << dendl; + return; + } + generic_dout(20) << "DispatcherWQ:" << dendl; + for (iter = dispatcher->m_req_queue.begin(); iter != dispatcher->m_req_queue.end(); ++iter) { + generic_dout(20) << "req: " << hex << *iter << dec << dendl; + } + } + void _clear() { + assert(dispatcher->m_req_queue.empty()); + } + } req_wq; + +public: + RESTDispatcher(CephContext *cct, int num_threads) + : m_tp(cct, "RESTDispatcher::m_tp", num_threads), + req_wq(this, g_conf->rgw_op_thread_timeout, + g_conf->rgw_op_thread_suicide_timeout, &m_tp) + {} + void process_context(req_context *ctx); +}; + +void RESTDispatcher::process_context(req_context *ctx) +{ + S3Status status = S3_runall_request_context(ctx->ctx); + generic_dout(0) << "processed request, status=" << status << dendl; + + Mutex::Locker l(ctx->lock); + ctx->cond.SignalAll(); +} + static S3Status properties_callback(const S3ResponseProperties *properties, void *cb_data) { return S3StatusOK; @@ -249,6 +323,13 @@ protected: } int completion_wait(int slot) { + req_context *ctx = completions[slot]; + + Mutex::Locker l(ctx->lock); + + ctx->cond.Wait(ctx->lock); + + return 0; } int completion_ret(int slot) { -- 2.39.5