From d54ef1c8055f63c944ebf52029a1483ebc4022ce Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Thu, 26 Apr 2012 17:01:36 -0700 Subject: [PATCH] rest-bench: change thread context for libs3 calls Apparently S3_put_object() and S3_get_object() need to run on the same thread as S3_runall_request_context() (at least per context). So We now call them in the workqueue thread. Signed-off-by: Yehuda Sadeh --- src/tools/rest_bench.cc | 225 ++++++++++++++++++++++++---------------- 1 file changed, 137 insertions(+), 88 deletions(-) diff --git a/src/tools/rest_bench.cc b/src/tools/rest_bench.cc index 3aeaf747092c2..f6bdc090241e8 100644 --- a/src/tools/rest_bench.cc +++ b/src/tools/rest_bench.cc @@ -66,6 +66,12 @@ static void usage_exit() exit(1); } +enum OpType { + OP_NONE = 0, + OP_GET_OBJ = 1, + OP_PUT_OBJ = 2, +}; + struct req_context : public RefCountedObject { bool complete; S3Status status; @@ -75,13 +81,18 @@ struct req_context : public RefCountedObject { bufferlist *in_bl; bufferlist out_bl; uint64_t off; + uint64_t len; + string oid; Mutex lock; Cond cond; + S3BucketContext *bucket_ctx; bool should_destroy_ctx; - req_context() : complete(false), status(S3StatusOK), ctx(NULL), cb(NULL), arg(NULL), in_bl(NULL), off(0), - lock("req_context"), should_destroy_ctx(false) {} + OpType op; + + req_context() : complete(false), status(S3StatusOK), ctx(NULL), cb(NULL), arg(NULL), in_bl(NULL), off(0), len(0), + lock("req_context"), bucket_ctx(NULL), should_destroy_ctx(false), op(OP_NONE) {} ~req_context() { if (should_destroy_ctx) { S3_destroy_request_context(ctx); @@ -107,10 +118,73 @@ struct req_context : public RefCountedObject { } }; +static S3Status properties_callback(const S3ResponseProperties *properties, void *cb_data) +{ + return S3StatusOK; +} + +static void complete_callback(S3Status status, const S3ErrorDetails *details, void *cb_data) +{ + if (!cb_data) + return; + + struct req_context *ctx = (struct req_context *)cb_data; + + ctx->lock.Lock(); + ctx->complete = true; + ctx->status = status; + ctx->lock.Unlock(); + + if (ctx->cb) { + ctx->cb((void *)ctx->cb, ctx->arg); + } + + ctx->put(); +} + +static S3Status get_obj_callback(int size, const char *buf, + void *cb_data) +{ + if (!cb_data) + return S3StatusOK; + + struct req_context *ctx = (struct req_context *)cb_data; + + ctx->in_bl->append(buf, size); + + return S3StatusOK; +} + +static int put_obj_callback(int size, char *buf, + void *cb_data) +{ + if (!cb_data) + return 0; + + struct req_context *ctx = (struct req_context *)cb_data; + + int chunk = ctx->out_bl.length() - ctx->off; + if (!chunk) + return 0; + + if (chunk > size) + chunk = size; + + memcpy(buf, ctx->out_bl.c_str() + ctx->off, chunk); + + ctx->off += chunk; + + return chunk; +} + class RESTDispatcher { deque m_req_queue; ThreadPool m_tp; + S3ResponseHandler response_handler; + S3GetObjectHandler get_obj_handler; + S3PutObjectHandler put_obj_handler; + struct DispatcherWQ : public ThreadPool::WorkQueue { RESTDispatcher *dispatcher; DispatcherWQ(RESTDispatcher *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp) @@ -158,9 +232,22 @@ public: RESTDispatcher(CephContext *cct, int num_threads) : m_tp(cct, "RESTDispatcher::m_tp", num_threads), req_wq(this, g_conf->rgw_op_thread_timeout, - g_conf->rgw_op_thread_suicide_timeout, &m_tp) - {} + g_conf->rgw_op_thread_suicide_timeout, &m_tp) { + + + response_handler.propertiesCallback = properties_callback; + response_handler.completeCallback = complete_callback; + + get_obj_handler.responseHandler = response_handler; + get_obj_handler.getObjectDataCallback = get_obj_callback; + + put_obj_handler.responseHandler = response_handler; + put_obj_handler.putObjectDataCallback = put_obj_callback; + + } void process_context(req_context *ctx); + void get_obj(req_context *ctx); + void put_obj(req_context *ctx); void queue(req_context *ctx) { req_wq.queue(ctx); @@ -175,13 +262,24 @@ void RESTDispatcher::process_context(req_context *ctx) { ctx->get(); + switch (ctx->op) { + case OP_GET_OBJ: + get_obj(ctx); + break; + case OP_PUT_OBJ: + put_obj(ctx); + break; + default: + assert(0); + } + S3Status status = S3_runall_request_context(ctx->ctx); if (status != S3StatusOK) { cerr << "ERROR: S3_runall_request_context() returned " << S3_get_status_name(status) << std::endl; ctx->status = status; } else if (ctx->status != S3StatusOK) { - cerr << "ERROR: " << S3_get_status_name(ctx->status) << std::endl; + cerr << "ERROR: " << ctx->oid << ": " << S3_get_status_name(ctx->status) << std::endl; } ctx->lock.Lock(); @@ -191,63 +289,19 @@ void RESTDispatcher::process_context(req_context *ctx) ctx->put(); } -static S3Status properties_callback(const S3ResponseProperties *properties, void *cb_data) +void RESTDispatcher::put_obj(req_context *ctx) { - return S3StatusOK; + S3_put_object(ctx->bucket_ctx, ctx->oid.c_str(), + ctx->out_bl.length(), + NULL, + ctx->ctx, + &put_obj_handler, ctx); } -static void complete_callback(S3Status status, const S3ErrorDetails *details, void *cb_data) -{ - if (!cb_data) - return; - - struct req_context *ctx = (struct req_context *)cb_data; - - ctx->lock.Lock(); - ctx->complete = true; - ctx->status = status; - ctx->lock.Unlock(); - - if (ctx->cb) { - ctx->cb((void *)ctx->cb, ctx->arg); - } - - ctx->put(); -} - -static S3Status get_obj_callback(int size, const char *buf, - void *cb_data) -{ - if (!cb_data) - return S3StatusOK; - - struct req_context *ctx = (struct req_context *)cb_data; - - ctx->in_bl->append(buf, size); - - return S3StatusOK; -} - -static int put_obj_callback(int size, char *buf, - void *cb_data) +void RESTDispatcher::get_obj(req_context *ctx) { - if (!cb_data) - return 0; - - struct req_context *ctx = (struct req_context *)cb_data; - - int chunk = ctx->out_bl.length() - ctx->off; - if (!chunk) - return 0; - - if (chunk > size) - chunk = size; - - memcpy(buf, ctx->out_bl.c_str() + ctx->off, chunk); - - ctx->off += chunk; - - return chunk; + S3_get_object(ctx->bucket_ctx, ctx->oid.c_str(), NULL, 0, ctx->len, ctx->ctx, + &get_obj_handler, ctx); } class RESTBencher : public ObjBencher { @@ -263,12 +317,7 @@ class RESTBencher : public ObjBencher { string secret; int concurrentios; - S3ResponseHandler response_handler; - S3GetObjectHandler get_obj_handler; - S3PutObjectHandler put_obj_handler; - protected: - int rest_init() { S3Status status = S3_initialize(user_agent.c_str(), S3_INIT_ALL, host.c_str()); if (status != S3StatusOK) { @@ -276,14 +325,6 @@ protected: return -EINVAL; } - response_handler.propertiesCallback = properties_callback; - response_handler.completeCallback = complete_callback; - - get_obj_handler.responseHandler = response_handler; - get_obj_handler.getObjectDataCallback = get_obj_callback; - - put_obj_handler.responseHandler = response_handler; - put_obj_handler.putObjectDataCallback = put_obj_callback; return 0; } @@ -334,8 +375,10 @@ protected: ctx->get(); ctx->in_bl = pbl; - S3_get_object(&bucket_ctx, oid.c_str(), NULL, 0, len, ctx->ctx, - &get_obj_handler, ctx); + ctx->oid = oid; + ctx->len = len; + ctx->bucket_ctx = &bucket_ctx; + ctx->op = OP_GET_OBJ; dispatcher->queue(ctx); @@ -344,13 +387,13 @@ protected: int aio_write(const std::string& oid, int slot, const bufferlist& bl, size_t len) { struct req_context *ctx = completions[slot]; + ctx->get(); + ctx->bucket_ctx = &bucket_ctx; ctx->out_bl = bl; - S3_put_object(&bucket_ctx, oid.c_str(), - bl.length(), - NULL, - ctx->ctx, - &put_obj_handler, ctx); + ctx->oid = oid; + ctx->len = len; + ctx->op = OP_PUT_OBJ; dispatcher->queue(ctx); return 0; @@ -364,9 +407,13 @@ protected: } ctx->in_bl = &bl; ctx->get(); + ctx->bucket_ctx = &bucket_ctx; + ctx->oid = oid; + ctx->len = len; + ctx->op = OP_GET_OBJ; - S3_get_object(&bucket_ctx, oid.c_str(), NULL, 0, len, NULL, - &get_obj_handler, ctx); + dispatcher->process_context(ctx); + ret = ctx->ret(); ctx->put(); return bl.length(); } @@ -376,13 +423,13 @@ protected: if (ret < 0) { return ret; } - ctx->out_bl = bl; ctx->get(); - S3_put_object(&bucket_ctx, oid.c_str(), - bl.length(), - NULL, - NULL, - &put_obj_handler, ctx); + ctx->out_bl = bl; + ctx->bucket_ctx = &bucket_ctx; + ctx->oid = oid; + ctx->op = OP_PUT_OBJ; + + dispatcher->process_context(ctx); ret = ctx->ret(); ctx->put(); return ret; @@ -400,8 +447,6 @@ protected: while (!ctx->complete) { ctx->cond.Wait(ctx->lock); } -//cerr << __FILE__ << ":" << __LINE__ << ": ctx->put() ctx=" << (void *)ctx << std::endl; -// ctx->put(); return 0; } @@ -449,6 +494,10 @@ public: ctx->get(); + S3ResponseHandler response_handler; + response_handler.propertiesCallback = properties_callback; + response_handler.completeCallback = complete_callback; + S3_create_bucket(protocol, access_key.c_str(), secret.c_str(), NULL, bucket.c_str(), S3CannedAclPrivate, NULL, /* locationConstraint */ -- 2.39.5