]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notifications: sharded bucket notifications 64018/head
authorAdarsh <dev.9401adarsh@gmail.com>
Wed, 18 Jun 2025 20:48:34 +0000 (20:48 +0000)
committerYuval Lifshitz <ylifshit@ibm.com>
Tue, 9 Sep 2025 14:28:04 +0000 (14:28 +0000)
* sharding is based on hash of bucket_name:object_key
* compatible with v1 topics
* include tests to cover backward compatibility and mixed cluster

Signed-off-by: Adarsh <dev.9401adarsh@gmail.com>
12 files changed:
PendingReleaseNotes
doc/radosgw/notifications.rst
src/common/options/rgw.yaml.in
src/rgw/driver/rados/rgw_notify.cc
src/rgw/driver/rados/rgw_notify.h
src/rgw/radosgw-admin/radosgw-admin.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_rest_pubsub.cc
src/test/rgw/bucket_notification/api.py
src/test/rgw/bucket_notification/test_bn.py
src/test/rgw/test-rgw-common.sh

index 0cc53d9f93e061f3504beb4e37905f08dcd595bb..e9ae678b2533c5fa5d09ef7662043cda43c3d38b 100644 (file)
@@ -762,3 +762,11 @@ Relevant tracker: https://tracker.ceph.com/issues/64777
 when decoding, resulting in Linux codes on the wire, and host codes on the receiver.
 All CEPHFS_E* defines have been removed across Ceph (including the Python binding).
 Relevant tracker: https://tracker.ceph.com/issues/64611
+
+* RGW: Persistent bucket notifications will use queues with multiple shards instead of one queue. Number of shards
+can be configured using the `rgw` option `rgw_bucket_persistent_notif_num_shards`. Note that pre-existing topics will continue to function as is, i.e, they are mapped to only one RADOS object.   
+  
+  For more details, see: 
+  https://docs.ceph.com/en/latest/radosgw/notifications/
+
+Relevant tracker: https://tracker.ceph.com/issues/71677
index 64da2445c62612d5f7f9a2c99ecc1635769fd8e0..cb7135194416f29d0c611561ded966fa018e3207 100644 (file)
@@ -85,6 +85,18 @@ which tells the client that it may retry later.
 .. tip:: To minimize the latency added by asynchronous notification, we
    recommended placing the "log" pool on fast media.
 
+Persistent bucket notifications are managed by the following central configuration options:
+
+.. confval:: rgw_bucket_persistent_notif_num_shards
+
+.. note:: When a topic is created during a Ceph upgrade, per-key reordering of notifications may
+   happen on any bucket mapped to that topic.
+   
+.. note:: Persistent topics that were created on a radosgw that does not support sharding, will be treated as a single shard topics
+
+.. tip:: It is also recommended that you avoid modifying or deleting topics created during 
+   upgrades, as this might result in orphan RADOS objects that will not be deleted when the topic is deleted.
+
 
 Topic Management via CLI
 ------------------------
index ffec3bacb2487fd5bf3248946d7d30942b4466f4..75d22af12fd68c655a4c00a4a6b64c067775ab04 100644 (file)
@@ -4511,3 +4511,12 @@ options:
   services:
   - rgw
   with_legacy: true
+- name: rgw_bucket_persistent_notif_num_shards
+  type: uint
+  level: advanced
+  desc: Number of shards for a persistent topic.
+  long_desc: Number of shards of persistent topics. The notifications will be sharded by a combination of 
+    the bucket and key name. Changing the number effect only new topics and does not change exiting ones.
+  default: 11
+  services: 
+    - rgw
index c7876ec62413d683258fc15ffc3e2beb6e392607..8bad6a525e9aa6aecb9387a5eb1b60fa4a8d1cea 100644 (file)
@@ -1084,6 +1084,19 @@ static inline bool notification_match(reservation_t& res,
   return true;
 }
 
