notifications needs to pull them (instead of the notifications be pushed
to it), an external message bus (e.g. rabbitmq, Kafka) should be used for
that purpose.
+* RGW: The serialized format of notification and topics has changed, so that
+ new/updated topics will be unreadable by old RGWs. We recommend completing
+ the RGW upgrades before creating or modifying any notification topics.
* RBD: Trailing newline in passphrase files (`<passphrase-file>` argument in
`rbd encryption format` command and `--encryption-passphrase-file` option
in other commands) is no longer stripped.
reservation_t& res,
const RGWObjTags* req_tags)
{
- RGWPubSub ps(res.store, res.user_tenant);
- RGWPubSub::Bucket ps_bucket(&ps, res.bucket->get_key());
+ const RGWPubSub ps(res.store, res.user_tenant);
+ const RGWPubSub::Bucket ps_bucket(ps, res.bucket->get_key());
rgw_pubsub_bucket_topics bucket_topics;
auto rc = ps_bucket.get_topics(&bucket_topics);
if (rc < 0) {
#define dout_subsys ceph_subsys_rgw
-using namespace std;
void set_event_id(std::string& id, const std::string& hash, const utime_t& ts) {
char buf[64];
const auto len = snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str());
// object metadata/tags may include items not in the filter
for (auto& filter : filter.kv) {
auto result = kv.equal_range(filter.first);
- if (std::any_of(result.first, result.second, [&filter](const pair<string,string>& p) { return p.second == filter.second;}))
+ if (std::any_of(result.first, result.second, [&filter](const std::pair<std::string, std::string>& p) { return p.second == filter.second;}))
continue;
else
return false;
return true;
}
-void do_decode_xml_obj(rgw::notify::EventTypeList& l, const string& name, XMLObj *obj) {
+void do_decode_xml_obj(rgw::notify::EventTypeList& l, const std::string& name, XMLObj *obj) {
l.clear();
XMLObjIter iter = obj->find(name);
int RGWPubSub::remove(const DoutPrefixProvider *dpp,
const rgw_raw_obj& obj,
RGWObjVersionTracker *objv_tracker,
- optional_yield y)
+ optional_yield y) const
{
int ret = rgw_delete_system_obj(dpp, store->svc()->sysobj, obj.pool, obj.oid, objv_tracker, y);
if (ret < 0) {
return 0;
}
-int RGWPubSub::read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker *objv_tracker) const
{
int ret = read(meta_obj, result, objv_tracker);
if (ret < 0) {
}
int RGWPubSub::write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics,
- RGWObjVersionTracker *objv_tracker, optional_yield y)
+ RGWObjVersionTracker *objv_tracker, optional_yield y) const
{
int ret = write(dpp, meta_obj, topics, objv_tracker, y);
if (ret < 0 && ret != -ENOENT) {
return 0;
}
-int RGWPubSub::get_topics(rgw_pubsub_topics *result)
+int RGWPubSub::get_topics(rgw_pubsub_topics *result) const
{
return read_topics(result, nullptr);
}
-int RGWPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker) const
{
- int ret = ps->read(bucket_meta_obj, result, objv_tracker);
+ int ret = ps.read(bucket_meta_obj, result, objv_tracker);
if (ret < 0 && ret != -ENOENT) {
- ldout(ps->store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
+ ldout(ps.store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
return ret;
}
return 0;
int RGWPubSub::Bucket::write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& topics,
RGWObjVersionTracker *objv_tracker,
- optional_yield y)
+ optional_yield y) const
{
- int ret = ps->write(dpp, bucket_meta_obj, topics, objv_tracker, y);
+ int ret = ps.write(dpp, bucket_meta_obj, topics, objv_tracker, y);
if (ret < 0) {
- ldout(ps->store->ctx(), 1) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
+ ldout(ps.store->ctx(), 1) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
return ret;
}
return 0;
}
-int RGWPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result)
+int RGWPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result) const
{
return read_topics(result, nullptr);
}
-int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic *result)
+int RGWPubSub::get_topic(const std::string& name, rgw_pubsub_topic *result) const
{
rgw_pubsub_topics topics;
int ret = get_topics(&topics);
return 0;
}
-int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const string& topic_name, const rgw::notify::EventTypeList& events, optional_yield y) {
+int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
+ const rgw::notify::EventTypeList& events, optional_yield y) const {
return create_notification(dpp, topic_name, events, std::nullopt, "", y);
}
-int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const string& topic_name,const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) {
+int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
+ const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) const {
rgw_pubsub_topic topic_info;
- int ret = ps->get_topic(topic_name, &topic_info);
+ int ret = ps.get_topic(topic_name, &topic_info);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWPubSub::Bucket::remove_notification(const DoutPrefixProvider *dpp, const string& topic_name, optional_yield y)
+int RGWPubSub::Bucket::remove_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y) const
{
rgw_pubsub_topic topic_info;
- int ret = ps->get_topic(topic_name, &topic_info);
+ int ret = ps.get_topic(topic_name, &topic_info);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to read topic info: ret=" << ret << dendl;
return ret;
if (bucket_topics.topics.empty()) {
// no more topics - delete the notification object of the bucket
- ret = ps->remove(dpp, bucket_meta_obj, &objv_tracker, y);
+ ret = ps.remove(dpp, bucket_meta_obj, &objv_tracker, y);
if (ret < 0 && ret != -ENOENT) {
ldpp_dout(dpp, 1) << "ERROR: failed to remove bucket topics: ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWPubSub::Bucket::remove_notifications(const DoutPrefixProvider *dpp, optional_yield y)
+int RGWPubSub::Bucket::remove_notifications(const DoutPrefixProvider *dpp, optional_yield y) const
{
// get all topics on a bucket
rgw_pubsub_bucket_topics bucket_topics;
// remove all auto-genrated topics
for (const auto& topic : bucket_topics.topics) {
const auto& topic_name = topic.first;
- ret = ps->remove_topic(dpp, topic_name, y);
+ ret = ps.remove_topic(dpp, topic_name, y);
if (ret < 0 && ret != -ENOENT) {
ldpp_dout(dpp, 5) << "WARNING: failed to remove auto-generated topic '" << topic_name << "', ret=" << ret << dendl;
}
}
// delete the notification object of the bucket
- ret = ps->remove(dpp, bucket_meta_obj, nullptr, y);
+ ret = ps.remove(dpp, bucket_meta_obj, nullptr, y);
if (ret < 0 && ret != -ENOENT) {
ldpp_dout(dpp, 1) << "ERROR: failed to remove bucket topics: ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const string& name, optional_yield y) {
+int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const {
return create_topic(dpp, name, rgw_pubsub_dest{}, "", "", y);
}
-int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const string& name, const rgw_pubsub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y) {
+int RGWPubSub::create_topic(const DoutPrefixProvider *dpp, const std::string& name, const rgw_pubsub_dest& dest,
+ const std::string& arn, const std::string& opaque_data, optional_yield y) const {
RGWObjVersionTracker objv_tracker;
rgw_pubsub_topics topics;
return 0;
}
-int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const string& name, optional_yield y)
+int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const
{
RGWObjVersionTracker objv_tracker;
rgw_pubsub_topics topics;
{
friend class Bucket;
- rgw::sal::RadosStore* store;
+ rgw::sal::RadosStore* const store;
const std::string tenant;
- RGWSI_SysObj* svc_sysobj;
+ RGWSI_SysObj* const svc_sysobj;
rgw_raw_obj meta_obj;
}
template <class T>
- int read(const rgw_raw_obj& obj, T* data, RGWObjVersionTracker* objv_tracker);
+ int read(const rgw_raw_obj& obj, T* data, RGWObjVersionTracker* objv_tracker) const;
template <class T>
int write(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, const T& info,
- RGWObjVersionTracker* obj_tracker, optional_yield y);
+ RGWObjVersionTracker* obj_tracker, optional_yield y) const;
int remove(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, RGWObjVersionTracker* objv_tracker,
- optional_yield y);
+ optional_yield y) const;
- int read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker* objv_tracker);
+ int read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker* objv_tracker) const;
int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_topics& topics,
- RGWObjVersionTracker* objv_tracker, optional_yield y);
+ RGWObjVersionTracker* objv_tracker, optional_yield y) const;
public:
RGWPubSub(rgw::sal::RadosStore* _store, const std::string& tenant);
class Bucket {
friend class RGWPubSub;
- RGWPubSub *ps;
- rgw_bucket bucket;
+ const RGWPubSub& ps;
+ const rgw_bucket& bucket;
rgw_raw_obj bucket_meta_obj;
// read the list of topics associated with a bucket and populate into result
// use version tacker to enforce atomicity between read/write
// return 0 on success or if no topic was associated with the bucket, error code otherwise
- int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker* objv_tracker);
+ int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker* objv_tracker) const;
// set the list of topics associated with a bucket
// use version tacker to enforce atomicity between read/write
// return 0 on success, error code otherwise
int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& topics,
- RGWObjVersionTracker* objv_tracker, optional_yield y);
+ RGWObjVersionTracker* objv_tracker, optional_yield y) const;
public:
- Bucket(RGWPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) {
- ps->get_bucket_meta_obj(bucket, &bucket_meta_obj);
+ Bucket(const RGWPubSub& _ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) {
+ ps.get_bucket_meta_obj(bucket, &bucket_meta_obj);
}
// read the list of topics associated with a bucket and populate into result
// return 0 on success or if no topic was associated with the bucket, error code otherwise
- int get_topics(rgw_pubsub_bucket_topics *result);
+ int get_topics(rgw_pubsub_bucket_topics *result) const;
// adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket
// assigning a notification name is optional (needed for S3 compatible notifications)
// if the topic already exist on the bucket, the filter event list may be updated
// for S3 compliant notifications the version with: s3_filter and notif_name should be used
// return -ENOENT if the topic does not exists
// return 0 on success, error code otherwise
- int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, const rgw::notify::EventTypeList& events, optional_yield y);
- int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y);
+ int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
+ const rgw::notify::EventTypeList& events, optional_yield y) const;
+ int create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
+ const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) const;
// remove a topic and filter from bucket
// if the topic does not exists on the bucket it is a no-op (considered success)
// return -ENOENT if the topic does not exists
// return 0 on success, error code otherwise
- int remove_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y);
+ int remove_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y) const;
// remove all notifications (and autogenerated topics) associated with the bucket
// return 0 on success or if no topic was associated with the bucket, error code otherwise
- int remove_notifications(const DoutPrefixProvider *dpp, optional_yield y);
+ int remove_notifications(const DoutPrefixProvider *dpp, optional_yield y) const;
};
void get_meta_obj(rgw_raw_obj *obj) const;
// get all topics (per tenant, if used)) and populate them into "result"
// return 0 on success or if no topics exist, error code otherwise
- int get_topics(rgw_pubsub_topics *result);
+ int get_topics(rgw_pubsub_topics *result) const;
// get a topic with by its name and populate it into "result"
// return -ENOENT if the topic does not exists
// return 0 on success, error code otherwise
- int get_topic(const std::string& name, rgw_pubsub_topic *result);
+ int get_topic(const std::string& name, rgw_pubsub_topic *result) const;
// create a topic with a name only
// if the topic already exists it is a no-op (considered success)
// return 0 on success, error code otherwise
- int create_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y);
+ int create_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const;
// create a topic with push destination information and ARN
// if the topic already exists the destination and ARN values may be updated (considered succsess)
// return 0 on success, error code otherwise
- int create_topic(const DoutPrefixProvider *dpp, const std::string& name, const rgw_pubsub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y);
+ int create_topic(const DoutPrefixProvider *dpp, const std::string& name, const rgw_pubsub_dest& dest,
+ const std::string& arn, const std::string& opaque_data, optional_yield y) const;
// remove a topic according to its name
// if the topic does not exists it is a no-op (considered success)
// return 0 on success, error code otherwise
- int remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y);
+ int remove_topic(const DoutPrefixProvider *dpp, const std::string& name, optional_yield y) const;
};
-
template <class T>
-int RGWPubSub::read(const rgw_raw_obj& obj, T* result, RGWObjVersionTracker* objv_tracker)
+int RGWPubSub::read(const rgw_raw_obj& obj, T* result, RGWObjVersionTracker* objv_tracker) const
{
bufferlist bl;
int ret = rgw_get_system_obj(svc_sysobj,
template <class T>
int RGWPubSub::write(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj, const T& info,
- RGWObjVersionTracker* objv_tracker, optional_yield y)
+ RGWObjVersionTracker* objv_tracker, optional_yield y) const
{
bufferlist bl;
encode(info, bl);
return rgw_put_system_obj(dpp, svc_sysobj, obj.pool, obj.oid,
bl, false, objv_tracker, real_time(), y);
}
+
}
}
-int remove_notification_by_topic(const DoutPrefixProvider *dpp, const std::string& topic_name, RGWPubSub::Bucket& b, optional_yield y, RGWPubSub& ps) {
+int remove_notification_by_topic(const DoutPrefixProvider *dpp, const std::string& topic_name, const RGWPubSub::Bucket& b, optional_yield y, const RGWPubSub& ps) {
int op_ret = b.remove_notification(dpp, topic_name, y);
if (op_ret < 0) {
ldpp_dout(dpp, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl;
return op_ret;
}
-int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& bucket_topics, RGWPubSub::Bucket& b, optional_yield y, RGWPubSub& ps) {
+int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& bucket_topics, const RGWPubSub::Bucket& b, optional_yield y, const RGWPubSub& ps) {
// delete all notifications of on a bucket
for (const auto& topic : bucket_topics.topics) {
const auto op_ret = remove_notification_by_topic(dpp, topic.first, b, y, ps);
return;
}
- RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(driver), s->owner.get_id().tenant);
- RGWPubSub::Bucket b(&ps, bucket_info.bucket);
+ const RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(driver), s->owner.get_id().tenant);
+ const RGWPubSub::Bucket b(ps, bucket_info.bucket);
if(configurations.list.empty()) {
// get all topics on a bucket
return;
}
- RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(driver), s->owner.get_id().tenant);
- RGWPubSub::Bucket b(&ps, bucket_info.bucket);
+ const RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(driver), s->owner.get_id().tenant);
+ const RGWPubSub::Bucket b(ps, bucket_info.bucket);
// get all topics on a bucket
rgw_pubsub_bucket_topics bucket_topics;
};
void RGWPSListNotifsOp::execute(optional_yield y) {
- RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(driver), s->owner.get_id().tenant);
- RGWPubSub::Bucket b(&ps, bucket_info.bucket);
+ const RGWPubSub ps(static_cast<rgw::sal::RadosStore*>(driver), s->owner.get_id().tenant);
+ const RGWPubSub::Bucket b(ps, bucket_info.bucket);
// get all topics on a bucket
rgw_pubsub_bucket_topics bucket_topics;
// if bucket has notification definitions associated with it
// they should be removed (note that any pending notifications on the bucket are still going to be sent)
- RGWPubSub ps(store, info.owner.tenant);
- RGWPubSub::Bucket ps_bucket(&ps, info.bucket);
+ const RGWPubSub ps(store, info.owner.tenant);
+ const RGWPubSub::Bucket ps_bucket(ps, info.bucket);
const auto ps_ret = ps_bucket.remove_notifications(dpp, y);
if (ps_ret < 0 && ps_ret != -ENOENT) {
ldpp_dout(dpp, -1) << "ERROR: unable to remove notifications from bucket. ret=" << ps_ret << dendl;
return -ret;
}
- RGWPubSub::Bucket b(&ps, bucket->get_key());
+ const RGWPubSub::Bucket b(ps, bucket->get_key());
ret = b.get_topics(&result);
if (ret < 0 && ret != -ENOENT) {
cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;