f->close_section();
}
-void RGWCoroutinesManager::handle_unblocked_stack(list<RGWCoroutinesStack *>& stacks, RGWCoroutinesStack *stack, int *blocked_count)
+void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& stacks, RGWCoroutinesStack *stack, int *blocked_count)
{
+ RWLock::WLocker wl(lock);
--(*blocked_count);
stack->set_io_blocked(false);
stack->set_interval_wait(false);
if (!stack->is_done()) {
stacks.push_back(stack);
} else {
+ context_stacks.erase(stack);
stack->put();
}
}
env.stacks = &stacks;
for (list<RGWCoroutinesStack *>::iterator iter = stacks.begin(); iter != stacks.end() && !going_down.read();) {
+ lock.get_write();
+
RGWCoroutinesStack *stack = *iter;
env.stack = stack;
stack->schedule();
}
+ lock.unlock();
+
RGWCoroutinesStack *blocked_stack;
while (completion_mgr.try_get_next((void **)&blocked_stack)) {
- handle_unblocked_stack(stacks, blocked_stack, &blocked_count);
+ handle_unblocked_stack(context_stacks, stacks, blocked_stack, &blocked_count);
}
/*
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
}
- handle_unblocked_stack(stacks, blocked_stack, &blocked_count);
+ handle_unblocked_stack(context_stacks, stacks, blocked_stack, &blocked_count);
}
++iter;
ldout(cct, 5) << __func__ << "(): was stopped, exiting" << dendl;
return -ECANCELED;
}
- handle_unblocked_stack(stacks, blocked_stack, &blocked_count);
+ handle_unblocked_stack(context_stacks, stacks, blocked_stack, &blocked_count);
iter = stacks.begin();
}
}
void RGWCoroutinesManager::dump(Formatter *f) const {
+ RWLock::RLocker rl(lock);
+
f->open_array_section("run_contexts");
for (auto& i : run_contexts) {
char buf[32];
atomic64_t run_context_count;
map<uint64_t, set<RGWCoroutinesStack *> > run_contexts;
- void handle_unblocked_stack(list<RGWCoroutinesStack *>& stacks, RGWCoroutinesStack *stack, int *waiting_count);
+ RWLock lock;
+
+ void handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& stacks, RGWCoroutinesStack *stack, int *waiting_count);
protected:
RGWCompletionManager completion_mgr;
void put_completion_notifier(RGWAioCompletionNotifier *cn);
public:
- RGWCoroutinesManager(CephContext *_cct) : cct(_cct), completion_mgr(cct), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {}
+ RGWCoroutinesManager(CephContext *_cct) : cct(_cct), lock("RGWCoroutinesManager::lock"), completion_mgr(cct), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {}
virtual ~RGWCoroutinesManager() {}
int run(list<RGWCoroutinesStack *>& ops);