+
+static inline uint64_t get_target_shard(const DoutPrefixProvider* dpp, const std::string& bucket_name, const std::string& object_key, const uint64_t num_shards) {
+  std::hash<std::string> hash_fn; 
+  std::string hash_key = fmt::format("{}:{}", bucket_name, object_key);
+  size_t hash = hash_fn(hash_key); 
+  ldpp_dout(dpp, 20) << "INFO: Hash Value (hash) is:  " << hash << ". Hash Key: " << bucket_name << ":" << object_key << dendl; 
+  return hash % num_shards; 
+}
+
+static inline std::string get_shard_name(const std::string& topic_name, const uint64_t& shard_id) {
+  return (shard_id == 0) ? topic_name : fmt::format("{}.{}", topic_name, shard_id);
+}
+
 int publish_reserve(const DoutPrefixProvider* dpp,
                     const SiteConfig& site,
                     const EventTypeList& event_types,
@@ -1145,6 +1158,7 @@ int publish_reserve(const DoutPrefixProvider* dpp,
       }
 
       cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
+      uint64_t target_shard = 0; 
       if (topic_cfg.dest.persistent) {
         // TODO: take default reservation size from conf
         constexpr auto DEFAULT_RESERVATION = 4 * 1024U;  // 4K
@@ -1152,15 +1166,21 @@ int publish_reserve(const DoutPrefixProvider* dpp,
         librados::ObjectWriteOperation op;
         bufferlist obl;
         int rval;
-        const auto& queue_name = topic_cfg.dest.persistent_queue;
+        const std::string bucket_name = res.bucket->get_name(); 
+        const std::string object_key = res.object_name ? *res.object_name : res.object->get_name();
+        const uint64_t num_shards = topic_cfg.dest.num_shards; 
+        target_shard = get_target_shard(
+            dpp, bucket_name, object_key, num_shards); 
+        const auto shard_name = get_shard_name(topic_cfg.dest.persistent_queue, target_shard);
+        ldpp_dout(res.dpp, 1) << "INFO: target_shard: " << shard_name << dendl;       
         cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval);
         auto ret = rgw_rados_operate(
-            res.dpp, res.store->getRados()->get_notif_pool_ctx(), queue_name,
+            res.dpp, res.store->getRados()->get_notif_pool_ctx(), shard_name,
             std::move(op), res.yield, librados::OPERATION_RETURNVEC);
         if (ret < 0) {
           ldpp_dout(res.dpp, 1)
               << "ERROR: failed to reserve notification on queue: "
-              << queue_name << ". error: " << ret << dendl;
+              << shard_name << ". error: " << ret << dendl;
           // if no space is left in queue we ask client to slow down
           return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
         }
@@ -1173,7 +1193,7 @@ int publish_reserve(const DoutPrefixProvider* dpp,
         }
       }
 
-      res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id, event_type);
+      res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id, event_type, target_shard);
     }
   }
   return 0;
