}
class RGWSimpleAsyncOp : public RGWAsyncOp {
- CephContext *cct;
-
enum State {
Init = 0,
SendRequest = 1,
int state_all_complete();
protected:
+ CephContext *cct;
+
void call(RGWAsyncOp *op) {
int r = env->stack->call(op, 0);
assert(r == 0);
}
public:
- RGWSimpleAsyncOp(CephContext *_cct) : cct(_cct) {}
+ RGWSimpleAsyncOp(CephContext *_cct) : state(Init), cct(_cct) {}
virtual int init() { return 0; }
virtual int send_request() = 0;
ldout(cct, 20) << __func__ << ": request complete" << dendl;
return state_request_complete();
case AllComplete:
- ldout(cct, 20) << __func__ << ": request complete" << dendl;
+ ldout(cct, 20) << __func__ << ": all complete" << dendl;
return state_all_complete();
case Done:
ldout(cct, 20) << __func__ << ": done" << dendl;
int send_request();
int request_complete();
+
+ virtual int handle_data(T& data) {
+ return 0;
+ }
};
template <class T>
*result = T();
}
- return 0;
+ return handle_data(*result);
}
class RGWReadSyncStatusOp : public RGWSimpleRadosAsyncOp<RGWMetaSyncGlobalStatus> {
async_rados(_async_rados), store(_store),
obj_ctx(_obj_ctx), global_status(_gs) {}
- ~RGWReadSyncStatusOp() {
- }
+ int handle_data(RGWMetaSyncGlobalStatus& data);
};
+int RGWReadSyncStatusOp::handle_data(RGWMetaSyncGlobalStatus& data)
+{
+ return 0;
+}
+
class RGWMetaSyncOp : public RGWAsyncOp {
RGWRados *store;
RGWMetadataLog *mdlog;
int RGWAsyncOpsStack::unwind(int retcode)
{
if (pos == ops.begin()) {
+ pos = ops.end();
return retcode;
}
--pos;
+ ops.pop_back();
RGWAsyncOp *op = *pos;
op->set_retcode(retcode);
return 0;
lderr(cct) << "ERROR: failed operation: " << op->error_str() << dendl;
}
+void RGWAsyncOpsManager::handle_unblocked_stack(list<RGWAsyncOpsStack *>& stacks, RGWAsyncOpsStack *stack, int *waiting_count)
+{
+ --(*waiting_count);
+ stack->set_blocked(false);
+ if (!stack->is_done()) {
+ stacks.push_back(stack);
+ } else {
+ delete stack;
+ }
+}
+
int RGWAsyncOpsManager::run(list<RGWAsyncOpsStack *>& stacks)
{
int waiting_count = 0;
env.manager = this;
env.stacks = &stacks;
- for (list<RGWAsyncOpsStack *>::iterator iter = stacks.begin(); iter != stacks.end(); ++iter) {
+ for (list<RGWAsyncOpsStack *>::iterator iter = stacks.begin(); iter != stacks.end();) {
RGWAsyncOpsStack *stack = *iter;
env.stack = stack;
int ret = stack->operate(&env);
stacks.push_back(stack);
}
+ RGWAsyncOpsStack *blocked_stack;
+ while (completion_mgr.try_get_next((void **)&blocked_stack)) {
+ handle_unblocked_stack(stacks, blocked_stack, &waiting_count);
+ }
+
if (waiting_count >= ops_window) {
- RGWAsyncOpsStack *blocked_stack;
int ret = completion_mgr.get_next((void **)&blocked_stack);
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
- } else {
- waiting_count--;
- }
- blocked_stack->set_blocked(false);
- if (!blocked_stack->is_done()) {
- stacks.push_back(blocked_stack);
- } else {
- delete blocked_stack;
}
+ handle_unblocked_stack(stacks, blocked_stack, &waiting_count);
}
- }
- while (waiting_count > 0) {
- RGWAsyncOpsStack *stack;
- int ret = completion_mgr.get_next((void **)&stack);
- if (ret < 0) {
- ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
- return ret;
- } else {
- waiting_count--;
+ ++iter;
+ stacks.pop_front();
+ while (iter == stacks.end() && waiting_count > 0) {
+ int ret = completion_mgr.get_next((void **)&blocked_stack);
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
+ }
+ handle_unblocked_stack(stacks, blocked_stack, &waiting_count);
+ iter = stacks.begin();
}
}
ldout(cct, 0) << "ERROR: run(stacks) returned r=" << r << dendl;
}
- delete stack;
-
return r;
}