From 68fd306b43d9402f05ced76ce6d31bb246001e78 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 18 Aug 2015 11:57:27 -0700 Subject: [PATCH] rgw: keep track of spawned coroutines under the current op instead of keeping a per-stack list of spawned coroutines, keep it on the op that spawned it. Otherwise, we might spawn a cr, call a second one that will spawn some crs, and when waiting for these to complete it will also wait on the first one. The list of spawned coroutines is inherited. Once a coroutine finishes its parent will take over all the spawned ones that weren't collected. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_coroutine.cc | 36 +++++++++++++++++++++++++++++------- src/rgw/rgw_coroutine.h | 21 +++++++++++++++++---- src/rgw/rgw_sync.cc | 21 ++++++++++----------- 3 files changed, 56 insertions(+), 22 deletions(-) diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc index f523c7e2d2897..8140922a5c038 100644 --- a/src/rgw/rgw_coroutine.cc +++ b/src/rgw/rgw_coroutine.cc @@ -48,8 +48,8 @@ int RGWCoroutinesStack::operate(RGWCoroutinesEnv *_env) if (op->is_done()) { int op_retcode = op->get_ret_status(); - op->put(); r = unwind(r); + op->put(); done_flag = (pos == ops.end()); if (done_flag) { retcode = op_retcode; @@ -81,12 +81,14 @@ int RGWCoroutinesStack::call(RGWCoroutine *next_op, int ret) { return ret; } -void RGWCoroutinesStack::spawn(RGWCoroutine *op, bool wait) +void RGWCoroutinesStack::spawn(RGWCoroutine *source_op, RGWCoroutine *op, bool wait) { op->get(); + rgw_spawned_stacks *s = (source_op ? &source_op->spawned : &spawned); + RGWCoroutinesStack *stack = env->manager->allocate_stack(); - spawned_stacks.push_back(stack); + s->entries.push_back(stack); stack->get(); /* we'll need to collect the stack */ int r = stack->call(op, 0); @@ -99,9 +101,17 @@ void RGWCoroutinesStack::spawn(RGWCoroutine *op, bool wait) } } +void RGWCoroutinesStack::spawn(RGWCoroutine *op, bool wait) +{ + spawn(NULL, op, wait); +} + int RGWCoroutinesStack::unwind(int retcode) { + rgw_spawned_stacks *src_spawned = &(*pos)->spawned; + if (pos == ops.begin()) { + spawned.inherit(src_spawned); pos = ops.end(); return retcode; } @@ -110,15 +120,17 @@ int RGWCoroutinesStack::unwind(int retcode) ops.pop_back(); RGWCoroutine *op = *pos; op->set_retcode(retcode); + op->spawned.inherit(src_spawned); return 0; } -bool RGWCoroutinesStack::collect(int *ret) /* returns true if needs to be called again */ +bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret) /* returns true if needs to be called again */ { + rgw_spawned_stacks *s = (op ? &op->spawned : &spawned); *ret = 0; list new_list; - for (list::iterator iter = spawned_stacks.begin(); iter != spawned_stacks.end(); ++iter) { + for (list::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) { RGWCoroutinesStack *stack = *iter; if (!stack->is_done()) { new_list.push_back(stack); @@ -131,10 +143,15 @@ bool RGWCoroutinesStack::collect(int *ret) /* returns true if needs to be called stack->put(); } - spawned_stacks.swap(new_list); + s->entries.swap(new_list); return (!new_list.empty()); } +bool RGWCoroutinesStack::collect(int *ret) /* returns true if needs to be called again */ +{ + return collect(NULL, ret); +} + static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg); static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg) @@ -300,7 +317,12 @@ void RGWCoroutine::call(RGWCoroutine *op) void RGWCoroutine::spawn(RGWCoroutine *op, bool wait) { - stack->spawn(op, wait); + stack->spawn(this, op, wait); +} + +bool RGWCoroutine::collect(int *ret) /* returns true if needs to be called again */ +{ + return stack->collect(this, ret); } int RGWSimpleCoroutine::operate() diff --git a/src/rgw/rgw_coroutine.h b/src/rgw/rgw_coroutine.h index 7c57146fc6c63..f5aeed2545d41 100644 --- a/src/rgw/rgw_coroutine.h +++ b/src/rgw/rgw_coroutine.h @@ -60,6 +60,14 @@ enum RGWCoroutineState { RGWCoroutine_Run = 0, }; +struct rgw_spawned_stacks { + list entries; + + void inherit(rgw_spawned_stacks *source) { + entries.splice(entries.end(), source->entries); + } +}; + class RGWCoroutine : public RefCountedObject, public boost::asio::coroutine { friend class RGWCoroutinesStack; @@ -70,6 +78,8 @@ protected: int retcode; int state; + rgw_spawned_stacks spawned; + stringstream error_stream; int set_state(int s, int ret = 0) { @@ -80,9 +90,6 @@ protected: void set_sleeping(bool flag); int io_block(int ret); - void call(RGWCoroutine *op); /* call at the same stack we're in */ - void spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */ - public: RGWCoroutine(CephContext *_cct) : cct(_cct), stack(NULL), retcode(0), state(RGWCoroutine_Run) {} virtual ~RGWCoroutine() {} @@ -104,6 +111,10 @@ public: int get_ret_status() { return retcode; } + + void call(RGWCoroutine *op); /* call at the same stack we're in */ + void spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */ + bool collect(int *ret); /* returns true if needs to be called again */ }; template @@ -146,7 +157,7 @@ class RGWCoroutinesStack : public RefCountedObject { list ops; list::iterator pos; - list spawned_stacks; + rgw_spawned_stacks spawned; set blocked_by_stack; set blocking_stacks; @@ -163,6 +174,8 @@ class RGWCoroutinesStack : public RefCountedObject { protected: RGWCoroutinesEnv *env; + void spawn(RGWCoroutine *source_op, RGWCoroutine *next_op, bool wait); + bool collect(RGWCoroutine *op, int *ret); /* returns true if needs to be called again */ public: RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL); diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 499b2f7f52d8b..bc7a39b6c7900 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -770,7 +770,7 @@ public: lock_name, cookie)); } int ret; - while (stack->collect(&ret)) { + while (collect(&ret)) { if (ret < 0) { return set_state(RGWCoroutine_Error); } @@ -820,7 +820,6 @@ int RGWReadSyncStatusCoroutine::handle_data(rgw_meta_sync_info& data) class RGWOmapAppend : public RGWConsumerCR { RGWAsyncRadosProcessor *async_rados; RGWRados *store; - RGWCoroutinesEnv *env; rgw_bucket pool; string oid; @@ -833,9 +832,9 @@ class RGWOmapAppend : public RGWConsumerCR { map entries; public: - RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutinesEnv *_env, rgw_bucket& _pool, const string& _oid) + RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, rgw_bucket& _pool, const string& _oid) : RGWConsumerCR(_store->ctx()), async_rados(_async_rados), - store(_store), env(_env), pool(_pool), oid(_oid), going_down(false), num_pending_entries(0) {} + store(_store), pool(_pool), oid(_oid), going_down(false), num_pending_entries(0) {} #define OMAP_APPEND_MAX_ENTRIES 100 int operate() { @@ -890,22 +889,22 @@ public: class RGWShardedOmapCRManager { RGWAsyncRadosProcessor *async_rados; RGWRados *store; - RGWCoroutinesEnv *env; + RGWCoroutine *op; int num_shards; vector shards; public: - RGWShardedOmapCRManager(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutinesEnv *_env, int _num_shards, rgw_bucket& pool, const string& oid_prefix) + RGWShardedOmapCRManager(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutine *_op, int _num_shards, rgw_bucket& pool, const string& oid_prefix) : async_rados(_async_rados), - store(_store), env(_env), num_shards(_num_shards) { + store(_store), op(_op), num_shards(_num_shards) { shards.reserve(num_shards); for (int i = 0; i < num_shards; ++i) { char buf[oid_prefix.size() + 16]; snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i); - RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, env, pool, buf); + RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, pool, buf); shards.push_back(shard); - env->stack->spawn(shard, false); + op->spawn(shard, false); } } void append(const string& entry) { @@ -947,7 +946,7 @@ public: RGWRESTConn *conn = store->rest_master_conn; reenter(this) { - entries_index = new RGWShardedOmapCRManager(async_rados, store, stack->get_env(), num_shards, + entries_index = new RGWShardedOmapCRManager(async_rados, store, this, num_shards, store->get_zone_params().log_pool, "meta.full-sync.index"); yield { call(new RGWReadRESTResourceCR >(store->ctx(), conn, http_manager, @@ -980,7 +979,7 @@ public: } yield entries_index->finish(); int ret; - while (stack->collect(&ret)) { + while (collect(&ret)) { if (ret < 0) { return set_state(RGWCoroutine_Error); } -- 2.39.5