rgw/pubsub: add notification filtering 28971/head
authorYuval Lifshitz <yuvalif@yahoo.com>
Tue, 10 Sep 2019 15:53:21 +0000 (18:53 +0300)
committerYuval Lifshitz <yuvalif@yahoo.com>
Wed, 11 Sep 2019 16:24:55 +0000 (19:24 +0300)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
doc/radosgw/notifications.rst
doc/radosgw/pubsub-module.rst
doc/radosgw/s3-notification-compatibility.rst
doc/radosgw/s3/bucketops.rst
src/rgw/rgw_notify.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_rest_pubsub.cc
src/rgw/rgw_sync_module_pubsub.cc
src/test/rgw/rgw_multi/tests_ps.py

index da8ad68f97c38a65381fccdfdb93de272e6d2eeb..152dc03f83cf35fa9509f4ab2484927c7f4d3218 100644 (file)
@@ -15,8 +15,10 @@ the `PubSub Module`_ should be used instead of the bucket notification mechanism
 A user can create different topics. A topic entity is defined by its user and its name. A
 user can only manage its own topics, and can only associate them with buckets it owns.
 
-In order to send notifications for events for a specific bucket, a notification ientity needs to be created. A
+In order to send notifications for events for a specific bucket, a notification entity needs to be created. A
 notification can be created on a subset of event types, or for all event types (default).
+The notification may also filter events based on prefix/suffix and/or regular expression matching of the object keys, as well as
+on the metadata attributes attached to the object (see the example below).
 There can be multiple notifications for any specific topic, and the same topic could be used for multiple notifications.
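+
+For example, a notification that is only triggered for objects whose keys start with ``images/``
+(an illustrative value) would carry the following filter::
+
+   <Filter>
+       <S3Key>
+           <FilterRule>
+               <Name>prefix</Name>
+               <Value>images/</Value>
+           </FilterRule>
+       </S3Key>
+   </Filter>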
 
 REST API has been defined to provide configuration and control interfaces for the bucket notification
index 6f98f63f8919338b7f8e7d24d122027ee0e08272..9be303b857263bc2491f52c0fd1322a179843066 100644 (file)
@@ -1,4 +1,4 @@
-==================
+================== 
 PubSub Sync Module
 ==================
 
@@ -242,6 +242,7 @@ Detailed under: `Bucket Operations`_.
     - In case that bucket deletion implicitly deletes the notification, 
       the associated subscription will not be deleted automatically (any events of the deleted bucket could still be access),
       and will have to be deleted explicitly with the subscription deletion API
+    - Filtering based on metadata (which is an extension to S3) is not supported, and such rules will be ignored
 
 
 Non S3-Compliant Notifications
index 91d66a7078ced956915b201cfd363f5fb9dbb409..6cc6ac0283f1d3cacdc088b086e4ac03dc7b5d74 100644 (file)
@@ -29,8 +29,6 @@ Following tags (and the tags inside them) are not supported:
 +-----------------------------------+----------------------------------------------+
 | ``<CloudFunctionConfiguration>``  | not needed, we treat all destinations as SNS |
 +-----------------------------------+----------------------------------------------+
-| ``<Filter>``                      | object filtering not supported               |
-+-----------------------------------+----------------------------------------------+
 
 REST API Extension 
 ------------------
@@ -45,6 +43,15 @@ Ceph's bucket notification API has the following extensions:
 
   - In S3, it is only possible to fetch all notifications on a bucket
 
+- In addition to filtering based on the prefix/suffix of object keys, we support (see the example below):
+
+  - Filtering based on regular expression matching
+
+  - Filtering based on metadata attributes attached to the object
+
+- Overlapping filters are allowed, so that the same event could be sent via more than one notification
+
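+For example (all values are illustrative), a filter that combines both extensions, matching only object keys
+that consist of lower-case letters followed by ``.jpg`` and that carry the metadata attribute
+``x-amz-meta-color`` with the value ``blue``, would look like::
+
+   <Filter>
+       <S3Key>
+           <FilterRule>
+               <Name>regex</Name>
+               <Value>([a-z]+)\.jpg</Value>
+           </FilterRule>
+       </S3Key>
+       <S3Metadata>
+           <FilterRule>
+               <Name>x-amz-meta-color</Name>
+               <Value>blue</Value>
+           </FilterRule>
+       </S3Metadata>
+   </Filter>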
+
 Unsupported Fields in the Event Record
 --------------------------------------
 
index 25a23a2826a199e96d8f7a968796759b0d66afee..50acfeda7306911eaa1357151f8aeed2cb4394e3 100644 (file)
@@ -502,6 +502,20 @@ Parameters are XML encoded in the body of the request, in the following format:
            <Id></Id>
            <Topic></Topic>
            <Event></Event>
+           <Filter>
+               <S3Key>
+                   <FilterRule>
+                       <Name></Name>
+                       <Value></Value>
+                   </FilterRule>
+               </S3Key>
+               <S3Metadata>
+                   <FilterRule>
+                       <Name></Name>
+                       <Value></Value>
+                   </FilterRule>
+               </S3Metadata>
+           </Filter>
        </TopicConfiguration>
    </NotificationConfiguration>
 
@@ -519,6 +533,25 @@ Parameters are XML encoded in the body of the request, in the following format:
 | ``Event``                     | String    | List of supported events see: `S3 Notification Compatibility`_.  Multiple ``Event``  | No       |
 |                               |           | entities can be used. If omitted, all events are handled                             |          |
 +-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``Filter``                    | Container | Holding ``S3Key`` and ``S3Metadata`` entities                                        | No       |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``S3Key``                     | Container | Holding a list of ``FilterRule`` entities, for filtering based on object key.        | No       |
+|                               |           | At most, 3 entities may be in the list, with ``Name`` as ``prefix``, ``suffix`` or   |          |
+|                               |           | ``regex``. All filter rules in the list must match for the filter to match.          |          |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``S3Metadata``                | Container | Holding a list of ``FilterRule`` entities, for filtering based on object metadata.   | No       |
+|                               |           | All filter rules in the list must match those defined on the object. The object may  |          |
+|                               |           | have other metadata entries not listed in the filter.                                |          |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``S3Key.FilterRule``          | Container | Holding ``Name`` and ``Value`` entities. ``Name`` would  be: ``prefix``, ``suffix``  | Yes      |
+|                               |           | or ``regex``. The ``Value`` would hold the key prefix, key suffix or a regular       |          |
+|                               |           | expression for matching the key, accordingly.                                        |          |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``S3Metadata.FilterRule``     | Container | Holding ``Name`` and ``Value`` entities. ``Name`` would be the name of the metadata  | Yes      |
+|                               |           | attribute (e.g. ``x-amz-meta-xxx``). The ``Value`` would be the expected value for   |          | 
+|                               |           | this attribute                                                                       |          |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+
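+As an illustration (all values below, including the topic and metadata names, are examples only), a notification
+triggered only for objects whose keys start with ``images/``, end with ``.jpg`` and carry the metadata attribute
+``x-amz-meta-camera`` with the value ``nikon`` could be configured as::
+
+   <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+       <TopicConfiguration>
+           <Id>notif1</Id>
+           <Topic>arn:aws:sns:<region>:<account>:<topic></Topic>
+           <Event>s3:ObjectCreated:*</Event>
+           <Filter>
+               <S3Key>
+                   <FilterRule>
+                       <Name>prefix</Name>
+                       <Value>images/</Value>
+                   </FilterRule>
+                   <FilterRule>
+                       <Name>suffix</Name>
+                       <Value>.jpg</Value>
+                   </FilterRule>
+               </S3Key>
+               <S3Metadata>
+                   <FilterRule>
+                       <Name>x-amz-meta-camera</Name>
+                       <Value>nikon</Value>
+                   </FilterRule>
+               </S3Metadata>
+           </Filter>
+       </TopicConfiguration>
+   </NotificationConfiguration>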
 
 HTTP Response
 ~~~~~~~~~~~~~
@@ -607,6 +640,20 @@ Response is XML encoded in the body of the request, in the following format:
            <Id></Id>
            <Topic></Topic>
            <Event></Event>
+           <Filter>
+               <S3Key>
+                   <FilterRule>
+                       <Name></Name>
+                       <Value></Value>
+                   </FilterRule>
+               </S3Key>
+               <S3Metadata>
+                   <FilterRule>
+                       <Name></Name>
+                       <Value></Value>
+                   </FilterRule>
+               </S3Metadata>
+           </Filter>
        </TopicConfiguration>
    </NotificationConfiguration>
 
@@ -623,6 +670,8 @@ Response is XML encoded in the body of the request, in the following format:
 +-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
 | ``Event``                     | String    | Handled event. Multiple ``Event`` entities may exist                                 | Yes      |
 +-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``Filter``                    | Container | Holding the filters configured for this notification                                 | No       |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
 
 HTTP Response
 ~~~~~~~~~~~~~
index ea5f0fd6f42635b605803148c707ea67b78ee9fb..54a212e717bc14e335824d6d50c36ca9f77f599a 100644 (file)
@@ -49,14 +49,17 @@ void populate_record_from_request(const req_state *s,
   record.x_meta_map = s->info.x_meta_map;
 }
 
-bool filter(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType event_type) {
-    // if event list exists, and none of the events in the list matches the event type, filter the message
-    if (filter.events.size() && std::find(filter.events.begin(), filter.events.end(), event_type) == filter.events.end()) {
-        return true;
-    }
-    // TODO: add filter by compliant conf: object name, prefix, suffix
-    // TODO: add extra filtering criteria: object size, ToD, metadata, ...
+bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType event) {
+  if (!::match(filter.events, event)) { 
+    return false;
+  }
+  if (!::match(filter.s3_filter.key_filter, s->object.name)) {
+    return false;
+  }
+  if (!::match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) {
     return false;
+  }
+  return true;
 }
 
 int publish(const req_state* s, 
@@ -79,7 +82,7 @@ int publish(const req_state* s,
     for (const auto& bucket_topic : bucket_topics.topics) {
         const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
         const rgw_pubsub_topic& topic_cfg = topic_filter.topic;
-        if (filter(topic_filter, s, event_type)) {
+        if (!match(topic_filter, s, event_type)) {
             // topic does not apply to req_state
             continue;
         }
index 4e80011388fb07f13cd6d5ee5a4ea94a13491b07..5e3d7cc41283d8aa6583ba8b7af2e8dfe563a248 100644 (file)
 #include "rgw_arn.h"
 #include "rgw_pubsub_push.h"
 #include "rgw_rados.h"
+#include <regex>
+#include <algorithm>
 
 #define dout_subsys ceph_subsys_rgw
 
-void do_decode_xml_obj(rgw::notify::EventTypeList& l, const string& name, XMLObj *obj)
-{
+bool rgw_s3_key_filter::decode_xml(XMLObj* obj) {
+  XMLObjIter iter = obj->find("FilterRule");
+  XMLObj *o;
+
+  const auto throw_if_missing = true;
+  auto prefix_not_set = true;
+  auto suffix_not_set = true;
+  auto regex_not_set = true;
+  std::string name;
+
+  while ((o = iter.get_next())) {
+    RGWXMLDecoder::decode_xml("Name", name, o, throw_if_missing);
+    if (name == "prefix" && prefix_not_set) {
+        prefix_not_set = false;
+        RGWXMLDecoder::decode_xml("Value", prefix_rule, o, throw_if_missing);
+    } else if (name == "suffix" && suffix_not_set) {
+        suffix_not_set = false;
+        RGWXMLDecoder::decode_xml("Value", suffix_rule, o, throw_if_missing);
+    } else if (name == "regex" && regex_not_set) {
+        regex_not_set = false;
+        RGWXMLDecoder::decode_xml("Value", regex_rule, o, throw_if_missing);
+    } else {
+        throw RGWXMLDecoder::err("invalid/duplicate S3Key filter rule name: '" + name + "'");
+    }
+  }
+  return true;
+}
+
+void rgw_s3_key_filter::dump_xml(Formatter *f) const {
+  if (!prefix_rule.empty()) {
+    f->open_object_section("FilterRule");
+    ::encode_xml("Name", "prefix", f);
+    ::encode_xml("Value", prefix_rule, f);
+    f->close_section();
+  }
+  if (!suffix_rule.empty()) {
+    f->open_object_section("FilterRule");
+    ::encode_xml("Name", "suffix", f);
+    ::encode_xml("Value", suffix_rule, f);
+    f->close_section();
+  }
+  if (!regex_rule.empty()) {
+    f->open_object_section("FilterRule");
+    ::encode_xml("Name", "regex", f);
+    ::encode_xml("Value", regex_rule, f);
+    f->close_section();
+  }
+}
+
+bool rgw_s3_key_filter::has_content() const {
+    return !(prefix_rule.empty() && suffix_rule.empty() && regex_rule.empty());
+}
+
+bool rgw_s3_metadata_filter::decode_xml(XMLObj* obj) {
+  metadata.clear();
+  XMLObjIter iter = obj->find("FilterRule");
+  XMLObj *o;
+
+  const auto throw_if_missing = true;
+
+  std::string key;
+  std::string value;
+
+  while ((o = iter.get_next())) {
+    RGWXMLDecoder::decode_xml("Name", key, o, throw_if_missing);
+    RGWXMLDecoder::decode_xml("Value", value, o, throw_if_missing);
+    metadata.emplace(key, value);
+  }
+  return true;
+}
+
+void rgw_s3_metadata_filter::dump_xml(Formatter *f) const {
+  for (const auto& key_value : metadata) {
+    f->open_object_section("FilterRule");
+    ::encode_xml("Name", key_value.first, f);
+    ::encode_xml("Value", key_value.second, f);
+    f->close_section();
+  }
+}
+
+bool rgw_s3_metadata_filter::has_content() const {
+    return !metadata.empty();
+}
+
+bool rgw_s3_filter::decode_xml(XMLObj* obj) {
+    RGWXMLDecoder::decode_xml("S3Key", key_filter, obj);
+    RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter, obj);
+  return true;
+}
+
+void rgw_s3_filter::dump_xml(Formatter *f) const {
+  if (key_filter.has_content()) {
+      ::encode_xml("S3Key", key_filter, f);
+  }
+  if (metadata_filter.has_content()) {
+      ::encode_xml("S3Metadata", metadata_filter, f);
+  }
+}
+
+bool rgw_s3_filter::has_content() const {
+    return key_filter.has_content()  ||
+           metadata_filter.has_content();
+}
+
+bool match(const rgw_s3_key_filter& filter, const std::string& key) {
+  const auto key_size = key.size();
+  const auto prefix_size = filter.prefix_rule.size();
+  if (prefix_size != 0) {
+    // prefix rule exists
+    if (prefix_size > key_size) {
+      // if prefix is longer than key, we fail
+      return false;
+    }
+    if (!std::equal(filter.prefix_rule.begin(), filter.prefix_rule.end(), key.begin())) {
+        return false;
+    }
+  }
+  const auto suffix_size = filter.suffix_rule.size();
+  if (suffix_size != 0) {
+    // suffix rule exists
+    if (suffix_size > key_size) {
+      // if suffix is longer than key, we fail
+      return false;
+    }
+    if (!std::equal(filter.suffix_rule.begin(), filter.suffix_rule.end(), (key.end() - suffix_size))) {
+        return false;
+    }
+  }
+  if (!filter.regex_rule.empty()) {
+    // TODO: add regex caching to the filter
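+    // note: std::regex_match() succeeds only if the whole key matches the expression
+    // (unlike std::regex_search(), which also matches substrings)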
+    const std::regex base_regex(filter.regex_rule);
+    if (!std::regex_match(key, base_regex)) {
+      return false;
+    }
+  }
+  return true;
+}
+
+bool match(const rgw_s3_metadata_filter& filter, const Metadata& metadata) {
+  // all filter pairs must exist with the same value in the object's metadata
+  // object metadata may include items not in the filter
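+  // note: std::includes() requires both ranges to be sorted by the same ordering; this holds
+  // here because Metadata is a std::map, so iteration is ordered by the (unique) keys, which
+  // is consistent with the default lexicographic comparison of the key/value pairs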
+  return std::includes(metadata.begin(), metadata.end(), filter.metadata.begin(), filter.metadata.end());
+}
+
+bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event) {
+  // if event list exists, and none of the events in the list matches the event type, filter the message
+  if (!events.empty() && std::find(events.begin(), events.end(), event) == events.end()) {
+    return false;
+  }
+  return true;
+}
+
+void do_decode_xml_obj(rgw::notify::EventTypeList& l, const string& name, XMLObj *obj) {
   l.clear();
 
   XMLObjIter iter = obj->find(name);
@@ -32,6 +185,8 @@ bool rgw_pubsub_s3_notification::decode_xml(XMLObj *obj) {
   RGWXMLDecoder::decode_xml("Id", id, obj, throw_if_missing);
   
   RGWXMLDecoder::decode_xml("Topic", topic_arn, obj, throw_if_missing);
+  
+  RGWXMLDecoder::decode_xml("Filter", filter, obj);
 
   do_decode_xml_obj(events, "Event", obj);
   if (events.empty()) {
@@ -45,6 +200,9 @@ bool rgw_pubsub_s3_notification::decode_xml(XMLObj *obj) {
 void rgw_pubsub_s3_notification::dump_xml(Formatter *f) const {
   ::encode_xml("Id", id, f);
   ::encode_xml("Topic", topic_arn.c_str(), f);
+  if (filter.has_content()) {
+      ::encode_xml("Filter", filter, f);
+  }
   for (const auto& event : events) {
     ::encode_xml("Event", rgw::notify::to_string(event), f);
   }
@@ -58,6 +216,9 @@ bool rgw_pubsub_s3_notifications::decode_xml(XMLObj *obj) {
   return true;
 }
 
+rgw_pubsub_s3_notification::rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter) :
+    id(topic_filter.s3_id), events(topic_filter.events), topic_arn(topic_filter.topic.arn), filter(topic_filter.s3_filter) {} 
+
 void rgw_pubsub_s3_notifications::dump_xml(Formatter *f) const {
   do_encode_xml("NotificationConfiguration", list, "TopicConfiguration", f);
 }
@@ -309,8 +470,11 @@ int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic *result)
   return 0;
 }
 
-int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, const std::string& notif_name)
-{
+int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events) {
+    return create_notification(topic_name, events, std::nullopt, "");
+}
+
+int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name) {
   rgw_pubsub_topic_subs user_topic_info;
   rgw::sal::RGWRadosStore *store = ps->store;
 
@@ -337,6 +501,9 @@ int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const r
   topic_filter.topic = user_topic_info.topic;
   topic_filter.events = events;
   topic_filter.s3_id = notif_name;
+  if (s3_filter) {
+    topic_filter.s3_filter = *s3_filter;
+  }
 
   ret = write_topics(bucket_topics, &objv_tracker);
   if (ret < 0) {
index bf29cd15e873366990141a872ea6c471cf57fd8e..36edcd36cae3d92a3bb47b833b4eca830a395e72 100644 (file)
 
 class XMLObj;
 
+struct rgw_s3_key_filter {
+  std::string prefix_rule;
+  std::string suffix_rule;
+  std::string regex_rule;
+
+  bool has_content() const;
+
+  bool decode_xml(XMLObj *obj);
+  void dump_xml(Formatter *f) const;
+  
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(prefix_rule, bl);
+    encode(suffix_rule, bl);
+    encode(regex_rule, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(prefix_rule, bl);
+    decode(suffix_rule, bl);
+    decode(regex_rule, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(rgw_s3_key_filter)
+
+using Metadata = std::map<std::string, std::string>;
+
+struct rgw_s3_metadata_filter {
+  Metadata metadata;
+  
+  bool has_content() const;
+  
+  bool decode_xml(XMLObj *obj);
+  void dump_xml(Formatter *f) const;
+  
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(metadata, bl);
+    ENCODE_FINISH(bl);
+  }
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(metadata, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(rgw_s3_metadata_filter)
+
+struct rgw_s3_filter {
+  rgw_s3_key_filter key_filter;
+  rgw_s3_metadata_filter metadata_filter;
+
+  bool has_content() const;
+  
+  bool decode_xml(XMLObj *obj);
+  void dump_xml(Formatter *f) const;
+  
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(key_filter, bl);
+    encode(metadata_filter, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(key_filter, bl);
+    decode(metadata_filter, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(rgw_s3_filter)
+
+using OptionalFilter = std::optional<rgw_s3_filter>;
+
+class rgw_pubsub_topic_filter;
 /* S3 notification configuration
  * based on: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html
 <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
@@ -24,6 +103,12 @@ class XMLObj;
           <Value>jpg</Value>
         </FilterRule>
       </S3Key>
+      <S3Metadata>
+        <FilterRule>
+          <Name></Name>
+          <Value></Value>
+        </FilterRule>
+      </S3Metadata>
     </Filter>
     <Id>notification1</Id>
     <Topic>arn:aws:sns:<region>:<account>:<topic></Topic>
@@ -39,11 +124,24 @@ struct rgw_pubsub_s3_notification {
   rgw::notify::EventTypeList events;
   // topic ARN
   std::string topic_arn;
+  // filter rules
+  rgw_s3_filter filter;
 
   bool decode_xml(XMLObj *obj);
   void dump_xml(Formatter *f) const;
+
+  rgw_pubsub_s3_notification() = default;
+  // construct from rgw_pubsub_topic_filter (used by get/list notifications)
+  rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter);
 };
 
+// return true if the key matches the prefix/suffix/regex rules of the key filter
+bool match(const rgw_s3_key_filter& filter, const std::string& key);
+// return true if the key matches the metadata rules of the metadata filter
+bool match(const rgw_s3_metadata_filter& filter, const Metadata& metadata);
+// return true if the event type matches (equal or contained in) one of the events in the list
+bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event);
+
 struct rgw_pubsub_s3_notifications {
   std::list<rgw_pubsub_s3_notification> list;
   bool decode_xml(XMLObj *obj);
@@ -372,9 +470,10 @@ struct rgw_pubsub_topic_filter {
   rgw_pubsub_topic topic;
   rgw::notify::EventTypeList events;
   std::string s3_id;
+  rgw_s3_filter s3_filter;
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(2, 1, bl);
+    ENCODE_START(3, 1, bl);
     encode(topic, bl);
     // events are stored as a vector of strings
     std::vector<std::string> tmp_events;
@@ -382,11 +481,12 @@ struct rgw_pubsub_topic_filter {
     std::transform(events.begin(), events.end(), std::back_inserter(tmp_events), converter);
     encode(tmp_events, bl);
     encode(s3_id, bl);
+    encode(s3_filter, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(2, bl);
+    DECODE_START(3, bl);
     decode(topic, bl);
     // events are stored as a vector of strings
     events.clear();
@@ -396,6 +496,9 @@ struct rgw_pubsub_topic_filter {
     if (struct_v >= 2) {
       decode(s3_id, bl);
     }
+    if (struct_v >= 3) {
+      decode(s3_filter, bl);
+    }
     DECODE_FINISH(bl);
   }
 
@@ -502,12 +605,14 @@ public:
     // read the list of topics associated with a bucket and populate into result
     // return 0 on success or if no topic was associated with the bucket, error code otherwise
     int get_topics(rgw_pubsub_bucket_topics *result);
-    // adds a topic + filter (event list) to a bucket
+    // adds a topic + filter (event list, and possibly key and metadata filters) to a bucket
     // assigning a notification name is optional (needed for S3 compatible notifications)
     // if the topic already exist on the bucket, the filter event list may be updated
+    // for S3 compliant notifications, the overload taking s3_filter and notif_name should be used
     // return -ENOENT if the topic does not exists
     // return 0 on success, error code otherwise
-    int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, const std::string& notif_name="");
+    int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events);
+    int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, OptionalFilter s3_filter, const std::string& notif_name);
     // remove a topic and filter from bucket
     // if the topic does not exists on the bucket it is a no-op (considered success)
     // return -ENOENT if the topic does not exists
index e4bf0bf326f997bc656883ef6756985cde22374a..08d8c544cf1589d0d26f0d0dd56e70cedb30e1f2 100644 (file)
@@ -68,18 +68,16 @@ public:
       return;
     }
 
-    {
-      XMLFormatter* f = static_cast<XMLFormatter*>(s->formatter);
-      f->open_object_section_in_ns("CreateTopicResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
-      f->open_object_section("CreateTopicResult");
-      encode_xml("TopicArn", topic_arn, f); 
-      f->close_section();
-      f->open_object_section("ResponseMetadata");
-      encode_xml("RequestId", s->req_id, f); 
-      f->close_section();
-      f->close_section();
-    }
-    rgw_flush_formatter_and_reset(s, s->formatter);
+    const auto f = s->formatter;
+    f->open_object_section_in_ns("CreateTopicResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
+    f->open_object_section("CreateTopicResult");
+    encode_xml("TopicArn", topic_arn, f); 
+    f->close_section();
+    f->open_object_section("ResponseMetadata");
+    encode_xml("RequestId", s->req_id, f); 
+    f->close_section();
+    f->close_section();
+    rgw_flush_formatter_and_reset(s, f);
   }
 };
 
@@ -99,18 +97,16 @@ public:
       return;
     }
 
-    {
-      XMLFormatter* f = static_cast<XMLFormatter*>(s->formatter);
-      f->open_object_section_in_ns("ListTopicsResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
-      f->open_object_section("ListTopicsResult");
-      encode_xml("Topics", result, f); 
-      f->close_section();
-      f->open_object_section("ResponseMetadata");
-      encode_xml("RequestId", s->req_id, f); 
-      f->close_section();
-      f->close_section();
-    }
-    rgw_flush_formatter_and_reset(s, s->formatter);
+    const auto f = s->formatter;
+    f->open_object_section_in_ns("ListTopicsResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
+    f->open_object_section("ListTopicsResult");
+    encode_xml("Topics", result, f); 
+    f->close_section();
+    f->open_object_section("ResponseMetadata");
+    encode_xml("RequestId", s->req_id, f); 
+    f->close_section();
+    f->close_section();
+    rgw_flush_formatter_and_reset(s, f);
   }
 };
 
@@ -142,18 +138,16 @@ public:
       return;
     }
 
-    {
-      XMLFormatter* f = static_cast<XMLFormatter*>(s->formatter);
-      f->open_object_section("GetTopicResponse");
-      f->open_object_section("GetTopicResult");
-      encode_xml("Topic", result.topic, f); 
-      f->close_section();
-      f->open_object_section("ResponseMetadata");
-      encode_xml("RequestId", s->req_id, f); 
-      f->close_section();
-      f->close_section();
-    }
-    rgw_flush_formatter_and_reset(s, s->formatter);
+    const auto f = s->formatter;
+    f->open_object_section("GetTopicResponse");
+    f->open_object_section("GetTopicResult");
+    encode_xml("Topic", result.topic, f); 
+    f->close_section();
+    f->open_object_section("ResponseMetadata");
+    encode_xml("RequestId", s->req_id, f); 
+    f->close_section();
+    f->close_section();
+    rgw_flush_formatter_and_reset(s, f);
   }
 };
 
@@ -185,15 +179,13 @@ public:
       return;
     }
 
-    {
-      XMLFormatter* f = static_cast<XMLFormatter*>(s->formatter);
-      f->open_object_section_in_ns("DeleteTopicResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
-      f->open_object_section("ResponseMetadata");
-      encode_xml("RequestId", s->req_id, f); 
-      f->close_section();
-      f->close_section();
-    }
-    rgw_flush_formatter_and_reset(s, s->formatter);
+    const auto f = s->formatter;
+    f->open_object_section_in_ns("DeleteTopicResponse", "https://sns.amazonaws.com/doc/2010-03-31/");
+    f->open_object_section("ResponseMetadata");
+    encode_xml("RequestId", s->req_id, f); 
+    f->close_section();
+    f->close_section();
+    rgw_flush_formatter_and_reset(s, f);
   }
 };
 
@@ -501,7 +493,7 @@ void RGWPSCreateNotif_ObjStore_S3::execute() {
     ldout(s->cct, 20) << "successfully auto-generated unique topic '" << unique_topic_name << "'" << dendl;
     // generate the notification
     rgw::notify::EventTypeList events;
-    op_ret = b->create_notification(unique_topic_name, c.events, notif_name);
+    op_ret = b->create_notification(unique_topic_name, c.events, std::make_optional(c.filter), notif_name);
     if (op_ret < 0) {
       ldout(s->cct, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name <<
         "', ret=" << op_ret << dendl;
@@ -653,8 +645,6 @@ private:
     return 0;
   }
 
-  void add_notification_to_list(const rgw_pubsub_topic_filter& topic_filter);
-
 public:
   void execute() override;
   void send_response() override {
@@ -673,14 +663,6 @@ public:
   const char* name() const override { return "pubsub_notifications_get_s3"; }
 };
 
-void RGWPSListNotifs_ObjStore_S3::add_notification_to_list(const rgw_pubsub_topic_filter& topic_filter) {
-    rgw_pubsub_s3_notification notification;
-    notification.id = topic_filter.s3_id;
-    notification.topic_arn = topic_filter.topic.arn;
-    notification.events = topic_filter.events;
-    notifications.list.push_back(notification);
-}
-
 void RGWPSListNotifs_ObjStore_S3::execute() {
   ups.emplace(store, s->owner.get_id());
   auto b = ups->get_bucket(bucket_info.bucket);
@@ -697,7 +679,7 @@ void RGWPSListNotifs_ObjStore_S3::execute() {
     // get info of a specific notification
     const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
     if (unique_topic) {
-      add_notification_to_list(unique_topic->get());
+      notifications.list.emplace_back(unique_topic->get());
       return;
     }
     op_ret = -ENOENT;
@@ -710,7 +692,7 @@ void RGWPSListNotifs_ObjStore_S3::execute() {
         // not an s3 notification
         continue;
     }
-    add_notification_to_list(topic.second);
+    notifications.list.emplace_back(topic.second);
   }
 }
 
index d565a902a237503fa39142baf444e3b06ba982d0..5d2c1482f5ce19a747361e722370b384fbe6c74b 100644 (file)
@@ -1077,6 +1077,16 @@ public:
   }
 };
 
+bool match(const rgw_pubsub_topic_filter& filter, const std::string& key_name, rgw::notify::EventType event_type) {
+  if (!match(filter.events, event_type)) {
+    return false;
+  }
+  if (!match(filter.s3_filter.key_filter, key_name)) {
+    return false;
+  }
+  return true;
+}
+
 class RGWPSFindBucketTopicsCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   PSEnvRef env;
@@ -1144,9 +1154,7 @@ public:
       for (auto& titer : bucket_topics.topics) {
         auto& topic_filter = titer.second;
         auto& info = topic_filter.topic;
-        // if event list is defined but event does not match any in the list, we skip to the next one
-        if (!topic_filter.events.empty() &&
-            std::find(topic_filter.events.begin(), topic_filter.events.end(), event_type) == topic_filter.events.end()) {
+        if (!match(topic_filter, key.name, event_type)) {
           continue;
         }
         std::shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
index a3b7807582e45302e00ac05e0135365d52828a4d..73fb3ea24ecd4bf2be2c78a1ff46010b03165970 100644 (file)
@@ -28,7 +28,7 @@ from nose.tools import assert_not_equal, assert_equal
 # configure logging for the tests module
 log = logging.getLogger(__name__)
 
-skip_push_tests = True
+skip_push_tests = False
 
 ####################################
 # utility functions for pubsub tests
@@ -152,7 +152,17 @@ class AMQPReceiver(object):
     def __init__(self, exchange, topic):
         import pika
         hostname = get_ip()
-        connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port))
+        remaining_retries = 10
+        while remaining_retries > 0:
+            try:
+                connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=rabbitmq_port))
+                break
+            except Exception as error:
+                remaining_retries -= 1
+                print 'failed to connect to rabbitmq (remaining retries ' + str(remaining_retries) + '): ' + str(error)
+
+        if remaining_retries == 0:
+            raise Exception('failed to connect to rabbitmq - no retries left')
 
         self.channel = connection.channel()
         self.channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
@@ -723,6 +733,208 @@ def test_ps_s3_notification_on_master():
     zones[0].delete_bucket(bucket_name)
 
 
+def ps_s3_notification_filter(on_master):
+    """ test s3 notification filter on master """
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    hostname = get_ip()
+    proc = init_rabbitmq()
+    if proc is  None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
+    if on_master:
+        zones, _  = init_env(require_ps=False)
+        ps_zone = zones[0]
+    else:
+        zones, ps_zones  = init_env(require_ps=True)
+        ps_zone = ps_zones[0]
+
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = zones[0].create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # start amqp receivers
+    exchange = 'ex1'
+    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+    task.start()
+
+    # create s3 topic
+    endpoint_address = 'amqp://' + hostname
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+    if on_master:
+        topic_conf = PSTopicS3(ps_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+        topic_arn = topic_conf.set_config()
+    else:
+        topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
+        result, _ = topic_conf.set_config()
+        parsed_result = json.loads(result)
+        topic_arn = parsed_result['arn']
+        zone_meta_checkpoint(ps_zone.zone)
+
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name+'_1',
+                        'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectCreated:*'],
+                       'Filter': {
+                         'Key': {
+                            'FilterRules': [{'Name': 'prefix', 'Value': 'hello'}]
+                         }
+                        }
+                       },
+                       {'Id': notification_name+'_2',
+                        'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectCreated:*'],
+                       'Filter': {
+                         'Key': {
+                            'FilterRules': [{'Name': 'prefix', 'Value': 'world'},
+                                            {'Name': 'suffix', 'Value': 'log'}]
+                         }
+                        }
+                       },
+                       {'Id': notification_name+'_3',
+                        'TopicArn': topic_arn,
+                        'Events': [],
+                       'Filter': {
+                          'Key': {
+                            'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)\\.txt'}]
+                         }
+                        }
+                       }]
+
+    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
+    result, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    if on_master:
+        topic_conf_list = [{'Id': notification_name+'_4',
+                            'TopicArn': topic_arn,
+                            'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
+                           'Filter': {
+                               'Metadata': {
+                                    'FilterRules': [{'Name': 'x-amz-meta-foo', 'Value': 'bar'},
+                                                    {'Name': 'x-amz-meta-hello', 'Value': 'world'}]
+                               },
+                                'Key': {
+                                    'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)'}]
+                                }
+                            }
+                            }]
+
+        try:
+            s3_notification_conf4 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
+            _, status = s3_notification_conf4.set_config()
+            assert_equal(status/100, 2)
+            skip_notif4 = False
+        except Exception as error:
+            print 'note: metadata filter is not supported by boto3 - skipping test'
+            skip_notif4 = True
+    else:
+        print 'filtering by attributes only supported on master zone'
+        skip_notif4 = True
+
+
+    # get all notifications
+    result, status = s3_notification_conf.get_config()
+    assert_equal(status/100, 2)
+    for conf in result['TopicConfigurations']:
+        filter_name = conf['Filter']['Key']['FilterRules'][0]['Name']
+        assert filter_name == 'prefix' or filter_name == 'suffix' or filter_name == 'regex', filter_name
+
+    if not skip_notif4:
+        result, status = s3_notification_conf4.get_config(notification=notification_name+'_4')
+        assert_equal(status/100, 2)
+        filter_name = result['NotificationConfiguration']['TopicConfiguration']['Filter']['S3Metadata']['FilterRule'][0]['Name']
+        assert filter_name == 'x-amz-meta-foo' or filter_name == 'x-amz-meta-hello'
+
+    expected_in1 = ['hello.kaboom', 'hello.txt', 'hello123.txt', 'hello']
+    expected_in2 = ['world1.log', 'world2log', 'world3.log']
+    expected_in3 = ['hello.txt', 'hell.txt', 'worldlog.txt']
+    expected_in4 = ['foo', 'bar', 'hello', 'world']
+    filtered = ['hell.kaboom', 'world.og', 'world.logg', 'he123ll.txt', 'wo', 'log', 'h', 'txt', 'world.log.txt']
+    filtered_with_attr = ['nofoo', 'nobar', 'nohello', 'noworld']
+    # create objects in bucket
+    for key_name in expected_in1:
+        key = bucket.new_key(key_name)
+        key.set_contents_from_string('bar')
+    for key_name in expected_in2:
+        key = bucket.new_key(key_name)
+        key.set_contents_from_string('bar')
+    for key_name in expected_in3:
+        key = bucket.new_key(key_name)
+        key.set_contents_from_string('bar')
+    if not skip_notif4:
+        for key_name in expected_in4:
+            key = bucket.new_key(key_name)
+            key.set_metadata('foo', 'bar')
+            key.set_metadata('hello', 'world')
+            key.set_metadata('goodbye', 'cruel world')
+            key.set_contents_from_string('bar')
+    for key_name in filtered:
+        key = bucket.new_key(key_name)
+        key.set_contents_from_string('bar')
+    for key_name in filtered_with_attr:
+        key = bucket.new_key(key_name)
+        key.set_metadata('foo', 'nobar')
+        key.set_metadata('hello', 'noworld')
+        key.set_metadata('goodbye', 'cruel world')
+        key.set_contents_from_string('bar')
+
+    if on_master:
+        print 'wait for 5sec for the messages...'
+        time.sleep(5)
+    else:
+        zone_bucket_checkpoint(ps_zone.zone, zones[0].zone, bucket_name)
+
+    found_in1 = []
+    found_in2 = []
+    found_in3 = []
+    found_in4 = []
+
+    for event in receiver.get_and_reset_events():
+        notif_id = event['s3']['configurationId']
+        key_name = event['s3']['object']['key']
+        if notif_id == notification_name+'_1':
+            found_in1.append(key_name)
+        elif notif_id == notification_name+'_2':
+            found_in2.append(key_name)
+        elif notif_id == notification_name+'_3':
+            found_in3.append(key_name)
+        elif not skip_notif4 and notif_id == notification_name+'_4':
+            found_in4.append(key_name)
+        else:
+            assert False, 'invalid notification: ' + notif_id
+
+    assert_equal(set(found_in1), set(expected_in1))
+    assert_equal(set(found_in2), set(expected_in2))
+    assert_equal(set(found_in3), set(expected_in3))
+    if not skip_notif4:
+        assert_equal(set(found_in4), set(expected_in4))
+
+    # cleanup
+    s3_notification_conf.del_config()
+    if not skip_notif4:
+        s3_notification_conf4.del_config()
+    topic_conf.del_config()
+    # delete the bucket
+    for key in bucket.list():
+        key.delete()
+    zones[0].delete_bucket(bucket_name)
+    stop_amqp_receiver(receiver, task)
+    clean_rabbitmq(proc)
+
+
+def test_ps_s3_notification_filter_on_master():
+    ps_s3_notification_filter(on_master=True)
+
+
+def test_ps_s3_notification_filter():
+    ps_s3_notification_filter(on_master=False)
+
+
 def test_ps_s3_notification_errors_on_master():
     """ test s3 notification set/get/delete on master """
     zones, _  = init_env(require_ps=False)