@@ -1209,25 +1229,27 @@ int publish_commit(rgw::sal::Object* obj,
       event_entry.retry_sleep_duration = topic.cfg.dest.retry_sleep_duration;
       bufferlist bl;
       encode(event_entry, bl);
-      const auto& queue_name = topic.cfg.dest.persistent_queue;
+      uint64_t target_shard = topic.shard_id;
+      const auto shard_name = get_shard_name(topic.cfg.dest.persistent_queue, target_shard);  
+      ldpp_dout(res.dpp, 1) << "INFO: target_shard: " << shard_name << dendl;   
       if (bl.length() > res.size) {
         // try to make a larger reservation, fail only if this is not possible
         ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length()
                          << " exceeded reserved size: " << res.size
                          <<
-          " . trying to make a larger reservation on queue:" << queue_name
+          " . trying to make a larger reservation on queue:" << shard_name
                          << dendl;
         // first cancel the existing reservation
         librados::ObjectWriteOperation op;
         cls_2pc_queue_abort(op, topic.res_id);
         auto ret = rgw_rados_operate(
          dpp, res.store->getRados()->get_notif_pool_ctx(),
-         queue_name, std::move(op),
+         shard_name, std::move(op),
          res.yield);
         if (ret < 0) {
           ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: "
                            << topic.res_id << 
-            " when trying to make a larger reservation on queue: " << queue_name
+            " when trying to make a larger reservation on queue: " << shard_name
                            << ". error: " << ret << dendl;
           return ret;
         }
@@ -1238,10 +1260,10 @@ int publish_commit(rgw::sal::Object* obj,
         cls_2pc_queue_reserve(op, bl.length(), 1, &obl, &rval);
         ret = rgw_rados_operate(
          dpp, res.store->getRados()->get_notif_pool_ctx(),
-          queue_name, std::move(op), res.yield, librados::OPERATION_RETURNVEC);
+          shard_name, std::move(op), res.yield, librados::OPERATION_RETURNVEC);
         if (ret < 0) {
           ldpp_dout(dpp, 1) << "ERROR: failed to reserve extra space on queue: "
-                           << queue_name
+                           << shard_name
                            << ". error: " << ret << dendl;
           return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
         }
@@ -1256,12 +1278,12 @@ int publish_commit(rgw::sal::Object* obj,
       librados::ObjectWriteOperation op;
       cls_2pc_queue_commit(op, bl_data_vec, topic.res_id);
       topic.res_id = cls_2pc_reservation::NO_ID;
-      auto pcc_arg = make_unique<PublishCommitCompleteArg>(queue_name, dpp->get_cct());
+      auto pcc_arg = make_unique<PublishCommitCompleteArg>(shard_name, dpp->get_cct());
       aio_completion_ptr completion{librados::Rados::aio_create_completion(pcc_arg.get(), publish_commit_completion)};
       auto& io_ctx = res.store->getRados()->get_notif_pool_ctx();
-      if (const int ret = io_ctx.aio_operate(queue_name, completion.get(), &op); ret < 0) {
+      if (const int ret = io_ctx.aio_operate(shard_name, completion.get(), &op); ret < 0) {
         ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: "
-                          << queue_name << ". error: " << ret << dendl;
+                          << shard_name << ". error: " << ret << dendl;
         return ret;
       }
       // args will be released inside the callback
@@ -1304,16 +1326,18 @@ int publish_abort(reservation_t& res) {
       // nothing to abort or already committed/aborted
       continue;
     }
-    const auto& queue_name = topic.cfg.dest.persistent_queue;
+    uint64_t target_shard = topic.shard_id;   
+    const auto shard_name = get_shard_name(topic.cfg.dest.persistent_queue, target_shard);   
+    ldpp_dout(res.dpp, 1) << "INFO: target_shard: " << shard_name << dendl;
     librados::ObjectWriteOperation op;
     cls_2pc_queue_abort(op, topic.res_id);
     const auto ret = rgw_rados_operate(
       res.dpp, res.store->getRados()->get_notif_pool_ctx(),
-      queue_name, std::move(op), res.yield);
+      shard_name, std::move(op), res.yield);
     if (ret < 0) {
       ldpp_dout(res.dpp, 1) << "ERROR: failed to abort reservation: "
                            << topic.res_id <<
-        " from queue: " << queue_name << ". error: " << ret << dendl;
+        " from queue: " << shard_name << ". error: " << ret << dendl;
       return ret;
     }
     topic.res_id = cls_2pc_reservation::NO_ID;
@@ -1322,23 +1346,33 @@ int publish_abort(reservation_t& res) {
 }
 
 int get_persistent_queue_stats(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
-                               const std::string &queue_name, rgw_topic_stats &stats, optional_yield y)
+                               ShardNamesView shards, rgw_topic_stats &stats, optional_yield y)
 {
   // TODO: use optional_yield instead calling rados_ioctx.operate() synchronously
   cls_2pc_reservations reservations;
-  auto ret = cls_2pc_queue_list_reservations(rados_ioctx, queue_name, reservations);
-  if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to read queue list reservation: " << ret << dendl;
-    return ret;
-  }
-  stats.queue_reservations = reservations.size();
-
-  ret = cls_2pc_queue_get_topic_stats(rados_ioctx, queue_name, stats.queue_entries, stats.queue_size);
-  if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to get the queue size or the number of entries: " << ret << dendl;
-    return ret;
+  uint32_t shard_entries; 
+  uint64_t shard_size;
+
+  stats.queue_reservations = 0; 
+  stats.queue_size = 0; 
+  stats.queue_entries = 0; 
+  for(const auto& shard_name: shards){
+    auto ret = cls_2pc_queue_list_reservations(rados_ioctx, shard_name, reservations);
+    if (ret < 0) {
+      ldpp_dout(dpp, 1) << "ERROR: failed to read shard: "<< shard_name << "'s list reservation: " << ret << dendl;
+      return ret;
+    }
+    stats.queue_reservations += reservations.size();
+    shard_entries = 0; 
+    shard_size = 0; 
+    ret = cls_2pc_queue_get_topic_stats(rados_ioctx, shard_name, shard_entries, shard_size);
+    stats.queue_size += shard_size; 
+    stats.queue_entries += shard_entries;
+    if (ret < 0) {
+      ldpp_dout(dpp, 1) << "ERROR: failed to get the size or number of entries for queue shard: " << shard_name << ret << dendl;
+      return ret;
+    }
   }
-
   return 0;
 }
 
index 13b3e3b42bdd3e07e39f802b1eba49da378b2e4a..bef54aa6afb2e26ca86d8763709b95002341cafb 100644 (file)
@@ -47,17 +47,19 @@ struct reservation_t {
   struct topic_t {
     topic_t(const std::string& _configurationId, const rgw_pubsub_topic& _cfg,
             cls_2pc_reservation::id_t _res_id,
-            rgw::notify::EventType _event_type)
+            rgw::notify::EventType _event_type, uint64_t shard_id)
         : configurationId(_configurationId),
           cfg(_cfg),
           res_id(_res_id),
-          event_type(_event_type) {}
+          event_type(_event_type),
+          shard_id(shard_id){}
 
     const std::string configurationId;
     const rgw_pubsub_topic cfg;
     // res_id is reset after topic is committed/aborted
     cls_2pc_reservation::id_t res_id;
     rgw::notify::EventType event_type;
+    uint64_t shard_id; 
   };
 
   const DoutPrefixProvider* const dpp;
@@ -132,7 +134,7 @@ int publish_commit(rgw::sal::Object* obj,
 int publish_abort(reservation_t& reservation);
 
 int get_persistent_queue_stats(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
-                               const std::string &queue_name, rgw_topic_stats &stats, optional_yield y);
+                               ShardNamesView shards, rgw_topic_stats &stats, optional_yield y);
 
 }
 
index c7fc57d50f46adbf18c32e172e1c7456366480bb..cadd46d87a972c18b037eb1e92c94e1c3362dad6 100644 (file)
@@ -11914,9 +11914,9 @@ next:
     rgw::notify::rgw_topic_stats stats;
     ret = rgw::notify::get_persistent_queue_stats(
         dpp(), ioctx,
-        topic.dest.persistent_queue, stats, null_yield);
+        topic.dest.get_shard_names(), stats, null_yield);
     if (ret < 0) {
-      cerr << "ERROR: could not get persistent queue: " << cpp_strerror(-ret) << std::endl;
+      cerr << "ERROR: could not get persistent queues: " << cpp_strerror(-ret) << std::endl;
       return -ret;
     }
     encode_json("", stats, formatter.get());
