rgw_http_param_pair pairs[] = { { "key", key.c_str() },
{ NULL, NULL } };
- int ret = call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, http_manager, path, pairs, &meta_info));
- if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
- return ret;
- }
+ call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, http_manager, path, pairs, &meta_info));
}
num_shards = meta_info.data.get_bucket_info().num_shards;
return set_cr_error(-EIO);
}
marker_tracker->reset_need_retry(raw_key);
- ret = call(new RGWRunBucketSyncCoroutine(http_manager, async_rados, conn, store, source_zone, bucket_name, bucket_instance, shard_id));
- if (ret < 0) {
-#warning failed syncing bucket, need to log
- return set_cr_error(sync_status);
- }
+ call(new RGWRunBucketSyncCoroutine(http_manager, async_rados, conn, store, source_zone, bucket_name, bucket_instance, shard_id));
}
} while (marker_tracker->need_retry(raw_key));
sync_status = retcode;
#warning what do do in case of error
if (!entry_marker.empty()) {
- yield {
- /* update marker */
- int ret = call(marker_tracker->finish(entry_marker));
- if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: marker_tracker->finish(" << entry_marker << ") returned ret=" << ret << dendl;
- return set_cr_error(sync_status);
- }
- }
+ /* update marker */
+ yield call(marker_tracker->finish(entry_marker));
}
if (sync_status == 0) {
sync_status = retcode;
sync_marker));
total_entries = sync_marker.pos;
do {
- yield return call(new RGWRadosGetOmapKeysCR(store, pool, oid, sync_marker.marker, &entries, max_entries));
+ yield call(new RGWRadosGetOmapKeysCR(store, pool, oid, sync_marker.marker, &entries, max_entries));
if (retcode < 0) {
ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
return set_cr_error(retcode);
}
}
- yield {
- int ret = call(new RGWReadRemoteDataLogShardInfoCR(store, http_manager, async_rados, conn, shard_id, &shard_info));
- if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to call RGWReadRemoteDataLogShardInfoCR() ret=" << ret << dendl;
- return set_cr_error(ret);
- }
- }
+ yield call(new RGWReadRemoteDataLogShardInfoCR(store, http_manager, async_rados, conn, shard_id, &shard_info));
if (retcode < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to fetch remote data log info: ret=" << retcode << dendl;
return set_cr_error(retcode);
int operate() {
reenter(this) {
- int r;
- yield {
- /* read sync status */
- r = call(new RGWReadDataSyncStatusCoroutine(async_rados, store, obj_ctx, source_zone, &sync_status));
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to call RGWReadDataSyncStatusCoroutine r=" << r << dendl;
- return set_cr_error(r);
- }
- }
+ /* read sync status */
+ yield call(new RGWReadDataSyncStatusCoroutine(async_rados, store, obj_ctx, source_zone, &sync_status));
if (retcode == -ENOENT) {
sync_status.sync_info.num_shards = num_shards;
/* state: init status */
if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
ldout(store->ctx(), 20) << __func__ << "(): init" << dendl;
- yield {
- r = call(new RGWInitDataSyncStatusCoroutine(async_rados, store, http_manager, obj_ctx, source_zone, sync_status.sync_info.num_shards));
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to call RGWReadDataSyncStatusCoroutine r=" << r << dendl;
- return set_cr_error(r);
- }
- }
+ yield call(new RGWInitDataSyncStatusCoroutine(async_rados, store, http_manager, obj_ctx, source_zone, sync_status.sync_info.num_shards));
if (retcode < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
return set_cr_error(retcode);
sync_status.sync_info.num_shards = num_shards;
sync_status.sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
/* update new state */
- yield {
- r = call(set_sync_info_cr());
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to write sync status" << dendl;
- return set_cr_error(r);
- }
- }
+ yield call(set_sync_info_cr());
if (retcode < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
/* state: building full sync maps */
- yield {
- ldout(store->ctx(), 20) << __func__ << "(): building full sync maps" << dendl;
- r = call(new RGWListBucketIndexesCR(store, http_manager, async_rados, conn, source_zone, &sync_status));
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to call RGWListBucketIndexesCR r=" << r << dendl;
- return set_cr_error(r);
- }
- }
+ ldout(store->ctx(), 20) << __func__ << "(): building full sync maps" << dendl;
+ yield call(new RGWListBucketIndexesCR(store, http_manager, async_rados, conn, source_zone, &sync_status));
sync_status.sync_info.state = rgw_data_sync_info::StateSync;
- /* update new state */
- yield {
- r = call(set_sync_info_cr());
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to write sync status" << dendl;
- return set_cr_error(r);
- }
- }
+ /* update new state */
+ yield call(set_sync_info_cr());
if (retcode < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
return set_cr_error(retcode);
}
int operate() {
- int ret;
reenter(this) {
yield {
rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
{ NULL, NULL } };
string p = "/admin/log/";
- ret = call(new RGWReadRESTResourceCR<bucket_index_marker_info>(store->ctx(), conn, http_manager, p, pairs, info));
- if (ret < 0) {
- return set_cr_error(ret);
- }
+ call(new RGWReadRESTResourceCR<bucket_index_marker_info>(store->ctx(), conn, http_manager, p, pairs, info));
}
if (retcode < 0) {
return set_cr_error(retcode);
}
int operate() {
- int ret;
reenter(this) {
yield {
uint32_t lock_duration = 30;
return set_cr_error(retcode);
}
}
- yield {
- call(new RGWSimpleRadosWriteCR<rgw_bucket_shard_sync_info>(async_rados, store, store->get_zone_params().log_pool,
+ yield call(new RGWSimpleRadosWriteCR<rgw_bucket_shard_sync_info>(async_rados, store, store->get_zone_params().log_pool,
sync_status_oid, status));
- }
yield { /* take lock again, we just recreated the object */
uint32_t lock_duration = 30;
call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
}
}
/* fetch current position in logs */
- yield {
- ret = call(new RGWReadRemoteBucketIndexLogInfoCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id, &info));
- if (ret < 0) {
- ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
- return set_cr_error(ret);
- }
- }
+ yield call(new RGWReadRemoteBucketIndexLogInfoCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id, &info));
if (retcode < 0 && retcode != -ENOENT) {
ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
return set_cr_error(retcode);
int RGWReadBucketSyncStatusCoroutine::operate()
{
reenter(this) {
- yield {
- int ret = call(new RGWSimpleRadosReadAttrsCR(async_rados, store, obj_ctx,
+ yield call(new RGWSimpleRadosReadAttrsCR(async_rados, store, obj_ctx,
store->get_zone_params().log_pool,
oid,
&attrs));
- if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to call new RGWSimpleRadosReadAttrsCR() ret=" << ret << dendl;
- return set_cr_error(ret);
- }
- }
if (retcode == -ENOENT) {
*status = rgw_bucket_shard_sync_info();
return set_cr_done();
}
int operate() {
- int ret;
reenter(this) {
yield {
rgw_http_param_pair pairs[] = { { "rgwx-bucket-instance", instance_key.c_str() },
{ NULL, NULL } };
string p = string("/") + bucket_name;
- ret = call(new RGWReadRESTResourceCR<bucket_list_result>(store->ctx(), conn, http_manager, p, pairs, result));
- if (ret < 0) {
- return set_cr_error(ret);
- }
+ call(new RGWReadRESTResourceCR<bucket_list_result>(store->ctx(), conn, http_manager, p, pairs, result));
}
if (retcode < 0) {
return set_cr_error(retcode);
}
int operate() {
- int ret;
reenter(this) {
yield {
rgw_http_param_pair pairs[] = { { "bucket-instance", instance_key.c_str() },
{ "type", "bucket-index" },
{ NULL, NULL } };
- ret = call(new RGWReadRESTResourceCR<list<rgw_bi_log_entry> >(store->ctx(), conn, http_manager, "/admin/log", pairs, result));
- if (ret < 0) {
- return set_cr_error(ret);
- }
+ call(new RGWReadRESTResourceCR<list<rgw_bi_log_entry> >(store->ctx(), conn, http_manager, "/admin/log", pairs, result));
}
if (retcode < 0) {
return set_cr_error(retcode);
int operate() {
reenter(this) {
yield {
- int r;
if (op == CLS_RGW_OP_ADD ||
op == CLS_RGW_OP_LINK_OLH) {
if (op == CLS_RGW_OP_ADD && !key.instance.empty() && key.instance != "null") {
}
ldout(store->ctx(), 5) << "bucket sync: sync obj: " << source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
- r = call(new RGWFetchRemoteObjCR(async_rados, store, source_zone, *bucket_info,
+ call(new RGWFetchRemoteObjCR(async_rados, store, source_zone, *bucket_info,
key, versioned_epoch,
true));
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to call RGWFetchRemoteObjCR()" << dendl;
- return set_cr_error(r);
- }
} else if (op == CLS_RGW_OP_DEL) {
- r = call(new RGWRemoveObjCR(async_rados, store, source_zone, *bucket_info, key, versioned_epoch, ×tamp));
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to call RGWRemoveObjCR()" << dendl;
- return set_cr_error(r);
- }
+ call(new RGWRemoveObjCR(async_rados, store, source_zone, *bucket_info, key, versioned_epoch, ×tamp));
}
}
if (retcode < 0 && retcode != -ENOENT) {
ldout(store->ctx(), 0) << "ERROR: failed to sync object: " << bucket.name << ":" << bucket.bucket_id << ":" << shard_id << "/" << key << dendl;
sync_status = retcode;
}
- yield {
- /* update marker */
- int ret = call(marker_tracker->finish(entry_marker));
- if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: marker_tracker->finish(" << entry_marker << ") returned ret=" << ret << dendl;
- return set_cr_error(sync_status);
- }
- }
+ /* update marker */
+ yield call(marker_tracker->finish(entry_marker));
if (sync_status == 0) {
sync_status = retcode;
}
total_entries = full_marker.count;
do {
- yield {
- ldout(store->ctx(), 20) << __func__ << "(): listing bucket for full sync" << dendl;
- int r = call(new RGWListBucketShardCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id,
- list_marker, &list_result));
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to call new CR (RGWListBucketShardCR)" << dendl;
- return r;
- }
- }
+ ldout(store->ctx(), 20) << __func__ << "(): listing bucket for full sync" << dendl;
+ yield call(new RGWListBucketShardCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id,
+ list_marker, &list_result));
if (retcode < 0 && retcode != -ENOENT) {
return set_cr_error(retcode);
}
map<string, bufferlist> attrs;
sync_status.encode_state_attr(attrs);
string oid = RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id);
- int ret = call(new RGWSimpleRadosWriteAttrsCR(async_rados, store, store->get_zone_params().log_pool,
- oid, attrs));
- if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to call RGWSimpleRadosWriteAttrsCR() oid=" << oid << dendl;
- return set_cr_error(ret);
- }
+ call(new RGWSimpleRadosWriteAttrsCR(async_rados, store, store->get_zone_params().log_pool,
+ oid, attrs));
}
if (retcode < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to set sync state on bucket " << bucket_name << ":" << bucket_id << ":" << shard_id
RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id),
inc_marker);
do {
- yield {
- ldout(store->ctx(), 20) << __func__ << "(): listing bilog for incremental sync" << dendl;
- int r = call(new RGWListBucketIndexLogCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id,
- inc_marker.position, &list_result));
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to call new CR (RGWListBucketShardCR)" << dendl;
- return r;
- }
- }
+ ldout(store->ctx(), 20) << __func__ << "(): listing bilog for incremental sync" << dendl;
+ yield call(new RGWListBucketIndexLogCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id,
+ inc_marker.position, &list_result));
if (retcode < 0 && retcode != -ENOENT) {
/* wait for all operations to complete */
drain_all();
int RGWRunBucketSyncCoroutine::operate()
{
reenter(this) {
- yield {
- int r = call(new RGWReadBucketSyncStatusCoroutine(async_rados, store, source_zone, bucket_name, bucket_id, shard_id, &sync_status));
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status" << dendl;
- return r;
- }
- }
-
+ yield call(new RGWReadBucketSyncStatusCoroutine(async_rados, store, source_zone, bucket_name, bucket_id, shard_id, &sync_status));
if (retcode < 0 && retcode != -ENOENT) {
ldout(store->ctx(), 0) << "ERROR: failed to read sync status for bucket=" << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << dendl;
return set_cr_error(retcode);
ldout(store->ctx(), 20) << __func__ << "(): sync status for bucket " << bucket_name << ":" << bucket_id << ":" << shard_id << ": " << sync_status.state << dendl;
- yield {
- int r = call(new RGWGetBucketInstanceInfoCR(async_rados, store, bucket_name, bucket_id, &bucket_info));
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status" << dendl;
- return set_cr_error(r);
- }
- }
-
+ yield call(new RGWGetBucketInstanceInfoCR(async_rados, store, bucket_name, bucket_id, &bucket_info));
if (retcode == -ENOENT) {
/* bucket instance info has not been synced in yet, fetch it now */
yield {
sync_env.init(cct, store, store->rest_master_conn, async_rados, http_manager);
- int r = call(new RGWMetaSyncSingleEntryCR(&sync_env, raw_key,
- string() /* no marker */,
- NULL /* no marker tracker */));
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to fetch bucket instance info for " << raw_key << dendl;
- return set_cr_error(r);
- }
+ call(new RGWMetaSyncSingleEntryCR(&sync_env, raw_key,
+ string() /* no marker */,
+ NULL /* no marker tracker */));
}
-
if (retcode < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to fetch bucket instance info for " << bucket_name << ":" << bucket_id << dendl;
return set_cr_error(retcode);
}
- yield {
- int r = call(new RGWGetBucketInstanceInfoCR(async_rados, store, bucket_name, bucket_id, &bucket_info));
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status" << dendl;
- return set_cr_error(r);
- }
- }
- }
+ yield call(new RGWGetBucketInstanceInfoCR(async_rados, store, bucket_name, bucket_id, &bucket_info));
+ }
if (retcode < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_name << " bucket_id=" << bucket_id << dendl;
return set_cr_error(retcode);
yield {
if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
- int r = call(new RGWInitBucketShardSyncStatusCoroutine(async_rados, store, http_manager, source_zone,
- conn, bucket_name, bucket_id, shard_id));
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status" << dendl;
- return r;
- }
+ call(new RGWInitBucketShardSyncStatusCoroutine(async_rados, store, http_manager, source_zone,
+ conn, bucket_name, bucket_id, shard_id));
sync_status.state = rgw_bucket_shard_sync_info::StateFullSync;
}
}
}
yield {
if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
- int r = call(new RGWBucketShardFullSyncCR(http_manager, async_rados, conn, store,
- source_zone, bucket_name, bucket_id, shard_id,
- &bucket_info, sync_status.full_marker));
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status" << dendl;
- return r;
- }
+ call(new RGWBucketShardFullSyncCR(http_manager, async_rados, conn, store,
+ source_zone, bucket_name, bucket_id, shard_id,
+ &bucket_info, sync_status.full_marker));
sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
}
}
-
if (retcode < 0) {
ldout(store->ctx(), 0) << "ERROR: full sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl;
return set_cr_error(retcode);
yield {
if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
- int r = call(new RGWBucketShardIncrementalSyncCR(http_manager, async_rados, conn, store,
- source_zone, bucket_name, bucket_id, shard_id,
- &bucket_info, sync_status.inc_marker));
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status" << dendl;
- return r;
- }
+ call(new RGWBucketShardIncrementalSyncCR(http_manager, async_rados, conn, store,
+ source_zone, bucket_name, bucket_id, shard_id,
+ &bucket_info, sync_status.inc_marker));
}
}
-
if (retcode < 0) {
ldout(store->ctx(), 0) << "ERROR: incremental sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl;
return set_cr_error(retcode);
for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
RGWRemoteBucketLog *l = iter->second;
- int r = stack->call(l->init_sync_status_cr());
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to init sync status for " << bucket_name << ":" << bucket_id << ":" << iter->first << dendl;
- }
+ stack->call(l->init_sync_status_cr());
stacks.push_back(stack);
}
for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
RGWRemoteBucketLog *l = iter->second;
- int r = stack->call(l->read_sync_status_cr(&sync_status[iter->first]));
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to read sync status for " << bucket_name << ":" << bucket_id << ":" << iter->first << dendl;
- }
+ stack->call(l->read_sync_status_cr(&sync_status[iter->first]));
stacks.push_back(stack);
}
for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
RGWRemoteBucketLog *l = iter->second;
- int r = stack->call(l->run_sync_cr());
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to read sync status for " << bucket_name << ":" << bucket_id << ":" << iter->first << dendl;
- }
+ stack->call(l->run_sync_cr());
stacks.push_back(stack);
}