exit(1);
}
+enum OpType {
+ OP_NONE = 0,
+ OP_GET_OBJ = 1,
+ OP_PUT_OBJ = 2,
+};
+
struct req_context : public RefCountedObject {
bool complete;
S3Status status;
bufferlist *in_bl;
bufferlist out_bl;
uint64_t off;
+ uint64_t len;
+ string oid;
Mutex lock;
Cond cond;
+ S3BucketContext *bucket_ctx;
bool should_destroy_ctx;
- req_context() : complete(false), status(S3StatusOK), ctx(NULL), cb(NULL), arg(NULL), in_bl(NULL), off(0),
- lock("req_context"), should_destroy_ctx(false) {}
+ OpType op;
+
+ req_context() : complete(false), status(S3StatusOK), ctx(NULL), cb(NULL), arg(NULL), in_bl(NULL), off(0), len(0),
+ lock("req_context"), bucket_ctx(NULL), should_destroy_ctx(false), op(OP_NONE) {}
~req_context() {
if (should_destroy_ctx) {
S3_destroy_request_context(ctx);
}
};
+static S3Status properties_callback(const S3ResponseProperties *properties, void *cb_data)
+{
+ return S3StatusOK;
+}
+
+static void complete_callback(S3Status status, const S3ErrorDetails *details, void *cb_data)
+{
+ if (!cb_data)
+ return;
+
+ struct req_context *ctx = (struct req_context *)cb_data;
+
+ ctx->lock.Lock();
+ ctx->complete = true;
+ ctx->status = status;
+ ctx->lock.Unlock();
+
+ if (ctx->cb) {
+ ctx->cb((void *)ctx->cb, ctx->arg);
+ }
+
+ ctx->put();
+}
+
+static S3Status get_obj_callback(int size, const char *buf,
+ void *cb_data)
+{
+ if (!cb_data)
+ return S3StatusOK;
+
+ struct req_context *ctx = (struct req_context *)cb_data;
+
+ ctx->in_bl->append(buf, size);
+
+ return S3StatusOK;
+}
+
+static int put_obj_callback(int size, char *buf,
+ void *cb_data)
+{
+ if (!cb_data)
+ return 0;
+
+ struct req_context *ctx = (struct req_context *)cb_data;
+
+ int chunk = ctx->out_bl.length() - ctx->off;
+ if (!chunk)
+ return 0;
+
+ if (chunk > size)
+ chunk = size;
+
+ memcpy(buf, ctx->out_bl.c_str() + ctx->off, chunk);
+
+ ctx->off += chunk;
+
+ return chunk;
+}
+
class RESTDispatcher {
deque<req_context *> m_req_queue;
ThreadPool m_tp;
+ S3ResponseHandler response_handler;
+ S3GetObjectHandler get_obj_handler;
+ S3PutObjectHandler put_obj_handler;
+
struct DispatcherWQ : public ThreadPool::WorkQueue<req_context> {
RESTDispatcher *dispatcher;
DispatcherWQ(RESTDispatcher *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
RESTDispatcher(CephContext *cct, int num_threads)
: m_tp(cct, "RESTDispatcher::m_tp", num_threads),
req_wq(this, g_conf->rgw_op_thread_timeout,
- g_conf->rgw_op_thread_suicide_timeout, &m_tp)
- {}
+ g_conf->rgw_op_thread_suicide_timeout, &m_tp) {
+
+
+ response_handler.propertiesCallback = properties_callback;
+ response_handler.completeCallback = complete_callback;
+
+ get_obj_handler.responseHandler = response_handler;
+ get_obj_handler.getObjectDataCallback = get_obj_callback;
+
+ put_obj_handler.responseHandler = response_handler;
+ put_obj_handler.putObjectDataCallback = put_obj_callback;
+
+ }
void process_context(req_context *ctx);
+ void get_obj(req_context *ctx);
+ void put_obj(req_context *ctx);
void queue(req_context *ctx) {
req_wq.queue(ctx);
{
ctx->get();
+ switch (ctx->op) {
+ case OP_GET_OBJ:
+ get_obj(ctx);
+ break;
+ case OP_PUT_OBJ:
+ put_obj(ctx);
+ break;
+ default:
+ assert(0);
+ }
+
S3Status status = S3_runall_request_context(ctx->ctx);
if (status != S3StatusOK) {
cerr << "ERROR: S3_runall_request_context() returned " << S3_get_status_name(status) << std::endl;
ctx->status = status;
} else if (ctx->status != S3StatusOK) {
- cerr << "ERROR: " << S3_get_status_name(ctx->status) << std::endl;
+ cerr << "ERROR: " << ctx->oid << ": " << S3_get_status_name(ctx->status) << std::endl;
}
ctx->lock.Lock();
ctx->put();
}
-static S3Status properties_callback(const S3ResponseProperties *properties, void *cb_data)
+void RESTDispatcher::put_obj(req_context *ctx)
{
- return S3StatusOK;
+ S3_put_object(ctx->bucket_ctx, ctx->oid.c_str(),
+ ctx->out_bl.length(),
+ NULL,
+ ctx->ctx,
+ &put_obj_handler, ctx);
}
-static void complete_callback(S3Status status, const S3ErrorDetails *details, void *cb_data)
-{
- if (!cb_data)
- return;
-
- struct req_context *ctx = (struct req_context *)cb_data;
-
- ctx->lock.Lock();
- ctx->complete = true;
- ctx->status = status;
- ctx->lock.Unlock();
-
- if (ctx->cb) {
- ctx->cb((void *)ctx->cb, ctx->arg);
- }
-
- ctx->put();
-}
-
-static S3Status get_obj_callback(int size, const char *buf,
- void *cb_data)
-{
- if (!cb_data)
- return S3StatusOK;
-
- struct req_context *ctx = (struct req_context *)cb_data;
-
- ctx->in_bl->append(buf, size);
-
- return S3StatusOK;
-}
-
-static int put_obj_callback(int size, char *buf,
- void *cb_data)
+void RESTDispatcher::get_obj(req_context *ctx)
{
- if (!cb_data)
- return 0;
-
- struct req_context *ctx = (struct req_context *)cb_data;
-
- int chunk = ctx->out_bl.length() - ctx->off;
- if (!chunk)
- return 0;
-
- if (chunk > size)
- chunk = size;
-
- memcpy(buf, ctx->out_bl.c_str() + ctx->off, chunk);
-
- ctx->off += chunk;
-
- return chunk;
+ S3_get_object(ctx->bucket_ctx, ctx->oid.c_str(), NULL, 0, ctx->len, ctx->ctx,
+ &get_obj_handler, ctx);
}
class RESTBencher : public ObjBencher {
string secret;
int concurrentios;
- S3ResponseHandler response_handler;
- S3GetObjectHandler get_obj_handler;
- S3PutObjectHandler put_obj_handler;
-
protected:
-
int rest_init() {
S3Status status = S3_initialize(user_agent.c_str(), S3_INIT_ALL, host.c_str());
if (status != S3StatusOK) {
return -EINVAL;
}
- response_handler.propertiesCallback = properties_callback;
- response_handler.completeCallback = complete_callback;
-
- get_obj_handler.responseHandler = response_handler;
- get_obj_handler.getObjectDataCallback = get_obj_callback;
-
- put_obj_handler.responseHandler = response_handler;
- put_obj_handler.putObjectDataCallback = put_obj_callback;
return 0;
}
ctx->get();
ctx->in_bl = pbl;
- S3_get_object(&bucket_ctx, oid.c_str(), NULL, 0, len, ctx->ctx,
- &get_obj_handler, ctx);
+ ctx->oid = oid;
+ ctx->len = len;
+ ctx->bucket_ctx = &bucket_ctx;
+ ctx->op = OP_GET_OBJ;
dispatcher->queue(ctx);
int aio_write(const std::string& oid, int slot, const bufferlist& bl, size_t len) {
struct req_context *ctx = completions[slot];
+
ctx->get();
+ ctx->bucket_ctx = &bucket_ctx;
ctx->out_bl = bl;
- S3_put_object(&bucket_ctx, oid.c_str(),
- bl.length(),
- NULL,
- ctx->ctx,
- &put_obj_handler, ctx);
+ ctx->oid = oid;
+ ctx->len = len;
+ ctx->op = OP_PUT_OBJ;
dispatcher->queue(ctx);
return 0;
}
ctx->in_bl = &bl;
ctx->get();
+ ctx->bucket_ctx = &bucket_ctx;
+ ctx->oid = oid;
+ ctx->len = len;
+ ctx->op = OP_GET_OBJ;
- S3_get_object(&bucket_ctx, oid.c_str(), NULL, 0, len, NULL,
- &get_obj_handler, ctx);
+ dispatcher->process_context(ctx);
+ ret = ctx->ret();
ctx->put();
return bl.length();
}
if (ret < 0) {
return ret;
}
- ctx->out_bl = bl;
ctx->get();
- S3_put_object(&bucket_ctx, oid.c_str(),
- bl.length(),
- NULL,
- NULL,
- &put_obj_handler, ctx);
+ ctx->out_bl = bl;
+ ctx->bucket_ctx = &bucket_ctx;
+ ctx->oid = oid;
+ ctx->op = OP_PUT_OBJ;
+
+ dispatcher->process_context(ctx);
ret = ctx->ret();
ctx->put();
return ret;
while (!ctx->complete) {
ctx->cond.Wait(ctx->lock);
}
-//cerr << __FILE__ << ":" << __LINE__ << ": ctx->put() ctx=" << (void *)ctx << std::endl;
-// ctx->put();
return 0;
}
ctx->get();
+ S3ResponseHandler response_handler;
+ response_handler.propertiesCallback = properties_callback;
+ response_handler.completeCallback = complete_callback;
+
S3_create_bucket(protocol, access_key.c_str(), secret.c_str(), NULL,
bucket.c_str(), S3CannedAclPrivate,
NULL, /* locationConstraint */