RGWDataSyncEnv *sync_env;
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
rgw_bucket_sync_pair_info sync_pair;
- rgw_bucket_sync_pipe sync_pipe;
- rgw_bucket_shard_sync_info sync_status;
- RGWMetaSyncEnv meta_sync_env;
- RGWObjVersionTracker objv_tracker;
+ rgw_bucket_sync_pipe& sync_pipe;
ceph::real_time* progress;
const std::string status_oid;
+ rgw_bucket_shard_sync_info sync_status;
+ RGWObjVersionTracker objv_tracker;
RGWSyncTraceNodeRef tn;
RGWSyncBucketShardCR(RGWDataSyncCtx *_sc,
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
const rgw_bucket_sync_pair_info& _sync_pair,
- const RGWSyncTraceNodeRef& _tn_parent,
+ rgw_bucket_sync_pipe& sync_pipe,
+ const RGWSyncTraceNodeRef& tn,
ceph::real_time* progress)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
- lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), progress(progress),
+ lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), sync_pipe(sync_pipe), progress(progress),
status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair)),
- tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket",
- SSTR(bucket_shard_str{_sync_pair.dest_bs} << "<-" << bucket_shard_str{_sync_pair.source_bs} ))) {
+ tn(tn) {
}
int operate(const DoutPrefixProvider *dpp) override;
};
-static RGWCoroutine* sync_bucket_shard_cr(RGWDataSyncCtx* sc,
- boost::intrusive_ptr<const RGWContinuousLeaseCR> lease,
- const rgw_bucket_sync_pair_info& sync_pair,
- const RGWSyncTraceNodeRef& tn,
- ceph::real_time* progress)
-{
- return new RGWSyncBucketShardCR(sc, std::move(lease), sync_pair, tn, progress);
-}
-
int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp)
{
reenter(this) {
tn->log(20, SSTR("sync status for source bucket: " << sync_status.state));
- yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.source_bs.bucket, &sync_pipe.source_bucket_info,
- &sync_pipe.source_bucket_attrs, tn));
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket}));
- drain_all();
- return set_cr_error(retcode);
- }
-
- yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.dest_bs.bucket, &sync_pipe.dest_bucket_info,
- &sync_pipe.dest_bucket_attrs, tn));
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket}));
- drain_all();
- return set_cr_error(retcode);
- }
-
- sync_pipe.info = sync_pair;
-
do {
if (sync_status.state == rgw_bucket_shard_sync_info::StateInit ||
sync_status.state == rgw_bucket_shard_sync_info::StateStopped) {
return 0;
}
+class RGWSyncBucketCR : public RGWCoroutine {
+ RGWDataSyncCtx *sc;
+ RGWDataSyncEnv *env;
+ boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
+ rgw_bucket_sync_pair_info sync_pair;
+ rgw_bucket_sync_pipe sync_pipe;
+ ceph::real_time* progress;
+
+ const rgw_raw_obj status_obj;
+ rgw_bucket_sync_status bucket_status;
+ RGWObjVersionTracker objv;
+
+ RGWSyncTraceNodeRef tn;
+
+public:
+ RGWSyncBucketCR(RGWDataSyncCtx *_sc,
+ boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
+ const rgw_bucket_sync_pair_info& _sync_pair,
+ const RGWSyncTraceNodeRef& _tn_parent,
+ ceph::real_time* progress)
+ : RGWCoroutine(_sc->cct), sc(_sc), env(_sc->env),
+ lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), progress(progress),
+ status_obj(env->svc->zone->get_zone_params().log_pool,
+ RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone,
+ sync_pair.source_bs.bucket,
+ sync_pair.dest_bs.bucket)),
+ tn(env->sync_tracer->add_node(_tn_parent, "bucket",
+ SSTR(bucket_shard_str{_sync_pair.dest_bs} << "<-" << bucket_shard_str{_sync_pair.source_bs} ))) {
+ }
+
+ int operate(const DoutPrefixProvider *dpp) override;
+};
+
+static RGWCoroutine* sync_bucket_shard_cr(RGWDataSyncCtx* sc,
+ boost::intrusive_ptr<const RGWContinuousLeaseCR> lease,
+ const rgw_bucket_sync_pair_info& sync_pair,
+ const RGWSyncTraceNodeRef& tn,
+ ceph::real_time* progress)
+{
+ return new RGWSyncBucketCR(sc, std::move(lease), sync_pair, tn, progress);
+}
+
+int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
+{
+ reenter(this) {
+ // read source/destination bucket info
+ yield call(new RGWSyncGetBucketInfoCR(env, sync_pair.source_bs.bucket, &sync_pipe.source_bucket_info,
+ &sync_pipe.source_bucket_attrs, tn));
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket}));
+ return set_cr_error(retcode);
+ }
+
+ yield call(new RGWSyncGetBucketInfoCR(env, sync_pair.dest_bs.bucket, &sync_pipe.dest_bucket_info,
+ &sync_pipe.dest_bucket_attrs, tn));
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket}));
+ return set_cr_error(retcode);
+ }
+
+ sync_pipe.info = sync_pair;
+
+ // read bucket sync status
+ using CR = RGWSimpleRadosReadCR<rgw_bucket_sync_status>;
+ yield call(new CR(dpp, env->async_rados, env->svc->sysobj,
+ status_obj, &bucket_status, true, &objv));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+
+ if (bucket_status.state == BucketSyncState::Init) {
+ // init sync status
+ yield {
+ // use exclusive create if it didn't exist. if we lose the race to
+ // create it, we'll fail with EEXIST and RGWSyncBucketCR() will come
+ // back later and read the new status
+ const bool exclusive = objv.version_for_check() == nullptr;
+ int num_shards = sync_pipe.dest_bucket_info.layout.current_index.layout.normal.num_shards;
+ call(new InitBucketFullSyncStatusCR(sc, sync_pair, status_obj,
+ bucket_status, objv, num_shards,
+ exclusive));
+ }
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ }
+
+ if (bucket_status.state == BucketSyncState::Full) {
+ // TODO: full sync
+ }
+
+ if (bucket_status.state == BucketSyncState::Incremental) {
+ yield call(new RGWSyncBucketShardCR(sc, lease_cr, sync_pair,
+ sync_pipe, tn, progress));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ }
+ return set_cr_done();
+ }
+
+ return 0;
+}
+
RGWCoroutine *RGWRemoteBucketManager::run_sync_cr(int num)
{
if ((size_t)num >= sync_pairs.size()) {