OPT_PUBSUB_TOPICS_LIST,
OPT_PUBSUB_TOPIC_CREATE,
OPT_PUBSUB_TOPIC_RM,
+ OPT_PUBSUB_NOTIFICATION_CREATE,
+ OPT_PUBSUB_NOTIFICATION_RM,
OPT_PUBSUB_SUB_GET,
OPT_PUBSUB_SUB_CREATE,
OPT_PUBSUB_SUB_RM,
strcmp(cmd, "mdlog") == 0 ||
strcmp(cmd, "metadata") == 0 ||
strcmp(cmd, "mfa") == 0 ||
+ strcmp(cmd, "notification") == 0 ||
strcmp(cmd, "object") == 0 ||
strcmp(cmd, "objects") == 0 ||
strcmp(cmd, "olh") == 0 ||
return OPT_PUBSUB_TOPIC_CREATE;
if (strcmp(cmd, "rm") == 0)
return OPT_PUBSUB_TOPIC_RM;
+ } else if (strcmp(prev_cmd, "notification") == 0) {
+ if (strcmp(cmd, "create") == 0)
+ return OPT_PUBSUB_NOTIFICATION_CREATE;
+ if (strcmp(cmd, "rm") == 0)
+ return OPT_PUBSUB_NOTIFICATION_RM;
} else if (strcmp(prev_cmd, "sub") == 0) {
if (strcmp(cmd, "get") == 0)
return OPT_PUBSUB_SUB_GET;
rgw_bucket bucket;
- rgw_pubsub_user_topics result;
-
if (!bucket_name.empty()) {
+ rgw_pubsub_bucket_topics result;
RGWBucketInfo bucket_info;
int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket);
if (ret < 0) {
return -ret;
}
- ret = ups.get_bucket_topics(bucket_info.bucket, &result);
+ auto b = ups.get_bucket(bucket_info.bucket);
+ ret = b->get_topics(&result);
if (ret < 0) {
cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
+ encode_json("result", result, formatter);
} else {
- int ret = ups.get_topics(&result);
+ rgw_pubsub_user_topics result;
+ int ret = ups.get_user_topics(&result);
if (ret < 0) {
cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
+ encode_json("result", result, formatter);
}
- encode_json("result", result, formatter);
formatter->flush(cout);
}
if (opt_cmd == OPT_PUBSUB_TOPIC_CREATE) {
+ if (get_tier_type(store) != "pubsub") {
+ cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
+ return EINVAL;
+ }
+ if (topic_name.empty()) {
+ cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
+ return EINVAL;
+ }
+ if (user_id.empty()) {
+ cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
+ return EINVAL;
+ }
+ RGWUserInfo& user_info = user_op.get_user_info();
+ RGWUserPubSub ups(store, user_info.user_id);
+
+ ret = ups.create_topic(topic_name);
+ if (ret < 0) {
+ cerr << "ERROR: could not create topic: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ }
+
+ if (opt_cmd == OPT_PUBSUB_NOTIFICATION_CREATE) {
if (get_tier_type(store) != "pubsub") {
cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
return EINVAL;
return -ret;
}
- ret = ups.create_topic(topic_name, bucket_info.bucket);
+ auto b = ups.get_bucket(bucket_info.bucket);
+ ret = b->create_notification(topic_name);
if (ret < 0) {
- cerr << "ERROR: could not create topic: " << cpp_strerror(-ret) << std::endl;
+ cerr << "ERROR: could not publish bucket: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
}
- if (opt_cmd == OPT_PUBSUB_TOPIC_RM) {
+ if (opt_cmd == OPT_PUBSUB_NOTIFICATION_RM) {
if (get_tier_type(store) != "pubsub") {
cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
return EINVAL;
return -ret;
}
+ auto b = ups.get_bucket(bucket_info.bucket);
+ ret = b->remove_notification(topic_name);
+ if (ret < 0) {
+ cerr << "ERROR: could not publish bucket: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ }
+
+ if (opt_cmd == OPT_PUBSUB_TOPIC_RM) {
+ if (get_tier_type(store) != "pubsub") {
+ cerr << "ERROR: only pubsub tier type supports this command" << std::endl;
+ return EINVAL;
+ }
+ if (topic_name.empty()) {
+ cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
+ return EINVAL;
+ }
+ if (user_id.empty()) {
+ cerr << "ERROR: user id was not provided (via --uid)" << std::endl;
+ return EINVAL;
+ }
+ RGWUserInfo& user_info = user_op.get_user_info();
+ RGWUserPubSub ups(store, user_info.user_id);
+
ret = ups.remove_topic(topic_name);
if (ret < 0) {
cerr << "ERROR: could not remove topic: " << cpp_strerror(-ret) << std::endl;
RGWUserInfo& user_info = user_op.get_user_info();
RGWUserPubSub ups(store, user_info.user_id);
- rgw_pubsub_user_sub_config sub;
+ rgw_pubsub_sub_config sub_conf;
- ret = ups.get_sub(sub_name, &sub);
+ auto sub = ups.get_sub(sub_name);
+ ret = sub->get_conf(&sub_conf);
if (ret < 0) {
cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
- encode_json("sub", sub, formatter);
+ encode_json("sub", sub_conf, formatter);
formatter->flush(cout);
}
RGWUserInfo& user_info = user_op.get_user_info();
RGWUserPubSub ups(store, user_info.user_id);
- rgw_pubsub_user_topic_info topic;
+ rgw_pubsub_topic_subs topic;
int ret = ups.get_topic(topic_name, &topic);
if (ret < 0) {
cerr << "ERROR: topic not found" << std::endl;
return EINVAL;
}
- rgw_pubsub_user_sub_config sub;
auto& tier_config = get_tier_config(store);
- rgw_pubsub_user_sub_dest dest_config;
+ rgw_pubsub_sub_dest dest_config;
dest_config.bucket_name = sub_dest_bucket;
dest_config.oid_prefix = sub_oid_prefix;
dest_config.push_endpoint = sub_push_endpoint;
if (dest_config.bucket_name.empty()) {
- dest_config.bucket_name = string(tier_config["data_bucket_prefix"]) + topic.user.to_str() + "-" + topic.topic.name;
+ dest_config.bucket_name = string(tier_config["data_bucket_prefix"]) + user_info.user_id.to_str() + "-" + topic.topic.name;
}
if (dest_config.oid_prefix.empty()) {
dest_config.oid_prefix = tier_config["data_oid_prefix"];
}
- ret = ups.add_sub(sub_name, topic_name, dest_config);
+ auto sub = ups.get_sub(sub_name);
+ ret = sub->subscribe(topic_name, dest_config);
if (ret < 0) {
cerr << "ERROR: could not store subscription info: " << cpp_strerror(-ret) << std::endl;
return -ret;
RGWUserInfo& user_info = user_op.get_user_info();
RGWUserPubSub ups(store, user_info.user_id);
- rgw_pubsub_user_sub_config sub;
-
- ret = ups.get_sub(sub_name, &sub);
+ auto sub = ups.get_sub(sub_name);
+ ret = sub->unsubscribe(topic_name);
if (ret < 0) {
cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
- encode_json("sub", sub, formatter);
- formatter->flush(cout);
}
if (opt_cmd == OPT_PUBSUB_SUB_PULL) {
RGWUserInfo& user_info = user_op.get_user_info();
RGWUserPubSub ups(store, user_info.user_id);
- RGWUserPubSub::list_events_result result;
+ RGWUserPubSub::Sub::list_events_result result;
if (!max_entries_specified) {
max_entries = 100;
}
- ret = ups.list_events(sub_name, marker, max_entries, &result);
+ auto sub = ups.get_sub(sub_name);
+ ret = sub->list_events(marker, max_entries, &result);
if (ret < 0) {
cerr << "ERROR: could not list events: " << cpp_strerror(-ret) << std::endl;
return -ret;
RGWUserInfo& user_info = user_op.get_user_info();
RGWUserPubSub ups(store, user_info.user_id);
- ret = ups.remove_event(sub_name, event_id);
+ auto sub = ups.get_sub(sub_name);
+ ret = sub->remove_event(event_id);
if (ret < 0) {
cerr << "ERROR: could not remove event: " << cpp_strerror(-ret) << std::endl;
return -ret;
RGW_OP_PUBSUB_SUB_DELETE,
RGW_OP_PUBSUB_SUB_PULL,
RGW_OP_PUBSUB_SUB_ACK,
+ RGW_OP_PUBSUB_NOTIF_CREATE,
+ RGW_OP_PUBSUB_NOTIF_DELETE,
+ RGW_OP_PUBSUB_NOTIF_LIST,
};
class RGWAccessControlPolicy;
}
}
-void rgw_pubsub_user_topic::dump(Formatter *f) const
+void rgw_pubsub_topic::dump(Formatter *f) const
{
+ encode_json("user", user, f);
encode_json("name", name, f);
- encode_json("bucket", bucket, f);
}
-void rgw_pubsub_user_topic_info::dump(Formatter *f) const
+void rgw_pubsub_topic_subs::dump(Formatter *f) const
{
- encode_json("user", user, f);
encode_json("topic", topic, f);
encode_json("subs", subs, f);
}
+void rgw_pubsub_bucket_topics::dump(Formatter *f) const
+{
+ Formatter::ObjectSection s(*f, "topics");
+ for (auto& t : topics) {
+ encode_json(t.first.c_str(), t.second, f);
+ }
+}
+
void rgw_pubsub_user_topics::dump(Formatter *f) const
{
Formatter::ObjectSection s(*f, "topics");
}
}
-void rgw_pubsub_user_sub_dest::dump(Formatter *f) const
+void rgw_pubsub_sub_dest::dump(Formatter *f) const
{
encode_json("bucket_name", bucket_name, f);
encode_json("oid_prefix", oid_prefix, f);
encode_json("push_endpoint", push_endpoint, f);
}
-void rgw_pubsub_user_sub_config::dump(Formatter *f) const
+void rgw_pubsub_sub_config::dump(Formatter *f) const
{
encode_json("user", user, f);
encode_json("name", name, f);
return 0;
}
-int RGWUserPubSub::get_topics(rgw_pubsub_user_topics *result)
+int RGWUserPubSub::read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker)
{
- rgw_raw_obj obj;
- get_user_meta_obj(&obj);
+ int ret = read(user_meta_obj, result, objv_tracker);
+ if (ret < 0 && ret != -ENOENT) {
+ ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+ return ret;
+ }
+ return 0;
+}
- RGWObjVersionTracker objv_tracker;
- int ret = read(obj, result, &objv_tracker);
+int RGWUserPubSub::write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker)
+{
+ int ret = write(user_meta_obj, topics, objv_tracker);
if (ret < 0 && ret != -ENOENT) {
ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWUserPubSub::get_bucket_topics(const rgw_bucket& bucket, rgw_pubsub_user_topics *result)
+int RGWUserPubSub::get_user_topics(rgw_pubsub_user_topics *result)
{
- rgw_raw_obj obj;
- get_bucket_meta_obj(bucket, &obj);
+ return read_user_topics(result, nullptr);
+}
- RGWObjVersionTracker objv_tracker;
- int ret = read(obj, result, &objv_tracker);
+int RGWUserPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker)
+{
+ int ret = ps->read(bucket_meta_obj, result, objv_tracker);
if (ret < 0 && ret != -ENOENT) {
- ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+ ldout(ps->store->ctx(), 0) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
+ return ret;
+ }
+ return 0;
+}
+
+int RGWUserPubSub::Bucket::write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker)
+{
+ int ret = ps->write(bucket_meta_obj, topics, objv_tracker);
+ if (ret < 0) {
+ ldout(ps->store->ctx(), 0) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
return ret;
}
+
return 0;
}
-int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_user_topic_info *result)
+int RGWUserPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result)
{
- rgw_raw_obj obj;
- get_user_meta_obj(&obj);
+ return read_topics(result, nullptr);
+}
- RGWObjVersionTracker objv_tracker;
+int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result)
+{
rgw_pubsub_user_topics topics;
-
- int ret = read(obj, &topics, &objv_tracker);
+ int ret = get_user_topics(&topics);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
return ret;
}
-int RGWUserPubSub::create_topic(const string& name, const rgw_bucket& bucket)
+int RGWUserPubSub::Bucket::create_notification(const string& topic_name)
{
- rgw_raw_obj obj;
- get_user_meta_obj(&obj);
+ rgw_pubsub_topic_subs user_topic_info;
+ RGWRados *store = ps->store;
+
+ int ret = ps->get_topic(topic_name, &user_topic_info);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to read topic info: ret=" << ret << dendl;
+ return ret;
+ }
RGWObjVersionTracker objv_tracker;
- rgw_pubsub_user_topics topics;
+ rgw_pubsub_bucket_topics bucket_topics;
- int ret = read(obj, &topics, &objv_tracker);
+ ret = read_topics(&bucket_topics, &objv_tracker);
if (ret < 0 && ret != -ENOENT) {
- ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+ ldout(store->ctx(), 0) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
return ret;
}
- rgw_pubsub_user_topic_info& new_topic = topics.topics[name];
- new_topic.user = user;
- new_topic.topic.name = name;
- new_topic.topic.bucket = bucket;
+ bucket_topics.topics[topic_name] = user_topic_info.topic;
- ret = write(obj, topics, &objv_tracker);
+ ret = write_topics(bucket_topics, &objv_tracker);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
return ret;
}
- ret = update_bucket(topics, bucket);
+ return 0;
+}
+
+int RGWUserPubSub::Bucket::remove_notification(const string& topic_name)
+{
+ rgw_pubsub_topic_subs user_topic_info;
+ RGWRados *store = ps->store;
+
+ int ret = ps->get_topic(topic_name, &user_topic_info);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to read topic info: ret=" << ret << dendl;
+ return ret;
+ }
+
+ RGWObjVersionTracker objv_tracker;
+ rgw_pubsub_bucket_topics bucket_topics;
+
+ ret = read_topics(&bucket_topics, &objv_tracker);
+ if (ret < 0 && ret != -ENOENT) {
+ ldout(store->ctx(), 0) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
+ return ret;
+ }
+
+ bucket_topics.topics.erase(topic_name);
+
+ ret = write_topics(bucket_topics, &objv_tracker);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
+ ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
return ret;
}
return 0;
}
-int RGWUserPubSub::remove_topic(const string& name)
+int RGWUserPubSub::create_topic(const string& name)
{
- rgw_raw_obj obj;
- get_user_meta_obj(&obj);
-
RGWObjVersionTracker objv_tracker;
rgw_pubsub_user_topics topics;
- int ret = read(obj, &topics, &objv_tracker);
+ int ret = read_user_topics(&topics, &objv_tracker);
if (ret < 0 && ret != -ENOENT) {
ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
return ret;
}
- rgw_bucket bucket;
- auto t = topics.topics.find(name);
- if (t != topics.topics.end()) {
- bucket = t->second.topic.bucket;
- }
-
- topics.topics.erase(name);
+ rgw_pubsub_topic_subs& new_topic = topics.topics[name];
+ new_topic.topic.user = user;
+ new_topic.topic.name = name;
- ret = write(obj, topics, &objv_tracker);
+ ret = write_user_topics(topics, &objv_tracker);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
return ret;
}
- if (bucket.name.empty()) {
- return 0;
+ return 0;
+}
+
+int RGWUserPubSub::remove_topic(const string& name)
+{
+ RGWObjVersionTracker objv_tracker;
+ rgw_pubsub_user_topics topics;
+
+ int ret = read_user_topics(&topics, &objv_tracker);
+ if (ret < 0 && ret != -ENOENT) {
+ ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+ return ret;
}
- ret = update_bucket(topics, bucket);
+ topics.topics.erase(name);
+
+ ret = write_user_topics(topics, &objv_tracker);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
+ ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
return ret;
}
return 0;
}
-int RGWUserPubSub::update_bucket(const rgw_pubsub_user_topics& topics, const rgw_bucket& bucket)
+int RGWUserPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker)
{
- rgw_pubsub_user_topics bucket_topics;
- for (auto& t : topics.topics) {
- if (t.second.topic.bucket == bucket) {
- bucket_topics.topics.insert(t);
- }
+ int ret = ps->read(sub_meta_obj, result, objv_tracker);
+ if (ret < 0 && ret != -ENOENT) {
+ ldout(ps->store->ctx(), 0) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
+ return ret;
}
+ return 0;
+}
- rgw_raw_obj bobj;
- get_bucket_meta_obj(bucket, &bobj);
- int ret = write(bobj, bucket_topics, nullptr);
+int RGWUserPubSub::Sub::write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker)
+{
+ int ret = ps->write(sub_meta_obj, sub_conf, objv_tracker);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
+ ldout(ps->store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
return ret;
}
+
return 0;
}
-int RGWUserPubSub::get_sub(const string& name, rgw_pubsub_user_sub_config *result)
+int RGWUserPubSub::Sub::remove_sub(RGWObjVersionTracker *objv_tracker)
{
- rgw_raw_obj obj;
- get_sub_meta_obj(name, &obj);
- int ret = read(obj, result, nullptr);
+ int ret = ps->remove(sub_meta_obj, objv_tracker);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
+ ldout(ps->store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
return ret;
}
+
return 0;
}
-int RGWUserPubSub::add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest)
+int RGWUserPubSub::Sub::get_conf(rgw_pubsub_sub_config *result)
{
- rgw_raw_obj obj;
- get_user_meta_obj(&obj);
+ return read_sub(result, nullptr);
+}
- RGWObjVersionTracker objv_tracker;
+int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest& dest)
+{
+ RGWObjVersionTracker user_objv_tracker;
rgw_pubsub_user_topics topics;
+ RGWRados *store = ps->store;
- int ret = read(obj, &topics, &objv_tracker);
+ int ret = ps->read_user_topics(&topics, &user_objv_tracker);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
return ret;
auto& t = iter->second;
- rgw_pubsub_user_sub_config sub_conf;
+ rgw_pubsub_sub_config sub_conf;
- sub_conf.user = user;
- sub_conf.name = name;
+ sub_conf.user = ps->user;
+ sub_conf.name = sub;
sub_conf.topic = topic;
sub_conf.dest = dest;
- t.subs.insert(name);
+ t.subs.insert(sub);
- ret = write(obj, topics, &objv_tracker);
+ ret = ps->write_user_topics(topics, &user_objv_tracker);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
return ret;
}
- ret = update_bucket(topics, t.topic.bucket);
- if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl;
- return ret;
- }
-
- rgw_raw_obj sobj;
- get_sub_meta_obj(name, &sobj);
- ret = write(sobj, sub_conf, nullptr);
+ ret = write_sub(sub_conf, nullptr);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWUserPubSub::remove_sub(const string& name, const string& _topic)
+int RGWUserPubSub::Sub::unsubscribe(const string& _topic)
{
string topic = _topic;
-
RGWObjVersionTracker sobjv_tracker;
- rgw_raw_obj sobj;
- get_sub_meta_obj(name, &sobj);
+ RGWRados *store = ps->store;
if (topic.empty()) {
- rgw_pubsub_user_sub_config sub_conf;
- int ret = read(sobj, &sub_conf, &sobjv_tracker);
+ rgw_pubsub_sub_config sub_conf;
+ int ret = read_sub(&sub_conf, &sobjv_tracker);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to read subscription info: ret=" << ret << dendl;
return ret;
topic = sub_conf.topic;
}
- rgw_raw_obj obj;
- get_user_meta_obj(&obj);
-
RGWObjVersionTracker objv_tracker;
rgw_pubsub_user_topics topics;
- int ret = read(obj, &topics, &objv_tracker);
+ int ret = ps->read_user_topics(&topics, &objv_tracker);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl;
}
if (ret >= 0) {
auto iter = topics.topics.find(topic);
- if (iter == topics.topics.end()) {
- ldout(store->ctx(), 20) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
- } else {
+ if (iter != topics.topics.end()) {
auto& t = iter->second;
- t.subs.erase(name);
+ t.subs.erase(sub);
- ret = write(obj, topics, &objv_tracker);
+ ret = ps->write_user_topics(topics, &objv_tracker);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl;
return ret;
}
}
- ret = remove(sobj, &sobjv_tracker);
+ ret = remove_sub(&sobjv_tracker);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl;
+ ldout(store->ctx(), 0) << "ERROR: failed to delete subscription info: ret=" << ret << dendl;
return ret;
}
return 0;
}
-void RGWUserPubSub::list_events_result::dump(Formatter *f) const
+void RGWUserPubSub::Sub::list_events_result::dump(Formatter *f) const
{
encode_json("next_marker", next_marker, f);
encode_json("is_truncated", is_truncated, f);
}
}
-int RGWUserPubSub::list_events(const string& sub_name,
- const string& marker, int max_events,
+int RGWUserPubSub::Sub::list_events(const string& marker, int max_events,
list_events_result *result)
{
- rgw_pubsub_user_sub_config sub_conf;
- int ret = get_sub(sub_name, &sub_conf);
+ RGWRados *store = ps->store;
+ rgw_pubsub_sub_config sub_conf;
+ int ret = get_conf(&sub_conf);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to read sub config: ret=" << ret << dendl;
return ret;
return 0;
}
-int RGWUserPubSub::remove_event(const string& sub_name, const string& event_id)
+int RGWUserPubSub::Sub::remove_event(const string& event_id)
{
- rgw_pubsub_user_sub_config sub_conf;
- int ret = get_sub(sub_name, &sub_conf);
+ RGWRados *store = ps->store;
+ rgw_pubsub_sub_config sub_conf;
+ int ret = get_conf(&sub_conf);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to read sub config: ret=" << ret << dendl;
return ret;
};
WRITE_CLASS_ENCODER(rgw_pubsub_event)
-struct rgw_pubsub_user_sub_dest {
+struct rgw_pubsub_sub_dest {
string bucket_name;
string oid_prefix;
string push_endpoint;
void dump(Formatter *f) const;
};
-WRITE_CLASS_ENCODER(rgw_pubsub_user_sub_dest)
+WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest)
-struct rgw_pubsub_user_sub_config {
+struct rgw_pubsub_sub_config {
rgw_user user;
string name;
string topic;
- rgw_pubsub_user_sub_dest dest;
+ rgw_pubsub_sub_dest dest;
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
void dump(Formatter *f) const;
};
-WRITE_CLASS_ENCODER(rgw_pubsub_user_sub_config)
+WRITE_CLASS_ENCODER(rgw_pubsub_sub_config)
-struct rgw_pubsub_user_topic {
+struct rgw_pubsub_topic {
+ rgw_user user;
string name;
- rgw_bucket bucket;
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
+ encode(user, bl);
encode(name, bl);
- encode(bucket, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
DECODE_START(1, bl);
+ decode(user, bl);
decode(name, bl);
- decode(bucket, bl);
DECODE_FINISH(bl);
}
- const string& to_str() const {
- return name;
+ string to_str() const {
+ return user.to_str() + "/" + name;
}
+
void dump(Formatter *f) const;
+
+ bool operator<(const rgw_pubsub_topic& t) const {
+ return to_str().compare(t.to_str());
+ }
};
-WRITE_CLASS_ENCODER(rgw_pubsub_user_topic)
+WRITE_CLASS_ENCODER(rgw_pubsub_topic)
-struct rgw_pubsub_user_topic_info {
- rgw_user user;
- rgw_pubsub_user_topic topic;
+struct rgw_pubsub_topic_subs {
+ rgw_pubsub_topic topic;
set<string> subs;
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
- encode(user, bl);
encode(topic, bl);
encode(subs, bl);
ENCODE_FINISH(bl);
void decode(bufferlist::const_iterator& bl) {
DECODE_START(1, bl);
- decode(user, bl);
decode(topic, bl);
decode(subs, bl);
DECODE_FINISH(bl);
}
- string to_str() const {
- return user.to_str() + "/" + topic.name;
- }
-
void dump(Formatter *f) const;
+};
+WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs)
- bool operator<(const rgw_pubsub_user_topic& t) const {
- return to_str().compare(t.to_str());
+struct rgw_pubsub_bucket_topics {
+ map<string, rgw_pubsub_topic> topics;
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(topics, bl);
+ ENCODE_FINISH(bl);
}
+
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(topics, bl);
+ DECODE_FINISH(bl);
+ }
+
+ void dump(Formatter *f) const;
};
-WRITE_CLASS_ENCODER(rgw_pubsub_user_topic_info)
+WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics)
struct rgw_pubsub_user_topics {
- map<string, rgw_pubsub_user_topic_info> topics;
+ map<string, rgw_pubsub_topic_subs> topics;
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
class RGWUserPubSub
{
+ friend class Bucket;
+
RGWRados *store;
rgw_user user;
RGWObjectCtx obj_ctx;
+ rgw_raw_obj user_meta_obj;
+
+ string user_meta_oid() const {
+ return pubsub_user_oid_prefix + user.to_str();
+ }
+
+ string bucket_meta_oid(const rgw_bucket& bucket) const {
+ return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id;
+ }
+
+ string sub_meta_oid(const string& name) const {
+ return pubsub_user_oid_prefix + user.to_str() + ".sub." + name;
+ }
+
template <class T>
int read(const rgw_raw_obj& obj, T *data, RGWObjVersionTracker *objv_tracker);
int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker);
- int update_bucket(const rgw_pubsub_user_topics& topics, const rgw_bucket& bucket);
+ int read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker);
+ int write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker);
public:
RGWUserPubSub(RGWRados *_store, const rgw_user& _user) : store(_store),
user(_user),
- obj_ctx(store) {}
-
- string user_meta_oid() const {
- return pubsub_user_oid_prefix + user.to_str();
+ obj_ctx(store) {
+ get_user_meta_obj(&user_meta_obj);
}
- string bucket_meta_oid(const rgw_bucket& bucket) const {
- return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id;
+ class Bucket {
+ friend class RGWUserPubSub;
+ RGWUserPubSub *ps;
+ rgw_bucket bucket;
+ rgw_raw_obj bucket_meta_obj;
+
+ int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker);
+ int write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker);
+ public:
+ Bucket(RGWUserPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) {
+ ps->get_bucket_meta_obj(bucket, &bucket_meta_obj);
+ }
+
+ int get_topics(rgw_pubsub_bucket_topics *result);
+ int create_notification(const string& topic_name);
+ int remove_notification(const string& topic_name);
+ };
+
+ class Sub {
+ friend class RGWUserPubSub;
+ RGWUserPubSub *ps;
+ string sub;
+ rgw_raw_obj sub_meta_obj;
+
+ int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker);
+ int write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker);
+ int remove_sub(RGWObjVersionTracker *objv_tracker);
+ public:
+ Sub(RGWUserPubSub *_ps, const string& _sub) : ps(_ps), sub(_sub) {
+ ps->get_sub_meta_obj(sub, &sub_meta_obj);
+ }
+
+ int subscribe(const string& topic_name, const rgw_pubsub_sub_dest& dest);
+ int unsubscribe(const string& topic_name);
+ int get_conf(rgw_pubsub_sub_config *result);
+
+ struct list_events_result {
+ string next_marker;
+ bool is_truncated{false};
+ std::vector<rgw_pubsub_event> events;
+
+ void dump(Formatter *f) const;
+ };
+
+ int list_events(const string& marker, int max_events, list_events_result *result);
+ int remove_event(const string& event_id);
+ };
+
+ using BucketRef = std::shared_ptr<Bucket>;
+ using SubRef = std::shared_ptr<Sub>;
+
+ BucketRef get_bucket(const rgw_bucket& bucket) {
+ return std::make_shared<Bucket>(this, bucket);
}
- string sub_meta_oid(const string& name) const {
- return pubsub_user_oid_prefix + user.to_str() + ".sub." + name;
+ SubRef get_sub(const string& sub) {
+ return std::make_shared<Sub>(this, sub);
}
void get_user_meta_obj(rgw_raw_obj *obj) const {
*obj = rgw_raw_obj(store->get_zone_params().log_pool, sub_meta_oid(name));
}
- int get_topics(rgw_pubsub_user_topics *result);
- int get_bucket_topics(const rgw_bucket& bucket, rgw_pubsub_user_topics *result);
- int get_topic(const string& name, rgw_pubsub_user_topic_info *result);
- int create_topic(const string& name, const rgw_bucket& bucket);
+ int get_user_topics(rgw_pubsub_user_topics *result);
+ int get_topic(const string& name, rgw_pubsub_topic_subs *result);
+ int create_topic(const string& name);
int remove_topic(const string& name);
- int get_sub(const string& name, rgw_pubsub_user_sub_config *result);
- int add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest);
- int remove_sub(const string& name, const string& topic);
-
- struct list_events_result {
- string next_marker;
- bool is_truncated{false};
- std::vector<rgw_pubsub_event> events;
-
- void dump(Formatter *f) const;
- };
-
- int list_events(const string& sub_name, const string& marker, int max_events, list_events_result *result);
- int remove_event(const string& sub_name, const string& event_id);
};
template <class T>
string data_bucket_name;
string data_oid_prefix;
- void from_user_conf(const rgw_pubsub_user_sub_config& uc) {
+ void from_user_conf(const rgw_pubsub_sub_config& uc) {
name = uc.name;
topic = uc.topic;
push_endpoint = uc.dest.push_endpoint;
PSSubscription(RGWDataSyncEnv *_sync_env,
PSEnvRef _env,
- rgw_pubsub_user_sub_config& user_sub_conf) : sync_env(_sync_env),
+ rgw_pubsub_sub_config& user_sub_conf) : sync_env(_sync_env),
env(_env),
sub_conf(std::make_shared<PSSubConfig>()),
data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {
PSConfigRef conf;
PSSubConfigRef sub_conf;
- rgw_pubsub_user_sub_config user_sub_conf;
+ rgw_pubsub_sub_config user_sub_conf;
public:
GetSubCR(RGWDataSyncEnv *_sync_env,
PSManagerRef& _mgr,
*ref = PSSubscription::get_shared(sync_env, mgr->env, sub_conf);
} else {
- using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_user_sub_config>;
+ using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_sub_config>;
yield {
RGWUserPubSub ups(sync_env->store, owner);
rgw_raw_obj obj;
RGWUserPubSub ups;
- rgw_raw_obj obj;
- rgw_pubsub_user_topics bucket_topics;
+ rgw_raw_obj bucket_obj;
+ rgw_raw_obj user_obj;
+ rgw_pubsub_bucket_topics bucket_topics;
+ rgw_pubsub_user_topics user_topics;
TopicsRef *topics;
public:
RGWPSFindBucketTopicsCR(RGWDataSyncEnv *_sync_env,
}
int operate() override {
reenter(this) {
- ups.get_bucket_meta_obj(bucket, &obj);
+ ups.get_bucket_meta_obj(bucket, &bucket_obj);
+ ups.get_user_meta_obj(&user_obj);
-
- using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_user_topics>;
+ using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_bucket_topics>;
yield {
bool empty_on_enoent = true;
call(new ReadInfoCR(sync_env->async_rados, sync_env->store,
- obj,
+ bucket_obj,
&bucket_topics, empty_on_enoent));
}
if (retcode < 0 && retcode != -ENOENT) {
ldout(sync_env->cct, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl;
+ if (!bucket_topics.topics.empty()) {
+ using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_user_topics>;
+ yield {
+ bool empty_on_enoent = true;
+ call(new ReadUserTopicsInfoCR(sync_env->async_rados, sync_env->store,
+ user_obj,
+ &user_topics, empty_on_enoent));
+ }
+ if (retcode < 0 && retcode != -ENOENT) {
+ return set_cr_error(retcode);
+ }
+ }
+
for (auto& titer : bucket_topics.topics) {
auto& info = titer.second;
shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
- tc->name = info.topic.name;
- tc->subs = info.subs;
+ tc->name = info.name;
+ tc->subs = user_topics.topics[info.name].subs;
(*topics)->push_back(tc);
}
std::unique_ptr<RGWUserPubSub> ups;
string topic_name;
string bucket_name;
- RGWBucketInfo bucket_info;
public:
RGWPSCreateTopicOp() {}
int verify_permission() override {
- int ret = get_params();
- if (ret < 0) {
- return ret;
- }
-
- RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
-
- ret = store->get_bucket_info(obj_ctx, s->owner.get_id().tenant, bucket_name,
- bucket_info, nullptr, nullptr);
- if (ret < 0) {
- return ret;
- }
-
- if (bucket_info.owner != s->owner.get_id()) {
- ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
- return -EPERM;
- }
-
return 0;
}
void pre_exec() override {
void RGWPSCreateTopicOp::execute()
{
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+
ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
- op_ret = ups->create_topic(topic_name, bucket_info.bucket);
+ op_ret = ups->create_topic(topic_name);
if (op_ret < 0) {
ldout(s->cct, 20) << "failed to create topic, ret=" << op_ret << dendl;
return;
int get_params() override {
topic_name = s->object.name;
-
- bool exists;
- bucket_name = s->info.args.get("bucket", &exists);
- if (!exists) {
- ldout(s->cct, 20) << "ERROR: missing required param 'bucket' for request" << dendl;
- return -EINVAL;
- }
-
return 0;
}
};
class RGWPSListTopicsOp : public RGWOp {
protected:
- string bucket_name;
- RGWBucketInfo bucket_info;
std::unique_ptr<RGWUserPubSub> ups;
rgw_pubsub_user_topics result;
RGWPSListTopicsOp() {}
int verify_permission() override {
- int ret = get_params();
- if (ret < 0) {
- return ret;
- }
-
- if (bucket_name.empty()) {
- return 0;
- }
-
- RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
-
- ret = store->get_bucket_info(obj_ctx, s->owner.get_id().tenant, bucket_name,
- bucket_info, nullptr, nullptr);
- if (ret < 0) {
- return ret;
- }
-
- if (bucket_info.owner != s->owner.get_id()) {
- ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
- return -EPERM;
- }
-
return 0;
}
void pre_exec() override {
const char* name() const override { return "pubsub_topics_list"; }
virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPICS_LIST; }
virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
- virtual int get_params() = 0;
};
void RGWPSListTopicsOp::execute()
{
ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
- if (bucket_name.empty()) {
- op_ret = ups->get_topics(&result);
- } else {
- op_ret = ups->get_bucket_topics(bucket_info.bucket, &result);
- }
+ op_ret = ups->get_user_topics(&result);
if (op_ret < 0) {
ldout(s->cct, 20) << "failed to get topics, ret=" << op_ret << dendl;
return;
public:
explicit RGWPSListTopics_ObjStore_S3() {}
- int get_params() override {
- bucket_name = s->info.args.get("bucket");
- return 0;
- }
-
void send_response() override {
if (op_ret) {
set_req_state_err(s, op_ret);
protected:
string topic_name;
std::unique_ptr<RGWUserPubSub> ups;
- rgw_pubsub_user_topic_info result;
+ rgw_pubsub_topic_subs result;
public:
RGWPSGetTopicOp() {}
string sub_name;
string topic_name;
std::unique_ptr<RGWUserPubSub> ups;
- rgw_pubsub_user_sub_dest dest;
+ rgw_pubsub_sub_dest dest;
public:
RGWPSCreateSubOp() {}
return;
}
ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
- op_ret = ups->add_sub(sub_name, topic_name, dest);
+ auto sub = ups->get_sub(sub_name);
+ op_ret = sub->subscribe(topic_name, dest);
if (op_ret < 0) {
ldout(s->cct, 20) << "failed to create subscription, ret=" << op_ret << dendl;
return;
protected:
string sub_name;
std::unique_ptr<RGWUserPubSub> ups;
- rgw_pubsub_user_sub_config result;
+ rgw_pubsub_sub_config result;
public:
RGWPSGetSubOp() {}
return;
}
ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
- op_ret = ups->get_sub(sub_name, &result);
+ auto sub = ups->get_sub(sub_name);
+ op_ret = sub->get_conf(&result);
if (op_ret < 0) {
ldout(s->cct, 20) << "failed to get subscription, ret=" << op_ret << dendl;
return;
return;
}
ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
- op_ret = ups->remove_sub(sub_name, topic_name);
+ auto sub = ups->get_sub(sub_name);
+ op_ret = sub->unsubscribe(topic_name);
if (op_ret < 0) {
ldout(s->cct, 20) << "failed to remove subscription, ret=" << op_ret << dendl;
return;
return;
}
ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
- op_ret = ups->remove_event(sub_name, event_id);
+ auto sub = ups->get_sub(sub_name);
+ op_ret = sub->remove_event(event_id);
if (op_ret < 0) {
ldout(s->cct, 20) << "failed to remove event, ret=" << op_ret << dendl;
return;
string sub_name;
string marker;
std::unique_ptr<RGWUserPubSub> ups;
- RGWUserPubSub::list_events_result result;
+ RGWUserPubSub::Sub::list_events_result result;
public:
RGWPSPullSubEventsOp() {}
return;
}
ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
- op_ret = ups->list_events(sub_name, marker, max_entries, &result);
+ auto sub = ups->get_sub(sub_name);
+ op_ret = sub->list_events(marker, max_entries, &result);
if (op_ret < 0) {
ldout(s->cct, 20) << "failed to get subscription, ret=" << op_ret << dendl;
return;
};
+static int notif_bucket_path(const string& path, string *bucket_name)
+{
+ if (path.empty()) {
+ return -EINVAL;
+ }
+ size_t pos = path.find('/');
+ if (pos == string::npos) {
+ return -EINVAL;
+ }
+ if (pos >= path.size()) {
+ return -EINVAL;
+ }
+
+ string type = path.substr(0, pos);
+ if (type != "bucket") {
+ return -EINVAL;
+ }
+
+ *bucket_name = path.substr(pos + 1);
+ return 0;
+}
+
+class RGWPSCreateNotifOp : public RGWOp {
+protected:
+ std::unique_ptr<RGWUserPubSub> ups;
+ string topic_name;
+ string bucket_name;
+ RGWBucketInfo bucket_info;
+
+public:
+ RGWPSCreateNotifOp() {}
+
+ int verify_permission() override {
+ int ret = get_params();
+ if (ret < 0) {
+ return ret;
+ }
+
+ RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
+
+ ret = store->get_bucket_info(obj_ctx, s->owner.get_id().tenant, bucket_name,
+ bucket_info, nullptr, nullptr);
+ if (ret < 0) {
+ return ret;
+ }
+
+ if (bucket_info.owner != s->owner.get_id()) {
+ ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
+ return -EPERM;
+ }
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute() override;
+
+ const char* name() const override { return "pubsub_notification_create"; }
+ virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
+ virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+ virtual int get_params() = 0;
+};
+
+void RGWPSCreateNotifOp::execute()
+{
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+
+ ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
+ auto b = ups->get_bucket(bucket_info.bucket);
+ op_ret = b->create_notification(topic_name);
+ if (op_ret < 0) {
+ ldout(s->cct, 20) << "failed to create notification, ret=" << op_ret << dendl;
+ return;
+ }
+}
+
+class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
+public:
+ explicit RGWPSCreateNotif_ObjStore_S3() {}
+
+ int get_params() override {
+ bool exists;
+ topic_name = s->info.args.get("topic", &exists);
+ if (!exists) {
+ ldout(s->cct, 20) << "param 'topic' not provided" << dendl;
+ return -EINVAL;
+ }
+ return notif_bucket_path(s->object.name, &bucket_name);
+ }
+};
+
+class RGWPSDeleteNotifOp : public RGWOp {
+protected:
+ std::unique_ptr<RGWUserPubSub> ups;
+ string topic_name;
+ string bucket_name;
+ RGWBucketInfo bucket_info;
+
+public:
+ RGWPSDeleteNotifOp() {}
+
+ int verify_permission() override {
+ int ret = get_params();
+ if (ret < 0) {
+ return ret;
+ }
+
+ RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
+
+ ret = store->get_bucket_info(obj_ctx, s->owner.get_id().tenant, bucket_name,
+ bucket_info, nullptr, nullptr);
+ if (ret < 0) {
+ return ret;
+ }
+
+ if (bucket_info.owner != s->owner.get_id()) {
+ ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
+ return -EPERM;
+ }
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute() override;
+
+ const char* name() const override { return "pubsub_notification_delete"; }
+ virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_DELETE; }
+ virtual uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
+ virtual int get_params() = 0;
+};
+
+void RGWPSDeleteNotifOp::execute()
+{
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+
+ ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
+ auto b = ups->get_bucket(bucket_info.bucket);
+ op_ret = b->remove_notification(topic_name);
+ if (op_ret < 0) {
+ ldout(s->cct, 20) << "failed to remove notification, ret=" << op_ret << dendl;
+ return;
+ }
+}
+
+class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
+public:
+ explicit RGWPSDeleteNotif_ObjStore_S3() {}
+
+ int get_params() override {
+ bool exists;
+ topic_name = s->info.args.get("topic", &exists);
+ if (!exists) {
+ ldout(s->cct, 20) << "param 'topic' not provided" << dendl;
+ return -EINVAL;
+ }
+ return notif_bucket_path(s->object.name, &bucket_name);
+ }
+};
+
+class RGWPSListNotifsOp : public RGWOp {
+protected:
+ string bucket_name;
+ RGWBucketInfo bucket_info;
+ std::unique_ptr<RGWUserPubSub> ups;
+ rgw_pubsub_bucket_topics result;
+
+
+public:
+ RGWPSListNotifsOp() {}
+
+ int verify_permission() override {
+ int ret = get_params();
+ if (ret < 0) {
+ return ret;
+ }
+
+ RGWObjectCtx& obj_ctx = *static_cast<RGWObjectCtx *>(s->obj_ctx);
+
+ ret = store->get_bucket_info(obj_ctx, s->owner.get_id().tenant, bucket_name,
+ bucket_info, nullptr, nullptr);
+ if (ret < 0) {
+ return ret;
+ }
+
+ if (bucket_info.owner != s->owner.get_id()) {
+ ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl;
+ return -EPERM;
+ }
+
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute() override;
+
+ const char* name() const override { return "pubsub_notifications_list"; }
+ virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_LIST; }
+ virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+ virtual int get_params() = 0;
+};
+
+void RGWPSListNotifsOp::execute()
+{
+ ups = make_unique<RGWUserPubSub>(store, s->owner.get_id());
+ auto b = ups->get_bucket(bucket_info.bucket);
+ op_ret = b->get_topics(&result);
+ if (op_ret < 0) {
+ ldout(s->cct, 20) << "failed to get topics, ret=" << op_ret << dendl;
+ return;
+ }
+
+}
+
+class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp {
+public:
+ explicit RGWPSListNotifs_ObjStore_S3() {}
+
+ int get_params() override {
+ return notif_bucket_path(s->object.name, &bucket_name);
+ }
+
+ void send_response() override {
+ if (op_ret) {
+ set_req_state_err(s, op_ret);
+ }
+ dump_errno(s);
+ end_header(s, this, "application/json");
+
+ if (op_ret < 0) {
+ return;
+ }
+
+ encode_json("result", result, s->formatter);
+ rgw_flush_formatter_and_reset(s, s->formatter);
+ }
+};
+
+
+class RGWHandler_REST_PSNotifs_S3 : public RGWHandler_REST_S3 {
+protected:
+ int init_permissions(RGWOp* op) override {
+ return 0;
+ }
+
+ int read_permissions(RGWOp* op) override {
+ return 0;
+ }
+ bool supports_quota() override {
+ return false;
+ }
+ RGWOp *op_get() override {
+ if (s->object.empty()) {
+ return nullptr;
+ }
+ return new RGWPSListNotifs_ObjStore_S3();
+ }
+ RGWOp *op_put() override {
+ if (!s->object.empty()) {
+ return new RGWPSCreateNotif_ObjStore_S3();
+ }
+ return nullptr;
+ }
+ RGWOp *op_delete() override {
+ if (!s->object.empty()) {
+ return new RGWPSDeleteNotif_ObjStore_S3();
+ }
+ return nullptr;
+ }
+public:
+ explicit RGWHandler_REST_PSNotifs_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
+ virtual ~RGWHandler_REST_PSNotifs_S3() {}
+};
+
+
RGWHandler_REST* RGWRESTMgr_PubSub_S3::get_handler(struct req_state* const s,
const rgw::auth::StrategyRegistry& auth_registry,
const std::string& frontend_prefix)
handler = new RGWHandler_REST_PSSub_S3(auth_registry);
}
+ if (s->init_state.url_bucket == "notifications") {
+ handler = new RGWHandler_REST_PSNotifs_S3(auth_registry);
+ }
+
ldout(s->cct, 20) << __func__ << " handler=" << (handler ? typeid(*handler).name() : "<null>") << dendl;
return handler;
}