}
-bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret) /* returns true if needs to be called again */
+bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
{
rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
*ret = 0;
for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
RGWCoroutinesStack *stack = *iter;
- if (!stack->is_done()) {
+ if (stack == skip_stack || !stack->is_done()) {
new_list.push_back(stack);
ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is still running" << dendl;
continue;
return false;
}
-bool RGWCoroutinesStack::collect(int *ret) /* returns true if needs to be called again */
+bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
{
- return collect(NULL, ret);
+ return collect(NULL, ret, skip_stack);
}
static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg);
return stack->spawn(this, op, wait);
}
-bool RGWCoroutine::collect(int *ret) /* returns true if needs to be called again */
+bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
{
- return stack->collect(this, ret);
+ return stack->collect(this, ret, skip_stack);
}
bool RGWCoroutine::collect_next(int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
return out;
}
-bool RGWCoroutine::drain_children(int num_cr_left)
+bool RGWCoroutine::drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack)
{
bool done = false;
+ assert(num_cr_left >= 0);
+ if (num_cr_left == 0 && skip_stack) {
+ num_cr_left = 1;
+ }
reenter(&drain_cr) {
while (num_spawned() > (size_t)num_cr_left) {
yield wait_for_child();
int ret;
- while (collect(&ret)) {
+ while (collect(&ret, skip_stack)) {
if (ret < 0) {
ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
/* we should have reported this error */
void call(RGWCoroutine *op); /* call at the same stack we're in */
RGWCoroutinesStack *spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */
- bool collect(int *ret); /* returns true if needs to be called again */
+ bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */
int wait(const utime_t& interval);
- bool drain_children(int num_cr_left); /* returns true if needed to be called again */
+ bool drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack = NULL); /* returns true if needed to be called again */
void wakeup();
void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */
drain_cr = boost::asio::coroutine(); \
yield_until_true(drain_children(n))
+#define drain_all_but_stack(stack) \
+ drain_cr = boost::asio::coroutine(); \
+ yield_until_true(drain_children(1, stack))
+
template <class T>
class RGWConsumerCR : public RGWCoroutine {
list<T> product;
RGWCoroutinesStack *parent;
RGWCoroutinesStack *spawn(RGWCoroutine *source_op, RGWCoroutine *next_op, bool wait);
- bool collect(RGWCoroutine *op, int *ret); /* returns true if needs to be called again */
+ bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
bool collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack); /* returns true if found a stack to collect */
public:
RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL);
int wait(const utime_t& interval);
void wakeup();
- bool collect(int *ret); /* returns true if needs to be called again */
+ bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
RGWAioCompletionNotifier *create_completion_notifier();
RGWCompletionManager *get_completion_mgr();
spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true);
}
}
- while (collect(&ret)) {
+ while (collect(&ret, NULL)) {
if (ret < 0) {
return set_state(RGWCoroutine_Error);
}
call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
lock_name, cookie));
}
- while (collect(&ret)) {
+ while (collect(&ret, NULL)) {
if (ret < 0) {
return set_state(RGWCoroutine_Error);
}
yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
EIO, string("failed to build bucket instances map")));
}
- while (collect(&ret)) {
+ while (collect(&ret, NULL)) {
if (ret < 0) {
yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
-ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
set<string> spawned_keys;
RGWContinuousLeaseCR *lease_cr;
+ RGWCoroutinesStack *lease_stack;
string status_oid;
sync_marker(_marker),
marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
- lease_cr(NULL), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
+ lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT) {
set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid,
lock_name, lock_duration, this);
lease_cr->get();
- spawn(lease_cr, false);
+ lease_stack = spawn(lease_cr, false);
}
int full_sync() {
set_status() << "num_spawned() > spawn_window";
yield wait_for_child();
int ret;
- while (collect(&ret)) {
+ while (collect(&ret, lease_stack)) {
if (ret < 0) {
ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
/* we have reported this error */
int total_entries;
RGWContinuousLeaseCR *lease_cr;
+ RGWCoroutinesStack *lease_stack;
string status_oid;
full_marker(_full_marker), marker_tracker(NULL),
spawn_window(BUCKET_SYNC_SPAWN_WINDOW), entry(NULL),
op(CLS_RGW_OP_ADD),
- total_entries(0), lease_cr(NULL) {
+ total_entries(0), lease_cr(nullptr), lease_stack(nullptr) {
status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs);
logger.init(sync_env, "BucketFull", bs.get_key());
}
lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid,
lock_name, lock_duration, this);
lease_cr->get();
- spawn(lease_cr, false);
+ lease_stack = spawn(lease_cr, false);
}
while (!lease_cr->is_locked()) {
if (lease_cr->is_done()) {
}
while ((int)num_spawned() > spawn_window) {
yield wait_for_child();
- while (collect(&ret)) {
+ while (collect(&ret, lease_stack)) {
if (ret < 0) {
ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
/* we have reported this error */
} while (list_result.is_truncated);
set_status("done iterating over all objects");
/* wait for all operations to complete */
- drain_all_but(1); /* still need to hold lease cr */
+ drain_all_but_stack(lease_stack); /* still need to hold lease cr */
/* update sync state to incremental */
yield {
rgw_bucket_shard_sync_info sync_status;
const int spawn_window{BUCKET_SYNC_SPAWN_WINDOW};
bool updated_status{false};
RGWContinuousLeaseCR *lease_cr{nullptr};
+ RGWCoroutinesStack *lease_stack{nullptr};
const string status_oid;
string name;
lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid,
lock_name, lock_duration, this);
lease_cr->get();
- spawn(lease_cr, false);
+ lease_stack = spawn(lease_cr, false);
}
while (!lease_cr->is_locked()) {
if (lease_cr->is_done()) {
&list_result));
if (retcode < 0 && retcode != -ENOENT) {
/* wait for all operations to complete */
- drain_all_but(1);
+ drain_all_but_stack(lease_stack);
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
}
ldout(sync_env->cct, 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl;
yield wait_for_child();
- while (collect(&ret)) {
+ while (collect(&ret, lease_stack)) {
if (ret < 0) {
ldout(sync_env->cct, 0) << "ERROR: a child operation returned error (ret=" << ret << ")" << dendl;
/* we have reported this error */
while ((int)num_spawned() > spawn_window) {
set_status() << "num_spawned() > spawn_window";
yield wait_for_child();
- while (collect(&ret)) {
+ while (collect(&ret, lease_stack)) {
if (ret < 0) {
ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
/* we have reported this error */
}
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: marker_tracker->flush() returned retcode=" << retcode << dendl;
- drain_all_but(1);
+ drain_all_but_stack(lease_stack);
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
}
- drain_all_but(1);
+ drain_all_but_stack(lease_stack);
lease_cr->go_down();
/* wait for all operations to complete */
drain_all();
rgw_meta_sync_info status;
vector<RGWMetadataLogInfo> shards_info;
RGWContinuousLeaseCR *lease_cr;
+ RGWCoroutinesStack *lease_stack;
public:
RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
const rgw_meta_sync_info &status)
: RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
status(status), shards_info(status.num_shards),
- lease_cr(NULL) {}
+ lease_cr(nullptr), lease_stack(nullptr) {}
~RGWInitSyncStatusCoroutine() {
if (lease_cr) {
lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_env->status_oid(),
lock_name, lock_duration, this);
lease_cr->get();
- spawn(lease_cr, false);
+ lease_stack = spawn(lease_cr, false);
}
while (!lease_cr->is_locked()) {
if (lease_cr->is_done()) {
}
}
- drain_all_but(1); /* the lease cr still needs to run */
+ drain_all_but_stack(lease_stack); /* the lease cr still needs to run */
yield {
set_status("updating sync status");
}
set_status("drop lock lease");
yield lease_cr->go_down();
- while (collect(&ret)) {
+ while (collect(&ret, NULL)) {
if (ret < 0) {
return set_cr_error(ret);
}
RGWShardedOmapCRManager *entries_index;
RGWContinuousLeaseCR *lease_cr;
+ RGWCoroutinesStack *lease_stack;
bool lost_lock;
bool failed;
RGWFetchAllMetaCR(RGWMetaSyncEnv *_sync_env, int _num_shards,
map<uint32_t, rgw_meta_sync_marker>& _markers) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
num_shards(_num_shards),
- ret_status(0), entries_index(NULL), lease_cr(NULL), lost_lock(false), failed(false), markers(_markers) {
+ ret_status(0), entries_index(NULL), lease_cr(nullptr), lease_stack(nullptr),
+ lost_lock(false), failed(false), markers(_markers) {
}
~RGWFetchAllMetaCR() {
lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, sync_env->store, sync_env->store->get_zone_params().log_pool, sync_env->status_oid(),
lock_name, lock_duration, this);
lease_cr->get();
- spawn(lease_cr, false);
+ lease_stack = spawn(lease_cr, false);
}
while (!lease_cr->is_locked()) {
if (lease_cr->is_done()) {
}
}
- drain_all_but(1); /* the lease cr still needs to run */
+ drain_all_but_stack(lease_stack); /* the lease cr still needs to run */
yield lease_cr->go_down();
int ret;
- while (collect(&ret)) {
+ while (collect(&ret, NULL)) {
if (ret < 0) {
return set_cr_error(ret);
}
boost::asio::coroutine full_cr;
RGWContinuousLeaseCR *lease_cr = nullptr;
+ RGWCoroutinesStack *lease_stack = nullptr;
bool lost_lock = false;
bool *reset_backoff;
sync_env->shard_obj_name(shard_id),
lock_name, lock_duration, this);
lease_cr->get();
- spawn(lease_cr, false);
+ lease_stack = spawn(lease_cr, false);
lost_lock = false;
}
while (!lease_cr->is_locked()) {
sync_env->shard_obj_name(shard_id),
lock_name, lock_duration, this);
lease_cr->get();
- spawn(lease_cr, false);
+ lease_stack = spawn(lease_cr, false);
lost_lock = false;
}
while (!lease_cr->is_locked()) {
}
}
// wait for each shard to complete
- collect(&ret);
+ collect(&ret, NULL);
drain_all();
{
// drop shard cr refs under lock