From: Yehuda Sadeh Date: Thu, 17 Oct 2019 01:41:41 +0000 (-0700) Subject: rgw-admin: sync group flow add command X-Git-Tag: v15.1.0~22^2~100 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ba563aebbeb45be454a3fa8192e24535f600ccba;p=ceph.git rgw-admin: sync group flow add command Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index abda156cee78..3c2ba158047c 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -651,6 +651,7 @@ enum class OPT { SYNC_ERROR_TRIM, SYNC_GROUP_CREATE, SYNC_GROUP_REMOVE, + SYNC_GROUP_FLOW_ADD, SYNC_POLICY_GET, BILOG_LIST, BILOG_TRIM, @@ -848,6 +849,7 @@ static SimpleCmd::Commands all_cmds = { { "sync policy get", OPT::SYNC_POLICY_GET }, { "sync group create", OPT::SYNC_GROUP_CREATE }, { "sync group remove", OPT::SYNC_GROUP_REMOVE }, + { "sync group flow add", OPT::SYNC_GROUP_FLOW_ADD }, { "bilog list", OPT::BILOG_LIST }, { "bilog trim", OPT::BILOG_TRIM }, { "bilog status", OPT::BILOG_STATUS }, @@ -2579,6 +2581,16 @@ const string& get_tier_type(rgw::sal::RGWRadosStore *store) { return store->svc()->zone->get_zone().tier_type; } +static bool symmetrical_flow_opt(const string& opt) +{ + return (opt == "symmetrical" || opt == "symmetric"); +} + +static bool directional_flow_opt(const string& opt) +{ + return (opt == "directional" || opt == "direction"); +} + int main(int argc, const char **argv) { vector args; @@ -2757,8 +2769,13 @@ int main(int argc, const char **argv) string sub_push_endpoint; string event_id; - std::optional group_id; + std::optional opt_group_id; std::optional opt_status; + std::optional opt_flow_type; + std::optional > opt_zones; + std::optional opt_flow_id; + std::optional opt_source_zone; + std::optional opt_dest_zone; rgw::notify::EventTypeList event_types; @@ -3042,6 +3059,9 @@ int main(int argc, const char **argv) sync_from_all_specified = true; } else if (ceph_argparse_witharg(args, i, &val, "--source-zone", (char*)NULL)) { source_zone_name = val; + opt_source_zone = val; + } else if (ceph_argparse_witharg(args, i, &val, "--dest-zone", (char*)NULL)) { + opt_dest_zone = val; } else if (ceph_argparse_witharg(args, i, &val, "--tier-type", (char*)NULL)) { tier_type = val; tier_type_specified = true; @@ -3111,9 +3131,17 @@ int main(int argc, const char **argv) } else if (ceph_argparse_witharg(args, i, &val, "--event-type", "--event-types", (char*)NULL)) { rgw::notify::from_string_list(val, event_types); } else if (ceph_argparse_witharg(args, i, &val, "--group-id", (char*)NULL)) { - group_id = val; + opt_group_id = val; } else if (ceph_argparse_witharg(args, i, &val, "--status", (char*)NULL)) { opt_status = val; + } else if (ceph_argparse_witharg(args, i, &val, "--flow-type", (char*)NULL)) { + opt_flow_type = val; + } else if (ceph_argparse_witharg(args, i, &val, "--zones", (char*)NULL)) { + vector v; + get_str_vec(val, v); + opt_zones = std::move(v); + } else if (ceph_argparse_witharg(args, i, &val, "--flow-id", (char*)NULL)) { + opt_flow_id = val; } else if (ceph_argparse_binary_flag(args, i, &detail, NULL, "--detail", (char*)NULL)) { // do nothing } else if (strncmp(*i, "-", 1) == 0) { @@ -7543,7 +7571,7 @@ next: } if (opt_cmd == OPT::SYNC_GROUP_CREATE) { - if (!group_id) { + if (!opt_group_id) { cerr << "ERROR: --group-id is not specified" << std::endl; return EINVAL; } @@ -7559,8 +7587,8 @@ next: return -ret; } - auto& group = zonegroup.sync_policy.groups[*group_id]; - group.id = *group_id; + auto& group = zonegroup.sync_policy.groups[*opt_group_id]; + group.id = *opt_group_id; if (opt_status) { if (!group.set_status(*opt_status)) { @@ -7584,11 +7612,52 @@ next: } if (opt_cmd == OPT::SYNC_GROUP_REMOVE) { - if (!group_id) { + if (!opt_group_id) { + cerr << "ERROR: --group-id not specified" << std::endl; + return EINVAL; + } + + RGWZoneGroup zonegroup(zonegroup_id, zonegroup_name); + ret = zonegroup.init(g_ceph_context, store->svc()->sysobj); + if (ret < 0) { + cerr << "failed to init zonegroup: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + + zonegroup.sync_policy.groups.erase(*opt_group_id); + + ret = zonegroup.update(); + if (ret < 0) { + cerr << "failed to update zonegroup: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + + { + Formatter::ObjectSection os(*formatter, "result"); + encode_json("sync_policy", zonegroup.sync_policy, formatter); + } + + formatter->flush(cout); + } + + if (opt_cmd == OPT::SYNC_GROUP_FLOW_ADD) { + if (!opt_group_id) { cerr << "ERROR: --group-id not specified" << std::endl; return EINVAL; } + if (!opt_flow_id) { + cerr << "ERROR: --flow-id not specified" << std::endl; + return EINVAL; + } + + if (!opt_flow_type || + (!symmetrical_flow_opt(*opt_flow_type) && + !directional_flow_opt(*opt_flow_type))) { + cerr << "ERROR: --flow-type not specified or invalid (options: symmetrical, directional)" << std::endl; + return EINVAL; + } + RGWZoneGroup zonegroup(zonegroup_id, zonegroup_name); ret = zonegroup.init(g_ceph_context, store->svc()->sysobj); if (ret < 0) { @@ -7596,8 +7665,42 @@ next: return -ret; } - zonegroup.sync_policy.groups.erase(*group_id); + auto iter = zonegroup.sync_policy.groups.find(*opt_group_id); + if (iter == zonegroup.sync_policy.groups.end()) { + cerr << "ERROR: could not find group '" << *opt_group_id << "'" << std::endl; + return ENOENT; + } + + auto& group = iter->second; + if (symmetrical_flow_opt(*opt_flow_type)) { + if (!opt_zones || opt_zones->empty()) { + cerr << "ERROR: --zones not provided for symmetrical flow, or is empty" << std::endl; + return EINVAL; + } + + rgw_sync_symmetric_group *flow_group; + + group.data_flow.find_symmetrical(*opt_flow_id, true, &flow_group); + + for (auto& z : *opt_zones) { + flow_group->zones.insert(z); + } + } else { /* directional */ + if (!opt_source_zone || opt_source_zone->empty()) { + cerr << "ERROR: --source-zone not provided for directional flow rule, or is empty" << std::endl; + return EINVAL; + } + if (!opt_dest_zone || opt_dest_zone->empty()) { + cerr << "ERROR: --dest-zone not provided for directional flow rule, or is empty" << std::endl; + return EINVAL; + } + + rgw_sync_directional_rule *flow_rule; + + group.data_flow.find_directional(*opt_source_zone, *opt_dest_zone, true, &flow_rule); + } + ret = zonegroup.update(); if (ret < 0) { cerr << "failed to update zonegroup: " << cpp_strerror(-ret) << std::endl; diff --git a/src/rgw/rgw_bucket_sync.cc b/src/rgw/rgw_bucket_sync.cc index 4e5ed21a0ba9..5af813f273b7 100644 --- a/src/rgw/rgw_bucket_sync.cc +++ b/src/rgw/rgw_bucket_sync.cc @@ -10,6 +10,62 @@ #define dout_subsys ceph_subsys_rgw +bool rgw_sync_data_flow_group::find_symmetrical(const string& flow_id, bool create, rgw_sync_symmetric_group **flow_group) +{ + if (!symmetrical) { + if (!create) { + return false; + } + symmetrical.emplace(); + } + + for (auto& group : *symmetrical) { + if (flow_id == group.id) { + *flow_group = &group; + return true; + } + } + + if (!create) { + return false; + } + + auto& group = symmetrical->emplace_back(); + *flow_group = &group; + (*flow_group)->id = flow_id; + return true; +} + +bool rgw_sync_data_flow_group::find_directional(const string& source_zone, const string& dest_zone, bool create, rgw_sync_directional_rule **flow_group) +{ + if (!directional) { + if (!create) { + return false; + } + directional.emplace(); + } + + for (auto& rule : *directional) { + if (source_zone == rule.source_zone && + dest_zone == rule.dest_zone) { + *flow_group = &rule; + return true; + } + } + + if (!create) { + return false; + } + + auto& rule = directional->emplace_back(); + *flow_group = &rule; + + rule.source_zone = source_zone; + rule.dest_zone = dest_zone; + + return true; +} + static std::vector filter_relevant_pipes(const std::vector& pipes, const string& source_zone, const string& dest_zone) @@ -141,25 +197,21 @@ struct group_pipe_map { status = group.status; - auto& pipes = group.pipes; - std::vector zone_pipes; /* only look at pipes that touch the specific zone and bucket */ - if (pipes) { - for (auto& pipe : *pipes) { - if (pipe.contains_zone(zone) && - pipe.contains_bucket(bucket)) { - zone_pipes.push_back(pipe); - } + for (auto& pipe : group.pipes) { + if (pipe.contains_zone(zone) && + pipe.contains_bucket(bucket)) { + zone_pipes.push_back(pipe); } } - if (!group.data_flow) { + if (group.data_flow.empty()) { return; } - auto& flow = *group.data_flow; + auto& flow = group.data_flow; /* symmetrical */ if (flow.symmetrical) { @@ -388,8 +440,8 @@ public: false); /* just check that it's not disabled */ }); - if (group.pipes) { - for (auto& pipe : *group.pipes) { + if (!group.pipes.empty()) { + for (auto& pipe : group.pipes) { if (!pipe.contains_bucket(bucket)) { continue; } diff --git a/src/rgw/rgw_json_enc.cc b/src/rgw/rgw_json_enc.cc index d382c3361099..35bbbebad0f7 100644 --- a/src/rgw/rgw_json_enc.cc +++ b/src/rgw/rgw_json_enc.cc @@ -844,22 +844,24 @@ void RGWBucketInfo::decode_json(JSONObj *obj) { void rgw_sync_directional_rule::dump(Formatter *f) const { encode_json("source_zone", source_zone, f); - encode_json("target_zone", target_zone, f); + encode_json("dest_zone", dest_zone, f); } void rgw_sync_directional_rule::decode_json(JSONObj *obj) { JSONDecoder::decode_json("source_zone", source_zone, obj); - JSONDecoder::decode_json("target_zone", target_zone, obj); + JSONDecoder::decode_json("dest_zone", dest_zone, obj); } void rgw_sync_symmetric_group::dump(Formatter *f) const { + encode_json("id", id, f); encode_json("zones", zones, f); } void rgw_sync_symmetric_group::decode_json(JSONObj *obj) { + JSONDecoder::decode_json("id", id, obj); JSONDecoder::decode_json("zones", zones, obj); } @@ -878,13 +880,13 @@ void rgw_sync_bucket_entity::decode_json(JSONObj *obj) void rgw_sync_bucket_pipe::dump(Formatter *f) const { encode_json("source", source, f); - encode_json("target", target, f); + encode_json("dest", dest, f); } void rgw_sync_bucket_pipe::decode_json(JSONObj *obj) { JSONDecoder::decode_json("source", source, obj); - JSONDecoder::decode_json("target", target, obj); + JSONDecoder::decode_json("dest", dest, obj); } void rgw_sync_data_flow_group::dump(Formatter *f) const diff --git a/src/rgw/rgw_sync_policy.h b/src/rgw/rgw_sync_policy.h index 56058f7d6d9b..07479968cec8 100644 --- a/src/rgw/rgw_sync_policy.h +++ b/src/rgw/rgw_sync_policy.h @@ -174,16 +174,19 @@ WRITE_CLASS_ENCODER(rgw_sync_policy_info) #endif struct rgw_sync_symmetric_group { + string id; std::set zones; void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); + encode(id, bl); encode(zones, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { DECODE_START(1, bl); + decode(id, bl); decode(zones, bl); DECODE_FINISH(bl); } @@ -350,6 +353,14 @@ struct rgw_sync_data_flow_group { void dump(ceph::Formatter *f) const; void decode_json(JSONObj *obj); + + bool empty() const { + return ((!symmetrical || symmetrical->empty()) && + (!directional || directional->empty())); + } + + bool find_symmetrical(const string& flow_id, bool create, rgw_sync_symmetric_group **flow_group); + bool find_directional(const string& source_zone, const string& dest_zone, bool create, rgw_sync_directional_rule **flow_group); }; WRITE_CLASS_ENCODER(rgw_sync_data_flow_group) @@ -357,9 +368,9 @@ WRITE_CLASS_ENCODER(rgw_sync_data_flow_group) struct rgw_sync_policy_group { string id; - std::optional data_flow; /* override data flow, howver, will not be able to + rgw_sync_data_flow_group data_flow; /* override data flow, howver, will not be able to add new flows that don't exist at higher level */ - std::optional > pipes; /* if not defined then applies to all + std::vector pipes; /* if not defined then applies to all buckets (DR sync) */ enum Status { diff --git a/src/rgw/rgw_zone.h b/src/rgw/rgw_zone.h index 9c1f018cda98..002c72e443b8 100644 --- a/src/rgw/rgw_zone.h +++ b/src/rgw/rgw_zone.h @@ -740,7 +740,7 @@ struct RGWZoneGroup : public RGWSystemMetaObj { void post_process_params(); void encode(bufferlist& bl) const override { - ENCODE_START(4, 1, bl); + ENCODE_START(5, 1, bl); encode(name, bl); encode(api_name, bl); encode(is_master, bl); @@ -753,11 +753,12 @@ struct RGWZoneGroup : public RGWSystemMetaObj { encode(hostnames_s3website, bl); RGWSystemMetaObj::encode(bl); encode(realm_id, bl); + encode(sync_policy, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) override { - DECODE_START(4, bl); + DECODE_START(5, bl); decode(name, bl); decode(api_name, bl); decode(is_master, bl); @@ -778,6 +779,9 @@ struct RGWZoneGroup : public RGWSystemMetaObj { } else { id = name; } + if (struct_v >= 5) { + decode(sync_policy, bl); + } DECODE_FINISH(bl); }