From 21da1039fc57bcf4054c0e7a13bb2732781770f6 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 12 Jul 2016 20:36:35 -0700 Subject: [PATCH] rgw: collect skips a specific coroutine stack Fixes: http://tracker.ceph.com/issues/16665 Instead of drain_all_but() that specifies number of stacks to leave behind, added drain_all_but_stack() that has a specific stack specified. This is needed so that we don't call wakeup() through lease_cr->go_down() on a cr stack that was already collected. Signed-off-by: Yehuda Sadeh (cherry picked from commit 5a2e8f0526db92a290c711f82627fc5042c290ea) --- src/rgw/rgw_coroutine.cc | 20 ++++++++++++-------- src/rgw/rgw_coroutine.h | 12 ++++++++---- src/rgw/rgw_data_sync.cc | 35 +++++++++++++++++++---------------- src/rgw/rgw_sync.cc | 26 +++++++++++++++----------- 4 files changed, 54 insertions(+), 39 deletions(-) diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc index a09afc85532cb..2e478f2385060 100644 --- a/src/rgw/rgw_coroutine.cc +++ b/src/rgw/rgw_coroutine.cc @@ -292,7 +292,7 @@ int RGWCoroutinesStack::unwind(int retcode) } -bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret) /* returns true if needs to be called again */ +bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */ { rgw_spawned_stacks *s = (op ? &op->spawned : &spawned); *ret = 0; @@ -300,7 +300,7 @@ bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret) /* returns true if for (vector::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) { RGWCoroutinesStack *stack = *iter; - if (!stack->is_done()) { + if (stack == skip_stack || !stack->is_done()) { new_list.push_back(stack); ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is still running" << dendl; continue; @@ -349,9 +349,9 @@ bool RGWCoroutinesStack::collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesS return false; } -bool RGWCoroutinesStack::collect(int *ret) /* returns true if needs to be called again */ +bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */ { - return collect(NULL, ret); + return collect(NULL, ret, skip_stack); } static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg); @@ -712,9 +712,9 @@ RGWCoroutinesStack *RGWCoroutine::spawn(RGWCoroutine *op, bool wait) return stack->spawn(this, op, wait); } -bool RGWCoroutine::collect(int *ret) /* returns true if needs to be called again */ +bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */ { - return stack->collect(this, ret); + return stack->collect(this, ret, skip_stack); } bool RGWCoroutine::collect_next(int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */ @@ -752,14 +752,18 @@ ostream& operator<<(ostream& out, const RGWCoroutine& cr) return out; } -bool RGWCoroutine::drain_children(int num_cr_left) +bool RGWCoroutine::drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack) { bool done = false; + assert(num_cr_left >= 0); + if (num_cr_left == 0 && skip_stack) { + num_cr_left = 1; + } reenter(&drain_cr) { while (num_spawned() > (size_t)num_cr_left) { yield wait_for_child(); int ret; - while (collect(&ret)) { + while (collect(&ret, skip_stack)) { if (ret < 0) { ldout(cct, 10) << "collect() returned ret=" << ret << dendl; /* we should have reported this error */ diff --git a/src/rgw/rgw_coroutine.h b/src/rgw/rgw_coroutine.h index 11addf68b2300..6b17fa0a69b7b 100644 --- a/src/rgw/rgw_coroutine.h +++ b/src/rgw/rgw_coroutine.h @@ -265,11 +265,11 @@ public: void call(RGWCoroutine *op); /* call at the same stack we're in */ RGWCoroutinesStack *spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */ - bool collect(int *ret); /* returns true if needs to be called again */ + bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */ bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */ int wait(const utime_t& interval); - bool drain_children(int num_cr_left); /* returns true if needed to be called again */ + bool drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack = NULL); /* returns true if needed to be called again */ void wakeup(); void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */ @@ -306,6 +306,10 @@ do { \ drain_cr = boost::asio::coroutine(); \ yield_until_true(drain_children(n)) +#define drain_all_but_stack(stack) \ + drain_cr = boost::asio::coroutine(); \ + yield_until_true(drain_children(1, stack)) + template class RGWConsumerCR : public RGWCoroutine { list product; @@ -371,7 +375,7 @@ protected: RGWCoroutinesStack *parent; RGWCoroutinesStack *spawn(RGWCoroutine *source_op, RGWCoroutine *next_op, bool wait); - bool collect(RGWCoroutine *op, int *ret); /* returns true if needs to be called again */ + bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */ bool collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack); /* returns true if found a stack to collect */ public: RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL); @@ -442,7 +446,7 @@ public: int wait(const utime_t& interval); void wakeup(); - bool collect(int *ret); /* returns true if needs to be called again */ + bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */ RGWAioCompletionNotifier *create_completion_notifier(); RGWCompletionManager *get_completion_mgr(); diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 0ef55230ef929..95d5b8de1bebe 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -471,7 +471,7 @@ public: spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true); } } - while (collect(&ret)) { + while (collect(&ret, NULL)) { if (ret < 0) { return set_state(RGWCoroutine_Error); } @@ -496,7 +496,7 @@ public: call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid, lock_name, cookie)); } - while (collect(&ret)) { + while (collect(&ret, NULL)) { if (ret < 0) { return set_state(RGWCoroutine_Error); } @@ -723,7 +723,7 @@ public: yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "", EIO, string("failed to build bucket instances map"))); } - while (collect(&ret)) { + while (collect(&ret, NULL)) { if (ret < 0) { yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "", -ret, string("failed to store sync status: ") + cpp_strerror(-ret))); @@ -985,6 +985,7 @@ class RGWDataSyncShardCR : public RGWCoroutine { set spawned_keys; RGWContinuousLeaseCR *lease_cr; + RGWCoroutinesStack *lease_stack; string status_oid; @@ -1012,7 +1013,7 @@ public: sync_marker(_marker), marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"), total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL), - lease_cr(NULL), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES), + lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES), retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT) { set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id; status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id); @@ -1079,7 +1080,7 @@ public: lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid, lock_name, lock_duration, this); lease_cr->get(); - spawn(lease_cr, false); + lease_stack = spawn(lease_cr, false); } int full_sync() { @@ -1251,7 +1252,7 @@ public: set_status() << "num_spawned() > spawn_window"; yield wait_for_child(); int ret; - while (collect(&ret)) { + while (collect(&ret, lease_stack)) { if (ret < 0) { ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl; /* we have reported this error */ @@ -2191,6 +2192,7 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine { int total_entries; RGWContinuousLeaseCR *lease_cr; + RGWCoroutinesStack *lease_stack; string status_oid; @@ -2204,7 +2206,7 @@ public: full_marker(_full_marker), marker_tracker(NULL), spawn_window(BUCKET_SYNC_SPAWN_WINDOW), entry(NULL), op(CLS_RGW_OP_ADD), - total_entries(0), lease_cr(NULL) { + total_entries(0), lease_cr(nullptr), lease_stack(nullptr) { status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs); logger.init(sync_env, "BucketFull", bs.get_key()); } @@ -2231,7 +2233,7 @@ int RGWBucketShardFullSyncCR::operate() lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid, lock_name, lock_duration, this); lease_cr->get(); - spawn(lease_cr, false); + lease_stack = spawn(lease_cr, false); } while (!lease_cr->is_locked()) { if (lease_cr->is_done()) { @@ -2280,7 +2282,7 @@ int RGWBucketShardFullSyncCR::operate() } while ((int)num_spawned() > spawn_window) { yield wait_for_child(); - while (collect(&ret)) { + while (collect(&ret, lease_stack)) { if (ret < 0) { ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl; /* we have reported this error */ @@ -2291,7 +2293,7 @@ int RGWBucketShardFullSyncCR::operate() } while (list_result.is_truncated); set_status("done iterating over all objects"); /* wait for all operations to complete */ - drain_all_but(1); /* still need to hold lease cr */ + drain_all_but_stack(lease_stack); /* still need to hold lease cr */ /* update sync state to incremental */ yield { rgw_bucket_shard_sync_info sync_status; @@ -2329,6 +2331,7 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine { const int spawn_window{BUCKET_SYNC_SPAWN_WINDOW}; bool updated_status{false}; RGWContinuousLeaseCR *lease_cr{nullptr}; + RGWCoroutinesStack *lease_stack{nullptr}; const string status_oid; string name; @@ -2375,7 +2378,7 @@ int RGWBucketShardIncrementalSyncCR::operate() lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid, lock_name, lock_duration, this); lease_cr->get(); - spawn(lease_cr, false); + lease_stack = spawn(lease_cr, false); } while (!lease_cr->is_locked()) { if (lease_cr->is_done()) { @@ -2396,7 +2399,7 @@ int RGWBucketShardIncrementalSyncCR::operate() &list_result)); if (retcode < 0 && retcode != -ENOENT) { /* wait for all operations to complete */ - drain_all_but(1); + drain_all_but_stack(lease_stack); lease_cr->go_down(); drain_all(); return set_cr_error(retcode); @@ -2476,7 +2479,7 @@ int RGWBucketShardIncrementalSyncCR::operate() } ldout(sync_env->cct, 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl; yield wait_for_child(); - while (collect(&ret)) { + while (collect(&ret, lease_stack)) { if (ret < 0) { ldout(sync_env->cct, 0) << "ERROR: a child operation returned error (ret=" << ret << ")" << dendl; /* we have reported this error */ @@ -2508,7 +2511,7 @@ int RGWBucketShardIncrementalSyncCR::operate() while ((int)num_spawned() > spawn_window) { set_status() << "num_spawned() > spawn_window"; yield wait_for_child(); - while (collect(&ret)) { + while (collect(&ret, lease_stack)) { if (ret < 0) { ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl; /* we have reported this error */ @@ -2524,13 +2527,13 @@ int RGWBucketShardIncrementalSyncCR::operate() } if (retcode < 0) { ldout(sync_env->cct, 0) << "ERROR: marker_tracker->flush() returned retcode=" << retcode << dendl; - drain_all_but(1); + drain_all_but_stack(lease_stack); lease_cr->go_down(); drain_all(); return set_cr_error(retcode); } - drain_all_but(1); + drain_all_but_stack(lease_stack); lease_cr->go_down(); /* wait for all operations to complete */ drain_all(); diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index de71dffffd176..4afa879a10a65 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -591,12 +591,13 @@ class RGWInitSyncStatusCoroutine : public RGWCoroutine { rgw_meta_sync_info status; vector shards_info; RGWContinuousLeaseCR *lease_cr; + RGWCoroutinesStack *lease_stack; public: RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env, const rgw_meta_sync_info &status) : RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env), status(status), shards_info(status.num_shards), - lease_cr(NULL) {} + lease_cr(nullptr), lease_stack(nullptr) {} ~RGWInitSyncStatusCoroutine() { if (lease_cr) { @@ -616,7 +617,7 @@ public: lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_env->status_oid(), lock_name, lock_duration, this); lease_cr->get(); - spawn(lease_cr, false); + lease_stack = spawn(lease_cr, false); } while (!lease_cr->is_locked()) { if (lease_cr->is_done()) { @@ -649,7 +650,7 @@ public: } } - drain_all_but(1); /* the lease cr still needs to run */ + drain_all_but_stack(lease_stack); /* the lease cr still needs to run */ yield { set_status("updating sync status"); @@ -672,7 +673,7 @@ public: } set_status("drop lock lease"); yield lease_cr->go_down(); - while (collect(&ret)) { + while (collect(&ret, NULL)) { if (ret < 0) { return set_cr_error(ret); } @@ -735,6 +736,7 @@ class RGWFetchAllMetaCR : public RGWCoroutine { RGWShardedOmapCRManager *entries_index; RGWContinuousLeaseCR *lease_cr; + RGWCoroutinesStack *lease_stack; bool lost_lock; bool failed; @@ -744,7 +746,8 @@ public: RGWFetchAllMetaCR(RGWMetaSyncEnv *_sync_env, int _num_shards, map& _markers) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), num_shards(_num_shards), - ret_status(0), entries_index(NULL), lease_cr(NULL), lost_lock(false), failed(false), markers(_markers) { + ret_status(0), entries_index(NULL), lease_cr(nullptr), lease_stack(nullptr), + lost_lock(false), failed(false), markers(_markers) { } ~RGWFetchAllMetaCR() { @@ -789,7 +792,7 @@ public: lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, sync_env->store, sync_env->store->get_zone_params().log_pool, sync_env->status_oid(), lock_name, lock_duration, this); lease_cr->get(); - spawn(lease_cr, false); + lease_stack = spawn(lease_cr, false); } while (!lease_cr->is_locked()) { if (lease_cr->is_done()) { @@ -868,12 +871,12 @@ public: } } - drain_all_but(1); /* the lease cr still needs to run */ + drain_all_but_stack(lease_stack); /* the lease cr still needs to run */ yield lease_cr->go_down(); int ret; - while (collect(&ret)) { + while (collect(&ret, NULL)) { if (ret < 0) { return set_cr_error(ret); } @@ -1250,6 +1253,7 @@ class RGWMetaSyncShardCR : public RGWCoroutine { boost::asio::coroutine full_cr; RGWContinuousLeaseCR *lease_cr = nullptr; + RGWCoroutinesStack *lease_stack = nullptr; bool lost_lock = false; bool *reset_backoff; @@ -1380,7 +1384,7 @@ public: sync_env->shard_obj_name(shard_id), lock_name, lock_duration, this); lease_cr->get(); - spawn(lease_cr, false); + lease_stack = spawn(lease_cr, false); lost_lock = false; } while (!lease_cr->is_locked()) { @@ -1502,7 +1506,7 @@ public: sync_env->shard_obj_name(shard_id), lock_name, lock_duration, this); lease_cr->get(); - spawn(lease_cr, false); + lease_stack = spawn(lease_cr, false); lost_lock = false; } while (!lease_cr->is_locked()) { @@ -1725,7 +1729,7 @@ public: } } // wait for each shard to complete - collect(&ret); + collect(&ret, NULL); drain_all(); { // drop shard cr refs under lock -- 2.39.5