}
};
+class RGWGenericAsyncCR : public RGWSimpleCoroutine {
+ RGWAsyncRadosProcessor *async_rados;
+ rgw::sal::RGWRadosStore *store;
+
+
+public:
+ class Action {
+ public:
+ virtual ~Action() {}
+ virtual int operate() = 0;
+ };
+
+private:
+ std::shared_ptr<Action> action;
+
+ class Request : public RGWAsyncRadosRequest {
+ std::shared_ptr<Action> action;
+ protected:
+ int _send_request() override {
+ if (!action) {
+ return 0;
+ }
+ return action->operate();
+ }
+ public:
+ Request(RGWCoroutine *caller,
+ RGWAioCompletionNotifier *cn,
+ std::shared_ptr<Action>& _action) : RGWAsyncRadosRequest(caller, cn),
+ action(_action) {}
+ } *req{nullptr};
+
+ public:
+ RGWGenericAsyncCR(CephContext *_cct,
+ RGWAsyncRadosProcessor *_async_rados,
+ std::shared_ptr<Action>& _action) : RGWSimpleCoroutine(_cct),
+ async_rados(_async_rados),
+ action(_action) {}
+ template<typename T>
+ RGWGenericAsyncCR(CephContext *_cct,
+ RGWAsyncRadosProcessor *_async_rados,
+ std::shared_ptr<T>& _action) : RGWSimpleCoroutine(_cct),
+ async_rados(_async_rados),
+ action(std::static_pointer_cast<Action>(_action)) {}
+
+ ~RGWGenericAsyncCR() override {
+ request_cleanup();
+ }
+ void request_cleanup() override {
+ if (req) {
+ req->finish();
+ req = NULL;
+ }
+ }
+
+ int send_request() override {
+ req = new Request(this,
+ stack->create_completion_notifier(),
+ action);
+
+ async_rados->queue(req);
+ return 0;
+ }
+ int request_complete() override {
+ return req->get_ret_status();
+ }
+};
+
class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
RGWSysObjectCtx obj_ctx;
static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
static string bucket_status_oid_prefix = "bucket.sync-status";
static string object_status_oid_prefix = "bucket.sync-status";
-static string bucket_sync_sources_oid_prefix = "bucket.sync-sources";
-static string bucket_sync_targets_oid_prefix = "bucket.sync-targets";
void rgw_datalog_info::decode_json(JSONObj *obj) {
std::optional<rgw_bucket> target_bucket;
std::optional<rgw_bucket> source_bucket;
- rgw_raw_obj sources_obj;
-
rgw_sync_pipe_info_set pipes;
rgw_sync_pipe_info_set::iterator siter;
return 0;
}
-class RGWBucketSyncPeersManager {
-public:
- static string sync_sources_oid(const rgw_bucket& bucket) {
- return bucket_sync_sources_oid_prefix + "." + bucket.get_key();
- }
-
- static rgw_raw_obj sync_sources_obj(RGWSI_Zone *zone_svc, const rgw_bucket& bucket) {
- return rgw_raw_obj(zone_svc->get_zone_params().log_pool, sync_sources_oid(bucket));
- }
- static string sync_targets_oid(const string& source_zone, const rgw_bucket& source_bucket) {
- return bucket_sync_targets_oid_prefix + "." + source_zone + "." + source_bucket.get_key();
- }
-
- static rgw_raw_obj sync_targets_obj(RGWSI_Zone *zone_svc, const string& source_zone, const rgw_bucket& source_bucket) {
- return rgw_raw_obj(zone_svc->get_zone_params().log_pool, sync_targets_oid(source_zone, source_bucket));
- }
-};
-
struct rgw_bucket_sync_source_local_info {
string id;
string type;
rgw_sync_pipe_info_set::iterator siter;
- rgw_bucket_sync_sources_local_info sources_local_info;
- rgw_bucket_sync_sources_local_info targets_local_info;
-
rgw_bucket_get_sync_policy_params get_policy_params;
std::shared_ptr<rgw_bucket_get_sync_policy_result> source_policy;
std::shared_ptr<rgw_bucket_get_sync_policy_result> target_policy;
void update_from_target_bucket_policy();
void update_from_source_bucket_policy();
+ struct GetHintTargets : public RGWGenericAsyncCR::Action {
+ RGWDataSyncEnv *sync_env;
+ rgw_bucket source_bucket;
+ std::set<rgw_bucket> targets;
+
+ GetHintTargets(RGWDataSyncEnv *_sync_env,
+ const rgw_bucket& _source_bucket) : sync_env(_sync_env),
+ source_bucket(_source_bucket) {}
+ int operate() override {
+ int r = sync_env->svc->bucket_sync->get_bucket_sync_hints(source_bucket,
+ nullptr,
+ &targets,
+ null_yield);
+ if (r < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): failed to fetch bucket sync hints for bucket=" << source_bucket << dendl;
+ return r;
+ }
+
+ return 0;
+ }
+ };
+
+ std::shared_ptr<GetHintTargets> get_hint_targets_action;
+ std::set<rgw_bucket>::iterator hiter;
+
public:
RGWGetBucketPeersCR(RGWDataSyncEnv *_sync_env,
std::optional<rgw_bucket> _target_bucket,
{
if (target_bs) {
target_bucket = target_bs->bucket;
- sources_obj = RGWBucketSyncPeersManager::sync_sources_obj(sync_env->svc->zone, *target_bucket);
}
if (source_bs) {
source_bucket = source_bs->bucket;
pipes->clear();
}
if (target_bucket) {
- yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
- sync_env->svc->sysobj,
- RGWBucketSyncPeersManager::sync_sources_obj(sync_env->svc->zone, *target_bucket),
- &sources_local_info));
- if (retcode < 0 &&
- retcode != -ENOENT) {
- return set_cr_error(retcode);
- }
-
get_policy_params.zone = nullopt;
get_policy_params.bucket = *target_bucket;
+ target_policy = make_shared<rgw_bucket_get_sync_policy_result>();
yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
sync_env->store,
get_policy_params,
retcode != -ENOENT) {
return set_cr_error(retcode);
}
+
+ update_from_target_bucket_policy();
}
if (source_bucket && source_zone) {
- yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
- sync_env->svc->sysobj,
- RGWBucketSyncPeersManager::sync_targets_obj(sync_env->svc->zone, *source_zone, *source_bucket),
- &targets_local_info));
- if (retcode < 0 &&
- retcode != -ENOENT) {
- return set_cr_error(retcode);
- }
-
get_policy_params.zone = source_zone;
get_policy_params.bucket = *source_bucket;
yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
}
source_bucket_info = source_policy->policy_handler->get_bucket_info();
+
+ if (!target_bucket) {
+ get_hint_targets_action = make_shared<GetHintTargets>(sync_env, *source_bucket);
+
+ yield call(new RGWGenericAsyncCR(cct, sync_env->async_rados,
+ get_hint_targets_action));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+
+ /* hints might have incomplete bucket ids,
+ * in which case we need to figure out the current
+ * bucket_id
+ */
+ for (hiter = get_hint_targets_action->targets.begin();
+ hiter != get_hint_targets_action->targets.end();
+ ++hiter) {
+ ldpp_dout(sync_env->dpp, 20) << "Got sync hint for bucket=" << *source_bucket << ": " << hiter->get_key() << dendl;
+
+ get_policy_params.zone = nullopt;
+ get_policy_params.bucket = *hiter;
+ yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
+ sync_env->store,
+ get_policy_params,
+ target_policy));
+ if (retcode < 0 &&
+ retcode != -ENOENT) {
+ return set_cr_error(retcode);
+ }
+ update_from_target_bucket_policy();
+ }
+ }
}
-#warning update local copy of sources and act on changes
- update_from_target_bucket_policy();
update_from_source_bucket_policy();
for (siiter = buckets_info.begin(); siiter != buckets_info.end(); ++siiter) {