rgw/rgw_acl_s3.cc \
rgw/rgw_acl_swift.cc \
rgw/rgw_client_io.cc \
+ rgw/rgw_coroutine.cc \
rgw/rgw_fcgi.cc \
rgw/rgw_xml.cc \
rgw/rgw_usage.cc \
rgw/rgw_acl_s3.h \
rgw/rgw_acl_swift.h \
rgw/rgw_client_io.h \
+ rgw/rgw_coroutine.h \
rgw/rgw_fcgi.h \
rgw/rgw_xml.h \
rgw/rgw_basic_types.h \
rgw/rgw_rest_opstate.h \
rgw/rgw_rest_replica_log.h \
rgw/rgw_rest_config.h \
+ rgw/rgw_sync.h \
rgw/rgw_usage.h \
rgw/rgw_user.h \
rgw/rgw_bucket.h \
--- /dev/null
+
+
+#include "rgw_coroutine.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+
+
+RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr),
+ done_flag(false), error_flag(false), blocked_flag(false) {
+ if (start) {
+ ops.push_back(start);
+ }
+ pos = ops.begin();
+}
+
+int RGWCoroutinesStack::operate(RGWCoroutinesEnv *env)
+{
+ RGWCoroutine *op = *pos;
+ int r = op->do_operate(env);
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: op->operate() returned r=" << r << dendl;
+ }
+
+ error_flag = op->is_error();
+ blocked_flag = op->is_blocked();
+
+ if (op->is_done()) {
+ op->put();
+ r = unwind(r);
+ done_flag = (pos == ops.end());
+ return r;
+ }
+
+ /* should r ever be negative at this point? */
+ assert(r >= 0);
+
+ return 0;
+}
+
+string RGWCoroutinesStack::error_str()
+{
+ if (pos != ops.end()) {
+ return (*pos)->error_str();
+ }
+ return string();
+}
+
+int RGWCoroutinesStack::call(RGWCoroutine *next_op, int ret) {
+ ops.push_back(next_op);
+ if (pos != ops.end()) {
+ ++pos;
+ } else {
+ pos = ops.begin();
+ }
+ return ret;
+}
+
+int RGWCoroutinesStack::unwind(int retcode)
+{
+ if (pos == ops.begin()) {
+ pos = ops.end();
+ return retcode;
+ }
+
+ --pos;
+ ops.pop_back();
+ RGWCoroutine *op = *pos;
+ op->set_retcode(retcode);
+ return 0;
+}
+
+void RGWCoroutinesStack::set_blocked(bool flag)
+{
+ blocked_flag = flag;
+ if (pos != ops.end()) {
+ (*pos)->set_blocked(flag);
+ }
+}
+
+static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg);
+
+static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
+{
+ ((RGWAioCompletionNotifier *)arg)->cb();
+}
+
+RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data) : completion_mgr(_mgr), user_data(_user_data) {
+ c = librados::Rados::aio_create_completion((void *)this, _aio_completion_notifier_cb, NULL);
+}
+
+RGWAioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier()
+{
+ return ops_mgr->create_completion_notifier(this);
+}
+
+RGWCompletionManager *RGWCoroutinesStack::get_completion_mgr()
+{
+ return ops_mgr->get_completion_mgr();
+}
+
+bool RGWCoroutinesStack::unblock_stack(RGWCoroutinesStack **s)
+{
+ if (blocking_stacks.empty()) {
+ return false;
+ }
+
+ set<RGWCoroutinesStack *>::iterator iter = blocking_stacks.begin();
+ *s = *iter;
+ blocking_stacks.erase(iter);
+ (*s)->blocked_by_stack.erase(this);
+
+ return true;
+}
+
+void RGWCoroutinesManager::report_error(RGWCoroutinesStack *op)
+{
+#warning need to have error logging infrastructure that logs on backend
+ lderr(cct) << "ERROR: failed operation: " << op->error_str() << dendl;
+}
+
+void RGWCoroutinesManager::handle_unblocked_stack(list<RGWCoroutinesStack *>& stacks, RGWCoroutinesStack *stack, int *waiting_count)
+{
+ --(*waiting_count);
+ stack->set_blocked(false);
+ if (!stack->is_done()) {
+ stacks.push_back(stack);
+ } else {
+ delete stack;
+ }
+}
+
+int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
+{
+ int waiting_count = 0;
+ RGWCoroutinesEnv env;
+
+ env.manager = this;
+ env.stacks = &stacks;
+
+ for (list<RGWCoroutinesStack *>::iterator iter = stacks.begin(); iter != stacks.end();) {
+ RGWCoroutinesStack *stack = *iter;
+ env.stack = stack;
+ int ret = stack->operate(&env);
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: stack->operate() returned ret=" << ret << dendl;
+ }
+
+ if (stack->is_error()) {
+ report_error(stack);
+ }
+
+ if (stack->is_blocked_by_stack()) {
+ /* do nothing, we'll re-add the stack when the blocking stack is done */
+ } else if (stack->is_blocked()) {
+ waiting_count++;
+ } else if (stack->is_done()) {
+ RGWCoroutinesStack *s;
+ while (stack->unblock_stack(&s)) {
+ if (!s->is_blocked_by_stack() && !s->is_done()) {
+ if (s->is_blocked()) {
+ waiting_count++;
+ } else {
+ stacks.push_back(s);
+ }
+ }
+ }
+ delete stack;
+ } else {
+ stacks.push_back(stack);
+ }
+
+ RGWCoroutinesStack *blocked_stack;
+ while (completion_mgr.try_get_next((void **)&blocked_stack)) {
+ handle_unblocked_stack(stacks, blocked_stack, &waiting_count);
+ }
+
+ if (waiting_count >= ops_window) {
+ int ret = completion_mgr.get_next((void **)&blocked_stack);
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
+ }
+ handle_unblocked_stack(stacks, blocked_stack, &waiting_count);
+ }
+
+ ++iter;
+ stacks.pop_front();
+ while (iter == stacks.end() && waiting_count > 0) {
+ int ret = completion_mgr.get_next((void **)&blocked_stack);
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
+ }
+ handle_unblocked_stack(stacks, blocked_stack, &waiting_count);
+ iter = stacks.begin();
+ }
+ }
+
+ return 0;
+}
+
+int RGWCoroutinesManager::run(RGWCoroutine *op)
+{
+ list<RGWCoroutinesStack *> stacks;
+ RGWCoroutinesStack *stack = allocate_stack();
+ op->get();
+ int r = stack->call(op);
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: stack->call() returned r=" << r << dendl;
+ return r;
+ }
+
+ stacks.push_back(stack);
+
+ r = run(stacks);
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: run(stacks) returned r=" << r << dendl;
+ }
+
+ r = op->get_ret_status();
+ op->put();
+
+ return r;
+}
+
+RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack)
+{
+ return new RGWAioCompletionNotifier(&completion_mgr, (void *)stack);
+}
+
+void RGWCoroutine::call(RGWCoroutine *op)
+{
+ int r = env->stack->call(op, 0);
+ assert(r == 0);
+}
+
+void RGWCoroutine::spawn(RGWCoroutine *op)
+{
+ RGWCoroutinesStack *stack = env->manager->allocate_stack();
+
+ int r = stack->call(op, 0);
+ assert(r == 0);
+
+ env->stacks->push_back(stack);
+
+ env->stack->set_blocked_by(stack);
+}
+
+int RGWSimpleCoroutine::operate()
+{
+ switch (state) {
+ case Init:
+ ldout(cct, 20) << __func__ << ": init request" << dendl;
+ return state_init();
+ case SendRequest:
+ ldout(cct, 20) << __func__ << ": send request" << dendl;
+ return state_send_request();
+ case RequestComplete:
+ ldout(cct, 20) << __func__ << ": request complete" << dendl;
+ return state_request_complete();
+ case AllComplete:
+ ldout(cct, 20) << __func__ << ": all complete" << dendl;
+ return state_all_complete();
+ case Done:
+ ldout(cct, 20) << __func__ << ": done" << dendl;
+ break;
+ case Error:
+ ldout(cct, 20) << __func__ << ": error" << dendl;
+ break;
+ }
+
+ return 0;
+}
+
+int RGWSimpleCoroutine::state_init()
+{
+ int ret = init();
+ if (ret < 0) {
+ return set_state(Error, ret);
+ }
+ return set_state(SendRequest);
+}
+
+int RGWSimpleCoroutine::state_send_request()
+{
+ int ret = send_request();
+ if (ret < 0) {
+ return set_state(Error, ret);
+ }
+ return yield(set_state(RequestComplete));
+}
+
+int RGWSimpleCoroutine::state_request_complete()
+{
+ int ret = request_complete();
+ if (ret < 0) {
+ return set_state(Error, ret);
+ }
+ return set_state(AllComplete);
+}
+
+int RGWSimpleCoroutine::state_all_complete()
+{
+ int ret = finish();
+ if (ret < 0) {
+ return set_state(Error, ret);
+ }
+ return set_state(Done);
+}
+
+
--- /dev/null
+#ifndef CEPH_RGW_COROUTINE_H
+#define CEPH_RGW_COROUTINE_H
+
+#include "rgw_http_client.h"
+
+#include "common/RefCountedObj.h"
+
+
+
+#define RGW_ASYNC_OPS_MGR_WINDOW 16
+
+class RGWCoroutinesStack;
+class RGWCoroutinesManager;
+
+/* a single use librados aio completion notifier that hooks into the RGWCompletionManager */
+class RGWAioCompletionNotifier : public RefCountedObject {
+ librados::AioCompletion *c;
+ RGWCompletionManager *completion_mgr;
+ void *user_data;
+
+public:
+ RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data);
+ ~RGWAioCompletionNotifier() {
+ c->release();
+ }
+
+ librados::AioCompletion *completion() {
+ return c;
+ }
+
+ void cb() {
+ completion_mgr->complete(user_data);
+ put();
+ }
+};
+
+
+struct RGWCoroutinesEnv {
+ RGWCoroutinesManager *manager;
+ list<RGWCoroutinesStack *> *stacks;
+ RGWCoroutinesStack *stack;
+
+ RGWCoroutinesEnv() : manager(NULL), stacks(NULL), stack(NULL) {}
+};
+
+class RGWCoroutine : public RefCountedObject {
+ friend class RGWCoroutinesStack;
+protected:
+ RGWCoroutinesEnv *env;
+ bool blocked;
+ int retcode;
+
+ stringstream error_stream;
+
+ void set_blocked(bool flag) { blocked = flag; }
+ int yield(int ret) {
+ set_blocked(true);
+ return ret;
+ }
+
+ int do_operate(RGWCoroutinesEnv *_env) {
+ env = _env;
+ return operate();
+ }
+
+ void call(RGWCoroutine *op);
+ void spawn(RGWCoroutine *op);
+
+public:
+ RGWCoroutine() : env(NULL), blocked(false), retcode(0) {}
+ virtual ~RGWCoroutine() {}
+
+ virtual int operate() = 0;
+
+ virtual bool is_done() = 0;
+ virtual bool is_error() = 0;
+
+ stringstream& log_error() { return error_stream; }
+ string error_str() {
+ return error_stream.str();
+ }
+
+ bool is_blocked() { return blocked; }
+
+ void set_retcode(int r) {
+ retcode = r;
+ }
+
+ int get_ret_status() {
+ return retcode;
+ }
+};
+
+class RGWCoroutinesStack {
+ CephContext *cct;
+
+ RGWCoroutinesManager *ops_mgr;
+
+ list<RGWCoroutine *> ops;
+ list<RGWCoroutine *>::iterator pos;
+
+ set<RGWCoroutinesStack *> blocked_by_stack;
+ set<RGWCoroutinesStack *> blocking_stacks;
+
+
+ bool done_flag;
+ bool error_flag;
+ bool blocked_flag;
+
+public:
+ RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL);
+
+ int operate(RGWCoroutinesEnv *env);
+
+ bool is_done() {
+ return done_flag;
+ }
+ bool is_error() {
+ return error_flag;
+ }
+ bool is_blocked_by_stack() {
+ return !blocked_by_stack.empty();
+ }
+ bool is_blocked() {
+ return blocked_flag || is_blocked_by_stack();
+ }
+
+ void set_blocked(bool flag);
+
+ string error_str();
+
+ int call(RGWCoroutine *next_op, int ret = 0);
+ int unwind(int retcode);
+
+ RGWAioCompletionNotifier *create_completion_notifier();
+ RGWCompletionManager *get_completion_mgr();
+
+ void set_blocked_by(RGWCoroutinesStack *s) {
+ blocked_by_stack.insert(s);
+ s->blocking_stacks.insert(this);
+ }
+
+ bool unblock_stack(RGWCoroutinesStack **s);
+};
+
+class RGWCoroutinesManager {
+ CephContext *cct;
+
+ void handle_unblocked_stack(list<RGWCoroutinesStack *>& stacks, RGWCoroutinesStack *stack, int *waiting_count);
+protected:
+ RGWCompletionManager completion_mgr;
+
+ int ops_window;
+
+ void put_completion_notifier(RGWAioCompletionNotifier *cn);
+public:
+ RGWCoroutinesManager(CephContext *_cct) : cct(_cct), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {}
+ virtual ~RGWCoroutinesManager() {}
+
+ int run(list<RGWCoroutinesStack *>& ops);
+ int run(RGWCoroutine *op);
+
+ virtual void report_error(RGWCoroutinesStack *op);
+
+ RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack);
+ RGWCompletionManager *get_completion_mgr() { return &completion_mgr; }
+
+ RGWCoroutinesStack *allocate_stack() {
+ return new RGWCoroutinesStack(cct, this);
+ }
+};
+
+class RGWSimpleCoroutine : public RGWCoroutine {
+ enum State {
+ Init = 0,
+ SendRequest = 1,
+ RequestComplete = 2,
+ AllComplete = 3,
+ Done = 100,
+ Error = 200,
+ } state;
+
+ int set_state(State s, int ret = 0) {
+ state = s;
+ return ret;
+ }
+ int operate();
+
+ int state_init();
+ int state_send_request();
+ int state_request_complete();
+ int state_all_complete();
+
+protected:
+ CephContext *cct;
+
+public:
+ RGWSimpleCoroutine(CephContext *_cct) : state(Init), cct(_cct) {}
+
+ virtual int init() { return 0; }
+ virtual int send_request() = 0;
+ virtual int request_complete() = 0;
+ virtual int finish() { return 0; }
+
+ bool is_done() { return (state == Done || state == Error); }
+ bool is_error() { return (state == Error); }
+};
+
+#endif
JSONDecoder::decode_json("entries", entries, obj);
};
-static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg);
-
-/* a single use librados aio completion notifier that hooks into the RGWCompletionManager */
-class AioCompletionNotifier : public RefCountedObject {
- librados::AioCompletion *c;
- RGWCompletionManager *completion_mgr;
- void *user_data;
-
-public:
- AioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data) : completion_mgr(_mgr), user_data(_user_data) {
- c = librados::Rados::aio_create_completion((void *)this, _aio_completion_notifier_cb, NULL);
- }
-
- ~AioCompletionNotifier() {
- c->release();
- }
-
- librados::AioCompletion *completion() {
- return c;
- }
-
- void cb() {
- completion_mgr->complete(user_data);
- put();
- }
-};
-
-static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
-{
- ((AioCompletionNotifier *)arg)->cb();
-}
-
class RGWAsyncRadosRequest {
- AioCompletionNotifier *notifier;
+ RGWAioCompletionNotifier *notifier;
void *user_info;
int retcode;
protected:
virtual int _send_request() = 0;
public:
- RGWAsyncRadosRequest(AioCompletionNotifier *_cn) : notifier(_cn) {}
+ RGWAsyncRadosRequest(RGWAioCompletionNotifier *_cn) : notifier(_cn) {}
virtual ~RGWAsyncRadosRequest() {}
void send_request() {
return store->get_system_obj(*obj_ctx, read_state, objv_tracker, obj, *pbl, ofs, end, NULL);
}
public:
- RGWAsyncGetSystemObj(AioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
+ RGWAsyncGetSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
bufferlist *_pbl, off_t _ofs, off_t _end) : RGWAsyncRadosRequest(cn), store(_store), obj_ctx(_obj_ctx),
objv_tracker(_objv_tracker), obj(_obj), pbl(_pbl),
}
};
+class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
+ RGWRados *store;
+ RGWObjVersionTracker *objv_tracker;
+ rgw_obj obj;
+ bool exclusive;
+ bufferlist bl;
+ map<string, bufferlist> attrs;
+ time_t mtime;
+
+protected:
+ int _send_request() {
+ return store->put_system_obj(NULL, obj, bl.c_str(), bl.length(), exclusive,
+ NULL, attrs, objv_tracker, mtime);
+ }
+public:
+ RGWAsyncPutSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
+ RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj, bool _exclusive,
+ bufferlist& _bl, time_t _mtime = 0) : RGWAsyncRadosRequest(cn), store(_store),
+ objv_tracker(_objv_tracker), obj(_obj), exclusive(_exclusive),
+ bl(_bl), mtime(_mtime) {}
+};
+
+
class RGWAsyncRadosProcessor {
return 0;
}
-void RGWCoroutine::call(RGWCoroutine *op)
-{
- int r = env->stack->call(op, 0);
- assert(r == 0);
-}
-
-void RGWCoroutine::spawn(RGWCoroutine *op)
-{
- RGWCoroutinesStack *stack = env->manager->allocate_stack();
-
- int r = stack->call(op, 0);
- assert(r == 0);
-
- env->stacks->push_back(stack);
-
- env->stack->set_blocked_by(stack);
-}
-
-class RGWSimpleCoroutine : public RGWCoroutine {
- enum State {
- Init = 0,
- SendRequest = 1,
- RequestComplete = 2,
- AllComplete = 3,
- Done = 100,
- Error = 200,
- } state;
-
- int set_state(State s, int ret = 0) {
- state = s;
- return ret;
- }
- int operate();
-
- int state_init();
- int state_send_request();
- int state_request_complete();
- int state_all_complete();
-
-protected:
- CephContext *cct;
-
-public:
- RGWSimpleCoroutine(CephContext *_cct) : state(Init), cct(_cct) {}
-
- virtual int init() { return 0; }
- virtual int send_request() = 0;
- virtual int request_complete() = 0;
- virtual int finish() { return 0; }
-
- bool is_done() { return (state == Done || state == Error); }
- bool is_error() { return (state == Error); }
-};
-
-int RGWSimpleCoroutine::operate()
-{
- switch (state) {
- case Init:
- ldout(cct, 20) << __func__ << ": init request" << dendl;
- return state_init();
- case SendRequest:
- ldout(cct, 20) << __func__ << ": send request" << dendl;
- return state_send_request();
- case RequestComplete:
- ldout(cct, 20) << __func__ << ": request complete" << dendl;
- return state_request_complete();
- case AllComplete:
- ldout(cct, 20) << __func__ << ": all complete" << dendl;
- return state_all_complete();
- case Done:
- ldout(cct, 20) << __func__ << ": done" << dendl;
- break;
- case Error:
- ldout(cct, 20) << __func__ << ": error" << dendl;
- break;
- }
-
- return 0;
-}
-
-int RGWSimpleCoroutine::state_init()
-{
- int ret = init();
- if (ret < 0) {
- return set_state(Error, ret);
- }
- return set_state(SendRequest);
-}
-
-int RGWSimpleCoroutine::state_send_request()
-{
- int ret = send_request();
- if (ret < 0) {
- return set_state(Error, ret);
- }
- return yield(set_state(RequestComplete));
-}
-
-int RGWSimpleCoroutine::state_request_complete()
-{
- int ret = request_complete();
- if (ret < 0) {
- return set_state(Error, ret);
- }
- return set_state(AllComplete);
-}
-
-int RGWSimpleCoroutine::state_all_complete()
-{
- int ret = finish();
- if (ret < 0) {
- return set_state(Error, ret);
- }
- return set_state(Done);
-}
-
template <class T>
-class RGWSimpleRadosCoroutine : public RGWSimpleCoroutine {
+class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
RGWAsyncRadosProcessor *async_rados;
RGWRados *store;
RGWObjectCtx& obj_ctx;
RGWAsyncGetSystemObj *req;
public:
- RGWSimpleRadosCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
RGWObjectCtx& _obj_ctx,
rgw_bucket& _pool, const string& _oid,
T *_result) : RGWSimpleCoroutine(_store->ctx()),
result(_result),
req(NULL) { }
- ~RGWSimpleRadosCoroutine() {
+ ~RGWSimpleRadosReadCR() {
delete req;
}
};
template <class T>
-int RGWSimpleRadosCoroutine<T>::send_request()
+int RGWSimpleRadosReadCR<T>::send_request()
{
rgw_obj obj = rgw_obj(pool, oid);
req = new RGWAsyncGetSystemObj(env->stack->create_completion_notifier(),
}
template <class T>
-int RGWSimpleRadosCoroutine<T>::request_complete()
+int RGWSimpleRadosReadCR<T>::request_complete()
{
int ret = req->get_ret_status();
retcode = ret;
return handle_data(*result);
}
-class RGWReadSyncStatusCoroutine : public RGWSimpleRadosCoroutine<RGWMetaSyncGlobalStatus> {
+class RGWReadSyncStatusCoroutine : public RGWSimpleRadosReadCR<RGWMetaSyncGlobalStatus> {
RGWAsyncRadosProcessor *async_rados;
RGWRados *store;
RGWObjectCtx& obj_ctx;
public:
RGWReadSyncStatusCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
RGWObjectCtx& _obj_ctx,
- RGWMetaSyncGlobalStatus *_gs) : RGWSimpleRadosCoroutine(_async_rados, _store, _obj_ctx,
+ RGWMetaSyncGlobalStatus *_gs) : RGWSimpleRadosReadCR(_async_rados, _store, _obj_ctx,
_store->get_zone_params().log_pool,
"mdlog.state.global",
_gs),
if (retcode == -ENOENT) {
return retcode;
}
- spawn(new RGWSimpleRadosCoroutine<rgw_sync_marker>(async_rados, store, obj_ctx, store->get_zone_params().log_pool,
+ spawn(new RGWSimpleRadosReadCR<rgw_sync_marker>(async_rados, store, obj_ctx, store->get_zone_params().log_pool,
"mdlog.state.0", &sync_marker));
- spawn(new RGWSimpleRadosCoroutine<rgw_sync_marker>(async_rados, store, obj_ctx, store->get_zone_params().log_pool,
+ spawn(new RGWSimpleRadosReadCR<rgw_sync_marker>(async_rados, store, obj_ctx, store->get_zone_params().log_pool,
"mdlog.state.1", &sync_marker));
return 0;
}
RGWRESTReadResource *http_op;
- AioCompletionNotifier *md_op_notifier;
+ RGWAioCompletionNotifier *md_op_notifier;
int req_ret;
RGWMetadataLogInfo shard_info;
bool is_error() { return (state == Error); }
};
-RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr),
- done_flag(false), error_flag(false), blocked_flag(false) {
- if (start) {
- ops.push_back(start);
- }
- pos = ops.begin();
-}
-
-int RGWCoroutinesStack::operate(RGWCoroutinesEnv *env)
-{
- RGWCoroutine *op = *pos;
- int r = op->do_operate(env);
- if (r < 0) {
- ldout(cct, 0) << "ERROR: op->operate() returned r=" << r << dendl;
- }
-
- error_flag = op->is_error();
- blocked_flag = op->is_blocked();
-
- if (op->is_done()) {
- op->put();
- r = unwind(r);
- done_flag = (pos == ops.end());
- return r;
- }
-
- /* should r ever be negative at this point? */
- assert(r >= 0);
-
- return 0;
-}
-
-string RGWCoroutinesStack::error_str()
-{
- if (pos != ops.end()) {
- return (*pos)->error_str();
- }
- return string();
-}
-
-int RGWCoroutinesStack::call(RGWCoroutine *next_op, int ret) {
- ops.push_back(next_op);
- if (pos != ops.end()) {
- ++pos;
- } else {
- pos = ops.begin();
- }
- return ret;
-}
-
-int RGWCoroutinesStack::unwind(int retcode)
-{
- if (pos == ops.begin()) {
- pos = ops.end();
- return retcode;
- }
-
- --pos;
- ops.pop_back();
- RGWCoroutine *op = *pos;
- op->set_retcode(retcode);
- return 0;
-}
-
-void RGWCoroutinesStack::set_blocked(bool flag)
-{
- blocked_flag = flag;
- if (pos != ops.end()) {
- (*pos)->set_blocked(flag);
- }
-}
-
-AioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier()
-{
- return ops_mgr->create_completion_notifier(this);
-}
-
-RGWCompletionManager *RGWCoroutinesStack::get_completion_mgr()
-{
- return ops_mgr->get_completion_mgr();
-}
-
-bool RGWCoroutinesStack::unblock_stack(RGWCoroutinesStack **s)
-{
- if (blocking_stacks.empty()) {
- return false;
- }
-
- set<RGWCoroutinesStack *>::iterator iter = blocking_stacks.begin();
- *s = *iter;
- blocking_stacks.erase(iter);
- (*s)->blocked_by_stack.erase(this);
-
- return true;
-}
-
-void RGWCoroutinesManager::report_error(RGWCoroutinesStack *op)
-{
-#warning need to have error logging infrastructure that logs on backend
- lderr(cct) << "ERROR: failed operation: " << op->error_str() << dendl;
-}
-
-void RGWCoroutinesManager::handle_unblocked_stack(list<RGWCoroutinesStack *>& stacks, RGWCoroutinesStack *stack, int *waiting_count)
-{
- --(*waiting_count);
- stack->set_blocked(false);
- if (!stack->is_done()) {
- stacks.push_back(stack);
- } else {
- delete stack;
- }
-}
-
-int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
-{
- int waiting_count = 0;
- RGWCoroutinesEnv env;
-
- env.manager = this;
- env.stacks = &stacks;
-
- for (list<RGWCoroutinesStack *>::iterator iter = stacks.begin(); iter != stacks.end();) {
- RGWCoroutinesStack *stack = *iter;
- env.stack = stack;
- int ret = stack->operate(&env);
- if (ret < 0) {
- ldout(cct, 0) << "ERROR: stack->operate() returned ret=" << ret << dendl;
- }
-
- if (stack->is_error()) {
- report_error(stack);
- }
-
- if (stack->is_blocked_by_stack()) {
- /* do nothing, we'll re-add the stack when the blocking stack is done */
- } else if (stack->is_blocked()) {
- waiting_count++;
- } else if (stack->is_done()) {
- RGWCoroutinesStack *s;
- while (stack->unblock_stack(&s)) {
- if (!s->is_blocked_by_stack() && !s->is_done()) {
- if (s->is_blocked()) {
- waiting_count++;
- } else {
- stacks.push_back(s);
- }
- }
- }
- delete stack;
- } else {
- stacks.push_back(stack);
- }
-
- RGWCoroutinesStack *blocked_stack;
- while (completion_mgr.try_get_next((void **)&blocked_stack)) {
- handle_unblocked_stack(stacks, blocked_stack, &waiting_count);
- }
-
- if (waiting_count >= ops_window) {
- int ret = completion_mgr.get_next((void **)&blocked_stack);
- if (ret < 0) {
- ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
- }
- handle_unblocked_stack(stacks, blocked_stack, &waiting_count);
- }
-
- ++iter;
- stacks.pop_front();
- while (iter == stacks.end() && waiting_count > 0) {
- int ret = completion_mgr.get_next((void **)&blocked_stack);
- if (ret < 0) {
- ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
- }
- handle_unblocked_stack(stacks, blocked_stack, &waiting_count);
- iter = stacks.begin();
- }
- }
-
- return 0;
-}
-
-int RGWCoroutinesManager::run(RGWCoroutine *op)
-{
- list<RGWCoroutinesStack *> stacks;
- RGWCoroutinesStack *stack = allocate_stack();
- op->get();
- int r = stack->call(op);
- if (r < 0) {
- ldout(cct, 0) << "ERROR: stack->call() returned r=" << r << dendl;
- return r;
- }
-
- stacks.push_back(stack);
-
- r = run(stacks);
- if (r < 0) {
- ldout(cct, 0) << "ERROR: run(stacks) returned r=" << r << dendl;
- }
-
- r = op->get_ret_status();
- op->put();
-
- return r;
-}
-
-AioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack)
-{
- return new AioCompletionNotifier(&completion_mgr, (void *)stack);
-}
-
int RGWRemoteMetaLog::clone_shards()
{
list<RGWCoroutinesStack *> stacks;
marker = entry.id;
}
- AioCompletionNotifier *cn = env->stack->create_completion_notifier();
+ RGWAioCompletionNotifier *cn = env->stack->create_completion_notifier();
int ret = store->meta_mgr->store_md_log_entries(dest_entries, shard_id, cn->completion());
if (ret < 0) {
#ifndef CEPH_RGW_SYNC_H
#define CEPH_RGW_SYNC_H
-#include "rgw_common.h"
-#include "rgw_rados.h"
-#include "rgw_metadata.h"
-#include "rgw_http_client.h"
+#include "rgw_coroutine.h"
#include "common/RWLock.h"
-#include "common/RefCountedObj.h"
-
-
-#define dout_subsys ceph_subsys_rgw
struct rgw_mdlog_info {
void decode_json(JSONObj *obj);
};
-#define RGW_ASYNC_OPS_MGR_WINDOW 16
-
-class RGWCoroutinesStack;
-class RGWCoroutinesManager;
-class AioCompletionNotifier;
-
-struct RGWCoroutinesEnv {
- RGWCoroutinesManager *manager;
- list<RGWCoroutinesStack *> *stacks;
- RGWCoroutinesStack *stack;
-
- RGWCoroutinesEnv() : manager(NULL), stacks(NULL), stack(NULL) {}
-};
-
-class RGWCoroutine : public RefCountedObject {
- friend class RGWCoroutinesStack;
-protected:
- RGWCoroutinesEnv *env;
- bool blocked;
- int retcode;
-
- stringstream error_stream;
-
- void set_blocked(bool flag) { blocked = flag; }
- int yield(int ret) {
- set_blocked(true);
- return ret;
- }
-
- int do_operate(RGWCoroutinesEnv *_env) {
- env = _env;
- return operate();
- }
-
- void call(RGWCoroutine *op);
- void spawn(RGWCoroutine *op);
-
-public:
- RGWCoroutine() : env(NULL), blocked(false), retcode(0) {}
- virtual ~RGWCoroutine() {}
-
- virtual int operate() = 0;
-
- virtual bool is_done() = 0;
- virtual bool is_error() = 0;
-
- stringstream& log_error() { return error_stream; }
- string error_str() {
- return error_stream.str();
- }
-
- bool is_blocked() { return blocked; }
-
- void set_retcode(int r) {
- retcode = r;
- }
-
- int get_ret_status() {
- return retcode;
- }
-};
-
-class RGWCoroutinesStack {
- CephContext *cct;
-
- RGWCoroutinesManager *ops_mgr;
-
- list<RGWCoroutine *> ops;
- list<RGWCoroutine *>::iterator pos;
-
- set<RGWCoroutinesStack *> blocked_by_stack;
- set<RGWCoroutinesStack *> blocking_stacks;
-
-
- bool done_flag;
- bool error_flag;
- bool blocked_flag;
-
-public:
- RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL);
-
- int operate(RGWCoroutinesEnv *env);
-
- bool is_done() {
- return done_flag;
- }
- bool is_error() {
- return error_flag;
- }
- bool is_blocked_by_stack() {
- return !blocked_by_stack.empty();
- }
- bool is_blocked() {
- return blocked_flag || is_blocked_by_stack();
- }
-
- void set_blocked(bool flag);
-
- string error_str();
-
- int call(RGWCoroutine *next_op, int ret = 0);
- int unwind(int retcode);
-
- AioCompletionNotifier *create_completion_notifier();
- RGWCompletionManager *get_completion_mgr();
-
- void set_blocked_by(RGWCoroutinesStack *s) {
- blocked_by_stack.insert(s);
- s->blocking_stacks.insert(this);
- }
-
- bool unblock_stack(RGWCoroutinesStack **s);
-};
-
-class RGWCoroutinesManager {
- CephContext *cct;
-
- void handle_unblocked_stack(list<RGWCoroutinesStack *>& stacks, RGWCoroutinesStack *stack, int *waiting_count);
-protected:
- RGWCompletionManager completion_mgr;
-
- int ops_window;
-
- void put_completion_notifier(AioCompletionNotifier *cn);
-public:
- RGWCoroutinesManager(CephContext *_cct) : cct(_cct), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {}
- virtual ~RGWCoroutinesManager() {}
-
- int run(list<RGWCoroutinesStack *>& ops);
- int run(RGWCoroutine *op);
-
- virtual void report_error(RGWCoroutinesStack *op);
-
- AioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack);
- RGWCompletionManager *get_completion_mgr() { return &completion_mgr; }
-
- RGWCoroutinesStack *allocate_stack() {
- return new RGWCoroutinesStack(cct, this);
- }
-};
-
struct RGWMetaSyncGlobalStatus {
enum SyncState {
StateInit = 0,