git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw-admin: Add --format option for bucket sync status 60676/head
author Alex Wojno <awojno@bloomberg.net>
Mon, 4 Nov 2024 16:53:47 +0000 (16:53 +0000)
committer Alex Wojno <awojno@bloomberg.net>
Fri, 8 Nov 2024 20:44:49 +0000 (20:44 +0000)
Signed-off-by: Alex Wojno <awojno@bloomberg.net>
src/rgw/rgw_admin.cc

index b00dfaa1ec5150acf42b74905e2e267dade8d761..1850e67730e0d3931acdd91ad9b125224bd38a70 100644 (file)
@@ -2549,35 +2549,104 @@ std::ostream& operator<<(std::ostream& out, const indented& h) {
   return out << std::setw(h.w) << h.header << std::setw(1) << ' ';
 }
 
-static int bucket_source_sync_status(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* driver, const RGWZone& zone,
+// Sync status of a bucket with respect to one source zone. Populated by
+// bucket_source_sync_status() and rendered either as indented plain text or
+// through a Formatter.
+struct bucket_source_sync_info {
+  const RGWZone& _source;
+  // Owned strings: error and status are assigned from fmt::format()
+  // temporaries, so a std::string_view here would be left dangling as soon
+  // as the assignment statement finishes.
+  std::string error;
+  // shard id -> remote marker, for every shard that is behind the source
+  std::map<int,std::string> shards_behind;
+  // Stays 0 on the paths that return before the incremental status is read.
+  int total_shards = 0;
+  std::string status;
+  rgw_bucket bucket_source;
+
+  bucket_source_sync_info(const RGWZone& source): _source(source) {}
+
+  // Plain-text rendering, one indented line per fact. A non-empty error is
+  // printed right after the source zone and ends the report; otherwise a
+  // non-empty status is printed after the source bucket and ends the report;
+  // otherwise the per-shard incremental summary is printed.
+  void _print_plaintext(std::ostream& out, int width) const {
+    out << indented{width, "source zone"} << _source.id << " (" << _source.name << ")" << std::endl;
+    if (!error.empty()) {
+      out << indented{width} << error << std::endl;
+      return;
+    }
+    out << indented{width, "source bucket"} << bucket_source << std::endl;
+    if (!status.empty()) {
+      out << indented{width} << status << std::endl;
+      return;
+    }
+    out << indented{width} << "incremental sync on " << total_shards << " shards\n";
+    if (!shards_behind.empty()) {
+      out << indented{width} << "bucket is behind on " << shards_behind.size() << " shards\n";
+      // only the shard ids are listed in plain-text mode; markers are omitted
+      std::set<int> shard_ids;
+      for (auto const& [shard_id, _] : shards_behind) {
+        shard_ids.insert(shard_id);
+      }
+      out << indented{width} << "behind shards: [" << shard_ids << "]\n";
+    } else {
+      out << indented{width} << "bucket is caught up with source\n";
+    }
+  }
+
+  // Formatter rendering: emits a single "source" object and flushes it to
+  // out. Follows the same error -> status -> per-shard precedence as the
+  // plain-text path, and additionally dumps the marker of each behind shard.
+  void _print_formatter(std::ostream& out, Formatter* formatter) const {
+    formatter->open_object_section("source");
+    formatter->dump_string("source_zone", _source.id);
+    formatter->dump_string("source_name", _source.name);
+
+    if (!error.empty()) {
+      formatter->dump_string("error", error);
+      formatter->close_section();
+      formatter->flush(out);
+      return;
+    }
+
+    formatter->dump_string("source_bucket", bucket_source.name);
+    formatter->dump_string("source_bucket_id", bucket_source.bucket_id);
+
+    if (!status.empty()) {
+      formatter->dump_string("status", status);
+      formatter->close_section();
+      formatter->flush(out);
+      return;
+    }
+
+    formatter->dump_int("total_shards", total_shards);
+    formatter->open_array_section("behind_shards");
+    for (auto const& [id, marker] : shards_behind) {
+      formatter->open_object_section("shard");
+      formatter->dump_int("shard_id", id);
+      formatter->dump_string("shard_marker", marker);
+      formatter->close_section();
+    }
+    formatter->close_section(); // behind_shards
+    formatter->close_section(); // source
+    formatter->flush(out);
+  }
+};
+
+static int bucket_source_sync_status(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* driver,
+                                     const RGWZone& zone,
                                      const RGWZone& source, RGWRESTConn *conn,
                                      const RGWBucketInfo& bucket_info,
                                      rgw_sync_bucket_pipe pipe,
-                                     int width, std::ostream& out)
+                                     bucket_source_sync_info& source_sync_info)
 {
-  out << indented{width, "source zone"} << source.id << " (" << source.name << ")" << std::endl;
-
   // syncing from this zone?
   if (!driver->svc()->zone->zone_syncs_from(zone, source)) {
-    out << indented{width} << "does not sync from zone\n";
+    source_sync_info.error = "does not sync from zone";
     return 0;
   }
 
   if (!pipe.source.bucket) {
-    ldpp_dout(dpp, -1) << __func__ << "(): missing source bucket" << dendl;
+    source_sync_info.error = fmt::format("{} (): missing source bucket", __func__);
     return -EINVAL;
   }
 
   std::unique_ptr<rgw::sal::Bucket> source_bucket;
   int r = init_bucket(*pipe.source.bucket, &source_bucket);
   if (r < 0) {
-    ldpp_dout(dpp, -1) << "failed to read source bucket info: " << cpp_strerror(r) << dendl;
+    source_sync_info.error = fmt::format("failed to read source bucket info: {}", cpp_strerror(r));
     return r;
   }
 
-  out << indented{width, "source bucket"} << source_bucket->get_key() << std::endl;
-  pipe.source.bucket = source_bucket->get_key();
+  source_sync_info.bucket_source = source_bucket->get_key();
 
+  pipe.source.bucket = source_bucket->get_key();
   pipe.dest.bucket = bucket_info.bucket;
 
   uint64_t gen = 0;
@@ -2588,15 +2657,15 @@ static int bucket_source_sync_status(const DoutPrefixProvider *dpp, rgw::sal::Ra
   r = rgw_read_bucket_full_sync_status(dpp, driver, pipe, &full_status, null_yield);
   if (r >= 0) {
     if (full_status.state == BucketSyncState::Init) {
-      out << indented{width} << "init: bucket sync has not started\n";
+      source_sync_info.status = "init: bucket sync has not started";
       return 0;
     }
     if (full_status.state == BucketSyncState::Stopped) {
-      out << indented{width} << "stopped: bucket sync is disabled\n";
+      source_sync_info.status = "stopped: bucket sync is disabled";
       return 0;
     }
     if (full_status.state == BucketSyncState::Full) {
-      out << indented{width} << "full sync: " << full_status.full.count << " objects completed\n";
+      source_sync_info.status = fmt::format("full sync: {} objects completed", full_status.full.count);
       return 0;
     }
     gen = full_status.incremental_gen;
@@ -2605,46 +2674,45 @@ static int bucket_source_sync_status(const DoutPrefixProvider *dpp, rgw::sal::Ra
     // no full status, but there may be per-shard status from before upgrade
     const auto& logs = source_bucket->get_info().layout.logs;
     if (logs.empty()) {
-      out << indented{width} << "init: bucket sync has not started\n";
+      source_sync_info.status = "init: bucket sync has not started";
       return 0;
     }
     const auto& log = logs.front();
     if (log.gen > 0) {
       // this isn't the backward-compatible case, so we just haven't started yet
-      out << indented{width} << "init: bucket sync has not started\n";
+      source_sync_info.status = "init: bucket sync has not started";
       return 0;
     }
     if (log.layout.type != rgw::BucketLogType::InIndex) {
-      ldpp_dout(dpp, -1) << "unrecognized log layout type " << log.layout.type << dendl;
+      source_sync_info.error = fmt::format("unrecognized log layout type {}", to_string(log.layout.type));
       return -EINVAL;
     }
     // use shard count from our log gen=0
     shard_status.resize(rgw::num_shards(log.layout.in_index));
   } else {
-    lderr(driver->ctx()) << "failed to read bucket full sync status: " << cpp_strerror(r) << dendl;
+    source_sync_info.error = fmt::format("failed to read bucket full sync status: {}", cpp_strerror(r));
     return r;
   }
 
   r = rgw_read_bucket_inc_sync_status(dpp, driver, pipe, gen, &shard_status);
   if (r < 0) {
-    lderr(driver->ctx()) << "failed to read bucket incremental sync status: " << cpp_strerror(r) << dendl;
+    source_sync_info.error = fmt::format("failed to read bucket incremental sync status: {}", cpp_strerror(r));
     return r;
   }
 
   const int total_shards = shard_status.size();
-
-  out << indented{width} << "incremental sync on " << total_shards << " shards\n";
+  source_sync_info.total_shards = total_shards;
 
   rgw_bucket_index_marker_info remote_info;
   BucketIndexShardsManager remote_markers;
   r = rgw_read_remote_bilog_info(dpp, conn, source_bucket->get_key(),
                                  remote_info, remote_markers, null_yield);
   if (r < 0) {
-    ldpp_dout(dpp, -1) << "failed to read remote log: " << cpp_strerror(r) << dendl;
+    source_sync_info.error = fmt::format("failed to read remote log: {}", cpp_strerror(r));
     return r;
   }
 
-  std::set<int> shards_behind;
+  std::map<int, std::string> shards_behind;
   for (const auto& r : remote_markers.get()) {
     auto shard_id = r.first;
     if (r.second.empty()) {
@@ -2652,21 +2720,17 @@ static int bucket_source_sync_status(const DoutPrefixProvider *dpp, rgw::sal::Ra
     }
     if (shard_id >= total_shards) {
       // unexpected shard id. we don't have status for it, so we're behind
-      shards_behind.insert(shard_id);
+      shards_behind[shard_id] = r.second;
       continue;
     }
     auto& m = shard_status[shard_id];
     const auto pos = BucketIndexShardsManager::get_shard_marker(m.inc_marker.position);
     if (pos < r.second) {
-      shards_behind.insert(shard_id);
+      shards_behind[shard_id] = r.second;
     }
   }
-  if (!shards_behind.empty()) {
-    out << indented{width} << "bucket is behind on " << shards_behind.size() << " shards\n";
-    out << indented{width} << "behind shards: [" << shards_behind << "]\n";
-  } else {
-    out << indented{width} << "bucket is caught up with source\n";
-  }
+
+  source_sync_info.shards_behind = std::move(shards_behind);
   return 0;
 }
 
@@ -2877,25 +2941,82 @@ static int bucket_sync_info(rgw::sal::Driver* driver, const RGWBucketInfo& info,
   return 0;
 }
 
+// Aggregated result of a "bucket sync status" query: the local
+// realm/zonegroup/zone/bucket identity, an optional top-level error, and one
+// bucket_source_sync_info per source that was collected.
+struct bucket_sync_status_info {
+  std::vector<bucket_source_sync_info> source_status_info;
+  // Non-owning; assigned by bucket_sync_status() before anything is printed.
+  // Both print paths dereference these, so they must be set before print().
+  rgw::sal::Zone* _zone = nullptr;
+  const rgw::sal::ZoneGroup* _zonegroup = nullptr;
+  const RGWBucketInfo& _bucket_info;
+  // label column width used by the plain-text output
+  const int width = 15;
+  std::string error;
+
+  bucket_sync_status_info(const RGWBucketInfo& bucket_info): _bucket_info(bucket_info) {}
+
+  // Writes the report to out: through formatter when use_formatter is true,
+  // as indented plain text otherwise.
+  void print(std::ostream& out, bool use_formatter, Formatter* formatter) const {
+    if (use_formatter) {
+      _print_formatter(out, formatter);
+    } else {
+      _print_plaintext(out);
+    }
+  }
+
+  // Plain-text report: identity header, then the top-level error (if any),
+  // then one section per collected source.
+  void _print_plaintext(std::ostream& out) const {
+    out << indented{width, "realm"} << _zone->get_realm_id() << " (" << _zone->get_realm_name() << ")" << std::endl;
+    out << indented{width, "zonegroup"} << _zonegroup->get_id() << " (" << _zonegroup->get_name() << ")" << std::endl;
+    out << indented{width, "zone"} << _zone->get_id() << " (" << _zone->get_name() << ")" << std::endl;
+    out << indented{width, "bucket"} << _bucket_info.bucket << std::endl;
+    out << indented{width, "current time"}
+      << to_iso_8601(ceph::real_clock::now(), iso_8601_format::YMDhms) << "\n\n";
+
+    if (!error.empty()){
+      out << error << std::endl;
+    }
+
+    for (const auto &info : source_status_info) {
+      info._print_plaintext(out, width);
+    }
+  }
+
+  // Formatter report: a single object holding the identity fields, the
+  // optional error, and a "sources" array with one entry per source.
+  void _print_formatter(std::ostream& out, Formatter* formatter) const {
+    // NOTE(review): the section name "test" looks like a leftover
+    // placeholder; confirm whether any output format exposes it before
+    // renaming.
+    formatter->open_object_section("test");
+    formatter->dump_string("realm", _zone->get_realm_id());
+    formatter->dump_string("realm_name", _zone->get_realm_name());
+    formatter->dump_string("zonegroup", _zonegroup->get_id());
+    formatter->dump_string("zonegroup_name", _zonegroup->get_name());
+    formatter->dump_string("zone", _zone->get_id());
+    formatter->dump_string("zone_name", _zone->get_name());
+    formatter->dump_string("bucket", _bucket_info.bucket.name);
+    formatter->dump_string("bucket_instance_id", _bucket_info.bucket.bucket_id);
+    formatter->dump_string("current_time", to_iso_8601(ceph::real_clock::now(), iso_8601_format::YMDhms));
+
+    if (!error.empty()) {
+      formatter->dump_string("error", error);
+    }
+
+    formatter->open_array_section("sources");
+    for (const auto &info : source_status_info) {
+      // each source flushes its own object to out as it is written
+      info._print_formatter(out, formatter);
+    }
+    formatter->close_section(); // sources
+
+    formatter->close_section(); // top-level object
+    formatter->flush(out);
+  }
+
+};
+
 static int bucket_sync_status(rgw::sal::Driver* driver, const RGWBucketInfo& info,
                               const rgw_zone_id& source_zone_id,
                              std::optional<rgw_bucket>& opt_source_bucket,
-                              std::ostream& out)
+                              bucket_sync_status_info& bucket_sync_info)
 {
   const rgw::sal::ZoneGroup& zonegroup = driver->get_zone()->get_zonegroup();
   rgw::sal::Zone* zone = driver->get_zone();
-  constexpr int width = 15;
-
-  out << indented{width, "realm"} << zone->get_realm_id() << " (" << zone->get_realm_name() << ")\n";
-  out << indented{width, "zonegroup"} << zonegroup.get_id() << " (" << zonegroup.get_name() << ")\n";
-  out << indented{width, "zone"} << zone->get_id() << " (" << zone->get_name() << ")\n";
-  out << indented{width, "bucket"} << info.bucket << "\n";
-  out << indented{width, "current time"}
-    << to_iso_8601(ceph::real_clock::now(), iso_8601_format::YMDhms) << "\n\n";
 
+  bucket_sync_info._zone = zone;
+  bucket_sync_info._zonegroup = &zonegroup;
 
   if (!static_cast<rgw::sal::RadosStore*>(driver)->ctl()->bucket->bucket_imports_data(info.bucket, null_yield, dpp())) {
-    out << "Sync is disabled for bucket " << info.bucket.name << " or bucket has no sync sources" << std::endl;
+    bucket_sync_info.error = fmt::format("Sync is disabled for bucket {} or bucket has no sync sources", info.bucket.name);
     return 0;
   }
 
@@ -2903,7 +3024,7 @@ static int bucket_sync_status(rgw::sal::Driver* driver, const RGWBucketInfo& inf
 
   int r = driver->get_sync_policy_handler(dpp(), std::nullopt, info.bucket, &handler, null_yield);
   if (r < 0) {
-    ldpp_dout(dpp(), -1) << "ERROR: failed to get policy handler for bucket (" << info.bucket << "): r=" << r << ": " << cpp_strerror(-r) << dendl;
+    bucket_sync_info.error = fmt::format("ERROR: failed to get policy handler for bucket ({}): r={}: {}", info.bucket.name, r, cpp_strerror(-r));
     return r;
   }
 
@@ -2916,13 +3037,12 @@ static int bucket_sync_status(rgw::sal::Driver* driver, const RGWBucketInfo& inf
     std::unique_ptr<rgw::sal::Zone> zone;
     int ret = driver->get_zone()->get_zonegroup().get_zone_by_id(source_zone_id.id, &zone);
     if (ret < 0) {
-      ldpp_dout(dpp(), -1) << "Source zone not found in zonegroup "
-          << zonegroup.get_name() << dendl;
+      bucket_sync_info.error = fmt::format("Source zone not found in zonegroup {}", zonegroup.get_name());
       return -EINVAL;
     }
     auto c = zone_conn_map.find(source_zone_id);
     if (c == zone_conn_map.end()) {
-      ldpp_dout(dpp(), -1) << "No connection to zone " << zone->get_name() << dendl;
+      bucket_sync_info.error = fmt::format("No connection to zone {}", zone->get_name());
       return -EINVAL;
     }
     zone_ids.insert(source_zone_id);
@@ -2953,10 +3073,15 @@ static int bucket_sync_status(rgw::sal::Driver* driver, const RGWBucketInfo& inf
        continue;
       }
       if (pipe.source.zone.value_or(rgw_zone_id()) == z->second.id) {
-       bucket_source_sync_status(dpp(), static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->zone->get_zone(), z->second,
+        bucket_source_sync_info source_sync_info(z->second);
+       auto ret = bucket_source_sync_status(dpp(), static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->zone->get_zone(), z->second,
                                  c->second,
                                  info, pipe,
-                                 width, out);
+                                 source_sync_info);
+
+        if (ret == 0) {
+          bucket_sync_info.source_status_info.emplace_back(std::move(source_sync_info));
+        }
       }
     }
   }
@@ -3484,6 +3609,7 @@ int main(int argc, const char **argv)
   list<string> tags_rm;
   int placement_inline_data = true;
   bool placement_inline_data_specified = false;
+  bool format_arg_passed = false;
 
   int64_t max_objects = -1;
   int64_t max_size = -1;
@@ -3863,6 +3989,7 @@ int main(int argc, const char **argv)
       new_bucket_name = val;
     } else if (ceph_argparse_witharg(args, i, &val, "--format", (char*)NULL)) {
       format = val;
+      format_arg_passed = true;
     } else if (ceph_argparse_witharg(args, i, &val, "--categories", (char*)NULL)) {
       string cat_str = val;
       list<string> cat_list;
@@ -9845,7 +9972,18 @@ next:
     if (ret < 0) {
       return -ret;
     }
-    bucket_sync_status(driver, bucket->get_info(), source_zone, opt_source_bucket, std::cout);
+
+    auto bucket_info = bucket->get_info();
+    bucket_sync_status_info bucket_sync_info(bucket_info);
+    ret = bucket_sync_status(driver, bucket_info, source_zone,
+        opt_source_bucket, bucket_sync_info);
+
+    if (ret == 0) {
+      bucket_sync_info.print(std::cout, format_arg_passed, formatter.get());
+    } else {
+      cerr << "failed to get bucket sync status. see logs for more info" << std::endl;
+    }
   }
 
   if (opt_cmd == OPT::BUCKET_SYNC_MARKERS) {