From 2534452b8647c1e6bb5ece399e8cc6cb2a6fe18a Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Wed, 5 Aug 2015 16:22:49 -0700 Subject: [PATCH] rgw: a few bug fixes related to async operations fix uninitialized state, trim stacks, etc. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_http_client.cc | 11 ++++++ src/rgw/rgw_http_client.h | 1 + src/rgw/rgw_sync.cc | 72 +++++++++++++++++++++++--------------- src/rgw/rgw_sync.h | 1 + 4 files changed, 57 insertions(+), 28 deletions(-) diff --git a/src/rgw/rgw_http_client.cc b/src/rgw/rgw_http_client.cc index 5850cc9217e00..3647de8f9f0ce 100644 --- a/src/rgw/rgw_http_client.cc +++ b/src/rgw/rgw_http_client.cc @@ -328,6 +328,17 @@ int RGWCompletionManager::get_next(void **user_info) return 0; } +bool RGWCompletionManager::try_get_next(void **user_info) +{ + Mutex::Locker l(lock); + if (complete_reqs.empty()) { + return false; + } + *user_info = complete_reqs.front(); + complete_reqs.pop_front(); + return true; +} + void RGWCompletionManager::go_down() { Mutex::Locker l(lock); diff --git a/src/rgw/rgw_http_client.h b/src/rgw/rgw_http_client.h index 86d9c8b81d0d7..984a0308e2a87 100644 --- a/src/rgw/rgw_http_client.h +++ b/src/rgw/rgw_http_client.h @@ -79,6 +79,7 @@ public: void complete(void *user_info); int get_next(void **user_info); + bool try_get_next(void **user_info); void go_down(); }; diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 13df181ef1c9b..ebf8132580326 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -425,8 +425,6 @@ int RGWMetaSyncStatusManager::set_state(RGWMetaSyncGlobalStatus::SyncState state } class RGWSimpleAsyncOp : public RGWAsyncOp { - CephContext *cct; - enum State { Init = 0, SendRequest = 1, @@ -448,13 +446,15 @@ class RGWSimpleAsyncOp : public RGWAsyncOp { int state_all_complete(); protected: + CephContext *cct; + void call(RGWAsyncOp *op) { int r = env->stack->call(op, 0); assert(r == 0); } public: - RGWSimpleAsyncOp(CephContext *_cct) : cct(_cct) {} + RGWSimpleAsyncOp(CephContext *_cct) : state(Init), cct(_cct) {} virtual int init() { return 0; } virtual int send_request() = 0; @@ -478,7 +478,7 @@ int RGWSimpleAsyncOp::operate() ldout(cct, 20) << __func__ << ": request complete" << dendl; return state_request_complete(); case AllComplete: - ldout(cct, 20) << __func__ << ": request complete" << dendl; + ldout(cct, 20) << __func__ << ": all complete" << dendl; return state_all_complete(); case Done: ldout(cct, 20) << __func__ << ": done" << dendl; @@ -558,6 +558,10 @@ public: int send_request(); int request_complete(); + + virtual int handle_data(T& data) { + return 0; + } }; template @@ -590,7 +594,7 @@ int RGWSimpleRadosAsyncOp::request_complete() *result = T(); } - return 0; + return handle_data(*result); } class RGWReadSyncStatusOp : public RGWSimpleRadosAsyncOp { @@ -610,10 +614,14 @@ public: async_rados(_async_rados), store(_store), obj_ctx(_obj_ctx), global_status(_gs) {} - ~RGWReadSyncStatusOp() { - } + int handle_data(RGWMetaSyncGlobalStatus& data); }; +int RGWReadSyncStatusOp::handle_data(RGWMetaSyncGlobalStatus& data) +{ + return 0; +} + class RGWMetaSyncOp : public RGWAsyncOp { RGWRados *store; RGWMetadataLog *mdlog; @@ -806,10 +814,12 @@ int RGWAsyncOpsStack::call(RGWAsyncOp *next_op, int ret) { int RGWAsyncOpsStack::unwind(int retcode) { if (pos == ops.begin()) { + pos = ops.end(); return retcode; } --pos; + ops.pop_back(); RGWAsyncOp *op = *pos; op->set_retcode(retcode); return 0; @@ -839,6 +849,17 @@ void RGWAsyncOpsManager::report_error(RGWAsyncOpsStack *op) lderr(cct) << "ERROR: failed operation: " << op->error_str() << dendl; } +void RGWAsyncOpsManager::handle_unblocked_stack(list& stacks, RGWAsyncOpsStack *stack, int *waiting_count) +{ + --(*waiting_count); + stack->set_blocked(false); + if (!stack->is_done()) { + stacks.push_back(stack); + } else { + delete stack; + } +} + int RGWAsyncOpsManager::run(list& stacks) { int waiting_count = 0; @@ -847,7 +868,7 @@ int RGWAsyncOpsManager::run(list& stacks) env.manager = this; env.stacks = &stacks; - for (list::iterator iter = stacks.begin(); iter != stacks.end(); ++iter) { + for (list::iterator iter = stacks.begin(); iter != stacks.end();) { RGWAsyncOpsStack *stack = *iter; env.stack = stack; int ret = stack->operate(&env); @@ -867,31 +888,28 @@ int RGWAsyncOpsManager::run(list& stacks) stacks.push_back(stack); } + RGWAsyncOpsStack *blocked_stack; + while (completion_mgr.try_get_next((void **)&blocked_stack)) { + handle_unblocked_stack(stacks, blocked_stack, &waiting_count); + } + if (waiting_count >= ops_window) { - RGWAsyncOpsStack *blocked_stack; int ret = completion_mgr.get_next((void **)&blocked_stack); if (ret < 0) { ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl; - } else { - waiting_count--; - } - blocked_stack->set_blocked(false); - if (!blocked_stack->is_done()) { - stacks.push_back(blocked_stack); - } else { - delete blocked_stack; } + handle_unblocked_stack(stacks, blocked_stack, &waiting_count); } - } - while (waiting_count > 0) { - RGWAsyncOpsStack *stack; - int ret = completion_mgr.get_next((void **)&stack); - if (ret < 0) { - ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl; - return ret; - } else { - waiting_count--; + ++iter; + stacks.pop_front(); + while (iter == stacks.end() && waiting_count > 0) { + int ret = completion_mgr.get_next((void **)&blocked_stack); + if (ret < 0) { + ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl; + } + handle_unblocked_stack(stacks, blocked_stack, &waiting_count); + iter = stacks.begin(); } } @@ -915,8 +933,6 @@ int RGWAsyncOpsManager::run(RGWAsyncOp *op) ldout(cct, 0) << "ERROR: run(stacks) returned r=" << r << dendl; } - delete stack; - return r; } diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h index 010f9bdf171c9..2ba575e44a0ce 100644 --- a/src/rgw/rgw_sync.h +++ b/src/rgw/rgw_sync.h @@ -117,6 +117,7 @@ public: class RGWAsyncOpsManager { CephContext *cct; + void handle_unblocked_stack(list& stacks, RGWAsyncOpsStack *stack, int *waiting_count); protected: RGWCompletionManager completion_mgr; -- 2.39.5