#include "rgw_cr_tools.h"
#include "rgw_http_client.h"
#include "rgw_bucket.h"
+#include "rgw_bucket_sync.h"
#include "rgw_metadata.h"
#include "rgw_sync_counters.h"
#include "rgw_sync_module.h"
class RGWRunBucketSyncCoroutine : public RGWCoroutine {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
+ rgw_bucket_sync_pair_info sync_pair;
rgw_bucket_sync_pipe sync_pipe;
rgw_bucket_shard_sync_info sync_status;
RGWMetaSyncEnv meta_sync_env;
RGWSyncTraceNodeRef tn;
public:
- RGWRunBucketSyncCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs, const RGWSyncTraceNodeRef& _tn_parent)
- : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
- status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pipe)),
+ RGWRunBucketSyncCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_sync_pair_info& _sync_pair, const RGWSyncTraceNodeRef& _tn_parent)
+ : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), sync_pair(_sync_pair),
+ status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pair)),
tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket",
- SSTR(bucket_shard_str{bs}))) {
- sync_pipe.source_bs = bs;
+ SSTR(bucket_shard_str{_sync_pair.dest_bs} << "<-" << bucket_shard_str{_sync_pair.source_bs} ))) {
}
~RGWRunBucketSyncCoroutine() override {
if (lease_cr) {
string entry_marker;
rgw_bucket_shard bs;
+ rgw_bucket_sync_pair_info sync_pair;
int sync_status;
marker_tracker->reset_need_retry(raw_key);
}
tn->log(0, SSTR("triggering sync of bucket/shard " << bucket_shard_str{bs}));
- call(new RGWRunBucketSyncCoroutine(sc, bs, tn));
+
+ sync_pair = rgw_bucket_sync_pair_info();
+ sync_pair.source_bs = bs;
+ sync_pair.dest_bs = bs;
+#warning init pipe fields
+ call(new RGWRunBucketSyncCoroutine(sc, sync_pair, tn));
}
} while (marker_tracker && marker_tracker->need_retry(raw_key));
RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
{
auto sync_env = sc->env;
- return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, sync_pipe.source_bs.bucket,
+ return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, sync_pipe.info.source_bs.bucket,
std::nullopt, sync_pipe.dest_bucket_info,
key, std::nullopt, versioned_epoch,
true, zones_trace, sync_env->counters, sync_env->dpp);
RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
{
auto sync_env = sc->env;
- ldout(sc->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
+ ldout(sc->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
if (!sync_pipe.dest_bucket_info.versioned() ||
(sync_pipe.dest_bucket_info.flags & BUCKET_VERSIONS_SUSPENDED)) {
ldout(sc->cct, 0) << "SYNC_ARCHIVE: sync_object: enabling object versioning for archive bucket" << dendl;
}
return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
- sync_pipe.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info,
+ sync_pipe.info.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info,
key, dest_key, versioned_epoch,
true, zones_trace, nullptr, sync_env->dpp);
}
RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
- ldout(sc->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
+ ldout(sc->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
return NULL;
}
RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
- ldout(sc->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime
+ ldout(sc->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
<< " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
auto sync_env = sc->env;
return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
rgw_bucket_shard_sync_info& _status)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
sync_pipe(_sync_pipe),
- sync_status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, _sync_pipe)),
+ sync_status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, _sync_pipe.info)),
status(_status)
{}
int operate() override {
reenter(this) {
/* fetch current position in logs */
- yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pipe.source_bs, &info));
+ yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pipe.info.source_bs, &info));
if (retcode < 0 && retcode != -ENOENT) {
ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
return set_cr_error(retcode);
#warning FIXME
rgw_bucket_sync_pipe sync_pipe;
sync_pipe.source_bs = bs;
+ sync_pipe.dest_bs = bs;
return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pipe, init_status);
}
map<string, bufferlist> attrs;
public:
RGWReadBucketPipeSyncStatusCoroutine(RGWDataSyncCtx *_sc,
- const rgw_bucket_sync_pipe& sync_pipe,
+ const rgw_bucket_sync_pair_info& sync_pair,
rgw_bucket_shard_sync_info *_status)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
- oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pipe)),
+ oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pair)),
status(_status) {}
int operate() override;
};
RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
{
+ rgw_bucket_sync_pair_info sync_pair;
+ sync_pair.source_bs = bs;
#warning FIXME
- rgw_bucket_sync_pipe sync_pipe;
- sync_pipe.source_bs = bs;
- return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pipe, sync_status);
+ sync_pair.dest_bs = bs;
+ return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pair, sync_status);
}
RGWBucketPipeSyncStatusManager::RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store, const string& _source_zone,
const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace,
RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->cct),
sc(_sc), sync_env(_sc->env),
- sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs),
+ sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
owner(_owner),
timestamp(_timestamp), op(_op),
rgw_bucket_shard_sync_info& sync_info,
RGWSyncTraceNodeRef tn_parent)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
- sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs),
+ sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
lease_cr(lease_cr), sync_info(sync_info),
marker_tracker(sc, status_oid, sync_info.full_marker),
status_oid(status_oid),
rgw_bucket_shard_sync_info& sync_info,
RGWSyncTraceNodeRef& _tn_parent)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
- sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs),
+ sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
lease_cr(lease_cr), sync_info(sync_info),
marker_tracker(sc, status_oid, sync_info.inc_marker),
status_oid(status_oid), zone_id(sync_env->svc->zone->get_zone().id),
};
WRITE_CLASS_ENCODER(rgw_bucket_sync_sources_local_info)
-class RGWReadBucketSourcesInfoCR : public RGWCoroutine {
+class RGWGetBucketSourcePeersCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
rgw_bucket bucket;
- RGWBucketInfo bucket_info;
+ map<string, set<RGWBucketSyncPolicyHandler::peer_info> > *sources;
+ RGWBucketInfo *pbucket_info;
rgw_raw_obj sources_obj;
rgw_bucket_sync_sources_local_info expected_local_info;
rgw_bucket_get_sync_policy_params get_policy_params;
- std::shared_ptr<rgw_bucket_get_sync_policy_result> get_policy_result;
+ std::shared_ptr<rgw_bucket_get_sync_policy_result> policy;
RGWSyncTraceNodeRef tn;
public:
- RGWReadBucketSourcesInfoCR(RGWDataSyncEnv *_sync_env,
- const rgw_bucket& _bucket,
- const RGWSyncTraceNodeRef& _tn_parent)
+ RGWGetBucketSourcePeersCR(RGWDataSyncEnv *_sync_env,
+ const rgw_bucket& _bucket,
+ map<string, set<RGWBucketSyncPolicyHandler::peer_info> > *_sources,
+ RGWBucketInfo *_pbucket_info,
+ const RGWSyncTraceNodeRef& _tn_parent)
: RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
bucket(_bucket),
+ sources(_sources),
+ pbucket_info(_pbucket_info),
sources_obj(RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket)),
- get_policy_result(make_shared<rgw_bucket_get_sync_policy_result>()),
+ policy(make_shared<rgw_bucket_get_sync_policy_result>()),
tn(sync_env->sync_tracer->add_node(_tn_parent, "read_bucket_sources",
SSTR(bucket))) {
}
int operate() override;
};
-int RGWReadBucketSourcesInfoCR::operate()
+int RGWGetBucketSourcePeersCR::operate()
{
reenter(this) {
yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
sync_env->store,
get_policy_params,
- get_policy_result));
+ policy));
if (retcode < 0 &&
retcode != -ENOENT) {
return set_cr_error(retcode);
}
+#warning update local copy of sources and act on changes
+
+ {
+ auto& handler = policy->policy_handler;
+
+ *sources = handler->get_sources();
+ *pbucket_info = handler->get_bucket_info();
+ }
return set_cr_done();
}
class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
rgw_bucket bucket;
- rgw_sync_source source;
+ RGWBucketInfo bucket_info;
rgw_raw_obj sources_obj;
+ map<string, set<RGWBucketSyncPolicyHandler::peer_info> > sources;
+ map<string, set<RGWBucketSyncPolicyHandler::peer_info> >::iterator siter;
+ set<RGWBucketSyncPolicyHandler::peer_info>::iterator piter;
+
+ rgw_bucket_sync_pair_info sync_pair;
+
boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
RGWSyncTraceNodeRef tn;
+ std::vector<RGWDataSyncCtx> scs;
+ RGWDataSyncCtx *cur_sc{nullptr};
+
+ int ret{0};
public:
RGWRunBucketSourcesSyncCR(RGWDataSyncEnv *_sync_env,
}
tn->log(10, "took lease");
- yield call(new RGWReadBucketSourcesInfoCR(sync_env, bucket, tn, &info));
+ yield call(new RGWGetBucketSourcePeersCR(sync_env, bucket, &sources, &bucket_info, tn));
if (retcode < 0 && retcode != -ENOENT) {
tn->log(0, "ERROR: failed to read sync status for bucket");
lease_cr->go_down();
return set_cr_error(retcode);
}
- if (retcode == -ENOENT) {
- rgw_bucket_sync_pipe sync_pipe;
- sync_pipe.init_default(bs);
- info.pipes.push_back(sync_pipe);
- }
+ for (siter = sources.begin(); siter != sources.end(); ++siter) {
+ scs.emplace_back();
+ cur_sc = &scs.back();
+ {
+ auto& source_zone = siter->first;
+ auto conn = sync_env->svc->zone->get_zone_conn_by_id(source_zone);
+ if (!conn) {
+ ldpp_dout(sync_env->dpp, 0) << "ERROR: connection object to zone " << source_zone << " does not exist" << dendl;
+ continue;
+ }
+ cur_sc->init(sync_env, conn, siter->first);
+ }
+ for (piter = siter->second.begin(); piter != siter->second.end(); ++piter) {
+ if (!piter->is_rgw()) {
+ continue;
+ }
- yield {
- for (auto pipe : info.pipes) {
- spawn(new RGWRunBucketSyncCoroutine(sc, pipe, &tn));
+ sync_pair.source_bs.bucket = piter->bucket;
+ sync_pair.dest_bs.bucket = bucket_info.bucket;
+
+ yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn));
+ while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
+ set_status() << "num_spawned() > spawn_window";
+ yield wait_for_child();
+ bool again = true;
+ while (again) {
+ again = collect(&ret, nullptr);
+ if (ret < 0) {
+ tn->log(10, "a sync operation returned error");
+ /* we have reported this error */
+ }
+ /* not waiting for child here */
+ }
+ }
}
}
return 0;
}
+class RGWSyncGetBucketInfoCR : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ rgw_bucket bucket;
+ RGWBucketInfo *pbucket_info;
+ RGWMetaSyncEnv meta_sync_env;
+
+ RGWSyncTraceNodeRef tn;
+
+public:
+ RGWSyncGetBucketInfoCR(RGWDataSyncEnv *_sync_env,
+ const rgw_bucket& _bucket,
+ RGWBucketInfo *_pbucket_info,
+ const RGWSyncTraceNodeRef& _tn_parent)
+ : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ bucket(_bucket),
+ pbucket_info(_pbucket_info),
+ tn(sync_env->sync_tracer->add_node(_tn_parent, "get_bucket_info",
+ SSTR(bucket))) {
+ }
+
+ int operate() override;
+};
+
+int RGWSyncGetBucketInfoCR::operate()
+{
+ reenter(this) {
+ yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket, pbucket_info));
+ if (retcode == -ENOENT) {
+ /* bucket instance info has not been synced in yet, fetch it now */
+ yield {
+ tn->log(10, SSTR("no local info for bucket:" << ": fetching metadata"));
+ string raw_key = string("bucket.instance:") + bucket.get_key();
+
+ meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->svc->zone->get_master_conn(), sync_env->async_rados,
+ sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer);
+
+ call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
+ string() /* no marker */,
+ MDLOG_STATUS_COMPLETE,
+ NULL /* no marker tracker */,
+ tn));
+ }
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{bucket}));
+ return set_cr_error(retcode);
+ }
+
+ yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket, &pbucket_info));
+ }
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bucket}));
+ return set_cr_error(retcode);
+ }
+
+ return set_cr_done();
+ }
+}
+
int RGWRunBucketSyncCoroutine::operate()
{
reenter(this) {
}
tn->log(10, "took lease");
- yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pipe, &sync_status));
+ yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status));
if (retcode < 0 && retcode != -ENOENT) {
tn->log(0, "ERROR: failed to read sync status for bucket");
lease_cr->go_down();
return set_cr_error(retcode);
}
- tn->log(20, SSTR("sync status for bucket: " << sync_status.state));
-
- yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, sync_pipe.source_bs.bucket, &sync_pipe.dest_bucket_info));
- if (retcode == -ENOENT) {
- /* bucket instance info has not been synced in yet, fetch it now */
- yield {
- tn->log(10, SSTR("no local info for bucket:" << ": fetching metadata"));
- string raw_key = string("bucket.instance:") + sync_pipe.source_bs.bucket.get_key();
-
- meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->svc->zone->get_master_conn(), sync_env->async_rados,
- sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer);
-
- call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
- string() /* no marker */,
- MDLOG_STATUS_COMPLETE,
- NULL /* no marker tracker */,
- tn));
- }
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{sync_pipe.source_bs.bucket}));
- lease_cr->go_down();
- drain_all();
- return set_cr_error(retcode);
- }
+ tn->log(20, SSTR("sync status for source bucket: " << sync_status.state));
- yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, sync_pipe.source_bs.bucket, &sync_pipe.dest_bucket_info));
+ yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.source_bs.bucket, &sync_pipe.source_bucket_info, tn));
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pipe.info.source_bs.bucket}));
+ lease_cr->go_down();
+ drain_all();
+ return set_cr_error(retcode);
}
+
+ yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.dest_bs.bucket, &sync_pipe.dest_bucket_info, tn));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pipe.source_bs.bucket}));
lease_cr->go_down();
return set_cr_error(retcode);
}
+ sync_pipe.info = sync_pair;
+
do {
if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pipe, sync_status));
return ret;
}
+#warning read specific bucket sources
const string key = bucket.get_key();
}
string RGWBucketPipeSyncStatusManager::status_oid(const string& source_zone,
- const rgw_bucket_sync_pipe& sync_pipe)
+ const rgw_bucket_sync_pair_info& sync_pair)
{
- return bucket_status_oid_prefix + "." + source_zone + ":" + sync_pipe.source_bs.get_key();
+ if (sync_pair.source_bs == sync_pair.dest_bs) {
+ return bucket_status_oid_prefix + "." + source_zone + ":" + sync_pair.dest_bs.get_key();
+ } else {
+ return bucket_status_oid_prefix + "." + source_zone + ":" + sync_pair.dest_bs.get_key() + ":" + sync_pair.source_bs.get_key();
+ }
}
string RGWBucketPipeSyncStatusManager::obj_status_oid(const string& source_zone,