From bc04b2448e3ab15757e53c3361047d49d03b4b1e Mon Sep 17 00:00:00 2001 From: Ali Masarwa Date: Mon, 8 May 2023 16:55:01 +0300 Subject: [PATCH] RGW: add admin interfaces to get and delete notifications from bucket Signed-off-by: Ali Masarwa (cherry picked from commit b0307db65f30ac4d40cd283bc175229f943d3e82) --- src/rgw/rgw_admin.cc | 147 ++++++++++++++++---- src/rgw/rgw_pubsub.cc | 97 ++++++++++++- src/rgw/rgw_pubsub.h | 16 ++- src/test/cli/radosgw-admin/help.t | 4 + src/test/rgw/bucket_notification/test_bn.py | 78 +++++++++++ 5 files changed, 307 insertions(+), 35 deletions(-) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 6a181c954d4bb..346eb2cf05392 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -318,6 +318,9 @@ void usage() cout << " script-package add add a lua package to the scripts allowlist\n"; cout << " script-package rm remove a lua package from the scripts allowlist\n"; cout << " script-package list get the lua packages allowlist\n"; + cout << " notification list list bucket notifications configuration\n"; + cout << " notification get get a bucket notifications configuration\n"; + cout << " notification rm remove a bucket notifications configuration\n"; cout << "options:\n"; cout << " --tenant= tenant name\n"; cout << " --user_ns= namespace of user (oidc in case of users authenticated with oidc provider)\n"; @@ -483,6 +486,7 @@ void usage() cout << " --totp-pin the valid value of a TOTP token at a certain time\n"; cout << "\nBucket notifications options:\n"; cout << " --topic bucket notifications topic name\n"; + cout << " --notification-id bucket notifications id\n"; cout << "\nScript options:\n"; cout << " --context context in which the script runs. one of: "+LUA_CONTEXT_LIST+"\n"; cout << " --package name of the lua package that should be added/removed to/from the allowlist\n"; @@ -831,9 +835,12 @@ enum class OPT { MFA_RESYNC, RESHARD_STALE_INSTANCES_LIST, RESHARD_STALE_INSTANCES_DELETE, - PUBSUB_TOPICS_LIST, + PUBSUB_TOPIC_LIST, PUBSUB_TOPIC_GET, PUBSUB_TOPIC_RM, + PUBSUB_NOTIFICATION_LIST, + PUBSUB_NOTIFICATION_GET, + PUBSUB_NOTIFICATION_RM, SCRIPT_PUT, SCRIPT_GET, SCRIPT_RM, @@ -1063,9 +1070,12 @@ static SimpleCmd::Commands all_cmds = { { "reshard stale list", OPT::RESHARD_STALE_INSTANCES_LIST }, { "reshard stale-instances delete", OPT::RESHARD_STALE_INSTANCES_DELETE }, { "reshard stale delete", OPT::RESHARD_STALE_INSTANCES_DELETE }, - { "topic list", OPT::PUBSUB_TOPICS_LIST }, + { "topic list", OPT::PUBSUB_TOPIC_LIST }, { "topic get", OPT::PUBSUB_TOPIC_GET }, { "topic rm", OPT::PUBSUB_TOPIC_RM }, + { "notification list", OPT::PUBSUB_NOTIFICATION_LIST }, + { "notification get", OPT::PUBSUB_NOTIFICATION_GET }, + { "notification rm", OPT::PUBSUB_NOTIFICATION_RM }, { "script put", OPT::SCRIPT_PUT }, { "script get", OPT::SCRIPT_GET }, { "script rm", OPT::SCRIPT_RM }, @@ -3490,6 +3500,7 @@ int main(int argc, const char **argv) int trim_delay_ms = 0; string topic_name; + string notification_id; string sub_name; string event_id; @@ -3965,6 +3976,8 @@ int main(int argc, const char **argv) trim_delay_ms = atoi(val.c_str()); } else if (ceph_argparse_witharg(args, i, &val, "--topic", (char*)NULL)) { topic_name = val; + } else if (ceph_argparse_witharg(args, i, &val, "--notification-id", (char*)NULL)) { + notification_id = val; } else if (ceph_argparse_witharg(args, i, &val, "--subscription", (char*)NULL)) { sub_name = val; } else if (ceph_argparse_witharg(args, i, &val, "--event-id", (char*)NULL)) { @@ -4211,8 +4224,10 @@ int main(int argc, const char **argv) OPT::ROLE_POLICY_GET, OPT::RESHARD_LIST, OPT::RESHARD_STATUS, - OPT::PUBSUB_TOPICS_LIST, + OPT::PUBSUB_TOPIC_LIST, + OPT::PUBSUB_NOTIFICATION_LIST, OPT::PUBSUB_TOPIC_GET, + OPT::PUBSUB_NOTIFICATION_GET, OPT::SCRIPT_GET, }; @@ -4292,9 +4307,12 @@ int main(int argc, const char **argv) && opt_cmd != OPT::RESHARD_ADD && opt_cmd != OPT::RESHARD_CANCEL && opt_cmd != OPT::RESHARD_STATUS - && opt_cmd != OPT::PUBSUB_TOPICS_LIST + && opt_cmd != OPT::PUBSUB_TOPIC_LIST + && opt_cmd != OPT::PUBSUB_NOTIFICATION_LIST && opt_cmd != OPT::PUBSUB_TOPIC_GET - && opt_cmd != OPT::PUBSUB_TOPIC_RM) { + && opt_cmd != OPT::PUBSUB_NOTIFICATION_GET + && opt_cmd != OPT::PUBSUB_TOPIC_RM + && opt_cmd != OPT::PUBSUB_NOTIFICATION_RM) { cerr << "ERROR: --tenant is set, but there's no user ID" << std::endl; return EINVAL; } @@ -10450,34 +10468,41 @@ next: } } - if (opt_cmd == OPT::PUBSUB_TOPICS_LIST) { + if (opt_cmd == OPT::PUBSUB_NOTIFICATION_LIST) { + if (bucket_name.empty()) { + cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl; + return EINVAL; + } RGWPubSub ps(driver, tenant); - if (!bucket_name.empty()) { - rgw_pubsub_bucket_topics result; - int ret = init_bucket(user.get(), tenant, bucket_name, bucket_id, &bucket); - if (ret < 0) { - cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl; - return -ret; - } + rgw_pubsub_bucket_topics result; + int ret = init_bucket(user.get(), tenant, bucket_name, bucket_id, &bucket); + if (ret < 0) { + cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl; + return -ret; + } - const RGWPubSub::Bucket b(ps, bucket.get()); - ret = b.get_topics(dpp(), result, null_yield); - if (ret < 0 && ret != -ENOENT) { - cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl; - return -ret; - } - encode_json("result", result, formatter.get()); - } else { - rgw_pubsub_topics result; - int ret = ps.get_topics(dpp(), result, null_yield); - if (ret < 0 && ret != -ENOENT) { - cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl; - return -ret; - } - encode_json("result", result, formatter.get()); + const RGWPubSub::Bucket b(ps, bucket.get()); + ret = b.get_topics(dpp(), result, null_yield); + if (ret < 0 && ret != -ENOENT) { + cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl; + return -ret; } + encode_json("result", result, formatter.get()); + formatter->flush(cout); + } + + if (opt_cmd == OPT::PUBSUB_TOPIC_LIST) { + RGWPubSub ps(driver, tenant); + + rgw_pubsub_topics result; + int ret = ps.get_topics(dpp(), result, null_yield); + if (ret < 0 && ret != -ENOENT) { + cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + encode_json("result", result, formatter.get()); formatter->flush(cout); } @@ -10499,6 +10524,42 @@ next: formatter->flush(cout); } + if (opt_cmd == OPT::PUBSUB_NOTIFICATION_GET) { + if (notification_id.empty()) { + cerr << "ERROR: notification-id was not provided (via --notification-id)" << std::endl; + return EINVAL; + } + if (bucket_name.empty()) { + cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl; + return EINVAL; + } + + int ret = init_bucket(user.get(), tenant, bucket_name, bucket_id, &bucket); + if (ret < 0) { + cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + + RGWPubSub ps(driver, tenant); + + rgw_pubsub_bucket_topics bucket_topics; + const RGWPubSub::Bucket b(ps, bucket.get()); + ret = b.get_topics(dpp(), bucket_topics, null_yield); + if (ret < 0 && ret != -ENOENT) { + cerr << "ERROR: could not get bucket notifications: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + + rgw_pubsub_topic_filter bucket_topic; + ret = b.get_notification_by_id(dpp(), notification_id, bucket_topic, null_yield); + if (ret < 0) { + cerr << "ERROR: could not get notification: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + encode_json("notification", bucket_topic, formatter.get()); + formatter->flush(cout); + } + if (opt_cmd == OPT::PUBSUB_TOPIC_RM) { if (topic_name.empty()) { cerr << "ERROR: topic name was not provided (via --topic)" << std::endl; @@ -10514,6 +10575,36 @@ next: } } + if (opt_cmd == OPT::PUBSUB_NOTIFICATION_RM) { + if (bucket_name.empty()) { + cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl; + return EINVAL; + } + + int ret = init_bucket(user.get(), tenant, bucket_name, bucket_id, &bucket); + if (ret < 0) { + cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + + RGWPubSub ps(driver, tenant); + + rgw_pubsub_bucket_topics bucket_topics; + const RGWPubSub::Bucket b(ps, bucket.get()); + ret = b.get_topics(dpp(), bucket_topics, null_yield); + if (ret < 0 && ret != -ENOENT) { + cerr << "ERROR: could not get bucket notifications: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + + rgw_pubsub_topic_filter bucket_topic; + if(notification_id.empty()) { + ret = b.remove_notifications(dpp(), null_yield); + } else { + ret = b.remove_notification_by_id(dpp(), notification_id, null_yield); + } + } + if (opt_cmd == OPT::SCRIPT_PUT) { if (!str_script_ctx) { cerr << "ERROR: context was not provided (via --context)" << std::endl; diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index 6ebd87e3fe434..e0ee659d1ad75 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -22,6 +22,27 @@ void set_event_id(std::string& id, const std::string& hash, const utime_t& ts) { } } +void rgw_s3_key_filter::dump(Formatter *f) const { + if (!prefix_rule.empty()) { + f->open_object_section("FilterRule"); + ::encode_json("Name", "prefix", f); + ::encode_json("Value", prefix_rule, f); + f->close_section(); + } + if (!suffix_rule.empty()) { + f->open_object_section("FilterRule"); + ::encode_json("Name", "suffix", f); + ::encode_json("Value", suffix_rule, f); + f->close_section(); + } + if (!regex_rule.empty()) { + f->open_object_section("FilterRule"); + ::encode_json("Name", "regex", f); + ::encode_json("Value", regex_rule, f); + f->close_section(); + } +} + bool rgw_s3_key_filter::decode_xml(XMLObj* obj) { XMLObjIter iter = obj->find("FilterRule"); XMLObj *o; @@ -75,6 +96,15 @@ bool rgw_s3_key_filter::has_content() const { return !(prefix_rule.empty() && suffix_rule.empty() && regex_rule.empty()); } +void rgw_s3_key_value_filter::dump(Formatter *f) const { + for (const auto& key_value : kv) { + f->open_object_section("FilterRule"); + ::encode_json("Name", key_value.first, f); + ::encode_json("Value", key_value.second, f); + f->close_section(); + } +} + bool rgw_s3_key_value_filter::decode_xml(XMLObj* obj) { kv.clear(); XMLObjIter iter = obj->find("FilterRule"); @@ -106,6 +136,12 @@ bool rgw_s3_key_value_filter::has_content() const { return !kv.empty(); } +void rgw_s3_filter::dump(Formatter *f) const { + encode_json("S3Key", key_filter, f); + encode_json("S3Metadata", metadata_filter, f); + encode_json("S3Tags", tag_filter, f); +} + bool rgw_s3_filter::decode_xml(XMLObj* obj) { RGWXMLDecoder::decode_xml("S3Key", key_filter, obj); RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter, obj); @@ -343,13 +379,15 @@ void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatte void rgw_pubsub_topic_filter::dump(Formatter *f) const { - encode_json("topic", topic, f); - encode_json("events", events, f); + encode_json("TopicArn", topic.arn, f); + encode_json("Id", s3_id, f); + encode_json("Events", events, f); + encode_json("Filter", s3_filter, f); } void rgw_pubsub_bucket_topics::dump(Formatter *f) const { - Formatter::ArraySection s(*f, "topics"); + Formatter::ArraySection s(*f, "notifications"); for (auto& t : topics) { encode_json(t.first.c_str(), t.second, f); } @@ -472,6 +510,35 @@ int RGWPubSub::get_topic(const DoutPrefixProvider *dpp, const std::string& name, return 0; } +// from list of bucket topics, find the one that was auto-generated by a notification +auto find_unique_topic(const rgw_pubsub_bucket_topics &bucket_topics, const std::string ¬ification_id) { + auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(), + [&](const auto& val) { return notification_id == val.second.s3_id; }); + return it != bucket_topics.topics.end() ? + std::optional>(it->second): + std::nullopt; +} + +int RGWPubSub::Bucket::get_notification_by_id(const DoutPrefixProvider *dpp, const std::string& notification_id, + rgw_pubsub_topic_filter& result, optional_yield y) const { + rgw_pubsub_bucket_topics bucket_topics; + const int ret = read_topics(dpp, bucket_topics, nullptr, y); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to read bucket_topics info: ret=" << ret << dendl; + return ret; + } + + auto iter = find_unique_topic(bucket_topics, notification_id); + if (!iter) { + ldpp_dout(dpp, 1) << "ERROR: notification was not found" << dendl; + return -ENOENT; + } + + result = iter->get(); + return 0; +} + + int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, const rgw::notify::EventTypeList& events, optional_yield y) const { return create_notification(dpp, topic_name, events, std::nullopt, "", y); @@ -520,6 +587,12 @@ int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const } int RGWPubSub::Bucket::remove_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y) const +{ + return remove_notification_inner(dpp, topic_name, false, y); +} + +int RGWPubSub::Bucket::remove_notification_inner(const DoutPrefixProvider *dpp, const std::string& notification_id, + bool is_notification_id, optional_yield y) const { RGWObjVersionTracker objv_tracker; rgw_pubsub_bucket_topics bucket_topics; @@ -530,7 +603,18 @@ int RGWPubSub::Bucket::remove_notification(const DoutPrefixProvider *dpp, const return ret; } - if (bucket_topics.topics.erase(topic_name) == 0) { + + std::unique_ptr topic_name = std::make_unique(notification_id); + if(is_notification_id) { + auto iter = find_unique_topic(bucket_topics, notification_id); + if (!iter) { + ldpp_dout(dpp, 1) << "ERROR: notification was not found" << dendl; + return -ENOENT; + } + topic_name = std::make_unique(iter->get().topic.name); + } + + if (bucket_topics.topics.erase(*topic_name) == 0) { ldpp_dout(dpp, 1) << "INFO: no need to remove, topic does not exist" << dendl; return 0; } @@ -555,6 +639,11 @@ int RGWPubSub::Bucket::remove_notification(const DoutPrefixProvider *dpp, const return 0; } +int RGWPubSub::Bucket::remove_notification_by_id(const DoutPrefixProvider *dpp, const std::string& notif_id, optional_yield y) const +{ + return remove_notification_inner(dpp, notif_id, true, y); +} + int RGWPubSub::Bucket::remove_notifications(const DoutPrefixProvider *dpp, optional_yield y) const { // get all topics on a bucket diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index 974581ce3d917..290c52c2b8fe5 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -18,6 +18,7 @@ struct rgw_s3_key_filter { bool has_content() const; + void dump(Formatter *f) const; bool decode_xml(XMLObj *obj); void dump_xml(Formatter *f) const; @@ -46,7 +47,8 @@ struct rgw_s3_key_value_filter { KeyValueMap kv; bool has_content() const; - + + void dump(Formatter *f) const; bool decode_xml(XMLObj *obj); void dump_xml(Formatter *f) const; @@ -69,7 +71,8 @@ struct rgw_s3_filter { rgw_s3_key_value_filter tag_filter; bool has_content() const; - + + void dump(Formatter *f) const; bool decode_xml(XMLObj *obj); void dump_xml(Formatter *f) const; @@ -563,6 +566,8 @@ public: // return 0 on success, error code otherwise int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker* objv_tracker, optional_yield y) const; + int remove_notification_inner(const DoutPrefixProvider *dpp, const std::string& notification_id, + bool notif_id_or_topic, optional_yield y) const; public: Bucket(const RGWPubSub& _ps, rgw::sal::Bucket* _bucket) : ps(_ps), bucket(_bucket) @@ -573,6 +578,10 @@ public: int get_topics(const DoutPrefixProvider *dpp, rgw_pubsub_bucket_topics& result, optional_yield y) const { return read_topics(dpp, result, nullptr, y); } + // get a bucket_topic with by its name and populate it into "result" + // return -ENOENT if the topic does not exists + // return 0 on success, error code otherwise + int get_notification_by_id(const DoutPrefixProvider *dpp, const std::string& notification_id, rgw_pubsub_topic_filter& result, optional_yield y) const; // adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket // assigning a notification name is optional (needed for S3 compatible notifications) // if the topic already exist on the bucket, the filter event list may be updated @@ -585,8 +594,9 @@ public: const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) const; // remove a topic and filter from bucket // if the topic does not exists on the bucket it is a no-op (considered success) - // return -ENOENT if the topic does not exists + // return -ENOENT if the notification-id/topic does not exists // return 0 on success, error code otherwise + int remove_notification_by_id(const DoutPrefixProvider *dpp, const std::string& notif_id, optional_yield y) const; int remove_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y) const; // remove all notifications (and autogenerated topics) associated with the bucket // return 0 on success or if no topic was associated with the bucket, error code otherwise diff --git a/src/test/cli/radosgw-admin/help.t b/src/test/cli/radosgw-admin/help.t index 72b14c09c8359..c5984273bcb27 100644 --- a/src/test/cli/radosgw-admin/help.t +++ b/src/test/cli/radosgw-admin/help.t @@ -188,6 +188,9 @@ script-package add add a lua package to the scripts allowlist script-package rm remove a lua package from the scripts allowlist script-package list get the lua packages allowlist + notification list list bucket notifications configuration + notification get get a bucket notifications configuration + notification rm remove a bucket notifications configuration options: --tenant= tenant name --user_ns= namespace of user (oidc in case of users authenticated with oidc provider) @@ -360,6 +363,7 @@ Bucket notifications options: --topic bucket notifications topic name + --notification-id bucket notifications id Script options: --context context in which the script runs. one of: prerequest, postrequest, background, getdata, putdata diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index ae253107cb09e..28079f6b42680 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -685,6 +685,84 @@ def test_ps_s3_topic_admin_on_master(): assert_equal(len(parsed_result['topics']), 0) +@attr('basic_test') +def test_ps_s3_notification_configuration_admin_on_master(): + """ test s3 notification list/get/delete on master """ + conn = connection() + zonegroup = 'default' + bucket_name = gen_bucket_name() + bucket = conn.create_bucket(bucket_name) + topic_name = bucket_name + TOPIC_SUFFIX + + # create s3 topics + endpoint_address = 'amqp://127.0.0.1:7001/vhost_1' + endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' + topic_conf = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args) + # clean all topics + try: + result = topic_conf.get_list()[0]['ListTopicsResponse']['ListTopicsResult']['Topics'] + topics = [] + if result is not None: + topics = result['member'] + for topic in topics: + topic_conf.del_config(topic_arn=topic['TopicArn']) + except Exception as err: + print('failed to do topic cleanup: ' + str(err)) + + topic_arn = topic_conf.set_config() + assert_equal(topic_arn, + 'arn:aws:sns:' + zonegroup + '::' + topic_name + '_1') + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name+'_1', + 'TopicArn': topic_arn, + 'Events': ['s3:ObjectCreated:*'] + }, + {'Id': notification_name+'_2', + 'TopicArn': topic_arn, + 'Events': ['s3:ObjectRemoved:*'] + }, + {'Id': notification_name+'_3', + 'TopicArn': topic_arn, + 'Events': [] + }] + s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) + _, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + + # list notification + result = admin(['notification', 'list', '--bucket', bucket_name]) + parsed_result = json.loads(result[0]) + assert_equal(len(parsed_result['notifications']), 3) + assert_equal(result[1], 0) + + # get notification 1 + result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name+'_1']) + parsed_result = json.loads(result[0]) + assert_equal(parsed_result['Id'], notification_name+'_1') + assert_equal(result[1], 0) + + # remove notification 3 + _, result = admin(['notification', 'rm', '--bucket', bucket_name, '--notification-id', notification_name+'_3']) + assert_equal(result, 0) + + # list notification + result = admin(['notification', 'list', '--bucket', bucket_name]) + parsed_result = json.loads(result[0]) + assert_equal(len(parsed_result['notifications']), 2) + assert_equal(result[1], 0) + + # delete notifications + _, result = admin(['notification', 'rm', '--bucket', bucket_name]) + assert_equal(result, 0) + + # list notification, make sure it is empty + result = admin(['notification', 'list', '--bucket', bucket_name]) + parsed_result = json.loads(result[0]) + assert_equal(len(parsed_result['notifications']), 0) + assert_equal(result[1], 0) + + @attr('modification_required') def test_ps_s3_topic_with_secret_on_master(): """ test s3 topics with secret set/get/delete on master """ -- 2.39.5