Generate a diff off old and new bucket infos.
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
objv_tracker = bci.info.objv_tracker;
int ret = bihandler->svc.bi->init_index(bci.info);
- if (ret < 0)
+ if (ret < 0) {
return ret;
+ }
return STATUS_APPLIED;
}
}
};
-
RGWBucketCtl::RGWBucketCtl(RGWSI_Zone *zone_svc,
RGWSI_Bucket *bucket_svc,
RGWSI_Bucket_Sync *bucket_sync_svc,
virtual void init(RGWSI_Zone *zone_svc,
RGWSI_Bucket *bucket_svc,
RGWSI_BucketIndex *bi_svc) = 0;
-
};
class RGWBucketMetaHandlerAllocator {
RGWMetadataHandler *alloc_bucket_meta_handler() override {
return RGWArchiveBucketMetaHandlerAllocator::alloc();
}
- RGWMetadataHandler *alloc_bucket_instance_meta_handler() override {
+ RGWBucketInstanceMetadataHandlerBase *alloc_bucket_instance_meta_handler() override {
return RGWArchiveBucketInstanceMetaHandlerAllocator::alloc();
}
};
rgw_sync_pipe_info_set::iterator siter;
rgw_bucket_sync_sources_local_info sources_local_info;
- rgw_bucket_sync_sources_local_info expected_local_info;
+ rgw_bucket_sync_sources_local_info targets_local_info;
rgw_bucket_get_sync_policy_params get_policy_params;
std::shared_ptr<rgw_bucket_get_sync_policy_result> source_policy;
yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
sync_env->svc->sysobj,
RGWBucketSyncPeersManager::sync_targets_obj(sync_env->svc->zone, *source_zone, *source_bucket),
- &sources_local_info));
+ &targets_local_info));
if (retcode < 0 &&
retcode != -ENOENT) {
return set_cr_error(retcode);
bilog_rados->init(bi_rados.get());
bucket_sobj->init(zone.get(), sysobj.get(), sysobj_cache.get(),
bi_rados.get(), meta.get(), meta_be_sobj.get(),
- sync_modules.get());
+ sync_modules.get(), bucket_sync_sobj.get());
bucket_sync_sobj->init(zone.get(), sysobj_cache.get(),
bucket_sobj.get());
cls->init(zone.get(), rados.get());
return RGWBucketMetaHandlerAllocator::alloc();
}
-RGWMetadataHandler *RGWSyncModuleInstance::alloc_bucket_instance_meta_handler()
+RGWBucketInstanceMetadataHandlerBase *RGWSyncModuleInstance::alloc_bucket_instance_meta_handler()
{
return RGWBucketInstanceMetaHandlerAllocator::alloc();
}
class RGWRESTMgr;
class RGWMetadataHandler;
+class RGWBucketInstanceMetadataHandlerBase;
class RGWSyncModuleInstance {
public:
return false;
}
virtual RGWMetadataHandler *alloc_bucket_meta_handler();
- virtual RGWMetadataHandler *alloc_bucket_instance_meta_handler();
+ virtual RGWBucketInstanceMetadataHandlerBase *alloc_bucket_instance_meta_handler();
// indication whether the sync module start with full sync (default behavior)
// incremental sync would follow anyway
}
+void rgw_sync_bucket_pipes::get_potential_related_buckets(const rgw_bucket& bucket,
+ std::set<rgw_bucket> *sources,
+ std::set<rgw_bucket> *dests) const
+{
+ if (dest.match_bucket(bucket)) {
+ auto expanded_sources = source.expand();
+
+ for (auto& s : expanded_sources) {
+ sources->insert(*s.bucket);
+ }
+ }
+
+ if (source.match_bucket(bucket)) {
+ auto expanded_dests = dest.expand();
+
+ for (auto& d : expanded_dests) {
+ dests->insert(*d.bucket);
+ }
+ }
+}
+
bool rgw_sync_data_flow_group::find_symmetrical(const string& flow_id, bool create, rgw_sync_symmetric_group **flow_group)
{
if (!symmetrical) {
}
}
}
+
+void rgw_sync_policy_group::get_potential_related_buckets(const rgw_bucket& bucket,
+ std::set<rgw_bucket> *sources,
+ std::set<rgw_bucket> *dests) const
+{
+ for (auto& pipe : pipes) {
+ pipe.get_potential_related_buckets(bucket, sources, dests);
+ }
+}
+
+void rgw_sync_policy_info::get_potential_related_buckets(const rgw_bucket& bucket,
+ std::set<rgw_bucket> *sources,
+ std::set<rgw_bucket> *dests) const
+{
+ for (auto& entry : groups) {
+ auto& group = entry.second;
+ group.get_potential_related_buckets(bucket, sources, dests);
+ }
+}
void decode_json(JSONObj *obj);
std::vector<rgw_sync_bucket_pipe> expand() const;
+
+ void get_potential_related_buckets(const rgw_bucket& bucket,
+ std::set<rgw_bucket> *sources,
+ std::set<rgw_bucket> *dests) const;
};
WRITE_CLASS_ENCODER(rgw_sync_bucket_pipes)
bool find_pipe(const string& pipe_id, bool create, rgw_sync_bucket_pipes **pipe);
void remove_pipe(const string& pipe_id);
+
+ void get_potential_related_buckets(const rgw_bucket& bucket,
+ std::set<rgw_bucket> *sources,
+ std::set<rgw_bucket> *dests) const;
+
};
WRITE_CLASS_ENCODER(rgw_sync_policy_group)
bool empty() const {
return groups.empty();
}
+
+ void get_potential_related_buckets(const rgw_bucket& bucket,
+ std::set<rgw_bucket> *sources,
+ std::set<rgw_bucket> *dests) const;
};
WRITE_CLASS_ENCODER(rgw_sync_policy_info)
void RGWSI_Bucket_SObj::init(RGWSI_Zone *_zone_svc, RGWSI_SysObj *_sysobj_svc,
RGWSI_SysObj_Cache *_cache_svc, RGWSI_BucketIndex *_bi,
RGWSI_Meta *_meta_svc, RGWSI_MetaBackend *_meta_be_svc,
- RGWSI_SyncModules *_sync_modules_svc)
+ RGWSI_SyncModules *_sync_modules_svc,
+ RGWSI_Bucket_Sync *_bucket_sync_svc)
{
svc.bucket = this;
svc.zone = _zone_svc;
svc.meta = _meta_svc;
svc.meta_be = _meta_be_svc;
svc.sync_modules = _sync_modules_svc;
+ svc.bucket_sync = _bucket_sync_svc;
}
int RGWSI_Bucket_SObj::do_start()
&ci, refresh_version, y);
*info = e.info;
+#warning FIXME: use unique_ptr and implement RGWBucketInfo copy constructor, or other better solution
+ if (info->sync_policy) { /* fork policy off cache */
+ auto policy = make_shared<rgw_sync_policy_info>(*info->sync_policy);
+ info->sync_policy = std::const_pointer_cast<const rgw_sync_policy_info>(policy);
+ }
+
if (ret < 0) {
if (ret != -ENOENT) {
lderr(cct) << "ERROR: do_read_bucket_instance_info failed: " << ret << dendl;
RGWSI_MBSObj_PutParams params(bl, pattrs, mtime, exclusive);
int ret = svc.meta_be->put(ctx.get(), key, params, &info.objv_tracker, y);
- if (ret == -EEXIST) {
+
+ if (ret >= 0) {
+ int r = svc.bucket_sync->handle_bi_update(info,
+ orig_info.value_or(nullptr));
+ if (r < 0) {
+ return r;
+ }
+ } else if (ret == -EEXIST) {
/* well, if it's exclusive we shouldn't overwrite it, because we might race with another
* bucket operation on this specific bucket (e.g., being synced from the master), but
* since bucket instace meta object is unique for this specific bucket instace, we don't
class RGWSI_SysObj_Cache;
class RGWSI_Meta;
class RGWSI_SyncModules;
+class RGWSI_Bucket_Sync;
struct rgw_cache_entry_info;
RGWSI_Meta *meta{nullptr};
RGWSI_MetaBackend *meta_be{nullptr};
RGWSI_SyncModules *sync_modules{nullptr};
+ RGWSI_Bucket_Sync *bucket_sync{nullptr};
} svc;
RGWSI_Bucket_SObj(CephContext *cct);
RGWSI_BucketIndex *_bi,
RGWSI_Meta *_meta_svc,
RGWSI_MetaBackend *_meta_be_svc,
- RGWSI_SyncModules *_sync_modules);
+ RGWSI_SyncModules *_sync_modules_svc,
+ RGWSI_Bucket_Sync *_bucket_sync_svc);
int read_bucket_entrypoint_info(RGWSI_Bucket_EP_Ctx& ctx,
std::optional<rgw_bucket> bucket,
RGWBucketSyncPolicyHandlerRef *handler,
optional_yield y) = 0;
+ virtual int handle_bi_update(RGWBucketInfo& bucket_info,
+ RGWBucketInfo *orig_bucket_info) = 0;
};
return 0;
}
+
int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
std::optional<string> zone,
std::optional<rgw_bucket> _bucket,
return 0;
}
+static void diff_sets(std::set<rgw_bucket>& orig_set,
+ std::set<rgw_bucket>& new_set,
+ vector<rgw_bucket> *added,
+ vector<rgw_bucket> *removed)
+{
+ auto oiter = orig_set.begin();
+ auto niter = new_set.begin();
+
+ while (oiter != orig_set.end() &&
+ niter != new_set.end()) {
+ if (*oiter == *niter) {
+ ++oiter;
+ ++niter;
+ continue;
+ }
+ while (*oiter < *niter) {
+ removed->push_back(*oiter);
+ ++oiter;
+ }
+ while (*niter < *oiter) {
+ added->push_back(*niter);
+ ++niter;
+ }
+ }
+ for (; oiter != orig_set.end(); ++oiter) {
+ removed->push_back(*oiter);
+ }
+ for (; niter != new_set.end(); ++niter) {
+ added->push_back(*niter);
+ }
+}
+
+int RGWSI_Bucket_Sync_SObj::handle_bi_update(RGWBucketInfo& bucket_info,
+ RGWBucketInfo *orig_bucket_info)
+{
+ std::set<rgw_bucket> orig_sources;
+ std::set<rgw_bucket> orig_dests;
+
+ if (orig_bucket_info &&
+ orig_bucket_info->sync_policy) {
+ orig_bucket_info->sync_policy->get_potential_related_buckets(bucket_info.bucket,
+ &orig_sources,
+ &orig_dests);
+ }
+
+ std::set<rgw_bucket> sources;
+ std::set<rgw_bucket> dests;
+ if (bucket_info.sync_policy) {
+ bucket_info.sync_policy->get_potential_related_buckets(bucket_info.bucket,
+ &sources,
+ &dests);
+ }
+
+ std::vector<rgw_bucket> removed_sources;
+ std::vector<rgw_bucket> added_sources;
+ diff_sets(orig_sources, sources, &added_sources, &removed_sources);
+ ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": orig_sources=" << orig_sources << " new_sources=" << sources << dendl;
+ ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": potential sources added=" << added_sources << " removed=" << removed_sources << dendl;
+
+ std::vector<rgw_bucket> removed_dests;
+ std::vector<rgw_bucket> added_dests;
+ diff_sets(orig_dests, dests, &added_dests, &removed_dests);
+ ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": orig_dests=" << orig_dests << " new_dests=" << dests << dendl;
+ ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": potential dests added=" << added_dests << " removed=" << removed_dests << dendl;
+
+ return 0;
+
+}
unique_ptr<RGWChainedCacheImpl_bucket_sync_policy_cache_entry> sync_policy_cache;
int do_start() override;
-
public:
struct Svc {
RGWSI_Zone *zone{nullptr};
std::optional<rgw_bucket> bucket,
RGWBucketSyncPolicyHandlerRef *handler,
optional_yield y) override;
+
+ int handle_bi_update(RGWBucketInfo& bucket_info,
+ RGWBucketInfo *orig_bucket_info) override;
};