From 5a2e8f0526db92a290c711f82627fc5042c290ea Mon Sep 17 00:00:00 2001
From: Yehuda Sadeh
Date: Tue, 12 Jul 2016 20:36:35 -0700
Subject: [PATCH] rgw: collect skips a specific coroutine stack

Fixes: http://tracker.ceph.com/issues/16665

Instead of drain_all_but(), which specifies the number of stacks to
leave behind, add drain_all_but_stack(), which names the specific stack
to keep. This is needed so that we don't call wakeup() through
lease_cr->go_down() on a cr stack that was already collected.

Signed-off-by: Yehuda Sadeh
---
 src/rgw/rgw_coroutine.cc | 20 ++++++++++++--------
 src/rgw/rgw_coroutine.h  | 12 ++++++++----
 src/rgw/rgw_data_sync.cc | 35 +++++++++++++++++++----------------
 src/rgw/rgw_sync.cc      | 26 +++++++++++++++-----------
 4 files changed, 54 insertions(+), 39 deletions(-)

diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc
index a09afc85532..2e478f23850 100644
--- a/src/rgw/rgw_coroutine.cc
+++ b/src/rgw/rgw_coroutine.cc
@@ -292,7 +292,7 @@ int RGWCoroutinesStack::unwind(int retcode)
 }
 
 
-bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret) /* returns true if needs to be called again */
+bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
 {
   rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
   *ret = 0;
@@ -300,7 +300,7 @@ bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret) /* returns true if
 
   for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
     RGWCoroutinesStack *stack = *iter;
-    if (!stack->is_done()) {
+    if (stack == skip_stack || !stack->is_done()) {
       new_list.push_back(stack);
       ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is still running" << dendl;
       continue;
@@ -349,9 +349,9 @@ bool RGWCoroutinesStack::collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesS
   return false;
 }
 
-bool RGWCoroutinesStack::collect(int *ret) /* returns true if needs to be called again */
+bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
 {
-  return collect(NULL, ret);
+  return collect(NULL, ret, skip_stack);
 }
 
 static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg);
@@ -712,9 +712,9 @@ RGWCoroutinesStack *RGWCoroutine::spawn(RGWCoroutine *op, bool wait)
   return stack->spawn(this, op, wait);
 }
 
-bool RGWCoroutine::collect(int *ret) /* returns true if needs to be called again */
+bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
 {
-  return stack->collect(this, ret);
+  return stack->collect(this, ret, skip_stack);
 }
 
 bool RGWCoroutine::collect_next(int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
@@ -752,14 +752,18 @@ ostream& operator<<(ostream& out, const RGWCoroutine& cr)
   return out;
 }
 
-bool RGWCoroutine::drain_children(int num_cr_left)
+bool RGWCoroutine::drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack)
 {
   bool done = false;
+  assert(num_cr_left >= 0);
+  if (num_cr_left == 0 && skip_stack) {
+    num_cr_left = 1;
+  }
   reenter(&drain_cr) {
     while (num_spawned() > (size_t)num_cr_left) {
       yield wait_for_child();
       int ret;
-      while (collect(&ret)) {
+      while (collect(&ret, skip_stack)) {
         if (ret < 0) {
           ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
           /* we should have reported this error */
diff --git a/src/rgw/rgw_coroutine.h b/src/rgw/rgw_coroutine.h
index 11addf68b23..6b17fa0a69b 100644
--- a/src/rgw/rgw_coroutine.h
+++ b/src/rgw/rgw_coroutine.h
@@ -265,11 +265,11 @@ public:
 
   void call(RGWCoroutine *op); /* call at the same stack we're in */
   RGWCoroutinesStack *spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */
-  bool collect(int *ret); /* returns true if needs to be called again */
+  bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
   bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */
 
   int wait(const utime_t& interval);
-  bool drain_children(int num_cr_left); /* returns true if needed to be called again */
+  bool drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack = NULL); /* returns true if needed to be called again */
   void wakeup();
   void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */
 
@@ -306,6 +306,10 @@ do { \
   drain_cr = boost::asio::coroutine(); \
   yield_until_true(drain_children(n))
 
+#define drain_all_but_stack(stack) \
+  drain_cr = boost::asio::coroutine(); \
+  yield_until_true(drain_children(1, stack))
+
 template <class T>
 class RGWConsumerCR : public RGWCoroutine {
   list<T> product;
@@ -371,7 +375,7 @@ protected:
   RGWCoroutinesStack *parent;
 
   RGWCoroutinesStack *spawn(RGWCoroutine *source_op, RGWCoroutine *next_op, bool wait);
-  bool collect(RGWCoroutine *op, int *ret); /* returns true if needs to be called again */
+  bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
   bool collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack); /* returns true if found a stack to collect */
 public:
   RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL);
@@ -442,7 +446,7 @@ public:
   int wait(const utime_t& interval);
   void wakeup();
 
-  bool collect(int *ret); /* returns true if needs to be called again */
+  bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
 
   RGWAioCompletionNotifier *create_completion_notifier();
   RGWCompletionManager *get_completion_mgr();
diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index f686f97aa8c..01e55739de2 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -472,7 +472,7 @@ public:
           spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true);
         }
       }
-      while (collect(&ret)) {
+      while (collect(&ret, NULL)) {
        if (ret < 0) {
          return set_state(RGWCoroutine_Error);
        }
@@ -497,7 +497,7 @@ public:
        call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store, store->get_zone_params().log_pool,
                                        sync_status_oid, lock_name, cookie));
       }
-      while (collect(&ret)) {
+      while (collect(&ret, NULL)) {
        if (ret < 0) {
          return set_state(RGWCoroutine_Error);
        }
@@ -724,7 +724,7 @@ public:
        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                        EIO, string("failed to build bucket instances map")));
       }
-      while (collect(&ret)) {
+      while (collect(&ret, NULL)) {
        if (ret < 0) {
          yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "", -ret,
                                                          string("failed to store sync status: ") + cpp_strerror(-ret)));
@@ -986,6 +986,7 @@ class RGWDataSyncShardCR : public RGWCoroutine {
   set<string> spawned_keys;
 
   RGWContinuousLeaseCR *lease_cr;
+  RGWCoroutinesStack *lease_stack;
 
   string status_oid;
 
@@ -1013,7 +1014,7 @@ public:
                                                       sync_marker(_marker),
                                                       marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
                                                       total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
-                                                      lease_cr(NULL), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
+                                                      lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
                                                       retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT) {
     set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
     status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
@@ -1080,7 +1081,7 @@ public:
     lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool,
                                         status_oid, lock_name, lock_duration, this);
     lease_cr->get();
-    spawn(lease_cr, false);
+    lease_stack = spawn(lease_cr, false);
   }
 
   int full_sync() {
@@ -1252,7 +1253,7 @@ public:
           set_status() << "num_spawned() > spawn_window";
           yield wait_for_child();
           int ret;
-          while (collect(&ret)) {
+          while (collect(&ret, lease_stack)) {
             if (ret < 0) {
               ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
               /* we have reported this error */
@@ -2192,6 +2193,7 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine {
   int total_entries;
 
   RGWContinuousLeaseCR *lease_cr;
+  RGWCoroutinesStack *lease_stack;
 
   string status_oid;
 
@@ -2205,7 +2207,7 @@ public:
                                                 full_marker(_full_marker), marker_tracker(NULL),
                                                 spawn_window(BUCKET_SYNC_SPAWN_WINDOW), entry(NULL),
                                                 op(CLS_RGW_OP_ADD),
-                                                total_entries(0), lease_cr(NULL) {
+                                                total_entries(0), lease_cr(nullptr), lease_stack(nullptr) {
     status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs);
     logger.init(sync_env, "BucketFull", bs.get_key());
   }
@@ -2232,7 +2234,7 @@ int RGWBucketShardFullSyncCR::operate()
       lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool,
                                           status_oid, lock_name, lock_duration, this);
       lease_cr->get();
-      spawn(lease_cr, false);
+      lease_stack = spawn(lease_cr, false);
     }
     while (!lease_cr->is_locked()) {
       if (lease_cr->is_done()) {
@@ -2281,7 +2283,7 @@ int RGWBucketShardFullSyncCR::operate()
       }
       while ((int)num_spawned() > spawn_window) {
         yield wait_for_child();
-        while (collect(&ret)) {
+        while (collect(&ret, lease_stack)) {
           if (ret < 0) {
             ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
             /* we have reported this error */
@@ -2292,7 +2294,7 @@ int RGWBucketShardFullSyncCR::operate()
     } while (list_result.is_truncated);
     set_status("done iterating over all objects");
     /* wait for all operations to complete */
-    drain_all_but(1); /* still need to hold lease cr */
+    drain_all_but_stack(lease_stack); /* still need to hold lease cr */
     /* update sync state to incremental */
     yield {
       rgw_bucket_shard_sync_info sync_status;
@@ -2330,6 +2332,7 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
   const int spawn_window{BUCKET_SYNC_SPAWN_WINDOW};
   bool updated_status{false};
   RGWContinuousLeaseCR *lease_cr{nullptr};
+  RGWCoroutinesStack *lease_stack{nullptr};
   const string status_oid;
 
   string name;
@@ -2376,7 +2379,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
      lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool,
                                          status_oid, lock_name, lock_duration, this);
      lease_cr->get();
-     spawn(lease_cr, false);
+     lease_stack = spawn(lease_cr, false);
     }
     while (!lease_cr->is_locked()) {
       if (lease_cr->is_done()) {
@@ -2397,7 +2400,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
                                              &list_result));
       if (retcode < 0 && retcode != -ENOENT) {
         /* wait for all operations to complete */
-        drain_all_but(1);
+        drain_all_but_stack(lease_stack);
         lease_cr->go_down();
         drain_all();
         return set_cr_error(retcode);
@@ -2477,7 +2480,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
         }
         ldout(sync_env->cct, 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl;
         yield wait_for_child();
-        while (collect(&ret)) {
+        while (collect(&ret, lease_stack)) {
           if (ret < 0) {
             ldout(sync_env->cct, 0) << "ERROR: a child operation returned error (ret=" << ret << ")" << dendl;
             /* we have reported this error */
@@ -2509,7 +2512,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
       while ((int)num_spawned() > spawn_window) {
         set_status() << "num_spawned() > spawn_window";
         yield wait_for_child();
-        while (collect(&ret)) {
+        while (collect(&ret, lease_stack)) {
           if (ret < 0) {
             ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
             /* we have reported this error */
@@ -2525,13 +2528,13 @@ int RGWBucketShardIncrementalSyncCR::operate()
     }
     if (retcode < 0) {
       ldout(sync_env->cct, 0) << "ERROR: marker_tracker->flush() returned retcode=" << retcode << dendl;
-      drain_all_but(1);
+      drain_all_but_stack(lease_stack);
       lease_cr->go_down();
       drain_all();
       return set_cr_error(retcode);
     }
 
-    drain_all_but(1);
+    drain_all_but_stack(lease_stack);
     lease_cr->go_down();
     /* wait for all operations to complete */
     drain_all();
diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc
index b31a1666d04..c53a2cd5da6 100644
--- a/src/rgw/rgw_sync.cc
+++ b/src/rgw/rgw_sync.cc
@@ -593,12 +593,13 @@ class RGWInitSyncStatusCoroutine : public RGWCoroutine {
   rgw_meta_sync_info status;
   vector<RGWMetadataLogInfo> shards_info;
   RGWContinuousLeaseCR *lease_cr;
+  RGWCoroutinesStack *lease_stack;
 
 public:
   RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env, const rgw_meta_sync_info &status)
     : RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
       status(status), shards_info(status.num_shards),
-      lease_cr(NULL) {}
+      lease_cr(nullptr), lease_stack(nullptr) {}
 
   ~RGWInitSyncStatusCoroutine() {
     if (lease_cr) {
@@ -618,7 +619,7 @@ public:
       lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool,
                                           sync_env->status_oid(), lock_name, lock_duration, this);
       lease_cr->get();
-      spawn(lease_cr, false);
+      lease_stack = spawn(lease_cr, false);
     }
     while (!lease_cr->is_locked()) {
       if (lease_cr->is_done()) {
@@ -651,7 +652,7 @@ public:
       }
     }
 
-    drain_all_but(1); /* the lease cr still needs to run */
+    drain_all_but_stack(lease_stack); /* the lease cr still needs to run */
 
     yield {
       set_status("updating sync status");
@@ -674,7 +675,7 @@ public:
     }
     set_status("drop lock lease");
     yield lease_cr->go_down();
-    while (collect(&ret)) {
+    while (collect(&ret, NULL)) {
       if (ret < 0) {
         return set_cr_error(ret);
       }
@@ -737,6 +738,7 @@ class RGWFetchAllMetaCR : public RGWCoroutine {
   RGWShardedOmapCRManager *entries_index;
 
   RGWContinuousLeaseCR *lease_cr;
+  RGWCoroutinesStack *lease_stack;
   bool lost_lock;
   bool failed;
 
@@ -746,7 +748,8 @@ public:
   RGWFetchAllMetaCR(RGWMetaSyncEnv *_sync_env, int _num_shards,
                     map<uint32_t, string>& _markers) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
                                                        num_shards(_num_shards),
-                                                       ret_status(0), entries_index(NULL), lease_cr(NULL), lost_lock(false), failed(false), markers(_markers) {
+                                                       ret_status(0), entries_index(NULL), lease_cr(nullptr), lease_stack(nullptr),
+                                                       lost_lock(false), failed(false), markers(_markers) {
   }
 
   ~RGWFetchAllMetaCR() {
@@ -791,7 +794,7 @@ public:
       lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, sync_env->store,
                                           sync_env->store->get_zone_params().log_pool, sync_env->status_oid(), lock_name, lock_duration, this);
       lease_cr->get();
-      spawn(lease_cr, false);
+      lease_stack = spawn(lease_cr, false);
     }
     while (!lease_cr->is_locked()) {
       if (lease_cr->is_done()) {
@@ -870,12 +873,12 @@ public:
       }
     }
 
-    drain_all_but(1); /* the lease cr still needs to run */
+    drain_all_but_stack(lease_stack); /* the lease cr still needs to run */
 
     yield lease_cr->go_down();
 
     int ret;
-    while (collect(&ret)) {
+    while (collect(&ret, NULL)) {
       if (ret < 0) {
         return set_cr_error(ret);
       }
@@ -1253,6 +1256,7 @@ class RGWMetaSyncShardCR : public RGWCoroutine {
   boost::asio::coroutine full_cr;
 
   RGWContinuousLeaseCR *lease_cr = nullptr;
+  RGWCoroutinesStack *lease_stack = nullptr;
   bool lost_lock = false;
 
   bool *reset_backoff;
@@ -1383,7 +1387,7 @@ public:
                                                 sync_env->shard_obj_name(shard_id),
                                                 lock_name, lock_duration, this);
         lease_cr->get();
-        spawn(lease_cr, false);
+        lease_stack = spawn(lease_cr, false);
         lost_lock = false;
       }
       while (!lease_cr->is_locked()) {
@@ -1505,7 +1509,7 @@ public:
                                                   sync_env->shard_obj_name(shard_id),
                                                   lock_name, lock_duration, this);
           lease_cr->get();
-          spawn(lease_cr, false);
+          lease_stack = spawn(lease_cr, false);
           lost_lock = false;
         }
         while (!lease_cr->is_locked()) {
@@ -1728,7 +1732,7 @@ public:
         }
       }
       // wait for each shard to complete
-      collect(&ret);
+      collect(&ret, NULL);
       drain_all();
       {
         // drop shard cr refs under lock
-- 
2.47.3
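
Editor's note: for readers less familiar with the RGW coroutine machinery, below is a minimal standalone sketch of the idea behind the change (plain C++, not RGW code; the Stack struct, the collect() helper and the "lease" naming are illustrative assumptions, not the RGW API). If collect() reaps every finished child, the parent may later call go_down()/wakeup() on a lease stack that has already been freed; letting the collect loop skip one designated stack, which the patch expresses as drain_all_but_stack(lease_stack), avoids that use-after-free.

// collect_skip_sketch.cc -- standalone illustration, NOT RGW code (C++14).
#include <algorithm>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

struct Stack {
  std::string name;
  bool done = false;
};

// Reap finished children, except the one we were told to skip.
// Returns true if some child is still running and the caller should wait again.
bool collect(std::vector<std::unique_ptr<Stack>>& children, const Stack* skip_stack) {
  bool again = false;
  for (auto& child : children) {
    if (child.get() == skip_stack || !child->done) {
      if (!child->done) {
        again = true;           // still running; keep it and come back later
      }
      continue;                 // skipped or still running: do not free
    }
    std::cout << "reaping " << child->name << "\n";
    child.reset();              // freed here; nobody may touch this stack again
  }
  children.erase(std::remove_if(children.begin(), children.end(),
                                [](const std::unique_ptr<Stack>& c) { return !c; }),
                 children.end());
  return again;
}

int main() {
  std::vector<std::unique_ptr<Stack>> children;
  children.push_back(std::make_unique<Stack>(Stack{"lease", true}));      // finished, but must outlive the drain
  children.push_back(std::make_unique<Stack>(Stack{"sync-op-1", true}));
  children.push_back(std::make_unique<Stack>(Stack{"sync-op-2", false}));
  Stack* lease = children.front().get();

  // Rough equivalent of drain_all_but_stack(lease_stack): reap everything but the lease.
  while (collect(children, lease)) {
    // A real coroutine manager would yield and wait for a child to signal
    // completion; here we simply mark the remaining child as done.
    children.back()->done = true;
  }

  // The lease child was never collected, so a later shutdown call on it is safe.
  std::cout << "lease still alive: " << lease->name << "\n";
  return 0;
}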