]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/pubsub: notification filtering by object tags
authorYuval Lifshitz <yuvalif@yahoo.com>
Sun, 24 Nov 2019 13:33:00 +0000 (15:33 +0200)
committerYuval Lifshitz <ylifshit@redhat.com>
Sun, 26 Apr 2020 11:22:29 +0000 (14:22 +0300)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
(cherry picked from commit d19474f3d7b4e0a16cc2ef55f2bb4e3fd027545c)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
Conflicts:
doc/radosgw/notifications.rst
examples/boto3/README.md
examples/boto3/notification_filters.py
examples/boto3/service-2.sdk-extras.json
src/common/ceph_json.h
src/rgw/rgw_common.cc
src/rgw/rgw_common.h
src/rgw/rgw_op.cc
src/rgw/rgw_pubsub.h
src/test/rgw/rgw_multi/tests_ps.py

16 files changed:
doc/radosgw/notifications.rst
doc/radosgw/pubsub-module.rst
doc/radosgw/s3-notification-compatibility.rst
doc/radosgw/s3/bucketops.rst
src/common/ceph_json.h
src/rgw/rgw_auth_s3.cc
src/rgw/rgw_auth_s3.h
src/rgw/rgw_common.h
src/rgw/rgw_loadgen.cc
src/rgw/rgw_notify.cc
src/rgw/rgw_op.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_rest_client.cc
src/test/rgw/rgw_multi/tests_ps.py
src/test/rgw/rgw_multi/zone_ps.py

index 2facfab7a42653c892a00377929745d9f358a58f..9ec16ea83d10e7fd84d1f3cf2bb479a7795643df 100644 (file)
@@ -17,8 +17,8 @@ user can only manage its own topics, and can only associate them with buckets it
 
 In order to send notifications for events for a specific bucket, a notification entity needs to be created. A
 notification can be created on a subset of event types, or for all event types (default).
-The notification may also filter out events based on preffix/suffix and/or regular expression matching of the keys. As well as, 
-on the metadata attributes attached to the object.
+The notification may also filter out events based on prefix/suffix and/or regular expression matching of the keys. As well as,
+on the metadata attributes attached to the object, or the object tags.
 There can be multiple notifications for any specific topic, and the same topic could be used for multiple notifications.
 
 REST API has been defined to provide configuration and control interfaces for the bucket notification
@@ -283,7 +283,8 @@ pushed or pulled using the pubsub sync module.
                    "eTag":"",
                    "versionId":"",
                    "sequencer": "",
-                   "metadata":[]
+                   "metadata":[],
+                   "tags":[]
                }
            },
            "eventId":"",
@@ -308,6 +309,7 @@ pushed or pulled using the pubsub sync module.
 - s3.object.version: object version in case of versioned bucket
 - s3.object.sequencer: monotonically increasing identifier of the change per object (hexadecimal format)
 - s3.object.metadata: any metadata set on the object sent as: ``x-amz-meta-`` (an extension to the S3 notification API) 
+- s3.object.tags: any tags set on the objcet (an extension to the S3 notification API)
 - s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API)
 
 .. _PubSub Module : ../pubsub-module
index 03a0c126e293f9fd3f9e063033cab5064a584e2d..cf3951fec9450f3972835f3fe0f190ff32b538b7 100644 (file)
@@ -269,6 +269,7 @@ Detailed under: `Bucket Operations`_.
       the associated subscription will not be deleted automatically (any events of the deleted bucket could still be access),
       and will have to be deleted explicitly with the subscription deletion API
     - Filtering based on metadata (which is an extension to S3) is not supported, and such rules will be ignored
+    - Filtering based on tags (which is an extension to S3) is not supported, and such rules will be ignored
 
 
 Non S3-Compliant Notifications
@@ -495,7 +496,8 @@ the events will have an S3-compatible record format (JSON):
                    "eTag":"",
                    "versionId":"",
                    "sequencer":"",
-                   "metadata":[]
+                   "metadata":[],
+                   "tags":[]
                }
            },
            "eventId":"",
@@ -519,6 +521,7 @@ the events will have an S3-compatible record format (JSON):
 - s3.object.version: object version in case of versioned bucket
 - s3.object.sequencer: monotonically increasing identifier of the change per object (hexadecimal format)
 - s3.object.metadata: not supported (an extension to the S3 notification API)
+- s3.object.tags: not supported (an extension to the S3 notification API)
 - s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API)
 
 In case that the subscription was not created via a non S3-compatible notification, 
index 6cc6ac0283f1d3cacdc088b086e4ac03dc7b5d74..ffbceca64d4e95e3c27ef50edd37ffb6cc663477 100644 (file)
@@ -49,6 +49,8 @@ Ceph's bucket notification API has the following extensions:
 
   - Filtering based on metadata attributes attached to the object
 
