]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: paginate ListTopics
authorCasey Bodley <cbodley@redhat.com>
Tue, 9 Jan 2024 23:55:40 +0000 (18:55 -0500)
committerCasey Bodley <cbodley@redhat.com>
Wed, 10 Apr 2024 13:18:06 +0000 (09:18 -0400)
rename read_topics()/write_topics() to 'v1' and only call them from
internal v1 call paths

public get_topics() now calls read_topics_v1() for the v1 case, and does
the paginated listing with driver->meta_list_keys_next() for v2

RGWPSListTopicsOp now uses the NextToken request/response params with
the paginated get_topics(), limiting responses to 100 entries like AWS

'radosgw-admin topic list' also paginates the listing according to
--max-entries to avoid reading everything into memory at once

Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit db6c73a0cdcf60a920c91b6d4506df36d98b7308)

src/rgw/rgw_admin.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_rest_pubsub.cc

index 45c889b205f11ef2a70759c63ff2df1e3c8eca60..7cc73c5b16a6d0d081bbdbe6a49af5a31b6162b0 100644 (file)
@@ -1169,7 +1169,7 @@ static void show_reshard_status(
 }
 
 static void show_topics_info_v2(const rgw_pubsub_topic& topic,
-                                std::set<std::string> subscribed_buckets,
+                                const std::set<std::string>& subscribed_buckets,
                                 Formatter* formatter) {
   formatter->open_object_section("topic");
   topic.dump(formatter);
@@ -10655,39 +10655,47 @@ next:
 
   if (opt_cmd == OPT::PUBSUB_TOPIC_LIST) {
     RGWPubSub ps(driver, tenant, *site);
-    rgw_pubsub_topics result;
-    ret = ps.get_topics(dpp(), result, null_yield);
-    if (ret < 0 && ret != -ENOENT) {
-      cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
-      return -ret;
-    }
-    if (!rgw::sal::User::empty(user)) {
-      for (auto it = result.topics.cbegin(); it != result.topics.cend();) {
-        const auto& topic = it->second;
-        if (user->get_id() != topic.user) {
-          result.topics.erase(it++);
-        } else {
-          ++it;
-        }
+    std::string next_token = marker;
+
+    formatter->open_object_section("result");
+    formatter->open_array_section("topics");
+    do {
+      rgw_pubsub_topics result;
+      int ret = ps.get_topics(dpp(), next_token, max_entries,
+                              result, next_token, null_yield);
+      if (ret < 0 && ret != -ENOENT) {
+        cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
+        return -ret;
       }
-    }
-    if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) {
-      Formatter::ObjectSection top_section(*formatter, "result");
-      Formatter::ArraySection s(*formatter, "topics");
       for (const auto& [_, topic] : result.topics) {
+        if (!rgw::sal::User::empty(user) && user->get_id() != topic.user) {
+          continue;
+        }
         std::set<std::string> subscribed_buckets;
-        ret = driver->get_bucket_topic_mapping(topic, subscribed_buckets,
-                                               null_yield, dpp());
-        if (ret < 0) {
-          cerr << "failed to fetch bucket topic mapping info for topic: "
-               << topic.name << ", ret=" << ret << std::endl;
-        } else {
+        if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) {
+          ret = driver->get_bucket_topic_mapping(topic, subscribed_buckets,
+                                                 null_yield, dpp());
+          if (ret < 0) {
+            cerr << "failed to fetch bucket topic mapping info for topic: "
+                 << topic.name << ", ret=" << ret << std::endl;
+          }
           show_topics_info_v2(topic, subscribed_buckets, formatter.get());
+        } else {
+          encode_json("result", result, formatter.get());
+        }
+        if (max_entries_specified) {
+          --max_entries;
         }
       }
-    } else {
-      encode_json("result", result, formatter.get());
+    } while (!next_token.empty() && max_entries > 0);
+    formatter->close_section(); // topics
+    if (max_entries_specified) {
+      encode_json("truncated", !next_token.empty(), formatter.get());
+      if (!next_token.empty()) {
+        encode_json("marker", next_token, formatter.get());
+      }
     }
+    formatter->close_section(); // result
     formatter->flush(cout);
   }
 
index f4ddb118cd31eed6f6287ff061a3809a76fc12c2..bec78b687c83e932b0a66e0e7e304357cae368c1 100644 (file)
@@ -510,45 +510,61 @@ RGWPubSub::RGWPubSub(rgw::sal::Driver* _driver,
 {
 }
 
-int RGWPubSub::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result,
-    RGWObjVersionTracker *objv_tracker, optional_yield y) const
+int RGWPubSub::get_topics(const DoutPrefixProvider* dpp,
+                          const std::string& start_marker, int max_items,
+                          rgw_pubsub_topics& result, std::string& next_marker,
+                          optional_yield y) const
 {
-  if (use_notification_v2) {
-    void* handle = NULL;
-    auto ret =
-        driver->meta_list_keys_init(dpp, "topic", std::string(), &handle);
-    if (ret < 0) {
-      return ret;
-    }
-    bool truncated;
-    int max = 1000;
-    do {
-      std::list<std::string> topics;
-      ret = driver->meta_list_keys_next(dpp, handle, max, topics, &truncated);
-      if (ret < 0) {
-            ldpp_dout(dpp, 1)
-                << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
-            break;
-      }
-      for (auto& topic_entry : topics) {
-        std::string topic_name;
-        std::string topic_tenant;
-        parse_topic_entry(topic_entry, &topic_tenant, &topic_name);
-        if (tenant != topic_tenant) {
-          continue;
-        }
-        rgw_pubsub_topic topic;
-        const auto op_ret = get_topic(dpp, topic_name, topic, y, nullptr);
-        if (op_ret < 0) {
-          ret = op_ret;
-          continue;
-        }
-        result.topics[topic_name] = std::move(topic);
-      }
-    } while (truncated);
-    driver->meta_list_keys_complete(handle);
+  if (!use_notification_v2) {
+    // v1 returns all topics, ignoring marker/max_items
+    return read_topics_v1(dpp, result, nullptr, y);
+  }
+
+  // TODO: prefix filter on 'tenant:'
+  void* handle = NULL;
+  int ret = driver->meta_list_keys_init(dpp, "topic", start_marker, &handle);
+  if (ret < 0) {
+    return ret;
+  }
+  auto g = make_scope_guard(
+      [this, handle] { driver->meta_list_keys_complete(handle); });
+
+  if (max_items > 1000) {
+    max_items = 1000;
+  }
+  std::list<std::string> topics;
+  bool truncated = false;
+  ret = driver->meta_list_keys_next(dpp, handle, max_items, topics, &truncated);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1)
+        << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
     return ret;
   }
+  for (auto& topic_entry : topics) {
+    std::string topic_name;
+    std::string topic_tenant;
+    parse_topic_entry(topic_entry, &topic_tenant, &topic_name);
+    if (tenant != topic_tenant) {
+      continue;
+    }
+    rgw_pubsub_topic topic;
+    int r = get_topic(dpp, topic_name, topic, y, nullptr);
+    if (r < 0) {
+      continue;
+    }
+    result.topics[topic_name] = std::move(topic);
+  }
+  if (truncated) {
+    next_marker = driver->meta_get_marker(handle);
+  } else {
+    next_marker.clear();
+  }
+  return ret;
+}
+
+int RGWPubSub::read_topics_v1(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result,
+                              RGWObjVersionTracker *objv_tracker, optional_yield y) const
+{
   const int ret = driver->read_topics(tenant, result, objv_tracker, y, dpp);
   if (ret < 0) {
     ldpp_dout(dpp, 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
@@ -557,8 +573,8 @@ int RGWPubSub::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& res
   return 0;
 }
 
-int RGWPubSub::write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics,
-                                    RGWObjVersionTracker *objv_tracker, optional_yield y) const
+int RGWPubSub::write_topics_v1(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics,
+                               RGWObjVersionTracker *objv_tracker, optional_yield y) const
 {
   const int ret = driver->write_topics(tenant, topics, objv_tracker, y, dpp);
   if (ret < 0 && ret != -ENOENT) {
@@ -616,7 +632,7 @@ int RGWPubSub::get_topic(const DoutPrefixProvider* dpp,
     return ret;
   }
   rgw_pubsub_topics topics;
-  const int ret = read_topics(dpp, topics, nullptr, y);
+  const int ret = read_topics_v1(dpp, topics, nullptr, y);
   if (ret < 0) {
     ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
     return ret;
@@ -932,7 +948,7 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp,
   RGWObjVersionTracker objv_tracker;
   rgw_pubsub_topics topics;
 
-  int ret = read_topics(dpp, topics, &objv_tracker, y);
+  int ret = read_topics_v1(dpp, topics, &objv_tracker, y);
   if (ret < 0 && ret != -ENOENT) {
     // its not an error if not topics exist, we create one
     ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
@@ -947,7 +963,7 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp,
   new_topic.opaque_data = opaque_data;
   new_topic.policy_text = policy_text;
 
-  ret = write_topics(dpp, topics, &objv_tracker, y);
+  ret = write_topics_v1(dpp, topics, &objv_tracker, y);
   if (ret < 0) {
     ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
     return ret;
@@ -989,7 +1005,7 @@ int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& na
   RGWObjVersionTracker objv_tracker;
   rgw_pubsub_topics topics;
 
-  int ret = read_topics(dpp, topics, &objv_tracker, y);
+  int ret = read_topics_v1(dpp, topics, &objv_tracker, y);
   if (ret < 0 && ret != -ENOENT) {
     ldpp_dout(dpp, 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
     return ret;
@@ -1001,7 +1017,7 @@ int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& na
 
   topics.topics.erase(name);
 
-  ret = write_topics(dpp, topics, &objv_tracker, y);
+  ret = write_topics_v1(dpp, topics, &objv_tracker, y);
   if (ret < 0) {
     ldpp_dout(dpp, 1) << "ERROR: failed to remove topics info: ret=" << ret << dendl;
     return ret;
index ed7856721756c6545e457fd893332a8097460dbf..519c1053ab31f201b674d64830af87af5c0c814a 100644 (file)
@@ -562,10 +562,10 @@ class RGWPubSub
   const std::string tenant;
   bool use_notification_v2 = false;
 
-  int read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result, 
-      RGWObjVersionTracker* objv_tracker, optional_yield y) const;
-  int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics,
-                       RGWObjVersionTracker* objv_tracker, optional_yield y) const;
+  int read_topics_v1(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result,
+                     RGWObjVersionTracker* objv_tracker, optional_yield y) const;
+  int write_topics_v1(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics,
+                      RGWObjVersionTracker* objv_tracker, optional_yield y) const;
 
 public:
   RGWPubSub(rgw::sal::Driver* _driver, const std::string& tenant);
@@ -620,11 +620,13 @@ public:
     int remove_notifications(const DoutPrefixProvider *dpp, optional_yield y) const;
   };
 
-  // get the list of topics
-  // return 0 on success or if no topic was associated with the bucket, error code otherwise
-  int get_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result, optional_yield y) const {
-    return read_topics(dpp, result, nullptr, y);
-  }
+  // get a paginated list of topics
+  // return 0 on success, error code otherwise
+  int get_topics(const DoutPrefixProvider* dpp,
+                 const std::string& start_marker, int max_items,
+                 rgw_pubsub_topics& result, std::string& next_marker,
+                 optional_yield y) const;
+
   // get a topic with by its name and populate it into "result"
   // return -ENOENT if the topic does not exists
   // return 0 on success, error code otherwise.
index c64b5785337254b03d7fa138a413fb684a2f2288..66574cc827240f0d63648c214c93baa68113a3e7 100644 (file)
@@ -293,6 +293,7 @@ void RGWPSCreateTopicOp::execute(optional_yield y) {
 class RGWPSListTopicsOp : public RGWOp {
 private:
   rgw_pubsub_topics result;
+  std::string next_token;
 
 public:
   int verify_permission(optional_yield) override {
@@ -325,15 +326,21 @@ public:
     f->close_section(); // ListTopicsResult
     f->open_object_section("ResponseMetadata");
     encode_xml("RequestId", s->req_id, f); 
-    f->close_section(); // ResponseMetadat
+    f->close_section(); // ResponseMetadata
+    if (!next_token.empty()) {
+      encode_xml("NextToken", next_token, f);
+    }
     f->close_section(); // ListTopicsResponse
     rgw_flush_formatter_and_reset(s, f);
   }
 };
 
 void RGWPSListTopicsOp::execute(optional_yield y) {
+  const std::string start_token = s->info.args.get("NextToken");
+
   const RGWPubSub ps(driver, s->owner.id.tenant, *s->penv.site);
-  op_ret = ps.get_topics(this, result, y);
+  constexpr int max_items = 100;
+  op_ret = ps.get_topics(this, start_token, max_items, result, next_token, y);
   // if there are no topics it is not considered an error
   op_ret = op_ret == -ENOENT ? 0 : op_ret;
   if (op_ret < 0) {