const RGWObjTags* req_tags)
{
rgw_pubsub_bucket_topics bucket_topics;
- if (all_zonegroups_support(site, zone_features::notification_v2)) {
+ if (all_zonegroups_support(site, zone_features::notification_v2) &&
+ res.store->stat_topics_v1(res.user_tenant, res.yield, res.dpp) == -ENOENT) {
auto ret = 0;
if (!res.s) {
// for non S3-request caller (e.g., lifecycle, ObjectSync), bucket attrs
ldpp_dout(res.dpp, 1)
<< "INFO: failed to load topic: " << topic_cfg.name
<< ". error: " << ret
- << " while storing the persistent notification event" << dendl;
+ << " while resrving persistent notification event" << dendl;
if (ret == -ENOENT) {
// either the topic is deleted but the corresponding notification still
// exist or in v2 mode the notification could have synced first but
#include "rgw_acl_s3.h"
#include "rgw_aio.h"
#include "rgw_aio_throttle.h"
+#include "rgw_tools.h"
#include "rgw_tracer.h"
#include "rgw_zone.h"
return 0;
}
+int RadosStore::stat_topics_v1(const std::string& tenant, optional_yield y, const DoutPrefixProvider *dpp) {
+ return rgw_stat_system_obj(dpp, svc()->sysobj, svc()->zone->get_zone_params().log_pool, topics_oid(tenant), nullptr, nullptr, y, nullptr);
+}
+
int RadosStore::write_topics(const std::string& tenant, const rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker,
optional_yield y, const DoutPrefixProvider *dpp) {
bufferlist bl;
std::string& _req_id, optional_yield y) override;
int read_topics(const std::string& tenant, rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker,
optional_yield y, const DoutPrefixProvider *dpp) override;
+ int stat_topics_v1(const std::string& tenant, optional_yield y, const DoutPrefixProvider *dpp) override;
int write_topics(const std::string& tenant, const rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker,
optional_yield y, const DoutPrefixProvider *dpp) override;
int remove_topics(const std::string& tenant, RGWObjVersionTracker* objv_tracker,
cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
- if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) {
+ if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2) &&
+ driver->stat_topics_v1(tenant, null_yield, dpp()) == -ENOENT) {
ret = get_bucket_notifications(dpp(), bucket.get(), result);
if (ret < 0) {
cerr << "ERROR: could not get topics: " << cpp_strerror(-ret)
continue;
}
std::set<std::string> subscribed_buckets;
- if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) {
+ if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2) &&
+ driver->stat_topics_v1(tenant, null_yield, dpp()) == -ENOENT) {
ret = driver->get_bucket_topic_mapping(topic, subscribed_buckets,
null_yield, dpp());
if (ret < 0) {
cerr << "ERROR: could not get topic: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
- if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) {
+ if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2) &&
+ driver->stat_topics_v1(tenant, null_yield, dpp()) == -ENOENT) {
show_topics_info_v2(topic, subscribed_buckets, formatter.get());
} else {
encode_json("topic", topic, formatter.get());
return -ret;
}
rgw_pubsub_bucket_topics bucket_topics;
- if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) {
+ if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2) &&
+ driver->stat_topics_v1(tenant, null_yield, dpp()) == -ENOENT) {
ret = get_bucket_notifications(dpp(), bucket.get(), bucket_topics);
if (ret < 0) {
cerr << "ERROR: could not get bucket notifications: "
}
if (rgw::all_zonegroups_support(*site, rgw::zone_features::notification_v2)) {
+ if (ret = driver->stat_topics_v1(tenant, null_yield, dpp()); ret != -ENOENT) {
+ cerr << "WARNING: " << (ret == 0 ? "topic migration in process" : "cannot determine topic migration status. ret = " + std::to_string(ret))
+ << ". please try again later" << std::endl;
+ return -ret;
+ }
ret = remove_notification_v2(dpp(), driver, bucket.get(), notification_id,
null_yield);
} else {
{ ERR_INTERNAL_ERROR, {500, "InternalError" }},
{ ERR_NOT_IMPLEMENTED, {501, "NotImplemented" }},
{ ERR_SERVICE_UNAVAILABLE, {503, "ServiceUnavailable"}},
+ { EBUSY, {503, "ServiceUnavailable"}},
{ ERR_RATE_LIMITED, {503, "SlowDown"}},
{ ERR_ZERO_IN_URL, {400, "InvalidRequest" }},
{ ERR_NO_SUCH_TAG_SET, {404, "NoSuchTagSet"}},
rgw_pubsub_topics& result, std::string& next_marker,
optional_yield y) const
{
- if (!use_notification_v2) {
+ if (!use_notification_v2 || driver->stat_topics_v1(tenant, y, dpp) != -ENOENT) {
+ // in case of v1 or during migration we use v1 topics
// v1 returns all topics, ignoring marker/max_items
return read_topics_v1(dpp, result, nullptr, y);
}
-
+
// TODO: prefix filter on 'tenant:'
void* handle = NULL;
int ret = driver->meta_list_keys_init(dpp, "topic", start_marker, &handle);
RGWObjVersionTracker *objv_tracker,
optional_yield y) const
{
+ if (ps.use_notification_v2) {
+ if (const auto ret = ps.driver->stat_topics_v1(bucket->get_tenant(), y, dpp); ret != -ENOENT) {
+ ldpp_dout(dpp, 1) << "WARNING: " << (ret == 0 ? "topic migration in process" : "cannot determine topic migration status. ret = " + std::to_string(ret))
+ << ". please try again later" << dendl;
+ return -ERR_SERVICE_UNAVAILABLE;
+ }
+ }
const int ret = bucket->write_topics(topics, objv_tracker, y, dpp);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
rgw_pubsub_topic& result,
optional_yield y,
std::set<std::string>* subscribed_buckets) const {
- if (use_notification_v2) {
+ if (use_notification_v2 && driver->stat_topics_v1(tenant, y, dpp) == -ENOENT) {
+ // in case of v1 or during migration we use v1 topics
int ret = driver->read_topic_v2(name, tenant, result, nullptr, y, dpp);
if (ret < 0) {
ldpp_dout(dpp, 1) << "failed to read topic info for name: " << name
const std::string& policy_text,
optional_yield y) const {
if (use_notification_v2) {
+ if (const auto ret = driver->stat_topics_v1(tenant, y, dpp); ret != -ENOENT) {
+ ldpp_dout(dpp, 1) << "WARNING: " << (ret == 0 ? "topic migration in process" : "cannot determine topic migration status. ret = " + std::to_string(ret))
+ << ". please try again later" << dendl;
+ return -ERR_SERVICE_UNAVAILABLE;
+ }
rgw_pubsub_topic new_topic;
new_topic.user = user;
new_topic.name = name;
ldpp_dout(dpp, 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
return ret;
}
+ ldpp_dout(dpp, 1) << "INFO: successfully created v1 topic" << dendl;
return 0;
}
int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const
{
if (use_notification_v2) {
+ if (const auto ret = driver->stat_topics_v1(tenant, y, dpp); ret != -ENOENT) {
+ ldpp_dout(dpp, 1) << "WARNING: " << (ret == 0 ? "topic migration in process" : "cannot determine topic migration status. ret = " + std::to_string(ret))
+ << ". please try again later" << dendl;
+ return -ERR_SERVICE_UNAVAILABLE;
+ }
return remove_topic_v2(dpp, name, y);
}
RGWObjVersionTracker objv_tracker;
}
}
+ if (const auto ret = driver->stat_topics_v1(s->bucket_tenant, y, this); ret != -ENOENT) {
+ ldpp_dout(this, 1) << "WARNING: " << (ret == 0 ? "topic migration in process" : "cannot determine topic migration status. ret = " + std::to_string(ret))
+ << ". please try again later" << dendl;
+ op_ret = -ERR_SERVICE_UNAVAILABLE;
+ return;
+ }
+
std::unique_ptr<rgw::sal::Bucket> bucket;
op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name),
&bucket, y);
}
}
+ if (const auto ret = driver->stat_topics_v1(s->bucket_tenant, y, this); ret != -ENOENT) {
+ ldpp_dout(this, 1) << "WARNING: " << (ret == 0 ? "topic migration in process" : "cannot determine topic migration status. ret = " + std::to_string(ret))
+ << ". please try again later" << dendl;
+ op_ret = -ERR_SERVICE_UNAVAILABLE;
+ return;
+ }
+
std::unique_ptr<rgw::sal::Bucket> bucket;
op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name),
&bucket, y);
// get all topics on a bucket
rgw_pubsub_bucket_topics bucket_topics;
- if (rgw::all_zonegroups_support(*s->penv.site, rgw::zone_features::notification_v2)) {
+ if (rgw::all_zonegroups_support(*s->penv.site, rgw::zone_features::notification_v2) &&
+ driver->stat_topics_v1(s->bucket_tenant, y, this) == -ENOENT) {
op_ret = get_bucket_notifications(this, bucket.get(), bucket_topics);
} else {
const RGWPubSub ps(driver, s->owner.id.tenant);
/** Read the topic config entry into @a data and (optionally) @a objv_tracker */
virtual int read_topics(const std::string& tenant, rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker,
optional_yield y, const DoutPrefixProvider *dpp) = 0;
+ /** check if the v1 topics object exists */
+ virtual int stat_topics_v1(const std::string& tenant, optional_yield y, const DoutPrefixProvider *dpp) = 0;
/** Write @a info and (optionally) @a objv_tracker into the config */
virtual int write_topics(const std::string& tenant, const rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker,
optional_yield y, const DoutPrefixProvider *dpp) = 0;
optional_yield y, const DoutPrefixProvider *dpp) override {
return next->read_topics(tenant, topics, objv_tracker, y, dpp);
}
+ int stat_topics_v1(const std::string& tenant, optional_yield y, const DoutPrefixProvider *dpp) override {
+ return next->stat_topics_v1(tenant, y, dpp);
+ }
int write_topics(const std::string& tenant, const rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker,
optional_yield y, const DoutPrefixProvider *dpp) override {
return next->write_topics(tenant, topics, objv_tracker, y, dpp);
int read_topics(const std::string& tenant, rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker,
optional_yield y, const DoutPrefixProvider *dpp) override {return -EOPNOTSUPP;}
+ int stat_topics_v1(const std::string& tenant, optional_yield y, const DoutPrefixProvider *dpp) override {return -EOPNOTSUPP;}
int write_topics(const std::string& tenant, const rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker,
optional_yield y, const DoutPrefixProvider *dpp) override {return -ENOENT;}
int remove_topics(const std::string& tenant, RGWObjVersionTracker* objv_tracker,