+  - Filtering based on object tags
+
 - Filtering overlapping is allowed, so that same event could be sent as different notification
 
 
index 7c94a835d5fc3e41621fe955eef1cd9304da35e5..5d77049210f8d8609ad8ff5ebb9491da77415de6 100644 (file)
@@ -509,7 +509,13 @@ Parameters are XML encoded in the body of the request, in the following format:
                         <Name></Name>
                         <Value></Value>
                     </FilterRule>
-                </s3Metadata>
+                </S3Metadata>
+                <S3Tags>
+                    <FilterRule>
+                        <Name></Name>
+                        <Value></Value>
+                    </FilterRule>
+                </S3Tags>
             </Filter>
        </TopicConfiguration>
    </NotificationConfiguration>
@@ -528,15 +534,19 @@ Parameters are XML encoded in the body of the request, in the following format:
 | ``Event``                     | String    | List of supported events see: `S3 Notification Compatibility`_.  Multiple ``Event``  | No       |
 |                               |           | entities can be used. If omitted, all events are handled                             |          |
 +-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
-| ``Filter``                    | Container | Holding ``S3Key`` and ``S3Metadata`` entities                                        | No       |
+| ``Filter``                    | Container | Holding ``S3Key``, ``S3Metadata`` and ``S3Tags`` entities                            | No       |
 +-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
 | ``S3Key``                     | Container | Holding a list of ``FilterRule`` entities, for filtering based on object key.        | No       |
 |                               |           | At most, 3 entities may be in the list, with ``Name`` be ``prefix``, ``suffix`` or   |          |
 |                               |           | ``regex``. All filter rules in the list must match for the filter to match.          |          |
 +-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
 | ``S3Metadata``                | Container | Holding a list of ``FilterRule`` entities, for filtering based on object metadata.   | No       |
-|                               |           | All filter rules in the list must match the ones defined on the object. The object,  |          |
-|                               |           | have other metadata entitied not listed in the filter.                               |          |
+|                               |           | All filter rules in the list must match the metadata defined on the object. However, |          |
+|                               |           | the object still match if it has other metadata entries not listed in the filter.    |          |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``S3Tags``                    | Container | Holding a list of ``FilterRule`` entities, for filtering based on object tags.       | No       |
+|                               |           | All filter rules in the list must match the tags defined on the object. However,     |          |
+|                               |           | the object still match it it has other tags not listed in the filter.                |          |
 +-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
 | ``S3Key.FilterRule``          | Container | Holding ``Name`` and ``Value`` entities. ``Name`` would  be: ``prefix``, ``suffix``  | Yes      |
 |                               |           | or ``regex``. The ``Value`` would hold the key prefix, key suffix or a regular       |          |
@@ -544,7 +554,10 @@ Parameters are XML encoded in the body of the request, in the following format:
 +-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
 | ``S3Metadata.FilterRule``     | Container | Holding ``Name`` and ``Value`` entities. ``Name`` would be the name of the metadata  | Yes      |
 |                               |           | attribute (e.g. ``x-amz-meta-xxx``). The ``Value`` would be the expected value for   |          | 
-|                               |           | this attribute                                                                       |          |
+|                               |           | this attribute.                                                                      |          |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``S3Tags.FilterRule``         | Container | Holding ``Name`` and ``Value`` entities. ``Name`` would be the tag key,              |  Yes     |
+|                               |           | and ``Value`` would be the tag value.                                                |          | 
 +-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
 
 
@@ -647,7 +660,13 @@ Response is XML encoded in the body of the request, in the following format:
                         <Name></Name>
                         <Value></Value>
                     </FilterRule>
-                </s3Metadata>
+                </S3Metadata>
+                <S3Tags>
+                    <FilterRule>
+                        <Name></Name>
+                        <Value></Value>
+                    </FilterRule>
+                </S3Tags>
             </Filter>
        </TopicConfiguration>
    </NotificationConfiguration>
@@ -679,4 +698,4 @@ HTTP Response
 | ``404``       | NoSuchKey             | The notification does not exist (if provided)            |
 +---------------+-----------------------+----------------------------------------------------------+
 
-.. _S3 Notification Compatibility: ../s3-notification-compatibility
+.. _S3 Notification Compatibility: ../../s3-notification-compatibility
index ba841d269daee805d5a12b578dca324fc5c34066..c77dddbe37c85f9b12bcad3df49a043470d1df61 100644 (file)
@@ -2,6 +2,7 @@
 #define CEPH_JSON_H
 
 #include <include/types.h>
