cout << " script-package add add a lua package to the scripts allowlist\n";
cout << " script-package rm remove a lua package from the scripts allowlist\n";
cout << " script-package list get the lua packages allowlist\n";
+ cout << " notification list list bucket notifications configuration\n";
+ cout << " notification get get a bucket notifications configuration\n";
+ cout << " notification rm remove a bucket notifications configuration\n";
cout << "options:\n";
cout << " --tenant=<tenant> tenant name\n";
cout << " --user_ns=<namespace> namespace of user (oidc in case of users authenticated with oidc provider)\n";
cout << " --totp-pin the valid value of a TOTP token at a certain time\n";
cout << "\nBucket notifications options:\n";
cout << " --topic bucket notifications topic name\n";
+ cout << " --notification-id bucket notifications id\n";
cout << "\nScript options:\n";
cout << " --context context in which the script runs. one of: "+LUA_CONTEXT_LIST+"\n";
cout << " --package name of the lua package that should be added/removed to/from the allowlist\n";
MFA_RESYNC,
RESHARD_STALE_INSTANCES_LIST,
RESHARD_STALE_INSTANCES_DELETE,
- PUBSUB_TOPICS_LIST,
+ PUBSUB_TOPIC_LIST,
PUBSUB_TOPIC_GET,
PUBSUB_TOPIC_RM,
+ PUBSUB_NOTIFICATION_LIST,
+ PUBSUB_NOTIFICATION_GET,
+ PUBSUB_NOTIFICATION_RM,
SCRIPT_PUT,
SCRIPT_GET,
SCRIPT_RM,
{ "reshard stale list", OPT::RESHARD_STALE_INSTANCES_LIST },
{ "reshard stale-instances delete", OPT::RESHARD_STALE_INSTANCES_DELETE },
{ "reshard stale delete", OPT::RESHARD_STALE_INSTANCES_DELETE },
- { "topic list", OPT::PUBSUB_TOPICS_LIST },
+ { "topic list", OPT::PUBSUB_TOPIC_LIST },
{ "topic get", OPT::PUBSUB_TOPIC_GET },
{ "topic rm", OPT::PUBSUB_TOPIC_RM },
+ { "notification list", OPT::PUBSUB_NOTIFICATION_LIST },
+ { "notification get", OPT::PUBSUB_NOTIFICATION_GET },
+ { "notification rm", OPT::PUBSUB_NOTIFICATION_RM },
{ "script put", OPT::SCRIPT_PUT },
{ "script get", OPT::SCRIPT_GET },
{ "script rm", OPT::SCRIPT_RM },
int trim_delay_ms = 0;
string topic_name;
+ string notification_id;
string sub_name;
string event_id;
trim_delay_ms = atoi(val.c_str());
} else if (ceph_argparse_witharg(args, i, &val, "--topic", (char*)NULL)) {
topic_name = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--notification-id", (char*)NULL)) {
+ notification_id = val;
} else if (ceph_argparse_witharg(args, i, &val, "--subscription", (char*)NULL)) {
sub_name = val;
} else if (ceph_argparse_witharg(args, i, &val, "--event-id", (char*)NULL)) {
OPT::ROLE_POLICY_GET,
OPT::RESHARD_LIST,
OPT::RESHARD_STATUS,
- OPT::PUBSUB_TOPICS_LIST,
+ OPT::PUBSUB_TOPIC_LIST,
+ OPT::PUBSUB_NOTIFICATION_LIST,
OPT::PUBSUB_TOPIC_GET,
+ OPT::PUBSUB_NOTIFICATION_GET,
OPT::SCRIPT_GET,
};
&& opt_cmd != OPT::RESHARD_ADD
&& opt_cmd != OPT::RESHARD_CANCEL
&& opt_cmd != OPT::RESHARD_STATUS
- && opt_cmd != OPT::PUBSUB_TOPICS_LIST
+ && opt_cmd != OPT::PUBSUB_TOPIC_LIST
+ && opt_cmd != OPT::PUBSUB_NOTIFICATION_LIST
&& opt_cmd != OPT::PUBSUB_TOPIC_GET
- && opt_cmd != OPT::PUBSUB_TOPIC_RM) {
+ && opt_cmd != OPT::PUBSUB_NOTIFICATION_GET
+ && opt_cmd != OPT::PUBSUB_TOPIC_RM
+ && opt_cmd != OPT::PUBSUB_NOTIFICATION_RM) {
cerr << "ERROR: --tenant is set, but there's no user ID" << std::endl;
return EINVAL;
}
}
}
- if (opt_cmd == OPT::PUBSUB_TOPICS_LIST) {
+ if (opt_cmd == OPT::PUBSUB_NOTIFICATION_LIST) {
+ if (bucket_name.empty()) {
+ cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;
+ return EINVAL;
+ }
RGWPubSub ps(driver, tenant);
- if (!bucket_name.empty()) {
- rgw_pubsub_bucket_topics result;
- int ret = init_bucket(user.get(), tenant, bucket_name, bucket_id, &bucket);
- if (ret < 0) {
- cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
- return -ret;
- }
+ rgw_pubsub_bucket_topics result;
+ int ret = init_bucket(user.get(), tenant, bucket_name, bucket_id, &bucket);
+ if (ret < 0) {
+ cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
- const RGWPubSub::Bucket b(ps, bucket.get());
- ret = b.get_topics(dpp(), result, null_yield);
- if (ret < 0 && ret != -ENOENT) {
- cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
- return -ret;
- }
- encode_json("result", result, formatter.get());
- } else {
- rgw_pubsub_topics result;
- int ret = ps.get_topics(dpp(), result, null_yield);
- if (ret < 0 && ret != -ENOENT) {
- cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
- return -ret;
- }
- encode_json("result", result, formatter.get());
+ const RGWPubSub::Bucket b(ps, bucket.get());
+ ret = b.get_topics(dpp(), result, null_yield);
+ if (ret < 0 && ret != -ENOENT) {
+ cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
}
+ encode_json("result", result, formatter.get());
+ formatter->flush(cout);
+ }
+
+ if (opt_cmd == OPT::PUBSUB_TOPIC_LIST) {
+ RGWPubSub ps(driver, tenant);
+
+ rgw_pubsub_topics result;
+ int ret = ps.get_topics(dpp(), result, null_yield);
+ if (ret < 0 && ret != -ENOENT) {
+ cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ encode_json("result", result, formatter.get());
formatter->flush(cout);
}
formatter->flush(cout);
}
+ if (opt_cmd == OPT::PUBSUB_NOTIFICATION_GET) {
+ if (notification_id.empty()) {
+ cerr << "ERROR: notification-id was not provided (via --notification-id)" << std::endl;
+ return EINVAL;
+ }
+ if (bucket_name.empty()) {
+ cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;
+ return EINVAL;
+ }
+
+ int ret = init_bucket(user.get(), tenant, bucket_name, bucket_id, &bucket);
+ if (ret < 0) {
+ cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ RGWPubSub ps(driver, tenant);
+
+ rgw_pubsub_bucket_topics bucket_topics;
+ const RGWPubSub::Bucket b(ps, bucket.get());
+ ret = b.get_topics(dpp(), bucket_topics, null_yield);
+ if (ret < 0 && ret != -ENOENT) {
+ cerr << "ERROR: could not get bucket notifications: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ rgw_pubsub_topic_filter bucket_topic;
+ ret = b.get_notification_by_id(dpp(), notification_id, bucket_topic, null_yield);
+ if (ret < 0) {
+ cerr << "ERROR: could not get notification: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ encode_json("notification", bucket_topic, formatter.get());
+ formatter->flush(cout);
+ }
+
if (opt_cmd == OPT::PUBSUB_TOPIC_RM) {
if (topic_name.empty()) {
cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
}
}
+ if (opt_cmd == OPT::PUBSUB_NOTIFICATION_RM) {
+ if (bucket_name.empty()) {
+ cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;
+ return EINVAL;
+ }
+
+ int ret = init_bucket(user.get(), tenant, bucket_name, bucket_id, &bucket);
+ if (ret < 0) {
+ cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ RGWPubSub ps(driver, tenant);
+
+ rgw_pubsub_bucket_topics bucket_topics;
+ const RGWPubSub::Bucket b(ps, bucket.get());
+ ret = b.get_topics(dpp(), bucket_topics, null_yield);
+ if (ret < 0 && ret != -ENOENT) {
+ cerr << "ERROR: could not get bucket notifications: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ rgw_pubsub_topic_filter bucket_topic;
+ if(notification_id.empty()) {
+ ret = b.remove_notifications(dpp(), null_yield);
+ } else {
+ ret = b.remove_notification_by_id(dpp(), notification_id, null_yield);
+ }
+ }
+
if (opt_cmd == OPT::SCRIPT_PUT) {
if (!str_script_ctx) {
cerr << "ERROR: context was not provided (via --context)" << std::endl;
}
}
+void rgw_s3_key_filter::dump(Formatter *f) const {
+ if (!prefix_rule.empty()) {
+ f->open_object_section("FilterRule");
+ ::encode_json("Name", "prefix", f);
+ ::encode_json("Value", prefix_rule, f);
+ f->close_section();
+ }
+ if (!suffix_rule.empty()) {
+ f->open_object_section("FilterRule");
+ ::encode_json("Name", "suffix", f);
+ ::encode_json("Value", suffix_rule, f);
+ f->close_section();
+ }
+ if (!regex_rule.empty()) {
+ f->open_object_section("FilterRule");
+ ::encode_json("Name", "regex", f);
+ ::encode_json("Value", regex_rule, f);
+ f->close_section();
+ }
+}
+
bool rgw_s3_key_filter::decode_xml(XMLObj* obj) {
XMLObjIter iter = obj->find("FilterRule");
XMLObj *o;
return !(prefix_rule.empty() && suffix_rule.empty() && regex_rule.empty());
}
+void rgw_s3_key_value_filter::dump(Formatter *f) const {
+ for (const auto& key_value : kv) {
+ f->open_object_section("FilterRule");
+ ::encode_json("Name", key_value.first, f);
+ ::encode_json("Value", key_value.second, f);
+ f->close_section();
+ }
+}
+
bool rgw_s3_key_value_filter::decode_xml(XMLObj* obj) {
kv.clear();
XMLObjIter iter = obj->find("FilterRule");
return !kv.empty();
}
+void rgw_s3_filter::dump(Formatter *f) const {
+ encode_json("S3Key", key_filter, f);
+ encode_json("S3Metadata", metadata_filter, f);
+ encode_json("S3Tags", tag_filter, f);
+}
+
bool rgw_s3_filter::decode_xml(XMLObj* obj) {
RGWXMLDecoder::decode_xml("S3Key", key_filter, obj);
RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter, obj);
void rgw_pubsub_topic_filter::dump(Formatter *f) const
{
- encode_json("topic", topic, f);
- encode_json("events", events, f);
+ encode_json("TopicArn", topic.arn, f);
+ encode_json("Id", s3_id, f);
+ encode_json("Events", events, f);
+ encode_json("Filter", s3_filter, f);
}
void rgw_pubsub_bucket_topics::dump(Formatter *f) const
{
- Formatter::ArraySection s(*f, "topics");
+ Formatter::ArraySection s(*f, "notifications");
for (auto& t : topics) {
encode_json(t.first.c_str(), t.second, f);
}
return 0;
}
+// from list of bucket topics, find the one that was auto-generated by a notification
+auto find_unique_topic(const rgw_pubsub_bucket_topics &bucket_topics, const std::string ¬ification_id) {
+ auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(),
+ [&](const auto& val) { return notification_id == val.second.s3_id; });
+ return it != bucket_topics.topics.end() ?
+ std::optional<std::reference_wrapper<const rgw_pubsub_topic_filter>>(it->second):
+ std::nullopt;
+}
+
+int RGWPubSub::Bucket::get_notification_by_id(const DoutPrefixProvider *dpp, const std::string& notification_id,
+ rgw_pubsub_topic_filter& result, optional_yield y) const {
+ rgw_pubsub_bucket_topics bucket_topics;
+ const int ret = read_topics(dpp, bucket_topics, nullptr, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to read bucket_topics info: ret=" << ret << dendl;
+ return ret;
+ }
+
+ auto iter = find_unique_topic(bucket_topics, notification_id);
+ if (!iter) {
+ ldpp_dout(dpp, 1) << "ERROR: notification was not found" << dendl;
+ return -ENOENT;
+ }
+
+ result = iter->get();
+ return 0;
+}
+
+
int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
const rgw::notify::EventTypeList& events, optional_yield y) const {
return create_notification(dpp, topic_name, events, std::nullopt, "", y);
}
int RGWPubSub::Bucket::remove_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y) const
+{
+ return remove_notification_inner(dpp, topic_name, false, y);
+}
+
+int RGWPubSub::Bucket::remove_notification_inner(const DoutPrefixProvider *dpp, const std::string& notification_id,
+ bool is_notification_id, optional_yield y) const
{
RGWObjVersionTracker objv_tracker;
rgw_pubsub_bucket_topics bucket_topics;
return ret;
}
- if (bucket_topics.topics.erase(topic_name) == 0) {
+
+ std::unique_ptr<std::string> topic_name = std::make_unique<std::string>(notification_id);
+ if(is_notification_id) {
+ auto iter = find_unique_topic(bucket_topics, notification_id);
+ if (!iter) {
+ ldpp_dout(dpp, 1) << "ERROR: notification was not found" << dendl;
+ return -ENOENT;
+ }
+ topic_name = std::make_unique<std::string>(iter->get().topic.name);
+ }
+
+ if (bucket_topics.topics.erase(*topic_name) == 0) {
ldpp_dout(dpp, 1) << "INFO: no need to remove, topic does not exist" << dendl;
return 0;
}
return 0;
}
+int RGWPubSub::Bucket::remove_notification_by_id(const DoutPrefixProvider *dpp, const std::string& notif_id, optional_yield y) const
+{
+ return remove_notification_inner(dpp, notif_id, true, y);
+}
+
int RGWPubSub::Bucket::remove_notifications(const DoutPrefixProvider *dpp, optional_yield y) const
{
// get all topics on a bucket
bool has_content() const;
+ void dump(Formatter *f) const;
bool decode_xml(XMLObj *obj);
void dump_xml(Formatter *f) const;
KeyValueMap kv;
bool has_content() const;
-
+
+ void dump(Formatter *f) const;
bool decode_xml(XMLObj *obj);
void dump_xml(Formatter *f) const;
rgw_s3_key_value_filter tag_filter;
bool has_content() const;
-
+
+ void dump(Formatter *f) const;
bool decode_xml(XMLObj *obj);
void dump_xml(Formatter *f) const;
// return 0 on success, error code otherwise
int write_topics(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& topics,
RGWObjVersionTracker* objv_tracker, optional_yield y) const;
+ int remove_notification_inner(const DoutPrefixProvider *dpp, const std::string& notification_id,
+ bool notif_id_or_topic, optional_yield y) const;
public:
Bucket(const RGWPubSub& _ps, rgw::sal::Bucket* _bucket) :
ps(_ps), bucket(_bucket)
int get_topics(const DoutPrefixProvider *dpp, rgw_pubsub_bucket_topics& result, optional_yield y) const {
return read_topics(dpp, result, nullptr, y);
}
+ // get a bucket_topic with by its name and populate it into "result"
+ // return -ENOENT if the topic does not exists
+ // return 0 on success, error code otherwise
+ int get_notification_by_id(const DoutPrefixProvider *dpp, const std::string& notification_id, rgw_pubsub_topic_filter& result, optional_yield y) const;
// adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket
// assigning a notification name is optional (needed for S3 compatible notifications)
// if the topic already exist on the bucket, the filter event list may be updated
const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name, optional_yield y) const;
// remove a topic and filter from bucket
// if the topic does not exists on the bucket it is a no-op (considered success)
- // return -ENOENT if the topic does not exists
+ // return -ENOENT if the notification-id/topic does not exists
// return 0 on success, error code otherwise
+ int remove_notification_by_id(const DoutPrefixProvider *dpp, const std::string& notif_id, optional_yield y) const;
int remove_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y) const;
// remove all notifications (and autogenerated topics) associated with the bucket
// return 0 on success or if no topic was associated with the bucket, error code otherwise
script-package add add a lua package to the scripts allowlist
script-package rm remove a lua package from the scripts allowlist
script-package list get the lua packages allowlist
+ notification list list bucket notifications configuration
+ notification get get a bucket notifications configuration
+ notification rm remove a bucket notifications configuration
options:
--tenant=<tenant> tenant name
--user_ns=<namespace> namespace of user (oidc in case of users authenticated with oidc provider)
Bucket notifications options:
--topic bucket notifications topic name
+ --notification-id bucket notifications id
Script options:
--context context in which the script runs. one of: prerequest, postrequest, background, getdata, putdata
assert_equal(len(parsed_result['topics']), 0)
+@attr('basic_test')
+def test_ps_s3_notification_configuration_admin_on_master():
+ """ test s3 notification list/get/delete on master """
+ conn = connection()
+ zonegroup = 'default'
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topics
+ endpoint_address = 'amqp://127.0.0.1:7001/vhost_1'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+ topic_conf = PSTopicS3(conn, topic_name+'_1', zonegroup, endpoint_args=endpoint_args)
+ # clean all topics
+ try:
+ result = topic_conf.get_list()[0]['ListTopicsResponse']['ListTopicsResult']['Topics']
+ topics = []
+ if result is not None:
+ topics = result['member']
+ for topic in topics:
+ topic_conf.del_config(topic_arn=topic['TopicArn'])
+ except Exception as err:
+ print('failed to do topic cleanup: ' + str(err))
+
+ topic_arn = topic_conf.set_config()
+ assert_equal(topic_arn,
+ 'arn:aws:sns:' + zonegroup + '::' + topic_name + '_1')
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name+'_1',
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:*']
+ },
+ {'Id': notification_name+'_2',
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectRemoved:*']
+ },
+ {'Id': notification_name+'_3',
+ 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ _, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # list notification
+ result = admin(['notification', 'list', '--bucket', bucket_name])
+ parsed_result = json.loads(result[0])
+ assert_equal(len(parsed_result['notifications']), 3)
+ assert_equal(result[1], 0)
+
+ # get notification 1
+ result = admin(['notification', 'get', '--bucket', bucket_name, '--notification-id', notification_name+'_1'])
+ parsed_result = json.loads(result[0])
+ assert_equal(parsed_result['Id'], notification_name+'_1')
+ assert_equal(result[1], 0)
+
+ # remove notification 3
+ _, result = admin(['notification', 'rm', '--bucket', bucket_name, '--notification-id', notification_name+'_3'])
+ assert_equal(result, 0)
+
+ # list notification
+ result = admin(['notification', 'list', '--bucket', bucket_name])
+ parsed_result = json.loads(result[0])
+ assert_equal(len(parsed_result['notifications']), 2)
+ assert_equal(result[1], 0)
+
+ # delete notifications
+ _, result = admin(['notification', 'rm', '--bucket', bucket_name])
+ assert_equal(result, 0)
+
+ # list notification, make sure it is empty
+ result = admin(['notification', 'list', '--bucket', bucket_name])
+ parsed_result = json.loads(result[0])
+ assert_equal(len(parsed_result['notifications']), 0)
+ assert_equal(result[1], 0)
+
+
@attr('modification_required')
def test_ps_s3_topic_with_secret_on_master():
""" test s3 topics with secret set/get/delete on master """