return out;
}
-bool RGWCoroutine::drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack)
+bool RGWCoroutine::drain_children(int num_cr_left,
+ RGWCoroutinesStack *skip_stack,
+ std::optional<std::function<void(int ret)> > err_cb)
{
bool done = false;
ceph_assert(num_cr_left >= 0);
if (num_cr_left == 0 && skip_stack) {
num_cr_left = 1;
}
- reenter(&drain_cr) {
+ reenter(&drain_status.cr) {
while (num_spawned() > (size_t)num_cr_left) {
yield wait_for_child();
int ret;
while (collect(&ret, skip_stack)) {
+ if (ret < 0) {
+ if (!err_cb) {
+ ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
+ /* we should have reported this error */
+ log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
+ } else {
+ (*err_cb)(ret);
+ }
+ }
+ }
+ }
+ done = true;
+ }
+ return done;
+}
+
+bool RGWCoroutine::drain_children(int num_cr_left,
+ std::optional<std::function<int(int ret)> > err_cb)
+{
+ bool done = false;
+ ceph_assert(num_cr_left >= 0);
+
+ reenter(&drain_status.cr) {
+ while (num_spawned() > (size_t)num_cr_left) {
+ yield wait_for_child();
+ int ret;
+ while (collect(&ret, nullptr)) {
if (ret < 0) {
ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
/* we should have reported this error */
log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
+ if (err_cb && !drain_status.should_exit) {
+ int r = (*err_cb)(ret);
+ if (r < 0) {
+ drain_status.ret = r;
+ num_cr_left = 0; /* need to drain all */
+ }
+ }
}
}
}
protected:
bool _yield_ret;
- boost::asio::coroutine drain_cr;
+
+ struct {
+ boost::asio::coroutine cr;
+ bool should_exit{false};
+ int ret{0};
+
+ void init() {
+ cr = boost::asio::coroutine();
+ should_exit = false;
+ ret = 0;
+ }
+ } drain_status;
CephContext *cct;
bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */
int wait(const utime_t& interval);
- bool drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack = NULL); /* returns true if needed to be called again */
+ bool drain_children(int num_cr_left,
+ RGWCoroutinesStack *skip_stack = nullptr,
+ std::optional<std::function<void(int ret)> > err_cb = std::nullopt); /* returns true if needed to be called again,
+ err_cb is just for reporting error */
+ bool drain_children(int num_cr_left,
+ std::optional<std::function<int(int ret)> > err_cb); /* returns true if needed to be called again,
+ err_cb is for filtering error. A negative return
+ value means that we need to exit current cr */
void wakeup();
void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */
} while (0)
#define drain_all() \
- drain_cr = boost::asio::coroutine(); \
+ drain_status.init(); \
yield_until_true(drain_children(0))
#define drain_all_but(n) \
- drain_cr = boost::asio::coroutine(); \
+ drain_status.init(); \
yield_until_true(drain_children(n))
#define drain_all_but_stack(stack) \
- drain_cr = boost::asio::coroutine(); \
+ drain_status.init(); \
yield_until_true(drain_children(1, stack))
+#define drain_all_but_stack_cb(stack, cb) \
+ drain_status.init(); \
+ yield_until_true(drain_children(1, stack, cb))
+
+#define drain_with_cb(n, err_cb) \
+ drain_status.init(); \
+ yield_until_true(drain_children(n, err_cb)); \
+ if (drain_status.should_exit) { \
+ return set_cr_error(drain_status.ret); \
+ }
+
+#define drain_all_cb(cb) \
+ drain_with_cb(0, cb)
+
+#define yield_spawn_window(cr, n, err_cb) \
+ do { \
+ spawn(cr, false); \
+ drain_with_cb(n, err_cb); /* this is guaranteed to yield */ \
+ } while (0)
+
+
+
template <class T>
class RGWConsumerCR : public RGWCoroutine {
list<T> product;