when decoding, resulting in Linux codes on the wire, and host codes on the receiver.
All CEPHFS_E* defines have been removed across Ceph (including the Python binding).
Relevant tracker: https://tracker.ceph.com/issues/64611
+
+* RGW: Persistent bucket notifications will use queues with multiple shards instead of one queue. Number of shards
+can be configured using the `rgw` option `rgw_bucket_persistent_notif_num_shards`. Note that pre-existing topics will continue to function as is, i.e, they are mapped to only one RADOS object.
+
+ For more details, see:
+ https://docs.ceph.com/en/latest/radosgw/notifications/
+
+Relevant tracker: https://tracker.ceph.com/issues/71677
.. tip:: To minimize the latency added by asynchronous notification, we
recommended placing the "log" pool on fast media.
+Persistent bucket notifications are managed by the following central configuration options:
+
+.. confval:: rgw_bucket_persistent_notif_num_shards
+
+.. note:: When a topic is created during a Ceph upgrade, per-key reordering of notifications may
+ happen on any bucket mapped to that topic.
+
+.. note:: Persistent topics that were created on a radosgw that does not support sharding, will be treated as a single shard topics
+
+.. tip:: It is also recommended that you avoid modifying or deleting topics created during
+ upgrades, as this might result in orphan RADOS objects that will not be deleted when the topic is deleted.
+
Topic Management via CLI
------------------------
services:
- rgw
with_legacy: true
+- name: rgw_bucket_persistent_notif_num_shards
+ type: uint
+ level: advanced
+ desc: Number of shards for a persistent topic.
+ long_desc: Number of shards of persistent topics. The notifications will be sharded by a combination of
+ the bucket and key name. Changing the number effect only new topics and does not change exiting ones.
+ default: 11
+ services:
+ - rgw
return true;
}
+
+static inline uint64_t get_target_shard(const DoutPrefixProvider* dpp, const std::string& bucket_name, const std::string& object_key, const uint64_t num_shards) {
+ std::hash<std::string> hash_fn;
+ std::string hash_key = fmt::format("{}:{}", bucket_name, object_key);
+ size_t hash = hash_fn(hash_key);
+ ldpp_dout(dpp, 20) << "INFO: Hash Value (hash) is: " << hash << ". Hash Key: " << bucket_name << ":" << object_key << dendl;
+ return hash % num_shards;
+}
+
+static inline std::string get_shard_name(const std::string& topic_name, const uint64_t& shard_id) {
+ return (shard_id == 0) ? topic_name : fmt::format("{}.{}", topic_name, shard_id);
+}
+
int publish_reserve(const DoutPrefixProvider* dpp,
const SiteConfig& site,
const EventTypeList& event_types,
}
cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
+ uint64_t target_shard = 0;
if (topic_cfg.dest.persistent) {
// TODO: take default reservation size from conf
constexpr auto DEFAULT_RESERVATION = 4 * 1024U; // 4K
librados::ObjectWriteOperation op;
bufferlist obl;
int rval;
- const auto& queue_name = topic_cfg.dest.persistent_queue;
+ const std::string bucket_name = res.bucket->get_name();
+ const std::string object_key = res.object_name ? *res.object_name : res.object->get_name();
+ const uint64_t num_shards = topic_cfg.dest.num_shards;
+ target_shard = get_target_shard(
+ dpp, bucket_name, object_key, num_shards);
+ const auto shard_name = get_shard_name(topic_cfg.dest.persistent_queue, target_shard);
+ ldpp_dout(res.dpp, 1) << "INFO: target_shard: " << shard_name << dendl;
cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval);
auto ret = rgw_rados_operate(
- res.dpp, res.store->getRados()->get_notif_pool_ctx(), queue_name,
+ res.dpp, res.store->getRados()->get_notif_pool_ctx(), shard_name,
std::move(op), res.yield, librados::OPERATION_RETURNVEC);
if (ret < 0) {
ldpp_dout(res.dpp, 1)
<< "ERROR: failed to reserve notification on queue: "
- << queue_name << ". error: " << ret << dendl;
+ << shard_name << ". error: " << ret << dendl;
// if no space is left in queue we ask client to slow down
return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
}
}
}
- res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id, event_type);
+ res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id, event_type, target_shard);
}
}
return 0;
event_entry.retry_sleep_duration = topic.cfg.dest.retry_sleep_duration;
bufferlist bl;
encode(event_entry, bl);
- const auto& queue_name = topic.cfg.dest.persistent_queue;
+ uint64_t target_shard = topic.shard_id;
+ const auto shard_name = get_shard_name(topic.cfg.dest.persistent_queue, target_shard);
+ ldpp_dout(res.dpp, 1) << "INFO: target_shard: " << shard_name << dendl;
if (bl.length() > res.size) {
// try to make a larger reservation, fail only if this is not possible
ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length()
<< " exceeded reserved size: " << res.size
<<
- " . trying to make a larger reservation on queue:" << queue_name
+ " . trying to make a larger reservation on queue:" << shard_name
<< dendl;
// first cancel the existing reservation
librados::ObjectWriteOperation op;
cls_2pc_queue_abort(op, topic.res_id);
auto ret = rgw_rados_operate(
dpp, res.store->getRados()->get_notif_pool_ctx(),
- queue_name, std::move(op),
+ shard_name, std::move(op),
res.yield);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: "
<< topic.res_id <<
- " when trying to make a larger reservation on queue: " << queue_name
+ " when trying to make a larger reservation on queue: " << shard_name
<< ". error: " << ret << dendl;
return ret;
}
cls_2pc_queue_reserve(op, bl.length(), 1, &obl, &rval);
ret = rgw_rados_operate(
dpp, res.store->getRados()->get_notif_pool_ctx(),
- queue_name, std::move(op), res.yield, librados::OPERATION_RETURNVEC);
+ shard_name, std::move(op), res.yield, librados::OPERATION_RETURNVEC);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to reserve extra space on queue: "
- << queue_name
+ << shard_name
<< ". error: " << ret << dendl;
return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
}
librados::ObjectWriteOperation op;
cls_2pc_queue_commit(op, bl_data_vec, topic.res_id);
topic.res_id = cls_2pc_reservation::NO_ID;
- auto pcc_arg = make_unique<PublishCommitCompleteArg>(queue_name, dpp->get_cct());
+ auto pcc_arg = make_unique<PublishCommitCompleteArg>(shard_name, dpp->get_cct());
aio_completion_ptr completion{librados::Rados::aio_create_completion(pcc_arg.get(), publish_commit_completion)};
auto& io_ctx = res.store->getRados()->get_notif_pool_ctx();
- if (const int ret = io_ctx.aio_operate(queue_name, completion.get(), &op); ret < 0) {
+ if (const int ret = io_ctx.aio_operate(shard_name, completion.get(), &op); ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: "
- << queue_name << ". error: " << ret << dendl;
+ << shard_name << ". error: " << ret << dendl;
return ret;
}
// args will be released inside the callback
// nothing to abort or already committed/aborted
continue;
}
- const auto& queue_name = topic.cfg.dest.persistent_queue;
+ uint64_t target_shard = topic.shard_id;
+ const auto shard_name = get_shard_name(topic.cfg.dest.persistent_queue, target_shard);
+ ldpp_dout(res.dpp, 1) << "INFO: target_shard: " << shard_name << dendl;
librados::ObjectWriteOperation op;
cls_2pc_queue_abort(op, topic.res_id);
const auto ret = rgw_rados_operate(
res.dpp, res.store->getRados()->get_notif_pool_ctx(),
- queue_name, std::move(op), res.yield);
+ shard_name, std::move(op), res.yield);
if (ret < 0) {
ldpp_dout(res.dpp, 1) << "ERROR: failed to abort reservation: "
<< topic.res_id <<
- " from queue: " << queue_name << ". error: " << ret << dendl;
+ " from queue: " << shard_name << ". error: " << ret << dendl;
return ret;
}
topic.res_id = cls_2pc_reservation::NO_ID;
}
int get_persistent_queue_stats(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
- const std::string &queue_name, rgw_topic_stats &stats, optional_yield y)
+ ShardNamesView shards, rgw_topic_stats &stats, optional_yield y)
{
// TODO: use optional_yield instead calling rados_ioctx.operate() synchronously
cls_2pc_reservations reservations;
- auto ret = cls_2pc_queue_list_reservations(rados_ioctx, queue_name, reservations);
- if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to read queue list reservation: " << ret << dendl;
- return ret;
- }
- stats.queue_reservations = reservations.size();
-
- ret = cls_2pc_queue_get_topic_stats(rados_ioctx, queue_name, stats.queue_entries, stats.queue_size);
- if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to get the queue size or the number of entries: " << ret << dendl;
- return ret;
+ uint32_t shard_entries;
+ uint64_t shard_size;
+
+ stats.queue_reservations = 0;
+ stats.queue_size = 0;
+ stats.queue_entries = 0;
+ for(const auto& shard_name: shards){
+ auto ret = cls_2pc_queue_list_reservations(rados_ioctx, shard_name, reservations);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to read shard: "<< shard_name << "'s list reservation: " << ret << dendl;
+ return ret;
+ }
+ stats.queue_reservations += reservations.size();
+ shard_entries = 0;
+ shard_size = 0;
+ ret = cls_2pc_queue_get_topic_stats(rados_ioctx, shard_name, shard_entries, shard_size);
+ stats.queue_size += shard_size;
+ stats.queue_entries += shard_entries;
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to get the size or number of entries for queue shard: " << shard_name << ret << dendl;
+ return ret;
+ }
}
-
return 0;
}
struct topic_t {
topic_t(const std::string& _configurationId, const rgw_pubsub_topic& _cfg,
cls_2pc_reservation::id_t _res_id,
- rgw::notify::EventType _event_type)
+ rgw::notify::EventType _event_type, uint64_t shard_id)
: configurationId(_configurationId),
cfg(_cfg),
res_id(_res_id),
- event_type(_event_type) {}
+ event_type(_event_type),
+ shard_id(shard_id){}
const std::string configurationId;
const rgw_pubsub_topic cfg;
// res_id is reset after topic is committed/aborted
cls_2pc_reservation::id_t res_id;
rgw::notify::EventType event_type;
+ uint64_t shard_id;
};
const DoutPrefixProvider* const dpp;
int publish_abort(reservation_t& reservation);
int get_persistent_queue_stats(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
- const std::string &queue_name, rgw_topic_stats &stats, optional_yield y);
+ ShardNamesView shards, rgw_topic_stats &stats, optional_yield y);
}
rgw::notify::rgw_topic_stats stats;
ret = rgw::notify::get_persistent_queue_stats(
dpp(), ioctx,
- topic.dest.persistent_queue, stats, null_yield);
+ topic.dest.get_shard_names(), stats, null_yield);
if (ret < 0) {
- cerr << "ERROR: could not get persistent queue: " << cpp_strerror(-ret) << std::endl;
+ cerr << "ERROR: could not get persistent queues: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
encode_json("", stats, formatter.get());
std::string end_marker;
librados::ObjectReadOperation rop;
std::vector<cls_queue_entry> queue_entries;
- bool truncated = true;
+ bool truncated;
formatter->open_array_section("eventEntries");
- while (truncated) {
- bufferlist bl;
- int rc;
- cls_2pc_queue_list_entries(rop, marker, max_entries, &bl, &rc);
- ioctx.operate(topic.dest.persistent_queue, &rop, nullptr);
- if (rc < 0 ) {
- cerr << "ERROR: could not list entries from queue. error: " << cpp_strerror(-ret) << std::endl;
- return -rc;
- }
- rc = cls_2pc_queue_list_entries_result(bl, queue_entries, &truncated, end_marker);
- if (rc < 0) {
- cerr << "ERROR: failed to parse list entries from queue (skipping). error: " << cpp_strerror(-ret) << std::endl;
- return -rc;
- }
-
- std::for_each(queue_entries.cbegin(),
- queue_entries.cend(),
- [&formatter](const auto& queue_entry) {
- rgw::notify::event_entry_t event_entry;
- bufferlist::const_iterator iter{&queue_entry.data};
- try {
- event_entry.decode(iter);
- encode_json("", event_entry, formatter.get());
- } catch (const buffer::error& e) {
- cerr << "ERROR: failed to decode queue entry. error: " << e.what() << std::endl;
- }
- });
- formatter->flush(cout);
- marker = end_marker;
+
+ for (const auto& shard_name: topic.dest.get_shard_names()){
+ truncated = true;
+ marker.clear();
+ while (truncated) {
+ bufferlist bl;
+ int rc;
+ cls_2pc_queue_list_entries(rop, marker, max_entries, &bl, &rc);
+ ioctx.operate(shard_name, &rop, nullptr);
+ if (rc < 0 ) {
+ cerr << "ERROR: could not list entries from queue. error: " << cpp_strerror(-ret) << std::endl;
+ return -rc;
+ }
+ rc = cls_2pc_queue_list_entries_result(bl, queue_entries, &truncated, end_marker);
+ if (rc < 0) {
+ cerr << "ERROR: failed to parse list entries from queue (skipping). error: " << cpp_strerror(-ret) << std::endl;
+ return -rc;
+ }
+ std::for_each(queue_entries.cbegin(),
+ queue_entries.cend(),
+ [&formatter](const auto& queue_entry) {
+ rgw::notify::event_entry_t event_entry;
+ bufferlist::const_iterator iter{&queue_entry.data};
+ try {
+ event_entry.decode(iter);
+ encode_json("", event_entry, formatter.get());
+ } catch (const buffer::error& e) {
+ cerr << "ERROR: failed to decode queue entry. error: " << e.what() << std::endl;
+ }
+ });
+ formatter->flush(cout);
+ marker = end_marker;
+ }
}
formatter->close_section();
formatter->flush(cout);
#define dout_subsys ceph_subsys_rgw
static constexpr std::string_view topic_tenant_delim = ":";
+static constexpr std::string_view topic_shard_delim = ".";
// format and parse topic metadata keys as tenant:name
std::string get_topic_metadata_key(std::string_view tenant,
std::string& name)
{
// expected format: tenant_name:topic_name*
+ // expected format: tenant_name:topic_name.<shard_id>
auto pos = key.find(topic_tenant_delim);
if (pos != std::string::npos) {
tenant = key.substr(0, pos);
tenant.clear();
name = key;
}
+ //remove shard id if present
+ pos = name.find_last_of(topic_shard_delim);
+ if(pos != std::string::npos){
+ name = name.substr(0, pos);
+ }
}
void set_event_id(std::string& id, const std::string& hash, const utime_t& ts) {
JSONDecoder::decode_json("retry_sleep_duration", sleep_dur, f);
retry_sleep_duration = sleep_dur == DEFAULT_CONFIG ? DEFAULT_GLOBAL_VALUE
: std::stoul(sleep_dur);
+
+}
+
+ShardNamesView rgw_pubsub_dest::get_shard_names() const {
+ const std::string base_name = persistent_queue;
+ auto get_shard_name = [base_name](uint64_t i){return i != 0 ? fmt::format("{}.{}", base_name, i) : base_name;};
+ return std::ranges::views::iota(0ul, num_shards) | std::ranges::views::transform(std::function<std::string(uint64_t)>(get_shard_name));
}
RGWPubSub::RGWPubSub(rgw::sal::Driver* _driver,
const rgw_pubsub_dest& dest = topic.dest;
if (!dest.push_endpoint.empty() && dest.persistent &&
!dest.persistent_queue.empty()) {
- ret = driver->remove_persistent_topic(dpp, y, dest.persistent_queue);
- if (ret < 0 && ret != -ENOENT) {
- ldpp_dout(dpp, 1) << "ERROR: failed to remove queue for "
- "persistent topic: " << cpp_strerror(ret) << dendl;
- return ret;
+
+ for(const auto& q: dest.get_shard_names()) {
+ ret = driver->remove_persistent_topic(dpp, y, q);
+ if (ret < 0 && ret != -ENOENT) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to remove shards for "
+ "persistent topic: " << cpp_strerror(ret) << dendl;
+ return ret;
+ }
}
+ ldpp_dout(dpp, 20) << "Successfully removed " << dest.num_shards << " shards for topic: " + name << dendl;
}
ret = driver->remove_topic_v2(name, tenant, objv_tracker, y, dpp);
}
if (!t->second.dest.push_endpoint.empty() && t->second.dest.persistent &&
!t->second.dest.persistent_queue.empty()) {
- ret = driver->remove_persistent_topic(dpp, y, t->second.dest.persistent_queue);
- if (ret < 0 && ret != -ENOENT) {
- ldpp_dout(dpp, 1) << "ERROR: failed to remove queue for "
- "persistent topic: " << cpp_strerror(ret) << dendl;
- return ret;
- }
+ for(const auto& q: t->second.dest.get_shard_names()) {
+ ret = driver->remove_persistent_topic(dpp, y, q);
+ if (ret < 0 && ret != -ENOENT) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to remove shards for "
+ "persistent topic: " << cpp_strerror(ret) << dendl;
+ return ret;
+ }
+ }
+ ldpp_dout(dpp, 20) << "Successfully removed " << t->second.dest.num_shards << " shards for topic: " + name << dendl;
}
topics.topics.erase(t);
#include "rgw_notify_event_type.h"
#include <boost/container/flat_map.hpp>
#include "rgw_s3_filter.h"
+#include <ranges>
class XMLObj;
// setting a unique ID for an event based on object hash and timestamp
void set_event_id(std::string& id, const std::string& hash, const utime_t& ts);
+using ShardNamesView = std::ranges::transform_view<std::ranges::iota_view<uint64_t, uint64_t>, std::function<std::string(uint64_t)>>;
+
struct rgw_pubsub_dest {
std::string push_endpoint;
std::string push_endpoint_args;
uint32_t time_to_live;
uint32_t max_retries;
uint32_t retry_sleep_duration;
+ // naming convention of sharded queues in the 'notif' pool -> persistent_queue, persistent_queue.1, persistent_queue.(num_shards -1)...
+ uint64_t num_shards; //defaults to a single shard for now, for backward compatibility
+
void encode(bufferlist& bl) const {
- ENCODE_START(7, 1, bl);
+ ENCODE_START(8, 1, bl);
encode("", bl);
encode("", bl);
encode(push_endpoint, bl);
encode(max_retries, bl);
encode(retry_sleep_duration, bl);
encode(persistent_queue, bl);
+ encode(num_shards, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
- DECODE_START(7, bl);
+ DECODE_START(8, bl);
std::string dummy;
decode(dummy, bl);
decode(dummy, bl);
decode(push_endpoint, bl);
if (struct_v >= 2) {
- decode(push_endpoint_args, bl);
+ decode(push_endpoint_args, bl);
}
if (struct_v >= 3) {
- decode(arn_topic, bl);
+ decode(arn_topic, bl);
}
if (struct_v >= 4) {
- decode(stored_secret, bl);
+ decode(stored_secret, bl);
}
if (struct_v >= 5) {
- decode(persistent, bl);
+ decode(persistent, bl);
+ }
+ else {
+ num_shards = persistent ? 1 : 0; //defaults to a single shard for backward compatibility
}
if (struct_v >= 6) {
decode(time_to_live, bl);
}
if (struct_v >= 7) {
decode(persistent_queue, bl);
- } else if (persistent) {
+ }
+ else if (persistent) {
// persistent topics created before v7 did not support tenant namespacing.
// continue to use 'arn_topic' alone as the queue's rados object name
persistent_queue = arn_topic;
}
+ if (struct_v >= 8) {
+ decode(num_shards, bl);
+ }
+
DECODE_FINISH(bl);
}
void dump_xml(Formatter *f) const;
std::string to_json_str() const;
void decode_json(JSONObj* obj);
+
+ // get the names of the shards in the persistent queue
+ ShardNamesView get_shard_names() const;
};
WRITE_CLASS_ENCODER(rgw_pubsub_dest)
}
}
- // don't add a persistent queue if we already have one
- const bool already_persistent = topic && topic_needs_queue(topic->dest);
+ // if we already have an existing persistent queue - do nothing - resharding not supported
+ // if we don't have a persistent queue already, just make shards or a single based on dest's num_shards from config
+
+ const bool already_persistent = topic && topic_needs_queue(topic->dest);
if (!already_persistent && topic_needs_queue(dest)) {
// initialize the persistent queue's location, using ':' as the namespace
// delimiter because its inclusion in a TopicName would break ARNs
+ // new persistent topics to have shards - loading num_shards from config for sharding persistent bucket notifications
+ dest.num_shards = s->get_cct()->_conf.get_val<uint64_t>("rgw_bucket_persistent_notif_num_shards");
+ if(dest.num_shards == 0) {
+ dest.num_shards = 1;
+ }
dest.persistent_queue = string_cat_reserve(
get_account_or_tenant(s->owner.id), ":", topic_name);
-
- op_ret = driver->add_persistent_topic(this, y, dest.persistent_queue);
- if (op_ret < 0) {
- ldpp_dout(this, 1) << "CreateTopic Action failed to create queue for "
- "persistent topics. error:"
- << op_ret << dendl;
- return;
+ for(const auto& q: dest.get_shard_names()){
+ ldpp_dout(this, 20) << "CreateTopic Action creating persistent topic queue: " << q << dendl;
+ op_ret = driver->add_persistent_topic(this, y, q);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "CreateTopic Action failed to create queue/shards for "
+ "persistent topics. error:"
+ << op_ret << dendl;
+ return;
+ }
}
+ ldpp_dout(this, 20) << "Successfully created " << dest.num_shards << " shards for topic: " << topic_name << dendl;
} else if (already_persistent) { // redundant call to CreateTopic
+ //no resharding of existing topics for persistent bucket notifications
dest.persistent_queue = topic->dest.persistent_queue;
+ dest.num_shards = topic->dest.num_shards;
+ if(s->get_cct()->_conf.get_val<uint64_t>("rgw_bucket_persistent_notif_num_shards") != dest.num_shards) {
+ ldpp_dout(this, 20) << "CreateTopic Action called for topic with existing shards, but num_shards in config is different. "
+ "Not resharding existing topic: " << topic_name << dendl;
+ } else {
+ ldpp_dout(this, 20) << "CreateTopic Action called for topic with existing. Not resharding existing topic: " << topic_name << dendl;
+ }
}
const RGWPubSub ps(driver, get_account_or_tenant(s->owner.id), *s->penv.site);
op_ret = ps.create_topic(this, topic_name, dest, topic_arn.to_string(),
// initialize the persistent queue's location, using ':' as the namespace
// delimiter because its inclusion in a TopicName would break ARNs
dest.persistent_queue = string_cat_reserve(topic_arn.account, ":", topic_name);
-
- op_ret = driver->add_persistent_topic(this, y, dest.persistent_queue);
- if (op_ret < 0) {
- ldpp_dout(this, 4)
- << "SetTopicAttributes Action failed to create queue for "
- "persistent topics. error:"
- << op_ret << dendl;
- return;
+ dest.num_shards = s->get_cct()->_conf.get_val<uint64_t>("rgw_bucket_persistent_notif_num_shards");
+ if (dest.num_shards == 0) {
+ dest.num_shards = 1;
+ }
+ for(const auto& q: dest.get_shard_names()){
+ op_ret = driver->add_persistent_topic(this, y, q);
+ if (op_ret < 0) {
+ ldpp_dout(this, 4)
+ << "SetTopicAttributes Action failed to create queue for "
+ "persistent topics. error:"
+ << op_ret << dendl;
+ return;
+ }
}
+ ldpp_dout(this, 20) << "Successfully created " << dest.num_shards <<" shards for topic: " << topic_name << dendl;
+
} else if (already_persistent && !topic_needs_queue(dest)) {
// changing the persistent topic to non-persistent.
- op_ret = driver->remove_persistent_topic(this, y, result.dest.persistent_queue);
- if (op_ret != -ENOENT && op_ret < 0) {
- ldpp_dout(this, 4) << "SetTopicAttributes Action failed to remove queue "
- "for persistent topics. error:"
- << op_ret << dendl;
- return;
+ for(const auto& q: result.dest.get_shard_names()) {
+ op_ret = driver->remove_persistent_topic(this, y, q);
+ if (op_ret != -ENOENT && op_ret < 0) {
+ ldpp_dout(this, 4) << "SetTopicAttributes Action failed to remove queue "
+ "for persistent topics. error:"
+ << op_ret << dendl;
+ return;
+ }
+ dest.num_shards = 0; // set to 0 to indicate no sharding
}
+ ldpp_dout(this, 20) << "Successfully removed " << result.dest.num_shards << " shards for topic: " << topic_name << dendl;
}
const RGWPubSub ps(driver, topic_arn.account, *s->penv.site);
op_ret = ps.create_topic(this, topic_name, dest, topic_arn.to_string(),
cmd = [test_path + 'test-rgw-call.sh', 'call_rgw_admin', cluster] + args
return bash(cmd, **kwargs)
+def ceph_admin(args, cluster='noname', **kwargs):
+ """ ceph command """
+ cmd = [test_path + 'test-rgw-call.sh', 'call_ceph', cluster] + args
+ print(' '.join(cmd))
+ return bash(cmd, **kwargs)
+
def delete_all_topics(conn, tenant, cluster):
""" delete all topics """
if tenant == '':
for topic in topics_json:
admin(['topic', 'rm', '--tenant', tenant, '--topic', topic['name']], cluster)
+def set_rgw_config_option(client, option, value, cluster='noname'):
+ """ change a config option """
+ print(f'Setting {option} to {value} for {client} in cluster {cluster}')
+ return ceph_admin(['config', 'set', client, option, str(value)], cluster)
\ No newline at end of file
delete_all_objects, \
delete_all_topics, \
put_object_tagging, \
- admin
+ admin, \
+ set_rgw_config_option, \
+ bash
from nose import SkipTest
from nose.tools import assert_not_equal, assert_equal, assert_in, assert_not_in, assert_true
entries = parsed_result['Topic Stats']['Entries']
retries += 1
time_diff = time.time() - start_time
- log.info('queue %s has %d entries after %ds', topic_name, entries, time_diff)
- if retries > 30:
- log.warning('queue %s still has %d entries after %ds', topic_name, entries, time_diff)
+ log.info('shards for %s has %d entries after %ds', topic_name, entries, time_diff)
+ if retries > 100:
+ log.warning('shards for %s still has %d entries after %ds', topic_name, entries, time_diff)
assert_equal(entries, 0)
time.sleep(5)
time_diff = time.time() - start_time
- log.info('waited for %ds for queue %s to drain', time_diff, topic_name)
+ log.info('waited for %ds for shards of %s to drain', time_diff, topic_name)
def persistent_topic_stats(conn, endpoint_type):
get_config_cluster(),
)
admin(["account", "rm", "--account-id", account_id], get_config_cluster())
+
+def persistent_notification_shard_config_change(endpoint_type, conn, new_num_shards, old_num_shards=11):
+ """ test persistent notification shard config change """
+ """ test to check if notifications work when config value for determining num_shards is changed..."""
+
+ default_num_shards = 11
+ rgw_client = f'client.rgw.{get_config_port()}'
+ if (old_num_shards != default_num_shards):
+ set_rgw_config_option(rgw_client, 'rgw_bucket_persistent_notif_num_shards', old_num_shards)
+
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ #start receiver thread based on conn type
+ # start endpoint receiver
+ host = get_ip()
+ task = None
+ port = None
+ if endpoint_type == 'http':
+ # create random port for the http server
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ receiver = HTTPServerWithEvents((host, port))
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
+ elif endpoint_type == 'kafka':
+ # start kafka receiver
+ task, receiver = create_kafka_receiver_thread(topic_name)
+ task.start()
+ endpoint_address = 'kafka://' + host
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&persistent=true'
+ else:
+ return SkipTest('Unknown endpoint type: ' + endpoint_type)
+
+ zonegroup = get_config_zonegroup()
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+
+ # create s3 notification
+ notif_1 = bucket_name + '_notif_1'
+ topic_conf_list = [{'Id': notif_1, 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ _, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ ## create objects in the bucket (async)
+ expected_keys = []
+ create_object_and_verify_events(bucket, 'foo', topic_name, receiver, expected_keys, deletions=True)
+
+ ## change config value for num_shards to new_num_shards
+ set_rgw_config_option(rgw_client, 'rgw_bucket_persistent_notif_num_shards', new_num_shards)
+
+ ## create objects in the bucket (async)
+ expected_keys = []
+ create_object_and_verify_events(bucket, 'bar', topic_name, receiver, expected_keys, deletions=True)
+
+ # cleanup
+ receiver.close(task)
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+
+ ##revert config value for num_shards to default
+ if (new_num_shards != default_num_shards):
+ set_rgw_config_option(rgw_client, 'rgw_bucket_persistent_notif_num_shards', default_num_shards)
+
+
+def create_object_and_verify_events(bucket, key_name, topic_name, receiver, expected_keys, deletions=False):
+ key = bucket.new_key(key_name)
+ key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
+ expected_keys.append(key_name)
+
+ print('wait for the messages...')
+ wait_for_queue_to_drain(topic_name)
+ events = receiver.get_and_reset_events()
+ assert_equal(len(events), len(expected_keys))
+ for event in events:
+ assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
+
+ if deletions:
+ # delete objects
+ for key in bucket.list():
+ key.delete()
+ print('wait for the messages...')
+ wait_for_queue_to_drain(topic_name)
+ # check endpoint receiver
+ events = receiver.get_and_reset_events()
+ assert_equal(len(events), len(expected_keys))
+ for event in events:
+ assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
+
+@attr('http_test')
+def test_backward_compatibility_persistent_sharded_topic_http():
+ conn = connection()
+ persistent_notification_shard_config_change('http', conn, new_num_shards=11, old_num_shards=1)
+
+@attr('kafka_test')
+def test_backward_compatibility_persistent_sharded_topic_kafka():
+ conn = connection()
+ persistent_notification_shard_config_change('kafka', conn, new_num_shards=11, old_num_shards=1)
+
+@attr('http_test')
+def test_persistent_sharded_topic_config_change_http():
+ conn = connection()
+ new_num_shards = random.randint(2, 10)
+ default_num_shards = 11
+ persistent_notification_shard_config_change('http', conn, new_num_shards, default_num_shards)
+
+@attr('kafka_test')
+def test_persistent_sharded_topic_config_change_kafka():
+ conn = connection()
+ new_num_shards = random.randint(2, 10)
+ default_num_shards = 11
+ persistent_notification_shard_config_change('kafka', conn, new_num_shards, default_num_shards)
\ No newline at end of file
echo "$mrgw $name $port $ssl_port $rgw_flags $@"
}
+function ceph {
+ [ $# -lt 1 ] && echo "ceph() needs atleast 1 param" && exit 1
+ echo "$mrun $1 ceph"
+}
+
function init_first_zone {
[ $# -ne 7 ] && echo "init_first_zone() needs 7 params" && exit 1
x $(rgw_rados $cid) "$@"
}
+function call_ceph {
+ cid=$1
+ shift 1
+ echo $@
+ x $(ceph $cid) "$@"
+}
+
function get_mstart_parameters {
[ $# -ne 1 ] && echo "get_mstart_parameters() needs 1 param" && exit 1
# bash arrays start from zero
echo "$parameters $VSTART_PARAMETERS"
}
-