return ctl.user->flush_bucket_stats(user_id, *pent);
}
+int RGWBucketCtl::get_sync_policy_handler(const rgw_bucket& bucket,
+ RGWBucketSyncPolicyHandlerRef *phandler,
+ optional_yield y)
+{
+ int r = call([&](RGWSI_Bucket_X_Ctx& ctx) {
+ return svc.bucket_sync->get_policy_handler(ctx.bi, bucket, phandler, y);
+ });
+ if (r < 0) {
+ ldout(cct, 20) << __func__ << "(): failed to get policy handler for bucket=" << bucket << " (r=" << r << ")" << dendl;
+ return r;
+ }
+ return 0;
+}
+
int RGWBucketCtl::bucket_exports_data(const rgw_bucket& bucket,
optional_yield y)
{
RGWBucketSyncPolicyHandlerRef handler;
- int r = call([&](RGWSI_Bucket_X_Ctx& ctx) {
- return svc.bucket_sync->get_policy_handler(ctx.bi, bucket, &handler, y);
- });
+ int r = get_sync_policy_handler(bucket, &handler, y);
if (r < 0) {
- ldout(cct, 20) << __func__ << "(): failed to read bucket stats (r=" << r << ")" << dendl;
return r;
}
RGWBucketSyncPolicyHandlerRef handler;
- int r = call([&](RGWSI_Bucket_X_Ctx& ctx) {
- return svc.bucket_sync->get_policy_handler(ctx.bi, bucket, &handler, y);
- });
+ int r = get_sync_policy_handler(bucket, &handler, y);
if (r < 0) {
- ldout(cct, 20) << __func__ << "(): failed to read bucket stats (r=" << r << ")" << dendl;
return r;
}
#include "rgw_rest_conn.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
+#include "rgw_cr_tools.h"
#include "rgw_http_client.h"
#include "rgw_bucket.h"
#include "rgw_metadata.h"
class RGWBucketSyncSourcesManager {
public:
static string sync_sources_oid(const rgw_bucket bucket) {
- return bucket_sync_sources_oid_prefix + "." + bucket.to_str();
+ return bucket_sync_sources_oid_prefix + "." + bucket.get_key();
}
static rgw_raw_obj sync_sources_obj(RGWSI_Zone *zone_svc, const rgw_bucket& bucket) {
- return rgw_raw_obj(zone_svc->get_zone_params().log_pool, status_oid(bucket)),
+ return rgw_raw_obj(zone_svc->get_zone_params().log_pool, sync_sources_oid(bucket));
}
};
}
void dump(ceph::Formatter *f) const {
- encode_json("sources", type, f);
+ encode_json("sources", sources, f);
}
};
WRITE_CLASS_ENCODER(rgw_bucket_sync_sources_local_info)
class RGWReadBucketSourcesInfoCR : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
rgw_bucket bucket;
+ RGWBucketInfo bucket_info;
+
rgw_raw_obj sources_obj;
+ rgw_bucket_sync_sources_local_info sources_local_info;
+ rgw_bucket_sync_sources_local_info expected_local_info;
+
+ rgw_bucket_get_sync_policy_params get_policy_params;
+ std::shared_ptr<rgw_bucket_get_sync_policy_result> get_policy_result;
+
RGWSyncTraceNodeRef tn;
public:
- RGWReadBucketSourcesInfoCR(RGWSI_Zone *_zone_svc,
- const rgw_bucket& _bucket,
- const RGWSyncTraceNodeRef& _tn_parent)
- : RGWCoroutine(_zone_svc->cct),
+ RGWReadBucketSourcesInfoCR(RGWDataSyncEnv *_sync_env,
+ const rgw_bucket& _bucket,
+ const RGWSyncTraceNodeRef& _tn_parent)
+ : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
bucket(_bucket),
- source_obj(RGWBucketSyncSourcesManager::sync_sources_obj(bucket)),
+ sources_obj(RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket)),
+ get_policy_result(make_shared<rgw_bucket_get_sync_policy_result>()),
tn(sync_env->sync_tracer->add_node(_tn_parent, "read_bucket_sources",
SSTR(bucket))) {
}
{
reenter(this) {
yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
- sync_env->svc.sysobj,
- RGWBucketSyncSourcesManager::sync_sources_obj(bucket),
- &pinfo));
+ sync_env->svc->sysobj,
+ RGWBucketSyncSourcesManager::sync_sources_obj(sync_env->svc->zone, bucket),
+ &sources_local_info));
+ if (retcode < 0 &&
+ retcode != -ENOENT) {
+ return set_cr_error(retcode);
+ }
+
+ get_policy_params.bucket = bucket;
+ yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
+ sync_env->store,
+ get_policy_params,
+ get_policy_result));
+ if (retcode < 0 &&
+ retcode != -ENOENT) {
+ return set_cr_error(retcode);
+ }
+
return set_cr_done();
}
+
+ return 0;
}
class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
- RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
rgw_bucket bucket;
rgw_sync_source source;
RGWSyncTraceNodeRef tn;
public:
- RGWRunBucketSourcesSyncCR(RGWSI_Zone *_zone_svc,
+ RGWRunBucketSourcesSyncCR(RGWDataSyncEnv *_sync_env,
const rgw_bucket& _bucket,
const RGWSyncTraceNodeRef& _tn_parent)
- : RGWCoroutine(_sc->cct),
- sc(_sc), sync_env(_sc->env),
+ : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
bucket(_bucket),
- source_obj(RGWBucketSyncSourcesManager::sync_sources_obj(bucket)),
+ sources_obj(RGWBucketSyncSourcesManager::sync_sources_obj(_sync_env->svc->zone, bucket)),
tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources",
SSTR(bucket))) {
}
set_status("acquiring sync lock");
auto store = sync_env->store;
lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
- RGWBucketSyncSourcesManager::sync_sources_obj(bucket),
+ sources_obj,
"sync_lock",
cct->_conf->rgw_sync_lease_period,
this));
}
tn->log(10, "took lease");
- yield call(new RGWReadBucketSourcesInfoCR(sc, bucket, &info));
+ yield call(new RGWReadBucketSourcesInfoCR(sync_env, bucket, tn, &info));
if (retcode < 0 && retcode != -ENOENT) {
tn->log(0, "ERROR: failed to read sync status for bucket");
lease_cr->go_down();