return (int64_t)++max_io_id;
}
+uint64_t RGWCoroutinesManager::get_next_stack_id() {
+ return (uint64_t)++max_stack_id;
+}
+
RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr),
done_flag(false), error_flag(false), blocked_flag(false),
sleep_flag(false), interval_wait_flag(false), is_scheduled(false), is_waiting_for_child(false),
retcode(0), run_count(0),
env(NULL), parent(NULL)
{
+ id = ops_mgr->get_next_stack_id();
if (start) {
ops.push_back(start);
}
put();
}
-bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
+bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */
{
bool need_retry = false;
rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
}
continue;
}
+ if (stack_id) {
+ *stack_id = stack->get_id();
+ }
int r = stack->get_ret_status();
stack->put();
if (r < 0) {
return false;
}
-bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
+bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */
{
- return collect(NULL, ret, skip_stack);
+ return collect(NULL, ret, skip_stack, stack_id);
}
static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
return stack->spawn(this, op, wait);
}
-bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
+bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */
{
- return stack->collect(this, ret, skip_stack);
+ return stack->collect(this, ret, skip_stack, stack_id);
}
bool RGWCoroutine::collect_next(int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
bool RGWCoroutine::drain_children(int num_cr_left,
RGWCoroutinesStack *skip_stack,
- std::optional<std::function<void(int ret)> > cb)
+ std::optional<std::function<void(uint64_t stack_id, int ret)> > cb)
{
bool done = false;
ceph_assert(num_cr_left >= 0);
while (num_spawned() > (size_t)num_cr_left) {
yield wait_for_child();
int ret;
- while (collect(&ret, skip_stack)) {
+ uint64_t stack_id;
+ while (collect(&ret, skip_stack, &stack_id)) {
if (ret < 0) {
ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
/* we should have reported this error */
log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
}
if (cb) {
- (*cb)(ret);
+ (*cb)(stack_id, ret);
}
}
}
}
bool RGWCoroutine::drain_children(int num_cr_left,
- std::optional<std::function<int(int ret)> > cb)
+ std::optional<std::function<int(uint64_t stack_id, int ret)> > cb)
{
bool done = false;
ceph_assert(num_cr_left >= 0);
while (num_spawned() > (size_t)num_cr_left) {
yield wait_for_child();
int ret;
- while (collect(&ret, nullptr)) {
+ uint64_t stack_id;
+ while (collect(&ret, nullptr, &stack_id)) {
if (ret < 0) {
ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
/* we should have reported this error */
log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
}
if (cb && !drain_status.should_exit) {
- int r = (*cb)(ret);
+ int r = (*cb)(stack_id, ret);
if (r < 0) {
drain_status.ret = r;
num_cr_left = 0; /* need to drain all */
void call(RGWCoroutine *op); /* call at the same stack we're in */
RGWCoroutinesStack *spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */
- bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
+ bool collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id = nullptr); /* returns true if needs to be called again */
bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */
int wait(const utime_t& interval);
bool drain_children(int num_cr_left,
RGWCoroutinesStack *skip_stack = nullptr,
- std::optional<std::function<void(int ret)> > cb = std::nullopt); /* returns true if needed to be called again,
- cb will be called on completion of every
- completion. */
+ std::optional<std::function<void(uint64_t stack_id, int ret)> > cb = std::nullopt); /* returns true if needed to be called again,
+ cb will be called on completion of every
+ completion. */
bool drain_children(int num_cr_left,
- std::optional<std::function<int(int ret)> > cb); /* returns true if needed to be called again,
- cb will be called on every completion, can filter errors.
- A negative return value from cb means that current cr
- will need to exit */
+ std::optional<std::function<int(uint64_t stack_id, int ret)> > cb); /* returns true if needed to be called again,
+ cb will be called on every completion, can filter errors.
+ A negative return value from cb means that current cr
+ will need to exit */
void wakeup();
void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */
CephContext *cct;
+ int64_t id{-1};
+
RGWCoroutinesManager *ops_mgr;
list<RGWCoroutine *> ops;
RGWCoroutinesStack *parent;
RGWCoroutinesStack *spawn(RGWCoroutine *source_op, RGWCoroutine *next_op, bool wait);
- bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
+ bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id); /* returns true if needs to be called again */
bool collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack); /* returns true if found a stack to collect */
public:
RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL);
~RGWCoroutinesStack() override;
+ int64_t get_id() const {
+ return id;
+ }
+
int operate(RGWCoroutinesEnv *env);
bool is_done() {
}
void io_complete(const rgw_io_id& io_id);
- bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
+ bool collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id); /* returns true if needs to be called again */
void cancel();
map<uint64_t, set<RGWCoroutinesStack *> > run_contexts;
std::atomic<int64_t> max_io_id = { 0 };
+ std::atomic<uint64_t> max_stack_id = { 0 };
mutable ceph::shared_mutex lock =
ceph::make_shared_mutex("RGWCoroutinesManager::lock");
RGWCoroutinesStack *allocate_stack();
int64_t get_next_io_id();
+ uint64_t get_next_stack_id();
void set_sleeping(RGWCoroutine *cr, bool flag);
void io_complete(RGWCoroutine *cr, const rgw_io_id& io_id);
sync_marker.marker = iter->first;
drain_all_but_stack_cb(lease_stack.get(),
- [&](int ret) {
+ [&](uint64_t stack_id, int ret) {
if (ret < 0) {
tn->log(10, "a sync operation returned error");
}
}
drain_all_but_stack_cb(lease_stack.get(),
- [&](int ret) {
+ [&](uint64_t stack_id, int ret) {
if (ret < 0) {
tn->log(10, "a sync operation returned error");
}
false);
}
drain_with_cb(BUCKET_SYNC_SPAWN_WINDOW,
- [&](int ret) {
+ [&](uint64_t stack_id, int ret) {
if (ret < 0) {
tn->log(10, "a sync operation returned error");
sync_status = ret;
set_status("done iterating over all objects");
/* wait for all operations to complete */
- drain_all_cb([&](int ret) {
+ drain_all_cb([&](uint64_t stack_id, int ret) {
if (ret < 0) {
tn->log(10, "a sync operation returned error");
sync_status = ret;
}
// }
drain_with_cb(BUCKET_SYNC_SPAWN_WINDOW,
- [&](int ret) {
+ [&](uint64_t stack_id, int ret) {
if (ret < 0) {
tn->log(10, "a sync operation returned error");
sync_status = ret;
}
} while (!list_result.empty() && sync_status == 0 && !syncstopped);
- drain_all_cb([&](int ret) {
+ drain_all_cb([&](uint64_t stack_id, int ret) {
if (ret < 0) {
tn->log(10, "a sync operation returned error");
sync_status = ret;
yield_spawn_window(new RGWRunBucketSyncCoroutine(sc, lease_cr, sync_pair, tn,
&*cur_shard_progress),
BUCKET_SYNC_SPAWN_WINDOW,
- [&](int ret) {
+ [&](uint64_t stack_id, int ret) {
if (ret < 0) {
tn->log(10, "a sync operation returned error");
}
});
}
}
- drain_all_cb([&](int ret) {
+ drain_all_cb([&](uint64_t stack_id, int ret) {
if (ret < 0) {
tn->log(10, "a sync operation returned error");
}