stack->get(); /* we'll need to collect the stack */
stack->call(op);
- env->stacks->push_back(stack);
+ env->manager->schedule(env, stack);
if (wait) {
set_blocked_by(stack);
f->close_section();
}
-void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& stacks, RGWCoroutinesStack *stack, int *blocked_count)
+void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks, RGWCoroutinesStack *stack, int *blocked_count)
{
RWLock::WLocker wl(lock);
--(*blocked_count);
stack->set_io_blocked(false);
stack->set_interval_wait(false);
if (!stack->is_done()) {
- stacks.push_back(stack);
+ scheduled_stacks.push_back(stack);
} else {
RWLock::WLocker wl(lock);
context_stacks.erase(stack);
}
}
+void RGWCoroutinesManager::schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
+{
+ assert(lock.is_wlocked());
+ env->scheduled_stacks->push_back(stack);
+ set<RGWCoroutinesStack *>& context_stacks = run_contexts[env->run_context];
+ context_stacks.insert(stack);
+}
+
int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
{
int blocked_count = 0;
int interval_wait_count = 0;
RGWCoroutinesEnv env;
- uint64_t run_num = run_context_count.inc();
+ uint64_t run_context = run_context_count.inc();
lock.get_write();
- set<RGWCoroutinesStack *>& context_stacks = run_contexts[run_num];
+ set<RGWCoroutinesStack *>& context_stacks = run_contexts[run_context];
+ list<RGWCoroutinesStack *> scheduled_stacks;
for (auto& st : stacks) {
context_stacks.insert(st);
+ scheduled_stacks.push_back(st);
}
lock.unlock();
+ env.run_context = run_context;
env.manager = this;
- env.stacks = &stacks;
+ env.scheduled_stacks = &scheduled_stacks;
- for (list<RGWCoroutinesStack *>::iterator iter = stacks.begin(); iter != stacks.end() && !going_down.read();) {
+ for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down.read();) {
lock.get_write();
RGWCoroutinesStack *stack = *iter;
RGWCoroutinesStack *blocked_stack;
while (completion_mgr.try_get_next((void **)&blocked_stack)) {
- handle_unblocked_stack(context_stacks, stacks, blocked_stack, &blocked_count);
+ handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count);
}
/*
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
}
- handle_unblocked_stack(context_stacks, stacks, blocked_stack, &blocked_count);
+ handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count);
}
++iter;
- stacks.pop_front();
+ scheduled_stacks.pop_front();
- while (stacks.empty() && blocked_count > 0) {
+ while (scheduled_stacks.empty() && blocked_count > 0) {
int ret = completion_mgr.get_next((void **)&blocked_stack);
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
ldout(cct, 5) << __func__ << "(): was stopped, exiting" << dendl;
return -ECANCELED;
}
- handle_unblocked_stack(context_stacks, stacks, blocked_stack, &blocked_count);
- iter = stacks.begin();
+ handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count);
+ iter = scheduled_stacks.begin();
}
- if (iter == stacks.end()) {
- iter = stacks.begin();
+ if (iter == scheduled_stacks.end()) {
+ iter = scheduled_stacks.begin();
}
}
f->close_section();
}
+RGWCoroutinesStack *RGWCoroutinesManager::allocate_stack() {
+ RGWCoroutinesStack *stack = new RGWCoroutinesStack(cct, this);
+ stack->get();
+ return stack;
+}
+
string RGWCoroutinesManager::get_id()
{
if (!id.empty()) {
if (!spawned.entries.empty()) {
f->open_array_section("spawned");
for (auto& i : spawned.entries) {
- encode_json("entry", *i, f);
+ char buf[32];
+ snprintf(buf, sizeof(buf), "%p", (void *)i);
+ encode_json("stack", string(buf), f);
}
f->close_section();
}
}
};
-
struct RGWCoroutinesEnv {
+ uint64_t run_context;
RGWCoroutinesManager *manager;
- list<RGWCoroutinesStack *> *stacks;
+ list<RGWCoroutinesStack *> *scheduled_stacks;
RGWCoroutinesStack *stack;
- RGWCoroutinesEnv() : manager(NULL), stacks(NULL), stack(NULL) {}
+ RGWCoroutinesEnv() : run_context(0), manager(NULL), scheduled_stacks(NULL), stack(NULL) {}
};
enum RGWCoroutineState {
void schedule(list<RGWCoroutinesStack *> *stacks = NULL) {
if (!stacks) {
- stacks = env->stacks;
+ stacks = env->scheduled_stacks;
}
if (!is_scheduled) {
stacks->push_back(this);
RWLock lock;
- void handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& stacks, RGWCoroutinesStack *stack, int *waiting_count);
+ void handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks, RGWCoroutinesStack *stack, int *waiting_count);
protected:
RGWCompletionManager completion_mgr;
RGWCoroutinesManagerRegistry *cr_registry;
RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack);
RGWCompletionManager *get_completion_mgr() { return &completion_mgr; }
- RGWCoroutinesStack *allocate_stack() {
- RGWCoroutinesStack *stack = new RGWCoroutinesStack(cct, this);
- stack->get();
- return stack;
- }
+ void schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack);
+ RGWCoroutinesStack *allocate_stack();
virtual string get_id();
void dump(Formatter *f) const;