]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rest-bench: create workqueue for requests dispatching
authorYehuda Sadeh <yehuda@hq.newdream.net>
Wed, 25 Apr 2012 07:30:48 +0000 (00:30 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Fri, 4 May 2012 22:53:26 +0000 (15:53 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
src/tools/rest_bench.cc

index 65dc0d181b47921eabf1618c5ca16d3b8f1d83f9..b5396bf7f2c7793b20b9a43867f5ca3448077a85 100644 (file)
 
 #include "common/obj_bencher.h"
 #include "common/config.h"
+#include "common/debug.h"
 #include "common/ceph_argparse.h"
+#include "common/WorkQueue.h"
 #include "global/global_init.h"
 
 #include "libs3.h"
 
+#include <deque>
+
 #include <errno.h>
 
 #define DEFAULT_USER_AGENT "rest-bench"
@@ -50,8 +54,11 @@ struct req_context {
   bufferlist *in_bl;
   bufferlist out_bl;
   uint64_t off;
+  Mutex lock;
+  Cond cond;
 
-  req_context() : status(S3StatusOK), ctx(NULL), cb(NULL), arg(NULL), in_bl(NULL), off(0) {}
+  req_context() : status(S3StatusOK), ctx(NULL), cb(NULL), arg(NULL), in_bl(NULL), off(0),
+                  lock("req_context") {}
   ~req_context() {
     if (ctx) {
       S3_destroy_request_context(ctx);
@@ -76,6 +83,73 @@ struct req_context {
   }
 };
 
+class RESTDispatcher {
+  deque<req_context *> m_req_queue;
+  ThreadPool m_tp;
+
+  struct DispatcherWQ : public ThreadPool::WorkQueue<req_context> {
+    RESTDispatcher *dispatcher;
+    DispatcherWQ(RESTDispatcher *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+      : ThreadPool::WorkQueue<req_context>("REST", timeout, suicide_timeout, tp), dispatcher(p) {}
+
+    bool _enqueue(req_context *req) {
+      dispatcher->m_req_queue.push_back(req);
+      generic_dout(20) << "enqueued request req=" << hex << req << dec << dendl;
+      _dump_queue();
+      return true;
+    }
+    void _dequeue(req_context *req) {
+      assert(0);
+    }
+    bool _empty() {
+      return dispatcher->m_req_queue.empty();
+    }
+    req_context *_dequeue() {
+      if (dispatcher->m_req_queue.empty())
+       return NULL;
+      req_context *req = dispatcher->m_req_queue.front();
+      dispatcher->m_req_queue.pop_front();
+      generic_dout(20) << "dequeued request req=" << hex << req << dec << dendl;
+      _dump_queue();
+      return req;
+    }
+    void _process(req_context *req) {
+      dispatcher->process_context(req);
+    }
+    void _dump_queue() {
+      deque<req_context *>::iterator iter;
+      if (dispatcher->m_req_queue.size() == 0) {
+        generic_dout(20) << "DispatcherWQ: empty" << dendl;
+        return;
+      }
+      generic_dout(20) << "DispatcherWQ:" << dendl;
+      for (iter = dispatcher->m_req_queue.begin(); iter != dispatcher->m_req_queue.end(); ++iter) {
+        generic_dout(20) << "req: " << hex << *iter << dec << dendl;
+      }
+    }
+    void _clear() {
+      assert(dispatcher->m_req_queue.empty());
+    }
+  } req_wq;
+
+public:
+  RESTDispatcher(CephContext *cct, int num_threads)
+    : m_tp(cct, "RESTDispatcher::m_tp", num_threads),
+      req_wq(this, g_conf->rgw_op_thread_timeout,
+            g_conf->rgw_op_thread_suicide_timeout, &m_tp)
+      {}
+  void process_context(req_context *ctx);
+};
+
+void RESTDispatcher::process_context(req_context *ctx)
+{
+  S3Status status = S3_runall_request_context(ctx->ctx);
+  generic_dout(0) << "processed request, status=" << status << dendl;
+
+  Mutex::Locker l(ctx->lock);
+  ctx->cond.SignalAll();
+}
+
 static S3Status properties_callback(const S3ResponseProperties *properties, void *cb_data)
 {
   return S3StatusOK;
@@ -249,6 +323,13 @@ protected:
   }
 
   int completion_wait(int slot) {
+    req_context *ctx = completions[slot];
+
+    Mutex::Locker l(ctx->lock);
+
+    ctx->cond.Wait(ctx->lock);
+
+    return 0;
   }
 
   int completion_ret(int slot) {