return 0;
}
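+/* run 'op' inline on the caller's own stack */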
+void RGWAsyncOp::call(RGWAsyncOp *op)
+{
+ int r = env->stack->call(op, 0);
+ assert(r == 0);
+}
+
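+/* spawn 'op' on a freshly allocated stack and mark the caller's stack as
+ * blocked on it; the caller is rescheduled once the new stack completes */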
+void RGWAsyncOp::call_concurrent(RGWAsyncOp *op)
+{
+ RGWAsyncOpsStack *stack = env->manager->allocate_stack();
+
+ int r = stack->call(op, 0);
+ assert(r == 0);
+
+ env->stacks->push_back(stack);
+
+ env->stack->set_blocked_by(stack);
+}
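+
+/* Typical usage from an op's handler (sketch only; mirrors
+ * RGWReadSyncStatusOp::handle_data() below):
+ *
+ *   call_concurrent(new RGWSimpleRadosAsyncOp<rgw_sync_marker>(...));
+ *   call_concurrent(new RGWSimpleRadosAsyncOp<rgw_sync_marker>(...));
+ *   // the calling stack now blocks until both child stacks finish
+ */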
+
class RGWSimpleAsyncOp : public RGWAsyncOp {
enum State {
Init = 0,
protected:
CephContext *cct;
- void call(RGWAsyncOp *op) {
- int r = env->stack->call(op, 0);
- assert(r == 0);
- }
-
public:
RGWSimpleAsyncOp(CephContext *_cct) : state(Init), cct(_cct) {}
RGWObjectCtx& obj_ctx;
RGWMetaSyncGlobalStatus *global_status;
+ rgw_sync_marker sync_markers[2]; /* one marker per mdlog shard read in handle_data() */
public:
RGWReadSyncStatusOp(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
int RGWReadSyncStatusOp::handle_data(RGWMetaSyncGlobalStatus& data)
{
+ /* read both mdlog shard states concurrently; each completion targets its
+  * own marker so the two async reads do not race on a single field */
+ call_concurrent(new RGWSimpleRadosAsyncOp<rgw_sync_marker>(async_rados, store, obj_ctx, store->get_zone_params().log_pool,
+ "mdlog.state.0", &sync_markers[0]));
+ call_concurrent(new RGWSimpleRadosAsyncOp<rgw_sync_marker>(async_rados, store, obj_ctx, store->get_zone_params().log_pool,
+ "mdlog.state.1", &sync_markers[1]));
return 0;
}
ldout(cct, 0) << "ERROR: op->operate() returned r=" << r << dendl;
}
- done_flag = op->is_done();
error_flag = op->is_error();
blocked_flag = op->is_blocked();
- if (done_flag) {
+ if (op->is_done()) {
op->put();
- return unwind(r);
+ r = unwind(r);
+ done_flag = (pos == ops.end()); /* the stack finishes only when its last op has unwound */
+ return r;
}
/* should r ever be negative at this point? */
return ops_mgr->get_completion_mgr();
}
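+/* hand back one stack that was blocked on this one, clearing the reverse
+ * link; returns false once no blocked stacks remain */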
+bool RGWAsyncOpsStack::unblock_stack(RGWAsyncOpsStack **s)
+{
+ if (blocking_stacks.empty()) {
+ return false;
+ }
+
+ set<RGWAsyncOpsStack *>::iterator iter = blocking_stacks.begin();
+ *s = *iter;
+ blocking_stacks.erase(iter);
+ (*s)->blocked_by_stack.erase(this);
+
+ return true;
+}
+
void RGWAsyncOpsManager::report_error(RGWAsyncOpsStack *op)
{
#warning need to have error logging infrastructure that logs on backend
report_error(stack);
}
- if (stack->is_blocked()) {
+ if (stack->is_blocked_by_stack()) {
+ /* do nothing, we'll re-add the stack when the blocking stack is done */
+ } else if (stack->is_blocked()) {
waiting_count++;
} else if (stack->is_done()) {
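+ /* wake up any stacks that were blocked on the stack that just completed */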
+ RGWAsyncOpsStack *s;
+ while (stack->unblock_stack(&s)) {
+ if (!s->is_blocked_by_stack() && !s->is_done()) {
+ if (s->is_blocked()) {
+ waiting_count++;
+ } else {
+ stacks.push_back(s);
+ }
+ }
+ }
delete stack;
} else {
stacks.push_back(stack);
int RGWAsyncOpsManager::run(RGWAsyncOp *op)
{
list<RGWAsyncOpsStack *> stacks;
- RGWAsyncOpsStack *stack = new RGWAsyncOpsStack(cct, this);
+ RGWAsyncOpsStack *stack = allocate_stack();
int r = stack->call(op);
if (r < 0) {
ldout(cct, 0) << "ERROR: stack->call() returned r=" << r << dendl;
return operate();
}
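+ /* helpers for derived ops: run a sub-op inline, or spawn it on its own concurrent stack */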
+ void call(RGWAsyncOp *op);
+ void call_concurrent(RGWAsyncOp *op);
+
public:
RGWAsyncOp() : env(NULL), blocked(false), retcode(0) {}
virtual ~RGWAsyncOp() {}
list<RGWAsyncOp *> ops;
list<RGWAsyncOp *>::iterator pos;
+ set<RGWAsyncOpsStack *> blocked_by_stack; /* stacks currently blocking this one */
+ set<RGWAsyncOpsStack *> blocking_stacks; /* stacks currently blocked on this one */
+
bool done_flag;
bool error_flag;
bool blocked_flag;
bool is_error() {
return error_flag;
}
+ bool is_blocked_by_stack() {
+ return !blocked_by_stack.empty();
+ }
bool is_blocked() {
- return blocked_flag;
+ return blocked_flag || is_blocked_by_stack();
}
void set_blocked(bool flag);
AioCompletionNotifier *create_completion_notifier();
RGWCompletionManager *get_completion_mgr();
+
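+ /* register 's' as blocking this stack; 's' keeps a back-pointer so it can wake us */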
+ void set_blocked_by(RGWAsyncOpsStack *s) {
+ blocked_by_stack.insert(s);
+ s->blocking_stacks.insert(this);
+ }
+
+ bool unblock_stack(RGWAsyncOpsStack **s);
};
class RGWAsyncOpsManager {
AioCompletionNotifier *create_completion_notifier(RGWAsyncOpsStack *stack);
RGWCompletionManager *get_completion_mgr() { return &completion_mgr; }
+
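+ /* single allocation point for op stacks, used by run() and call_concurrent() */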
+ RGWAsyncOpsStack *allocate_stack() {
+ return new RGWAsyncOpsStack(cct, this);
+ }
};
struct RGWMetaSyncGlobalStatus {