SYNC_ERROR_TRIM,
SYNC_GROUP_CREATE,
SYNC_GROUP_REMOVE,
+ SYNC_GROUP_FLOW_ADD,
SYNC_POLICY_GET,
BILOG_LIST,
BILOG_TRIM,
{ "sync policy get", OPT::SYNC_POLICY_GET },
{ "sync group create", OPT::SYNC_GROUP_CREATE },
{ "sync group remove", OPT::SYNC_GROUP_REMOVE },
+ { "sync group flow add", OPT::SYNC_GROUP_FLOW_ADD },
{ "bilog list", OPT::BILOG_LIST },
{ "bilog trim", OPT::BILOG_TRIM },
{ "bilog status", OPT::BILOG_STATUS },
return store->svc()->zone->get_zone().tier_type;
}
+static bool symmetrical_flow_opt(const string& opt)
+{
+ return (opt == "symmetrical" || opt == "symmetric");
+}
+
+static bool directional_flow_opt(const string& opt)
+{
+ return (opt == "directional" || opt == "direction");
+}
+
int main(int argc, const char **argv)
{
vector<const char*> args;
string sub_push_endpoint;
string event_id;
- std::optional<string> group_id;
+ std::optional<string> opt_group_id;
std::optional<string> opt_status;
+ std::optional<string> opt_flow_type;
+ std::optional<vector<string> > opt_zones;
+ std::optional<string> opt_flow_id;
+ std::optional<string> opt_source_zone;
+ std::optional<string> opt_dest_zone;
rgw::notify::EventTypeList event_types;
sync_from_all_specified = true;
} else if (ceph_argparse_witharg(args, i, &val, "--source-zone", (char*)NULL)) {
source_zone_name = val;
+ opt_source_zone = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--dest-zone", (char*)NULL)) {
+ opt_dest_zone = val;
} else if (ceph_argparse_witharg(args, i, &val, "--tier-type", (char*)NULL)) {
tier_type = val;
tier_type_specified = true;
} else if (ceph_argparse_witharg(args, i, &val, "--event-type", "--event-types", (char*)NULL)) {
rgw::notify::from_string_list(val, event_types);
} else if (ceph_argparse_witharg(args, i, &val, "--group-id", (char*)NULL)) {
- group_id = val;
+ opt_group_id = val;
} else if (ceph_argparse_witharg(args, i, &val, "--status", (char*)NULL)) {
opt_status = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--flow-type", (char*)NULL)) {
+ opt_flow_type = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--zones", (char*)NULL)) {
+ vector<string> v;
+ get_str_vec(val, v);
+ opt_zones = std::move(v);
+ } else if (ceph_argparse_witharg(args, i, &val, "--flow-id", (char*)NULL)) {
+ opt_flow_id = val;
} else if (ceph_argparse_binary_flag(args, i, &detail, NULL, "--detail", (char*)NULL)) {
// do nothing
} else if (strncmp(*i, "-", 1) == 0) {
}
if (opt_cmd == OPT::SYNC_GROUP_CREATE) {
- if (!group_id) {
+ if (!opt_group_id) {
cerr << "ERROR: --group-id is not specified" << std::endl;
return EINVAL;
}
return -ret;
}
- auto& group = zonegroup.sync_policy.groups[*group_id];
- group.id = *group_id;
+ auto& group = zonegroup.sync_policy.groups[*opt_group_id];
+ group.id = *opt_group_id;
if (opt_status) {
if (!group.set_status(*opt_status)) {
}
if (opt_cmd == OPT::SYNC_GROUP_REMOVE) {
- if (!group_id) {
+ if (!opt_group_id) {
+ cerr << "ERROR: --group-id not specified" << std::endl;
+ return EINVAL;
+ }
+
+ RGWZoneGroup zonegroup(zonegroup_id, zonegroup_name);
+ ret = zonegroup.init(g_ceph_context, store->svc()->sysobj);
+ if (ret < 0) {
+ cerr << "failed to init zonegroup: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ zonegroup.sync_policy.groups.erase(*opt_group_id);
+
+ ret = zonegroup.update();
+ if (ret < 0) {
+ cerr << "failed to update zonegroup: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ {
+ Formatter::ObjectSection os(*formatter, "result");
+ encode_json("sync_policy", zonegroup.sync_policy, formatter);
+ }
+
+ formatter->flush(cout);
+ }
+
+ if (opt_cmd == OPT::SYNC_GROUP_FLOW_ADD) {
+ if (!opt_group_id) {
cerr << "ERROR: --group-id not specified" << std::endl;
return EINVAL;
}
+ if (!opt_flow_id) {
+ cerr << "ERROR: --flow-id not specified" << std::endl;
+ return EINVAL;
+ }
+
+ if (!opt_flow_type ||
+ (!symmetrical_flow_opt(*opt_flow_type) &&
+ !directional_flow_opt(*opt_flow_type))) {
+ cerr << "ERROR: --flow-type not specified or invalid (options: symmetrical, directional)" << std::endl;
+ return EINVAL;
+ }
+
RGWZoneGroup zonegroup(zonegroup_id, zonegroup_name);
ret = zonegroup.init(g_ceph_context, store->svc()->sysobj);
if (ret < 0) {
return -ret;
}
- zonegroup.sync_policy.groups.erase(*group_id);
+ auto iter = zonegroup.sync_policy.groups.find(*opt_group_id);
+ if (iter == zonegroup.sync_policy.groups.end()) {
+ cerr << "ERROR: could not find group '" << *opt_group_id << "'" << std::endl;
+ return ENOENT;
+ }
+
+ auto& group = iter->second;
+ if (symmetrical_flow_opt(*opt_flow_type)) {
+ if (!opt_zones || opt_zones->empty()) {
+ cerr << "ERROR: --zones not provided for symmetrical flow, or is empty" << std::endl;
+ return EINVAL;
+ }
+
+ rgw_sync_symmetric_group *flow_group;
+
+ group.data_flow.find_symmetrical(*opt_flow_id, true, &flow_group);
+
+ for (auto& z : *opt_zones) {
+ flow_group->zones.insert(z);
+ }
+ } else { /* directional */
+ if (!opt_source_zone || opt_source_zone->empty()) {
+ cerr << "ERROR: --source-zone not provided for directional flow rule, or is empty" << std::endl;
+ return EINVAL;
+ }
+ if (!opt_dest_zone || opt_dest_zone->empty()) {
+ cerr << "ERROR: --dest-zone not provided for directional flow rule, or is empty" << std::endl;
+ return EINVAL;
+ }
+
+ rgw_sync_directional_rule *flow_rule;
+
+ group.data_flow.find_directional(*opt_source_zone, *opt_dest_zone, true, &flow_rule);
+ }
+
ret = zonegroup.update();
if (ret < 0) {
cerr << "failed to update zonegroup: " << cpp_strerror(-ret) << std::endl;
#define dout_subsys ceph_subsys_rgw
+bool rgw_sync_data_flow_group::find_symmetrical(const string& flow_id, bool create, rgw_sync_symmetric_group **flow_group)
+{
+ if (!symmetrical) {
+ if (!create) {
+ return false;
+ }
+ symmetrical.emplace();
+ }
+
+ for (auto& group : *symmetrical) {
+ if (flow_id == group.id) {
+ *flow_group = &group;
+ return true;
+ }
+ }
+
+ if (!create) {
+ return false;
+ }
+
+ auto& group = symmetrical->emplace_back();
+ *flow_group = &group;
+ (*flow_group)->id = flow_id;
+ return true;
+}
+
+bool rgw_sync_data_flow_group::find_directional(const string& source_zone, const string& dest_zone, bool create, rgw_sync_directional_rule **flow_group)
+{
+ if (!directional) {
+ if (!create) {
+ return false;
+ }
+ directional.emplace();
+ }
+
+ for (auto& rule : *directional) {
+ if (source_zone == rule.source_zone &&
+ dest_zone == rule.dest_zone) {
+ *flow_group = &rule;
+ return true;
+ }
+ }
+
+ if (!create) {
+ return false;
+ }
+
+ auto& rule = directional->emplace_back();
+ *flow_group = &rule;
+
+ rule.source_zone = source_zone;
+ rule.dest_zone = dest_zone;
+
+ return true;
+}
+
static std::vector<rgw_sync_bucket_pipe> filter_relevant_pipes(const std::vector<rgw_sync_bucket_pipe>& pipes,
const string& source_zone,
const string& dest_zone)
status = group.status;
- auto& pipes = group.pipes;
-
std::vector<rgw_sync_bucket_pipe> zone_pipes;
/* only look at pipes that touch the specific zone and bucket */
- if (pipes) {
- for (auto& pipe : *pipes) {
- if (pipe.contains_zone(zone) &&
- pipe.contains_bucket(bucket)) {
- zone_pipes.push_back(pipe);
- }
+ for (auto& pipe : group.pipes) {
+ if (pipe.contains_zone(zone) &&
+ pipe.contains_bucket(bucket)) {
+ zone_pipes.push_back(pipe);
}
}
- if (!group.data_flow) {
+ if (group.data_flow.empty()) {
return;
}
- auto& flow = *group.data_flow;
+ auto& flow = group.data_flow;
/* symmetrical */
if (flow.symmetrical) {
false); /* just check that it's not disabled */
});
- if (group.pipes) {
- for (auto& pipe : *group.pipes) {
+ if (!group.pipes.empty()) {
+ for (auto& pipe : group.pipes) {
if (!pipe.contains_bucket(bucket)) {
continue;
}
void rgw_sync_directional_rule::dump(Formatter *f) const
{
encode_json("source_zone", source_zone, f);
- encode_json("target_zone", target_zone, f);
+ encode_json("dest_zone", dest_zone, f);
}
void rgw_sync_directional_rule::decode_json(JSONObj *obj)
{
JSONDecoder::decode_json("source_zone", source_zone, obj);
- JSONDecoder::decode_json("target_zone", target_zone, obj);
+ JSONDecoder::decode_json("dest_zone", dest_zone, obj);
}
void rgw_sync_symmetric_group::dump(Formatter *f) const
{
+ encode_json("id", id, f);
encode_json("zones", zones, f);
}
void rgw_sync_symmetric_group::decode_json(JSONObj *obj)
{
+ JSONDecoder::decode_json("id", id, obj);
JSONDecoder::decode_json("zones", zones, obj);
}
void rgw_sync_bucket_pipe::dump(Formatter *f) const
{
encode_json("source", source, f);
- encode_json("target", target, f);
+ encode_json("dest", dest, f);
}
void rgw_sync_bucket_pipe::decode_json(JSONObj *obj)
{
JSONDecoder::decode_json("source", source, obj);
- JSONDecoder::decode_json("target", target, obj);
+ JSONDecoder::decode_json("dest", dest, obj);
}
void rgw_sync_data_flow_group::dump(Formatter *f) const