From: Alex Wojno Date: Mon, 4 Nov 2024 16:53:47 +0000 (+0000) Subject: rgw-admin: Add --format option for bucket sync status X-Git-Tag: v20.0.0~501^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=44c31faa0e71c101884019fad84b735869769441;p=ceph.git rgw-admin: Add --format option for bucket sync status Signed-off-by: Alex Wojno --- diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index b00dfaa1ec5..1850e67730e 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -2549,35 +2549,104 @@ std::ostream& operator<<(std::ostream& out, const indented& h) { return out << std::setw(h.w) << h.header << std::setw(1) << ' '; } -static int bucket_source_sync_status(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* driver, const RGWZone& zone, +struct bucket_source_sync_info { + const RGWZone& _source; + std::string_view error; + std::map shards_behind; + int total_shards; + std::string_view status; + rgw_bucket bucket_source; + + bucket_source_sync_info(const RGWZone& source): _source(source) {} + + void _print_plaintext(std::ostream& out, int width) const { + out << indented{width, "source zone"} << _source.id << " (" << _source.name << ")" << std::endl; + if (!error.empty()) { + out << indented{width} << error << std::endl; + return; + } + out << indented{width, "source bucket"} << bucket_source << std::endl; + if (!status.empty()) { + out << indented{width} << status << std::endl; + return; + } + out << indented{width} << "incremental sync on " << total_shards << " shards\n"; + if (!shards_behind.empty()) { + out << indented{width} << "bucket is behind on " << shards_behind.size() << " shards\n"; + set shard_ids; + for (auto const& [shard_id, _] : shards_behind) { + shard_ids.insert(shard_id); + } + out << indented{width} << "behind shards: [" << shard_ids << "]\n"; + } else { + out << indented{width} << "bucket is caught up with source\n"; + } + } + + void _print_formatter(std::ostream& out, Formatter* formatter) const { + formatter->open_object_section("source"); + formatter->dump_string("source_zone", _source.id); + formatter->dump_string("source_name", _source.name); + + if (!error.empty()) { + formatter->dump_string("error", error); + formatter->close_section(); + formatter->flush(out); + return; + } + + formatter->dump_string("source_bucket", bucket_source.name); + formatter->dump_string("source_bucket_id", bucket_source.bucket_id); + + if (!status.empty()) { + formatter->dump_string("status", status); + formatter->close_section(); + formatter->flush(out); + return; + } + + formatter->dump_int("total_shards", total_shards); + formatter->open_array_section("behind_shards"); + for (auto const& [id, marker] : shards_behind) { + formatter->open_object_section("shard"); + formatter->dump_int("shard_id", id); + formatter->dump_string("shard_marker", marker); + formatter->close_section(); + } + formatter->close_section(); + formatter->close_section(); + formatter->flush(out); + } +}; + +static int bucket_source_sync_status(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* driver, + const RGWZone& zone, const RGWZone& source, RGWRESTConn *conn, const RGWBucketInfo& bucket_info, rgw_sync_bucket_pipe pipe, - int width, std::ostream& out) + bucket_source_sync_info& source_sync_info) { - out << indented{width, "source zone"} << source.id << " (" << source.name << ")" << std::endl; - // syncing from this zone? if (!driver->svc()->zone->zone_syncs_from(zone, source)) { - out << indented{width} << "does not sync from zone\n"; + source_sync_info.error = "does not sync from zone"; return 0; } if (!pipe.source.bucket) { - ldpp_dout(dpp, -1) << __func__ << "(): missing source bucket" << dendl; + source_sync_info.error = fmt::format("{} (): missing source bucket", __func__); return -EINVAL; } std::unique_ptr source_bucket; int r = init_bucket(*pipe.source.bucket, &source_bucket); if (r < 0) { - ldpp_dout(dpp, -1) << "failed to read source bucket info: " << cpp_strerror(r) << dendl; + source_sync_info.error = fmt::format("failed to read source bucket info: {}", cpp_strerror(r)); return r; } - out << indented{width, "source bucket"} << source_bucket->get_key() << std::endl; - pipe.source.bucket = source_bucket->get_key(); + source_sync_info.bucket_source = source_bucket->get_key(); + pipe.source.bucket = source_bucket->get_key(); pipe.dest.bucket = bucket_info.bucket; uint64_t gen = 0; @@ -2588,15 +2657,15 @@ static int bucket_source_sync_status(const DoutPrefixProvider *dpp, rgw::sal::Ra r = rgw_read_bucket_full_sync_status(dpp, driver, pipe, &full_status, null_yield); if (r >= 0) { if (full_status.state == BucketSyncState::Init) { - out << indented{width} << "init: bucket sync has not started\n"; + source_sync_info.status = "init: bucket sync has not started"; return 0; } if (full_status.state == BucketSyncState::Stopped) { - out << indented{width} << "stopped: bucket sync is disabled\n"; + source_sync_info.status = "stopped: bucket sync is disabled"; return 0; } if (full_status.state == BucketSyncState::Full) { - out << indented{width} << "full sync: " << full_status.full.count << " objects completed\n"; + source_sync_info.status = fmt::format("full sync: {} objects completed", full_status.full.count); return 0; } gen = full_status.incremental_gen; @@ -2605,46 +2674,45 @@ static int bucket_source_sync_status(const DoutPrefixProvider *dpp, rgw::sal::Ra // no full status, but there may be per-shard status from before upgrade const auto& logs = source_bucket->get_info().layout.logs; if (logs.empty()) { - out << indented{width} << "init: bucket sync has not started\n"; + source_sync_info.status = "init: bucket sync has not started"; return 0; } const auto& log = logs.front(); if (log.gen > 0) { // this isn't the backward-compatible case, so we just haven't started yet - out << indented{width} << "init: bucket sync has not started\n"; + source_sync_info.status = "init: bucket sync has not started"; return 0; } if (log.layout.type != rgw::BucketLogType::InIndex) { - ldpp_dout(dpp, -1) << "unrecognized log layout type " << log.layout.type << dendl; + source_sync_info.error = fmt::format("unrecognized log layout type {}", to_string(log.layout.type)); return -EINVAL; } // use shard count from our log gen=0 shard_status.resize(rgw::num_shards(log.layout.in_index)); } else { - lderr(driver->ctx()) << "failed to read bucket full sync status: " << cpp_strerror(r) << dendl; + source_sync_info.error = fmt::format("failed to read bucket full sync status: {}", cpp_strerror(r)); return r; } r = rgw_read_bucket_inc_sync_status(dpp, driver, pipe, gen, &shard_status); if (r < 0) { - lderr(driver->ctx()) << "failed to read bucket incremental sync status: " << cpp_strerror(r) << dendl; + source_sync_info.error = fmt::format("failed to read bucket incremental sync status: {}", cpp_strerror(r)); return r; } const int total_shards = shard_status.size(); - - out << indented{width} << "incremental sync on " << total_shards << " shards\n"; + source_sync_info.total_shards = total_shards; rgw_bucket_index_marker_info remote_info; BucketIndexShardsManager remote_markers; r = rgw_read_remote_bilog_info(dpp, conn, source_bucket->get_key(), remote_info, remote_markers, null_yield); if (r < 0) { - ldpp_dout(dpp, -1) << "failed to read remote log: " << cpp_strerror(r) << dendl; + source_sync_info.error = fmt::format("failed to read remote log: {}", cpp_strerror(r)); return r; } - std::set shards_behind; + std::map shards_behind; for (const auto& r : remote_markers.get()) { auto shard_id = r.first; if (r.second.empty()) { @@ -2652,21 +2720,17 @@ static int bucket_source_sync_status(const DoutPrefixProvider *dpp, rgw::sal::Ra } if (shard_id >= total_shards) { // unexpected shard id. we don't have status for it, so we're behind - shards_behind.insert(shard_id); + shards_behind[shard_id] = r.second; continue; } auto& m = shard_status[shard_id]; const auto pos = BucketIndexShardsManager::get_shard_marker(m.inc_marker.position); if (pos < r.second) { - shards_behind.insert(shard_id); + shards_behind[shard_id] = r.second; } } - if (!shards_behind.empty()) { - out << indented{width} << "bucket is behind on " << shards_behind.size() << " shards\n"; - out << indented{width} << "behind shards: [" << shards_behind << "]\n"; - } else { - out << indented{width} << "bucket is caught up with source\n"; - } + + source_sync_info.shards_behind = std::move(shards_behind); return 0; } @@ -2877,25 +2941,82 @@ static int bucket_sync_info(rgw::sal::Driver* driver, const RGWBucketInfo& info, return 0; } +struct bucket_sync_status_info { + std::vector source_status_info; + rgw::sal::Zone* _zone; + const rgw::sal::ZoneGroup* _zonegroup; + const RGWBucketInfo& _bucket_info; + const int width = 15; + std::string error; + + bucket_sync_status_info(const RGWBucketInfo& bucket_info): _bucket_info(bucket_info) {} + + void print(std::ostream& out, bool use_formatter, Formatter* formatter) { + if (use_formatter) { + _print_formatter(out, formatter); + } else { + _print_plaintext(out); + } + } + + void _print_plaintext(std::ostream& out) { + out << indented{width, "realm"} << _zone->get_realm_id() << " (" << _zone->get_realm_name() << ")" << std::endl; + out << indented{width, "zonegroup"} << _zonegroup->get_id() << " (" << _zonegroup->get_name() << ")" << std::endl; + out << indented{width, "zone"} << _zone->get_id() << " (" << _zone->get_name() << ")" << std::endl; + out << indented{width, "bucket"} << _bucket_info.bucket << std::endl; + out << indented{width, "current time"} + << to_iso_8601(ceph::real_clock::now(), iso_8601_format::YMDhms) << "\n\n"; + + if (!error.empty()){ + out << error << std::endl; + } + + for (const auto &info : source_status_info) { + info._print_plaintext(out, width); + } + } + + void _print_formatter(std::ostream& out, Formatter* formatter) { + formatter->open_object_section("test"); + formatter->dump_string("realm", _zone->get_realm_id()); + formatter->dump_string("realm_name", _zone->get_realm_name()); + formatter->dump_string("zonegroup", _zonegroup->get_id()); + formatter->dump_string("zonegroup_name", _zonegroup->get_name()); + formatter->dump_string("zone", _zone->get_id()); + formatter->dump_string("zone_name", _zone->get_name()); + formatter->dump_string("bucket", _bucket_info.bucket.name); + formatter->dump_string("bucket_instance_id", _bucket_info.bucket.bucket_id); + formatter->dump_string("current_time", to_iso_8601(ceph::real_clock::now(), iso_8601_format::YMDhms)); + + if (!error.empty()) { + formatter->dump_string("error", error); + } + + formatter->open_array_section("sources"); + for (const auto &info : source_status_info) { + info._print_formatter(out, formatter); + } + formatter->close_section(); + + formatter->close_section(); + formatter->flush(out); + } + +}; + static int bucket_sync_status(rgw::sal::Driver* driver, const RGWBucketInfo& info, const rgw_zone_id& source_zone_id, std::optional& opt_source_bucket, - std::ostream& out) + bucket_sync_status_info& bucket_sync_info) { const rgw::sal::ZoneGroup& zonegroup = driver->get_zone()->get_zonegroup(); rgw::sal::Zone* zone = driver->get_zone(); - constexpr int width = 15; - - out << indented{width, "realm"} << zone->get_realm_id() << " (" << zone->get_realm_name() << ")\n"; - out << indented{width, "zonegroup"} << zonegroup.get_id() << " (" << zonegroup.get_name() << ")\n"; - out << indented{width, "zone"} << zone->get_id() << " (" << zone->get_name() << ")\n"; - out << indented{width, "bucket"} << info.bucket << "\n"; - out << indented{width, "current time"} - << to_iso_8601(ceph::real_clock::now(), iso_8601_format::YMDhms) << "\n\n"; + bucket_sync_info._zone = zone; + bucket_sync_info._zonegroup = &zonegroup; if (!static_cast(driver)->ctl()->bucket->bucket_imports_data(info.bucket, null_yield, dpp())) { - out << "Sync is disabled for bucket " << info.bucket.name << " or bucket has no sync sources" << std::endl; + bucket_sync_info.error = fmt::format("Sync is disabled for bucket {} or bucket has no sync sources", info.bucket.name); return 0; } @@ -2903,7 +3024,7 @@ static int bucket_sync_status(rgw::sal::Driver* driver, const RGWBucketInfo& inf int r = driver->get_sync_policy_handler(dpp(), std::nullopt, info.bucket, &handler, null_yield); if (r < 0) { - ldpp_dout(dpp(), -1) << "ERROR: failed to get policy handler for bucket (" << info.bucket << "): r=" << r << ": " << cpp_strerror(-r) << dendl; + bucket_sync_info.error = fmt::format("ERROR: failed to get policy handler for bucket ({}): r={}: {}", info.bucket.name, r, cpp_strerror(-r)); return r; } @@ -2916,13 +3037,12 @@ static int bucket_sync_status(rgw::sal::Driver* driver, const RGWBucketInfo& inf std::unique_ptr zone; int ret = driver->get_zone()->get_zonegroup().get_zone_by_id(source_zone_id.id, &zone); if (ret < 0) { - ldpp_dout(dpp(), -1) << "Source zone not found in zonegroup " - << zonegroup.get_name() << dendl; + bucket_sync_info.error = fmt::format("Source zone not found in zonegroup {}", zonegroup.get_name()); return -EINVAL; } auto c = zone_conn_map.find(source_zone_id); if (c == zone_conn_map.end()) { - ldpp_dout(dpp(), -1) << "No connection to zone " << zone->get_name() << dendl; + bucket_sync_info.error = fmt::format("No connection to zone {}", zone->get_name()); return -EINVAL; } zone_ids.insert(source_zone_id); @@ -2953,10 +3073,15 @@ static int bucket_sync_status(rgw::sal::Driver* driver, const RGWBucketInfo& inf continue; } if (pipe.source.zone.value_or(rgw_zone_id()) == z->second.id) { - bucket_source_sync_status(dpp(), static_cast(driver), static_cast(driver)->svc()->zone->get_zone(), z->second, + bucket_source_sync_info source_sync_info(z->second); + auto ret = bucket_source_sync_status(dpp(), static_cast(driver), static_cast(driver)->svc()->zone->get_zone(), z->second, c->second, info, pipe, - width, out); + source_sync_info); + + if (ret == 0) { + bucket_sync_info.source_status_info.emplace_back(std::move(source_sync_info)); + } } } } @@ -3484,6 +3609,7 @@ int main(int argc, const char **argv) list tags_rm; int placement_inline_data = true; bool placement_inline_data_specified = false; + bool format_arg_passed = false; int64_t max_objects = -1; int64_t max_size = -1; @@ -3863,6 +3989,7 @@ int main(int argc, const char **argv) new_bucket_name = val; } else if (ceph_argparse_witharg(args, i, &val, "--format", (char*)NULL)) { format = val; + format_arg_passed = true; } else if (ceph_argparse_witharg(args, i, &val, "--categories", (char*)NULL)) { string cat_str = val; list cat_list; @@ -9845,7 +9972,18 @@ next: if (ret < 0) { return -ret; } - bucket_sync_status(driver, bucket->get_info(), source_zone, opt_source_bucket, std::cout); + + auto bucket_info = bucket->get_info(); + bucket_sync_status_info bucket_sync_info(bucket_info); + + ret = bucket_sync_status(driver, bucket_info, source_zone, + opt_source_bucket, bucket_sync_info); + + if (ret == 0) { + bucket_sync_info.print(std::cout, format_arg_passed, formatter.get()); + } else { + cerr << "failed to get bucket sync status. see logs for more info" << std::endl; + } } if (opt_cmd == OPT::BUCKET_SYNC_MARKERS) {