#include "common/obj_bencher.h"
#include "common/config.h"
+#include "common/debug.h"
#include "common/ceph_argparse.h"
+#include "common/WorkQueue.h"
#include "global/global_init.h"
#include "libs3.h"
+#include <deque>
+
#include <errno.h>
#define DEFAULT_USER_AGENT "rest-bench"
bufferlist *in_bl;
bufferlist out_bl;
uint64_t off;
+ Mutex lock;
+ Cond cond;
- req_context() : status(S3StatusOK), ctx(NULL), cb(NULL), arg(NULL), in_bl(NULL), off(0) {}
+ req_context() : status(S3StatusOK), ctx(NULL), cb(NULL), arg(NULL), in_bl(NULL), off(0),
+ lock("req_context") {}
~req_context() {
if (ctx) {
S3_destroy_request_context(ctx);
}
};
+class RESTDispatcher {
+ deque<req_context *> m_req_queue;
+ ThreadPool m_tp;
+
+ struct DispatcherWQ : public ThreadPool::WorkQueue<req_context> {
+ RESTDispatcher *dispatcher;
+ DispatcherWQ(RESTDispatcher *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+ : ThreadPool::WorkQueue<req_context>("REST", timeout, suicide_timeout, tp), dispatcher(p) {}
+
+ bool _enqueue(req_context *req) {
+ dispatcher->m_req_queue.push_back(req);
+ generic_dout(20) << "enqueued request req=" << hex << req << dec << dendl;
+ _dump_queue();
+ return true;
+ }
+ void _dequeue(req_context *req) {
+ assert(0);
+ }
+ bool _empty() {
+ return dispatcher->m_req_queue.empty();
+ }
+ req_context *_dequeue() {
+ if (dispatcher->m_req_queue.empty())
+ return NULL;
+ req_context *req = dispatcher->m_req_queue.front();
+ dispatcher->m_req_queue.pop_front();
+ generic_dout(20) << "dequeued request req=" << hex << req << dec << dendl;
+ _dump_queue();
+ return req;
+ }
+ void _process(req_context *req) {
+ dispatcher->process_context(req);
+ }
+ void _dump_queue() {
+ deque<req_context *>::iterator iter;
+ if (dispatcher->m_req_queue.size() == 0) {
+ generic_dout(20) << "DispatcherWQ: empty" << dendl;
+ return;
+ }
+ generic_dout(20) << "DispatcherWQ:" << dendl;
+ for (iter = dispatcher->m_req_queue.begin(); iter != dispatcher->m_req_queue.end(); ++iter) {
+ generic_dout(20) << "req: " << hex << *iter << dec << dendl;
+ }
+ }
+ void _clear() {
+ assert(dispatcher->m_req_queue.empty());
+ }
+ } req_wq;
+
+public:
+ RESTDispatcher(CephContext *cct, int num_threads)
+ : m_tp(cct, "RESTDispatcher::m_tp", num_threads),
+ req_wq(this, g_conf->rgw_op_thread_timeout,
+ g_conf->rgw_op_thread_suicide_timeout, &m_tp)
+ {}
+ void process_context(req_context *ctx);
+};
+
+void RESTDispatcher::process_context(req_context *ctx)
+{
+ S3Status status = S3_runall_request_context(ctx->ctx);
+ generic_dout(0) << "processed request, status=" << status << dendl;
+
+ Mutex::Locker l(ctx->lock);
+ ctx->cond.SignalAll();
+}
+
static S3Status properties_callback(const S3ResponseProperties *properties, void *cb_data)
{
return S3StatusOK;
}
int completion_wait(int slot) {
+ req_context *ctx = completions[slot];
+
+ Mutex::Locker l(ctx->lock);
+
+ ctx->cond.Wait(ctx->lock);
+
+ return 0;
}
int completion_ret(int slot) {