}
};
-class RGWListBucketShardCR: public RGWCoroutine {
+class RGWListRemoteBucketCR: public RGWCoroutine {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
const rgw_bucket_shard& bs;
- const string instance_key;
rgw_obj_key marker_position;
bucket_list_result *result;
public:
- RGWListBucketShardCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs,
- rgw_obj_key& _marker_position, bucket_list_result *_result)
+ RGWListRemoteBucketCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs,
+ rgw_obj_key& _marker_position, bucket_list_result *_result)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), bs(bs),
- instance_key(bs.get_key()), marker_position(_marker_position),
- result(_result) {}
+ marker_position(_marker_position), result(_result) {}
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
yield {
- rgw_http_param_pair pairs[] = { { "rgwx-bucket-instance", instance_key.c_str() },
- { "versions" , NULL },
+ rgw_http_param_pair pairs[] = { { "versions" , NULL },
{ "format" , "json" },
{ "objs-container" , "true" },
{ "key-marker" , marker_position.name.c_str() },
{ "version-id-marker" , marker_position.instance.c_str() },
{ NULL, NULL } };
- // don't include tenant in the url, it's already part of instance_key
- string p = string("/") + bs.bucket.name;
+ string p = string("/") + bs.bucket.get_key(':', 0);
call(new RGWReadRESTResourceCR<bucket_list_result>(sync_env->cct, sc->conn, sync_env->http_manager, p, pairs, result));
}
if (retcode < 0) {
#define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
-class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
+class RGWBucketFullSyncMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
- string marker_oid;
- rgw_bucket_shard_full_sync_marker sync_marker;
+ const rgw_raw_obj& status_obj;
+ rgw_bucket_sync_status& sync_status;
RGWSyncTraceNodeRef tn;
RGWObjVersionTracker& objv_tracker;
public:
- RGWBucketFullSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
- const string& _marker_oid,
- const rgw_bucket_shard_full_sync_marker& _marker,
- RGWSyncTraceNodeRef tn,
- RGWObjVersionTracker& objv_tracker)
+ RGWBucketFullSyncMarkerTrack(RGWDataSyncCtx *_sc,
+ const rgw_raw_obj& status_obj,
+ rgw_bucket_sync_status& sync_status,
+ RGWSyncTraceNodeRef tn,
+ RGWObjVersionTracker& objv_tracker)
: RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
- sc(_sc), sync_env(_sc->env), marker_oid(_marker_oid),
- sync_marker(_marker), tn(std::move(tn)), objv_tracker(objv_tracker)
+ sc(_sc), sync_env(_sc->env), status_obj(status_obj),
+ sync_status(sync_status), tn(std::move(tn)), objv_tracker(objv_tracker)
{}
- RGWCoroutine* store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
- sync_marker.position = new_marker;
- sync_marker.count = index_pos;
- map<string, bufferlist> attrs;
- sync_marker.encode_attr(attrs);
+ RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
+ sync_status.full.position = new_marker;
+ sync_status.full.count = index_pos;
- tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
- return new RGWSimpleRadosWriteAttrsCR(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
- rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid),
- attrs, &objv_tracker);
+ tn->log(20, SSTR("updating marker oid=" << status_obj.oid << " marker=" << new_marker));
+ return new RGWSimpleRadosWriteCR<rgw_bucket_sync_status>(
+ sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
+ status_obj, sync_status, &objv_tracker);
}
RGWOrderCallCR *allocate_order_control_cr() override {
#define BUCKET_SYNC_SPAWN_WINDOW 20
-class RGWBucketShardFullSyncCR : public RGWCoroutine {
+class RGWBucketFullSyncCR : public RGWCoroutine {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
rgw_bucket_sync_pipe& sync_pipe;
+ rgw_bucket_sync_status& sync_status;
rgw_bucket_shard& bs;
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
bucket_list_result list_result;
list<bucket_list_entry>::iterator entries_iter;
- rgw_bucket_shard_sync_info& sync_info;
rgw_obj_key list_marker;
bucket_list_entry *entry{nullptr};
int total_entries{0};
- int sync_status{0};
+ int sync_result{0};
- const string& status_oid;
+ const rgw_raw_obj& status_obj;
+ RGWObjVersionTracker& objv;
rgw_zone_set zones_trace;
RGWSyncTraceNodeRef tn;
- RGWBucketFullSyncShardMarkerTrack marker_tracker;
+ RGWBucketFullSyncMarkerTrack marker_tracker;
struct _prefix_handler {
RGWBucketSyncFlowManager::pipe_rules_ref rules;
} prefix_handler;
public:
- RGWBucketShardFullSyncCR(RGWDataSyncCtx *_sc,
- rgw_bucket_sync_pipe& _sync_pipe,
- const std::string& status_oid,
- boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
- rgw_bucket_shard_sync_info& sync_info,
- RGWSyncTraceNodeRef tn_parent,
- RGWObjVersionTracker& objv_tracker)
+ RGWBucketFullSyncCR(RGWDataSyncCtx *_sc,
+ rgw_bucket_sync_pipe& _sync_pipe,
+ const rgw_raw_obj& status_obj,
+ boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
+ rgw_bucket_sync_status& sync_status,
+ RGWSyncTraceNodeRef tn_parent,
+ RGWObjVersionTracker& objv_tracker)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
- sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
- lease_cr(std::move(lease_cr)), sync_info(sync_info),
- status_oid(status_oid),
+ sync_pipe(_sync_pipe), sync_status(sync_status),
+ bs(_sync_pipe.info.source_bs),
+ lease_cr(std::move(lease_cr)), status_obj(status_obj), objv(objv_tracker),
tn(sync_env->sync_tracer->add_node(tn_parent, "full_sync",
SSTR(bucket_shard_str{bs}))),
- marker_tracker(sc, status_oid, sync_info.full_marker, tn, objv_tracker)
+ marker_tracker(sc, status_obj, sync_status, tn, objv_tracker)
{
zones_trace.insert(sc->source_zone.id, sync_pipe.info.dest_bs.bucket.get_key());
prefix_handler.set_rules(sync_pipe.get_rules());
int operate(const DoutPrefixProvider *dpp) override;
};
-int RGWBucketShardFullSyncCR::operate(const DoutPrefixProvider *dpp)
+int RGWBucketFullSyncCR::operate(const DoutPrefixProvider *dpp)
{
reenter(this) {
- list_marker = sync_info.full_marker.position;
+ list_marker = sync_status.full.position;
- total_entries = sync_info.full_marker.count;
+ total_entries = sync_status.full.count;
do {
if (lease_cr && !lease_cr->is_locked()) {
drain_all();
break;
}
- yield call(new RGWListBucketShardCR(sc, bs, list_marker,
- &list_result));
+ yield call(new RGWListRemoteBucketCR(sc, bs, list_marker, &list_result));
if (retcode < 0 && retcode != -ENOENT) {
set_status("failed bucket listing, going down");
drain_all();
[&](uint64_t stack_id, int ret) {
if (ret < 0) {
tn->log(10, "a sync operation returned error");
- sync_status = ret;
+ sync_result = ret;
}
return 0;
});
}
- } while (list_result.is_truncated && sync_status == 0);
+ } while (list_result.is_truncated && sync_result == 0);
set_status("done iterating over all objects");
- /* wait for all operations to complete */
+ /* wait for all operations to complete */
drain_all_cb([&](uint64_t stack_id, int ret) {
if (ret < 0) {
tn->log(10, "a sync operation returned error");
- sync_status = ret;
+ sync_result = ret;
}
return 0;
});
if (lease_cr && !lease_cr->is_locked()) {
return set_cr_error(-ECANCELED);
}
+ yield call(marker_tracker.flush());
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: marker_tracker.flush() returned retcode=" << retcode));
+ return set_cr_error(retcode);
+ }
/* update sync state to incremental */
- if (sync_status == 0) {
- yield {
- sync_info.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
- map<string, bufferlist> attrs;
- sync_info.encode_state_attr(attrs);
- call(new RGWSimpleRadosWriteAttrsCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
- rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, status_oid),
- attrs));
- }
+ if (sync_result == 0) {
+ sync_status.state = BucketSyncState::Incremental;
+ tn->log(5, SSTR("set bucket state=" << sync_status.state));
+ yield call(new RGWSimpleRadosWriteCR<rgw_bucket_sync_status>(
+ dpp, sync_env->async_rados, sync_env->svc->sysobj,
+ status_obj, sync_status, &objv));
+ tn->log(5, SSTR("bucket status objv=" << objv));
} else {
- tn->log(10, SSTR("backing out with sync_status=" << sync_status));
+ tn->log(10, SSTR("backing out with sync_status=" << sync_result));
}
- if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */
+ if (retcode < 0 && sync_result == 0) { /* actually tried to set incremental state and failed */
tn->log(0, SSTR("ERROR: failed to set sync state on bucket "
<< bucket_shard_str{bs} << " retcode=" << retcode));
return set_cr_error(retcode);
}
- if (sync_status < 0) {
- return set_cr_error(sync_status);
+ if (sync_result < 0) {
+ return set_cr_error(sync_result);
}
return set_cr_done();
}
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
rgw_bucket_sync_pair_info sync_pair;
rgw_bucket_sync_pipe& sync_pipe;
+ BucketSyncState& bucket_state;
ceph::real_time* progress;
const std::string status_oid;
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
const rgw_bucket_sync_pair_info& _sync_pair,
rgw_bucket_sync_pipe& sync_pipe,
+ BucketSyncState& bucket_state,
const RGWSyncTraceNodeRef& tn,
ceph::real_time* progress)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
- lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), sync_pipe(sync_pipe), progress(progress),
+ lease_cr(std::move(lease_cr)), sync_pair(_sync_pair),
+ sync_pipe(sync_pipe), bucket_state(bucket_state), progress(progress),
status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair)),
tn(tn) {
}
yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status, &objv_tracker));
if (retcode < 0 && retcode != -ENOENT) {
tn->log(0, "ERROR: failed to read sync status for bucket");
- drain_all();
return set_cr_error(retcode);
}
- tn->log(20, SSTR("sync status for source bucket: " << sync_status.state));
-
- do {
- if (sync_status.state == rgw_bucket_shard_sync_info::StateInit ||
- sync_status.state == rgw_bucket_shard_sync_info::StateStopped) {
- yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pair, sync_status, objv_tracker, false));
- if (retcode == -ENOENT) {
- tn->log(0, "bucket sync disabled");
- drain_all();
- return set_cr_done();
- }
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: init sync on bucket failed, retcode=" << retcode));
- drain_all();
- return set_cr_error(retcode);
- }
- }
- if (progress) {
- *progress = sync_status.inc_marker.timestamp;
- }
-
- if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
- yield call(new RGWBucketShardFullSyncCR(sc, sync_pipe,
- status_oid, lease_cr,
- sync_status, tn, objv_tracker));
- if (retcode < 0) {
- tn->log(5, SSTR("full sync on bucket failed, retcode=" << retcode));
- drain_all();
- return set_cr_error(retcode);
- }
- }
-
- if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
- yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe,
- status_oid, lease_cr,
- sync_status, tn,
- objv_tracker, progress));
- if (retcode < 0) {
- tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
- drain_all();
- return set_cr_error(retcode);
- }
- }
- // loop back to previous states unless incremental sync returns normally
- } while (sync_status.state != rgw_bucket_shard_sync_info::StateIncrementalSync);
+ tn->log(20, SSTR("sync status for source bucket shard: " << sync_status.state));
+ sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
+ if (progress) {
+ *progress = sync_status.inc_marker.timestamp;
+ }
- drain_all();
+ yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe,
+ status_oid, lease_cr,
+ sync_status, tn,
+ objv_tracker, progress));
+ if (retcode < 0) {
+ tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
+ return set_cr_error(retcode);
+ }
+ // TODO: handle transition to StateStopped
return set_cr_done();
}
class RGWSyncBucketCR : public RGWCoroutine {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *env;
- boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
+ boost::intrusive_ptr<const RGWContinuousLeaseCR> data_lease_cr;
+ boost::intrusive_ptr<RGWContinuousLeaseCR> bucket_lease_cr;
rgw_bucket_sync_pair_info sync_pair;
rgw_bucket_sync_pipe sync_pipe;
ceph::real_time* progress;
const rgw_raw_obj status_obj;
rgw_bucket_sync_status bucket_status;
RGWObjVersionTracker objv;
+ bool init_check_compat = false;
RGWSyncTraceNodeRef tn;
const RGWSyncTraceNodeRef& _tn_parent,
ceph::real_time* progress)
: RGWCoroutine(_sc->cct), sc(_sc), env(_sc->env),
- lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), progress(progress),
+ data_lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), progress(progress),
status_obj(env->svc->zone->get_zone_params().log_pool,
RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone,
sync_pair.source_bs.bucket,
return set_cr_error(retcode);
}
- if (bucket_status.state == BucketSyncState::Init) {
- // init sync status
- yield {
- // use exclusive create if it didn't exist. if we lose the race to
- // create it, we'll fail with EEXIST and RGWSyncBucketCR() will come
- // back later and read the new status
- const bool exclusive = objv.version_for_check() == nullptr;
- int num_shards = sync_pipe.dest_bucket_info.layout.current_index.layout.normal.num_shards;
- call(new InitBucketFullSyncStatusCR(sc, sync_pair, status_obj,
- bucket_status, objv, num_shards,
- exclusive));
- }
- if (retcode < 0) {
- return set_cr_error(retcode);
+ do {
+ tn->log(20, SSTR("sync status for source bucket: " << bucket_status.state));
+
+ if (bucket_status.state == BucketSyncState::Init ||
+ bucket_status.state == BucketSyncState::Stopped) {
+ // init sync status
+ yield {
+ init_check_compat = objv.read_version.ver <= 1; // newly-created
+ int num_shards = sync_pipe.dest_bucket_info.layout.current_index.layout.normal.num_shards;
+ call(new InitBucketFullSyncStatusCR(sc, sync_pair, status_obj,
+ bucket_status, objv, num_shards,
+ init_check_compat));
+ }
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
}
- }
- if (bucket_status.state == BucketSyncState::Full) {
- // TODO: full sync
- }
+ if (bucket_status.state == BucketSyncState::Full) {
+ // take a bucket-wide lease for full sync to prevent the bucket shards
+ // from duplicating the work
+ yield {
+ const std::string lock_name = "full sync";
+ const uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
+ bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->store, status_obj,
+ lock_name, lock_duration, this));
+ spawn(bucket_lease_cr.get(), false);
+ }
+ while (!bucket_lease_cr->is_locked()) {
+ if (bucket_lease_cr->is_done()) {
+ tn->log(5, "failed to take bucket lease");
+ set_status("lease lock failed, early abort");
+ drain_all();
+ return set_cr_error(bucket_lease_cr->get_ret_status());
+ }
+ tn->log(5, "waiting on bucket lease");
+ yield set_sleeping(true);
+ }
- if (bucket_status.state == BucketSyncState::Incremental) {
- yield call(new RGWSyncBucketShardCR(sc, lease_cr, sync_pair,
- sync_pipe, tn, progress));
- if (retcode < 0) {
- return set_cr_error(retcode);
+ yield call(new RGWBucketFullSyncCR(sc, sync_pipe, status_obj,
+ bucket_lease_cr, bucket_status,
+ tn, objv));
+ bucket_lease_cr->go_down();
+ drain_all();
+ bucket_lease_cr.reset();
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
}
- }
+
+ if (bucket_status.state == BucketSyncState::Incremental) {
+ yield call(new RGWSyncBucketShardCR(sc, data_lease_cr, sync_pair,
+ sync_pipe, bucket_status.state,
+ tn, progress));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ }
+ // loop back to previous states unless incremental sync returns normally
+ } while (bucket_status.state != BucketSyncState::Incremental);
+
return set_cr_done();
}