bool RGWCoroutine::drain_children(int num_cr_left,
RGWCoroutinesStack *skip_stack,
- std::optional<std::function<void(int ret)> > err_cb)
+ std::optional<std::function<void(int ret)> > cb)
{
bool done = false;
ceph_assert(num_cr_left >= 0);
int ret;
while (collect(&ret, skip_stack)) {
if (ret < 0) {
- if (!err_cb) {
ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
/* we should have reported this error */
log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
- } else {
- (*err_cb)(ret);
- }
+ }
+ if (cb) {
+ (*cb)(ret);
}
}
}
}
bool RGWCoroutine::drain_children(int num_cr_left,
- std::optional<std::function<int(int ret)> > err_cb)
+ std::optional<std::function<int(int ret)> > cb)
{
bool done = false;
ceph_assert(num_cr_left >= 0);
ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
/* we should have reported this error */
log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
- if (err_cb && !drain_status.should_exit) {
- int r = (*err_cb)(ret);
- if (r < 0) {
- drain_status.ret = r;
- num_cr_left = 0; /* need to drain all */
- }
+ }
+ if (cb && !drain_status.should_exit) {
+ int r = (*cb)(ret);
+ if (r < 0) {
+ drain_status.ret = r;
+ num_cr_left = 0; /* need to drain all */
}
}
}
int wait(const utime_t& interval);
bool drain_children(int num_cr_left,
RGWCoroutinesStack *skip_stack = nullptr,
- std::optional<std::function<void(int ret)> > err_cb = std::nullopt); /* returns true if needed to be called again,
- err_cb is just for reporting error */
+ std::optional<std::function<void(int ret)> > cb = std::nullopt); /* returns true if needed to be called again,
+ cb will be called on completion of every
+ completion. */
bool drain_children(int num_cr_left,
- std::optional<std::function<int(int ret)> > err_cb); /* returns true if needed to be called again,
- err_cb is for filtering error. A negative return
- value means that we need to exit current cr */
+ std::optional<std::function<int(int ret)> > cb); /* returns true if needed to be called again,
+ cb will be called on every completion, can filter errors.
+ A negative return value from cb means that current cr
+ will need to exit */
void wakeup();
void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */
drain_status.init(); \
yield_until_true(drain_children(1, stack, cb))
-#define drain_with_cb(n, err_cb) \
+#define drain_with_cb(n, cb) \
drain_status.init(); \
- yield_until_true(drain_children(n, err_cb)); \
+ yield_until_true(drain_children(n, cb)); \
if (drain_status.should_exit) { \
return set_cr_error(drain_status.ret); \
}
#define drain_all_cb(cb) \
drain_with_cb(0, cb)
-#define yield_spawn_window(cr, n, err_cb) \
+#define yield_spawn_window(cr, n, cb) \
do { \
spawn(cr, false); \
- drain_with_cb(n, err_cb); /* this is guaranteed to yield */ \
+ drain_with_cb(n, cb); /* this is guaranteed to yield */ \
} while (0)
drain_all_but_stack_cb(lease_stack.get(),
[&](int ret) {
- tn->log(10, "a sync operation returned error");
+ if (ret < 0) {
+ tn->log(10, "a sync operation returned error");
+ }
});
}
} while (omapvals->more);
drain_all_but_stack_cb(lease_stack.get(),
[&](int ret) {
- tn->log(10, "a sync operation returned error");
+ if (ret < 0) {
+ tn->log(10, "a sync operation returned error");
+ }
});
}
}
drain_with_cb(BUCKET_SYNC_SPAWN_WINDOW,
[&](int ret) {
- tn->log(10, "a sync operation returned error");
- sync_status = ret;
+ if (ret < 0) {
+ tn->log(10, "a sync operation returned error");
+ sync_status = ret;
+ }
return 0;
});
}
/* wait for all operations to complete */
drain_all_cb([&](int ret) {
- tn->log(10, "a sync operation returned error");
- sync_status = ret;
+ if (ret < 0) {
+ tn->log(10, "a sync operation returned error");
+ sync_status = ret;
+ }
return 0;
});
tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
// }
drain_with_cb(BUCKET_SYNC_SPAWN_WINDOW,
[&](int ret) {
- tn->log(10, "a sync operation returned error");
- sync_status = ret;
+ if (ret < 0) {
+ tn->log(10, "a sync operation returned error");
+ sync_status = ret;
+ }
return 0;
});
}
} while (!list_result.empty() && sync_status == 0 && !syncstopped);
drain_all_cb([&](int ret) {
- tn->log(10, "a sync operation returned error");
- sync_status = ret;
+ if (ret < 0) {
+ tn->log(10, "a sync operation returned error");
+ sync_status = ret;
+ }
return 0;
});
tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
&*cur_shard_progress),
BUCKET_SYNC_SPAWN_WINDOW,
[&](int ret) {
- tn->log(10, "a sync operation returned error");
+ if (ret < 0) {
+ tn->log(10, "a sync operation returned error");
+ }
return ret;
});
}
}
drain_all_cb([&](int ret) {
- tn->log(10, "a sync operation returned error");
+ if (ret < 0) {
+ tn->log(10, "a sync operation returned error");
+ }
return ret;
});
if (progress) {