From: Yehuda Sadeh Date: Tue, 9 Jun 2020 22:37:29 +0000 (-0700) Subject: rgw: cr: tool for spawn window management X-Git-Tag: v16.1.0~421^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=abd2deecef93cea448f5f203b02e6b3448c3df4d;p=ceph.git rgw: cr: tool for spawn window management Will make it possible to avoid duplicating lots of boilerplate code Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc index 4c658ca30ac7..ebd9df9f7fe6 100644 --- a/src/rgw/rgw_coroutine.cc +++ b/src/rgw/rgw_coroutine.cc @@ -924,22 +924,58 @@ ostream& operator<<(ostream& out, const RGWCoroutine& cr) return out; } -bool RGWCoroutine::drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack) +bool RGWCoroutine::drain_children(int num_cr_left, + RGWCoroutinesStack *skip_stack, + std::optional > err_cb) { bool done = false; ceph_assert(num_cr_left >= 0); if (num_cr_left == 0 && skip_stack) { num_cr_left = 1; } - reenter(&drain_cr) { + reenter(&drain_status.cr) { while (num_spawned() > (size_t)num_cr_left) { yield wait_for_child(); int ret; while (collect(&ret, skip_stack)) { + if (ret < 0) { + if (!err_cb) { + ldout(cct, 10) << "collect() returned ret=" << ret << dendl; + /* we should have reported this error */ + log_error() << "ERROR: collect() returned error (ret=" << ret << ")"; + } else { + (*err_cb)(ret); + } + } + } + } + done = true; + } + return done; +} + +bool RGWCoroutine::drain_children(int num_cr_left, + std::optional > err_cb) +{ + bool done = false; + ceph_assert(num_cr_left >= 0); + + reenter(&drain_status.cr) { + while (num_spawned() > (size_t)num_cr_left) { + yield wait_for_child(); + int ret; + while (collect(&ret, nullptr)) { if (ret < 0) { ldout(cct, 10) << "collect() returned ret=" << ret << dendl; /* we should have reported this error */ log_error() << "ERROR: collect() returned error (ret=" << ret << ")"; + if (err_cb && !drain_status.should_exit) { + int r = (*err_cb)(ret); + if (r < 0) { + drain_status.ret = r; + num_cr_left = 0; /* need to drain all */ + } + } } } } diff --git a/src/rgw/rgw_coroutine.h b/src/rgw/rgw_coroutine.h index a38421f4fc58..2bb41ffbbae4 100644 --- a/src/rgw/rgw_coroutine.h +++ b/src/rgw/rgw_coroutine.h @@ -219,7 +219,18 @@ class RGWCoroutine : public RefCountedObject, public boost::asio::coroutine { protected: bool _yield_ret; - boost::asio::coroutine drain_cr; + + struct { + boost::asio::coroutine cr; + bool should_exit{false}; + int ret{0}; + + void init() { + cr = boost::asio::coroutine(); + should_exit = false; + ret = 0; + } + } drain_status; CephContext *cct; @@ -292,7 +303,14 @@ public: bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */ int wait(const utime_t& interval); - bool drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack = NULL); /* returns true if needed to be called again */ + bool drain_children(int num_cr_left, + RGWCoroutinesStack *skip_stack = nullptr, + std::optional > err_cb = std::nullopt); /* returns true if needed to be called again, + err_cb is just for reporting error */ + bool drain_children(int num_cr_left, + std::optional > err_cb); /* returns true if needed to be called again, + err_cb is for filtering error. A negative return + value means that we need to exit current cr */ void wakeup(); void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */ @@ -336,17 +354,39 @@ do { \ } while (0) #define drain_all() \ - drain_cr = boost::asio::coroutine(); \ + drain_status.init(); \ yield_until_true(drain_children(0)) #define drain_all_but(n) \ - drain_cr = boost::asio::coroutine(); \ + drain_status.init(); \ yield_until_true(drain_children(n)) #define drain_all_but_stack(stack) \ - drain_cr = boost::asio::coroutine(); \ + drain_status.init(); \ yield_until_true(drain_children(1, stack)) +#define drain_all_but_stack_cb(stack, cb) \ + drain_status.init(); \ + yield_until_true(drain_children(1, stack, cb)) + +#define drain_with_cb(n, err_cb) \ + drain_status.init(); \ + yield_until_true(drain_children(n, err_cb)); \ + if (drain_status.should_exit) { \ + return set_cr_error(drain_status.ret); \ + } + +#define drain_all_cb(cb) \ + drain_with_cb(0, cb) + +#define yield_spawn_window(cr, n, err_cb) \ + do { \ + spawn(cr, false); \ + drain_with_cb(n, err_cb); /* this is guaranteed to yield */ \ + } while (0) + + + template class RGWConsumerCR : public RGWCoroutine { list product;