From: Yehuda Sadeh Date: Thu, 25 Jun 2020 20:47:51 +0000 (-0700) Subject: rgw: cr: add coroutine stack id X-Git-Tag: v16.1.0~421^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2c9eb8e8db9579e4a91148864ef146a44e28f684;p=ceph.git rgw: cr: add coroutine stack id The stack id is unique per cr stack manager, and can be used to identify collected stacks. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc index 3279392507e..512ead62d33 100644 --- a/src/rgw/rgw_coroutine.cc +++ b/src/rgw/rgw_coroutine.cc @@ -196,12 +196,17 @@ int64_t RGWCoroutinesManager::get_next_io_id() return (int64_t)++max_io_id; } +uint64_t RGWCoroutinesManager::get_next_stack_id() { + return (uint64_t)++max_stack_id; +} + RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr), done_flag(false), error_flag(false), blocked_flag(false), sleep_flag(false), interval_wait_flag(false), is_scheduled(false), is_waiting_for_child(false), retcode(0), run_count(0), env(NULL), parent(NULL) { + id = ops_mgr->get_next_stack_id(); if (start) { ops.push_back(start); } @@ -360,7 +365,7 @@ void RGWCoroutinesStack::cancel() put(); } -bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */ +bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */ { bool need_retry = false; rgw_spawned_stacks *s = (op ? 
&op->spawned : &spawned); @@ -378,6 +383,9 @@ bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack } continue; } + if (stack_id) { + *stack_id = stack->get_id(); + } int r = stack->get_ret_status(); stack->put(); if (r < 0) { @@ -426,9 +434,9 @@ bool RGWCoroutinesStack::collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesS return false; } -bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */ +bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */ { - return collect(NULL, ret, skip_stack); + return collect(NULL, ret, skip_stack, stack_id); } static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg) @@ -884,9 +892,9 @@ RGWCoroutinesStack *RGWCoroutine::spawn(RGWCoroutine *op, bool wait) return stack->spawn(this, op, wait); } -bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */ +bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */ { - return stack->collect(this, ret, skip_stack); + return stack->collect(this, ret, skip_stack, stack_id); } bool RGWCoroutine::collect_next(int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */ @@ -926,7 +934,7 @@ ostream& operator<<(ostream& out, const RGWCoroutine& cr) bool RGWCoroutine::drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack, - std::optional<std::function<void(int ret)> > cb) + std::optional<std::function<void(uint64_t stack_id, int ret)> > cb) { bool done = false; ceph_assert(num_cr_left >= 0); @@ -937,14 +945,15 @@ bool RGWCoroutine::drain_children(int num_cr_left, while (num_spawned() > (size_t)num_cr_left) { yield wait_for_child(); int ret; - while (collect(&ret, skip_stack)) { + uint64_t stack_id; + while (collect(&ret, skip_stack, &stack_id)) { if (ret < 0) { ldout(cct, 10) << "collect() returned 
ret=" << ret << dendl; /* we should have reported this error */ log_error() << "ERROR: collect() returned error (ret=" << ret << ")"; } if (cb) { - (*cb)(ret); + (*cb)(stack_id, ret); } } } @@ -954,7 +963,7 @@ bool RGWCoroutine::drain_children(int num_cr_left, } bool RGWCoroutine::drain_children(int num_cr_left, - std::optional<std::function<int(int ret)> > cb) + std::optional<std::function<int(uint64_t stack_id, int ret)> > cb) { bool done = false; ceph_assert(num_cr_left >= 0); @@ -963,14 +972,15 @@ bool RGWCoroutine::drain_children(int num_cr_left, while (num_spawned() > (size_t)num_cr_left) { yield wait_for_child(); int ret; - while (collect(&ret, nullptr)) { + uint64_t stack_id; + while (collect(&ret, nullptr, &stack_id)) { if (ret < 0) { ldout(cct, 10) << "collect() returned ret=" << ret << dendl; /* we should have reported this error */ log_error() << "ERROR: collect() returned error (ret=" << ret << ")"; } if (cb && !drain_status.should_exit) { - int r = (*cb)(ret); + int r = (*cb)(stack_id, ret); if (r < 0) { drain_status.ret = r; num_cr_left = 0; /* need to drain all */ diff --git a/src/rgw/rgw_coroutine.h b/src/rgw/rgw_coroutine.h index 048dc198493..f589b4eac5d 100644 --- a/src/rgw/rgw_coroutine.h +++ b/src/rgw/rgw_coroutine.h @@ -299,20 +299,20 @@ public: void call(RGWCoroutine *op); /* call at the same stack we're in */ RGWCoroutinesStack *spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */ - bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */ + bool collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id = nullptr); /* returns true if needs to be called again */ bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */ int wait(const utime_t& interval); bool drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack = nullptr, - std::optional<std::function<void(int ret)> > cb = std::nullopt); /* returns true if needed to be called again, - cb will be called on completion of every - completion. 
*/ + std::optional<std::function<void(uint64_t stack_id, int ret)> > cb = std::nullopt); /* returns true if needed to be called again, + cb will be called on completion of every + completion. */ bool drain_children(int num_cr_left, - std::optional<std::function<int(int ret)> > cb); /* returns true if needed to be called again, - cb will be called on every completion, can filter errors. - A negative return value from cb means that current cr - will need to exit */ + std::optional<std::function<int(uint64_t stack_id, int ret)> > cb); /* returns true if needed to be called again, + cb will be called on every completion, can filter errors. + A negative return value from cb means that current cr + will need to exit */ void wakeup(); void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */ @@ -425,6 +425,8 @@ class RGWCoroutinesStack : public RefCountedObject { CephContext *cct; + int64_t id{-1}; + RGWCoroutinesManager *ops_mgr; list<RGWCoroutine *> ops; @@ -457,12 +459,16 @@ protected: RGWCoroutinesStack *parent; RGWCoroutinesStack *spawn(RGWCoroutine *source_op, RGWCoroutine *next_op, bool wait); - bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */ + bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id); /* returns true if needs to be called again */ bool collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack); /* returns true if found a stack to collect */ public: RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL); ~RGWCoroutinesStack() override; + int64_t get_id() const { + return id; + } + int operate(RGWCoroutinesEnv *env); bool is_done() { @@ -534,7 +540,7 @@ public: } void io_complete(const rgw_io_id& io_id); - bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */ + bool collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id); /* returns true if needs to be called again */ void cancel(); @@ -617,6 +623,7 @@ class RGWCoroutinesManager { map > 
run_contexts; std::atomic<int64_t> max_io_id = { 0 }; + std::atomic<uint64_t> max_stack_id = { 0 }; mutable ceph::shared_mutex lock = ceph::make_shared_mutex("RGWCoroutinesManager::lock"); @@ -671,6 +678,7 @@ public: RGWCoroutinesStack *allocate_stack(); int64_t get_next_io_id(); + uint64_t get_next_stack_id(); void set_sleeping(RGWCoroutine *cr, bool flag); void io_complete(RGWCoroutine *cr, const rgw_io_id& io_id); diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 9b1d4e7c56a..082916d97ac 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1576,7 +1576,7 @@ public: sync_marker.marker = iter->first; drain_all_but_stack_cb(lease_stack.get(), - [&](int ret) { + [&](uint64_t stack_id, int ret) { if (ret < 0) { tn->log(10, "a sync operation returned error"); } @@ -1724,7 +1724,7 @@ public: } drain_all_but_stack_cb(lease_stack.get(), - [&](int ret) { + [&](uint64_t stack_id, int ret) { if (ret < 0) { tn->log(10, "a sync operation returned error"); } @@ -3792,7 +3792,7 @@ int RGWBucketShardFullSyncCR::operate() false); } drain_with_cb(BUCKET_SYNC_SPAWN_WINDOW, - [&](int ret) { + [&](uint64_t stack_id, int ret) { if (ret < 0) { tn->log(10, "a sync operation returned error"); sync_status = ret; @@ -3804,7 +3804,7 @@ int RGWBucketShardFullSyncCR::operate() set_status("done iterating over all objects"); /* wait for all operations to complete */ - drain_all_cb([&](int ret) { + drain_all_cb([&](uint64_t stack_id, int ret) { if (ret < 0) { tn->log(10, "a sync operation returned error"); sync_status = ret; @@ -4089,7 +4089,7 @@ int RGWBucketShardIncrementalSyncCR::operate() } // } drain_with_cb(BUCKET_SYNC_SPAWN_WINDOW, - [&](int ret) { + [&](uint64_t stack_id, int ret) { if (ret < 0) { tn->log(10, "a sync operation returned error"); sync_status = ret; @@ -4099,7 +4099,7 @@ int RGWBucketShardIncrementalSyncCR::operate() } } while (!list_result.empty() && sync_status == 0 && !syncstopped); - drain_all_cb([&](int ret) { + drain_all_cb([&](uint64_t stack_id, 
int ret) { if (ret < 0) { tn->log(10, "a sync operation returned error"); sync_status = ret; @@ -4351,7 +4351,7 @@ int RGWRunBucketSourcesSyncCR::operate() yield_spawn_window(new RGWRunBucketSyncCoroutine(sc, lease_cr, sync_pair, tn, &*cur_shard_progress), BUCKET_SYNC_SPAWN_WINDOW, - [&](int ret) { + [&](uint64_t stack_id, int ret) { if (ret < 0) { tn->log(10, "a sync operation returned error"); } @@ -4359,7 +4359,7 @@ int RGWRunBucketSourcesSyncCR::operate() }); } } - drain_all_cb([&](int ret) { + drain_all_cb([&](uint64_t stack_id, int ret) { if (ret < 0) { tn->log(10, "a sync operation returned error"); }