return &global_dpp;
}
+#define CHECK_TRUE(x, msg, err) \
+ do { \
+ if (!x) { \
+ cerr << msg << std::endl; \
+ return err; \
+ } \
+ } while (0)
+
+#define CHECK_SUCCESS(x, msg) \
+ do { \
+ int _x_val = (x); \
+ if (_x_val < 0) { \
+ cerr << msg << ": " << cpp_strerror(-_x_val) << std::endl; \
+ return _x_val; \
+ } \
+ } while (0)
+
void usage()
{
cout << "usage: radosgw-admin <cmd> [options...]" << std::endl;
GLOBAL_QUOTA_SET,
GLOBAL_QUOTA_ENABLE,
GLOBAL_QUOTA_DISABLE,
+ SYNC_INFO,
SYNC_STATUS,
ROLE_CREATE,
ROLE_DELETE,
{ "global quota set", OPT::GLOBAL_QUOTA_SET },
{ "global quota enable", OPT::GLOBAL_QUOTA_ENABLE },
{ "global quota disable", OPT::GLOBAL_QUOTA_DISABLE },
+ { "sync info", OPT::SYNC_INFO },
{ "sync status", OPT::SYNC_STATUS },
{ "role create", OPT::ROLE_CREATE },
{ "role delete", OPT::ROLE_DELETE },
return 0;
}
+static int init_bucket(const rgw_bucket& b,
+ RGWBucketInfo& bucket_info,
+ rgw_bucket& bucket,
+ map<string, bufferlist> *pattrs = nullptr)
+{
+ return init_bucket(b.tenant, b.name, b.bucket_id,
+ bucket_info, bucket, pattrs);
+}
+
static int read_input(const string& infile, bufferlist& bl)
{
int fd = 0;
return 0;
}
+void encode_json(const char *name, const RGWBucketSyncFlowManager::flow_map_t& m, Formatter *f)
+{
+ Formatter::ObjectSection top_section(*f, name);
+ Formatter::ArraySection as(*f, "entries");
+
+ for (auto& entry : m) {
+ Formatter::ObjectSection os(*f, "entry");
+ auto& bucket = entry.first;
+ auto& pflow = entry.second;
+
+ encode_json("bucket", rgw_sync_bucket_entity::bucket_key(bucket), f);
+ {
+ Formatter::ArraySection fg(*f, "flow_groups");
+ for (auto& flow_group : pflow.flow_groups) {
+ encode_json("entry", *flow_group, f);
+ }
+ }
+ encode_json("pipes", pflow.pipe, f);
+ }
+}
+
+static int sync_info(std::optional<string> opt_target_zone, std::optional<rgw_bucket> opt_bucket, Formatter *formatter)
+{
+ const RGWRealm& realm = store->svc()->zone->get_realm();
+ const RGWZoneGroup& zonegroup = store->svc()->zone->get_zonegroup();
+ const RGWZone& zone = store->svc()->zone->get_zone();
+
+ string zone_name = opt_target_zone.value_or(store->svc()->zone->zone_name());
+
+ RGWBucketSyncFlowManager zone_flow(zone_name, nullopt, nullptr);
+
+ zone_flow.init(zonegroup.sync_policy);
+
+ RGWBucketSyncFlowManager *flow_mgr = &zone_flow;
+
+ std::optional<RGWBucketSyncFlowManager> bucket_flow;
+
+ if (opt_bucket) {
+ rgw_bucket bucket;
+ RGWBucketInfo bucket_info;
+
+ int ret = init_bucket(*opt_bucket, bucket_info, bucket);
+ if (ret < 0) {
+ cerr << "ERROR: init_bucket failed: " << cpp_strerror(-ret) << std::endl;
+ return ret;
+ }
+
+ if (ret >= 0 &&
+ bucket_info.sync_policy) {
+ bucket_flow.emplace(zone_name, opt_bucket, &zone_flow);
+
+ bucket_flow->init(*bucket_info.sync_policy);
+
+ flow_mgr = &(*bucket_flow);
+ }
+ }
+
+ auto& sources = flow_mgr->get_sources();
+ auto& dests = flow_mgr->get_dests();
+
+ {
+ Formatter::ObjectSection os(*formatter, "result");
+ encode_json("sources", sources, formatter);
+ encode_json("dests", dests, formatter);
+ }
+
+ formatter->flush(cout);
+
+ return 0;
+}
+
static int bucket_sync_info(rgw::sal::RGWRadosStore *store, const RGWBucketInfo& info,
std::ostream& out)
{
return true;
}
-#define CHECK_TRUE(x, msg, err) \
- do { \
- if (!x) { \
- cerr << msg << std::endl; \
- return err; \
- } \
- } while (0)
-
template <class T>
static void show_result(T& obj,
Formatter *formatter,
return 0;
}
- ret = init_bucket(bucket->tenant, bucket->name, bucket->bucket_id, bucket_info, *bucket, &bucket_attrs);
+ ret = init_bucket(*bucket, bucket_info, *bucket, &bucket_attrs);
if (ret < 0) {
cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
return ret;
std::optional<string> opt_dest_tenant;
std::optional<string> opt_dest_bucket_name;
std::optional<string> opt_dest_bucket_id;
+ std::optional<string> opt_effective_zone;
rgw::notify::EventTypeList event_types;
opt_dest_bucket_name = val;
} else if (ceph_argparse_witharg(args, i, &val, "--dest-bucket-id", (char*)NULL)) {
opt_dest_bucket_id = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--effective-zone", (char*)NULL)) {
+ opt_effective_zone = val;
} else if (ceph_argparse_binary_flag(args, i, &detail, NULL, "--detail", (char*)NULL)) {
// do nothing
} else if (strncmp(*i, "-", 1) == 0) {
OPT::PERIOD_GET_CURRENT,
OPT::PERIOD_LIST,
OPT::GLOBAL_QUOTA_GET,
+ OPT::SYNC_INFO,
OPT::SYNC_STATUS,
OPT::ROLE_GET,
OPT::ROLE_LIST,
}
}
+ if (opt_cmd == OPT::SYNC_INFO) {
+ sync_info(opt_effective_zone, opt_bucket, formatter);
+ }
+
if (opt_cmd == OPT::SYNC_STATUS) {
sync_status(formatter);
}
}
}
+
+string rgw_sync_bucket_entity::bucket_key(std::optional<rgw_bucket> b)
+{
+ if (!b) {
+ return string("*");
+ }
+
+ rgw_bucket _b = *b;
+
+ if (_b.name.empty()) {
+ _b.name = "*";
+ }
+
+ return _b.get_key();
+}
+
+
bool rgw_sync_data_flow_group::find_symmetrical(const string& flow_id, bool create, rgw_sync_symmetric_group **flow_group)
{
if (!symmetrical) {
const string& dest_zone)
{
std::vector<rgw_sync_bucket_pipe> relevant_pipes;
- for (auto& pipe : relevant_pipes) {
- if (pipe.source.match_zone(source_zone)) {
- relevant_pipes.push_back(pipe);
- }
- if (pipe.dest.match_zone(dest_zone)) {
+ for (auto& pipe : pipes) {
+ if (pipe.source.match_zone(source_zone) &&
+ pipe.dest.match_zone(dest_zone)) {
relevant_pipes.push_back(pipe);
}
}
return bucket.name.empty();
}
-struct group_pipe_map {
- string zone;
- std::optional<rgw_bucket> bucket;
-
- rgw_sync_policy_group::Status status{rgw_sync_policy_group::Status::FORBIDDEN};
-
- struct zone_bucket {
- string zone; /* zone name */
- rgw_bucket bucket; /* bucket, if empty then wildcard */
+void rgw_sync_group_pipe_map::zone_bucket::dump(ceph::Formatter *f) const
+{
+ encode_json("zone", zone, f);
+ encode_json("bucket", rgw_sync_bucket_entity::bucket_key(bucket), f);
+}
- bool operator<(const zone_bucket& zb) const {
- if (zone < zb.zone) {
- return true;
- }
- if (zone > zb.zone) {
- return false;
- }
- return (bucket < zb.bucket);
- }
- };
+void rgw_sync_group_pipe_map::dump(ceph::Formatter *f) const
+{
+ encode_json("zone", zone, f);
+ encode_json("bucket", rgw_sync_bucket_entity::bucket_key(bucket), f);
+ encode_json("sources", sources, f);
+ encode_json("dests", dests, f);
+}
- using zb_pipe_map_t = std::multimap<zone_bucket, rgw_sync_bucket_pipe>;
+ostream& operator<<(ostream& os, const rgw_sync_bucket_entity& e) {
+ os << "{b=" << rgw_sync_bucket_entity::bucket_key(e.bucket) << ",z=" << e.zones.value_or(std::set<string>()) << "}";
+ return os;
+}
- zb_pipe_map_t sources; /* all the pipes where zone is pulling from, by source_zone, s */
- zb_pipe_map_t dests; /* all the pipes that pull from zone */
+ostream& operator<<(ostream& os, const rgw_sync_bucket_pipe& pipe) {
+ os << "{id=" << pipe.id << ",s=" << pipe.source << ",d=" << pipe.dest << "}";
+ return os;
+}
- template <typename CB1, typename CB2>
- void try_add_to_pipe_map(const string& source_zone,
- const string& dest_zone,
- const std::vector<rgw_sync_bucket_pipe>& pipes,
- zb_pipe_map_t *pipe_map,
- CB1 filter_cb,
- CB2 call_filter_cb) {
- if (!filter_cb(source_zone, nullopt, dest_zone, nullopt)) {
- return;
- }
- auto relevant_pipes = filter_relevant_pipes(pipes, source_zone, dest_zone);
+template <typename CB1, typename CB2>
+void rgw_sync_group_pipe_map::try_add_to_pipe_map(const string& source_zone,
+ const string& dest_zone,
+ const std::vector<rgw_sync_bucket_pipe>& pipes,
+ zb_pipe_map_t *pipe_map,
+ CB1 filter_cb,
+ CB2 call_filter_cb)
+{
+ if (!filter_cb(source_zone, nullopt, dest_zone, nullopt)) {
+ return;
+ }
+ auto relevant_pipes = filter_relevant_pipes(pipes, source_zone, dest_zone);
- for (auto& pipe : relevant_pipes) {
- zone_bucket zb;
- if (!call_filter_cb(pipe, &zb)) {
- continue;
- }
- pipe_map->insert(make_pair(zb, pipe));
+ for (auto& pipe : relevant_pipes) {
+ zone_bucket zb;
+ if (!call_filter_cb(pipe, &zb)) {
+ continue;
}
+ pipe_map->insert(make_pair(zb, pipe));
}
+}
- template <typename CB>
- void try_add_source(const string& source_zone,
- const string& dest_zone,
- const std::vector<rgw_sync_bucket_pipe>& pipes,
- CB filter_cb)
- {
- return try_add_to_pipe_map(source_zone, dest_zone, pipes,
- &sources,
- filter_cb,
- [&](const rgw_sync_bucket_pipe& pipe, zone_bucket *zb) {
- *zb = zone_bucket{source_zone, pipe.source.get_bucket()};
- return filter_cb(source_zone, zb->bucket, dest_zone, pipe.dest.get_bucket());
- });
- }
-
- template <typename CB>
- void try_add_dest(const string& source_zone,
+template <typename CB>
+void rgw_sync_group_pipe_map::try_add_source(const string& source_zone,
const string& dest_zone,
const std::vector<rgw_sync_bucket_pipe>& pipes,
CB filter_cb)
- {
- return try_add_to_pipe_map(source_zone, dest_zone, pipes,
- &dests,
- filter_cb,
- [&](const rgw_sync_bucket_pipe& pipe, zone_bucket *zb) {
- *zb = zone_bucket{dest_zone, pipe.dest.get_bucket()};
- return filter_cb(source_zone, pipe.source.get_bucket(), dest_zone, zb->bucket);
- });
- }
-
- pair<zb_pipe_map_t::const_iterator, zb_pipe_map_t::const_iterator> find_pipes(const zb_pipe_map_t& m,
- const string& zone,
- std::optional<rgw_bucket> b) {
- if (!b) {
- return m.equal_range(zone_bucket{zone, rgw_bucket()});
- }
+{
+ return try_add_to_pipe_map(source_zone, dest_zone, pipes,
+ &sources,
+ filter_cb,
+ [&](const rgw_sync_bucket_pipe& pipe, zone_bucket *zb) {
+ *zb = zone_bucket{source_zone, pipe.source.get_bucket()};
+ return filter_cb(source_zone, zb->bucket, dest_zone, pipe.dest.get_bucket());
+ });
+}
- auto zb = zone_bucket{zone, *bucket};
+template <typename CB>
+void rgw_sync_group_pipe_map::try_add_dest(const string& source_zone,
+ const string& dest_zone,
+ const std::vector<rgw_sync_bucket_pipe>& pipes,
+ CB filter_cb)
+{
+ return try_add_to_pipe_map(source_zone, dest_zone, pipes,
+ &dests,
+ filter_cb,
+ [&](const rgw_sync_bucket_pipe& pipe, zone_bucket *zb) {
+ *zb = zone_bucket{dest_zone, pipe.dest.get_bucket()};
+ return filter_cb(source_zone, pipe.source.get_bucket(), dest_zone, zb->bucket);
+ });
+}
- auto range = m.equal_range(zb);
- if (range.first == range.second &&
- !is_wildcard_bucket(*bucket)) {
- /* couldn't find the specific bucket, try to find by wildcard */
- zb.bucket = rgw_bucket();
- range = m.equal_range(zb);
- }
+using zb_pipe_map_t = rgw_sync_group_pipe_map::zb_pipe_map_t;
- return range;
+pair<zb_pipe_map_t::const_iterator, zb_pipe_map_t::const_iterator> rgw_sync_group_pipe_map::find_pipes(const zb_pipe_map_t& m,
+ const string& zone,
+ std::optional<rgw_bucket> b)
+{
+ if (!b) {
+ return m.equal_range(zone_bucket{zone, rgw_bucket()});
}
+ auto zb = zone_bucket{zone, *bucket};
- template <typename CB>
- void init(const string& _zone,
- std::optional<rgw_bucket> _bucket,
- const rgw_sync_policy_group& group,
- CB filter_cb) {
- zone = _zone;
- bucket = _bucket;
+ auto range = m.equal_range(zb);
+ if (range.first == range.second &&
+ !is_wildcard_bucket(*bucket)) {
+ /* couldn't find the specific bucket, try to find by wildcard */
+ zb.bucket = rgw_bucket();
+ range = m.equal_range(zb);
+ }
- status = group.status;
+ return range;
+}
- std::vector<rgw_sync_bucket_pipe> zone_pipes;
- /* only look at pipes that touch the specific zone and bucket */
- for (auto& pipe : group.pipes) {
- if (pipe.contains_zone(zone) &&
- pipe.contains_bucket(bucket)) {
- zone_pipes.push_back(pipe);
- }
- }
+template <typename CB>
+void rgw_sync_group_pipe_map::init(const string& _zone,
+ std::optional<rgw_bucket> _bucket,
+ const rgw_sync_policy_group& group,
+ CB filter_cb) {
+ zone = _zone;
+ bucket = _bucket;
- if (group.data_flow.empty()) {
- return;
+ zone_bucket zb(zone, bucket);
+
+ status = group.status;
+
+ std::vector<rgw_sync_bucket_pipe> zone_pipes;
+
+ /* only look at pipes that touch the specific zone and bucket */
+ for (auto& pipe : group.pipes) {
+ if (pipe.contains_zone_bucket(zone, bucket)) {
+ zone_pipes.push_back(pipe);
}
+ }
- auto& flow = group.data_flow;
-
- /* symmetrical */
- if (flow.symmetrical) {
- for (auto& symmetrical_group : *flow.symmetrical) {
- if (symmetrical_group.zones.find(zone) != symmetrical_group.zones.end()) {
- for (auto& z : symmetrical_group.zones) {
- if (z != zone) {
- try_add_source(z, zone, zone_pipes, filter_cb);
- try_add_dest(zone, z, zone_pipes, filter_cb);
- }
+ if (group.data_flow.empty()) {
+ return;
+ }
+
+ auto& flow = group.data_flow;
+
+ /* symmetrical */
+ if (flow.symmetrical) {
+ for (auto& symmetrical_group : *flow.symmetrical) {
+ if (symmetrical_group.zones.find(zone) != symmetrical_group.zones.end()) {
+ for (auto& z : symmetrical_group.zones) {
+ if (z != zone) {
+ try_add_source(z, zone, zone_pipes, filter_cb);
+ try_add_dest(zone, z, zone_pipes, filter_cb);
}
}
}
}
+ }
- /* directional */
- if (flow.directional) {
- for (auto& rule : *flow.directional) {
- if (rule.source_zone == zone) {
- try_add_dest(zone, rule.dest_zone, zone_pipes, filter_cb);
- } else if (rule.dest_zone == zone) {
- try_add_source(rule.source_zone, zone, zone_pipes, filter_cb);
- }
+ /* directional */
+ if (flow.directional) {
+ for (auto& rule : *flow.directional) {
+ if (rule.source_zone == zone) {
+ try_add_dest(zone, rule.dest_zone, zone_pipes, filter_cb);
+ } else if (rule.dest_zone == zone) {
+ try_add_source(rule.source_zone, zone, zone_pipes, filter_cb);
}
}
}
+}
- /*
- * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket}
- */
- vector<rgw_sync_bucket_pipe> find_source_pipes(const string& source_zone,
- std::optional<rgw_bucket> source_bucket,
- std::optional<rgw_bucket> dest_bucket) {
- vector<rgw_sync_bucket_pipe> result;
+/*
+ * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket}
+ */
+vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_source_pipes(const string& source_zone,
+ std::optional<rgw_bucket> source_bucket,
+ std::optional<rgw_bucket> dest_bucket) {
+ vector<rgw_sync_bucket_pipe> result;
- auto range = find_pipes(sources, source_zone, source_bucket);
+ auto range = find_pipes(sources, source_zone, source_bucket);
- for (auto iter = range.first; iter != range.second; ++iter) {
- auto pipe = iter->second;
- if (pipe.dest.match_bucket(dest_bucket)) {
- result.push_back(pipe);
- }
+ for (auto iter = range.first; iter != range.second; ++iter) {
+ auto pipe = iter->second;
+ if (pipe.dest.match_bucket(dest_bucket)) {
+ result.push_back(pipe);
}
- return std::move(result);
}
+ return std::move(result);
+}
- /*
- * find all relevant pipes in other zones that pull from a specific
- * source bucket in out zone {source_bucket} -> {dest_zone, dest_bucket}
- */
- vector<rgw_sync_bucket_pipe> find_dest_pipes(std::optional<rgw_bucket> source_bucket,
- const string& dest_zone,
- std::optional<rgw_bucket> dest_bucket) {
- vector<rgw_sync_bucket_pipe> result;
+/*
+ * find all relevant pipes in other zones that pull from a specific
+ * source bucket in out zone {source_bucket} -> {dest_zone, dest_bucket}
+ */
+vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_dest_pipes(std::optional<rgw_bucket> source_bucket,
+ const string& dest_zone,
+ std::optional<rgw_bucket> dest_bucket) {
+ vector<rgw_sync_bucket_pipe> result;
- auto range = find_pipes(dests, dest_zone, dest_bucket);
+ auto range = find_pipes(dests, dest_zone, dest_bucket);
- for (auto iter = range.first; iter != range.second; ++iter) {
- auto pipe = iter->second;
- if (pipe.source.match_bucket(source_bucket)) {
- result.push_back(pipe);
- }
+ for (auto iter = range.first; iter != range.second; ++iter) {
+ auto pipe = iter->second;
+ if (pipe.source.match_bucket(source_bucket)) {
+ result.push_back(pipe);
}
+ }
+
+ return std::move(result);
+}
- return std::move(result);
+/*
+ * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket}
+ */
+vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_pipes(const string& source_zone,
+ std::optional<rgw_bucket> source_bucket,
+ const string& dest_zone,
+ std::optional<rgw_bucket> dest_bucket) {
+ if (dest_zone == zone) {
+ return find_source_pipes(source_zone, source_bucket, dest_bucket);
}
- /*
- * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket}
- */
- vector<rgw_sync_bucket_pipe> find_pipes(const string& source_zone,
- std::optional<rgw_bucket> source_bucket,
- const string& dest_zone,
- std::optional<rgw_bucket> dest_bucket) {
- if (dest_zone == zone) {
- return find_source_pipes(source_zone, source_bucket, dest_bucket);
- }
+ if (source_zone == zone) {
+ return find_dest_pipes(source_bucket, dest_zone, dest_bucket);
+ }
- if (source_zone == zone) {
- return find_dest_pipes(source_bucket, dest_zone, dest_bucket);
- }
+ return vector<rgw_sync_bucket_pipe>();
+}
- return vector<rgw_sync_bucket_pipe>();
+void RGWBucketSyncFlowManager::pipe_flow::dump(ceph::Formatter *f) const
+{
+ {
+ Formatter::ArraySection os(*f, "flow_groups");
+ for (auto& g : flow_groups) {
+ encode_json("group", *g, f);
+ }
}
-};
+ encode_json("pipe", pipe, f);
+}
bool RGWBucketSyncFlowManager::allowed_data_flow(const string& source_zone,
std::optional<rgw_bucket> source_bucket,
void RGWBucketSyncFlowManager::update_flow_maps(const rgw_sync_bucket_pipe& pipe,
- group_pipe_map *flow_group) {
+ rgw_sync_group_pipe_map *flow_group) {
auto source_bucket = pipe.source.get_bucket();
auto dest_bucket = pipe.dest.get_bucket();
+ if (!flow_group->sources.empty()) {
+ auto& by_source = flow_by_source[source_bucket];
+ by_source.flow_groups.push_back(flow_group);
+ by_source.pipe.push_back(pipe);
+ }
+
+ if (!flow_group->dests.empty()) {
+ auto& by_dest = flow_by_dest[dest_bucket];
+ by_dest.flow_groups.push_back(flow_group);
+ by_dest.pipe.push_back(pipe);
+ }
+
+#if 0
if (!bucket ||
*bucket != source_bucket) {
auto& by_source = flow_by_source[source_bucket];
by_dest.flow_groups.push_back(flow_group);
by_dest.pipe.push_back(pipe);
}
+#endif
}
void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) {
auto& group = item.second;
auto& flow_group_map = flow_groups[group.id];
- flow_group_map.init(zone_svc->zone_name(), bucket, group,
+ flow_group_map.init(zone_name, bucket, group,
[&](const string& source_zone,
std::optional<rgw_bucket> source_bucket,
const string& dest_zone,
std::optional<rgw_bucket> dest_bucket) {
if (!parent) {
- return true;
+ return true;
}
return parent->allowed_data_flow(source_zone,
source_bucket,
}
-RGWBucketSyncFlowManager::RGWBucketSyncFlowManager(RGWSI_Zone *_zone_svc,
+RGWBucketSyncFlowManager::RGWBucketSyncFlowManager(const string& _zone_name,
std::optional<rgw_bucket> _bucket,
- RGWBucketSyncFlowManager *_parent) : zone_svc(_zone_svc),
+ RGWBucketSyncFlowManager *_parent) : zone_name(_zone_name),
bucket(_bucket),
parent(_parent) {}
+
+
int RGWBucketSyncPolicyHandler::init()
{
#warning FIXME
#pragma once
#include "rgw_common.h"
+#include "rgw_sync_policy.h"
class RGWSI_Zone;
-struct group_pipe_map;
-struct rgw_sync_bucket_pipe;;
+struct rgw_sync_group_pipe_map;
+struct rgw_sync_bucket_pipe;
struct rgw_sync_policy_info;
-class RGWBucketSyncFlowManager {
- RGWSI_Zone *zone_svc;
+struct rgw_sync_group_pipe_map {
+ string zone;
std::optional<rgw_bucket> bucket;
- RGWBucketSyncFlowManager *parent{nullptr};
+ rgw_sync_policy_group::Status status{rgw_sync_policy_group::Status::FORBIDDEN};
+
+ struct zone_bucket {
+ string zone; /* zone name */
+ rgw_bucket bucket; /* bucket, if empty then wildcard */
+
+ zone_bucket() {}
+ zone_bucket(const string& _zone,
+ std::optional<rgw_bucket> _bucket) : zone(_zone),
+ bucket(_bucket.value_or(rgw_bucket())) {}
- map<string, group_pipe_map> flow_groups;
+ bool operator<(const zone_bucket& zb) const {
+ if (zone < zb.zone) {
+ return true;
+ }
+ if (zone > zb.zone) {
+ return false;
+ }
+ return (bucket < zb.bucket);
+ }
+ void dump(ceph::Formatter *f) const;
+ };
+
+ using zb_pipe_map_t = std::multimap<zone_bucket, rgw_sync_bucket_pipe>;
+
+ zb_pipe_map_t sources; /* all the pipes where zone is pulling from */
+ zb_pipe_map_t dests; /* all the pipes that pull from zone */
+
+ void dump(ceph::Formatter *f) const;
+
+ template <typename CB1, typename CB2>
+ void try_add_to_pipe_map(const string& source_zone,
+ const string& dest_zone,
+ const std::vector<rgw_sync_bucket_pipe>& pipes,
+ zb_pipe_map_t *pipe_map,
+ CB1 filter_cb,
+ CB2 call_filter_cb);
+
+ template <typename CB>
+ void try_add_source(const string& source_zone,
+ const string& dest_zone,
+ const std::vector<rgw_sync_bucket_pipe>& pipes,
+ CB filter_cb);
+
+ template <typename CB>
+ void try_add_dest(const string& source_zone,
+ const string& dest_zone,
+ const std::vector<rgw_sync_bucket_pipe>& pipes,
+ CB filter_cb);
+
+ pair<zb_pipe_map_t::const_iterator, zb_pipe_map_t::const_iterator> find_pipes(const zb_pipe_map_t& m,
+ const string& zone,
+ std::optional<rgw_bucket> b);
+
+ template <typename CB>
+ void init(const string& _zone,
+ std::optional<rgw_bucket> _bucket,
+ const rgw_sync_policy_group& group,
+ CB filter_cb);
+
+ /*
+ * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket}
+ */
+ vector<rgw_sync_bucket_pipe> find_source_pipes(const string& source_zone,
+ std::optional<rgw_bucket> source_bucket,
+ std::optional<rgw_bucket> dest_bucket);
+
+ /*
+ * find all relevant pipes in other zones that pull from a specific
+ * source bucket in out zone {source_bucket} -> {dest_zone, dest_bucket}
+ */
+ vector<rgw_sync_bucket_pipe> find_dest_pipes(std::optional<rgw_bucket> source_bucket,
+ const string& dest_zone,
+ std::optional<rgw_bucket> dest_bucket);
+
+ /*
+ * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket}
+ */
+ vector<rgw_sync_bucket_pipe> find_pipes(const string& source_zone,
+ std::optional<rgw_bucket> source_bucket,
+ const string& dest_zone,
+ std::optional<rgw_bucket> dest_bucket);
+};
+
+class RGWBucketSyncFlowManager {
+public:
struct pipe_flow {
- vector<group_pipe_map *> flow_groups;
+ vector<rgw_sync_group_pipe_map *> flow_groups;
vector<rgw_sync_bucket_pipe> pipe;
+
+ void dump(ceph::Formatter *f) const;
};
+ using flow_map_t = map<rgw_bucket, pipe_flow>;
+
+private:
+
+ string zone_name;
+ std::optional<rgw_bucket> bucket;
+
+ RGWBucketSyncFlowManager *parent{nullptr};
+
+ map<string, rgw_sync_group_pipe_map> flow_groups;
+
bool allowed_data_flow(const string& source_zone,
std::optional<rgw_bucket> source_bucket,
const string& dest_zone,
std::optional<rgw_bucket> dest_bucket,
bool check_activated);
- using flow_map_t = map<rgw_bucket, pipe_flow>;
-
flow_map_t flow_by_source;
flow_map_t flow_by_dest;
flow_map_t::iterator find_bucket_flow(flow_map_t& m, std::optional<rgw_bucket> bucket);
void update_flow_maps(const rgw_sync_bucket_pipe& pipe,
- group_pipe_map *flow_group);
+ rgw_sync_group_pipe_map *flow_group);
public:
- RGWBucketSyncFlowManager(RGWSI_Zone *_zone_svc,
+ RGWBucketSyncFlowManager(const string& _zone_name,
std::optional<rgw_bucket> _bucket,
RGWBucketSyncFlowManager *_parent);
void init(const rgw_sync_policy_info& sync_policy);
+
+ const flow_map_t& get_sources() {
+ return flow_by_source;
+ }
+ const flow_map_t& get_dests() {
+ return flow_by_dest;
+ }
};
class RGWBucketSyncPolicyHandler {