]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rest-bench: change thread context for libs3 calls
authorYehuda Sadeh <yehuda@hq.newdream.net>
Fri, 27 Apr 2012 00:01:36 +0000 (17:01 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Fri, 4 May 2012 22:53:27 +0000 (15:53 -0700)
Apparently S3_put_object() and S3_get_object() need to
run on the same thread as S3_runall_request_context() (at least
per context). So We now call them in the workqueue thread.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
src/tools/rest_bench.cc

index 3aeaf747092c2e1a7ea29a7f9660275ea551c2ec..f6bdc090241e8b0fde8eae3eed0fd30ef22c6c93 100644 (file)
@@ -66,6 +66,12 @@ static void usage_exit()
   exit(1);
 }
 
+enum OpType {
+  OP_NONE    = 0,
+  OP_GET_OBJ = 1,
+  OP_PUT_OBJ = 2,
+};
+
 struct req_context : public RefCountedObject {
   bool complete;
   S3Status status;
@@ -75,13 +81,18 @@ struct req_context : public RefCountedObject {
   bufferlist *in_bl;
   bufferlist out_bl;
   uint64_t off;
+  uint64_t len;
+  string oid;
   Mutex lock;
   Cond cond;
+  S3BucketContext *bucket_ctx;
 
   bool should_destroy_ctx;
 
-  req_context() : complete(false), status(S3StatusOK), ctx(NULL), cb(NULL), arg(NULL), in_bl(NULL), off(0),
-                  lock("req_context"), should_destroy_ctx(false) {}
+  OpType op;
+
+  req_context() : complete(false), status(S3StatusOK), ctx(NULL), cb(NULL), arg(NULL), in_bl(NULL), off(0), len(0),
+                  lock("req_context"), bucket_ctx(NULL), should_destroy_ctx(false), op(OP_NONE) {}
   ~req_context() {
     if (should_destroy_ctx) {
       S3_destroy_request_context(ctx);
@@ -107,10 +118,73 @@ struct req_context : public RefCountedObject {
   }
 };
 
+static S3Status properties_callback(const S3ResponseProperties *properties, void *cb_data)
+{
+  return S3StatusOK;
+}
+
+static void complete_callback(S3Status status, const S3ErrorDetails *details, void *cb_data)
+{
+  if (!cb_data)
+    return;
+
+  struct req_context *ctx = (struct req_context *)cb_data;
+
+  ctx->lock.Lock();
+  ctx->complete = true;
+  ctx->status = status;
+  ctx->lock.Unlock();
+
+  if (ctx->cb) {
+    ctx->cb((void *)ctx->cb, ctx->arg);
+  }
+
+  ctx->put();
+}
+
+static S3Status get_obj_callback(int size, const char *buf,
+                                 void *cb_data)
+{
+  if (!cb_data)
+    return S3StatusOK;
+
+  struct req_context *ctx = (struct req_context *)cb_data;
+
+  ctx->in_bl->append(buf, size);
+
+  return S3StatusOK;
+}
+
+static int put_obj_callback(int size, char *buf,
+                            void *cb_data)
+{
+  if (!cb_data)
+    return 0;
+
+  struct req_context *ctx = (struct req_context *)cb_data;
+
+  int chunk = ctx->out_bl.length() - ctx->off;
+  if (!chunk)
+    return 0;
+
+  if (chunk > size)
+    chunk = size;
+
+  memcpy(buf, ctx->out_bl.c_str() + ctx->off, chunk);
+
+  ctx->off += chunk;
+
+  return chunk;
+}
+
 class RESTDispatcher {
   deque<req_context *> m_req_queue;
   ThreadPool m_tp;
 
+  S3ResponseHandler response_handler;
+  S3GetObjectHandler get_obj_handler;
+  S3PutObjectHandler put_obj_handler;
+
   struct DispatcherWQ : public ThreadPool::WorkQueue<req_context> {
     RESTDispatcher *dispatcher;
     DispatcherWQ(RESTDispatcher *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
@@ -158,9 +232,22 @@ public:
   RESTDispatcher(CephContext *cct, int num_threads)
     : m_tp(cct, "RESTDispatcher::m_tp", num_threads),
       req_wq(this, g_conf->rgw_op_thread_timeout,
-            g_conf->rgw_op_thread_suicide_timeout, &m_tp)
-      {}
+            g_conf->rgw_op_thread_suicide_timeout, &m_tp) {
+
+
+    response_handler.propertiesCallback = properties_callback;
+    response_handler.completeCallback = complete_callback;
+
+    get_obj_handler.responseHandler = response_handler;
+    get_obj_handler.getObjectDataCallback = get_obj_callback;
+
+    put_obj_handler.responseHandler = response_handler;
+    put_obj_handler.putObjectDataCallback = put_obj_callback;
+
+  }
   void process_context(req_context *ctx);
+  void get_obj(req_context *ctx);
+  void put_obj(req_context *ctx);
 
   void queue(req_context *ctx) {
     req_wq.queue(ctx);
@@ -175,13 +262,24 @@ void RESTDispatcher::process_context(req_context *ctx)
 {
   ctx->get();
 
+  switch (ctx->op) {
+    case OP_GET_OBJ:
+      get_obj(ctx);
+      break;
+    case OP_PUT_OBJ:
+      put_obj(ctx);
+      break;
+    default:
+      assert(0);
+  }
+
   S3Status status = S3_runall_request_context(ctx->ctx);
 
   if (status != S3StatusOK) {
     cerr << "ERROR: S3_runall_request_context() returned " << S3_get_status_name(status) << std::endl;
     ctx->status = status;
   } else if (ctx->status != S3StatusOK) {
-    cerr << "ERROR: " << S3_get_status_name(ctx->status) << std::endl;
+    cerr << "ERROR: " << ctx->oid << ": " << S3_get_status_name(ctx->status) << std::endl;
   }
 
   ctx->lock.Lock();
@@ -191,63 +289,19 @@ void RESTDispatcher::process_context(req_context *ctx)
   ctx->put();
 }
 
-static S3Status properties_callback(const S3ResponseProperties *properties, void *cb_data)
+void RESTDispatcher::put_obj(req_context *ctx)
 {
-  return S3StatusOK;
+  S3_put_object(ctx->bucket_ctx, ctx->oid.c_str(),
+                ctx->out_bl.length(),
+                NULL,
+                ctx->ctx,
+                &put_obj_handler, ctx);
 }
 
-static void complete_callback(S3Status status, const S3ErrorDetails *details, void *cb_data)
-{
-  if (!cb_data)
-    return;
-
-  struct req_context *ctx = (struct req_context *)cb_data;
-
-  ctx->lock.Lock();
-  ctx->complete = true;
-  ctx->status = status;
-  ctx->lock.Unlock();
-
-  if (ctx->cb) {
-    ctx->cb((void *)ctx->cb, ctx->arg);
-  }
-
-  ctx->put();
-}
-
-static S3Status get_obj_callback(int size, const char *buf,
-                                 void *cb_data)
-{
-  if (!cb_data)
-    return S3StatusOK;
-
-  struct req_context *ctx = (struct req_context *)cb_data;
-
-  ctx->in_bl->append(buf, size);
-
-  return S3StatusOK;
-}
-
-static int put_obj_callback(int size, char *buf,
-                            void *cb_data)
+void RESTDispatcher::get_obj(req_context *ctx)
 {
-  if (!cb_data)
-    return 0;
-
-  struct req_context *ctx = (struct req_context *)cb_data;
-
-  int chunk = ctx->out_bl.length() - ctx->off;
-  if (!chunk)
-    return 0;
-
-  if (chunk > size)
-    chunk = size;
-
-  memcpy(buf, ctx->out_bl.c_str() + ctx->off, chunk);
-
-  ctx->off += chunk;
-
-  return chunk;
+  S3_get_object(ctx->bucket_ctx, ctx->oid.c_str(), NULL, 0, ctx->len, ctx->ctx,
+                &get_obj_handler, ctx);
 }
 
 class RESTBencher : public ObjBencher {
@@ -263,12 +317,7 @@ class RESTBencher : public ObjBencher {
   string secret;
   int concurrentios;
 
-  S3ResponseHandler response_handler;
-  S3GetObjectHandler get_obj_handler;
-  S3PutObjectHandler put_obj_handler;
-
 protected:
-
   int rest_init() {
     S3Status status = S3_initialize(user_agent.c_str(), S3_INIT_ALL, host.c_str());
     if (status != S3StatusOK) {
@@ -276,14 +325,6 @@ protected:
       return -EINVAL;
     }
 
-    response_handler.propertiesCallback = properties_callback;
-    response_handler.completeCallback = complete_callback;
-
-    get_obj_handler.responseHandler = response_handler;
-    get_obj_handler.getObjectDataCallback = get_obj_callback;
-
-    put_obj_handler.responseHandler = response_handler;
-    put_obj_handler.putObjectDataCallback = put_obj_callback;
 
     return 0;
   }
@@ -334,8 +375,10 @@ protected:
 
     ctx->get();
     ctx->in_bl = pbl;
-    S3_get_object(&bucket_ctx, oid.c_str(), NULL, 0, len, ctx->ctx,
-                  &get_obj_handler, ctx);
+    ctx->oid = oid;
+    ctx->len = len;
+    ctx->bucket_ctx = &bucket_ctx;
+    ctx->op = OP_GET_OBJ;
 
     dispatcher->queue(ctx);
 
@@ -344,13 +387,13 @@ protected:
 
   int aio_write(const std::string& oid, int slot, const bufferlist& bl, size_t len) {
     struct req_context *ctx = completions[slot];
+
     ctx->get();
+    ctx->bucket_ctx = &bucket_ctx;
     ctx->out_bl = bl;
-    S3_put_object(&bucket_ctx, oid.c_str(),
-                  bl.length(),
-                  NULL,
-                  ctx->ctx,
-                  &put_obj_handler, ctx);
+    ctx->oid = oid;
+    ctx->len = len;
+    ctx->op = OP_PUT_OBJ;
 
     dispatcher->queue(ctx);
     return 0;
@@ -364,9 +407,13 @@ protected:
     }
     ctx->in_bl = &bl;
     ctx->get();
+    ctx->bucket_ctx = &bucket_ctx;
+    ctx->oid = oid;
+    ctx->len = len;
+    ctx->op = OP_GET_OBJ;
 
-    S3_get_object(&bucket_ctx, oid.c_str(), NULL, 0, len, NULL,
-                  &get_obj_handler, ctx);
+    dispatcher->process_context(ctx);
+    ret = ctx->ret();
     ctx->put();
     return bl.length();
   }
@@ -376,13 +423,13 @@ protected:
     if (ret < 0) {
       return ret;
     }
-    ctx->out_bl = bl;
     ctx->get();
-    S3_put_object(&bucket_ctx, oid.c_str(),
-                  bl.length(),
-                  NULL,
-                  NULL,
-                  &put_obj_handler, ctx);
+    ctx->out_bl = bl;
+    ctx->bucket_ctx = &bucket_ctx;
+    ctx->oid = oid;
+    ctx->op = OP_PUT_OBJ;
+
+    dispatcher->process_context(ctx);
     ret = ctx->ret();
     ctx->put();
     return ret;
@@ -400,8 +447,6 @@ protected:
     while (!ctx->complete) {
       ctx->cond.Wait(ctx->lock);
     }
-//cerr << __FILE__ << ":" << __LINE__ << ": ctx->put() ctx=" << (void *)ctx << std::endl;
-//    ctx->put();
 
     return 0;
   }
@@ -449,6 +494,10 @@ public:
 
     ctx->get();
 
+    S3ResponseHandler response_handler;
+    response_handler.propertiesCallback = properties_callback;
+    response_handler.completeCallback = complete_callback;
+
     S3_create_bucket(protocol, access_key.c_str(), secret.c_str(), NULL,
                      bucket.c_str(), S3CannedAclPrivate,
                      NULL, /* locationConstraint */