From: Yehuda Sadeh Date: Mon, 2 Dec 2019 23:13:28 +0000 (-0800) Subject: rgw-admin: add 'sync info' command X-Git-Tag: v15.1.0~22^2~93 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f5de4d2fc2503c56c28ba4f991ec3248c9bbb144;p=ceph.git rgw-admin: add 'sync info' command Also move code into header so that flow manager could be used in radosgw-admin Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index f8bb4e37dbf5..8b5baed2a299 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -85,6 +85,23 @@ static const DoutPrefixProvider* dpp() { return &global_dpp; } +#define CHECK_TRUE(x, msg, err) \ + do { \ + if (!x) { \ + cerr << msg << std::endl; \ + return err; \ + } \ + } while (0) + +#define CHECK_SUCCESS(x, msg) \ + do { \ + int _x_val = (x); \ + if (_x_val < 0) { \ + cerr << msg << ": " << cpp_strerror(-_x_val) << std::endl; \ + return _x_val; \ + } \ + } while (0) + void usage() { cout << "usage: radosgw-admin [options...]" << std::endl; @@ -691,6 +708,7 @@ enum class OPT { GLOBAL_QUOTA_SET, GLOBAL_QUOTA_ENABLE, GLOBAL_QUOTA_DISABLE, + SYNC_INFO, SYNC_STATUS, ROLE_CREATE, ROLE_DELETE, @@ -896,6 +914,7 @@ static SimpleCmd::Commands all_cmds = { { "global quota set", OPT::GLOBAL_QUOTA_SET }, { "global quota enable", OPT::GLOBAL_QUOTA_ENABLE }, { "global quota disable", OPT::GLOBAL_QUOTA_DISABLE }, + { "sync info", OPT::SYNC_INFO }, { "sync status", OPT::SYNC_STATUS }, { "role create", OPT::ROLE_CREATE }, { "role delete", OPT::ROLE_DELETE }, @@ -1075,6 +1094,15 @@ static int init_bucket(const string& tenant_name, const string& bucket_name, con return 0; } +static int init_bucket(const rgw_bucket& b, + RGWBucketInfo& bucket_info, + rgw_bucket& bucket, + map *pattrs = nullptr) +{ + return init_bucket(b.tenant, b.name, b.bucket_id, + bucket_info, bucket, pattrs); +} + static int read_input(const string& infile, bufferlist& bl) { int fd = 0; @@ -2292,6 +2320,77 @@ static int bucket_source_sync_status(rgw::sal::RGWRadosStore *store, const RGWZo return 0; } +void encode_json(const char *name, const RGWBucketSyncFlowManager::flow_map_t& m, Formatter *f) +{ + Formatter::ObjectSection top_section(*f, name); + Formatter::ArraySection as(*f, "entries"); + + for (auto& entry : m) { + Formatter::ObjectSection os(*f, "entry"); + auto& bucket = entry.first; + auto& pflow = entry.second; + + encode_json("bucket", rgw_sync_bucket_entity::bucket_key(bucket), f); + { + Formatter::ArraySection fg(*f, "flow_groups"); + for (auto& flow_group : pflow.flow_groups) { + encode_json("entry", *flow_group, f); + } + } + encode_json("pipes", pflow.pipe, f); + } +} + +static int sync_info(std::optional opt_target_zone, std::optional opt_bucket, Formatter *formatter) +{ + const RGWRealm& realm = store->svc()->zone->get_realm(); + const RGWZoneGroup& zonegroup = store->svc()->zone->get_zonegroup(); + const RGWZone& zone = store->svc()->zone->get_zone(); + + string zone_name = opt_target_zone.value_or(store->svc()->zone->zone_name()); + + RGWBucketSyncFlowManager zone_flow(zone_name, nullopt, nullptr); + + zone_flow.init(zonegroup.sync_policy); + + RGWBucketSyncFlowManager *flow_mgr = &zone_flow; + + std::optional bucket_flow; + + if (opt_bucket) { + rgw_bucket bucket; + RGWBucketInfo bucket_info; + + int ret = init_bucket(*opt_bucket, bucket_info, bucket); + if (ret < 0) { + cerr << "ERROR: init_bucket failed: " << cpp_strerror(-ret) << std::endl; + return ret; + } + + if (ret >= 0 && + bucket_info.sync_policy) { + bucket_flow.emplace(zone_name, opt_bucket, &zone_flow); + + bucket_flow->init(*bucket_info.sync_policy); + + flow_mgr = &(*bucket_flow); + } + } + + auto& sources = flow_mgr->get_sources(); + auto& dests = flow_mgr->get_dests(); + + { + Formatter::ObjectSection os(*formatter, "result"); + encode_json("sources", sources, formatter); + encode_json("dests", dests, formatter); + } + + formatter->flush(cout); + + return 0; +} + static int bucket_sync_info(rgw::sal::RGWRadosStore *store, const RGWBucketInfo& info, std::ostream& out) { @@ -2619,14 +2718,6 @@ static bool require_non_empty_opt(std::optional opt, bool extra_check = true) return true; } -#define CHECK_TRUE(x, msg, err) \ - do { \ - if (!x) { \ - cerr << msg << std::endl; \ - return err; \ - } \ - } while (0) - template static void show_result(T& obj, Formatter *formatter, @@ -2684,7 +2775,7 @@ public: return 0; } - ret = init_bucket(bucket->tenant, bucket->name, bucket->bucket_id, bucket_info, *bucket, &bucket_attrs); + ret = init_bucket(*bucket, bucket_info, *bucket, &bucket_attrs); if (ret < 0) { cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl; return ret; @@ -2926,6 +3017,7 @@ int main(int argc, const char **argv) std::optional opt_dest_tenant; std::optional opt_dest_bucket_name; std::optional opt_dest_bucket_id; + std::optional opt_effective_zone; rgw::notify::EventTypeList event_types; @@ -3317,6 +3409,8 @@ int main(int argc, const char **argv) opt_dest_bucket_name = val; } else if (ceph_argparse_witharg(args, i, &val, "--dest-bucket-id", (char*)NULL)) { opt_dest_bucket_id = val; + } else if (ceph_argparse_witharg(args, i, &val, "--effective-zone", (char*)NULL)) { + opt_effective_zone = val; } else if (ceph_argparse_binary_flag(args, i, &detail, NULL, "--detail", (char*)NULL)) { // do nothing } else if (strncmp(*i, "-", 1) == 0) { @@ -3515,6 +3609,7 @@ int main(int argc, const char **argv) OPT::PERIOD_GET_CURRENT, OPT::PERIOD_LIST, OPT::GLOBAL_QUOTA_GET, + OPT::SYNC_INFO, OPT::SYNC_STATUS, OPT::ROLE_GET, OPT::ROLE_LIST, @@ -7280,6 +7375,10 @@ next: } } + if (opt_cmd == OPT::SYNC_INFO) { + sync_info(opt_effective_zone, opt_bucket, formatter); + } + if (opt_cmd == OPT::SYNC_STATUS) { sync_status(formatter); } diff --git a/src/rgw/rgw_bucket_sync.cc b/src/rgw/rgw_bucket_sync.cc index 05cb9c88ab8e..fda52b1bdeb3 100644 --- a/src/rgw/rgw_bucket_sync.cc +++ b/src/rgw/rgw_bucket_sync.cc @@ -93,6 +93,23 @@ void rgw_sync_bucket_entity::remove_bucket(std::optional tenant, } } + +string rgw_sync_bucket_entity::bucket_key(std::optional b) +{ + if (!b) { + return string("*"); + } + + rgw_bucket _b = *b; + + if (_b.name.empty()) { + _b.name = "*"; + } + + return _b.get_key(); +} + + bool rgw_sync_data_flow_group::find_symmetrical(const string& flow_id, bool create, rgw_sync_symmetric_group **flow_group) { if (!symmetrical) { @@ -235,11 +252,9 @@ static std::vector filter_relevant_pipes(const std::vector const string& dest_zone) { std::vector relevant_pipes; - for (auto& pipe : relevant_pipes) { - if (pipe.source.match_zone(source_zone)) { - relevant_pipes.push_back(pipe); - } - if (pipe.dest.match_zone(dest_zone)) { + for (auto& pipe : pipes) { + if (pipe.source.match_zone(source_zone) && + pipe.dest.match_zone(dest_zone)) { relevant_pipes.push_back(pipe); } } @@ -252,216 +267,229 @@ static bool is_wildcard_bucket(const rgw_bucket& bucket) return bucket.name.empty(); } -struct group_pipe_map { - string zone; - std::optional bucket; - - rgw_sync_policy_group::Status status{rgw_sync_policy_group::Status::FORBIDDEN}; - - struct zone_bucket { - string zone; /* zone name */ - rgw_bucket bucket; /* bucket, if empty then wildcard */ +void rgw_sync_group_pipe_map::zone_bucket::dump(ceph::Formatter *f) const +{ + encode_json("zone", zone, f); + encode_json("bucket", rgw_sync_bucket_entity::bucket_key(bucket), f); +} - bool operator<(const zone_bucket& zb) const { - if (zone < zb.zone) { - return true; - } - if (zone > zb.zone) { - return false; - } - return (bucket < zb.bucket); - } - }; +void rgw_sync_group_pipe_map::dump(ceph::Formatter *f) const +{ + encode_json("zone", zone, f); + encode_json("bucket", rgw_sync_bucket_entity::bucket_key(bucket), f); + encode_json("sources", sources, f); + encode_json("dests", dests, f); +} - using zb_pipe_map_t = std::multimap; +ostream& operator<<(ostream& os, const rgw_sync_bucket_entity& e) { + os << "{b=" << rgw_sync_bucket_entity::bucket_key(e.bucket) << ",z=" << e.zones.value_or(std::set()) << "}"; + return os; +} - zb_pipe_map_t sources; /* all the pipes where zone is pulling from, by source_zone, s */ - zb_pipe_map_t dests; /* all the pipes that pull from zone */ +ostream& operator<<(ostream& os, const rgw_sync_bucket_pipe& pipe) { + os << "{id=" << pipe.id << ",s=" << pipe.source << ",d=" << pipe.dest << "}"; + return os; +} - template - void try_add_to_pipe_map(const string& source_zone, - const string& dest_zone, - const std::vector& pipes, - zb_pipe_map_t *pipe_map, - CB1 filter_cb, - CB2 call_filter_cb) { - if (!filter_cb(source_zone, nullopt, dest_zone, nullopt)) { - return; - } - auto relevant_pipes = filter_relevant_pipes(pipes, source_zone, dest_zone); +template +void rgw_sync_group_pipe_map::try_add_to_pipe_map(const string& source_zone, + const string& dest_zone, + const std::vector& pipes, + zb_pipe_map_t *pipe_map, + CB1 filter_cb, + CB2 call_filter_cb) +{ + if (!filter_cb(source_zone, nullopt, dest_zone, nullopt)) { + return; + } + auto relevant_pipes = filter_relevant_pipes(pipes, source_zone, dest_zone); - for (auto& pipe : relevant_pipes) { - zone_bucket zb; - if (!call_filter_cb(pipe, &zb)) { - continue; - } - pipe_map->insert(make_pair(zb, pipe)); + for (auto& pipe : relevant_pipes) { + zone_bucket zb; + if (!call_filter_cb(pipe, &zb)) { + continue; } + pipe_map->insert(make_pair(zb, pipe)); } +} - template - void try_add_source(const string& source_zone, - const string& dest_zone, - const std::vector& pipes, - CB filter_cb) - { - return try_add_to_pipe_map(source_zone, dest_zone, pipes, - &sources, - filter_cb, - [&](const rgw_sync_bucket_pipe& pipe, zone_bucket *zb) { - *zb = zone_bucket{source_zone, pipe.source.get_bucket()}; - return filter_cb(source_zone, zb->bucket, dest_zone, pipe.dest.get_bucket()); - }); - } - - template - void try_add_dest(const string& source_zone, +template +void rgw_sync_group_pipe_map::try_add_source(const string& source_zone, const string& dest_zone, const std::vector& pipes, CB filter_cb) - { - return try_add_to_pipe_map(source_zone, dest_zone, pipes, - &dests, - filter_cb, - [&](const rgw_sync_bucket_pipe& pipe, zone_bucket *zb) { - *zb = zone_bucket{dest_zone, pipe.dest.get_bucket()}; - return filter_cb(source_zone, pipe.source.get_bucket(), dest_zone, zb->bucket); - }); - } - - pair find_pipes(const zb_pipe_map_t& m, - const string& zone, - std::optional b) { - if (!b) { - return m.equal_range(zone_bucket{zone, rgw_bucket()}); - } +{ + return try_add_to_pipe_map(source_zone, dest_zone, pipes, + &sources, + filter_cb, + [&](const rgw_sync_bucket_pipe& pipe, zone_bucket *zb) { + *zb = zone_bucket{source_zone, pipe.source.get_bucket()}; + return filter_cb(source_zone, zb->bucket, dest_zone, pipe.dest.get_bucket()); + }); +} - auto zb = zone_bucket{zone, *bucket}; +template +void rgw_sync_group_pipe_map::try_add_dest(const string& source_zone, + const string& dest_zone, + const std::vector& pipes, + CB filter_cb) +{ + return try_add_to_pipe_map(source_zone, dest_zone, pipes, + &dests, + filter_cb, + [&](const rgw_sync_bucket_pipe& pipe, zone_bucket *zb) { + *zb = zone_bucket{dest_zone, pipe.dest.get_bucket()}; + return filter_cb(source_zone, pipe.source.get_bucket(), dest_zone, zb->bucket); + }); +} - auto range = m.equal_range(zb); - if (range.first == range.second && - !is_wildcard_bucket(*bucket)) { - /* couldn't find the specific bucket, try to find by wildcard */ - zb.bucket = rgw_bucket(); - range = m.equal_range(zb); - } +using zb_pipe_map_t = rgw_sync_group_pipe_map::zb_pipe_map_t; - return range; +pair rgw_sync_group_pipe_map::find_pipes(const zb_pipe_map_t& m, + const string& zone, + std::optional b) +{ + if (!b) { + return m.equal_range(zone_bucket{zone, rgw_bucket()}); } + auto zb = zone_bucket{zone, *bucket}; - template - void init(const string& _zone, - std::optional _bucket, - const rgw_sync_policy_group& group, - CB filter_cb) { - zone = _zone; - bucket = _bucket; + auto range = m.equal_range(zb); + if (range.first == range.second && + !is_wildcard_bucket(*bucket)) { + /* couldn't find the specific bucket, try to find by wildcard */ + zb.bucket = rgw_bucket(); + range = m.equal_range(zb); + } - status = group.status; + return range; +} - std::vector zone_pipes; - /* only look at pipes that touch the specific zone and bucket */ - for (auto& pipe : group.pipes) { - if (pipe.contains_zone(zone) && - pipe.contains_bucket(bucket)) { - zone_pipes.push_back(pipe); - } - } +template +void rgw_sync_group_pipe_map::init(const string& _zone, + std::optional _bucket, + const rgw_sync_policy_group& group, + CB filter_cb) { + zone = _zone; + bucket = _bucket; - if (group.data_flow.empty()) { - return; + zone_bucket zb(zone, bucket); + + status = group.status; + + std::vector zone_pipes; + + /* only look at pipes that touch the specific zone and bucket */ + for (auto& pipe : group.pipes) { + if (pipe.contains_zone_bucket(zone, bucket)) { + zone_pipes.push_back(pipe); } + } - auto& flow = group.data_flow; - - /* symmetrical */ - if (flow.symmetrical) { - for (auto& symmetrical_group : *flow.symmetrical) { - if (symmetrical_group.zones.find(zone) != symmetrical_group.zones.end()) { - for (auto& z : symmetrical_group.zones) { - if (z != zone) { - try_add_source(z, zone, zone_pipes, filter_cb); - try_add_dest(zone, z, zone_pipes, filter_cb); - } + if (group.data_flow.empty()) { + return; + } + + auto& flow = group.data_flow; + + /* symmetrical */ + if (flow.symmetrical) { + for (auto& symmetrical_group : *flow.symmetrical) { + if (symmetrical_group.zones.find(zone) != symmetrical_group.zones.end()) { + for (auto& z : symmetrical_group.zones) { + if (z != zone) { + try_add_source(z, zone, zone_pipes, filter_cb); + try_add_dest(zone, z, zone_pipes, filter_cb); } } } } + } - /* directional */ - if (flow.directional) { - for (auto& rule : *flow.directional) { - if (rule.source_zone == zone) { - try_add_dest(zone, rule.dest_zone, zone_pipes, filter_cb); - } else if (rule.dest_zone == zone) { - try_add_source(rule.source_zone, zone, zone_pipes, filter_cb); - } + /* directional */ + if (flow.directional) { + for (auto& rule : *flow.directional) { + if (rule.source_zone == zone) { + try_add_dest(zone, rule.dest_zone, zone_pipes, filter_cb); + } else if (rule.dest_zone == zone) { + try_add_source(rule.source_zone, zone, zone_pipes, filter_cb); } } } +} - /* - * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket} - */ - vector find_source_pipes(const string& source_zone, - std::optional source_bucket, - std::optional dest_bucket) { - vector result; +/* + * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket} + */ +vector rgw_sync_group_pipe_map::find_source_pipes(const string& source_zone, + std::optional source_bucket, + std::optional dest_bucket) { + vector result; - auto range = find_pipes(sources, source_zone, source_bucket); + auto range = find_pipes(sources, source_zone, source_bucket); - for (auto iter = range.first; iter != range.second; ++iter) { - auto pipe = iter->second; - if (pipe.dest.match_bucket(dest_bucket)) { - result.push_back(pipe); - } + for (auto iter = range.first; iter != range.second; ++iter) { + auto pipe = iter->second; + if (pipe.dest.match_bucket(dest_bucket)) { + result.push_back(pipe); } - return std::move(result); } + return std::move(result); +} - /* - * find all relevant pipes in other zones that pull from a specific - * source bucket in out zone {source_bucket} -> {dest_zone, dest_bucket} - */ - vector find_dest_pipes(std::optional source_bucket, - const string& dest_zone, - std::optional dest_bucket) { - vector result; +/* + * find all relevant pipes in other zones that pull from a specific + * source bucket in out zone {source_bucket} -> {dest_zone, dest_bucket} + */ +vector rgw_sync_group_pipe_map::find_dest_pipes(std::optional source_bucket, + const string& dest_zone, + std::optional dest_bucket) { + vector result; - auto range = find_pipes(dests, dest_zone, dest_bucket); + auto range = find_pipes(dests, dest_zone, dest_bucket); - for (auto iter = range.first; iter != range.second; ++iter) { - auto pipe = iter->second; - if (pipe.source.match_bucket(source_bucket)) { - result.push_back(pipe); - } + for (auto iter = range.first; iter != range.second; ++iter) { + auto pipe = iter->second; + if (pipe.source.match_bucket(source_bucket)) { + result.push_back(pipe); } + } + + return std::move(result); +} - return std::move(result); +/* + * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket} + */ +vector rgw_sync_group_pipe_map::find_pipes(const string& source_zone, + std::optional source_bucket, + const string& dest_zone, + std::optional dest_bucket) { + if (dest_zone == zone) { + return find_source_pipes(source_zone, source_bucket, dest_bucket); } - /* - * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket} - */ - vector find_pipes(const string& source_zone, - std::optional source_bucket, - const string& dest_zone, - std::optional dest_bucket) { - if (dest_zone == zone) { - return find_source_pipes(source_zone, source_bucket, dest_bucket); - } + if (source_zone == zone) { + return find_dest_pipes(source_bucket, dest_zone, dest_bucket); + } - if (source_zone == zone) { - return find_dest_pipes(source_bucket, dest_zone, dest_bucket); - } + return vector(); +} - return vector(); +void RGWBucketSyncFlowManager::pipe_flow::dump(ceph::Formatter *f) const +{ + { + Formatter::ArraySection os(*f, "flow_groups"); + for (auto& g : flow_groups) { + encode_json("group", *g, f); + } } -}; + encode_json("pipe", pipe, f); +} bool RGWBucketSyncFlowManager::allowed_data_flow(const string& source_zone, std::optional source_bucket, @@ -519,10 +547,23 @@ RGWBucketSyncFlowManager::flow_map_t::iterator RGWBucketSyncFlowManager::find_bu void RGWBucketSyncFlowManager::update_flow_maps(const rgw_sync_bucket_pipe& pipe, - group_pipe_map *flow_group) { + rgw_sync_group_pipe_map *flow_group) { auto source_bucket = pipe.source.get_bucket(); auto dest_bucket = pipe.dest.get_bucket(); + if (!flow_group->sources.empty()) { + auto& by_source = flow_by_source[source_bucket]; + by_source.flow_groups.push_back(flow_group); + by_source.pipe.push_back(pipe); + } + + if (!flow_group->dests.empty()) { + auto& by_dest = flow_by_dest[dest_bucket]; + by_dest.flow_groups.push_back(flow_group); + by_dest.pipe.push_back(pipe); + } + +#if 0 if (!bucket || *bucket != source_bucket) { auto& by_source = flow_by_source[source_bucket]; @@ -536,6 +577,7 @@ void RGWBucketSyncFlowManager::update_flow_maps(const rgw_sync_bucket_pipe& pipe by_dest.flow_groups.push_back(flow_group); by_dest.pipe.push_back(pipe); } +#endif } void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) { @@ -543,13 +585,13 @@ void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) { auto& group = item.second; auto& flow_group_map = flow_groups[group.id]; - flow_group_map.init(zone_svc->zone_name(), bucket, group, + flow_group_map.init(zone_name, bucket, group, [&](const string& source_zone, std::optional source_bucket, const string& dest_zone, std::optional dest_bucket) { if (!parent) { - return true; + return true; } return parent->allowed_data_flow(source_zone, source_bucket, @@ -573,11 +615,13 @@ void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) { } -RGWBucketSyncFlowManager::RGWBucketSyncFlowManager(RGWSI_Zone *_zone_svc, +RGWBucketSyncFlowManager::RGWBucketSyncFlowManager(const string& _zone_name, std::optional _bucket, - RGWBucketSyncFlowManager *_parent) : zone_svc(_zone_svc), + RGWBucketSyncFlowManager *_parent) : zone_name(_zone_name), bucket(_bucket), parent(_parent) {} + + int RGWBucketSyncPolicyHandler::init() { #warning FIXME diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h index 6379227c7e94..d9fcedbd2fd8 100644 --- a/src/rgw/rgw_bucket_sync.h +++ b/src/rgw/rgw_bucket_sync.h @@ -17,33 +17,128 @@ #pragma once #include "rgw_common.h" +#include "rgw_sync_policy.h" class RGWSI_Zone; -struct group_pipe_map; -struct rgw_sync_bucket_pipe;; +struct rgw_sync_group_pipe_map; +struct rgw_sync_bucket_pipe; struct rgw_sync_policy_info; -class RGWBucketSyncFlowManager { - RGWSI_Zone *zone_svc; +struct rgw_sync_group_pipe_map { + string zone; std::optional bucket; - RGWBucketSyncFlowManager *parent{nullptr}; + rgw_sync_policy_group::Status status{rgw_sync_policy_group::Status::FORBIDDEN}; + + struct zone_bucket { + string zone; /* zone name */ + rgw_bucket bucket; /* bucket, if empty then wildcard */ + + zone_bucket() {} + zone_bucket(const string& _zone, + std::optional _bucket) : zone(_zone), + bucket(_bucket.value_or(rgw_bucket())) {} - map flow_groups; + bool operator<(const zone_bucket& zb) const { + if (zone < zb.zone) { + return true; + } + if (zone > zb.zone) { + return false; + } + return (bucket < zb.bucket); + } + void dump(ceph::Formatter *f) const; + }; + + using zb_pipe_map_t = std::multimap; + + zb_pipe_map_t sources; /* all the pipes where zone is pulling from */ + zb_pipe_map_t dests; /* all the pipes that pull from zone */ + + void dump(ceph::Formatter *f) const; + + template + void try_add_to_pipe_map(const string& source_zone, + const string& dest_zone, + const std::vector& pipes, + zb_pipe_map_t *pipe_map, + CB1 filter_cb, + CB2 call_filter_cb); + + template + void try_add_source(const string& source_zone, + const string& dest_zone, + const std::vector& pipes, + CB filter_cb); + + template + void try_add_dest(const string& source_zone, + const string& dest_zone, + const std::vector& pipes, + CB filter_cb); + + pair find_pipes(const zb_pipe_map_t& m, + const string& zone, + std::optional b); + + template + void init(const string& _zone, + std::optional _bucket, + const rgw_sync_policy_group& group, + CB filter_cb); + + /* + * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket} + */ + vector find_source_pipes(const string& source_zone, + std::optional source_bucket, + std::optional dest_bucket); + + /* + * find all relevant pipes in other zones that pull from a specific + * source bucket in out zone {source_bucket} -> {dest_zone, dest_bucket} + */ + vector find_dest_pipes(std::optional source_bucket, + const string& dest_zone, + std::optional dest_bucket); + + /* + * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket} + */ + vector find_pipes(const string& source_zone, + std::optional source_bucket, + const string& dest_zone, + std::optional dest_bucket); +}; + +class RGWBucketSyncFlowManager { +public: struct pipe_flow { - vector flow_groups; + vector flow_groups; vector pipe; + + void dump(ceph::Formatter *f) const; }; + using flow_map_t = map; + +private: + + string zone_name; + std::optional bucket; + + RGWBucketSyncFlowManager *parent{nullptr}; + + map flow_groups; + bool allowed_data_flow(const string& source_zone, std::optional source_bucket, const string& dest_zone, std::optional dest_bucket, bool check_activated); - using flow_map_t = map; - flow_map_t flow_by_source; flow_map_t flow_by_dest; @@ -53,15 +148,22 @@ class RGWBucketSyncFlowManager { flow_map_t::iterator find_bucket_flow(flow_map_t& m, std::optional bucket); void update_flow_maps(const rgw_sync_bucket_pipe& pipe, - group_pipe_map *flow_group); + rgw_sync_group_pipe_map *flow_group); public: - RGWBucketSyncFlowManager(RGWSI_Zone *_zone_svc, + RGWBucketSyncFlowManager(const string& _zone_name, std::optional _bucket, RGWBucketSyncFlowManager *_parent); void init(const rgw_sync_policy_info& sync_policy); + + const flow_map_t& get_sources() { + return flow_by_source; + } + const flow_map_t& get_dests() { + return flow_by_dest; + } }; class RGWBucketSyncPolicyHandler { diff --git a/src/rgw/rgw_json_enc.cc b/src/rgw/rgw_json_enc.cc index c331bdf8f880..fa7c00e12aee 100644 --- a/src/rgw/rgw_json_enc.cc +++ b/src/rgw/rgw_json_enc.cc @@ -867,17 +867,13 @@ void rgw_sync_symmetric_group::decode_json(JSONObj *obj) void rgw_sync_bucket_entity::dump(Formatter *f) const { - if (bucket) { - rgw_bucket b = *bucket; - if (b.name.empty()) { - b.name = "*"; - } - - encode_json("bucket", b.get_key(), f); - } else { - encode_json("bucket", "*", f); + encode_json("bucket", rgw_sync_bucket_entity::bucket_key(bucket), f); + if (zones) { + encode_json("zones", zones, f); + } else if (all_zones) { + set z = { "*" }; + encode_json("zones", z, f); } - encode_json("zones", zones, f); } void rgw_sync_bucket_entity::decode_json(JSONObj *obj) diff --git a/src/rgw/rgw_sync_policy.h b/src/rgw/rgw_sync_policy.h index 37ded0181823..1093cef0c66d 100644 --- a/src/rgw/rgw_sync_policy.h +++ b/src/rgw/rgw_sync_policy.h @@ -237,6 +237,7 @@ public: ENCODE_START(1, 1, bl); encode(bucket, bl); encode(zones, bl); + encode(all_zones, bl); ENCODE_FINISH(bl); } @@ -244,6 +245,7 @@ public: DECODE_START(1, bl); decode(bucket, bl); decode(zones, bl); + decode(all_zones, bl); DECODE_FINISH(bl); } @@ -275,10 +277,8 @@ public: bool match_zone(const string& zone) const { if (all_zones) { - return true; - } - - if (!zones) { /* all zones */ + return true; + } else if (!zones) { return false; } @@ -288,6 +288,8 @@ public: rgw_bucket get_bucket() const { return bucket.value_or(rgw_bucket()); } + + static string bucket_key(std::optional b); }; WRITE_CLASS_ENCODER(rgw_sync_bucket_entity) @@ -329,6 +331,11 @@ public: return (source.match_zone(zone) || dest.match_zone(zone)); } + bool contains_zone_bucket(const string& zone, std::optional b) const { + return ((source.match_zone(zone) && source.match_bucket(b)) || + (dest.match_zone(zone) && dest.match_bucket(b))); + } + void dump(ceph::Formatter *f) const; void decode_json(JSONObj *obj);