A user can create different topics. A topic entity is defined by its user and its name. A
user can only manage its own topics, and can only associate them with buckets it owns.
-In order to send notifications for events for a specific bucket, a notification ientity needs to be created. A
+In order to send notifications for events for a specific bucket, a notification entity needs to be created. A
notification can be created on a subset of event types, or for all event types (default).
+The notification may also filter out events based on preffix/suffix and/or regular expression matching of the keys. As well as,
+on the metadata attributes attached to the object.
There can be multiple notifications for any specific topic, and the same topic could be used for multiple notifications.
REST API has been defined to provide configuration and control interfaces for the bucket notification
-==================
+==================
PubSub Sync Module
==================
- In case that bucket deletion implicitly deletes the notification,
the associated subscription will not be deleted automatically (any events of the deleted bucket could still be access),
and will have to be deleted explicitly with the subscription deletion API
+ - Filtering based on metadata (which is an extension to S3) is not supported, and such rules will be ignored
Non S3-Compliant Notifications
+-----------------------------------+----------------------------------------------+
| ``<CloudFunctionConfiguration>`` | not needed, we treat all destinations as SNS |
+-----------------------------------+----------------------------------------------+
-| ``<Filter>`` | object filtering not supported |
-+-----------------------------------+----------------------------------------------+
REST API Extension
------------------
- In S3, it is only possible to fetch all notifications on a bucket
+- In addition to filtering based on prefix/suffix of object keys we support:
+
+ - Filtering based on regular expression matching
+
+ - Filtering based on metadata attributes attached to the object
+
+- Filtering overlapping is allowed, so that same event could be sent as different notification
+
+
Unsupported Fields in the Event Record
--------------------------------------
<Id></Id>
<Topic></Topic>
<Event></Event>
+ <Filter>
+ <S3Key>
+ <FilterRule>
+ <Name></Name>
+ <Value></Value>
+ </FilterRule>
+ </S3Key>
+ <S3Metadata>
+ <FilterRule>
+ <Name></Name>
+ <Value></Value>
+ </FilterRule>
+ </s3Metadata>
+ </Filter>
</TopicConfiguration>
</NotificationConfiguration>
| ``Event`` | String | List of supported events see: `S3 Notification Compatibility`_. Multiple ``Event`` | No |
| | | entities can be used. If omitted, all events are handled | |
+-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``Filter`` | Container | Holding ``S3Key`` and ``S3Metadata`` entities | No |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``S3Key`` | Container | Holding a list of ``FilterRule`` entities, for filtering based on object key. | No |
+| | | At most, 3 entities may be in the list, with ``Name`` be ``prefix``, ``suffix`` or | |
+| | | ``regex``. All filter rules in the list must match for the filter to match. | |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``S3Metadata`` | Container | Holding a list of ``FilterRule`` entities, for filtering based on object metadata. | No |
+| | | All filter rules in the list must match the ones defined on the object. The object, | |
+| | | have other metadata entitied not listed in the filter. | |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``S3Key.FilterRule`` | Container | Holding ``Name`` and ``Value`` entities. ``Name`` would be: ``prefix``, ``suffix`` | Yes |
+| | | or ``regex``. The ``Value`` would hold the key prefix, key suffix or a regular | |
+| | | expression for matching the key, accordingly. | |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``S3Metadata.FilterRule`` | Container | Holding ``Name`` and ``Value`` entities. ``Name`` would be the name of the metadata | Yes |
+| | | attribute (e.g. ``x-amz-meta-xxx``). The ``Value`` would be the expected value for | |
+| | | this attribute | |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+
HTTP Response
~~~~~~~~~~~~~
<Id></Id>
<Topic></Topic>
<Event></Event>
+ <Filter>
+ <S3Key>
+ <FilterRule>
+ <Name></Name>
+ <Value></Value>
+ </FilterRule>
+ </S3Key>
+ <S3Metadata>
+ <FilterRule>
+ <Name></Name>
+ <Value></Value>
+ </FilterRule>
+ </s3Metadata>
+ </Filter>
</TopicConfiguration>
</NotificationConfiguration>
+-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
| ``Event`` | String | Handled event. Multiple ``Event`` entities may exist | Yes |
+-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``Filter`` | Container | Holding the filters configured for this notification | No |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
HTTP Response
~~~~~~~~~~~~~
record.x_meta_map = s->info.x_meta_map;
}
-bool filter(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType event_type) {
- // if event list exists, and none of the events in the list matches the event type, filter the message
- if (filter.events.size() && std::find(filter.events.begin(), filter.events.end(), event_type) == filter.events.end()) {
- return true;
- }
- // TODO: add filter by compliant conf: object name, prefix, suffix
- // TODO: add extra filtering criteria: object size, ToD, metadata, ...
+bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType event) {
+ if (!::match(filter.events, event)) {
+ return false;
+ }
+ if (!::match(filter.s3_filter.key_filter, s->object.name)) {
+ return false;
+ }
+ if (!::match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) {
return false;
+ }
+ return true;
}
int publish(const req_state* s,
for (const auto& bucket_topic : bucket_topics.topics) {
const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
const rgw_pubsub_topic& topic_cfg = topic_filter.topic;
- if (filter(topic_filter, s, event_type)) {
+ if (!match(topic_filter, s, event_type)) {
// topic does not apply to req_state
continue;
}
#include "rgw_arn.h"
#include "rgw_pubsub_push.h"
#include "rgw_rados.h"
+#include <regex>
+#include <algorithm>
#define dout_subsys ceph_subsys_rgw
-void do_decode_xml_obj(rgw::notify::EventTypeList& l, const string& name, XMLObj *obj)
-{
+bool rgw_s3_key_filter::decode_xml(XMLObj* obj) {
+ XMLObjIter iter = obj->find("FilterRule");
+ XMLObj *o;
+
+ const auto throw_if_missing = true;
+ auto prefix_not_set = true;
+ auto suffix_not_set = true;
+ auto regex_not_set = true;
+ std::string name;
+
+ while ((o = iter.get_next())) {
+ RGWXMLDecoder::decode_xml("Name", name, o, throw_if_missing);
+ if (name == "prefix" && prefix_not_set) {
+ prefix_not_set = false;
+ RGWXMLDecoder::decode_xml("Value", prefix_rule, o, throw_if_missing);
+ } else if (name == "suffix" && suffix_not_set) {
+ suffix_not_set = false;
+ RGWXMLDecoder::decode_xml("Value", suffix_rule, o, throw_if_missing);
+ } else if (name == "regex" && regex_not_set) {
+ regex_not_set = false;
+ RGWXMLDecoder::decode_xml("Value", regex_rule, o, throw_if_missing);
+ } else {
+ throw RGWXMLDecoder::err("invalid/duplicate S3Key filter rule name: '" + name + "'");
+ }
+ }
+ return true;
+}
+
+void rgw_s3_key_filter::dump_xml(Formatter *f) const {
+ if (!prefix_rule.empty()) {
+ f->open_object_section("FilterRule");
+ ::encode_xml("Name", "prefix", f);
+ ::encode_xml("Value", prefix_rule, f);
+ f->close_section();
+ }
+ if (!suffix_rule.empty()) {
+ f->open_object_section("FilterRule");
+ ::encode_xml("Name", "suffix", f);
+ ::encode_xml("Value", suffix_rule, f);
+ f->close_section();
+ }
+ if (!regex_rule.empty()) {
+ f->open_object_section("FilterRule");
+ ::encode_xml("Name", "regex", f);
+ ::encode_xml("Value", regex_rule, f);
+ f->close_section();
+ }
+}
+
+bool rgw_s3_key_filter::has_content() const {
+ return !(prefix_rule.empty() && suffix_rule.empty() && regex_rule.empty());
+}
+
+bool rgw_s3_metadata_filter::decode_xml(XMLObj* obj) {
+ metadata.clear();
+ XMLObjIter iter = obj->find("FilterRule");
+ XMLObj *o;
+
+ const auto throw_if_missing = true;
+
+ std::string key;
+ std::string value;
+
+ while ((o = iter.get_next())) {
+ RGWXMLDecoder::decode_xml("Name", key, o, throw_if_missing);
+ RGWXMLDecoder::decode_xml("Value", value, o, throw_if_missing);
+ metadata.emplace(key, value);
+ }
+ return true;
+}
+
+void rgw_s3_metadata_filter::dump_xml(Formatter *f) const {
+ for (const auto& key_value : metadata) {
+ f->open_object_section("FilterRule");
+ ::encode_xml("Name", key_value.first, f);
+ ::encode_xml("Value", key_value.second, f);
+ f->close_section();
+ }
+}
+
+bool rgw_s3_metadata_filter::has_content() const {
+ return !metadata.empty();
+}
+
+bool rgw_s3_filter::decode_xml(XMLObj* obj) {
+ RGWXMLDecoder::decode_xml("S3Key", key_filter, obj);
+ RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter, obj);
+ return true;
+}
+
+void rgw_s3_filter::dump_xml(Formatter *f) const {
+ if (key_filter.has_content()) {
+ ::encode_xml("S3Key", key_filter, f);
+ }
+ if (metadata_filter.has_content()) {
+ ::encode_xml("S3Metadata", metadata_filter, f);
+ }
+}
+
+bool rgw_s3_filter::has_content() const {
+ return key_filter.has_content() ||
+ metadata_filter.has_content();
+}
+
+bool match(const rgw_s3_key_filter& filter, const std::string& key) {
+ const auto key_size = key.size();
+ const auto prefix_size = filter.prefix_rule.size();
+ if (prefix_size != 0) {
+ // prefix rule exists
+ if (prefix_size > key_size) {
+ // if prefix is longer than key, we fail
+ return false;
+ }
+ if (!std::equal(filter.prefix_rule.begin(), filter.prefix_rule.end(), key.begin())) {
+ return false;
+ }
+ }
+ const auto suffix_size = filter.suffix_rule.size();
+ if (suffix_size != 0) {
+ // suffix rule exists
+ if (suffix_size > key_size) {
+ // if suffix is longer than key, we fail
+ return false;
+ }
+ if (!std::equal(filter.suffix_rule.begin(), filter.suffix_rule.end(), (key.end() - suffix_size))) {
+ return false;
+ }
+ }
+ if (!filter.regex_rule.empty()) {
+ // TODO add regex chaching in the filter
+ const std::regex base_regex(filter.regex_rule);
+ if (!std::regex_match(key, base_regex)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+bool match(const rgw_s3_metadata_filter& filter, const Metadata& metadata) {
+ // all filter pairs must exist with the same value in the object's metadata
+ // object metadata may include items not in the filter
+ return std::includes(metadata.begin(), metadata.end(), filter.metadata.begin(), filter.metadata.end());
+}
+
+bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event) {
+ // if event list exists, and none of the events in the list matches the event type, filter the message
+ if (!events.empty() && std::find(events.begin(), events.end(), event) == events.end()) {
+ return false;
+ }
+ return true;
+}
+
+void do_decode_xml_obj(rgw::notify::EventTypeList& l, const string& name, XMLObj *obj) {
l.clear();
XMLObjIter iter = obj->find(name);
RGWXMLDecoder::decode_xml("Id", id, obj, throw_if_missing);
RGWXMLDecoder::decode_xml("Topic", topic_arn, obj, throw_if_missing);
+
+ RGWXMLDecoder::decode_xml("Filter", filter, obj);
do_decode_xml_obj(events, "Event", obj);
if (events.empty()) {
void rgw_pubsub_s3_notification::dump_xml(Formatter *f) const {
::encode_xml("Id", id, f);
::encode_xml("Topic", topic_arn.c_str(), f);
+ if (filter.has_content()) {
+ ::encode_xml("Filter", filter, f);
+ }
for (const auto& event : events) {
::encode_xml("Event", rgw::notify::to_string(event), f);
}
return true;
}
+rgw_pubsub_s3_notification::rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter) :
+ id(topic_filter.s3_id), events(topic_filter.events), topic_arn(topic_filter.topic.arn), filter(topic_filter.s3_filter) {}
+
void rgw_pubsub_s3_notifications::dump_xml(Formatter *f) const {
do_encode_xml("NotificationConfiguration", list, "TopicConfiguration", f);
}
return 0;
}
-int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, const std::string& notif_name)
-{
+int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events) {
+ return create_notification(topic_name, events, std::nullopt, "");
+}
+
+int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name) {
rgw_pubsub_topic_subs user_topic_info;
rgw::sal::RGWRadosStore *store = ps->store;
topic_filter.topic = user_topic_info.topic;
topic_filter.events = events;
topic_filter.s3_id = notif_name;
+ if (s3_filter) {
+ topic_filter.s3_filter = *s3_filter;
+ }
ret = write_topics(bucket_topics, &objv_tracker);
if (ret < 0) {
class XMLObj;
+struct rgw_s3_key_filter {
+ std::string prefix_rule;
+ std::string suffix_rule;
+ std::string regex_rule;
+
+ bool has_content() const;
+
+ bool decode_xml(XMLObj *obj);
+ void dump_xml(Formatter *f) const;
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(prefix_rule, bl);
+ encode(suffix_rule, bl);
+ encode(regex_rule, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(prefix_rule, bl);
+ decode(suffix_rule, bl);
+ decode(regex_rule, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(rgw_s3_key_filter)
+
+using Metadata = std::map<std::string, std::string>;
+
+struct rgw_s3_metadata_filter {
+ Metadata metadata;
+
+ bool has_content() const;
+
+ bool decode_xml(XMLObj *obj);
+ void dump_xml(Formatter *f) const;
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(metadata, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(metadata, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(rgw_s3_metadata_filter)
+
+struct rgw_s3_filter {
+ rgw_s3_key_filter key_filter;
+ rgw_s3_metadata_filter metadata_filter;
+
+ bool has_content() const;
+
+ bool decode_xml(XMLObj *obj);
+ void dump_xml(Formatter *f) const;
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(key_filter, bl);
+ encode(metadata_filter, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(key_filter, bl);
+ decode(metadata_filter, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(rgw_s3_filter)
+
+using OptionalFilter = std::optional<rgw_s3_filter>;
+
+class rgw_pubsub_topic_filter;
/* S3 notification configuration
* based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html
<NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Value>jpg</Value>
</FilterRule>
</S3Key>
+ <S3Metadata>
+ <FilterRule>
+ <Name></Name>
+ <Value></Value>
+ </FilterRule>
+ </s3Metadata>
</Filter>
<Id>notification1</Id>
<Topic>arn:aws:sns:<region>:<account>:<topic></Topic>
rgw::notify::EventTypeList events;
// topic ARN
std::string topic_arn;
+ // filter rules
+ rgw_s3_filter filter;
bool decode_xml(XMLObj *obj);
void dump_xml(Formatter *f) const;
+
+ rgw_pubsub_s3_notification() = default;
+ // construct from rgw_pubsub_topic_filter (used by get/list notifications)
+ rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter);
};
+// return true if the key matches the prefix/suffix/regex rules of the key filter
+bool match(const rgw_s3_key_filter& filter, const std::string& key);
+// return true if the key matches the metadata rules of the metadata filter
+bool match(const rgw_s3_metadata_filter& filter, const Metadata& metadata);
+// return true if the event type matches (equal or contained in) one of the events in the list
+bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event);
+
struct rgw_pubsub_s3_notifications {
std::list<rgw_pubsub_s3_notification> list;
bool decode_xml(XMLObj *obj);
rgw_pubsub_topic topic;
rgw::notify::EventTypeList events;
std::string s3_id;
+ rgw_s3_filter s3_filter;
void encode(bufferlist& bl) const {
- ENCODE_START(2, 1, bl);
+ ENCODE_START(3, 1, bl);
encode(topic, bl);
// events are stored as a vector of strings
std::vector<std::string> tmp_events;
std::transform(events.begin(), events.end(), std::back_inserter(tmp_events), converter);
encode(tmp_events, bl);
encode(s3_id, bl);
+ encode(s3_filter, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
- DECODE_START(2, bl);
+ DECODE_START(3, bl);
decode(topic, bl);
// events are stored as a vector of strings
events.clear();
if (struct_v >= 2) {
decode(s3_id, bl);
}
+ if (struct_v >= 3) {
+ decode(s3_filter, bl);
+ }
DECODE_FINISH(bl);
}
// read the list of topics associated with a bucket and populate into result
// return 0 on success or if no topic was associated with the bucket, error code otherwise
int get_topics(rgw_pubsub_bucket_topics *result);
- // adds a topic + filter (event list) to a bucket
+ // adds a topic + filter (event list, and possibly name and metadata filters) to a bucket
// assigning a notification name is optional (needed for S3 compatible notifications)
// if the topic already exist on the bucket, the filter event list may be updated
+ // for S3 compliant notifications the version with: s3_filter and notif_name should be used
// return -ENOENT if the topic does not exists
// return 0 on success, error code otherwise
- int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, const std::string& notif_name="");
+ int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events);
+ int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name);
// remove a topic and filter from bucket
// if the topic does not exists on the bucket it is a no-op (considered success)
// return -ENOENT if the topic does not exists
return;
}
- {
- XMLFormatter* f = static_cast<XMLFormatter*>(s->formatter);
- f->open_object_section_in_ns("CreateTopicResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
- f->open_object_section("CreateTopicResult");
- encode_xml("TopicArn", topic_arn, f);
- f->close_section();
- f->open_object_section("ResponseMetadata");
- encode_xml("RequestId", s->req_id, f);
- f->close_section();
- f->close_section();
- }
- rgw_flush_formatter_and_reset(s, s->formatter);
+ const auto f = s->formatter;
+ f->open_object_section_in_ns("CreateTopicResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
+ f->open_object_section("CreateTopicResult");
+ encode_xml("TopicArn", topic_arn, f);
+ f->close_section();
+ f->open_object_section("ResponseMetadata");
+ encode_xml("RequestId", s->req_id, f);
+ f->close_section();
+ f->close_section();
+ rgw_flush_formatter_and_reset(s, f);
}
};
return;
}
- {
- XMLFormatter* f = static_cast<XMLFormatter*>(s->formatter);
- f->open_object_section_in_ns("ListTopicsResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
- f->open_object_section("ListTopicsResult");
- encode_xml("Topics", result, f);
- f->close_section();
- f->open_object_section("ResponseMetadata");
- encode_xml("RequestId", s->req_id, f);
- f->close_section();
- f->close_section();
- }
- rgw_flush_formatter_and_reset(s, s->formatter);
+ const auto f = s->formatter;
+ f->open_object_section_in_ns("ListTopicsResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
+ f->open_object_section("ListTopicsResult");
+ encode_xml("Topics", result, f);
+ f->close_section();
+ f->open_object_section("ResponseMetadata");
+ encode_xml("RequestId", s->req_id, f);
+ f->close_section();
+ f->close_section();
+ rgw_flush_formatter_and_reset(s, f);
}
};
return;
}
- {
- XMLFormatter* f = static_cast<XMLFormatter*>(s->formatter);
- f->open_object_section("GetTopicResponse");
- f->open_object_section("GetTopicResult");
- encode_xml("Topic", result.topic, f);
- f->close_section();
- f->open_object_section("ResponseMetadata");
- encode_xml("RequestId", s->req_id, f);
- f->close_section();
- f->close_section();
- }
- rgw_flush_formatter_and_reset(s, s->formatter);
+ const auto f = s->formatter;
+ f->open_object_section("GetTopicResponse");
+ f->open_object_section("GetTopicResult");
+ encode_xml("Topic", result.topic, f);
+ f->close_section();
+ f->open_object_section("ResponseMetadata");
+ encode_xml("RequestId", s->req_id, f);
+ f->close_section();
+ f->close_section();
+ rgw_flush_formatter_and_reset(s, f);
}
};
return;
}
- {
- XMLFormatter* f = static_cast<XMLFormatter*>(s->formatter);
- f->open_object_section_in_ns("DeleteTopicResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
- f->open_object_section("ResponseMetadata");
- encode_xml("RequestId", s->req_id, f);
- f->close_section();
- f->close_section();
- }
- rgw_flush_formatter_and_reset(s, s->formatter);
+ const auto f = s->formatter;
+ f->open_object_section_in_ns("DeleteTopicResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
+ f->open_object_section("ResponseMetadata");
+ encode_xml("RequestId", s->req_id, f);
+ f->close_section();
+ f->close_section();
+ rgw_flush_formatter_and_reset(s, f);
}
};
ldout(s->cct, 20) << "successfully auto-generated unique topic '" << unique_topic_name << "'" << dendl;
// generate the notification
rgw::notify::EventTypeList events;
- op_ret = b->create_notification(unique_topic_name, c.events, notif_name);
+ op_ret = b->create_notification(unique_topic_name, c.events, std::make_optional(c.filter), notif_name);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name <<
"', ret=" << op_ret << dendl;
return 0;
}
- void add_notification_to_list(const rgw_pubsub_topic_filter& topic_filter);
-
public:
void execute() override;
void send_response() override {
const char* name() const override { return "pubsub_notifications_get_s3"; }
};
-void RGWPSListNotifs_ObjStore_S3::add_notification_to_list(const rgw_pubsub_topic_filter& topic_filter) {
- rgw_pubsub_s3_notification notification;
- notification.id = topic_filter.s3_id;
- notification.topic_arn = topic_filter.topic.arn;
- notification.events = topic_filter.events;
- notifications.list.push_back(notification);
-}
-
void RGWPSListNotifs_ObjStore_S3::execute() {
ups.emplace(store, s->owner.get_id());
auto b = ups->get_bucket(bucket_info.bucket);
// get info of a specific notification
const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
if (unique_topic) {
- add_notification_to_list(unique_topic->get());
+ notifications.list.emplace_back(unique_topic->get());
return;
}
op_ret = -ENOENT;
// not an s3 notification
continue;
}
- add_notification_to_list(topic.second);
+ notifications.list.emplace_back(topic.second);
}
}
}
};
+bool match(const rgw_pubsub_topic_filter& filter, const std::string& key_name, rgw::notify::EventType event_type) {
+ if (!match(filter.events, event_type)) {
+ return false;
+ }
+ if (!match(filter.s3_filter.key_filter, key_name)) {
+ return false;
+ }
+ return true;
+}
+
class RGWPSFindBucketTopicsCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
PSEnvRef env;
for (auto& titer : bucket_topics.topics) {
auto& topic_filter = titer.second;
auto& info = topic_filter.topic;
- // if event list is defined but event does not match any in the list, we skip to the next one
- if (!topic_filter.events.empty() &&
- std::find(topic_filter.events.begin(), topic_filter.events.end(), event_type) == topic_filter.events.end()) {
+ if (!match(topic_filter, key.name, event_type)) {
continue;
}
std::shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
# configure logging for the tests module
log = logging.getLogger(__name__)
-skip_push_tests = True
+skip_push_tests = False
####################################
# utility functions for pubsub tests
def __init__(self, exchange, topic):
import pika
hostname = get_ip()
- connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port))
+ remaining_retries = 10
+ while remaining_retries > 0:
+ try:
+ connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port))
+ break
+ except Exception as error:
+ remaining_retries -= 1
+ print 'failed to connect to rabbitmq (remaining retries ' + str(remaining_retries) + '): ' + str(error)
+
+ if remaining_retries == 0:
+ raise Exception('failed to connect to rabbitmq - no retries left')
self.channel = connection.channel()
self.channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
zones[0].delete_bucket(bucket_name)
+def ps_s3_notification_filter(on_master):
+ """ test s3 notification filter on master """
+ if skip_push_tests:
+ return SkipTest("PubSub push tests don't run in teuthology")
+ hostname = get_ip()
+ proc = init_rabbitmq()
+ if proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
+ if on_master:
+ zones, _ = init_env(require_ps=False)
+ ps_zone = zones[0]
+ else:
+ zones, ps_zones = init_env(require_ps=True)
+ ps_zone = ps_zones[0]
+
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = zones[0].create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # start amqp receivers
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
+
+ # create s3 topic
+ endpoint_address = 'amqp://' + hostname
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+ if on_master:
+ topic_conf = PSTopicS3(ps_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ else:
+ topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
+ result, _ = topic_conf.set_config()
+ parsed_result = json.loads(result)
+ topic_arn = parsed_result['arn']
+ zone_meta_checkpoint(ps_zone.zone)
+
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name+'_1',
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:*'],
+ 'Filter': {
+ 'Key': {
+ 'FilterRules': [{'Name': 'prefix', 'Value': 'hello'}]
+ }
+ }
+ },
+ {'Id': notification_name+'_2',
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:*'],
+ 'Filter': {
+ 'Key': {
+ 'FilterRules': [{'Name': 'prefix', 'Value': 'world'},
+ {'Name': 'suffix', 'Value': 'log'}]
+ }
+ }
+ },
+ {'Id': notification_name+'_3',
+ 'TopicArn': topic_arn,
+ 'Events': [],
+ 'Filter': {
+ 'Key': {
+ 'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)\\.txt'}]
+ }
+ }
+ }]
+
+ s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
+ result, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ if on_master:
+ topic_conf_list = [{'Id': notification_name+'_4',
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
+ 'Filter': {
+ 'Metadata': {
+ 'FilterRules': [{'Name': 'x-amz-meta-foo', 'Value': 'bar'},
+ {'Name': 'x-amz-meta-hello', 'Value': 'world'}]
+ },
+ 'Key': {
+ 'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)'}]
+ }
+ }
+ }]
+
+ try:
+ s3_notification_conf4 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
+ _, status = s3_notification_conf4.set_config()
+ assert_equal(status/100, 2)
+ skip_notif4 = False
+ except Exception as error:
+ print 'note: metadata filter is not supported by boto3 - skipping test'
+ skip_notif4 = True
+ else:
+ print 'filtering by attributes only supported on master zone'
+ skip_notif4 = True
+
+
+ # get all notifications
+ result, status = s3_notification_conf.get_config()
+ assert_equal(status/100, 2)
+ for conf in result['TopicConfigurations']:
+ filter_name = conf['Filter']['Key']['FilterRules'][0]['Name']
+ assert filter_name == 'prefix' or filter_name == 'suffix' or filter_name == 'regex', filter_name
+
+ if not skip_notif4:
+ result, status = s3_notification_conf4.get_config(notification=notification_name+'_4')
+ assert_equal(status/100, 2)
+ filter_name = result['NotificationConfiguration']['TopicConfiguration']['Filter']['S3Metadata']['FilterRule'][0]['Name']
+ assert filter_name == 'x-amz-meta-foo' or filter_name == 'x-amz-meta-hello'
+
+ expected_in1 = ['hello.kaboom', 'hello.txt', 'hello123.txt', 'hello']
+ expected_in2 = ['world1.log', 'world2log', 'world3.log']
+ expected_in3 = ['hello.txt', 'hell.txt', 'worldlog.txt']
+ expected_in4 = ['foo', 'bar', 'hello', 'world']
+ filtered = ['hell.kaboom', 'world.og', 'world.logg', 'he123ll.txt', 'wo', 'log', 'h', 'txt', 'world.log.txt']
+ filtered_with_attr = ['nofoo', 'nobar', 'nohello', 'noworld']
+ # create objects in bucket
+ for key_name in expected_in1:
+ key = bucket.new_key(key_name)
+ key.set_contents_from_string('bar')
+ for key_name in expected_in2:
+ key = bucket.new_key(key_name)
+ key.set_contents_from_string('bar')
+ for key_name in expected_in3:
+ key = bucket.new_key(key_name)
+ key.set_contents_from_string('bar')
+ if not skip_notif4:
+ for key_name in expected_in4:
+ key = bucket.new_key(key_name)
+ key.set_metadata('foo', 'bar')
+ key.set_metadata('hello', 'world')
+ key.set_metadata('goodbye', 'cruel world')
+ key.set_contents_from_string('bar')
+ for key_name in filtered:
+ key = bucket.new_key(key_name)
+ key.set_contents_from_string('bar')
+ for key_name in filtered_with_attr:
+ key.set_metadata('foo', 'nobar')
+ key.set_metadata('hello', 'noworld')
+ key.set_metadata('goodbye', 'cruel world')
+ key = bucket.new_key(key_name)
+ key.set_contents_from_string('bar')
+
+ if on_master:
+ print 'wait for 5sec for the messages...'
+ time.sleep(5)
+ else:
+ zone_bucket_checkpoint(ps_zone.zone, zones[0].zone, bucket_name)
+
+ found_in1 = []
+ found_in2 = []
+ found_in3 = []
+ found_in4 = []
+
+ for event in receiver.get_and_reset_events():
+ notif_id = event['s3']['configurationId']
+ key_name = event['s3']['object']['key']
+ if notif_id == notification_name+'_1':
+ found_in1.append(key_name)
+ elif notif_id == notification_name+'_2':
+ found_in2.append(key_name)
+ elif notif_id == notification_name+'_3':
+ found_in3.append(key_name)
+ elif not skip_notif4 and notif_id == notification_name+'_4':
+ found_in4.append(key_name)
+ else:
+ assert False, 'invalid notification: ' + notif_id
+
+ assert_equal(set(found_in1), set(expected_in1))
+ assert_equal(set(found_in2), set(expected_in2))
+ assert_equal(set(found_in3), set(expected_in3))
+ if not skip_notif4:
+ assert_equal(set(found_in4), set(expected_in4))
+
+ # cleanup
+ s3_notification_conf.del_config()
+ if not skip_notif4:
+ s3_notification_conf4.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ for key in bucket.list():
+ key.delete()
+ zones[0].delete_bucket(bucket_name)
+ stop_amqp_receiver(receiver, task)
+ clean_rabbitmq(proc)
+
+
+def test_ps_s3_notification_filter_on_master():
+ ps_s3_notification_filter(on_master=True)
+
+
+def test_ps_s3_notification_filter():
+ ps_s3_notification_filter(on_master=False)
+
+
def test_ps_s3_notification_errors_on_master():
""" test s3 notification set/get/delete on master """
zones, _ = init_env(require_ps=False)