From: Yehuda Sadeh Date: Mon, 4 Nov 2019 23:20:53 +0000 (-0800) Subject: rgw: identify potential related (for sync) buckets on bucket update X-Git-Tag: v15.1.0~22^2~72 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=96d34a3a8c288791fd4c045f404e3f3a37b9822c;p=ceph.git rgw: identify potential related (for sync) buckets on bucket update Generate a diff off old and new bucket infos. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index ff2829b60696..16adc9df3b4f 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -3056,8 +3056,9 @@ int RGWMetadataHandlerPut_BucketInstance::put_post() objv_tracker = bci.info.objv_tracker; int ret = bihandler->svc.bi->init_index(bci.info); - if (ret < 0) + if (ret < 0) { return ret; + } return STATUS_APPLIED; } @@ -3072,7 +3073,6 @@ public: } }; - RGWBucketCtl::RGWBucketCtl(RGWSI_Zone *zone_svc, RGWSI_Bucket *bucket_svc, RGWSI_Bucket_Sync *bucket_sync_svc, diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index 520a1ad80ce8..50120cd6ebe5 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -192,7 +192,6 @@ public: virtual void init(RGWSI_Zone *zone_svc, RGWSI_Bucket *bucket_svc, RGWSI_BucketIndex *bi_svc) = 0; - }; class RGWBucketMetaHandlerAllocator { diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index d0011f848c25..2bc6594e4c80 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -2020,7 +2020,7 @@ public: RGWMetadataHandler *alloc_bucket_meta_handler() override { return RGWArchiveBucketMetaHandlerAllocator::alloc(); } - RGWMetadataHandler *alloc_bucket_instance_meta_handler() override { + RGWBucketInstanceMetadataHandlerBase *alloc_bucket_instance_meta_handler() override { return RGWArchiveBucketInstanceMetaHandlerAllocator::alloc(); } }; @@ -3638,7 +3638,7 @@ class RGWGetBucketPeersCR : public RGWCoroutine { rgw_sync_pipe_info_set::iterator siter; rgw_bucket_sync_sources_local_info sources_local_info; - rgw_bucket_sync_sources_local_info expected_local_info; + rgw_bucket_sync_sources_local_info targets_local_info; rgw_bucket_get_sync_policy_params get_policy_params; std::shared_ptr source_policy; @@ -4009,7 +4009,7 @@ int RGWGetBucketPeersCR::operate() yield call(new RGWSimpleRadosReadCR(sync_env->async_rados, sync_env->svc->sysobj, RGWBucketSyncPeersManager::sync_targets_obj(sync_env->svc->zone, *source_zone, *source_bucket), - &sources_local_info)); + &targets_local_info)); if (retcode < 0 && retcode != -ENOENT) { return set_cr_error(retcode); diff --git a/src/rgw/rgw_service.cc b/src/rgw/rgw_service.cc index 0909188ae149..c81817f546c2 100644 --- a/src/rgw/rgw_service.cc +++ b/src/rgw/rgw_service.cc @@ -81,7 +81,7 @@ int RGWServices_Def::init(CephContext *cct, bilog_rados->init(bi_rados.get()); bucket_sobj->init(zone.get(), sysobj.get(), sysobj_cache.get(), bi_rados.get(), meta.get(), meta_be_sobj.get(), - sync_modules.get()); + sync_modules.get(), bucket_sync_sobj.get()); bucket_sync_sobj->init(zone.get(), sysobj_cache.get(), bucket_sobj.get()); cls->init(zone.get(), rados.get()); diff --git a/src/rgw/rgw_sync_module.cc b/src/rgw/rgw_sync_module.cc index 269316616818..7100646098f9 100644 --- a/src/rgw/rgw_sync_module.cc +++ b/src/rgw/rgw_sync_module.cc @@ -22,7 +22,7 @@ RGWMetadataHandler *RGWSyncModuleInstance::alloc_bucket_meta_handler() return RGWBucketMetaHandlerAllocator::alloc(); } -RGWMetadataHandler *RGWSyncModuleInstance::alloc_bucket_instance_meta_handler() +RGWBucketInstanceMetadataHandlerBase *RGWSyncModuleInstance::alloc_bucket_instance_meta_handler() { return RGWBucketInstanceMetaHandlerAllocator::alloc(); } diff --git a/src/rgw/rgw_sync_module.h b/src/rgw/rgw_sync_module.h index a82ba21d1c98..dfab8ee0e426 100644 --- a/src/rgw/rgw_sync_module.h +++ b/src/rgw/rgw_sync_module.h @@ -39,6 +39,7 @@ public: class RGWRESTMgr; class RGWMetadataHandler; +class RGWBucketInstanceMetadataHandlerBase; class RGWSyncModuleInstance { public: @@ -52,7 +53,7 @@ public: return false; } virtual RGWMetadataHandler *alloc_bucket_meta_handler(); - virtual RGWMetadataHandler *alloc_bucket_instance_meta_handler(); + virtual RGWBucketInstanceMetadataHandlerBase *alloc_bucket_instance_meta_handler(); // indication whether the sync module start with full sync (default behavior) // incremental sync would follow anyway diff --git a/src/rgw/rgw_sync_policy.cc b/src/rgw/rgw_sync_policy.cc index d2485aa25625..5e8d24f6f8fb 100644 --- a/src/rgw/rgw_sync_policy.cc +++ b/src/rgw/rgw_sync_policy.cc @@ -168,6 +168,27 @@ std::vector rgw_sync_bucket_pipes::expand() const } +void rgw_sync_bucket_pipes::get_potential_related_buckets(const rgw_bucket& bucket, + std::set *sources, + std::set *dests) const +{ + if (dest.match_bucket(bucket)) { + auto expanded_sources = source.expand(); + + for (auto& s : expanded_sources) { + sources->insert(*s.bucket); + } + } + + if (source.match_bucket(bucket)) { + auto expanded_dests = dest.expand(); + + for (auto& d : expanded_dests) { + dests->insert(*d.bucket); + } + } +} + bool rgw_sync_data_flow_group::find_symmetrical(const string& flow_id, bool create, rgw_sync_symmetric_group **flow_group) { if (!symmetrical) { @@ -319,3 +340,22 @@ void rgw_sync_policy_group::remove_pipe(const string& pipe_id) } } } + +void rgw_sync_policy_group::get_potential_related_buckets(const rgw_bucket& bucket, + std::set *sources, + std::set *dests) const +{ + for (auto& pipe : pipes) { + pipe.get_potential_related_buckets(bucket, sources, dests); + } +} + +void rgw_sync_policy_info::get_potential_related_buckets(const rgw_bucket& bucket, + std::set *sources, + std::set *dests) const +{ + for (auto& entry : groups) { + auto& group = entry.second; + group.get_potential_related_buckets(bucket, sources, dests); + } +} diff --git a/src/rgw/rgw_sync_policy.h b/src/rgw/rgw_sync_policy.h index 979785deb30f..1233084e2ff6 100644 --- a/src/rgw/rgw_sync_policy.h +++ b/src/rgw/rgw_sync_policy.h @@ -309,6 +309,10 @@ struct rgw_sync_bucket_pipes { void decode_json(JSONObj *obj); std::vector expand() const; + + void get_potential_related_buckets(const rgw_bucket& bucket, + std::set *sources, + std::set *dests) const; }; WRITE_CLASS_ENCODER(rgw_sync_bucket_pipes) @@ -412,6 +416,11 @@ struct rgw_sync_policy_group { bool find_pipe(const string& pipe_id, bool create, rgw_sync_bucket_pipes **pipe); void remove_pipe(const string& pipe_id); + + void get_potential_related_buckets(const rgw_bucket& bucket, + std::set *sources, + std::set *dests) const; + }; WRITE_CLASS_ENCODER(rgw_sync_policy_group) @@ -436,6 +445,10 @@ struct rgw_sync_policy_info { bool empty() const { return groups.empty(); } + + void get_potential_related_buckets(const rgw_bucket& bucket, + std::set *sources, + std::set *dests) const; }; WRITE_CLASS_ENCODER(rgw_sync_policy_info) diff --git a/src/rgw/services/svc_bucket_sobj.cc b/src/rgw/services/svc_bucket_sobj.cc index e59ecd60184b..0b87485fdebf 100644 --- a/src/rgw/services/svc_bucket_sobj.cc +++ b/src/rgw/services/svc_bucket_sobj.cc @@ -145,7 +145,8 @@ RGWSI_Bucket_SObj::~RGWSI_Bucket_SObj() { void RGWSI_Bucket_SObj::init(RGWSI_Zone *_zone_svc, RGWSI_SysObj *_sysobj_svc, RGWSI_SysObj_Cache *_cache_svc, RGWSI_BucketIndex *_bi, RGWSI_Meta *_meta_svc, RGWSI_MetaBackend *_meta_be_svc, - RGWSI_SyncModules *_sync_modules_svc) + RGWSI_SyncModules *_sync_modules_svc, + RGWSI_Bucket_Sync *_bucket_sync_svc) { svc.bucket = this; svc.zone = _zone_svc; @@ -155,6 +156,7 @@ void RGWSI_Bucket_SObj::init(RGWSI_Zone *_zone_svc, RGWSI_SysObj *_sysobj_svc, svc.meta = _meta_svc; svc.meta_be = _meta_be_svc; svc.sync_modules = _sync_modules_svc; + svc.bucket_sync = _bucket_sync_svc; } int RGWSI_Bucket_SObj::do_start() @@ -298,6 +300,12 @@ int RGWSI_Bucket_SObj::read_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx, &ci, refresh_version, y); *info = e.info; +#warning FIXME: use unique_ptr and implement RGWBucketInfo copy constructor, or other better solution + if (info->sync_policy) { /* fork policy off cache */ + auto policy = make_shared(*info->sync_policy); + info->sync_policy = std::const_pointer_cast(policy); + } + if (ret < 0) { if (ret != -ENOENT) { lderr(cct) << "ERROR: do_read_bucket_instance_info failed: " << ret << dendl; @@ -522,7 +530,14 @@ int RGWSI_Bucket_SObj::store_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx, RGWSI_MBSObj_PutParams params(bl, pattrs, mtime, exclusive); int ret = svc.meta_be->put(ctx.get(), key, params, &info.objv_tracker, y); - if (ret == -EEXIST) { + + if (ret >= 0) { + int r = svc.bucket_sync->handle_bi_update(info, + orig_info.value_or(nullptr)); + if (r < 0) { + return r; + } + } else if (ret == -EEXIST) { /* well, if it's exclusive we shouldn't overwrite it, because we might race with another * bucket operation on this specific bucket (e.g., being synced from the master), but * since bucket instace meta object is unique for this specific bucket instace, we don't diff --git a/src/rgw/services/svc_bucket_sobj.h b/src/rgw/services/svc_bucket_sobj.h index 868fa537f1c0..80695ba2f738 100644 --- a/src/rgw/services/svc_bucket_sobj.h +++ b/src/rgw/services/svc_bucket_sobj.h @@ -28,6 +28,7 @@ class RGWSI_SysObj; class RGWSI_SysObj_Cache; class RGWSI_Meta; class RGWSI_SyncModules; +class RGWSI_Bucket_Sync; struct rgw_cache_entry_info; @@ -75,6 +76,7 @@ public: RGWSI_Meta *meta{nullptr}; RGWSI_MetaBackend *meta_be{nullptr}; RGWSI_SyncModules *sync_modules{nullptr}; + RGWSI_Bucket_Sync *bucket_sync{nullptr}; } svc; RGWSI_Bucket_SObj(CephContext *cct); @@ -94,7 +96,8 @@ public: RGWSI_BucketIndex *_bi, RGWSI_Meta *_meta_svc, RGWSI_MetaBackend *_meta_be_svc, - RGWSI_SyncModules *_sync_modules); + RGWSI_SyncModules *_sync_modules_svc, + RGWSI_Bucket_Sync *_bucket_sync_svc); int read_bucket_entrypoint_info(RGWSI_Bucket_EP_Ctx& ctx, diff --git a/src/rgw/services/svc_bucket_sync.h b/src/rgw/services/svc_bucket_sync.h index 1f2a93d2c538..ed709620276a 100644 --- a/src/rgw/services/svc_bucket_sync.h +++ b/src/rgw/services/svc_bucket_sync.h @@ -35,6 +35,8 @@ public: std::optional bucket, RGWBucketSyncPolicyHandlerRef *handler, optional_yield y) = 0; + virtual int handle_bi_update(RGWBucketInfo& bucket_info, + RGWBucketInfo *orig_bucket_info) = 0; }; diff --git a/src/rgw/services/svc_bucket_sync_sobj.cc b/src/rgw/services/svc_bucket_sync_sobj.cc index 71e364be35a2..935e0ea776a5 100644 --- a/src/rgw/services/svc_bucket_sync_sobj.cc +++ b/src/rgw/services/svc_bucket_sync_sobj.cc @@ -28,6 +28,7 @@ int RGWSI_Bucket_Sync_SObj::do_start() return 0; } + int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx, std::optional zone, std::optional _bucket, @@ -87,3 +88,71 @@ int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx, return 0; } +static void diff_sets(std::set& orig_set, + std::set& new_set, + vector *added, + vector *removed) +{ + auto oiter = orig_set.begin(); + auto niter = new_set.begin(); + + while (oiter != orig_set.end() && + niter != new_set.end()) { + if (*oiter == *niter) { + ++oiter; + ++niter; + continue; + } + while (*oiter < *niter) { + removed->push_back(*oiter); + ++oiter; + } + while (*niter < *oiter) { + added->push_back(*niter); + ++niter; + } + } + for (; oiter != orig_set.end(); ++oiter) { + removed->push_back(*oiter); + } + for (; niter != new_set.end(); ++niter) { + added->push_back(*niter); + } +} + +int RGWSI_Bucket_Sync_SObj::handle_bi_update(RGWBucketInfo& bucket_info, + RGWBucketInfo *orig_bucket_info) +{ + std::set orig_sources; + std::set orig_dests; + + if (orig_bucket_info && + orig_bucket_info->sync_policy) { + orig_bucket_info->sync_policy->get_potential_related_buckets(bucket_info.bucket, + &orig_sources, + &orig_dests); + } + + std::set sources; + std::set dests; + if (bucket_info.sync_policy) { + bucket_info.sync_policy->get_potential_related_buckets(bucket_info.bucket, + &sources, + &dests); + } + + std::vector removed_sources; + std::vector added_sources; + diff_sets(orig_sources, sources, &added_sources, &removed_sources); + ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": orig_sources=" << orig_sources << " new_sources=" << sources << dendl; + ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": potential sources added=" << added_sources << " removed=" << removed_sources << dendl; + + std::vector removed_dests; + std::vector added_dests; + diff_sets(orig_dests, dests, &added_dests, &removed_dests); + ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": orig_dests=" << orig_dests << " new_dests=" << dests << dendl; + ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": potential dests added=" << added_dests << " removed=" << removed_dests << dendl; + + return 0; + +} diff --git a/src/rgw/services/svc_bucket_sync_sobj.h b/src/rgw/services/svc_bucket_sync_sobj.h index 967cb9877080..0271d9faff66 100644 --- a/src/rgw/services/svc_bucket_sync_sobj.h +++ b/src/rgw/services/svc_bucket_sync_sobj.h @@ -39,7 +39,6 @@ class RGWSI_Bucket_Sync_SObj : public RGWSI_Bucket_Sync unique_ptr sync_policy_cache; int do_start() override; - public: struct Svc { RGWSI_Zone *zone{nullptr}; @@ -60,5 +59,8 @@ public: std::optional bucket, RGWBucketSyncPolicyHandlerRef *handler, optional_yield y) override; + + int handle_bi_update(RGWBucketInfo& bucket_info, + RGWBucketInfo *orig_bucket_info) override; };