From 3b07e1a47366f1e8a05234c888cbc772e35023ea Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 7 Aug 2015 14:17:33 -0700 Subject: [PATCH] rgw: move coroutines code move the generic code to rgw_coroutine.{h,cc} Signed-off-by: Yehuda Sadeh --- src/rgw/Makefile.am | 3 + src/rgw/rgw_coroutine.cc | 310 +++++++++++++++++++++++++++++ src/rgw/rgw_coroutine.h | 209 ++++++++++++++++++++ src/rgw/rgw_sync.cc | 409 ++++----------------------------------- src/rgw/rgw_sync.h | 150 +------------- 5 files changed, 560 insertions(+), 521 deletions(-) create mode 100644 src/rgw/rgw_coroutine.cc create mode 100644 src/rgw/rgw_coroutine.h diff --git a/src/rgw/Makefile.am b/src/rgw/Makefile.am index a848d2b286540..1423a10884564 100644 --- a/src/rgw/Makefile.am +++ b/src/rgw/Makefile.am @@ -18,6 +18,7 @@ librgw_la_SOURCES = \ rgw/rgw_acl_s3.cc \ rgw/rgw_acl_swift.cc \ rgw/rgw_client_io.cc \ + rgw/rgw_coroutine.cc \ rgw/rgw_fcgi.cc \ rgw/rgw_xml.cc \ rgw/rgw_usage.cc \ @@ -132,6 +133,7 @@ noinst_HEADERS += \ rgw/rgw_acl_s3.h \ rgw/rgw_acl_swift.h \ rgw/rgw_client_io.h \ + rgw/rgw_coroutine.h \ rgw/rgw_fcgi.h \ rgw/rgw_xml.h \ rgw/rgw_basic_types.h \ @@ -176,6 +178,7 @@ noinst_HEADERS += \ rgw/rgw_rest_opstate.h \ rgw/rgw_rest_replica_log.h \ rgw/rgw_rest_config.h \ + rgw/rgw_sync.h \ rgw/rgw_usage.h \ rgw/rgw_user.h \ rgw/rgw_bucket.h \ diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc new file mode 100644 index 0000000000000..c94501ce182b5 --- /dev/null +++ b/src/rgw/rgw_coroutine.cc @@ -0,0 +1,310 @@ + + +#include "rgw_coroutine.h" + +#define dout_subsys ceph_subsys_rgw + + + +RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr), + done_flag(false), error_flag(false), blocked_flag(false) { + if (start) { + ops.push_back(start); + } + pos = ops.begin(); +} + +int RGWCoroutinesStack::operate(RGWCoroutinesEnv *env) +{ + RGWCoroutine *op = *pos; + int r = op->do_operate(env); + if (r < 0) { + ldout(cct, 0) << "ERROR: op->operate() returned r=" << r << dendl; + } + + error_flag = op->is_error(); + blocked_flag = op->is_blocked(); + + if (op->is_done()) { + op->put(); + r = unwind(r); + done_flag = (pos == ops.end()); + return r; + } + + /* should r ever be negative at this point? */ + assert(r >= 0); + + return 0; +} + +string RGWCoroutinesStack::error_str() +{ + if (pos != ops.end()) { + return (*pos)->error_str(); + } + return string(); +} + +int RGWCoroutinesStack::call(RGWCoroutine *next_op, int ret) { + ops.push_back(next_op); + if (pos != ops.end()) { + ++pos; + } else { + pos = ops.begin(); + } + return ret; +} + +int RGWCoroutinesStack::unwind(int retcode) +{ + if (pos == ops.begin()) { + pos = ops.end(); + return retcode; + } + + --pos; + ops.pop_back(); + RGWCoroutine *op = *pos; + op->set_retcode(retcode); + return 0; +} + +void RGWCoroutinesStack::set_blocked(bool flag) +{ + blocked_flag = flag; + if (pos != ops.end()) { + (*pos)->set_blocked(flag); + } +} + +static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg); + +static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg) +{ + ((RGWAioCompletionNotifier *)arg)->cb(); +} + +RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data) : completion_mgr(_mgr), user_data(_user_data) { + c = librados::Rados::aio_create_completion((void *)this, _aio_completion_notifier_cb, NULL); +} + +RGWAioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier() +{ + return ops_mgr->create_completion_notifier(this); +} + +RGWCompletionManager *RGWCoroutinesStack::get_completion_mgr() +{ + return ops_mgr->get_completion_mgr(); +} + +bool RGWCoroutinesStack::unblock_stack(RGWCoroutinesStack **s) +{ + if (blocking_stacks.empty()) { + return false; + } + + set::iterator iter = blocking_stacks.begin(); + *s = *iter; + blocking_stacks.erase(iter); + (*s)->blocked_by_stack.erase(this); + + return true; +} + +void RGWCoroutinesManager::report_error(RGWCoroutinesStack *op) +{ +#warning need to have error logging infrastructure that logs on backend + lderr(cct) << "ERROR: failed operation: " << op->error_str() << dendl; +} + +void RGWCoroutinesManager::handle_unblocked_stack(list& stacks, RGWCoroutinesStack *stack, int *waiting_count) +{ + --(*waiting_count); + stack->set_blocked(false); + if (!stack->is_done()) { + stacks.push_back(stack); + } else { + delete stack; + } +} + +int RGWCoroutinesManager::run(list& stacks) +{ + int waiting_count = 0; + RGWCoroutinesEnv env; + + env.manager = this; + env.stacks = &stacks; + + for (list::iterator iter = stacks.begin(); iter != stacks.end();) { + RGWCoroutinesStack *stack = *iter; + env.stack = stack; + int ret = stack->operate(&env); + if (ret < 0) { + ldout(cct, 0) << "ERROR: stack->operate() returned ret=" << ret << dendl; + } + + if (stack->is_error()) { + report_error(stack); + } + + if (stack->is_blocked_by_stack()) { + /* do nothing, we'll re-add the stack when the blocking stack is done */ + } else if (stack->is_blocked()) { + waiting_count++; + } else if (stack->is_done()) { + RGWCoroutinesStack *s; + while (stack->unblock_stack(&s)) { + if (!s->is_blocked_by_stack() && !s->is_done()) { + if (s->is_blocked()) { + waiting_count++; + } else { + stacks.push_back(s); + } + } + } + delete stack; + } else { + stacks.push_back(stack); + } + + RGWCoroutinesStack *blocked_stack; + while (completion_mgr.try_get_next((void **)&blocked_stack)) { + handle_unblocked_stack(stacks, blocked_stack, &waiting_count); + } + + if (waiting_count >= ops_window) { + int ret = completion_mgr.get_next((void **)&blocked_stack); + if (ret < 0) { + ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl; + } + handle_unblocked_stack(stacks, blocked_stack, &waiting_count); + } + + ++iter; + stacks.pop_front(); + while (iter == stacks.end() && waiting_count > 0) { + int ret = completion_mgr.get_next((void **)&blocked_stack); + if (ret < 0) { + ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl; + } + handle_unblocked_stack(stacks, blocked_stack, &waiting_count); + iter = stacks.begin(); + } + } + + return 0; +} + +int RGWCoroutinesManager::run(RGWCoroutine *op) +{ + list stacks; + RGWCoroutinesStack *stack = allocate_stack(); + op->get(); + int r = stack->call(op); + if (r < 0) { + ldout(cct, 0) << "ERROR: stack->call() returned r=" << r << dendl; + return r; + } + + stacks.push_back(stack); + + r = run(stacks); + if (r < 0) { + ldout(cct, 0) << "ERROR: run(stacks) returned r=" << r << dendl; + } + + r = op->get_ret_status(); + op->put(); + + return r; +} + +RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack) +{ + return new RGWAioCompletionNotifier(&completion_mgr, (void *)stack); +} + +void RGWCoroutine::call(RGWCoroutine *op) +{ + int r = env->stack->call(op, 0); + assert(r == 0); +} + +void RGWCoroutine::spawn(RGWCoroutine *op) +{ + RGWCoroutinesStack *stack = env->manager->allocate_stack(); + + int r = stack->call(op, 0); + assert(r == 0); + + env->stacks->push_back(stack); + + env->stack->set_blocked_by(stack); +} + +int RGWSimpleCoroutine::operate() +{ + switch (state) { + case Init: + ldout(cct, 20) << __func__ << ": init request" << dendl; + return state_init(); + case SendRequest: + ldout(cct, 20) << __func__ << ": send request" << dendl; + return state_send_request(); + case RequestComplete: + ldout(cct, 20) << __func__ << ": request complete" << dendl; + return state_request_complete(); + case AllComplete: + ldout(cct, 20) << __func__ << ": all complete" << dendl; + return state_all_complete(); + case Done: + ldout(cct, 20) << __func__ << ": done" << dendl; + break; + case Error: + ldout(cct, 20) << __func__ << ": error" << dendl; + break; + } + + return 0; +} + +int RGWSimpleCoroutine::state_init() +{ + int ret = init(); + if (ret < 0) { + return set_state(Error, ret); + } + return set_state(SendRequest); +} + +int RGWSimpleCoroutine::state_send_request() +{ + int ret = send_request(); + if (ret < 0) { + return set_state(Error, ret); + } + return yield(set_state(RequestComplete)); +} + +int RGWSimpleCoroutine::state_request_complete() +{ + int ret = request_complete(); + if (ret < 0) { + return set_state(Error, ret); + } + return set_state(AllComplete); +} + +int RGWSimpleCoroutine::state_all_complete() +{ + int ret = finish(); + if (ret < 0) { + return set_state(Error, ret); + } + return set_state(Done); +} + + diff --git a/src/rgw/rgw_coroutine.h b/src/rgw/rgw_coroutine.h new file mode 100644 index 0000000000000..ab5d3162a58ca --- /dev/null +++ b/src/rgw/rgw_coroutine.h @@ -0,0 +1,209 @@ +#ifndef CEPH_RGW_COROUTINE_H +#define CEPH_RGW_COROUTINE_H + +#include "rgw_http_client.h" + +#include "common/RefCountedObj.h" + + + +#define RGW_ASYNC_OPS_MGR_WINDOW 16 + +class RGWCoroutinesStack; +class RGWCoroutinesManager; + +/* a single use librados aio completion notifier that hooks into the RGWCompletionManager */ +class RGWAioCompletionNotifier : public RefCountedObject { + librados::AioCompletion *c; + RGWCompletionManager *completion_mgr; + void *user_data; + +public: + RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data); + ~RGWAioCompletionNotifier() { + c->release(); + } + + librados::AioCompletion *completion() { + return c; + } + + void cb() { + completion_mgr->complete(user_data); + put(); + } +}; + + +struct RGWCoroutinesEnv { + RGWCoroutinesManager *manager; + list *stacks; + RGWCoroutinesStack *stack; + + RGWCoroutinesEnv() : manager(NULL), stacks(NULL), stack(NULL) {} +}; + +class RGWCoroutine : public RefCountedObject { + friend class RGWCoroutinesStack; +protected: + RGWCoroutinesEnv *env; + bool blocked; + int retcode; + + stringstream error_stream; + + void set_blocked(bool flag) { blocked = flag; } + int yield(int ret) { + set_blocked(true); + return ret; + } + + int do_operate(RGWCoroutinesEnv *_env) { + env = _env; + return operate(); + } + + void call(RGWCoroutine *op); + void spawn(RGWCoroutine *op); + +public: + RGWCoroutine() : env(NULL), blocked(false), retcode(0) {} + virtual ~RGWCoroutine() {} + + virtual int operate() = 0; + + virtual bool is_done() = 0; + virtual bool is_error() = 0; + + stringstream& log_error() { return error_stream; } + string error_str() { + return error_stream.str(); + } + + bool is_blocked() { return blocked; } + + void set_retcode(int r) { + retcode = r; + } + + int get_ret_status() { + return retcode; + } +}; + +class RGWCoroutinesStack { + CephContext *cct; + + RGWCoroutinesManager *ops_mgr; + + list ops; + list::iterator pos; + + set blocked_by_stack; + set blocking_stacks; + + + bool done_flag; + bool error_flag; + bool blocked_flag; + +public: + RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL); + + int operate(RGWCoroutinesEnv *env); + + bool is_done() { + return done_flag; + } + bool is_error() { + return error_flag; + } + bool is_blocked_by_stack() { + return !blocked_by_stack.empty(); + } + bool is_blocked() { + return blocked_flag || is_blocked_by_stack(); + } + + void set_blocked(bool flag); + + string error_str(); + + int call(RGWCoroutine *next_op, int ret = 0); + int unwind(int retcode); + + RGWAioCompletionNotifier *create_completion_notifier(); + RGWCompletionManager *get_completion_mgr(); + + void set_blocked_by(RGWCoroutinesStack *s) { + blocked_by_stack.insert(s); + s->blocking_stacks.insert(this); + } + + bool unblock_stack(RGWCoroutinesStack **s); +}; + +class RGWCoroutinesManager { + CephContext *cct; + + void handle_unblocked_stack(list& stacks, RGWCoroutinesStack *stack, int *waiting_count); +protected: + RGWCompletionManager completion_mgr; + + int ops_window; + + void put_completion_notifier(RGWAioCompletionNotifier *cn); +public: + RGWCoroutinesManager(CephContext *_cct) : cct(_cct), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {} + virtual ~RGWCoroutinesManager() {} + + int run(list& ops); + int run(RGWCoroutine *op); + + virtual void report_error(RGWCoroutinesStack *op); + + RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack); + RGWCompletionManager *get_completion_mgr() { return &completion_mgr; } + + RGWCoroutinesStack *allocate_stack() { + return new RGWCoroutinesStack(cct, this); + } +}; + +class RGWSimpleCoroutine : public RGWCoroutine { + enum State { + Init = 0, + SendRequest = 1, + RequestComplete = 2, + AllComplete = 3, + Done = 100, + Error = 200, + } state; + + int set_state(State s, int ret = 0) { + state = s; + return ret; + } + int operate(); + + int state_init(); + int state_send_request(); + int state_request_complete(); + int state_all_complete(); + +protected: + CephContext *cct; + +public: + RGWSimpleCoroutine(CephContext *_cct) : state(Init), cct(_cct) {} + + virtual int init() { return 0; } + virtual int send_request() = 0; + virtual int request_complete() = 0; + virtual int finish() { return 0; } + + bool is_done() { return (state == Done || state == Error); } + bool is_error() { return (state == Error); } +}; + +#endif diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 6eef00f2bb1a7..dd8ee47fb07b0 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -53,40 +53,8 @@ void rgw_mdlog_shard_data::decode_json(JSONObj *obj) { JSONDecoder::decode_json("entries", entries, obj); }; -static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg); - -/* a single use librados aio completion notifier that hooks into the RGWCompletionManager */ -class AioCompletionNotifier : public RefCountedObject { - librados::AioCompletion *c; - RGWCompletionManager *completion_mgr; - void *user_data; - -public: - AioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data) : completion_mgr(_mgr), user_data(_user_data) { - c = librados::Rados::aio_create_completion((void *)this, _aio_completion_notifier_cb, NULL); - } - - ~AioCompletionNotifier() { - c->release(); - } - - librados::AioCompletion *completion() { - return c; - } - - void cb() { - completion_mgr->complete(user_data); - put(); - } -}; - -static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg) -{ - ((AioCompletionNotifier *)arg)->cb(); -} - class RGWAsyncRadosRequest { - AioCompletionNotifier *notifier; + RGWAioCompletionNotifier *notifier; void *user_info; int retcode; @@ -94,7 +62,7 @@ class RGWAsyncRadosRequest { protected: virtual int _send_request() = 0; public: - RGWAsyncRadosRequest(AioCompletionNotifier *_cn) : notifier(_cn) {} + RGWAsyncRadosRequest(RGWAioCompletionNotifier *_cn) : notifier(_cn) {} virtual ~RGWAsyncRadosRequest() {} void send_request() { @@ -119,7 +87,7 @@ protected: return store->get_system_obj(*obj_ctx, read_state, objv_tracker, obj, *pbl, ofs, end, NULL); } public: - RGWAsyncGetSystemObj(AioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx, + RGWAsyncGetSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx, RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj, bufferlist *_pbl, off_t _ofs, off_t _end) : RGWAsyncRadosRequest(cn), store(_store), obj_ctx(_obj_ctx), objv_tracker(_objv_tracker), obj(_obj), pbl(_pbl), @@ -127,6 +95,29 @@ public: } }; +class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest { + RGWRados *store; + RGWObjVersionTracker *objv_tracker; + rgw_obj obj; + bool exclusive; + bufferlist bl; + map attrs; + time_t mtime; + +protected: + int _send_request() { + return store->put_system_obj(NULL, obj, bl.c_str(), bl.length(), exclusive, + NULL, attrs, objv_tracker, mtime); + } +public: + RGWAsyncPutSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, + RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj, bool _exclusive, + bufferlist& _bl, time_t _mtime = 0) : RGWAsyncRadosRequest(cn), store(_store), + objv_tracker(_objv_tracker), obj(_obj), exclusive(_exclusive), + bl(_bl), mtime(_mtime) {} +}; + + class RGWAsyncRadosProcessor { @@ -424,124 +415,8 @@ int RGWMetaSyncStatusManager::set_state(RGWMetaSyncGlobalStatus::SyncState state return 0; } -void RGWCoroutine::call(RGWCoroutine *op) -{ - int r = env->stack->call(op, 0); - assert(r == 0); -} - -void RGWCoroutine::spawn(RGWCoroutine *op) -{ - RGWCoroutinesStack *stack = env->manager->allocate_stack(); - - int r = stack->call(op, 0); - assert(r == 0); - - env->stacks->push_back(stack); - - env->stack->set_blocked_by(stack); -} - -class RGWSimpleCoroutine : public RGWCoroutine { - enum State { - Init = 0, - SendRequest = 1, - RequestComplete = 2, - AllComplete = 3, - Done = 100, - Error = 200, - } state; - - int set_state(State s, int ret = 0) { - state = s; - return ret; - } - int operate(); - - int state_init(); - int state_send_request(); - int state_request_complete(); - int state_all_complete(); - -protected: - CephContext *cct; - -public: - RGWSimpleCoroutine(CephContext *_cct) : state(Init), cct(_cct) {} - - virtual int init() { return 0; } - virtual int send_request() = 0; - virtual int request_complete() = 0; - virtual int finish() { return 0; } - - bool is_done() { return (state == Done || state == Error); } - bool is_error() { return (state == Error); } -}; - -int RGWSimpleCoroutine::operate() -{ - switch (state) { - case Init: - ldout(cct, 20) << __func__ << ": init request" << dendl; - return state_init(); - case SendRequest: - ldout(cct, 20) << __func__ << ": send request" << dendl; - return state_send_request(); - case RequestComplete: - ldout(cct, 20) << __func__ << ": request complete" << dendl; - return state_request_complete(); - case AllComplete: - ldout(cct, 20) << __func__ << ": all complete" << dendl; - return state_all_complete(); - case Done: - ldout(cct, 20) << __func__ << ": done" << dendl; - break; - case Error: - ldout(cct, 20) << __func__ << ": error" << dendl; - break; - } - - return 0; -} - -int RGWSimpleCoroutine::state_init() -{ - int ret = init(); - if (ret < 0) { - return set_state(Error, ret); - } - return set_state(SendRequest); -} - -int RGWSimpleCoroutine::state_send_request() -{ - int ret = send_request(); - if (ret < 0) { - return set_state(Error, ret); - } - return yield(set_state(RequestComplete)); -} - -int RGWSimpleCoroutine::state_request_complete() -{ - int ret = request_complete(); - if (ret < 0) { - return set_state(Error, ret); - } - return set_state(AllComplete); -} - -int RGWSimpleCoroutine::state_all_complete() -{ - int ret = finish(); - if (ret < 0) { - return set_state(Error, ret); - } - return set_state(Done); -} - template -class RGWSimpleRadosCoroutine : public RGWSimpleCoroutine { +class RGWSimpleRadosReadCR : public RGWSimpleCoroutine { RGWAsyncRadosProcessor *async_rados; RGWRados *store; RGWObjectCtx& obj_ctx; @@ -555,7 +430,7 @@ class RGWSimpleRadosCoroutine : public RGWSimpleCoroutine { RGWAsyncGetSystemObj *req; public: - RGWSimpleRadosCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, + RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWObjectCtx& _obj_ctx, rgw_bucket& _pool, const string& _oid, T *_result) : RGWSimpleCoroutine(_store->ctx()), @@ -565,7 +440,7 @@ public: result(_result), req(NULL) { } - ~RGWSimpleRadosCoroutine() { + ~RGWSimpleRadosReadCR() { delete req; } @@ -578,7 +453,7 @@ public: }; template -int RGWSimpleRadosCoroutine::send_request() +int RGWSimpleRadosReadCR::send_request() { rgw_obj obj = rgw_obj(pool, oid); req = new RGWAsyncGetSystemObj(env->stack->create_completion_notifier(), @@ -590,7 +465,7 @@ int RGWSimpleRadosCoroutine::send_request() } template -int RGWSimpleRadosCoroutine::request_complete() +int RGWSimpleRadosReadCR::request_complete() { int ret = req->get_ret_status(); retcode = ret; @@ -611,7 +486,7 @@ int RGWSimpleRadosCoroutine::request_complete() return handle_data(*result); } -class RGWReadSyncStatusCoroutine : public RGWSimpleRadosCoroutine { +class RGWReadSyncStatusCoroutine : public RGWSimpleRadosReadCR { RGWAsyncRadosProcessor *async_rados; RGWRados *store; RGWObjectCtx& obj_ctx; @@ -622,7 +497,7 @@ class RGWReadSyncStatusCoroutine : public RGWSimpleRadosCoroutineget_zone_params().log_pool, "mdlog.state.global", _gs), @@ -637,9 +512,9 @@ int RGWReadSyncStatusCoroutine::handle_data(RGWMetaSyncGlobalStatus& data) if (retcode == -ENOENT) { return retcode; } - spawn(new RGWSimpleRadosCoroutine(async_rados, store, obj_ctx, store->get_zone_params().log_pool, + spawn(new RGWSimpleRadosReadCR(async_rados, store, obj_ctx, store->get_zone_params().log_pool, "mdlog.state.0", &sync_marker)); - spawn(new RGWSimpleRadosCoroutine(async_rados, store, obj_ctx, store->get_zone_params().log_pool, + spawn(new RGWSimpleRadosReadCR(async_rados, store, obj_ctx, store->get_zone_params().log_pool, "mdlog.state.1", &sync_marker)); return 0; } @@ -738,7 +613,7 @@ class RGWCloneMetaLogCoroutine : public RGWCoroutine { RGWRESTReadResource *http_op; - AioCompletionNotifier *md_op_notifier; + RGWAioCompletionNotifier *md_op_notifier; int req_ret; RGWMetadataLogInfo shard_info; @@ -784,216 +659,6 @@ public: bool is_error() { return (state == Error); } }; -RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr), - done_flag(false), error_flag(false), blocked_flag(false) { - if (start) { - ops.push_back(start); - } - pos = ops.begin(); -} - -int RGWCoroutinesStack::operate(RGWCoroutinesEnv *env) -{ - RGWCoroutine *op = *pos; - int r = op->do_operate(env); - if (r < 0) { - ldout(cct, 0) << "ERROR: op->operate() returned r=" << r << dendl; - } - - error_flag = op->is_error(); - blocked_flag = op->is_blocked(); - - if (op->is_done()) { - op->put(); - r = unwind(r); - done_flag = (pos == ops.end()); - return r; - } - - /* should r ever be negative at this point? */ - assert(r >= 0); - - return 0; -} - -string RGWCoroutinesStack::error_str() -{ - if (pos != ops.end()) { - return (*pos)->error_str(); - } - return string(); -} - -int RGWCoroutinesStack::call(RGWCoroutine *next_op, int ret) { - ops.push_back(next_op); - if (pos != ops.end()) { - ++pos; - } else { - pos = ops.begin(); - } - return ret; -} - -int RGWCoroutinesStack::unwind(int retcode) -{ - if (pos == ops.begin()) { - pos = ops.end(); - return retcode; - } - - --pos; - ops.pop_back(); - RGWCoroutine *op = *pos; - op->set_retcode(retcode); - return 0; -} - -void RGWCoroutinesStack::set_blocked(bool flag) -{ - blocked_flag = flag; - if (pos != ops.end()) { - (*pos)->set_blocked(flag); - } -} - -AioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier() -{ - return ops_mgr->create_completion_notifier(this); -} - -RGWCompletionManager *RGWCoroutinesStack::get_completion_mgr() -{ - return ops_mgr->get_completion_mgr(); -} - -bool RGWCoroutinesStack::unblock_stack(RGWCoroutinesStack **s) -{ - if (blocking_stacks.empty()) { - return false; - } - - set::iterator iter = blocking_stacks.begin(); - *s = *iter; - blocking_stacks.erase(iter); - (*s)->blocked_by_stack.erase(this); - - return true; -} - -void RGWCoroutinesManager::report_error(RGWCoroutinesStack *op) -{ -#warning need to have error logging infrastructure that logs on backend - lderr(cct) << "ERROR: failed operation: " << op->error_str() << dendl; -} - -void RGWCoroutinesManager::handle_unblocked_stack(list& stacks, RGWCoroutinesStack *stack, int *waiting_count) -{ - --(*waiting_count); - stack->set_blocked(false); - if (!stack->is_done()) { - stacks.push_back(stack); - } else { - delete stack; - } -} - -int RGWCoroutinesManager::run(list& stacks) -{ - int waiting_count = 0; - RGWCoroutinesEnv env; - - env.manager = this; - env.stacks = &stacks; - - for (list::iterator iter = stacks.begin(); iter != stacks.end();) { - RGWCoroutinesStack *stack = *iter; - env.stack = stack; - int ret = stack->operate(&env); - if (ret < 0) { - ldout(cct, 0) << "ERROR: stack->operate() returned ret=" << ret << dendl; - } - - if (stack->is_error()) { - report_error(stack); - } - - if (stack->is_blocked_by_stack()) { - /* do nothing, we'll re-add the stack when the blocking stack is done */ - } else if (stack->is_blocked()) { - waiting_count++; - } else if (stack->is_done()) { - RGWCoroutinesStack *s; - while (stack->unblock_stack(&s)) { - if (!s->is_blocked_by_stack() && !s->is_done()) { - if (s->is_blocked()) { - waiting_count++; - } else { - stacks.push_back(s); - } - } - } - delete stack; - } else { - stacks.push_back(stack); - } - - RGWCoroutinesStack *blocked_stack; - while (completion_mgr.try_get_next((void **)&blocked_stack)) { - handle_unblocked_stack(stacks, blocked_stack, &waiting_count); - } - - if (waiting_count >= ops_window) { - int ret = completion_mgr.get_next((void **)&blocked_stack); - if (ret < 0) { - ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl; - } - handle_unblocked_stack(stacks, blocked_stack, &waiting_count); - } - - ++iter; - stacks.pop_front(); - while (iter == stacks.end() && waiting_count > 0) { - int ret = completion_mgr.get_next((void **)&blocked_stack); - if (ret < 0) { - ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl; - } - handle_unblocked_stack(stacks, blocked_stack, &waiting_count); - iter = stacks.begin(); - } - } - - return 0; -} - -int RGWCoroutinesManager::run(RGWCoroutine *op) -{ - list stacks; - RGWCoroutinesStack *stack = allocate_stack(); - op->get(); - int r = stack->call(op); - if (r < 0) { - ldout(cct, 0) << "ERROR: stack->call() returned r=" << r << dendl; - return r; - } - - stacks.push_back(stack); - - r = run(stacks); - if (r < 0) { - ldout(cct, 0) << "ERROR: run(stacks) returned r=" << r << dendl; - } - - r = op->get_ret_status(); - op->put(); - - return r; -} - -AioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack) -{ - return new AioCompletionNotifier(&completion_mgr, (void *)stack); -} - int RGWRemoteMetaLog::clone_shards() { list stacks; @@ -1187,7 +852,7 @@ int RGWCloneMetaLogCoroutine::state_store_mdlog_entries() marker = entry.id; } - AioCompletionNotifier *cn = env->stack->create_completion_notifier(); + RGWAioCompletionNotifier *cn = env->stack->create_completion_notifier(); int ret = store->meta_mgr->store_md_log_entries(dest_entries, shard_id, cn->completion()); if (ret < 0) { diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h index e0922bb28a853..e242ef850ce04 100644 --- a/src/rgw/rgw_sync.h +++ b/src/rgw/rgw_sync.h @@ -1,16 +1,9 @@ #ifndef CEPH_RGW_SYNC_H #define CEPH_RGW_SYNC_H -#include "rgw_common.h" -#include "rgw_rados.h" -#include "rgw_metadata.h" -#include "rgw_http_client.h" +#include "rgw_coroutine.h" #include "common/RWLock.h" -#include "common/RefCountedObj.h" - - -#define dout_subsys ceph_subsys_rgw struct rgw_mdlog_info { @@ -21,147 +14,6 @@ struct rgw_mdlog_info { void decode_json(JSONObj *obj); }; -#define RGW_ASYNC_OPS_MGR_WINDOW 16 - -class RGWCoroutinesStack; -class RGWCoroutinesManager; -class AioCompletionNotifier; - -struct RGWCoroutinesEnv { - RGWCoroutinesManager *manager; - list *stacks; - RGWCoroutinesStack *stack; - - RGWCoroutinesEnv() : manager(NULL), stacks(NULL), stack(NULL) {} -}; - -class RGWCoroutine : public RefCountedObject { - friend class RGWCoroutinesStack; -protected: - RGWCoroutinesEnv *env; - bool blocked; - int retcode; - - stringstream error_stream; - - void set_blocked(bool flag) { blocked = flag; } - int yield(int ret) { - set_blocked(true); - return ret; - } - - int do_operate(RGWCoroutinesEnv *_env) { - env = _env; - return operate(); - } - - void call(RGWCoroutine *op); - void spawn(RGWCoroutine *op); - -public: - RGWCoroutine() : env(NULL), blocked(false), retcode(0) {} - virtual ~RGWCoroutine() {} - - virtual int operate() = 0; - - virtual bool is_done() = 0; - virtual bool is_error() = 0; - - stringstream& log_error() { return error_stream; } - string error_str() { - return error_stream.str(); - } - - bool is_blocked() { return blocked; } - - void set_retcode(int r) { - retcode = r; - } - - int get_ret_status() { - return retcode; - } -}; - -class RGWCoroutinesStack { - CephContext *cct; - - RGWCoroutinesManager *ops_mgr; - - list ops; - list::iterator pos; - - set blocked_by_stack; - set blocking_stacks; - - - bool done_flag; - bool error_flag; - bool blocked_flag; - -public: - RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL); - - int operate(RGWCoroutinesEnv *env); - - bool is_done() { - return done_flag; - } - bool is_error() { - return error_flag; - } - bool is_blocked_by_stack() { - return !blocked_by_stack.empty(); - } - bool is_blocked() { - return blocked_flag || is_blocked_by_stack(); - } - - void set_blocked(bool flag); - - string error_str(); - - int call(RGWCoroutine *next_op, int ret = 0); - int unwind(int retcode); - - AioCompletionNotifier *create_completion_notifier(); - RGWCompletionManager *get_completion_mgr(); - - void set_blocked_by(RGWCoroutinesStack *s) { - blocked_by_stack.insert(s); - s->blocking_stacks.insert(this); - } - - bool unblock_stack(RGWCoroutinesStack **s); -}; - -class RGWCoroutinesManager { - CephContext *cct; - - void handle_unblocked_stack(list& stacks, RGWCoroutinesStack *stack, int *waiting_count); -protected: - RGWCompletionManager completion_mgr; - - int ops_window; - - void put_completion_notifier(AioCompletionNotifier *cn); -public: - RGWCoroutinesManager(CephContext *_cct) : cct(_cct), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {} - virtual ~RGWCoroutinesManager() {} - - int run(list& ops); - int run(RGWCoroutine *op); - - virtual void report_error(RGWCoroutinesStack *op); - - AioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack); - RGWCompletionManager *get_completion_mgr() { return &completion_mgr; } - - RGWCoroutinesStack *allocate_stack() { - return new RGWCoroutinesStack(cct, this); - } -}; - struct RGWMetaSyncGlobalStatus { enum SyncState { StateInit = 0, -- 2.39.5