+#include <boost/container/flat_map.hpp>
 
 #ifdef _ASSERT_H
 #define NEED_ASSERT_H
@@ -269,6 +270,22 @@ void decode_json_obj(multimap<K, V>& m, JSONObj *obj)
   }
 }
 
+template<class K, class V>
+void decode_json_obj(boost::container::flat_map<K, V>& m, JSONObj *obj)
+{
+  m.clear();
+
+  JSONObjIter iter = obj->find_first();
+
+  for (; !iter.end(); ++iter) {
+    K key;
+    V val;
+    JSONObj *o = *iter;
+    JSONDecoder::decode_json("key", key, o);
+    JSONDecoder::decode_json("val", val, o);
+    m[key] = val;
+  }
+}
 template<class C>
 void decode_json_obj(C& container, void (*cb)(C&, JSONObj *obj), JSONObj *obj)
 {
@@ -407,6 +424,7 @@ static void encode_json(const char *name, const std::list<T>& l, ceph::Formatter
   }
   f->close_section();
 }
+
 template<class T>
 static void encode_json(const char *name, const std::deque<T>& l, ceph::Formatter *f)
 {
@@ -416,6 +434,7 @@ static void encode_json(const char *name, const std::deque<T>& l, ceph::Formatte
   }
   f->close_section();
 }
