From: Yehuda Sadeh Date: Fri, 6 Jul 2018 19:53:36 +0000 (-0700) Subject: rgw: cleanups and data structure changes, api changes X-Git-Tag: v14.1.0~616^2~40 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=f94f597931c4cf65afe45686e84befaf2acdaad1;p=ceph-ci.git rgw: cleanups and data structure changes, api changes split user topics and bucket topics, can have multiple buckets per topic and multiple topics per bucket. Buckets don't hold subscription info. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 52ac87c61aa..fde88f0dd39 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -535,6 +535,8 @@ enum { OPT_PUBSUB_TOPICS_LIST, OPT_PUBSUB_TOPIC_CREATE, OPT_PUBSUB_TOPIC_RM, + OPT_PUBSUB_NOTIFICATION_CREATE, + OPT_PUBSUB_NOTIFICATION_RM, OPT_PUBSUB_SUB_GET, OPT_PUBSUB_SUB_CREATE, OPT_PUBSUB_SUB_RM, @@ -564,6 +566,7 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_ strcmp(cmd, "mdlog") == 0 || strcmp(cmd, "metadata") == 0 || strcmp(cmd, "mfa") == 0 || + strcmp(cmd, "notification") == 0 || strcmp(cmd, "object") == 0 || strcmp(cmd, "objects") == 0 || strcmp(cmd, "olh") == 0 || @@ -1015,6 +1018,11 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_ return OPT_PUBSUB_TOPIC_CREATE; if (strcmp(cmd, "rm") == 0) return OPT_PUBSUB_TOPIC_RM; + } else if (strcmp(prev_cmd, "notification") == 0) { + if (strcmp(cmd, "create") == 0) + return OPT_PUBSUB_NOTIFICATION_CREATE; + if (strcmp(cmd, "rm") == 0) + return OPT_PUBSUB_NOTIFICATION_RM; } else if (strcmp(prev_cmd, "sub") == 0) { if (strcmp(cmd, "get") == 0) return OPT_PUBSUB_SUB_GET; @@ -7779,9 +7787,8 @@ next: rgw_bucket bucket; - rgw_pubsub_user_topics result; - if (!bucket_name.empty()) { + rgw_pubsub_bucket_topics result; RGWBucketInfo bucket_info; int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket); if (ret < 0) { @@ -7789,23 +7796,49 @@ next: return -ret; } - ret = ups.get_bucket_topics(bucket_info.bucket, &result); + auto b = ups.get_bucket(bucket_info.bucket); + ret = b->get_topics(&result); if (ret < 0) { cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl; return -ret; } + encode_json("result", result, formatter); } else { - int ret = ups.get_topics(&result); + rgw_pubsub_user_topics result; + int ret = ups.get_user_topics(&result); if (ret < 0) { cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl; return -ret; } + encode_json("result", result, formatter); } - encode_json("result", result, formatter); formatter->flush(cout); } if (opt_cmd == OPT_PUBSUB_TOPIC_CREATE) { + if (get_tier_type(store) != "pubsub") { + cerr << "ERROR: only pubsub tier type supports this command" << std::endl; + return EINVAL; + } + if (topic_name.empty()) { + cerr << "ERROR: topic name was not provided (via --topic)" << std::endl; + return EINVAL; + } + if (user_id.empty()) { + cerr << "ERROR: user id was not provided (via --uid)" << std::endl; + return EINVAL; + } + RGWUserInfo& user_info = user_op.get_user_info(); + RGWUserPubSub ups(store, user_info.user_id); + + ret = ups.create_topic(topic_name); + if (ret < 0) { + cerr << "ERROR: could not create topic: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + } + + if (opt_cmd == OPT_PUBSUB_NOTIFICATION_CREATE) { if (get_tier_type(store) != "pubsub") { cerr << "ERROR: only pubsub tier type supports this command" << std::endl; return EINVAL; @@ -7834,14 +7867,15 @@ next: return -ret; } - ret = ups.create_topic(topic_name, bucket_info.bucket); + auto b = ups.get_bucket(bucket_info.bucket); + ret = b->create_notification(topic_name); if (ret < 0) { - cerr << "ERROR: could not create topic: " << cpp_strerror(-ret) << std::endl; + cerr << "ERROR: could not publish bucket: " << cpp_strerror(-ret) << std::endl; return -ret; } } - if (opt_cmd == OPT_PUBSUB_TOPIC_RM) { + if (opt_cmd == OPT_PUBSUB_NOTIFICATION_RM) { if (get_tier_type(store) != "pubsub") { cerr << "ERROR: only pubsub tier type supports this command" << std::endl; return EINVAL; @@ -7870,6 +7904,30 @@ next: return -ret; } + auto b = ups.get_bucket(bucket_info.bucket); + ret = b->remove_notification(topic_name); + if (ret < 0) { + cerr << "ERROR: could not publish bucket: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + } + + if (opt_cmd == OPT_PUBSUB_TOPIC_RM) { + if (get_tier_type(store) != "pubsub") { + cerr << "ERROR: only pubsub tier type supports this command" << std::endl; + return EINVAL; + } + if (topic_name.empty()) { + cerr << "ERROR: topic name was not provided (via --topic)" << std::endl; + return EINVAL; + } + if (user_id.empty()) { + cerr << "ERROR: user id was not provided (via --uid)" << std::endl; + return EINVAL; + } + RGWUserInfo& user_info = user_op.get_user_info(); + RGWUserPubSub ups(store, user_info.user_id); + ret = ups.remove_topic(topic_name); if (ret < 0) { cerr << "ERROR: could not remove topic: " << cpp_strerror(-ret) << std::endl; @@ -7893,14 +7951,15 @@ next: RGWUserInfo& user_info = user_op.get_user_info(); RGWUserPubSub ups(store, user_info.user_id); - rgw_pubsub_user_sub_config sub; + rgw_pubsub_sub_config sub_conf; - ret = ups.get_sub(sub_name, &sub); + auto sub = ups.get_sub(sub_name); + ret = sub->get_conf(&sub_conf); if (ret < 0) { cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl; return -ret; } - encode_json("sub", sub, formatter); + encode_json("sub", sub_conf, formatter); formatter->flush(cout); } @@ -7924,28 +7983,28 @@ next: RGWUserInfo& user_info = user_op.get_user_info(); RGWUserPubSub ups(store, user_info.user_id); - rgw_pubsub_user_topic_info topic; + rgw_pubsub_topic_subs topic; int ret = ups.get_topic(topic_name, &topic); if (ret < 0) { cerr << "ERROR: topic not found" << std::endl; return EINVAL; } - rgw_pubsub_user_sub_config sub; auto& tier_config = get_tier_config(store); - rgw_pubsub_user_sub_dest dest_config; + rgw_pubsub_sub_dest dest_config; dest_config.bucket_name = sub_dest_bucket; dest_config.oid_prefix = sub_oid_prefix; dest_config.push_endpoint = sub_push_endpoint; if (dest_config.bucket_name.empty()) { - dest_config.bucket_name = string(tier_config["data_bucket_prefix"]) + topic.user.to_str() + "-" + topic.topic.name; + dest_config.bucket_name = string(tier_config["data_bucket_prefix"]) + user_info.user_id.to_str() + "-" + topic.topic.name; } if (dest_config.oid_prefix.empty()) { dest_config.oid_prefix = tier_config["data_oid_prefix"]; } - ret = ups.add_sub(sub_name, topic_name, dest_config); + auto sub = ups.get_sub(sub_name); + ret = sub->subscribe(topic_name, dest_config); if (ret < 0) { cerr << "ERROR: could not store subscription info: " << cpp_strerror(-ret) << std::endl; return -ret; @@ -7968,15 +8027,12 @@ next: RGWUserInfo& user_info = user_op.get_user_info(); RGWUserPubSub ups(store, user_info.user_id); - rgw_pubsub_user_sub_config sub; - - ret = ups.get_sub(sub_name, &sub); + auto sub = ups.get_sub(sub_name); + ret = sub->unsubscribe(topic_name); if (ret < 0) { cerr << "ERROR: could not get subscription info: " << cpp_strerror(-ret) << std::endl; return -ret; } - encode_json("sub", sub, formatter); - formatter->flush(cout); } if (opt_cmd == OPT_PUBSUB_SUB_PULL) { @@ -7995,12 +8051,13 @@ next: RGWUserInfo& user_info = user_op.get_user_info(); RGWUserPubSub ups(store, user_info.user_id); - RGWUserPubSub::list_events_result result; + RGWUserPubSub::Sub::list_events_result result; if (!max_entries_specified) { max_entries = 100; } - ret = ups.list_events(sub_name, marker, max_entries, &result); + auto sub = ups.get_sub(sub_name); + ret = sub->list_events(marker, max_entries, &result); if (ret < 0) { cerr << "ERROR: could not list events: " << cpp_strerror(-ret) << std::endl; return -ret; @@ -8029,7 +8086,8 @@ next: RGWUserInfo& user_info = user_op.get_user_info(); RGWUserPubSub ups(store, user_info.user_id); - ret = ups.remove_event(sub_name, event_id); + auto sub = ups.get_sub(sub_name); + ret = sub->remove_event(event_id); if (ret < 0) { cerr << "ERROR: could not remove event: " << cpp_strerror(-ret) << std::endl; return -ret; diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index aaaaaeda899..c5e0a30b596 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -540,6 +540,9 @@ enum RGWOpType { RGW_OP_PUBSUB_SUB_DELETE, RGW_OP_PUBSUB_SUB_PULL, RGW_OP_PUBSUB_SUB_ACK, + RGW_OP_PUBSUB_NOTIF_CREATE, + RGW_OP_PUBSUB_NOTIF_DELETE, + RGW_OP_PUBSUB_NOTIF_LIST, }; class RGWAccessControlPolicy; diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index 449b33c867e..d53a0547bd5 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -52,19 +52,26 @@ void rgw_pubsub_event::dump(Formatter *f) const } } -void rgw_pubsub_user_topic::dump(Formatter *f) const +void rgw_pubsub_topic::dump(Formatter *f) const { + encode_json("user", user, f); encode_json("name", name, f); - encode_json("bucket", bucket, f); } -void rgw_pubsub_user_topic_info::dump(Formatter *f) const +void rgw_pubsub_topic_subs::dump(Formatter *f) const { - encode_json("user", user, f); encode_json("topic", topic, f); encode_json("subs", subs, f); } +void rgw_pubsub_bucket_topics::dump(Formatter *f) const +{ + Formatter::ObjectSection s(*f, "topics"); + for (auto& t : topics) { + encode_json(t.first.c_str(), t.second, f); + } +} + void rgw_pubsub_user_topics::dump(Formatter *f) const { Formatter::ObjectSection s(*f, "topics"); @@ -73,14 +80,14 @@ void rgw_pubsub_user_topics::dump(Formatter *f) const } } -void rgw_pubsub_user_sub_dest::dump(Formatter *f) const +void rgw_pubsub_sub_dest::dump(Formatter *f) const { encode_json("bucket_name", bucket_name, f); encode_json("oid_prefix", oid_prefix, f); encode_json("push_endpoint", push_endpoint, f); } -void rgw_pubsub_user_sub_config::dump(Formatter *f) const +void rgw_pubsub_sub_config::dump(Formatter *f) const { encode_json("user", user, f); encode_json("name", name, f); @@ -99,13 +106,19 @@ int RGWUserPubSub::remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tra return 0; } -int RGWUserPubSub::get_topics(rgw_pubsub_user_topics *result) +int RGWUserPubSub::read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker) { - rgw_raw_obj obj; - get_user_meta_obj(&obj); + int ret = read(user_meta_obj, result, objv_tracker); + if (ret < 0 && ret != -ENOENT) { + ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl; + return ret; + } + return 0; +} - RGWObjVersionTracker objv_tracker; - int ret = read(obj, result, &objv_tracker); +int RGWUserPubSub::write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker) +{ + int ret = write(user_meta_obj, topics, objv_tracker); if (ret < 0 && ret != -ENOENT) { ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl; return ret; @@ -113,29 +126,41 @@ int RGWUserPubSub::get_topics(rgw_pubsub_user_topics *result) return 0; } -int RGWUserPubSub::get_bucket_topics(const rgw_bucket& bucket, rgw_pubsub_user_topics *result) +int RGWUserPubSub::get_user_topics(rgw_pubsub_user_topics *result) { - rgw_raw_obj obj; - get_bucket_meta_obj(bucket, &obj); + return read_user_topics(result, nullptr); +} - RGWObjVersionTracker objv_tracker; - int ret = read(obj, result, &objv_tracker); +int RGWUserPubSub::Bucket::read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker) +{ + int ret = ps->read(bucket_meta_obj, result, objv_tracker); if (ret < 0 && ret != -ENOENT) { - ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl; + ldout(ps->store->ctx(), 0) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; + return ret; + } + return 0; +} + +int RGWUserPubSub::Bucket::write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker) +{ + int ret = ps->write(bucket_meta_obj, topics, objv_tracker); + if (ret < 0) { + ldout(ps->store->ctx(), 0) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl; return ret; } + return 0; } -int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_user_topic_info *result) +int RGWUserPubSub::Bucket::get_topics(rgw_pubsub_bucket_topics *result) { - rgw_raw_obj obj; - get_user_meta_obj(&obj); + return read_topics(result, nullptr); +} - RGWObjVersionTracker objv_tracker; +int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic_subs *result) +{ rgw_pubsub_user_topics topics; - - int ret = read(obj, &topics, &objv_tracker); + int ret = get_user_topics(&topics); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl; return ret; @@ -152,121 +177,158 @@ int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_user_topic_info *res } -int RGWUserPubSub::create_topic(const string& name, const rgw_bucket& bucket) +int RGWUserPubSub::Bucket::create_notification(const string& topic_name) { - rgw_raw_obj obj; - get_user_meta_obj(&obj); + rgw_pubsub_topic_subs user_topic_info; + RGWRados *store = ps->store; + + int ret = ps->get_topic(topic_name, &user_topic_info); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to read topic info: ret=" << ret << dendl; + return ret; + } RGWObjVersionTracker objv_tracker; - rgw_pubsub_user_topics topics; + rgw_pubsub_bucket_topics bucket_topics; - int ret = read(obj, &topics, &objv_tracker); + ret = read_topics(&bucket_topics, &objv_tracker); if (ret < 0 && ret != -ENOENT) { - ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl; + ldout(store->ctx(), 0) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; return ret; } - rgw_pubsub_user_topic_info& new_topic = topics.topics[name]; - new_topic.user = user; - new_topic.topic.name = name; - new_topic.topic.bucket = bucket; + bucket_topics.topics[topic_name] = user_topic_info.topic; - ret = write(obj, topics, &objv_tracker); + ret = write_topics(bucket_topics, &objv_tracker); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl; return ret; } - ret = update_bucket(topics, bucket); + return 0; +} + +int RGWUserPubSub::Bucket::remove_notification(const string& topic_name) +{ + rgw_pubsub_topic_subs user_topic_info; + RGWRados *store = ps->store; + + int ret = ps->get_topic(topic_name, &user_topic_info); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to read topic info: ret=" << ret << dendl; + return ret; + } + + RGWObjVersionTracker objv_tracker; + rgw_pubsub_bucket_topics bucket_topics; + + ret = read_topics(&bucket_topics, &objv_tracker); + if (ret < 0 && ret != -ENOENT) { + ldout(store->ctx(), 0) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl; + return ret; + } + + bucket_topics.topics.erase(topic_name); + + ret = write_topics(bucket_topics, &objv_tracker); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl; + ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl; return ret; } return 0; } -int RGWUserPubSub::remove_topic(const string& name) +int RGWUserPubSub::create_topic(const string& name) { - rgw_raw_obj obj; - get_user_meta_obj(&obj); - RGWObjVersionTracker objv_tracker; rgw_pubsub_user_topics topics; - int ret = read(obj, &topics, &objv_tracker); + int ret = read_user_topics(&topics, &objv_tracker); if (ret < 0 && ret != -ENOENT) { ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl; return ret; } - rgw_bucket bucket; - auto t = topics.topics.find(name); - if (t != topics.topics.end()) { - bucket = t->second.topic.bucket; - } - - topics.topics.erase(name); + rgw_pubsub_topic_subs& new_topic = topics.topics[name]; + new_topic.topic.user = user; + new_topic.topic.name = name; - ret = write(obj, topics, &objv_tracker); + ret = write_user_topics(topics, &objv_tracker); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl; return ret; } - if (bucket.name.empty()) { - return 0; + return 0; +} + +int RGWUserPubSub::remove_topic(const string& name) +{ + RGWObjVersionTracker objv_tracker; + rgw_pubsub_user_topics topics; + + int ret = read_user_topics(&topics, &objv_tracker); + if (ret < 0 && ret != -ENOENT) { + ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl; + return ret; } - ret = update_bucket(topics, bucket); + topics.topics.erase(name); + + ret = write_user_topics(topics, &objv_tracker); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl; + ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl; return ret; } return 0; } -int RGWUserPubSub::update_bucket(const rgw_pubsub_user_topics& topics, const rgw_bucket& bucket) +int RGWUserPubSub::Sub::read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker) { - rgw_pubsub_user_topics bucket_topics; - for (auto& t : topics.topics) { - if (t.second.topic.bucket == bucket) { - bucket_topics.topics.insert(t); - } + int ret = ps->read(sub_meta_obj, result, objv_tracker); + if (ret < 0 && ret != -ENOENT) { + ldout(ps->store->ctx(), 0) << "ERROR: failed to read subscription info: ret=" << ret << dendl; + return ret; } + return 0; +} - rgw_raw_obj bobj; - get_bucket_meta_obj(bucket, &bobj); - int ret = write(bobj, bucket_topics, nullptr); +int RGWUserPubSub::Sub::write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker) +{ + int ret = ps->write(sub_meta_obj, sub_conf, objv_tracker); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl; + ldout(ps->store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl; return ret; } + return 0; } -int RGWUserPubSub::get_sub(const string& name, rgw_pubsub_user_sub_config *result) +int RGWUserPubSub::Sub::remove_sub(RGWObjVersionTracker *objv_tracker) { - rgw_raw_obj obj; - get_sub_meta_obj(name, &obj); - int ret = read(obj, result, nullptr); + int ret = ps->remove(sub_meta_obj, objv_tracker); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to read subscription info: ret=" << ret << dendl; + ldout(ps->store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl; return ret; } + return 0; } -int RGWUserPubSub::add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest) +int RGWUserPubSub::Sub::get_conf(rgw_pubsub_sub_config *result) { - rgw_raw_obj obj; - get_user_meta_obj(&obj); + return read_sub(result, nullptr); +} - RGWObjVersionTracker objv_tracker; +int RGWUserPubSub::Sub::subscribe(const string& topic, const rgw_pubsub_sub_dest& dest) +{ + RGWObjVersionTracker user_objv_tracker; rgw_pubsub_user_topics topics; + RGWRados *store = ps->store; - int ret = read(obj, &topics, &objv_tracker); + int ret = ps->read_user_topics(&topics, &user_objv_tracker); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl; return ret; @@ -280,30 +342,22 @@ int RGWUserPubSub::add_sub(const string& name, const string& topic, const rgw_pu auto& t = iter->second; - rgw_pubsub_user_sub_config sub_conf; + rgw_pubsub_sub_config sub_conf; - sub_conf.user = user; - sub_conf.name = name; + sub_conf.user = ps->user; + sub_conf.name = sub; sub_conf.topic = topic; sub_conf.dest = dest; - t.subs.insert(name); + t.subs.insert(sub); - ret = write(obj, topics, &objv_tracker); + ret = ps->write_user_topics(topics, &user_objv_tracker); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl; return ret; } - ret = update_bucket(topics, t.topic.bucket); - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to write bucket topics info: ret=" << ret << dendl; - return ret; - } - - rgw_raw_obj sobj; - get_sub_meta_obj(name, &sobj); - ret = write(sobj, sub_conf, nullptr); + ret = write_sub(sub_conf, nullptr); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl; return ret; @@ -311,17 +365,15 @@ int RGWUserPubSub::add_sub(const string& name, const string& topic, const rgw_pu return 0; } -int RGWUserPubSub::remove_sub(const string& name, const string& _topic) +int RGWUserPubSub::Sub::unsubscribe(const string& _topic) { string topic = _topic; - RGWObjVersionTracker sobjv_tracker; - rgw_raw_obj sobj; - get_sub_meta_obj(name, &sobj); + RGWRados *store = ps->store; if (topic.empty()) { - rgw_pubsub_user_sub_config sub_conf; - int ret = read(sobj, &sub_conf, &sobjv_tracker); + rgw_pubsub_sub_config sub_conf; + int ret = read_sub(&sub_conf, &sobjv_tracker); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: failed to read subscription info: ret=" << ret << dendl; return ret; @@ -329,27 +381,22 @@ int RGWUserPubSub::remove_sub(const string& name, const string& _topic) topic = sub_conf.topic; } - rgw_raw_obj obj; - get_user_meta_obj(&obj); - RGWObjVersionTracker objv_tracker; rgw_pubsub_user_topics topics; - int ret = read(obj, &topics, &objv_tracker); + int ret = ps->read_user_topics(&topics, &objv_tracker); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: failed to read topics info: ret=" << ret << dendl; } if (ret >= 0) { auto iter = topics.topics.find(topic); - if (iter == topics.topics.end()) { - ldout(store->ctx(), 20) << "ERROR: cannot add subscription to topic: topic not found" << dendl; - } else { + if (iter != topics.topics.end()) { auto& t = iter->second; - t.subs.erase(name); + t.subs.erase(sub); - ret = write(obj, topics, &objv_tracker); + ret = ps->write_user_topics(topics, &objv_tracker); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: failed to write topics info: ret=" << ret << dendl; return ret; @@ -357,15 +404,15 @@ int RGWUserPubSub::remove_sub(const string& name, const string& _topic) } } - ret = remove(sobj, &sobjv_tracker); + ret = remove_sub(&sobjv_tracker); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to write subscription info: ret=" << ret << dendl; + ldout(store->ctx(), 0) << "ERROR: failed to delete subscription info: ret=" << ret << dendl; return ret; } return 0; } -void RGWUserPubSub::list_events_result::dump(Formatter *f) const +void RGWUserPubSub::Sub::list_events_result::dump(Formatter *f) const { encode_json("next_marker", next_marker, f); encode_json("is_truncated", is_truncated, f); @@ -376,12 +423,12 @@ void RGWUserPubSub::list_events_result::dump(Formatter *f) const } } -int RGWUserPubSub::list_events(const string& sub_name, - const string& marker, int max_events, +int RGWUserPubSub::Sub::list_events(const string& marker, int max_events, list_events_result *result) { - rgw_pubsub_user_sub_config sub_conf; - int ret = get_sub(sub_name, &sub_conf); + RGWRados *store = ps->store; + rgw_pubsub_sub_config sub_conf; + int ret = get_conf(&sub_conf); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: failed to read sub config: ret=" << ret << dendl; return ret; @@ -438,10 +485,11 @@ int RGWUserPubSub::list_events(const string& sub_name, return 0; } -int RGWUserPubSub::remove_event(const string& sub_name, const string& event_id) +int RGWUserPubSub::Sub::remove_event(const string& event_id) { - rgw_pubsub_user_sub_config sub_conf; - int ret = get_sub(sub_name, &sub_conf); + RGWRados *store = ps->store; + rgw_pubsub_sub_config sub_conf; + int ret = get_conf(&sub_conf); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: failed to read sub config: ret=" << ret << dendl; return ret; diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index 071432ed8e1..40f3abb6f98 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -54,7 +54,7 @@ struct rgw_pubsub_event { }; WRITE_CLASS_ENCODER(rgw_pubsub_event) -struct rgw_pubsub_user_sub_dest { +struct rgw_pubsub_sub_dest { string bucket_name; string oid_prefix; string push_endpoint; @@ -77,13 +77,13 @@ struct rgw_pubsub_user_sub_dest { void dump(Formatter *f) const; }; -WRITE_CLASS_ENCODER(rgw_pubsub_user_sub_dest) +WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest) -struct rgw_pubsub_user_sub_config { +struct rgw_pubsub_sub_config { rgw_user user; string name; string topic; - rgw_pubsub_user_sub_dest dest; + rgw_pubsub_sub_dest dest; void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); @@ -105,41 +105,44 @@ struct rgw_pubsub_user_sub_config { void dump(Formatter *f) const; }; -WRITE_CLASS_ENCODER(rgw_pubsub_user_sub_config) +WRITE_CLASS_ENCODER(rgw_pubsub_sub_config) -struct rgw_pubsub_user_topic { +struct rgw_pubsub_topic { + rgw_user user; string name; - rgw_bucket bucket; void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); + encode(user, bl); encode(name, bl); - encode(bucket, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { DECODE_START(1, bl); + decode(user, bl); decode(name, bl); - decode(bucket, bl); DECODE_FINISH(bl); } - const string& to_str() const { - return name; + string to_str() const { + return user.to_str() + "/" + name; } + void dump(Formatter *f) const; + + bool operator<(const rgw_pubsub_topic& t) const { + return to_str().compare(t.to_str()); + } }; -WRITE_CLASS_ENCODER(rgw_pubsub_user_topic) +WRITE_CLASS_ENCODER(rgw_pubsub_topic) -struct rgw_pubsub_user_topic_info { - rgw_user user; - rgw_pubsub_user_topic topic; +struct rgw_pubsub_topic_subs { + rgw_pubsub_topic topic; set subs; void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); - encode(user, bl); encode(topic, bl); encode(subs, bl); ENCODE_FINISH(bl); @@ -147,26 +150,36 @@ struct rgw_pubsub_user_topic_info { void decode(bufferlist::const_iterator& bl) { DECODE_START(1, bl); - decode(user, bl); decode(topic, bl); decode(subs, bl); DECODE_FINISH(bl); } - string to_str() const { - return user.to_str() + "/" + topic.name; - } - void dump(Formatter *f) const; +}; +WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs) - bool operator<(const rgw_pubsub_user_topic& t) const { - return to_str().compare(t.to_str()); +struct rgw_pubsub_bucket_topics { + map topics; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(topics, bl); + ENCODE_FINISH(bl); } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(topics, bl); + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const; }; -WRITE_CLASS_ENCODER(rgw_pubsub_user_topic_info) +WRITE_CLASS_ENCODER(rgw_pubsub_bucket_topics) struct rgw_pubsub_user_topics { - map topics; + map topics; void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); @@ -188,10 +201,26 @@ static string pubsub_user_oid_prefix = "pubsub.user."; class RGWUserPubSub { + friend class Bucket; + RGWRados *store; rgw_user user; RGWObjectCtx obj_ctx; + rgw_raw_obj user_meta_obj; + + string user_meta_oid() const { + return pubsub_user_oid_prefix + user.to_str(); + } + + string bucket_meta_oid(const rgw_bucket& bucket) const { + return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id; + } + + string sub_meta_oid(const string& name) const { + return pubsub_user_oid_prefix + user.to_str() + ".sub." + name; + } + template int read(const rgw_raw_obj& obj, T *data, RGWObjVersionTracker *objv_tracker); @@ -200,22 +229,72 @@ class RGWUserPubSub int remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker); - int update_bucket(const rgw_pubsub_user_topics& topics, const rgw_bucket& bucket); + int read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker); + int write_user_topics(const rgw_pubsub_user_topics& topics, RGWObjVersionTracker *objv_tracker); public: RGWUserPubSub(RGWRados *_store, const rgw_user& _user) : store(_store), user(_user), - obj_ctx(store) {} - - string user_meta_oid() const { - return pubsub_user_oid_prefix + user.to_str(); + obj_ctx(store) { + get_user_meta_obj(&user_meta_obj); } - string bucket_meta_oid(const rgw_bucket& bucket) const { - return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id; + class Bucket { + friend class RGWUserPubSub; + RGWUserPubSub *ps; + rgw_bucket bucket; + rgw_raw_obj bucket_meta_obj; + + int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker); + int write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker); + public: + Bucket(RGWUserPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) { + ps->get_bucket_meta_obj(bucket, &bucket_meta_obj); + } + + int get_topics(rgw_pubsub_bucket_topics *result); + int create_notification(const string& topic_name); + int remove_notification(const string& topic_name); + }; + + class Sub { + friend class RGWUserPubSub; + RGWUserPubSub *ps; + string sub; + rgw_raw_obj sub_meta_obj; + + int read_sub(rgw_pubsub_sub_config *result, RGWObjVersionTracker *objv_tracker); + int write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker); + int remove_sub(RGWObjVersionTracker *objv_tracker); + public: + Sub(RGWUserPubSub *_ps, const string& _sub) : ps(_ps), sub(_sub) { + ps->get_sub_meta_obj(sub, &sub_meta_obj); + } + + int subscribe(const string& topic_name, const rgw_pubsub_sub_dest& dest); + int unsubscribe(const string& topic_name); + int get_conf(rgw_pubsub_sub_config *result); + + struct list_events_result { + string next_marker; + bool is_truncated{false}; + std::vector events; + + void dump(Formatter *f) const; + }; + + int list_events(const string& marker, int max_events, list_events_result *result); + int remove_event(const string& event_id); + }; + + using BucketRef = std::shared_ptr; + using SubRef = std::shared_ptr; + + BucketRef get_bucket(const rgw_bucket& bucket) { + return std::make_shared(this, bucket); } - string sub_meta_oid(const string& name) const { - return pubsub_user_oid_prefix + user.to_str() + ".sub." + name; + SubRef get_sub(const string& sub) { + return std::make_shared(this, sub); } void get_user_meta_obj(rgw_raw_obj *obj) const { @@ -230,25 +309,10 @@ public: *obj = rgw_raw_obj(store->get_zone_params().log_pool, sub_meta_oid(name)); } - int get_topics(rgw_pubsub_user_topics *result); - int get_bucket_topics(const rgw_bucket& bucket, rgw_pubsub_user_topics *result); - int get_topic(const string& name, rgw_pubsub_user_topic_info *result); - int create_topic(const string& name, const rgw_bucket& bucket); + int get_user_topics(rgw_pubsub_user_topics *result); + int get_topic(const string& name, rgw_pubsub_topic_subs *result); + int create_topic(const string& name); int remove_topic(const string& name); - int get_sub(const string& name, rgw_pubsub_user_sub_config *result); - int add_sub(const string& name, const string& topic, const rgw_pubsub_user_sub_dest& dest); - int remove_sub(const string& name, const string& topic); - - struct list_events_result { - string next_marker; - bool is_truncated{false}; - std::vector events; - - void dump(Formatter *f) const; - }; - - int list_events(const string& sub_name, const string& marker, int max_events, list_events_result *result); - int remove_event(const string& sub_name, const string& event_id); }; template diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index c434bd37c7a..24a72862714 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -89,7 +89,7 @@ struct PSSubConfig { /* subscription config */ string data_bucket_name; string data_oid_prefix; - void from_user_conf(const rgw_pubsub_user_sub_config& uc) { + void from_user_conf(const rgw_pubsub_sub_config& uc) { name = uc.name; topic = uc.topic; push_endpoint = uc.dest.push_endpoint; @@ -597,7 +597,7 @@ public: PSSubscription(RGWDataSyncEnv *_sync_env, PSEnvRef _env, - rgw_pubsub_user_sub_config& user_sub_conf) : sync_env(_sync_env), + rgw_pubsub_sub_config& user_sub_conf) : sync_env(_sync_env), env(_env), sub_conf(std::make_shared()), data_access(std::make_shared(sync_env->store)) { @@ -649,7 +649,7 @@ class PSManager PSConfigRef conf; PSSubConfigRef sub_conf; - rgw_pubsub_user_sub_config user_sub_conf; + rgw_pubsub_sub_config user_sub_conf; public: GetSubCR(RGWDataSyncEnv *_sync_env, PSManagerRef& _mgr, @@ -677,7 +677,7 @@ class PSManager *ref = PSSubscription::get_shared(sync_env, mgr->env, sub_conf); } else { - using ReadInfoCR = RGWSimpleRadosReadCR; + using ReadInfoCR = RGWSimpleRadosReadCR; yield { RGWUserPubSub ups(sync_env->store, owner); rgw_raw_obj obj; @@ -832,8 +832,10 @@ class RGWPSFindBucketTopicsCR : public RGWCoroutine { RGWUserPubSub ups; - rgw_raw_obj obj; - rgw_pubsub_user_topics bucket_topics; + rgw_raw_obj bucket_obj; + rgw_raw_obj user_obj; + rgw_pubsub_bucket_topics bucket_topics; + rgw_pubsub_user_topics user_topics; TopicsRef *topics; public: RGWPSFindBucketTopicsCR(RGWDataSyncEnv *_sync_env, @@ -853,14 +855,14 @@ public: } int operate() override { reenter(this) { - ups.get_bucket_meta_obj(bucket, &obj); + ups.get_bucket_meta_obj(bucket, &bucket_obj); + ups.get_user_meta_obj(&user_obj); - - using ReadInfoCR = RGWSimpleRadosReadCR; + using ReadInfoCR = RGWSimpleRadosReadCR; yield { bool empty_on_enoent = true; call(new ReadInfoCR(sync_env->async_rados, sync_env->store, - obj, + bucket_obj, &bucket_topics, empty_on_enoent)); } if (retcode < 0 && retcode != -ENOENT) { @@ -869,11 +871,24 @@ public: ldout(sync_env->cct, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl; + if (!bucket_topics.topics.empty()) { + using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR; + yield { + bool empty_on_enoent = true; + call(new ReadUserTopicsInfoCR(sync_env->async_rados, sync_env->store, + user_obj, + &user_topics, empty_on_enoent)); + } + if (retcode < 0 && retcode != -ENOENT) { + return set_cr_error(retcode); + } + } + for (auto& titer : bucket_topics.topics) { auto& info = titer.second; shared_ptr tc = std::make_shared(); - tc->name = info.topic.name; - tc->subs = info.subs; + tc->name = info.name; + tc->subs = user_topics.topics[info.name].subs; (*topics)->push_back(tc); } diff --git a/src/rgw/rgw_sync_module_pubsub_rest.cc b/src/rgw/rgw_sync_module_pubsub_rest.cc index f2df4afb998..5b5392cbde5 100644 --- a/src/rgw/rgw_sync_module_pubsub_rest.cc +++ b/src/rgw/rgw_sync_module_pubsub_rest.cc @@ -14,30 +14,11 @@ protected: std::unique_ptr ups; string topic_name; string bucket_name; - RGWBucketInfo bucket_info; public: RGWPSCreateTopicOp() {} int verify_permission() override { - int ret = get_params(); - if (ret < 0) { - return ret; - } - - RGWObjectCtx& obj_ctx = *static_cast(s->obj_ctx); - - ret = store->get_bucket_info(obj_ctx, s->owner.get_id().tenant, bucket_name, - bucket_info, nullptr, nullptr); - if (ret < 0) { - return ret; - } - - if (bucket_info.owner != s->owner.get_id()) { - ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl; - return -EPERM; - } - return 0; } void pre_exec() override { @@ -53,8 +34,13 @@ public: void RGWPSCreateTopicOp::execute() { + op_ret = get_params(); + if (op_ret < 0) { + return; + } + ups = make_unique(store, s->owner.get_id()); - op_ret = ups->create_topic(topic_name, bucket_info.bucket); + op_ret = ups->create_topic(topic_name); if (op_ret < 0) { ldout(s->cct, 20) << "failed to create topic, ret=" << op_ret << dendl; return; @@ -67,22 +53,12 @@ public: int get_params() override { topic_name = s->object.name; - - bool exists; - bucket_name = s->info.args.get("bucket", &exists); - if (!exists) { - ldout(s->cct, 20) << "ERROR: missing required param 'bucket' for request" << dendl; - return -EINVAL; - } - return 0; } }; class RGWPSListTopicsOp : public RGWOp { protected: - string bucket_name; - RGWBucketInfo bucket_info; std::unique_ptr ups; rgw_pubsub_user_topics result; @@ -91,28 +67,6 @@ public: RGWPSListTopicsOp() {} int verify_permission() override { - int ret = get_params(); - if (ret < 0) { - return ret; - } - - if (bucket_name.empty()) { - return 0; - } - - RGWObjectCtx& obj_ctx = *static_cast(s->obj_ctx); - - ret = store->get_bucket_info(obj_ctx, s->owner.get_id().tenant, bucket_name, - bucket_info, nullptr, nullptr); - if (ret < 0) { - return ret; - } - - if (bucket_info.owner != s->owner.get_id()) { - ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl; - return -EPERM; - } - return 0; } void pre_exec() override { @@ -123,17 +77,12 @@ public: const char* name() const override { return "pubsub_topics_list"; } virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPICS_LIST; } virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; } - virtual int get_params() = 0; }; void RGWPSListTopicsOp::execute() { ups = make_unique(store, s->owner.get_id()); - if (bucket_name.empty()) { - op_ret = ups->get_topics(&result); - } else { - op_ret = ups->get_bucket_topics(bucket_info.bucket, &result); - } + op_ret = ups->get_user_topics(&result); if (op_ret < 0) { ldout(s->cct, 20) << "failed to get topics, ret=" << op_ret << dendl; return; @@ -145,11 +94,6 @@ class RGWPSListTopics_ObjStore_S3 : public RGWPSListTopicsOp { public: explicit RGWPSListTopics_ObjStore_S3() {} - int get_params() override { - bucket_name = s->info.args.get("bucket"); - return 0; - } - void send_response() override { if (op_ret) { set_req_state_err(s, op_ret); @@ -170,7 +114,7 @@ class RGWPSGetTopicOp : public RGWOp { protected: string topic_name; std::unique_ptr ups; - rgw_pubsub_user_topic_info result; + rgw_pubsub_topic_subs result; public: RGWPSGetTopicOp() {} @@ -318,7 +262,7 @@ protected: string sub_name; string topic_name; std::unique_ptr ups; - rgw_pubsub_user_sub_dest dest; + rgw_pubsub_sub_dest dest; public: RGWPSCreateSubOp() {} @@ -344,7 +288,8 @@ void RGWPSCreateSubOp::execute() return; } ups = make_unique(store, s->owner.get_id()); - op_ret = ups->add_sub(sub_name, topic_name, dest); + auto sub = ups->get_sub(sub_name); + op_ret = sub->subscribe(topic_name, dest); if (op_ret < 0) { ldout(s->cct, 20) << "failed to create subscription, ret=" << op_ret << dendl; return; @@ -376,7 +321,7 @@ class RGWPSGetSubOp : public RGWOp { protected: string sub_name; std::unique_ptr ups; - rgw_pubsub_user_sub_config result; + rgw_pubsub_sub_config result; public: RGWPSGetSubOp() {} @@ -402,7 +347,8 @@ void RGWPSGetSubOp::execute() return; } ups = make_unique(store, s->owner.get_id()); - op_ret = ups->get_sub(sub_name, &result); + auto sub = ups->get_sub(sub_name); + op_ret = sub->get_conf(&result); if (op_ret < 0) { ldout(s->cct, 20) << "failed to get subscription, ret=" << op_ret << dendl; return; @@ -468,7 +414,8 @@ void RGWPSDeleteSubOp::execute() return; } ups = make_unique(store, s->owner.get_id()); - op_ret = ups->remove_sub(sub_name, topic_name); + auto sub = ups->get_sub(sub_name); + op_ret = sub->unsubscribe(topic_name); if (op_ret < 0) { ldout(s->cct, 20) << "failed to remove subscription, ret=" << op_ret << dendl; return; @@ -516,7 +463,8 @@ void RGWPSAckSubEventOp::execute() return; } ups = make_unique(store, s->owner.get_id()); - op_ret = ups->remove_event(sub_name, event_id); + auto sub = ups->get_sub(sub_name); + op_ret = sub->remove_event(event_id); if (op_ret < 0) { ldout(s->cct, 20) << "failed to remove event, ret=" << op_ret << dendl; return; @@ -547,7 +495,7 @@ protected: string sub_name; string marker; std::unique_ptr ups; - RGWUserPubSub::list_events_result result; + RGWUserPubSub::Sub::list_events_result result; public: RGWPSPullSubEventsOp() {} @@ -573,7 +521,8 @@ void RGWPSPullSubEventsOp::execute() return; } ups = make_unique(store, s->owner.get_id()); - op_ret = ups->list_events(sub_name, marker, max_entries, &result); + auto sub = ups->get_sub(sub_name); + op_ret = sub->list_events(marker, max_entries, &result); if (op_ret < 0) { ldout(s->cct, 20) << "failed to get subscription, ret=" << op_ret << dendl; return; @@ -657,6 +606,288 @@ public: }; +static int notif_bucket_path(const string& path, string *bucket_name) +{ + if (path.empty()) { + return -EINVAL; + } + size_t pos = path.find('/'); + if (pos == string::npos) { + return -EINVAL; + } + if (pos >= path.size()) { + return -EINVAL; + } + + string type = path.substr(0, pos); + if (type != "bucket") { + return -EINVAL; + } + + *bucket_name = path.substr(pos + 1); + return 0; +} + +class RGWPSCreateNotifOp : public RGWOp { +protected: + std::unique_ptr ups; + string topic_name; + string bucket_name; + RGWBucketInfo bucket_info; + +public: + RGWPSCreateNotifOp() {} + + int verify_permission() override { + int ret = get_params(); + if (ret < 0) { + return ret; + } + + RGWObjectCtx& obj_ctx = *static_cast(s->obj_ctx); + + ret = store->get_bucket_info(obj_ctx, s->owner.get_id().tenant, bucket_name, + bucket_info, nullptr, nullptr); + if (ret < 0) { + return ret; + } + + if (bucket_info.owner != s->owner.get_id()) { + ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl; + return -EPERM; + } + return 0; + } + void pre_exec() override { + rgw_bucket_object_pre_exec(s); + } + void execute() override; + + const char* name() const override { return "pubsub_notification_create"; } + virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; } + virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; } + virtual int get_params() = 0; +}; + +void RGWPSCreateNotifOp::execute() +{ + op_ret = get_params(); + if (op_ret < 0) { + return; + } + + ups = make_unique(store, s->owner.get_id()); + auto b = ups->get_bucket(bucket_info.bucket); + op_ret = b->create_notification(topic_name); + if (op_ret < 0) { + ldout(s->cct, 20) << "failed to create notification, ret=" << op_ret << dendl; + return; + } +} + +class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp { +public: + explicit RGWPSCreateNotif_ObjStore_S3() {} + + int get_params() override { + bool exists; + topic_name = s->info.args.get("topic", &exists); + if (!exists) { + ldout(s->cct, 20) << "param 'topic' not provided" << dendl; + return -EINVAL; + } + return notif_bucket_path(s->object.name, &bucket_name); + } +}; + +class RGWPSDeleteNotifOp : public RGWOp { +protected: + std::unique_ptr ups; + string topic_name; + string bucket_name; + RGWBucketInfo bucket_info; + +public: + RGWPSDeleteNotifOp() {} + + int verify_permission() override { + int ret = get_params(); + if (ret < 0) { + return ret; + } + + RGWObjectCtx& obj_ctx = *static_cast(s->obj_ctx); + + ret = store->get_bucket_info(obj_ctx, s->owner.get_id().tenant, bucket_name, + bucket_info, nullptr, nullptr); + if (ret < 0) { + return ret; + } + + if (bucket_info.owner != s->owner.get_id()) { + ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl; + return -EPERM; + } + return 0; + } + void pre_exec() override { + rgw_bucket_object_pre_exec(s); + } + void execute() override; + + const char* name() const override { return "pubsub_notification_delete"; } + virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_DELETE; } + virtual uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; } + virtual int get_params() = 0; +}; + +void RGWPSDeleteNotifOp::execute() +{ + op_ret = get_params(); + if (op_ret < 0) { + return; + } + + ups = make_unique(store, s->owner.get_id()); + auto b = ups->get_bucket(bucket_info.bucket); + op_ret = b->remove_notification(topic_name); + if (op_ret < 0) { + ldout(s->cct, 20) << "failed to remove notification, ret=" << op_ret << dendl; + return; + } +} + +class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSCreateNotifOp { +public: + explicit RGWPSDeleteNotif_ObjStore_S3() {} + + int get_params() override { + bool exists; + topic_name = s->info.args.get("topic", &exists); + if (!exists) { + ldout(s->cct, 20) << "param 'topic' not provided" << dendl; + return -EINVAL; + } + return notif_bucket_path(s->object.name, &bucket_name); + } +}; + +class RGWPSListNotifsOp : public RGWOp { +protected: + string bucket_name; + RGWBucketInfo bucket_info; + std::unique_ptr ups; + rgw_pubsub_bucket_topics result; + + +public: + RGWPSListNotifsOp() {} + + int verify_permission() override { + int ret = get_params(); + if (ret < 0) { + return ret; + } + + RGWObjectCtx& obj_ctx = *static_cast(s->obj_ctx); + + ret = store->get_bucket_info(obj_ctx, s->owner.get_id().tenant, bucket_name, + bucket_info, nullptr, nullptr); + if (ret < 0) { + return ret; + } + + if (bucket_info.owner != s->owner.get_id()) { + ldout(s->cct, 20) << "user doesn't own bucket, cannot create topic" << dendl; + return -EPERM; + } + + return 0; + } + void pre_exec() override { + rgw_bucket_object_pre_exec(s); + } + void execute() override; + + const char* name() const override { return "pubsub_notifications_list"; } + virtual RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_LIST; } + virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; } + virtual int get_params() = 0; +}; + +void RGWPSListNotifsOp::execute() +{ + ups = make_unique(store, s->owner.get_id()); + auto b = ups->get_bucket(bucket_info.bucket); + op_ret = b->get_topics(&result); + if (op_ret < 0) { + ldout(s->cct, 20) << "failed to get topics, ret=" << op_ret << dendl; + return; + } + +} + +class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp { +public: + explicit RGWPSListNotifs_ObjStore_S3() {} + + int get_params() override { + return notif_bucket_path(s->object.name, &bucket_name); + } + + void send_response() override { + if (op_ret) { + set_req_state_err(s, op_ret); + } + dump_errno(s); + end_header(s, this, "application/json"); + + if (op_ret < 0) { + return; + } + + encode_json("result", result, s->formatter); + rgw_flush_formatter_and_reset(s, s->formatter); + } +}; + + +class RGWHandler_REST_PSNotifs_S3 : public RGWHandler_REST_S3 { +protected: + int init_permissions(RGWOp* op) override { + return 0; + } + + int read_permissions(RGWOp* op) override { + return 0; + } + bool supports_quota() override { + return false; + } + RGWOp *op_get() override { + if (s->object.empty()) { + return nullptr; + } + return new RGWPSListNotifs_ObjStore_S3(); + } + RGWOp *op_put() override { + if (!s->object.empty()) { + return new RGWPSCreateNotif_ObjStore_S3(); + } + return nullptr; + } + RGWOp *op_delete() override { + if (!s->object.empty()) { + return new RGWPSDeleteNotif_ObjStore_S3(); + } + return nullptr; + } +public: + explicit RGWHandler_REST_PSNotifs_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {} + virtual ~RGWHandler_REST_PSNotifs_S3() {} +}; + + RGWHandler_REST* RGWRESTMgr_PubSub_S3::get_handler(struct req_state* const s, const rgw::auth::StrategyRegistry& auth_registry, const std::string& frontend_prefix) @@ -678,6 +909,10 @@ RGWHandler_REST* RGWRESTMgr_PubSub_S3::get_handler(struct req_state* const s, handler = new RGWHandler_REST_PSSub_S3(auth_registry); } + if (s->init_state.url_bucket == "notifications") { + handler = new RGWHandler_REST_PSNotifs_S3(auth_registry); + } + ldout(s->cct, 20) << __func__ << " handler=" << (handler ? typeid(*handler).name() : "") << dendl; return handler; }