Note, that if the events should be stored in Ceph, in addition, or instead of being pushed to an endpoint,
the `PubSub Module`_ should be used instead of the bucket notification mechanism.
-A user can create different topics. A topic entity is defined by its user and its name. A
-user can only manage its own topics, and can only associate them with buckets it owns.
+A user can create different topics. A topic entity is defined by its name and is per tenant. A
+user can only associate its topics (via notification configuration) with buckets it owns.
In order to send notifications for events for a specific bucket, a notification entity needs to be created. A
notification can be created on a subset of event types, or for all event types (default).
Topic Management via CLI
------------------------
-Configuration of all topics of a user could be fetched using the following command:
+Configuration of all topics, associated with a tenant, could be fetched using the following command:
::
- # radosgw-admin topic list --uid={user-id}
+ # radosgw-admin topic list [--tenant={tenant}]
Configuration of a specific topic could be fetched using:
::
- # radosgw-admin topic get --uid={user-id} --topic={topic-name}
+ # radosgw-admin topic get --topic={topic-name} [--tenant={tenant}]
And removed using:
::
- # radosgw-admin topic rm --uid={user-id} --topic={topic-name}
+ # radosgw-admin topic rm --topic={topic-name} [--tenant={tenant}]
Notification Performance Stats
List Topics
```````````
-List all topics that user defined.
+List all topics associated with a tenant.
::
AMQP0.9.1 and Kafka endpoints. In this case, the events are pushed to an endpoint on top of storing them in Ceph. If events should only be pushed to an endpoint
and do not need to be stored in Ceph, the `Bucket Notification`_ mechanism should be used instead of pubsub sync module.
-A user can create different topics. A topic entity is defined by its user and its name. A
-user can only manage its own topics, and can only subscribe to events published by buckets
-it owns.
+A user can create different topics. A topic entity is defined by its name and is per tenant. A
+user can only associate its topics (via notification configuration) with buckets it owns.
In order to publish events for specific bucket a notification entity needs to be created. A
notification can be created on a subset of event types, or for all event types (default).
together, although it is recommended to use the S3-compatible one.
The S3-compatible API is similar to the one used in the bucket notification mechanism.
-Events are stored as RGW objects in a special bucket, under a special user. Events cannot
+Events are stored as RGW objects in a special bucket, under a special user (pubsub control user). Events cannot
be accessed directly, but need to be pulled and acked using the new REST API.
.. toctree::
Topic and Subscription Management via CLI
-----------------------------------------
-Configuration of all topics of a user could be fetched using the following command:
+Configuration of all topics, associated with a tenant, could be fetched using the following command:
::
- # radosgw-admin topic list --uid={user-id}
+ # radosgw-admin topic list [--tenant={tenant}]
Configuration of a specific topic could be fetched using:
::
- # radosgw-admin topic get --uid={user-id} --topic={topic-name}
+ # radosgw-admin topic get --topic={topic-name} [--tenant={tenant}]
And removed using:
::
- # radosgw-admin topic rm --uid={user-id} --topic={topic-name}
+ # radosgw-admin topic rm --topic={topic-name} [--tenant={tenant}]
Configuration of a subscription could be fetched using:
::
- # radosgw-admin subscription get --uid={user-id} --subscription={topic-name}
+ # radosgw-admin subscription get --subscription={topic-name} [--tenant={tenant}]
And removed using:
::
- # radosgw-admin subscription rm --uid={user-id} --subscription={topic-name}
+ # radosgw-admin subscription rm --subscription={topic-name} [--tenant={tenant}]
To fetch all of the events stored in a subcription, use:
::
- # radosgw-admin subscription pull --uid={user-id} --subscription={topic-name} [--marker={last-marker}]
+ # radosgw-admin subscription pull --subscription={topic-name} [--marker={last-marker}] [--tenant={tenant}]
To ack (and remove) an event from a subscription, use:
::
- # radosgw-admin subscription ack --uid={user-id} --subscription={topic-name} --event-id={event-id}
+ # radosgw-admin subscription ack --subscription={topic-name} --event-id={event-id} [--tenant={tenant}]
PubSub Performance Stats
List Topics
```````````
-List all topics that user defined.
+List all topics associated with a tenant.
::
}
if (opt_cmd == OPT::PUBSUB_TOPICS_LIST) {
- if (user_id.empty()) {
- cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
- return EINVAL;
- }
- RGWUserInfo& user_info = user_op.get_user_info();
- RGWUserPubSub ups(store, user_info.user_id);
+ RGWPubSub ps(store, tenant);
rgw_bucket bucket;
return -ret;
}
- auto b = ups.get_bucket(bucket_info.bucket);
+ auto b = ps.get_bucket(bucket_info.bucket);
ret = b->get_topics(&result);
if (ret < 0) {
cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
}
encode_json("result", result, formatter.get());
} else {
- rgw_pubsub_user_topics result;
- int ret = ups.get_user_topics(&result);
+ rgw_pubsub_topics result;
+ int ret = ps.get_topics(&result);
if (ret < 0) {
cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
return -ret;
cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
return EINVAL;
}
- if (user_id.empty()) {
- cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
- return EINVAL;
- }
- RGWUserInfo& user_info = user_op.get_user_info();
- RGWUserPubSub ups(store, user_info.user_id);
+
+ RGWPubSub ps(store, tenant);
rgw_pubsub_topic_subs topic;
- ret = ups.get_topic(topic_name, &topic);
+ ret = ps.get_topic(topic_name, &topic);
if (ret < 0) {
cerr << "ERROR: could not get topic: " << cpp_strerror(-ret) << std::endl;
return -ret;
cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
return EINVAL;
}
- if (user_id.empty()) {
- cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
- return EINVAL;
- }
- RGWUserInfo& user_info = user_op.get_user_info();
- RGWUserPubSub ups(store, user_info.user_id);
- ret = ups.remove_topic(topic_name, null_yield);
+ RGWPubSub ps(store, tenant);
+
+ ret = ps.remove_topic(topic_name, null_yield);
if (ret < 0) {
cerr << "ERROR: could not remove topic: " << cpp_strerror(-ret) << std::endl;
return -ret;
cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
return EINVAL;
}
- if (user_id.empty()) {
- cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
- return EINVAL;
- }
if (sub_name.empty()) {
cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
return EINVAL;
}
- RGWUserInfo& user_info = user_op.get_user_info();
- RGWUserPubSub ups(store, user_info.user_id);
+
+ RGWPubSub ps(store, tenant);
rgw_pubsub_sub_config sub_conf;
- auto sub = ups.get_sub(sub_name);
+ auto sub = ps.get_sub(sub_name);
ret = sub->get_conf(&sub_conf);
if (ret < 0) {
cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl;
cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
return EINVAL;
}
- if (user_id.empty()) {
- cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
- return EINVAL;
- }
if (sub_name.empty()) {
cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
return EINVAL;
}
- RGWUserInfo& user_info = user_op.get_user_info();
- RGWUserPubSub ups(store, user_info.user_id);
- auto sub = ups.get_sub(sub_name);
+ RGWPubSub ps(store, tenant);
+
+ auto sub = ps.get_sub(sub_name);
ret = sub->unsubscribe(topic_name, null_yield);
if (ret < 0) {
cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl;
cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
return EINVAL;
}
- if (user_id.empty()) {
- cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
- return EINVAL;
- }
if (sub_name.empty()) {
cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
return EINVAL;
}
- RGWUserInfo& user_info = user_op.get_user_info();
- RGWUserPubSub ups(store, user_info.user_id);
+
+ RGWPubSub ps(store, tenant);
if (!max_entries_specified) {
- max_entries = RGWUserPubSub::Sub::DEFAULT_MAX_EVENTS;
+ max_entries = RGWPubSub::Sub::DEFAULT_MAX_EVENTS;
}
- auto sub = ups.get_sub_with_events(sub_name);
+ auto sub = ps.get_sub_with_events(sub_name);
ret = sub->list_events(marker, max_entries);
if (ret < 0) {
cerr << "ERROR: could not list events: " << cpp_strerror(-ret) << std::endl;
cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
return EINVAL;
}
- if (user_id.empty()) {
- cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
- return EINVAL;
- }
if (sub_name.empty()) {
cerr << "ERROR: subscription name was not provided (via --subscription)" << std::endl;
return EINVAL;
cerr << "ERROR: event id was not provided (via --event-id)" << std::endl;
return EINVAL;
}
- RGWUserInfo& user_info = user_op.get_user_info();
- RGWUserPubSub ups(store, user_info.user_id);
- auto sub = ups.get_sub_with_events(sub_name);
+ RGWPubSub ps(store, tenant);
+
+ auto sub = ps.get_sub_with_events(sub_name);
ret = sub->remove_event(event_id);
if (ret < 0) {
cerr << "ERROR: could not remove event: " << cpp_strerror(-ret) << std::endl;
int publish_reserve(EventType event_type,
reservation_t& res)
{
- RGWUserPubSub ps_user(res.store, res.s->user->get_id());
- RGWUserPubSub::Bucket ps_bucket(&ps_user, res.s->bucket->get_key());
+ RGWPubSub ps(res.store, res.s->user->get_id().tenant);
+ RGWPubSub::Bucket ps_bucket(&ps, res.s->bucket->get_key());
rgw_pubsub_bucket_topics bucket_topics;
auto rc = ps_bucket.get_topics(&bucket_topics);
if (rc < 0) {
}
}
-void rgw_pubsub_user_topics::dump(Formatter *f) const
+void rgw_pubsub_topics::dump(Formatter *f) const
{
Formatter::ArraySection s(*f, "topics");
for (auto& t : topics) {
}
}
-void rgw_pubsub_user_topics::dump_xml(Formatter *f) const
+void rgw_pubsub_topics::dump_xml(Formatter *f) const
{
for (auto& t : topics) {
encode_xml("member", t.second.topic, f);
encode_json("s3_id", s3_id, f);
}
-RGWUserPubSub::RGWUserPubSub(rgw::sal::RGWRadosStore* _store, const rgw_user& _user) :
+RGWPubSub::RGWPubSub(rgw::sal::RGWRadosStore* _store, const std::string& _tenant) :
store(_store),
- user(_user),
+ tenant(_tenant),
obj_ctx(store->svc()->sysobj->init_obj_ctx()) {
- get_user_meta_obj(&user_meta_obj);
+ get_meta_obj(&meta_obj);
}
-int RGWUserPubSub::remove(const rgw_raw_obj& obj,
+int RGWPubSub::remove(const rgw_raw_obj& obj,
RGWObjVersionTracker *objv_tracker,
optional_yield y)
{
return 0;
}
-int RGWUserPubSub::read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker *objv_tracker)
{
- int ret = read(user_meta_obj, result, objv_tracker);
+ int ret = read(meta_obj, result, objv_tracker);
if (ret < 0) {
ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWUserPubSub::write_user_topics(const rgw_pubsub_user_topics& topics,
+int RGWPubSub::write_topics(const rgw_pubsub_topics& topics,
RGWObjVersionTracker *objv_tracker, optional_yield y)
{
- int ret = write(user_meta_obj, topics, objv_tracker, y);
+ int ret = write(meta_obj, topics, objv_tracker, y);
if (ret < 0 && ret != -ENOENT) {
ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWUserPubSub::get_user_topics(rgw_pubsub_user_topics *result)
+int RGWPubSub::get_topics(rgw_pubsub_topics *result)
{
- return read_user_topics(result, nullptr);
+ return read_topics(result, nullptr);
}
-int RGWUserPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker)
{
int ret = ps->read(bucket_meta_obj, result, objv_tracker);
if (ret < 0 && ret != -ENOENT) {
return 0;
}
-int RGWUserPubSub::Bucket::write_topics(const rgw_pubsub_bucket_topics& topics,
+int RGWPubSub::Bucket::write_topics(const rgw_pubsub_bucket_topics& topics,
RGWObjVersionTracker *objv_tracker,
optional_yield y)
{
return 0;
}
-int RGWUserPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result)
+int RGWPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result)
{
return read_topics(result, nullptr);
}
-int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result)
+int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result)
{
- rgw_pubsub_user_topics topics;
- int ret = get_user_topics(&topics);
+ rgw_pubsub_topics topics;
+ int ret = get_topics(&topics);
if (ret < 0) {
ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic *result)
+int RGWPubSub::get_topic(const string& name, rgw_pubsub_topic *result)
{
- rgw_pubsub_user_topics topics;
- int ret = get_user_topics(&topics);
+ rgw_pubsub_topics topics;
+ int ret = get_topics(&topics);
if (ret < 0) {
ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, optional_yield y) {
+int RGWPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, optional_yield y) {
return create_notification(topic_name, events, std::nullopt, "", y);
}
-int RGWUserPubSub::Bucket::create_notification(const string& topic_name,const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) {
- rgw_pubsub_topic_subs user_topic_info;
+int RGWPubSub::Bucket::create_notification(const string& topic_name,const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) {
+ rgw_pubsub_topic_subs topic_info;
rgw::sal::RGWRadosStore *store = ps->store;
- int ret = ps->get_topic(topic_name, &user_topic_info);
+ int ret = ps->get_topic(topic_name, &topic_info);
if (ret < 0) {
ldout(store->ctx(), 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl;
return ret;
bucket.name << "'" << dendl;
auto& topic_filter = bucket_topics.topics[topic_name];
- topic_filter.topic = user_topic_info.topic;
+ topic_filter.topic = topic_info.topic;
topic_filter.events = events;
topic_filter.s3_id = notif_name;
if (s3_filter) {
return 0;
}
-int RGWUserPubSub::Bucket::remove_notification(const string& topic_name, optional_yield y)
+int RGWPubSub::Bucket::remove_notification(const string& topic_name, optional_yield y)
{
- rgw_pubsub_topic_subs user_topic_info;
+ rgw_pubsub_topic_subs topic_info;
rgw::sal::RGWRadosStore *store = ps->store;
- int ret = ps->get_topic(topic_name, &user_topic_info);
+ int ret = ps->get_topic(topic_name, &topic_info);
if (ret < 0) {
ldout(store->ctx(), 1) << "ERROR: failed to read topic info: ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWUserPubSub::create_topic(const string& name, optional_yield y) {
+int RGWPubSub::create_topic(const string& name, optional_yield y) {
return create_topic(name, rgw_pubsub_sub_dest(), "", "", y);
}
-int RGWUserPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y) {
+int RGWPubSub::create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn, const std::string& opaque_data, optional_yield y) {
RGWObjVersionTracker objv_tracker;
- rgw_pubsub_user_topics topics;
+ rgw_pubsub_topics topics;
- int ret = read_user_topics(&topics, &objv_tracker);
+ int ret = read_topics(&topics, &objv_tracker);
if (ret < 0 && ret != -ENOENT) {
// its not an error if not topics exist, we create one
ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
}
rgw_pubsub_topic_subs& new_topic = topics.topics[name];
- new_topic.topic.user = user;
+ new_topic.topic.user = rgw_user("", tenant);
new_topic.topic.name = name;
new_topic.topic.dest = dest;
new_topic.topic.arn = arn;
new_topic.topic.opaque_data = opaque_data;
- ret = write_user_topics(topics, &objv_tracker, y);
+ ret = write_topics(topics, &objv_tracker, y);
if (ret < 0) {
ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWUserPubSub::remove_topic(const string& name, optional_yield y)
+int RGWPubSub::remove_topic(const string& name, optional_yield y)
{
RGWObjVersionTracker objv_tracker;
- rgw_pubsub_user_topics topics;
+ rgw_pubsub_topics topics;
- int ret = read_user_topics(&topics, &objv_tracker);
+ int ret = read_topics(&topics, &objv_tracker);
if (ret < 0 && ret != -ENOENT) {
ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
return ret;
topics.topics.erase(name);
- ret = write_user_topics(topics, &objv_tracker, y);
+ ret = write_topics(topics, &objv_tracker, y);
if (ret < 0) {
ldout(store->ctx(), 1) << "ERROR: failed to remove topics info: ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWUserPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker)
{
int ret = ps->read(sub_meta_obj, result, objv_tracker);
if (ret < 0 && ret != -ENOENT) {
return 0;
}
-int RGWUserPubSub::Sub::write_sub(const rgw_pubsub_sub_config& sub_conf,
+int RGWPubSub::Sub::write_sub(const rgw_pubsub_sub_config& sub_conf,
RGWObjVersionTracker *objv_tracker,
optional_yield y)
{
return 0;
}
-int RGWUserPubSub::Sub::remove_sub(RGWObjVersionTracker *objv_tracker,
+int RGWPubSub::Sub::remove_sub(RGWObjVersionTracker *objv_tracker,
optional_yield y)
{
int ret = ps->remove(sub_meta_obj, objv_tracker, y);
return 0;
}
-int RGWUserPubSub::Sub::get_conf(rgw_pubsub_sub_config *result)
+int RGWPubSub::Sub::get_conf(rgw_pubsub_sub_config *result)
{
return read_sub(result, nullptr);
}
-int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest& dest, optional_yield y, const std::string& s3_id)
+int RGWPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest& dest, optional_yield y, const std::string& s3_id)
{
- RGWObjVersionTracker user_objv_tracker;
- rgw_pubsub_user_topics topics;
+ RGWObjVersionTracker objv_tracker;
+ rgw_pubsub_topics topics;
rgw::sal::RGWRadosStore *store = ps->store;
- int ret = ps->read_user_topics(&topics, &user_objv_tracker);
+ int ret = ps->read_topics(&topics, &objv_tracker);
if (ret < 0) {
ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
return ret != -ENOENT ? ret : -EINVAL;
rgw_pubsub_sub_config sub_conf;
- sub_conf.user = ps->user;
+ sub_conf.user = rgw_user("", ps->tenant);
sub_conf.name = sub;
sub_conf.topic = topic;
sub_conf.dest = dest;
t.subs.insert(sub);
- ret = ps->write_user_topics(topics, &user_objv_tracker, y);
+ ret = ps->write_topics(topics, &objv_tracker, y);
if (ret < 0) {
ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWUserPubSub::Sub::unsubscribe(const string& _topic, optional_yield y)
+int RGWPubSub::Sub::unsubscribe(const string& _topic, optional_yield y)
{
string topic = _topic;
RGWObjVersionTracker sobjv_tracker;
}
RGWObjVersionTracker objv_tracker;
- rgw_pubsub_user_topics topics;
+ rgw_pubsub_topics topics;
- int ret = ps->read_user_topics(&topics, &objv_tracker);
+ int ret = ps->read_topics(&topics, &objv_tracker);
if (ret < 0) {
// not an error - could be that topic was already deleted
ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
t.subs.erase(sub);
- ret = ps->write_user_topics(topics, &objv_tracker, y);
+ ret = ps->write_topics(topics, &objv_tracker, y);
if (ret < 0) {
ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
return ret;
}
template<typename EventType>
-void RGWUserPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter *f) const
+void RGWPubSub::SubWithEvents<EventType>::list_events_result::dump(Formatter *f) const
{
encode_json("next_marker", next_marker, f);
encode_json("is_truncated", is_truncated, f);
}
template<typename EventType>
-int RGWUserPubSub::SubWithEvents<EventType>::list_events(const string& marker, int max_events)
+int RGWPubSub::SubWithEvents<EventType>::list_events(const string& marker, int max_events)
{
RGWRados *store = ps->store->getRados();
rgw_pubsub_sub_config sub_conf;
}
template<typename EventType>
-int RGWUserPubSub::SubWithEvents<EventType>::remove_event(const string& event_id)
+int RGWPubSub::SubWithEvents<EventType>::remove_event(const string& event_id)
{
rgw::sal::RGWRadosStore *store = ps->store;
rgw_pubsub_sub_config sub_conf;
return 0;
}
-void RGWUserPubSub::get_user_meta_obj(rgw_raw_obj *obj) const {
- *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, user_meta_oid());
+void RGWPubSub::get_meta_obj(rgw_raw_obj *obj) const {
+ *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, meta_oid());
}
-void RGWUserPubSub::get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
+void RGWPubSub::get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
*obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, bucket_meta_oid(bucket));
}
-void RGWUserPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
+void RGWPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
*obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sub_meta_oid(name));
}
template<typename EventType>
-void RGWUserPubSub::SubWithEvents<EventType>::dump(Formatter* f) const {
+void RGWPubSub::SubWithEvents<EventType>::dump(Formatter* f) const {
list.dump(f);
}
// explicit instantiation for the only two possible types
// no need to move implementation to header
-template class RGWUserPubSub::SubWithEvents<rgw_pubsub_event>;
-template class RGWUserPubSub::SubWithEvents<rgw_pubsub_s3_record>;
+template class RGWPubSub::SubWithEvents<rgw_pubsub_event>;
+template class RGWPubSub::SubWithEvents<rgw_pubsub_s3_record>;
}
string to_str() const {
- return user.to_str() + "/" + name;
+ return user.tenant + "/" + name;
}
void dump(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics)
-struct rgw_pubsub_user_topics {
+struct rgw_pubsub_topics {
std::map<std::string, rgw_pubsub_topic_subs> topics;
void encode(bufferlist& bl) const {
void dump(Formatter *f) const;
void dump_xml(Formatter *f) const;
};
-WRITE_CLASS_ENCODER(rgw_pubsub_user_topics)
+WRITE_CLASS_ENCODER(rgw_pubsub_topics)
-static std::string pubsub_user_oid_prefix = "pubsub.user.";
+static std::string pubsub_oid_prefix = "pubsub.";
-class RGWUserPubSub
+class RGWPubSub
{
friend class Bucket;
rgw::sal::RGWRadosStore *store;
- rgw_user user;
+ const std::string tenant;
RGWSysObjectCtx obj_ctx;
- rgw_raw_obj user_meta_obj;
+ rgw_raw_obj meta_obj;
- std::string user_meta_oid() const {
- return pubsub_user_oid_prefix + user.to_str();
+ std::string meta_oid() const {
+ return pubsub_oid_prefix + tenant;
}
std::string bucket_meta_oid(const rgw_bucket& bucket) const {
- return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id;
+ return pubsub_oid_prefix + tenant + ".bucket." + bucket.name + "/" + bucket.bucket_id;
}
std::string sub_meta_oid(const string& name) const {
- return pubsub_user_oid_prefix + user.to_str() + ".sub." + name;
+ return pubsub_oid_prefix + tenant + ".sub." + name;
}
template <class T>
- int read(const rgw_raw_obj& obj, T *data, RGWObjVersionTracker *objv_tracker);
+ int read(const rgw_raw_obj& obj, T* data, RGWObjVersionTracker* objv_tracker);
template <class T>
int write(const rgw_raw_obj& obj, const T& info,
- RGWObjVersionTracker *obj_tracker, optional_yield y);
+ RGWObjVersionTracker* obj_tracker, optional_yield y);
- int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker,
+ int remove(const rgw_raw_obj& obj, RGWObjVersionTracker* objv_tracker,
optional_yield y);
- int read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker);
- int write_user_topics(const rgw_pubsub_user_topics& topics,
- RGWObjVersionTracker *objv_tracker, optional_yield y);
+ int read_topics(rgw_pubsub_topics *result, RGWObjVersionTracker* objv_tracker);
+ int write_topics(const rgw_pubsub_topics& topics,
+ RGWObjVersionTracker* objv_tracker, optional_yield y);
public:
- RGWUserPubSub(rgw::sal::RGWRadosStore *_store, const rgw_user& _user);
+ RGWPubSub(rgw::sal::RGWRadosStore *_store, const std::string& tenant);
class Bucket {
- friend class RGWUserPubSub;
- RGWUserPubSub *ps;
+ friend class RGWPubSub;
+ RGWPubSub *ps;
rgw_bucket bucket;
rgw_raw_obj bucket_meta_obj;
// read the list of topics associated with a bucket and populate into result
// use version tacker to enforce atomicity between read/write
// return 0 on success or if no topic was associated with the bucket, error code otherwise
- int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker);
+ int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker* objv_tracker);
// set the list of topics associated with a bucket
// use version tacker to enforce atomicity between read/write
// return 0 on success, error code otherwise
int write_topics(const rgw_pubsub_bucket_topics& topics,
- RGWObjVersionTracker *objv_tracker, optional_yield y);
+ RGWObjVersionTracker* objv_tracker, optional_yield y);
public:
- Bucket(RGWUserPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) {
+ Bucket(RGWPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) {
ps->get_bucket_meta_obj(bucket, &bucket_meta_obj);
}
// base class for subscription
class Sub {
- friend class RGWUserPubSub;
+ friend class RGWPubSub;
protected:
- RGWUserPubSub* const ps;
+ RGWPubSub* const ps;
const std::string sub;
rgw_raw_obj sub_meta_obj;
- int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker);
+ int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker* objv_tracker);
int write_sub(const rgw_pubsub_sub_config& sub_conf,
- RGWObjVersionTracker *objv_tracker, optional_yield y);
- int remove_sub(RGWObjVersionTracker *objv_tracker, optional_yield y);
+ RGWObjVersionTracker* objv_tracker, optional_yield y);
+ int remove_sub(RGWObjVersionTracker* objv_tracker, optional_yield y);
public:
- Sub(RGWUserPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) {
+ Sub(RGWPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) {
ps->get_sub_meta_obj(sub, &sub_meta_obj);
}
} list;
public:
- SubWithEvents(RGWUserPubSub *_ps, const string& _sub) : Sub(_ps, _sub) {}
+ SubWithEvents(RGWPubSub *_ps, const string& _sub) : Sub(_ps, _sub) {}
virtual ~SubWithEvents() = default;
return std::make_shared<SubWithEvents<rgw_pubsub_s3_record>>(this, sub);
}
- void get_user_meta_obj(rgw_raw_obj *obj) const;
+ void get_meta_obj(rgw_raw_obj *obj) const;
void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const;
void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const;
- // get all topics defined for the user and populate them into "result"
+ // get all topics (per tenant, if used)) and populate them into "result"
// return 0 on success or if no topics exist, error code otherwise
- int get_user_topics(rgw_pubsub_user_topics *result);
+ int get_topics(rgw_pubsub_topics *result);
// get a topic with its subscriptions by its name and populate it into "result"
// return -ENOENT if the topic does not exists
// return 0 on success, error code otherwise
template <class T>
-int RGWUserPubSub::read(const rgw_raw_obj& obj, T *result, RGWObjVersionTracker *objv_tracker)
+int RGWPubSub::read(const rgw_raw_obj& obj, T* result, RGWObjVersionTracker* objv_tracker)
{
bufferlist bl;
int ret = rgw_get_system_obj(obj_ctx,
}
template <class T>
-int RGWUserPubSub::write(const rgw_raw_obj& obj, const T& info,
- RGWObjVersionTracker *objv_tracker, optional_yield y)
+int RGWPubSub::write(const rgw_raw_obj& obj, const T& info,
+ RGWObjVersionTracker* objv_tracker, optional_yield y)
{
bufferlist bl;
encode(info, bl);
return;
}
- ups.emplace(store, s->owner.get_id());
- auto b = ups->get_bucket(bucket_info.bucket);
+ ps.emplace(store, s->owner.get_id().tenant);
+ auto b = ps->get_bucket(bucket_info.bucket);
ceph_assert(b);
std::string data_bucket_prefix = "";
std::string data_oid_prefix = "";
// get topic information. destination information is stored in the topic
rgw_pubsub_topic topic_info;
- op_ret = ups->get_topic(topic_name, &topic_info);
+ op_ret = ps->get_topic(topic_name, &topic_info);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
// generate the internal topic. destination is stored here for the "push-only" case
// when no subscription exists
// ARN is cached to make the "GET" method faster
- op_ret = ups->create_topic(unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data, y);
+ op_ret = ps->create_topic(unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data, y);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to auto-generate unique topic '" << unique_topic_name <<
"', ret=" << op_ret << dendl;
ldout(s->cct, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name <<
"', ret=" << op_ret << dendl;
// rollback generated topic (ignore return value)
- ups->remove_topic(unique_topic_name, y);
+ ps->remove_topic(unique_topic_name, y);
return;
}
ldout(s->cct, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl;
rgw_pubsub_sub_dest dest = topic_info.dest;
dest.bucket_name = data_bucket_prefix + s->owner.get_id().to_str() + "-" + unique_topic_name;
dest.oid_prefix = data_oid_prefix + notif_name + "/";
- auto sub = ups->get_sub(notif_name);
+ auto sub = ps->get_sub(notif_name);
op_ret = sub->subscribe(unique_topic_name, dest, y, notif_name);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to auto-generate subscription '" << notif_name << "', ret=" << op_ret << dendl;
// rollback generated notification (ignore return value)
b->remove_notification(unique_topic_name, y);
// rollback generated topic (ignore return value)
- ups->remove_topic(unique_topic_name, y);
+ ps->remove_topic(unique_topic_name, y);
return;
}
ldout(s->cct, 20) << "successfully auto-generated subscription '" << notif_name << "'" << dendl;
return 0;
}
- void remove_notification_by_topic(const std::string& topic_name, const RGWUserPubSub::BucketRef& b, optional_yield y) {
+ void remove_notification_by_topic(const std::string& topic_name, const RGWPubSub::BucketRef& b, optional_yield y) {
op_ret = b->remove_notification(topic_name, y);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl;
}
- op_ret = ups->remove_topic(topic_name, y);
+ op_ret = ps->remove_topic(topic_name, y);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to remove auto-generated topic '" << topic_name << "', ret=" << op_ret << dendl;
}
return;
}
- ups.emplace(store, s->owner.get_id());
- auto b = ups->get_bucket(bucket_info.bucket);
+ ps.emplace(store, s->owner.get_id().tenant);
+ auto b = ps->get_bucket(bucket_info.bucket);
ceph_assert(b);
// get all topics on a bucket
if (unique_topic) {
// remove the auto generated subscription according to notification name (if exist)
const auto unique_topic_name = unique_topic->get().topic.name;
- auto sub = ups->get_sub(notif_name);
+ auto sub = ps->get_sub(notif_name);
op_ret = sub->unsubscribe(unique_topic_name, y);
if (op_ret < 0 && op_ret != -ENOENT) {
ldout(s->cct, 1) << "failed to remove auto-generated subscription '" << notif_name << "', ret=" << op_ret << dendl;
for (const auto& topic : bucket_topics.topics) {
// remove the auto generated subscription of the topic (if exist)
rgw_pubsub_topic_subs topic_subs;
- op_ret = ups->get_topic(topic.first, &topic_subs);
+ op_ret = ps->get_topic(topic.first, &topic_subs);
for (const auto& topic_sub_name : topic_subs.subs) {
- auto sub = ups->get_sub(topic_sub_name);
+ auto sub = ps->get_sub(topic_sub_name);
rgw_pubsub_sub_config sub_conf;
op_ret = sub->get_conf(&sub_conf);
if (op_ret < 0) {
};
void RGWPSListNotifs_ObjStore_S3::execute(optional_yield y) {
- ups.emplace(store, s->owner.get_id());
- auto b = ups->get_bucket(bucket_info.bucket);
+ ps.emplace(store, s->owner.get_id().tenant);
+ auto b = ps->get_bucket(bucket_info.bucket);
ceph_assert(b);
// get all topics on a bucket
return topic.topic.dest.stored_secret;
}
-bool topics_has_endpoint_secret(const rgw_pubsub_user_topics& topics) {
+bool topics_has_endpoint_secret(const rgw_pubsub_topics& topics) {
for (const auto& topic : topics.topics) {
if (topic_has_endpoint_secret(topic.second)) return true;
}
return;
}
- ups.emplace(store, s->owner.get_id());
- op_ret = ups->create_topic(topic_name, dest, topic_arn, opaque_data, y);
+ ps.emplace(store, s->owner.get_id().tenant);
+ op_ret = ps->create_topic(topic_name, dest, topic_arn, opaque_data, y);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
}
void RGWPSListTopicsOp::execute(optional_yield y) {
- ups.emplace(store, s->owner.get_id());
- op_ret = ups->get_user_topics(&result);
+ ps.emplace(store, s->owner.get_id().tenant);
+ op_ret = ps->get_topics(&result);
// if there are no topics it is not considered an error
op_ret = op_ret == -ENOENT ? 0 : op_ret;
if (op_ret < 0) {
if (op_ret < 0) {
return;
}
- ups.emplace(store, s->owner.get_id());
- op_ret = ups->get_topic(topic_name, &result);
+ ps.emplace(store, s->owner.get_id().tenant);
+ op_ret = ps->get_topic(topic_name, &result);
if (topic_has_endpoint_secret(result) && !rgw_transport_is_secure(s->cct, *(s->info.env))) {
ldout(s->cct, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
op_ret = -EPERM;
if (op_ret < 0) {
return;
}
- ups.emplace(store, s->owner.get_id());
- op_ret = ups->remove_topic(topic_name, y);
+ ps.emplace(store, s->owner.get_id().tenant);
+ op_ret = ps->remove_topic(topic_name, y);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
return;
if (op_ret < 0) {
return;
}
- ups.emplace(store, s->owner.get_id());
- auto sub = ups->get_sub(sub_name);
+ ps.emplace(store, s->owner.get_id().tenant);
+ auto sub = ps->get_sub(sub_name);
op_ret = sub->subscribe(topic_name, dest, y);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to create subscription '" << sub_name << "', ret=" << op_ret << dendl;
if (op_ret < 0) {
return;
}
- ups.emplace(store, s->owner.get_id());
- auto sub = ups->get_sub(sub_name);
+ ps.emplace(store, s->owner.get_id().tenant);
+ auto sub = ps->get_sub(sub_name);
op_ret = sub->get_conf(&result);
if (subscription_has_endpoint_secret(result) && !rgw_transport_is_secure(s->cct, *(s->info.env))) {
ldout(s->cct, 1) << "subscription '" << sub_name << "' contain secret and cannot be sent over insecure transport" << dendl;
if (op_ret < 0) {
return;
}
- ups.emplace(store, s->owner.get_id());
- auto sub = ups->get_sub(sub_name);
+ ps.emplace(store, s->owner.get_id().tenant);
+ auto sub = ps->get_sub(sub_name);
op_ret = sub->unsubscribe(topic_name, y);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to remove subscription '" << sub_name << "', ret=" << op_ret << dendl;
if (op_ret < 0) {
return;
}
- ups.emplace(store, s->owner.get_id());
- auto sub = ups->get_sub_with_events(sub_name);
+ ps.emplace(store, s->owner.get_id().tenant);
+ auto sub = ps->get_sub_with_events(sub_name);
op_ret = sub->remove_event(event_id);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to ack event on subscription '" << sub_name << "', ret=" << op_ret << dendl;
if (op_ret < 0) {
return;
}
- ups.emplace(store, s->owner.get_id());
- sub = ups->get_sub_with_events(sub_name);
+ ps.emplace(store, s->owner.get_id().tenant);
+ sub = ps->get_sub_with_events(sub_name);
if (!sub) {
op_ret = -ENOENT;
ldout(s->cct, 1) << "failed to get subscription '" << sub_name << "' for events, ret=" << op_ret << dendl;
// create a topic
class RGWPSCreateTopicOp : public RGWDefaultResponseOp {
protected:
- std::optional<RGWUserPubSub> ups;
+ std::optional<RGWPubSub> ps;
std::string topic_name;
rgw_pubsub_sub_dest dest;
std::string topic_arn;
// list all topics
class RGWPSListTopicsOp : public RGWOp {
protected:
- std::optional<RGWUserPubSub> ups;
- rgw_pubsub_user_topics result;
+ std::optional<RGWPubSub> ps;
+ rgw_pubsub_topics result;
public:
int verify_permission(optional_yield) override {
class RGWPSGetTopicOp : public RGWOp {
protected:
std::string topic_name;
- std::optional<RGWUserPubSub> ups;
+ std::optional<RGWPubSub> ps;
rgw_pubsub_topic_subs result;
virtual int get_params() = 0;
class RGWPSDeleteTopicOp : public RGWDefaultResponseOp {
protected:
string topic_name;
- std::optional<RGWUserPubSub> ups;
+ std::optional<RGWPubSub> ps;
virtual int get_params() = 0;
protected:
std::string sub_name;
std::string topic_name;
- std::optional<RGWUserPubSub> ups;
+ std::optional<RGWPubSub> ps;
rgw_pubsub_sub_dest dest;
virtual int get_params() = 0;
class RGWPSGetSubOp : public RGWOp {
protected:
std::string sub_name;
- std::optional<RGWUserPubSub> ups;
+ std::optional<RGWPubSub> ps;
rgw_pubsub_sub_config result;
virtual int get_params() = 0;
protected:
std::string sub_name;
std::string topic_name;
- std::optional<RGWUserPubSub> ups;
+ std::optional<RGWPubSub> ps;
virtual int get_params() = 0;
protected:
std::string sub_name;
std::string event_id;
- std::optional<RGWUserPubSub> ups;
+ std::optional<RGWPubSub> ps;
virtual int get_params() = 0;
int max_entries{0};
std::string sub_name;
std::string marker;
- std::optional<RGWUserPubSub> ups;
- RGWUserPubSub::SubRef sub;
+ std::optional<RGWPubSub> ps;
+ RGWPubSub::SubRef sub;
virtual int get_params() = 0;
// notification creation
class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
protected:
- std::optional<RGWUserPubSub> ups;
+ std::optional<RGWPubSub> ps;
string bucket_name;
RGWBucketInfo bucket_info;
// delete a notification
class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
protected:
- std::optional<RGWUserPubSub> ups;
+ std::optional<RGWPubSub> ps;
std::string bucket_name;
RGWBucketInfo bucket_info;
protected:
std::string bucket_name;
RGWBucketInfo bucket_info;
- std::optional<RGWUserPubSub> ups;
+ std::optional<RGWPubSub> ps;
virtual int get_params() = 0;
} else {
using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_sub_config>;
yield {
- RGWUserPubSub ups(sync_env->store, owner);
+ RGWPubSub ps(sync_env->store, owner.tenant);
rgw_raw_obj obj;
- ups.get_sub_meta_obj(sub_name, &obj);
+ ps.get_sub_meta_obj(sub_name, &obj);
bool empty_on_enoent = false;
call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
obj,
rgw_obj_key key;
rgw::notify::EventType event_type;
- RGWUserPubSub ups;
+ RGWPubSub ps;
rgw_raw_obj bucket_obj;
rgw_raw_obj user_obj;
rgw_pubsub_bucket_topics bucket_topics;
- rgw_pubsub_user_topics user_topics;
+ rgw_pubsub_topics user_topics;
TopicsRef *topics;
public:
RGWPSFindBucketTopicsCR(RGWDataSyncCtx *_sc,
bucket(_bucket),
key(_key),
event_type(_event_type),
- ups(sync_env->store, owner),
+ ps(sync_env->store, owner.tenant),
topics(_topics) {
*topics = std::make_shared<vector<PSTopicConfigRef> >();
}
int operate() override {
reenter(this) {
- ups.get_bucket_meta_obj(bucket, &bucket_obj);
- ups.get_user_meta_obj(&user_obj);
+ ps.get_bucket_meta_obj(bucket, &bucket_obj);
+ ps.get_meta_obj(&user_obj);
using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_bucket_topics>;
yield {
ldout(sync_env->cct, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl;
if (!bucket_topics.topics.empty()) {
- using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_user_topics>;
+ using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_topics>;
yield {
bool empty_on_enoent = true;
call(new ReadUserTopicsInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
sub_name = s->object->get_name();
marker = s->info.args.get("marker");
const int ret = s->info.args.get_int("max-entries", &max_entries,
- RGWUserPubSub::Sub::DEFAULT_MAX_EVENTS);
+ RGWPubSub::Sub::DEFAULT_MAX_EVENTS);
if (ret < 0) {
ldout(s->cct, 1) << "failed to parse 'max-entries' param" << dendl;
return -EINVAL;
void RGWPSCreateNotif_ObjStore::execute(optional_yield y)
{
- ups.emplace(store, s->owner.get_id());
+ ps.emplace(store, s->owner.get_id().tenant);
- auto b = ups->get_bucket(bucket_info.bucket);
+ auto b = ps->get_bucket(bucket_info.bucket);
op_ret = b->create_notification(topic_name, events, y);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to create notification for topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
}
- ups.emplace(store, s->owner.get_id());
- auto b = ups->get_bucket(bucket_info.bucket);
+ ps.emplace(store, s->owner.get_id().tenant);
+ auto b = ps->get_bucket(bucket_info.bucket);
op_ret = b->remove_notification(topic_name, y);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to remove notification from topic '" << topic_name << "', ret=" << op_ret << dendl;
void RGWPSListNotifs_ObjStore::execute(optional_yield y)
{
- ups.emplace(store, s->owner.get_id());
- auto b = ups->get_bucket(bucket_info.bucket);
+ ps.emplace(store, s->owner.get_id().tenant);
+ auto b = ps->get_bucket(bucket_info.bucket);
op_ret = b->get_topics(&result);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl;