out << indented{width, "zone"} << zone.id << " (" << zone.name << ")\n";
out << indented{width, "bucket"} << info.bucket << "\n\n";
- if (!info.bucket_datasync_enabled(store->svc.zone)) {
+ if (!store->ctl()->bucket->bucket_imports_data(info.bucket, null_yield)) {
out << "Sync is disabled for bucket " << info.bucket.name << '\n';
return 0;
}
+#warning need to use bucket sources
auto& zone_conn_map = store->svc()->zone->get_zone_conn_map();
if (!source_zone_id.empty()) {
auto z = zonegroup.zones.find(source_zone_id);
#include "services/svc_zone.h"
#include "services/svc_sys_obj.h"
#include "services/svc_bucket.h"
+#include "services/svc_bucket_sync.h"
#include "services/svc_meta.h"
#include "services/svc_meta_be_sobj.h"
#include "services/svc_user.h"
}
int RGWDataChangesLog::add_entry(const RGWBucketInfo& bucket_info, int shard_id) {
- if (!bucket_info.bucket_datasync_enabled(svc.zone)) {
+ if (!ctl.bucket->bucket_exports_data(bucket_info.bucket, null_yield)) {
return 0;
}
RGWBucketCtl::RGWBucketCtl(RGWSI_Zone *zone_svc,
RGWSI_Bucket *bucket_svc,
+ RGWSI_Bucket_Sync *bucket_sync_svc,
RGWSI_BucketIndex *bi_svc) : cct(zone_svc->ctx())
{
svc.zone = zone_svc;
svc.bucket = bucket_svc;
+ svc.bucket_sync = bucket_sync_svc;
svc.bi = bi_svc;
}
return ctl.user->flush_bucket_stats(user_id, *pent);
}
+int RGWBucketCtl::bucket_exports_data(const rgw_bucket& bucket,
+ optional_yield y)
+{
+
+ RGWBucketSyncPolicyHandlerRef handler;
+
+ int r = call([&](RGWSI_Bucket_X_Ctx& ctx) {
+ return svc.bucket_sync->get_policy_handler(ctx.bi, bucket, &handler, y);
+ });
+ if (r < 0) {
+ ldout(cct, 20) << __func__ << "(): failed to read bucket stats (r=" << r << ")" << dendl;
+ return r;
+ }
+
+ return handler->bucket_exports_data();
+}
+
+int RGWBucketCtl::bucket_imports_data(const rgw_bucket& bucket,
+ optional_yield y)
+{
+
+ RGWBucketSyncPolicyHandlerRef handler;
+
+ int r = call([&](RGWSI_Bucket_X_Ctx& ctx) {
+ return svc.bucket_sync->get_policy_handler(ctx.bi, bucket, &handler, y);
+ });
+ if (r < 0) {
+ ldout(cct, 20) << __func__ << "(): failed to read bucket stats (r=" << r << ")" << dendl;
+ return r;
+ }
+
+ return handler->bucket_imports_data();
+}
+
RGWBucketMetadataHandlerBase *RGWBucketMetaHandlerAllocator::alloc()
{
return new RGWBucketMetadataHandler();
struct Svc {
RGWSI_Zone *zone{nullptr};
RGWSI_Bucket *bucket{nullptr};
+ RGWSI_Bucket_Sync *bucket_sync{nullptr};
RGWSI_BucketIndex *bi{nullptr};
} svc;
public:
RGWBucketCtl(RGWSI_Zone *zone_svc,
RGWSI_Bucket *bucket_svc,
+ RGWSI_Bucket_Sync *bucket_sync_svc,
RGWSI_BucketIndex *bi_svc);
void init(RGWUserCtl *user_ctl,
int sync_user_stats(const rgw_user& user_id, const RGWBucketInfo& bucket_info,
RGWBucketEnt* pent = nullptr);
+ /* bucket sync */
+ int bucket_exports_data(const rgw_bucket& bucket,
+ optional_yield y);
+ int bucket_imports_data(const rgw_bucket& bucket,
+ optional_yield y);
+
private:
int convert_old_bucket_info(RGWSI_Bucket_X_Ctx& ctx,
const rgw_bucket& bucket,
return 0;
}
-#if 0
-vector<rgw_bucket_sync_pipe> rgw_bucket_sync_target_info::build_pipes(const rgw_bucket& source_bs)
+bool RGWBucketSyncPolicyHandler::bucket_exports_data() const
{
- vector<rgw_bucket_sync_pipe> pipes;
-
- for (auto t : targets) {
- rgw_bucket_sync_pipe pipe;
- pipe.source_bs = source_bs;
- pipe.source_prefix = t.source_prefix;
- pipe.dest_prefix = t.dest_prefix;
- pipes.push_back(std::move(pipe));
+ if (bucket_is_sync_source()) {
+ return true;
}
- return pipes;
+
+ return (zone_svc->need_to_log_data() &&
+ bucket_info.datasync_flag_enabled());
+}
+
+bool RGWBucketSyncPolicyHandler::bucket_imports_data() const
+{
+ return bucket_is_sync_target();
}
-#endif
int init();
+ const RGWBucketInfo& get_bucket_info() const {
+ return bucket_info;
+ }
+
bool zone_is_source(const string& zone_id) const {
return sources.find(zone_id) != sources.end();
}
+
+ bool bucket_is_sync_source() const {
+ return !targets.empty();
+ }
+
+ bool bucket_is_sync_target() const {
+ return !sources.empty();
+ }
+
+ bool bucket_exports_data() const;
+ bool bucket_imports_data() const;
};
return sync_policy->empty();
}
-bool RGWBucketInfo::bucket_is_sync_source(const string& zone_id) const
-{
- return (sync_policy &&
- sync_policy->zone_is_source(zone_id));
-}
-
-bool RGWBucketInfo::bucket_datasync_enabled(const RGWSI_Zone *zone_svc) const
-{
- if (bucket_is_sync_source(zone_svc->zone_id())) {
- return true;
- }
-
- return (zone_svc->need_to_log_data() &&
- datasync_flag_enabled());
-}
void set_sync_policy(rgw_sync_policy_info&& policy);
bool empty_sync_policy() const;
- bool bucket_is_sync_source(const string& zone_id) const;
- bool bucket_datasync_enabled(const RGWSI_Zone *zone_svc) const;
RGWBucketInfo();
~RGWBucketInfo();
user.reset(new RGWUserCtl(svc.zone, svc.user, (RGWUserMetadataHandler *)meta.user.get()));
bucket.reset(new RGWBucketCtl(svc.zone,
svc.bucket,
+ svc.bucket_sync,
svc.bi));
otp.reset(new RGWOTPCtl(svc.zone, svc.otp));
int RGWSI_BucketIndex_RADOS::handle_overwrite(const RGWBucketInfo& info,
const RGWBucketInfo& orig_info)
{
- bool new_sync_enabled = info.bucket_datasync_enabled(svc.zone);
- bool old_sync_enabled = orig_info.bucket_datasync_enabled(svc.zone);
+#warning needs to be done differently
+ bool new_sync_enabled = info.datasync_flag_enabled();
+ bool old_sync_enabled = orig_info.datasync_flag_enabled();
if (old_sync_enabled != new_sync_enabled) {
int shards_num = info.num_shards? info.num_shards : 1;
#include "svc_bucket_types.h"
class RGWBucketSyncPolicyHandler;
+using RGWBucketSyncPolicyHandlerRef = std::shared_ptr<RGWBucketSyncPolicyHandler>;
class RGWSI_Bucket_Sync : public RGWServiceInstance
virtual int get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
const rgw_bucket& bucket,
- std::shared_ptr<RGWBucketSyncPolicyHandler> *handler,
+ RGWBucketSyncPolicyHandlerRef *handler,
optional_yield y) = 0;
};
int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
const rgw_bucket& bucket,
- std::shared_ptr<RGWBucketSyncPolicyHandler> *handler,
+ RGWBucketSyncPolicyHandlerRef *handler,
optional_yield y)
{
string key = RGWSI_Bucket::get_bi_meta_key(bucket);
#include "rgw/rgw_service.h"
#include "svc_meta_be.h"
-#include "svc_bucket_types.h"
#include "svc_bucket_sync.h"
class RGWSI_Zone;
template <class T>
class RGWChainedCacheImpl;
-class RGWBucketSyncPolicyHandler;
-
class RGWSI_Bucket_Sync_SObj : public RGWSI_Bucket_Sync
{
struct bucket_sync_policy_cache_entry {
int get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
const rgw_bucket& bucket,
- std::shared_ptr<RGWBucketSyncPolicyHandler> *handler,
+ RGWBucketSyncPolicyHandlerRef *handler,
optional_yield y) override;
};