+
 template<class T, class Compare = std::less<T> >
 static void encode_json(const char *name, const std::set<T, Compare>& l, ceph::Formatter *f)
 {
@@ -436,7 +455,7 @@ static void encode_json(const char *name, const std::vector<T>& l, ceph::Formatt
   f->close_section();
 }
 
-template<class K, class V, class C = std::less<K> >
+template<class K, class V, class C = std::less<K>>
 static void encode_json(const char *name, const std::map<K, V, C>& m, ceph::Formatter *f)
 {
   f->open_array_section(name);
@@ -461,6 +480,20 @@ static void encode_json(const char *name, const std::multimap<K, V>& m, ceph::Fo
   }
   f->close_section();
 }
+
+template<class K, class V>
+static void encode_json(const char *name, const boost::container::flat_map<K, V>& m, ceph::Formatter *f)
+{
+  f->open_array_section(name);
+  for (auto i = m.begin(); i != m.end(); ++i) {
+    f->open_object_section("entry");
+    encode_json("key", i->first, f);
+    encode_json("val", i->second, f);
+    f->close_section();
+  }
+  f->close_section();
+}
+
 template<class K, class V>
 void encode_json_map(const char *name, const map<K, V>& m, ceph::Formatter *f)
 {
index 0780101fb4aeab0652d6aab2c900ac634990224e..dc96471ff7a86967eecac53f4d50544f9531dc75 100644 (file)
@@ -56,7 +56,7 @@ static const auto signed_subresources = {
  */
 
 static std::string
-get_canon_amz_hdr(const std::map<std::string, std::string>& meta_map)
+get_canon_amz_hdr(const meta_map_t& meta_map)
 {
   std::string dest;
 
@@ -117,8 +117,8 @@ void rgw_create_s3_canonical_header(
   const char* const content_md5,
   const char* const content_type,
   const char* const date,
-  const std::map<std::string, std::string>& meta_map,
-  const std::map<std::string, std::string>& qs_map,
+  const meta_map_t& meta_map,
+  const meta_map_t& qs_map,
   const char* const request_uri,
   const std::map<std::string, std::string>& sub_resources,
   std::string& dest_str)
@@ -157,7 +157,7 @@ static inline bool is_base64_for_content_md5(unsigned char c) {
 }
 
 static inline void get_v2_qs_map(const req_info& info,
-                                std::map<std::string, std::string>& qs_map) {
+                                meta_map_t& qs_map) {
   const auto& params = const_cast<RGWHTTPArgs&>(info.args).get_params();
   for (const auto& elt : params) {
     std::string k = boost::algorithm::to_lower_copy(elt.first);
@@ -190,7 +190,7 @@ bool rgw_create_s3_canonical_header(const req_info& info,
   const char *content_type = info.env->get("CONTENT_TYPE");
 
   std::string date;
-  std::map<std::string, std::string> qs_map;
+  meta_map_t qs_map;
 
   if (qsr) {
     get_v2_qs_map(info, qs_map); // handle qs metadata
index 44a662cb92ed2e6abf6ebb4732a04de6a81068e7..519f839536fc3982baa25729cfa87e8a7a13c036 100644 (file)
@@ -419,8 +419,8 @@ void rgw_create_s3_canonical_header(
   const char *content_md5,
   const char *content_type,
   const char *date,
-  const std::map<std::string, std::string>& meta_map,
-  const std::map<std::string, std::string>& qs_map,
+  const meta_map_t& meta_map,
+  const meta_map_t& qs_map,
   const char *request_uri,
   const std::map<std::string, std::string>& sub_resources,
   std::string& dest_str);
index 8f31d7508d65f9690aefd108ceb14ea88be981f2..1a7603320979efdccbb5601340c7b279667b73ee 100644 (file)
@@ -1632,11 +1632,12 @@ namespace rgw {
   }
 }
 
+using meta_map_t = boost::container::flat_map <std::string, std::string>;
 
 struct req_info {
   const RGWEnv *env;
   RGWHTTPArgs args;
-  map<string, string> x_meta_map;
+  meta_map_t x_meta_map;
 
   string host;
   const char *method;
@@ -2447,7 +2448,7 @@ static inline uint64_t rgw_rounded_objsize_kb(uint64_t bytes)
 /* implement combining step, S3 header canonicalization;  k is a
  * valid header and in lc form */
 static inline void add_amz_meta_header(
-  std::map<std::string, std::string>& x_meta_map,
+  meta_map_t& x_meta_map,
   const std::string& k,
   const std::string& v)
 {
index 9b5e3401436072bfe92f6746c7fffdc5f9ea4df1..e13520dd8c7ea005416f060a643ae29da6275193 100644 (file)
@@ -18,7 +18,7 @@ void RGWLoadGenRequestEnv::set_date(utime_t& tm)
 
 int RGWLoadGenRequestEnv::sign(RGWAccessKey& access_key)
 {
-  map<string, string> meta_map;
+  meta_map_t meta_map;
   map<string, string> sub_resources;
 
   string canonical_header;
@@ -29,7 +29,7 @@ int RGWLoadGenRequestEnv::sign(RGWAccessKey& access_key)
                                  content_type.c_str(),
                                  date_str.c_str(),
                                  meta_map,
-                                map<string, string>{},
+                                                meta_map_t{},
                                  uri.c_str(),
                                  sub_resources,
                                  canonical_header);
index 8d87c4a2f61efff641d768751ee168baf8b07168..75a5eeb0ca948db73a45b381b5a94f38507bba09 100644 (file)
@@ -40,6 +40,8 @@ void populate_record_from_request(const req_state *s,
   record.bucket_id = s->bucket.bucket_id;
   // pass meta data
   record.x_meta_map = s->info.x_meta_map;
+  // pass tags
+  record.tags = s->tagset.get_tags();
 }
 
 bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType event) {
@@ -52,6 +54,9 @@ bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType
   if (!::match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) {
     return false;
   }
+  if (!::match(filter.s3_filter.tag_filter, s->tagset.get_tags())) {
+    return false;
+  }
   return true;
 }
 
index c6033823dd4151425b70ad7eb18be6573bad459e..5bc76c4d6b45f0c22538991362f0dc4dd6515693 100644 (file)
@@ -986,6 +986,26 @@ int RGWGetObj::verify_permission()
   return 0;
 }
 
+// cache the objects tags into the requests
+// use inside try/catch as "decode()" may throw
+void populate_tags_in_request(req_state* s, const std::map<std::string, bufferlist>& attrs) {
+  const auto attr_iter = attrs.find(RGW_ATTR_TAGS);
+  if (attr_iter != attrs.end()) {
+    auto bliter = attr_iter->second.cbegin();
+    decode(s->tagset, bliter);
+  }
+}
+
+// cache the objects metadata into the request
+void populate_metadata_in_request(req_state* s, std::map<std::string, bufferlist>& attrs) {
+  for (auto& attr : attrs) {
+    if (boost::algorithm::starts_with(attr.first, RGW_ATTR_META_PREFIX)) {
+      std::string_view key(attr.first);
+      key.remove_prefix(sizeof(RGW_ATTR_PREFIX)-1);
+      s->info.x_meta_map.emplace(key, attr.second.c_str());
+    }
+  }
+}
 
 int RGWOp::verify_op_mask()
 {
@@ -4638,22 +4658,23 @@ void RGWDeleteObj::execute()
   bool check_obj_lock = obj.key.have_instance() && s->bucket_info.obj_lock_enabled();
 
   if (!s->object.empty()) {
+    /* check if obj exists, read orig attrs */
+    op_ret = get_obj_attrs(store, s, obj, attrs);
+    
     if (need_object_expiration() || multipart_delete) {
-      /* check if obj exists, read orig attrs */
-      op_ret = get_obj_attrs(store, s, obj, attrs);
       if (op_ret < 0) {
+        // failed to get attributes
         return;
       }
     }
 
     if (check_obj_lock) {
-      /* check if obj exists, read orig attrs */
-      op_ret = get_obj_attrs(store, s, obj, attrs);
       if (op_ret < 0) {
         if (op_ret == -ENOENT) {
           /* object maybe delete_marker, skip check_obj_lock*/
           check_obj_lock = false;
         } else {
+          // failed to get attributes and check_obj_lock is needed
           return;
         }
       }
@@ -4758,6 +4779,15 @@ void RGWDeleteObj::execute()
     if (op_ret == -ERR_PRECONDITION_FAILED && no_precondition_error) {
       op_ret = 0;
     }
+
+    // cache the objects tags and metadata into the requests
+    // so it could be used in the notification mechanism
+    try {
+      populate_tags_in_request(s, attrs);
+    } catch (buffer::error& err) {
+      ldpp_dout(this, 5) << "WARNING: failed to populate delete request with object tags: " << err.what() << dendl;
+    }
+    populate_metadata_in_request(s, attrs);
   } else {
     op_ret = -EINVAL;
   }
index 404bb45ade0e2ba0989bb1c08c8656d8b9322042..8f945e82296c24c57b2090d00b3476126833c5cd 100644 (file)
@@ -76,8 +76,8 @@ bool rgw_s3_key_filter::has_content() const {
     return !(prefix_rule.empty() && suffix_rule.empty() && regex_rule.empty());
 }
 
-bool rgw_s3_metadata_filter::decode_xml(XMLObj* obj) {
-  metadata.clear();
+bool rgw_s3_key_value_filter::decode_xml(XMLObj* obj) {
+  kvl.clear();
   XMLObjIter iter = obj->find("FilterRule");
   XMLObj *o;
 
@@ -89,13 +89,13 @@ bool rgw_s3_metadata_filter::decode_xml(XMLObj* obj) {
   while ((o = iter.get_next())) {
     RGWXMLDecoder::decode_xml("Name", key, o, throw_if_missing);
     RGWXMLDecoder::decode_xml("Value", value, o, throw_if_missing);
-    metadata.emplace(key, value);
+    kvl.emplace(key, value);
   }
   return true;
 }
 
-void rgw_s3_metadata_filter::dump_xml(Formatter *f) const {
-  for (const auto& key_value : metadata) {
+void rgw_s3_key_value_filter::dump_xml(Formatter *f) const {
+  for (const auto& key_value : kvl) {
     f->open_object_section("FilterRule");
     ::encode_xml("Name", key_value.first, f);
     ::encode_xml("Value", key_value.second, f);
@@ -103,13 +103,14 @@ void rgw_s3_metadata_filter::dump_xml(Formatter *f) const {
   }
 }
 
-bool rgw_s3_metadata_filter::has_content() const {
-    return !metadata.empty();
+bool rgw_s3_key_value_filter::has_content() const {
+    return !kvl.empty();
 }
 
 bool rgw_s3_filter::decode_xml(XMLObj* obj) {
     RGWXMLDecoder::decode_xml("S3Key", key_filter, obj);
     RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter, obj);
+    RGWXMLDecoder::decode_xml("S3Tags", tag_filter, obj);
   return true;
 }
 
@@ -120,11 +121,15 @@ void rgw_s3_filter::dump_xml(Formatter *f) const {
   if (metadata_filter.has_content()) {
       ::encode_xml("S3Metadata", metadata_filter, f);
   }
+  if (tag_filter.has_content()) {
+      ::encode_xml("S3Tags", tag_filter, f);
+  }
 }
 
 bool rgw_s3_filter::has_content() const {
     return key_filter.has_content()  ||
-           metadata_filter.has_content();
+           metadata_filter.has_content() ||
+           tag_filter.has_content();
 }
 
 bool match(const rgw_s3_key_filter& filter, const std::string& key) {
@@ -161,10 +166,10 @@ bool match(const rgw_s3_key_filter& filter, const std::string& key) {
   return true;
 }
 
-bool match(const rgw_s3_metadata_filter& filter, const Metadata& metadata) {
-  // all filter pairs must exist with the same value in the object's metadata
-  // object metadata may include items not in the filter
-  return std::includes(metadata.begin(), metadata.end(), filter.metadata.begin(), filter.metadata.end());
+bool match(const rgw_s3_key_value_filter& filter, const KeyValueList& kvl) {
+  // all filter pairs must exist with the same value in the object's metadata/tags
+  // object metadata/tags may include items not in the filter
+  return std::includes(kvl.begin(), kvl.end(), filter.kvl.begin(), filter.kvl.end());
 }
 
 bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event) {
@@ -273,6 +278,7 @@ void rgw_pubsub_s3_record::dump(Formatter *f) const {
         encode_json("versionId", object_versionId, f);
         encode_json("sequencer", object_sequencer, f);
         encode_json("metadata", x_meta_map, f);
+        encode_json("tags", tags, f);
     }
   }
   encode_json("eventId", id, f);
index f746b273d9ce8aba75d480d8dbff0ad7c8f7a1b7..fbe6e15dbc80984fd0ba6b42d61b228ff0adb6dd 100644 (file)
@@ -10,6 +10,7 @@
 #include "rgw_rados.h"
 #include "rgw_notify_event_type.h"
 #include "services/svc_sys_obj.h"
+#include <boost/container/flat_map.hpp>
 
 class XMLObj;
 
@@ -41,10 +42,10 @@ struct rgw_s3_key_filter {
 };
 WRITE_CLASS_ENCODER(rgw_s3_key_filter)
 
-using Metadata = std::map<std::string, std::string>;
+using KeyValueList = boost::container::flat_map<std::string, std::string>;
 
-struct rgw_s3_metadata_filter {
-  Metadata metadata;
+struct rgw_s3_key_value_filter {
+  KeyValueList kvl;
   
   bool has_content() const;
   
@@ -53,20 +54,21 @@ struct rgw_s3_metadata_filter {
   
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
-    encode(metadata, bl);
+    encode(kvl, bl);
     ENCODE_FINISH(bl);
   }
   void decode(bufferlist::const_iterator& bl) {
     DECODE_START(1, bl);
-    decode(metadata, bl);
+    decode(kvl, bl);
     DECODE_FINISH(bl);
   }
 };
-WRITE_CLASS_ENCODER(rgw_s3_metadata_filter)
+WRITE_CLASS_ENCODER(rgw_s3_key_value_filter)
 
 struct rgw_s3_filter {
   rgw_s3_key_filter key_filter;
-  rgw_s3_metadata_filter metadata_filter;
+  rgw_s3_key_value_filter metadata_filter;
+  rgw_s3_key_value_filter tag_filter;
 
   bool has_content() const;
   
@@ -74,16 +76,20 @@ struct rgw_s3_filter {
   void dump_xml(Formatter *f) const;
   
   void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 1, bl);
     encode(key_filter, bl);
     encode(metadata_filter, bl);
+    encode(tag_filter, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
+    DECODE_START(2, bl);
     decode(key_filter, bl);
     decode(metadata_filter, bl);
+    if (struct_v >= 2) {
+        decode(tag_filter, bl);
+    }
     DECODE_FINISH(bl);
   }
 };
@@ -109,6 +115,12 @@ class rgw_pubsub_topic_filter;
           <Value></Value>
         </FilterRule>
       </S3Metadata>
+      <S3Tags>
+        <FilterRule>
+          <Name></Name>
+          <Value></Value>
+        </FilterRule>
+      </S3Tags>
     </Filter>
     <Id>notification1</Id>
     <Topic>arn:aws:sns:<region>:<account>:<topic></Topic>
@@ -132,13 +144,13 @@ struct rgw_pubsub_s3_notification {
 
   rgw_pubsub_s3_notification() = default;
   // construct from rgw_pubsub_topic_filter (used by get/list notifications)
-  rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter);
+  explicit rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter);
 };
 
 // return true if the key matches the prefix/suffix/regex rules of the key filter
 bool match(const rgw_s3_key_filter& filter, const std::string& key);
-// return true if the key matches the metadata rules of the metadata filter
-bool match(const rgw_s3_metadata_filter& filter, const Metadata& metadata);
+// return true if the key matches the metadata/tags rules of the metadata/tags filter
+bool match(const rgw_s3_key_value_filter& filter, const KeyValueList& kvl);
 // return true if the event type matches (equal or contained in) one of the events in the list
 bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event);
 
@@ -186,6 +198,7 @@ struct rgw_pubsub_s3_notifications {
         "versionId":"",
         "sequencer": "",
         "metadata": ""
+        "tags": ""
       }
     },
     "eventId":"",
@@ -238,10 +251,12 @@ struct rgw_pubsub_s3_record {
   // this is an rgw extension holding the internal bucket id
   std::string bucket_id;
   // meta data
-  std::map<std::string, std::string> x_meta_map;
+  KeyValueList x_meta_map;
+  // tags
+  KeyValueList tags;
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(2, 1, bl);
+    ENCODE_START(3, 1, bl);
     encode(eventVersion, bl);
     encode(eventSource, bl);
     encode(awsRegion, bl);
@@ -264,11 +279,12 @@ struct rgw_pubsub_s3_record {
     encode(id, bl);
     encode(bucket_id, bl);
     encode(x_meta_map, bl);
+    encode(tags, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(2, bl);
+    DECODE_START(3, bl);
     decode(eventVersion, bl);
     decode(eventSource, bl);
     decode(awsRegion, bl);
@@ -293,6 +309,9 @@ struct rgw_pubsub_s3_record {
         decode(bucket_id, bl);
         decode(x_meta_map, bl);
     }
+    if (struct_v >= 3) {
+        decode(tags, bl);
+    }
     DECODE_FINISH(bl);
   }
 
@@ -613,7 +632,7 @@ public:
     // read the list of topics associated with a bucket and populate into result
     // return 0 on success or if no topic was associated with the bucket, error code otherwise
     int get_topics(rgw_pubsub_bucket_topics *result);
-    // adds a topic + filter (event list, and possibly name and metadata filters) to a bucket
+    // adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket
     // assigning a notification name is optional (needed for S3 compatible notifications)
     // if the topic already exist on the bucket, the filter event list may be updated
     // for S3 compliant notifications the version with: s3_filter and notif_name should be used
index daf6a24e53daa379017a9ec9e22d2abbfa18ff94..c0d1cc1cb376f3febbbae34e99570f122f3bad3d 100644 (file)
@@ -138,7 +138,7 @@ int RGWRESTSimpleRequest::execute(RGWAccessKey& key, const char *_method, const
   headers.push_back(pair<string, string>("HTTP_DATE", date_str));
 
   string canonical_header;
-  map<string, string> meta_map;
+  meta_map_t meta_map;
   map<string, string> sub_resources;
 
   rgw_create_s3_canonical_header(method.c_str(), NULL, NULL, date_str.c_str(),
@@ -293,7 +293,7 @@ int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, siz
     headers.emplace_back(kv);
   }
 
-  map<string, string>& meta_map = new_info.x_meta_map;
+  meta_map_t& meta_map = new_info.x_meta_map;
   for (const auto& kv: meta_map) {
     headers.emplace_back(kv);
   }
@@ -427,7 +427,7 @@ static void grants_by_type_add_perm(map<int, string>& grants_by_type, int perm,
   }
 }
 
-static void add_grants_headers(map<int, string>& grants, RGWEnv& env, map<string, string>& meta_map)
+static void add_grants_headers(map<int, string>& grants, RGWEnv& env, meta_map_t& meta_map)
 {
   struct grant_type_to_header *t;
 
index 157aca61c9373562208e16a56982260d0e5d8bc0..376f815313e03984a0c15332202b80321783c077 100644 (file)
@@ -20,10 +20,19 @@ from .tests import get_realm, \
     gen_bucket_name, \
     get_user, \
     get_tenant
-from .zone_ps import PSTopic, PSTopicS3, PSNotification, PSSubscription, PSNotificationS3, print_connection_info, delete_all_s3_topics
+from .zone_ps import PSTopic, \
+    PSTopicS3, \
+    PSNotification, \
+    PSSubscription, \
+    PSNotificationS3, \
+    print_connection_info, \
+    delete_all_s3_topics, \
+    put_object_tagging, \
+    get_object_tagging
 from multisite import User
 from nose import SkipTest
 from nose.tools import assert_not_equal, assert_equal
+import boto.s3.tagging
 
 # configure logging for the tests module
 log = logging.getLogger(__name__)
@@ -2597,30 +2606,152 @@ def test_ps_s3_metadata_on_master():
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
+    meta_key = 'meta1'
+    meta_value = 'This is my metadata value'
+    meta_prefix = 'x-amz-meta-'
     topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
-                        'Events': ['s3:ObjectCreated:*']
-                       }]
+        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
+        'Filter': {
+            'Metadata': {
+                'FilterRules': [{'Name': meta_prefix+meta_key, 'Value': meta_value}]
+            }
+        }
+    }]
 
     s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
     # create objects in the bucket
-    key = bucket.new_key('foo')
-    key.set_metadata('meta1', 'This is my metadata value')
+    key_name = 'foo'
+    key = bucket.new_key(key_name)
+    key.set_metadata(meta_key, meta_value)
     key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
-    keys = list(bucket.list())
-    print 'wait for 5sec for the messages...'
+    
+    # create objects in the bucket using COPY
+    bucket.copy_key('copy_of_foo', bucket.name, key.name)
+    # create objects in the bucket using multi-part upload
+    fp = tempfile.TemporaryFile(mode='w')
+    fp.write('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
+    fp.close()
+    uploader = bucket.initiate_multipart_upload('multipart_foo', 
+            metadata={meta_key: meta_value})
+    fp = tempfile.TemporaryFile(mode='r')
+    uploader.upload_part_from_file(fp, 1)
+    uploader.complete_upload()
+    fp.close()
+    print('wait for 5sec for the messages...')
     time.sleep(5)
     # check amqp receiver
-    receiver.verify_s3_events(keys, exact_match=True)
+    event_count = 0
+    for event in receiver.get_and_reset_events():
+        assert_equal(event['Records'][0]['s3']['object']['metadata'][0]['key'], meta_prefix+meta_key)
+        assert_equal(event['Records'][0]['s3']['object']['metadata'][0]['val'], meta_value)
+        event_count +=1
+
+    # only PUT and POST has the metadata value
+    assert_equal(event_count, 2)
+
+    # delete objects
+    for key in bucket.list():
+        key.delete()
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+    # check amqp receiver
+    event_count = 0
+    for event in receiver.get_and_reset_events():
+        assert_equal(event['Records'][0]['s3']['object']['metadata'][0]['key'], meta_prefix+meta_key)
+        assert_equal(event['Records'][0]['s3']['object']['metadata'][0]['val'], meta_value)
+        event_count +=1
+
+    # all 3 object has metadata when deleted
+    assert_equal(event_count, 3)
 
     # cleanup
     stop_amqp_receiver(receiver, task)
     s3_notification_conf.del_config()
     topic_conf.del_config()
+    # delete the bucket
+    zones[0].delete_bucket(bucket_name)
+    clean_rabbitmq(proc)
+
+
+def test_ps_s3_tags_on_master():
+    """ test s3 notification of tags on master """
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    hostname = get_ip()
+    proc = init_rabbitmq()
+    if proc is  None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
+    zones, _  = init_env(require_ps=False)
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = zones[0].create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # start amqp receiver
+    exchange = 'ex1'
+    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+    task.start()
+
+    # create s3 topic
+    endpoint_address = 'amqp://' + hostname
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
+        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
+        'Filter': {
+            'Tags': {
+                'FilterRules': [{'Name': 'hello', 'Value': 'world'}]
+            }
+        }
+    }]
+
+    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket with tags
+    tags = 'hello=world&ka=boom'
+    key_name1 = 'key1'
+    put_object_tagging(zones[0].conn, bucket_name, key_name1, tags)
+    tags = 'foo=bar&ka=boom'
+    key_name2 = 'key2'
+    put_object_tagging(zones[0].conn, bucket_name, key_name2, tags)
+    key_name3 = 'key3'
+    key = bucket.new_key(key_name3)
+    key.set_contents_from_string('bar')
+    # create objects in the bucket using COPY
+    bucket.copy_key('copy_of_'+key_name1, bucket.name, key_name1)
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+    expected_tags = [{'val': 'world', 'key': 'hello'}, {'val': 'boom', 'key': 'ka'}]
+    # check amqp receiver
+    for event in receiver.get_and_reset_events():
+        obj_tags =  event['Records'][0]['s3']['object']['tags']
+        assert_equal(obj_tags[0], expected_tags[0])
+
+    # delete the objects
     for key in bucket.list():
         key.delete()
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+    # check amqp receiver
+    for event in receiver.get_and_reset_events():
+        obj_tags =  event['Records'][0]['s3']['object']['tags']
+        assert_equal(obj_tags[0], expected_tags[0])
+
+    # cleanup
+    stop_amqp_receiver(receiver, task)
+    s3_notification_conf.del_config()
+    topic_conf.del_config()
     # delete the bucket
     zones[0].delete_bucket(bucket_name)
     clean_rabbitmq(proc)
index d9d01cc7bbb5de63ef4fc02875bb243e2222d866..61d32679bc75e272db31e8dafb6998709a25b64c 100644 (file)
@@ -14,6 +14,26 @@ from botocore.client import Config
 
 log = logging.getLogger('rgw_multi.tests')
 
+def put_object_tagging(conn, bucket_name, key, tags):
+    client = boto3.client('s3',
+            endpoint_url='http://'+conn.host+':'+str(conn.port),
+            aws_access_key_id=conn.aws_access_key_id,
+            aws_secret_access_key=conn.aws_secret_access_key,
+            config=Config(signature_version='s3'))
+    return client.put_object(Body='aaaaaaaaaaa', Bucket=bucket_name, Key=key, Tagging=tags)
+
+
+def get_object_tagging(conn, bucket, object_key):
+    client = boto3.client('s3',
+            endpoint_url='http://'+conn.host+':'+str(conn.port),
+            aws_access_key_id=conn.aws_access_key_id,
+            aws_secret_access_key=conn.aws_secret_access_key,
+            config=Config(signature_version='s3'))
+    return client.get_object_tagging(
+                Bucket=bucket, 
+                Key=object_key
+            )
+
 
 class PSZone(Zone):  # pylint: disable=too-many-ancestors
     """ PubSub zone class """