return std::make_unique<DaosZoneGroup>(store, group);
}
const RGWZoneGroup& get_group() { return group; }
- virtual bool supports_feature(std::string_view feature) const override {
- return group.supports(feature);
- }
};
class DaosZone : public StoreZone {
virtual std::unique_ptr<ZoneGroup> clone() override {
return std::make_unique<MotrZoneGroup>(store, group);
}
- virtual bool supports_feature(std::string_view feature) const override {
- return group.supports(feature);
- }
friend class MotrZone;
};
req_id, null_yield);
auto notify_res = static_cast<rgw::sal::RadosNotification*>(notify.get())->get_reservation();
- int ret = rgw::notify::publish_reserve(dpp, rgw::notify::ObjectSyncedCreate, notify_res, &obj_tags);
+ int ret = rgw::notify::publish_reserve(dpp, *store->svc()->site, rgw::notify::ObjectSyncedCreate, notify_res, &obj_tags);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: reserving notification failed, with error: " << ret << dendl;
// no need to return, the sync already happened
const uint32_t stale_reservations_period_s;
const uint32_t reservations_cleanup_period_s;
queues_persistency_tracker topics_persistency_tracker;
+ const SiteConfig& site;
public:
rgw::sal::RadosStore& rados_store;
std::string tenant_name;
// TODO: extract tenant name from queue_name once it is fixed
uint64_t size_to_migrate = 0;
- RGWPubSub ps(&rados_store, tenant_name);
+ RGWPubSub ps(&rados_store, tenant_name, site);
rgw_pubsub_topic topic;
auto ret_of_get_topic = ps.get_topic(this, queue_name, topic,
Manager(CephContext* _cct, uint32_t _max_queue_size, uint32_t _queues_update_period_ms,
uint32_t _queues_update_retry_ms, uint32_t _queue_idle_sleep_us, u_int32_t failover_time_ms,
uint32_t _stale_reservations_period_s, uint32_t _reservations_cleanup_period_s,
- uint32_t _worker_count, rgw::sal::RadosStore* store) :
+ uint32_t _worker_count, rgw::sal::RadosStore* store,
+ const SiteConfig& site) :
max_queue_size(_max_queue_size),
queues_update_period_ms(_queues_update_period_ms),
queues_update_retry_ms(_queues_update_retry_ms),
worker_count(_worker_count),
stale_reservations_period_s(_stale_reservations_period_s),
reservations_cleanup_period_s(_reservations_cleanup_period_s),
+ site(site),
rados_store(*store)
{
spawn::spawn(io_context, [this](spawn::yield_context yield) {
constexpr uint32_t STALE_RESERVATIONS_PERIOD_S = 120; // cleanup reservations that are more than 2 minutes old
constexpr uint32_t RESERVATIONS_CLEANUP_PERIOD_S = 30; // reservation cleanup every 30 seconds
-bool init(CephContext* cct, rgw::sal::RadosStore* store, const DoutPrefixProvider *dpp) {
+bool init(CephContext* cct, rgw::sal::RadosStore* store,
+ const SiteConfig& site, const DoutPrefixProvider *dpp) {
if (s_manager) {
return false;
}
IDLE_TIMEOUT_USEC, FAILOVER_TIME_MSEC,
STALE_RESERVATIONS_PERIOD_S, RESERVATIONS_CLEANUP_PERIOD_S,
WORKER_COUNT,
- store);
+ store, site);
return true;
}
}
int publish_reserve(const DoutPrefixProvider* dpp,
+ const SiteConfig& site,
EventType event_type,
reservation_t& res,
const RGWObjTags* req_tags)
{
rgw_pubsub_bucket_topics bucket_topics;
- if (do_all_zonegroups_support_notification_v2(
- res.store->svc()->zone->get_current_period().get_map().zonegroups)) {
+ if (all_zonegroups_support(site, zone_features::notification_v2)) {
auto ret = 0;
if (!res.s) {
// for non S3-request caller (e.g., lifecycle, ObjectSync), bucket attrs
return ret;
}
} else {
- const RGWPubSub ps(res.store, res.user_tenant);
+ const RGWPubSub ps(res.store, res.user_tenant, site);
const RGWPubSub::Bucket ps_bucket(ps, res.bucket);
auto rc = ps_bucket.get_topics(res.dpp, bucket_topics, res.yield);
if (rc < 0) {
// load the topic,if there is change in topic config while it's stored in
// notification.
rgw_pubsub_topic result;
- const RGWPubSub ps(
- res.store, res.user_tenant,
- &res.store->svc()->zone->get_current_period().get_map().zonegroups);
+ const RGWPubSub ps(res.store, res.user_tenant, site);
auto ret = ps.get_topic(res.dpp, topic_cfg.name, result, res.yield, nullptr);
if (ret < 0) {
ldpp_dout(res.dpp, 1)
#include "rgw_pubsub.h"
// forward declarations
+namespace rgw { class SiteConfig; }
namespace rgw::sal {
class RadosStore;
class RGWObject;
// initialize the notification manager
// notification manager is dequeuing the 2-phase-commit queues
// and send the notifications to the endpoints
-bool init(CephContext* cct, rgw::sal::RadosStore* store, const DoutPrefixProvider *dpp);
+bool init(CephContext* cct, rgw::sal::RadosStore* store,
+ const rgw::SiteConfig& site, const DoutPrefixProvider *dpp);
// shutdown the notification manager
void shutdown();
// create a reservation on the 2-phase-commit queue
int publish_reserve(const DoutPrefixProvider *dpp,
+ const SiteConfig& site,
EventType event_type,
reservation_t& reservation,
const RGWObjTags* req_tags);
index_completion_manager = new RGWIndexCompletionManager(this);
if (run_notification_thread) {
- ret = rgw::notify::init(cct, driver, dpp);
+ ret = rgw::notify::init(cct, driver, *svc.site, dpp);
if (ret < 0 ) {
ldpp_dout(dpp, 1) << "ERROR: failed to initialize notification manager" << dendl;
}
// if bucket has notification definitions associated with it
// they should be removed (note that any pending notifications on the bucket are still going to be sent)
- const RGWPubSub ps(store, info.owner.tenant);
+ const RGWPubSub ps(store, info.owner.tenant, *store->svc()->site);
const RGWPubSub::Bucket ps_bucket(ps, this);
const auto ps_ret = ps_bucket.remove_notifications(dpp, y);
if (ps_ret < 0 && ps_ret != -ENOENT) {
int RadosNotification::publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags)
{
- return rgw::notify::publish_reserve(dpp, event_type, res, obj_tags);
+ return rgw::notify::publish_reserve(dpp, *store->svc()->site, event_type, res, obj_tags);
}
int RadosNotification::publish_commit(const DoutPrefixProvider* dpp, uint64_t size,
virtual std::unique_ptr<ZoneGroup> clone() override {
return std::make_unique<RadosZoneGroup>(store, group);
}
- virtual bool supports_feature(std::string_view feature) const override {
- return group.supports(feature);
- }
const RGWZoneGroup& get_group() const { return group; }
};
return &i->second;
}
+bool all_zonegroups_support(const SiteConfig& site, std::string_view feature)
+{
+ const auto& period = site.get_period();
+ if (!period) {
+ // if we're not in a realm, just check the local zonegroup
+ return site.get_zonegroup().supports(feature);
+ }
+ const auto& zgs = period->period_map.zonegroups;
+ return std::all_of(zgs.begin(), zgs.end(), [feature] (const auto& pair) {
+ return pair.second.supports(feature);
+ });
+}
+
static int read_or_create_default_zone(const DoutPrefixProvider* dpp,
optional_yield y,
sal::ConfigStore* cfgstore,
};
+/// Test whether all zonegroups in the realm support the given zone feature.
+bool all_zonegroups_support(const SiteConfig& site, std::string_view feature);
+
} // namespace rgw
cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
- if (driver->get_zone()->get_zonegroup().supports_feature(
- rgw::zone_features::notification_v2)) {
+ if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) {
ret = get_bucket_notifications(dpp(), bucket.get(), result);
if (ret < 0) {
cerr << "ERROR: could not get topics: " << cpp_strerror(-ret)
return -ret;
}
} else {
- RGWPubSub ps(driver, tenant);
+ RGWPubSub ps(driver, tenant, *site);
const RGWPubSub::Bucket b(ps, bucket.get());
ret = b.get_topics(dpp(), result, null_yield);
if (ret < 0 && ret != -ENOENT) {
}
if (opt_cmd == OPT::PUBSUB_TOPIC_LIST) {
- RGWPubSub ps(driver, tenant, &site->get_period()->get_map().zonegroups);
+ RGWPubSub ps(driver, tenant, *site);
rgw_pubsub_topics result;
ret = ps.get_topics(dpp(), result, null_yield);
if (ret < 0 && ret != -ENOENT) {
}
}
}
- if (driver->get_zone()->get_zonegroup().supports_feature(
- rgw::zone_features::notification_v2)) {
+ if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) {
Formatter::ObjectSection top_section(*formatter, "result");
Formatter::ArraySection s(*formatter, "topics");
for (const auto& [_, topic] : result.topics) {
cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
return EINVAL;
}
- RGWPubSub ps(driver, tenant, &site->get_period()->get_map().zonegroups);
+ RGWPubSub ps(driver, tenant, *site);
rgw_pubsub_topic topic;
std::set<std::string> subscribed_buckets;
cerr << "ERROR: could not get topic: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
- if (driver->get_zone()->get_zonegroup().supports_feature(
- rgw::zone_features::notification_v2)) {
+ if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) {
show_topics_info_v2(topic, subscribed_buckets, formatter.get());
} else {
encode_json("topic", topic, formatter.get());
return -ret;
}
rgw_pubsub_bucket_topics bucket_topics;
- if (driver->get_zone()->get_zonegroup().supports_feature(
- rgw::zone_features::notification_v2)) {
+ if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) {
ret = get_bucket_notifications(dpp(), bucket.get(), bucket_topics);
if (ret < 0) {
cerr << "ERROR: could not get bucket notifications: "
return -ret;
}
} else {
- RGWPubSub ps(driver, tenant);
+ RGWPubSub ps(driver, tenant, *site);
const RGWPubSub::Bucket b(ps, bucket.get());
ret = b.get_topics(dpp(), bucket_topics, null_yield);
if (ret < 0 && ret != -ENOENT) {
return -ret;
}
- RGWPubSub ps(driver, tenant, &site->get_period()->get_map().zonegroups);
+ RGWPubSub ps(driver, tenant, *site);
ret = ps.remove_topic(dpp(), topic_name, null_yield);
if (ret < 0) {
return -ret;
}
- if (driver->get_zone()->get_zonegroup().supports_feature(
- rgw::zone_features::notification_v2)) {
+ if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) {
ret = remove_notification_v2(dpp(), driver, bucket.get(), notification_id,
null_yield);
} else {
- RGWPubSub ps(driver, tenant);
+ RGWPubSub ps(driver, tenant, *site);
rgw_pubsub_bucket_topics bucket_topics;
const RGWPubSub::Bucket b(ps, bucket.get());
RGWPubSub::RGWPubSub(rgw::sal::Driver* _driver,
const std::string& _tenant,
- const std::map<std::string, RGWZoneGroup>* _zonegroups)
- : driver(_driver), tenant(_tenant), zonegroups(_zonegroups) {
- use_notification_v2 = do_all_zonegroups_support_notification_v2(*zonegroups);
+ const rgw::SiteConfig& site)
+ : driver(_driver), tenant(_tenant),
+ use_notification_v2(rgw::all_zonegroups_support(site, rgw::zone_features::notification_v2))
+{
}
int RGWPubSub::read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result,
return 0;
}
-bool do_all_zonegroups_support_notification_v2(
- std::map<std::string, RGWZoneGroup> zonegroups) {
- for (const auto& [_, zonegroup] : zonegroups) {
- if (!zonegroup.supports(rgw::zone_features::notification_v2)) {
- return false;
- }
- }
- return true;
-}
-
std::string topic_to_unique(const std::string& topic,
const std::string& notification) {
return notification + "_" + topic;
rgw::sal::Driver* const driver;
const std::string tenant;
- const std::map<std::string, RGWZoneGroup>* zonegroups = nullptr;
bool use_notification_v2 = false;
int read_topics(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result,
RGWPubSub(rgw::sal::Driver* _driver,
const std::string& _tenant,
- const std::map<std::string, RGWZoneGroup>* zonegroups);
+ const rgw::SiteConfig& site);
class Bucket {
friend class RGWPubSub;
constexpr static const std::string_view DEFAULT_CONFIG{"None"};
}
-bool do_all_zonegroups_support_notification_v2(
- std::map<std::string, RGWZoneGroup> zonegroups);
-
std::string topic_to_unique(const std::string& topic,
const std::string& notification);
return ret;
}
- const RGWPubSub ps(driver, s->owner.id.tenant,
- &s->penv.site->get_period()->get_map().zonegroups);
+ const RGWPubSub ps(driver, s->owner.id.tenant, *s->penv.site);
rgw_pubsub_topic result;
ret = ps.get_topic(this, topic_name, result, y, nullptr);
if (ret == -ENOENT) {
return;
}
}
- const RGWPubSub ps(driver, s->owner.id.tenant,
- &s->penv.site->get_period()->get_map().zonegroups);
+ const RGWPubSub ps(driver, s->owner.id.tenant, *s->penv.site);
op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data,
s->owner.id, policy_text, y);
if (op_ret < 0) {
};
void RGWPSListTopicsOp::execute(optional_yield y) {
- const RGWPubSub ps(driver, s->owner.id.tenant,
- &s->penv.site->get_period()->get_map().zonegroups);
+ const RGWPubSub ps(driver, s->owner.id.tenant, *s->penv.site);
op_ret = ps.get_topics(this, result, y);
// if there are no topics it is not considered an error
op_ret = op_ret == -ENOENT ? 0 : op_ret;
if (op_ret < 0) {
return;
}
- const RGWPubSub ps(driver, s->owner.id.tenant,
- &s->penv.site->get_period()->get_map().zonegroups);
+ const RGWPubSub ps(driver, s->owner.id.tenant, *s->penv.site);
op_ret = ps.get_topic(this, topic_name, result, y, nullptr);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
if (op_ret < 0) {
return;
}
- const RGWPubSub ps(driver, s->owner.id.tenant,
- &s->penv.site->get_period()->get_map().zonegroups);
+ const RGWPubSub ps(driver, s->owner.id.tenant, *s->penv.site);
op_ret = ps.get_topic(this, topic_name, result, y, nullptr);
if (op_ret < 0) {
ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
return ret;
}
rgw_pubsub_topic result;
- const RGWPubSub ps(driver, s->owner.id.tenant,
- &s->penv.site->get_period()->get_map().zonegroups);
+ const RGWPubSub ps(driver, s->owner.id.tenant, *s->penv.site);
ret = ps.get_topic(this, topic_name, result, y, nullptr);
if (ret < 0) {
ldpp_dout(this, 1) << "failed to get topic '" << topic_name
return;
}
}
- const RGWPubSub ps(driver, s->owner.id.tenant,
- &s->penv.site->get_period()->get_map().zonegroups);
+ const RGWPubSub ps(driver, s->owner.id.tenant, *s->penv.site);
op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data,
topic_owner, policy_text, y);
if (op_ret < 0) {
return;
}
}
- const RGWPubSub ps(driver, s->owner.id.tenant,
- &s->penv.site->get_period()->get_map().zonegroups);
+ const RGWPubSub ps(driver, s->owner.id.tenant, *s->penv.site);
rgw_pubsub_topic result;
op_ret = ps.get_topic(this, topic_name, result, y, nullptr);
};
void RGWPSCreateNotifOp::execute(optional_yield y) {
- if (do_all_zonegroups_support_notification_v2(
- s->penv.site->get_period()->get_map().zonegroups)) {
+ if (rgw::all_zonegroups_support(*s->penv.site, rgw::zone_features::notification_v2)) {
return execute_v2(y);
}
op_ret = verify_params();
<< "' , ret = " << op_ret << dendl;
return;
}
- const RGWPubSub ps(driver, s->owner.id.tenant,
- &s->penv.site->get_period()->get_map().zonegroups);
+ const RGWPubSub ps(driver, s->owner.id.tenant, *s->penv.site);
std::unordered_map<std::string, rgw_pubsub_topic> topics;
for (const auto& c : configurations.list) {
const auto& notif_name = c.id;
};
void RGWPSDeleteNotifOp::execute(optional_yield y) {
- if (do_all_zonegroups_support_notification_v2(
- s->penv.site->get_period()->get_map().zonegroups)) {
+ if (rgw::all_zonegroups_support(*s->penv.site, rgw::zone_features::notification_v2)) {
return execute_v2(y);
}
std::string notif_name;
// get all topics on a bucket
rgw_pubsub_bucket_topics bucket_topics;
- if (do_all_zonegroups_support_notification_v2(
- s->penv.site->get_period()->get_map().zonegroups)) {
+ if (rgw::all_zonegroups_support(*s->penv.site, rgw::zone_features::notification_v2)) {
op_ret = get_bucket_notifications(this, bucket.get(), bucket_topics);
} else {
const RGWPubSub ps(driver, s->owner.id.tenant);
virtual int list_zones(std::list<std::string>& zone_ids) = 0;
/** Clone a copy of this zonegroup. */
virtual std::unique_ptr<ZoneGroup> clone() = 0;
- /** Determine if zonegroup |feature| is supported.*/
- virtual bool supports_feature(std::string_view feature) const = 0;
};
/**
std::unique_ptr<RGWZoneGroup>zg = std::make_unique<RGWZoneGroup>(*group.get());
return std::make_unique<DBZoneGroup>(store, std::move(zg));
}
- virtual bool supports_feature(std::string_view feature) const override {
- return group->supports(feature);
- }
};
class DBZone : public StoreZone {
std::unique_ptr<ZoneGroup> nzg = next->clone();
return std::make_unique<FilterZoneGroup>(std::move(nzg));
}
- virtual bool supports_feature(std::string_view feature) const override {
- return next->supports_feature(feature);
- }
};
class FilterZone : public Zone {