@@ -11948,37 +11948,41 @@ next:
     std::string end_marker;
     librados::ObjectReadOperation rop;
     std::vector<cls_queue_entry> queue_entries;
-    bool truncated = true;
+    bool truncated;
     formatter->open_array_section("eventEntries");
-    while (truncated) {
-      bufferlist bl;
-      int rc;
-      cls_2pc_queue_list_entries(rop, marker, max_entries, &bl, &rc);
-      ioctx.operate(topic.dest.persistent_queue, &rop, nullptr);
-      if (rc < 0 ) {
-        cerr << "ERROR: could not list entries from queue. error: " << cpp_strerror(-ret) << std::endl;
-        return -rc;
-      }
-      rc = cls_2pc_queue_list_entries_result(bl, queue_entries, &truncated, end_marker);
-      if (rc < 0) {
-        cerr << "ERROR: failed to parse list entries from queue (skipping). error: " << cpp_strerror(-ret) << std::endl;
-        return -rc;
-      }
-
-      std::for_each(queue_entries.cbegin(), 
-        queue_entries.cend(), 
-        [&formatter](const auto& queue_entry) {
-          rgw::notify::event_entry_t event_entry;
-          bufferlist::const_iterator iter{&queue_entry.data};
-          try {
-            event_entry.decode(iter);
-            encode_json("", event_entry, formatter.get());
-          } catch (const buffer::error& e) {
-            cerr << "ERROR: failed to decode queue entry. error: " << e.what() << std::endl;
-          }
-        });
-      formatter->flush(cout);
-      marker = end_marker;
+
+    for (const auto& shard_name: topic.dest.get_shard_names()){
+      truncated = true; 
+      marker.clear();
+      while (truncated) {
+        bufferlist bl;
+        int rc;
+        cls_2pc_queue_list_entries(rop, marker, max_entries, &bl, &rc);
+        ioctx.operate(shard_name, &rop, nullptr);
+        if (rc < 0 ) {
+          cerr << "ERROR: could not list entries from queue. error: " << cpp_strerror(-ret) << std::endl;
+          return -rc;
+        }
+        rc = cls_2pc_queue_list_entries_result(bl, queue_entries, &truncated, end_marker);
+        if (rc < 0) {
+          cerr << "ERROR: failed to parse list entries from queue (skipping). error: " << cpp_strerror(-ret) << std::endl;
+          return -rc;
+        }
+        std::for_each(queue_entries.cbegin(), 
+          queue_entries.cend(), 
+          [&formatter](const auto& queue_entry) {
+            rgw::notify::event_entry_t event_entry;
+            bufferlist::const_iterator iter{&queue_entry.data};
+            try {
+              event_entry.decode(iter);
+              encode_json("", event_entry, formatter.get());
+            } catch (const buffer::error& e) {
+              cerr << "ERROR: failed to decode queue entry. error: " << e.what() << std::endl;
+            }
+          });
+        formatter->flush(cout);
+        marker = end_marker;
+      }
     }
     formatter->close_section();
     formatter->flush(cout);
index 87a46bd61a6eb9c1b443c2b084e6fbcc3b6c3bb9..8c8f94d04d82ddcb67c18ad1d9218a6ca5d43804 100644 (file)
@@ -21,6 +21,7 @@
 #define dout_subsys ceph_subsys_rgw
 
 static constexpr std::string_view topic_tenant_delim = ":";
+static constexpr std::string_view topic_shard_delim = "."; 
 
 // format and parse topic metadata keys as tenant:name
 std::string get_topic_metadata_key(std::string_view tenant,
@@ -44,6 +45,7 @@ void parse_topic_metadata_key(const std::string& key,
                               std::string& name)
 {
   // expected format: tenant_name:topic_name*
+  // expected format: tenant_name:topic_name.<shard_id>
   auto pos = key.find(topic_tenant_delim);
   if (pos != std::string::npos) {
     tenant = key.substr(0, pos);
@@ -52,6 +54,11 @@ void parse_topic_metadata_key(const std::string& key,
     tenant.clear();
     name = key;
   }
+  //remove shard id if present
+  pos = name.find_last_of(topic_shard_delim);
+  if(pos != std::string::npos){
+    name = name.substr(0, pos);
+  } 
 }
 
 void set_event_id(std::string& id, const std::string& hash, const utime_t& ts) {
@@ -352,6 +359,13 @@ void rgw_pubsub_dest::decode_json(JSONObj* f) {
   JSONDecoder::decode_json("retry_sleep_duration", sleep_dur, f);
   retry_sleep_duration = sleep_dur == DEFAULT_CONFIG ? DEFAULT_GLOBAL_VALUE
                                                      : std::stoul(sleep_dur);
+
+}
+
+ShardNamesView rgw_pubsub_dest::get_shard_names() const {
+  const std::string base_name = persistent_queue; 
+  auto get_shard_name = [base_name](uint64_t i){return i != 0 ? fmt::format("{}.{}", base_name, i) : base_name;};
+  return std::ranges::views::iota(0ul, num_shards) | std::ranges::views::transform(std::function<std::string(uint64_t)>(get_shard_name));
 }
 
 RGWPubSub::RGWPubSub(rgw::sal::Driver* _driver,
@@ -897,12 +911,16 @@ int RGWPubSub::remove_topic_v2(const DoutPrefixProvider* dpp,
   const rgw_pubsub_dest& dest = topic.dest;
   if (!dest.push_endpoint.empty() && dest.persistent &&
       !dest.persistent_queue.empty()) {
-    ret = driver->remove_persistent_topic(dpp, y, dest.persistent_queue);
-    if (ret < 0 && ret != -ENOENT) {
-      ldpp_dout(dpp, 1) << "ERROR: failed to remove queue for "
-          "persistent topic: " << cpp_strerror(ret) << dendl;
-      return ret;
+
+    for(const auto& q: dest.get_shard_names()) {
+      ret = driver->remove_persistent_topic(dpp, y, q);
+      if (ret < 0 && ret != -ENOENT) {
+        ldpp_dout(dpp, 1) << "ERROR: failed to remove shards for "
+            "persistent topic: " << cpp_strerror(ret) << dendl;
+        return ret;
+      }
     }
+    ldpp_dout(dpp, 20) << "Successfully removed " << dest.num_shards << " shards for topic: " + name << dendl;
   }
 
   ret = driver->remove_topic_v2(name, tenant, objv_tracker, y, dpp);
@@ -943,12 +961,15 @@ int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& na
   }
   if (!t->second.dest.push_endpoint.empty() && t->second.dest.persistent &&
       !t->second.dest.persistent_queue.empty()) {
-    ret = driver->remove_persistent_topic(dpp, y, t->second.dest.persistent_queue);
-    if (ret < 0 && ret != -ENOENT) {
-      ldpp_dout(dpp, 1) << "ERROR: failed to remove queue for "
-          "persistent topic: " << cpp_strerror(ret) << dendl;
-      return ret;
-    }
+      for(const auto& q: t->second.dest.get_shard_names()) {
+        ret = driver->remove_persistent_topic(dpp, y, q);
+        if (ret < 0 && ret != -ENOENT) {
+          ldpp_dout(dpp, 1) << "ERROR: failed to remove shards for "
+              "persistent topic: " << cpp_strerror(ret) << dendl;
+          return ret;
+        }
+      }
+      ldpp_dout(dpp, 20) << "Successfully removed " << t->second.dest.num_shards << " shards for topic: " + name << dendl;
   }
   topics.topics.erase(t);
 
index e96a6aa28c3866ba6cb70ab64964075ed615a0ad..3dbaec0d967ffdac1f30f7dcfa363de142c893c7 100644 (file)
@@ -10,6 +10,7 @@
 #include "rgw_notify_event_type.h"
 #include <boost/container/flat_map.hpp>
 #include "rgw_s3_filter.h"
+#include <ranges>
 
 class XMLObj;
 
@@ -252,6 +253,8 @@ WRITE_CLASS_ENCODER(rgw_pubsub_s3_event)
 // setting a unique ID for an event based on object hash and timestamp
 void set_event_id(std::string& id, const std::string& hash, const utime_t& ts);
 
+using ShardNamesView = std::ranges::transform_view<std::ranges::iota_view<uint64_t, uint64_t>, std::function<std::string(uint64_t)>>; 
+
 struct rgw_pubsub_dest {
   std::string push_endpoint;
   std::string push_endpoint_args;
@@ -263,9 +266,12 @@ struct rgw_pubsub_dest {
   uint32_t time_to_live;
   uint32_t max_retries;
   uint32_t retry_sleep_duration;
+  // naming convention of sharded queues in the 'notif' pool -> persistent_queue, persistent_queue.1, persistent_queue.(num_shards -1)...
+  uint64_t num_shards; //defaults to a single shard for now, for backward compatibility
+
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(7, 1, bl);
+    ENCODE_START(8, 1, bl);
     encode("", bl);
     encode("", bl);
     encode(push_endpoint, bl);
@@ -277,26 +283,30 @@ struct rgw_pubsub_dest {
     encode(max_retries, bl);
     encode(retry_sleep_duration, bl);
     encode(persistent_queue, bl);
+    encode(num_shards, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(7, bl);
+    DECODE_START(8, bl);
     std::string dummy;
     decode(dummy, bl);
     decode(dummy, bl);
     decode(push_endpoint, bl);
     if (struct_v >= 2) {
-        decode(push_endpoint_args, bl);
+      decode(push_endpoint_args, bl);
     }
     if (struct_v >= 3) {
-        decode(arn_topic, bl);
+      decode(arn_topic, bl);
     }
     if (struct_v >= 4) {
-        decode(stored_secret, bl);
+      decode(stored_secret, bl);
     }
     if (struct_v >= 5) {
-        decode(persistent, bl);
+      decode(persistent, bl);
+    }
+    else {
+      num_shards = persistent ? 1 : 0; //defaults to a single shard for backward compatibility
     }
     if (struct_v >= 6) {
       decode(time_to_live, bl);
@@ -305,11 +315,16 @@ struct rgw_pubsub_dest {
     }
     if (struct_v >= 7) {
       decode(persistent_queue, bl);
-    } else if (persistent) {
+    } 
+    else if (persistent) {
       // persistent topics created before v7 did not support tenant namespacing.
       // continue to use 'arn_topic' alone as the queue's rados object name
       persistent_queue = arn_topic;
     }
+    if (struct_v >= 8) { 
+      decode(num_shards, bl);
+    }
+
     DECODE_FINISH(bl);
   }
 
@@ -317,6 +332,9 @@ struct rgw_pubsub_dest {
   void dump_xml(Formatter *f) const;
   std::string to_json_str() const;
   void decode_json(JSONObj* obj);
+
+  // get the names of the shards in the persistent queue
+  ShardNamesView get_shard_names() const; 
 };
 WRITE_CLASS_ENCODER(rgw_pubsub_dest)
 
index 1e5a5f7e848a7e2f37de065028e7da9bbb117670..b9889d3259b22bafa9cfd4cf9844395a6657f691 100644 (file)
@@ -413,23 +413,41 @@ void RGWPSCreateTopicOp::execute(optional_yield y) {
     }
   }
 
-  // don't add a persistent queue if we already have one
-  const bool already_persistent = topic && topic_needs_queue(topic->dest);
+  // if we already have an existing persistent queue - do nothing - resharding not supported
+  // if we don't have a persistent queue already, just make shards or a single based on dest's num_shards from config
+
+  const bool already_persistent = topic && topic_needs_queue(topic->dest); 
   if (!already_persistent && topic_needs_queue(dest)) {
     // initialize the persistent queue's location, using ':' as the namespace
     // delimiter because its inclusion in a TopicName would break ARNs
+    // new persistent topics to have shards - loading num_shards from config for sharding persistent bucket notifications
+    dest.num_shards = s->get_cct()->_conf.get_val<uint64_t>("rgw_bucket_persistent_notif_num_shards"); 
+    if(dest.num_shards == 0) {
+      dest.num_shards = 1;
+    }
     dest.persistent_queue = string_cat_reserve(
         get_account_or_tenant(s->owner.id), ":", topic_name);
-
-    op_ret = driver->add_persistent_topic(this, y, dest.persistent_queue);
-    if (op_ret < 0) {
-      ldpp_dout(this, 1) << "CreateTopic Action failed to create queue for "
-                            "persistent topics. error:"
-                         << op_ret << dendl;
-      return;
+    for(const auto& q: dest.get_shard_names()){
+      ldpp_dout(this, 20) << "CreateTopic Action creating persistent topic queue: " << q << dendl;
+      op_ret = driver->add_persistent_topic(this, y, q);
+      if (op_ret < 0) {
+        ldpp_dout(this, 1) << "CreateTopic Action failed to create queue/shards for "
+                              "persistent topics. error:"
+                          << op_ret << dendl;
+        return;
+      }
     }
+    ldpp_dout(this, 20) << "Successfully created " << dest.num_shards << " shards for topic: " << topic_name << dendl;
   } else if (already_persistent) {  // redundant call to CreateTopic
+    //no resharding of existing topics for persistent bucket notifications
     dest.persistent_queue = topic->dest.persistent_queue;
+    dest.num_shards = topic->dest.num_shards;
+    if(s->get_cct()->_conf.get_val<uint64_t>("rgw_bucket_persistent_notif_num_shards") != dest.num_shards) {
+      ldpp_dout(this, 20) << "CreateTopic Action called for topic with existing shards, but num_shards in config is different. "
+                            "Not resharding existing topic: " << topic_name << dendl;
+    } else {
+      ldpp_dout(this, 20) << "CreateTopic Action called for topic with existing. Not resharding existing topic: " << topic_name << dendl;
+    }
   }
   const RGWPubSub ps(driver, get_account_or_tenant(s->owner.id), *s->penv.site);
   op_ret = ps.create_topic(this, topic_name, dest, topic_arn.to_string(),
@@ -885,24 +903,35 @@ void RGWPSSetTopicAttributesOp::execute(optional_yield y) {
     // initialize the persistent queue's location, using ':' as the namespace
     // delimiter because its inclusion in a TopicName would break ARNs
     dest.persistent_queue = string_cat_reserve(topic_arn.account, ":", topic_name);
-
-    op_ret = driver->add_persistent_topic(this, y, dest.persistent_queue);
-    if (op_ret < 0) {
-      ldpp_dout(this, 4)
-          << "SetTopicAttributes Action failed to create queue for "
-             "persistent topics. error:"
-          << op_ret << dendl;
-      return;
+    dest.num_shards = s->get_cct()->_conf.get_val<uint64_t>("rgw_bucket_persistent_notif_num_shards");
+    if (dest.num_shards == 0) {
+      dest.num_shards = 1;
+    }
+    for(const auto& q: dest.get_shard_names()){
+      op_ret = driver->add_persistent_topic(this, y, q);
+      if (op_ret < 0) {
+        ldpp_dout(this, 4)
+            << "SetTopicAttributes Action failed to create queue for "
+              "persistent topics. error:"
+            << op_ret << dendl;
+        return;
+      }
     }
+    ldpp_dout(this, 20) << "Successfully created " << dest.num_shards <<" shards for topic: " << topic_name << dendl;
+
   } else if (already_persistent && !topic_needs_queue(dest)) {
     // changing the persistent topic to non-persistent.
-    op_ret = driver->remove_persistent_topic(this, y, result.dest.persistent_queue);
-    if (op_ret != -ENOENT && op_ret < 0) {
-      ldpp_dout(this, 4) << "SetTopicAttributes Action failed to remove queue "
-                            "for persistent topics. error:"
-                         << op_ret << dendl;
-      return;
+    for(const auto& q: result.dest.get_shard_names()) {
+      op_ret = driver->remove_persistent_topic(this, y, q);
+      if (op_ret != -ENOENT && op_ret < 0) {
+        ldpp_dout(this, 4) << "SetTopicAttributes Action failed to remove queue "
+                              "for persistent topics. error:"
+                          << op_ret << dendl;
+        return;
+      }
+      dest.num_shards = 0; // set to 0 to indicate no sharding
     }
+    ldpp_dout(this, 20) << "Successfully removed " << result.dest.num_shards << " shards for topic: " << topic_name << dendl;
   }
   const RGWPubSub ps(driver, topic_arn.account, *s->penv.site);
   op_ret = ps.create_topic(this, topic_name, dest, topic_arn.to_string(),
index 1c2c4c54b0422fc31082f26fa4e559ffcb9ba3fc..795538d223a97178ef19854480b93dbfd57411d2 100644 (file)
@@ -242,6 +242,12 @@ def admin(args, cluster='noname', **kwargs):
     cmd = [test_path + 'test-rgw-call.sh', 'call_rgw_admin', cluster] + args
     return bash(cmd, **kwargs)
 
+def ceph_admin(args, cluster='noname', **kwargs):
+    """ ceph command """
+    cmd = [test_path + 'test-rgw-call.sh', 'call_ceph', cluster] + args
+    print(' '.join(cmd))
+    return bash(cmd, **kwargs)
+
 def delete_all_topics(conn, tenant, cluster):
     """ delete all topics """
     if tenant == '':
@@ -263,3 +269,7 @@ def delete_all_topics(conn, tenant, cluster):
             for topic in topics_json:
                 admin(['topic', 'rm', '--tenant', tenant, '--topic', topic['name']], cluster)
 
+def set_rgw_config_option(client, option, value, cluster='noname'):
+    """ change a config option """
+    print(f'Setting {option} to {value} for {client} in cluster {cluster}')
+    return ceph_admin(['config', 'set', client, option, str(value)], cluster)
\ No newline at end of file
index e27517ebd2f2cc7430be871af211f6930e548747..cd2b2857ff0d6afc2648393ee515c4275709acfb 100644 (file)
@@ -38,7 +38,9 @@ from .api import PSTopicS3, \
     delete_all_objects, \
     delete_all_topics, \
     put_object_tagging, \
-    admin
+    admin, \
+    set_rgw_config_option, \
+    bash
 
 from nose import SkipTest
 from nose.tools import assert_not_equal, assert_equal, assert_in, assert_not_in, assert_true
@@ -3076,13 +3078,13 @@ def wait_for_queue_to_drain(topic_name, tenant=None, account=None, http_port=Non
         entries = parsed_result['Topic Stats']['Entries']
         retries += 1
         time_diff = time.time() - start_time
-        log.info('queue %s has %d entries after %ds', topic_name, entries, time_diff)
-        if retries > 30:
-            log.warning('queue %s still has %d entries after %ds', topic_name, entries, time_diff)
+        log.info('shards for %s has %d entries after %ds', topic_name, entries, time_diff)
+        if retries > 100:
+            log.warning('shards for %s still has %d entries after %ds', topic_name, entries, time_diff)
             assert_equal(entries, 0)
         time.sleep(5)
     time_diff = time.time() - start_time
-    log.info('waited for %ds for queue %s to drain', time_diff, topic_name)
+    log.info('waited for %ds for shards of %s to drain', time_diff, topic_name)
 
 
 def persistent_topic_stats(conn, endpoint_type):
@@ -6014,3 +6016,122 @@ def test_topic_migration_to_an_account():
             get_config_cluster(),
         )
         admin(["account", "rm", "--account-id", account_id], get_config_cluster())
+
+def persistent_notification_shard_config_change(endpoint_type, conn, new_num_shards, old_num_shards=11): 
+    """ test persistent notification shard config change """
+    """ test to check if notifications work when config value for determining num_shards is changed..."""
+    
+    default_num_shards = 11
+    rgw_client = f'client.rgw.{get_config_port()}'
+    if (old_num_shards != default_num_shards):
+        set_rgw_config_option(rgw_client, 'rgw_bucket_persistent_notif_num_shards', old_num_shards)
+
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    #start receiver thread based on conn type
+    # start endpoint receiver
+    host = get_ip()
+    task = None
+    port = None
+    if endpoint_type == 'http':
+        # create random port for the http server
+        port = random.randint(10000, 20000)
+        # start an http server in a separate thread
+        receiver = HTTPServerWithEvents((host, port))
+        endpoint_address = 'http://'+host+':'+str(port)
+        endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
+    elif endpoint_type == 'kafka':
+        # start kafka receiver
+        task, receiver = create_kafka_receiver_thread(topic_name)
+        task.start()
+        endpoint_address = 'kafka://' + host
+        endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'
+    else:
+        return SkipTest('Unknown endpoint type: ' + endpoint_type)
+
+    zonegroup = get_config_zonegroup()
+    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+
+    # create s3 notification
+    notif_1 = bucket_name +  '_notif_1'
+    topic_conf_list = [{'Id': notif_1, 'TopicArn': topic_arn,
+        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
+    }]
+
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    _, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    ## create objects in the bucket (async)
+    expected_keys = []
+    create_object_and_verify_events(bucket, 'foo', topic_name, receiver, expected_keys, deletions=True)
+
+    ## change config value for num_shards to new_num_shards
+    set_rgw_config_option(rgw_client, 'rgw_bucket_persistent_notif_num_shards', new_num_shards)
+    
+    ## create objects in the bucket (async)
+    expected_keys = []
+    create_object_and_verify_events(bucket, 'bar', topic_name, receiver, expected_keys, deletions=True)
+
+    # cleanup
+    receiver.close(task)
+    s3_notification_conf.del_config()
+    topic_conf.del_config()
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+
+    ##revert config value for num_shards to default
+    if (new_num_shards != default_num_shards):
+        set_rgw_config_option(rgw_client, 'rgw_bucket_persistent_notif_num_shards', default_num_shards)
+
+
+def create_object_and_verify_events(bucket, key_name, topic_name, receiver, expected_keys, deletions=False):
+    key = bucket.new_key(key_name)
+    key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
+    expected_keys.append(key_name)
+
+    print('wait for the messages...')
+    wait_for_queue_to_drain(topic_name)
+    events = receiver.get_and_reset_events()
+    assert_equal(len(events), len(expected_keys))
+    for event in events:
+        assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
+
+    if deletions:
+        # delete objects
+        for key in bucket.list():
+            key.delete()
+        print('wait for the messages...')
+        wait_for_queue_to_drain(topic_name)
+        # check endpoint receiver
+        events = receiver.get_and_reset_events()
+        assert_equal(len(events), len(expected_keys))
+        for event in events:
+            assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
+
+@attr('http_test')
+def test_backward_compatibility_persistent_sharded_topic_http(): 
+    conn = connection()
+    persistent_notification_shard_config_change('http', conn, new_num_shards=11, old_num_shards=1)
+
+@attr('kafka_test')
+def test_backward_compatibility_persistent_sharded_topic_kafka(): 
+    conn = connection()
+    persistent_notification_shard_config_change('kafka', conn, new_num_shards=11, old_num_shards=1)
+
+@attr('http_test')
+def test_persistent_sharded_topic_config_change_http():
+    conn = connection()
+    new_num_shards = random.randint(2, 10)
+    default_num_shards = 11
+    persistent_notification_shard_config_change('http', conn, new_num_shards, default_num_shards)
+
+@attr('kafka_test')
+def test_persistent_sharded_topic_config_change_kafka():
+    conn = connection()
+    new_num_shards = random.randint(2, 10)
+    default_num_shards = 11
+    persistent_notification_shard_config_change('kafka', conn, new_num_shards, default_num_shards)
\ No newline at end of file
index 4dc9438303fd7329fc63ff747f1d30ee1ec0e094..f0f7537ce52f79c89c7c2d5d907b5316d5be2c6e 100644 (file)
@@ -92,6 +92,11 @@ function rgw {
   echo "$mrgw $name $port $ssl_port $rgw_flags $@"
 }
 
+function ceph {
+  [ $# -lt 1 ] && echo "ceph() needs atleast 1 param" && exit 1
+  echo "$mrun $1 ceph"
+}
+
 function init_first_zone {
   [ $# -ne 7 ] && echo "init_first_zone() needs 7 params" && exit 1
 
@@ -173,6 +178,13 @@ function call_rgw_rados {
   x $(rgw_rados $cid) "$@"
 }
 
+function call_ceph {
+  cid=$1
+  shift 1
+  echo $@
+  x $(ceph $cid) "$@"
+}
+
 function get_mstart_parameters {
   [ $# -ne 1 ] && echo "get_mstart_parameters() needs 1 param" && exit 1
   # bash arrays start from zero
@@ -204,4 +216,3 @@ function get_mstart_parameters {
 
   echo "$parameters $VSTART_PARAMETERS"
 }
-