rgw_bucket_shard_sync_info& status;
RGWObjVersionTracker& objv_tracker;
rgw_bucket_index_marker_info info;
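+ // when true, write the status object with an exclusive create that fails
+ // with -EEXIST if the object already exists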
+ bool exclusive;
public:
RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncCtx *_sc,
const rgw_bucket_sync_pair_info& _sync_pair,
rgw_bucket_shard_sync_info& _status,
- RGWObjVersionTracker& objv_tracker)
+ RGWObjVersionTracker& objv_tracker,
+ bool exclusive)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
sync_pair(_sync_pair),
sync_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, _sync_pair)),
- status(_status), objv_tracker(objv_tracker)
+ status(_status), objv_tracker(objv_tracker), exclusive(exclusive)
{}
int operate(const DoutPrefixProvider *dpp) override {
if (write_status) {
map<string, bufferlist> attrs;
status.encode_all_attrs(attrs);
- call(new RGWSimpleRadosWriteAttrsCR(dpp, sync_env->async_rados, sync_env->svc->sysobj, obj, attrs, &objv_tracker));
+ call(new RGWSimpleRadosWriteAttrsCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+ obj, attrs, &objv_tracker, exclusive));
} else {
call(new RGWRadosRemoveCR(store, obj, &objv_tracker));
}
const rgw_zone_id& _source_zone,
RGWRESTConn *_conn,
const RGWBucketInfo& source_bucket_info,
- const rgw_bucket& dest_bucket) : dpp(_dpp), sync_env(_sync_env)
+ const rgw_bucket& dest_bucket)
+ : dpp(_dpp), sync_env(_sync_env), conn(_conn), source_zone(_source_zone),
+ full_status_obj(sync_env->svc->zone->get_zone_params().log_pool,
+ RGWBucketPipeSyncStatusManager::full_status_oid(source_zone,
+ source_bucket_info.bucket,
+ dest_bucket))
{
- conn = _conn;
- source_zone = _source_zone;
-
int num_shards = (source_bucket_info.layout.current_index.layout.normal.num_shards <= 0 ?
1 : source_bucket_info.layout.current_index.layout.normal.num_shards);
sc.init(sync_env, conn, source_zone);
}
-RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(int num, RGWObjVersionTracker& objv_tracker)
-{
- if ((size_t)num >= sync_pairs.size()) {
- return nullptr;
- }
- return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pairs[num], init_status, objv_tracker);
-}
-
#define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync."
template <class T>
return 0;
}
+// wrap ReadSyncStatus and clear the result flag if the shard is not in incremental sync
+class CheckBucketShardStatusIsIncremental : public RGWReadBucketPipeSyncStatusCoroutine {
+ bool* result;
+ rgw_bucket_shard_sync_info status;
+ public:
+ CheckBucketShardStatusIsIncremental(RGWDataSyncCtx* sc,
+ const rgw_bucket_sync_pair_info& sync_pair,
+ bool* result)
+ : RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &status, nullptr),
+ result(result)
+ {}
+
+ int operate(const DoutPrefixProvider *dpp) override {
+ int r = RGWReadBucketPipeSyncStatusCoroutine::operate(dpp);
+ if (state == RGWCoroutine_Done &&
+ status.state != rgw_bucket_shard_sync_info::StateIncrementalSync) {
+ *result = false;
+ }
+ return r;
+ }
+};
+
+class CheckAllBucketShardStatusIsIncremental : public RGWShardCollectCR {
+ // start with 1 shard, and only spawn more if we detect an existing shard.
+ // this makes the backward compatibility check far less expensive in the
+ // general case where no shards exist
+ static constexpr int initial_concurrent_shards = 1;
+ static constexpr int max_concurrent_shards = 16;
+
+ RGWDataSyncCtx* sc;
+ rgw_bucket_sync_pair_info sync_pair;
+ const int num_shards;
+ bool* result;
+ int shard = 0;
+ public:
+ CheckAllBucketShardStatusIsIncremental(RGWDataSyncCtx* sc,
+ const rgw_bucket_sync_pair_info& sync_pair,
+ int num_shards, bool* result)
+ : RGWShardCollectCR(sc->cct, initial_concurrent_shards),
+ sc(sc), sync_pair(sync_pair), num_shards(num_shards), result(result)
+ {}
+
+ bool spawn_next() override {
+ // stop spawning if we saw any errors or non-incremental shards
+ if (shard >= num_shards || status < 0 || !*result) {
+ return false;
+ }
+ sync_pair.dest_bs.shard_id = shard++;
+ spawn(new CheckBucketShardStatusIsIncremental(sc, sync_pair, result), false);
+ return true;
+ }
+
+ private:
+ int handle_result(int r) override {
+ if (r < 0) {
+ ldout(cct, 4) << "failed to read bucket shard status: "
+ << cpp_strerror(r) << dendl;
+ } else if (shard == 0) {
+ // enable concurrency once the first shard succeeds
+ max_concurrent = max_concurrent_shards;
+ }
+ return r;
+ }
+};
+
+// wrap InitBucketShardSyncStatus with local storage for 'status' and 'objv'
+// and a loop to retry on racing writes
+class InitBucketShardStatusCR : public RGWCoroutine {
+ RGWDataSyncCtx* sc;
+ const rgw_bucket_sync_pair_info& pair;
+ rgw_bucket_shard_sync_info status;
+ RGWObjVersionTracker objv;
+ int tries = 10; // retry on racing writes
+ bool exclusive = true; // first try is exclusive
+ using ReadCR = RGWReadBucketPipeSyncStatusCoroutine;
+ using InitCR = RGWInitBucketShardSyncStatusCoroutine;
+ public:
+ InitBucketShardStatusCR(RGWDataSyncCtx* sc, const rgw_bucket_sync_pair_info& pair)
+ : RGWCoroutine(sc->cct), sc(sc), pair(pair)
+ {}
+ int operate(const DoutPrefixProvider *dpp) override {
+ reenter(this) {
+ // try exclusive create with empty status
+ objv.generate_new_write_ver(cct);
+ yield call(new InitCR(sc, pair, status, objv, exclusive));
+ if (retcode >= 0) {
+ return set_cr_done();
+ } else if (retcode != -EEXIST) {
+ return set_cr_error(retcode);
+ }
+
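+ // the object already exists; fall back to read-modify-write guarded by the
+ // object version, retrying if a racing writer bumps it (-ECANCELED)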
+ exclusive = false;
+ // retry loop to reinitialize
+ while (--tries) {
+ objv.clear();
+ // read current status and objv
+ yield call(new ReadCR(sc, pair, &status, &objv));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ yield call(new InitCR(sc, pair, status, objv, exclusive));
+ if (retcode >= 0) {
+ return set_cr_done();
+ } else if (retcode != -ECANCELED) {
+ return set_cr_error(retcode);
+ }
+ }
+ return set_cr_error(retcode);
+ }
+ return 0;
+ }
+};
+
+class InitBucketShardStatusCollectCR : public RGWShardCollectCR {
+ static constexpr int max_concurrent_shards = 16;
+ RGWDataSyncCtx* sc;
+ rgw_bucket_sync_pair_info sync_pair;
+ const int num_shards;
+ int shard = 0;
+
+ int handle_result(int r) override {
+ if (r < 0) {
+ ldout(cct, 4) << "failed to init bucket shard status: "
+ << cpp_strerror(r) << dendl;
+ }
+ return r;
+ }
+ public:
+ InitBucketShardStatusCollectCR(RGWDataSyncCtx* sc,
+ const rgw_bucket_sync_pair_info& sync_pair,
+ int num_shards)
+ : RGWShardCollectCR(sc->cct, max_concurrent_shards),
+ sc(sc), sync_pair(sync_pair), num_shards(num_shards)
+ {}
+
+ bool spawn_next() override {
+ if (shard >= num_shards || status < 0) { // stop spawning on any errors
+ return false;
+ }
+ sync_pair.dest_bs.shard_id = shard++;
+ spawn(new InitBucketShardStatusCR(sc, sync_pair), false);
+ return true;
+ }
+};
+
+class InitBucketFullSyncStatusCR : public RGWCoroutine {
+ RGWDataSyncCtx *sc;
+ RGWDataSyncEnv *sync_env;
+
+ const rgw_bucket_sync_pair_info& sync_pair;
+ const rgw_raw_obj& status_obj;
+ rgw_bucket_sync_status& status;
+ RGWObjVersionTracker& objv;
+ const int num_shards;
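+ // if set, probe existing per-shard status and resume incremental sync
+ // when all shards are already incremental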
+ const bool check_compat;
+
+ bool all_incremental = true;
+public:
+ InitBucketFullSyncStatusCR(RGWDataSyncCtx* sc,
+ const rgw_bucket_sync_pair_info& sync_pair,
+ const rgw_raw_obj& status_obj,
+ rgw_bucket_sync_status& status,
+ RGWObjVersionTracker& objv,
+ int num_shards, bool check_compat)
+ : RGWCoroutine(sc->cct), sc(sc), sync_env(sc->env),
+ sync_pair(sync_pair), status_obj(status_obj),
+ status(status), objv(objv), num_shards(num_shards),
+ check_compat(check_compat)
+ {}
+
+ int operate(const DoutPrefixProvider *dpp) override {
+ reenter(this) {
+ status.state = BucketSyncState::Init;
+
+ if (check_compat) {
+ // try to convert existing per-shard incremental status for backward compatibility
+ yield call(new CheckAllBucketShardStatusIsIncremental(sc, sync_pair, num_shards, &all_incremental));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ if (all_incremental) {
+ // we can use existing status and resume incremental sync
+ status.state = BucketSyncState::Incremental;
+ }
+ }
+
+ if (status.state != BucketSyncState::Incremental) {
+ // initialize all shard sync status. this will populate the log marker
+ // positions where incremental sync will resume after full sync
+ yield call(new InitBucketShardStatusCollectCR(sc, sync_pair, num_shards));
+ if (retcode < 0) {
+ ldout(cct, 20) << "failed to init bucket shard status: "
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
+
+ if (sync_env->sync_module->should_full_sync()) {
+ status.state = BucketSyncState::Full;
+ } else {
+ status.state = BucketSyncState::Incremental;
+ }
+ }
+
+ ldout(cct, 20) << "writing bucket sync state=" << status.state << dendl;
+
+ // write bucket sync status
+ using CR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
+ yield call(new CR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+ status_obj, status, &objv, false));
+ if (retcode < 0) {
+ ldout(cct, 20) << "failed to write bucket shard status: "
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
+ return set_cr_done();
+ }
+ return 0;
+ }
+};
+
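+// initialize the full sync status for the bucket as a whole, covering all of its shards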
+RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(RGWObjVersionTracker& objv)
+{
+ constexpr bool check_compat = false;
+ const int num_shards = num_pipes();
+ return new InitBucketFullSyncStatusCR(&sc, sync_pairs[0], full_status_obj,
+ full_status, objv, num_shards, check_compat);
+}
+
#define OMAP_READ_MAX_ENTRIES 10
class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
RGWDataSyncCtx *sc;
do {
if (sync_status.state == rgw_bucket_shard_sync_info::StateInit ||
sync_status.state == rgw_bucket_shard_sync_info::StateStopped) {
- yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pair, sync_status, objv_tracker));
+ yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pair, sync_status, objv_tracker, false));
if (retcode == -ENOENT) {
tn->log(0, "bucket sync disabled");
drain_all();
for (auto& mgr : source_mgrs) {
RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
-
- for (int i = 0; i < mgr->num_pipes(); ++i) {
- objvs.emplace_back();
- stack->call(mgr->init_sync_status_cr(i, objvs.back()));
- }
+ objvs.emplace_back();
+ stack->call(mgr->init_sync_status_cr(objvs.back()));
stacks.push_back(stack);
}