]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/pubsub: notification filtering by object tags 31878/head
authorYuval Lifshitz <yuvalif@yahoo.com>
Sun, 24 Nov 2019 13:33:00 +0000 (15:33 +0200)
committerYuval Lifshitz <yuvalif@yahoo.com>
Sun, 15 Dec 2019 12:20:41 +0000 (14:20 +0200)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
20 files changed:
doc/radosgw/notifications.rst
doc/radosgw/pubsub-module.rst
doc/radosgw/s3-notification-compatibility.rst
doc/radosgw/s3/bucketops.rst
examples/boto3/README.md
examples/boto3/notification_filters.py
examples/boto3/service-2.sdk-extras.json
src/common/ceph_json.h
src/rgw/rgw_auth_s3.cc
src/rgw/rgw_auth_s3.h
src/rgw/rgw_common.cc
src/rgw/rgw_common.h
src/rgw/rgw_loadgen.cc
src/rgw/rgw_notify.cc
src/rgw/rgw_op.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_rest_client.cc
src/test/rgw/rgw_multi/tests_ps.py
src/test/rgw/rgw_multi/zone_ps.py

index dd6762e10a56946c074ffed25f601318d0915ffa..99e749062ec8603c71e865fdbd36a42ade1bbf91 100644 (file)
@@ -18,7 +18,7 @@ user can only manage its own topics, and can only associate them with buckets it
 In order to send notifications for events for a specific bucket, a notification entity needs to be created. A
 notification can be created on a subset of event types, or for all event types (default).
 The notification may also filter out events based on prefix/suffix and/or regular expression matching of the keys. As well as,
-on the metadata attributes attached to the object.
+on the metadata attributes attached to the object, or the object tags.
 There can be multiple notifications for any specific topic, and the same topic could be used for multiple notifications.
 
 REST API has been defined to provide configuration and control interfaces for the bucket notification
@@ -273,7 +273,8 @@ pushed or pulled using the pubsub sync module.
                    "eTag":"",
                    "versionId":"",
                    "sequencer": "",
-                   "metadata":[]
+                   "metadata":[],
+                   "tags":[]
                }
            },
            "eventId":"",
@@ -298,6 +299,7 @@ pushed or pulled using the pubsub sync module.
 - s3.object.version: object version in case of versioned bucket
 - s3.object.sequencer: monotonically increasing identifier of the change per object (hexadecimal format)
 - s3.object.metadata: any metadata set on the object sent as: ``x-amz-meta-`` (an extension to the S3 notification API) 
+- s3.object.tags: any tags set on the objcet (an extension to the S3 notification API)
 - s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API)
 
 .. _PubSub Module : ../pubsub-module
index 32ff8bf7448a1dd775db9dd828baa0a108ba0e97..2833a8aa5a9cfdac34a3bd6f3d474661b2d3ea1c 100644 (file)
@@ -260,6 +260,7 @@ Detailed under: `Bucket Operations`_.
       the associated subscription will not be deleted automatically (any events of the deleted bucket could still be access),
       and will have to be deleted explicitly with the subscription deletion API
     - Filtering based on metadata (which is an extension to S3) is not supported, and such rules will be ignored
+    - Filtering based on tags (which is an extension to S3) is not supported, and such rules will be ignored
 
 
 Non S3-Compliant Notifications
@@ -474,7 +475,8 @@ the events will have an S3-compatible record format (JSON):
                    "eTag":"",
                    "versionId":"",
                    "sequencer":"",
-                   "metadata":[]
+                   "metadata":[],
+                   "tags":[]
                }
            },
            "eventId":"",
@@ -498,6 +500,7 @@ the events will have an S3-compatible record format (JSON):
 - s3.object.version: object version in case of versioned bucket
 - s3.object.sequencer: monotonically increasing identifier of the change per object (hexadecimal format)
 - s3.object.metadata: not supported (an extension to the S3 notification API)
+- s3.object.tags: not supported (an extension to the S3 notification API)
 - s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API)
 
 In case that the subscription was not created via a non S3-compatible notification, 
index 6cc6ac0283f1d3cacdc088b086e4ac03dc7b5d74..ffbceca64d4e95e3c27ef50edd37ffb6cc663477 100644 (file)
@@ -49,6 +49,8 @@ Ceph's bucket notification API has the following extensions:
 
   - Filtering based on metadata attributes attached to the object
 
+  - Filtering based on object tags
+
 - Filtering overlapping is allowed, so that same event could be sent as different notification
 
 
index d520e379b37b13bed9e35d01d7f0e2b95ce1b45a..378eb5f044a262d99305ae5f1b7f29f9f5bf9eb1 100644 (file)
@@ -514,7 +514,13 @@ Parameters are XML encoded in the body of the request, in the following format:
                         <Name></Name>
                         <Value></Value>
                     </FilterRule>
-                </s3Metadata>
+                </S3Metadata>
+                <S3Tags>
+                    <FilterRule>
+                        <Name></Name>
+                        <Value></Value>
+                    </FilterRule>
+                </S3Tags>
             </Filter>
        </TopicConfiguration>
    </NotificationConfiguration>
@@ -533,15 +539,19 @@ Parameters are XML encoded in the body of the request, in the following format:
 | ``Event``                     | String    | List of supported events see: `S3 Notification Compatibility`_.  Multiple ``Event``  | No       |
 |                               |           | entities can be used. If omitted, all events are handled                             |          |
 +-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
-| ``Filter``                    | Container | Holding ``S3Key`` and ``S3Metadata`` entities                                        | No       |
+| ``Filter``                    | Container | Holding ``S3Key``, ``S3Metadata`` and ``S3Tags`` entities                            | No       |
 +-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
 | ``S3Key``                     | Container | Holding a list of ``FilterRule`` entities, for filtering based on object key.        | No       |
 |                               |           | At most, 3 entities may be in the list, with ``Name`` be ``prefix``, ``suffix`` or   |          |
 |                               |           | ``regex``. All filter rules in the list must match for the filter to match.          |          |
 +-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
 | ``S3Metadata``                | Container | Holding a list of ``FilterRule`` entities, for filtering based on object metadata.   | No       |
-|                               |           | All filter rules in the list must match the ones defined on the object. The object,  |          |
-|                               |           | have other metadata entitied not listed in the filter.                               |          |
+|                               |           | All filter rules in the list must match the metadata defined on the object. However, |          |
+|                               |           | the object still match if it has other metadata entries not listed in the filter.    |          |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``S3Tags``                    | Container | Holding a list of ``FilterRule`` entities, for filtering based on object tags.       | No       |
+|                               |           | All filter rules in the list must match the tags defined on the object. However,     |          |
+|                               |           | the object still match it it has other tags not listed in the filter.                |          |
 +-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
 | ``S3Key.FilterRule``          | Container | Holding ``Name`` and ``Value`` entities. ``Name`` would  be: ``prefix``, ``suffix``  | Yes      |
 |                               |           | or ``regex``. The ``Value`` would hold the key prefix, key suffix or a regular       |          |
@@ -549,7 +559,10 @@ Parameters are XML encoded in the body of the request, in the following format:
 +-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
 | ``S3Metadata.FilterRule``     | Container | Holding ``Name`` and ``Value`` entities. ``Name`` would be the name of the metadata  | Yes      |
 |                               |           | attribute (e.g. ``x-amz-meta-xxx``). The ``Value`` would be the expected value for   |          | 
-|                               |           | this attribute                                                                       |          |
+|                               |           | this attribute.                                                                      |          |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``S3Tags.FilterRule``         | Container | Holding ``Name`` and ``Value`` entities. ``Name`` would be the tag key,              |  Yes     |
+|                               |           | and ``Value`` would be the tag value.                                                |          | 
 +-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
 
 
@@ -652,7 +665,13 @@ Response is XML encoded in the body of the request, in the following format:
                         <Name></Name>
                         <Value></Value>
                     </FilterRule>
-                </s3Metadata>
+                </S3Metadata>
+                <S3Tags>
+                    <FilterRule>
+                        <Name></Name>
+                        <Value></Value>
+                    </FilterRule>
+                </S3Tags>
             </Filter>
        </TopicConfiguration>
    </NotificationConfiguration>
@@ -684,4 +703,4 @@ HTTP Response
 | ``404``       | NoSuchKey             | The notification does not exist (if provided)            |
 +---------------+-----------------------+----------------------------------------------------------+
 
-.. _S3 Notification Compatibility: ../s3-notification-compatibility
+.. _S3 Notification Compatibility: ../../s3-notification-compatibility
index 2c285261d319f295b0ef66008d5f557b96dbd384..2abbd2812fe57de9c9ad133e36c0d29334707eee 100644 (file)
@@ -1,5 +1,6 @@
 # Introduction
 This directory contains examples on how to use AWS CLI/boto3 to exercise the RadosGW extensions to the S3 API.
+This is an extension to the [AWS SDK](https://github.com/boto/botocore/blob/develop/botocore/data/s3/2006-03-01/service-2.json).
 
 # Users
 For the standard client to support these extensions, the: ``service-2.sdk-extras.json`` file should be placed under: ``~/.aws/models/s3/2006-03-01/`` directory.
index c9fd27e5a150a874c25454e9c88546d038ad472b..a45393c74f9b81cf13ee0fbac87d8b470efe0c72 100755 (executable)
@@ -35,6 +35,10 @@ topic_conf_list = [{'Id': notification_id,
                            'FilterRules': [{'Name': 'x-amz-meta-foo', 'Value': 'bar'},
                                             {'Name': 'x-amz-meta-hello', 'Value': 'world'}]
                          },
+                        'Tags': {
+                           'FilterRules': [{'Name': 'foo', 'Value': 'bar'},
+                                            {'Name': 'hello', 'Value': 'world'}]
+                         },
                          'Key': {
                              'FilterRules': [{'Name': 'regex', 'Value': '([a-z]+)'}]
                          }
index 671f802a49825ff71229e662deffc04e28dd5198..f69912c0bbec5f5a4a2da5a4d48d663122a50aa6 100644 (file)
                     "shape":"S3MetadataFilter",
                     "documentation":"<p/>",
                     "locationName":"S3Metadata"
+                },
+                "Tags":{
+                    "shape":"S3TagsFilter",
+                    "documentation":"<p/>",
+                    "locationName":"S3Tags"
                 }
+
             },
             "documentation":"<p>Specifies object key name filtering rules. For information about key name filtering, see <a href=\"https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html\">Configuring Event Notifications</a> in the <i>Amazon Simple Storage Service Developer Guide</i>.</p>"
         },
                 }
             },
             "documentation":"<p>A container for metadata filtering rules.</p>"
+        },
+        "S3TagsFilter":{
+            "type":"structure",
+            "members":{
+                "FilterRules":{
+                    "shape":"FilterRuleList",
+                    "documentation":"<p/>",
+                    "locationName":"FilterRule"
+                }
+            },
+            "documentation":"<p>A container for object tags filtering rules.</p>"
         }
     },
     "documentation":"<p/>"
index f7d0e11b8f45a9a34237da02d84a8b61ba1fa496..05615c6dc71d0ecc3b0ce5c5cfc8dbacd81eaaa6 100644 (file)
@@ -3,7 +3,7 @@
 
 #include <stdexcept>
 #include <include/types.h>
-
+#include <boost/container/flat_map.hpp>
 
 #include "json_spirit/json_spirit.h"
 
@@ -259,6 +259,22 @@ void decode_json_obj(std::multimap<K, V>& m, JSONObj *obj)
   }
 }
 
+template<class K, class V>
+void decode_json_obj(boost::container::flat_map<K, V>& m, JSONObj *obj)
+{
+  m.clear();
+
+  JSONObjIter iter = obj->find_first();
+
+  for (; !iter.end(); ++iter) {
+    K key;
+    V val;
+    JSONObj *o = *iter;
+    JSONDecoder::decode_json("key", key, o);
+    JSONDecoder::decode_json("val", val, o);
+    m[key] = val;
+  }
+}
 template<class C>
 void decode_json_obj(C& container, void (*cb)(C&, JSONObj *obj), JSONObj *obj)
 {
@@ -397,6 +413,7 @@ static void encode_json(const char *name, const std::list<T>& l, ceph::Formatter
   }
   f->close_section();
 }
+
 template<class T>
 static void encode_json(const char *name, const std::deque<T>& l, ceph::Formatter *f)
 {
@@ -406,6 +423,7 @@ static void encode_json(const char *name, const std::deque<T>& l, ceph::Formatte
   }
   f->close_section();
 }
+
 template<class T, class Compare = std::less<T> >
 static void encode_json(const char *name, const std::set<T, Compare>& l, ceph::Formatter *f)
 {
@@ -426,7 +444,7 @@ static void encode_json(const char *name, const std::vector<T>& l, ceph::Formatt
   f->close_section();
 }
 
-template<class K, class V, class C = std::less<K> >
+template<class K, class V, class C = std::less<K>>
 static void encode_json(const char *name, const std::map<K, V, C>& m, ceph::Formatter *f)
 {
   f->open_array_section(name);
@@ -451,6 +469,20 @@ static void encode_json(const char *name, const std::multimap<K, V>& m, ceph::Fo
   }
   f->close_section();
 }
+
+template<class K, class V>
+static void encode_json(const char *name, const boost::container::flat_map<K, V>& m, ceph::Formatter *f)
+{
+  f->open_array_section(name);
+  for (auto i = m.begin(); i != m.end(); ++i) {
+    f->open_object_section("entry");
+    encode_json("key", i->first, f);
+    encode_json("val", i->second, f);
+    f->close_section();
+  }
+  f->close_section();
+}
+
 template<class K, class V>
 void encode_json_map(const char *name, const std::map<K, V>& m, ceph::Formatter *f)
 {
index 1f8050da720e51ff7557a52a2d82dd9cf0e9a68b..93e46bc4621b21fc3b47f3cf22fa7d7a142d9fe7 100644 (file)
@@ -56,7 +56,7 @@ static const auto signed_subresources = {
  */
 
 static std::string
-get_canon_amz_hdr(const std::map<std::string, std::string>& meta_map)
+get_canon_amz_hdr(const meta_map_t& meta_map)
 {
   std::string dest;
 
@@ -117,8 +117,8 @@ void rgw_create_s3_canonical_header(
   const char* const content_md5,
   const char* const content_type,
   const char* const date,
-  const std::map<std::string, std::string>& meta_map,
-  const std::map<std::string, std::string>& qs_map,
+  const meta_map_t& meta_map,
+  const meta_map_t& qs_map,
   const char* const request_uri,
   const std::map<std::string, std::string>& sub_resources,
   std::string& dest_str)
@@ -157,7 +157,7 @@ static inline bool is_base64_for_content_md5(unsigned char c) {
 }
 
 static inline void get_v2_qs_map(const req_info& info,
-                                std::map<std::string, std::string>& qs_map) {
+                                meta_map_t& qs_map) {
   const auto& params = const_cast<RGWHTTPArgs&>(info.args).get_params();
   for (const auto& elt : params) {
     std::string k = boost::algorithm::to_lower_copy(elt.first);
@@ -190,7 +190,7 @@ bool rgw_create_s3_canonical_header(const req_info& info,
   const char *content_type = info.env->get("CONTENT_TYPE");
 
   std::string date;
-  std::map<std::string, std::string> qs_map;
+  meta_map_t qs_map;
 
   if (qsr) {
     get_v2_qs_map(info, qs_map); // handle qs metadata
index 67a01b4e9c846787f02b53de806a132a8e9becbc..9e79ee162136e15fdb48d2b88d92c5f2158e08e5 100644 (file)
@@ -421,8 +421,8 @@ void rgw_create_s3_canonical_header(
   const char *content_md5,
   const char *content_type,
   const char *date,
-  const std::map<std::string, std::string>& meta_map,
-  const std::map<std::string, std::string>& qs_map,
+  const meta_map_t& meta_map,
+  const meta_map_t& qs_map,
   const char *request_uri,
   const std::map<std::string, std::string>& sub_resources,
   std::string& dest_str);
index 6e394bb015175dbbb239f4ecd3a5e0f59bc4ed3a..65421d6e130c26080925c3623d9d73f9137583e5 100644 (file)
@@ -435,7 +435,7 @@ std::ostream& operator<<(std::ostream& oss, const rgw_err &err)
 }
 
 void rgw_add_amz_meta_header(
-  std::map<std::string, std::string>& x_meta_map,
+  meta_map_t& x_meta_map,
   const std::string& k,
   const std::string& v)
 {
index a8ea073512a34e506ecba3fb4502573e84b861a6..6af3de23af65fb196adacf344186fd098352b4d2 100644 (file)
@@ -1648,11 +1648,12 @@ namespace rgw {
   }
 }
 
+using meta_map_t = boost::container::flat_map <std::string, std::string>;
 
 struct req_info {
   const RGWEnv *env;
   RGWHTTPArgs args;
-  map<string, string> x_meta_map;
+  meta_map_t x_meta_map;
 
   string host;
   const char *method;
@@ -2456,7 +2457,7 @@ static inline uint64_t rgw_rounded_objsize_kb(uint64_t bytes)
 /* implement combining step, S3 header canonicalization;  k is a
  * valid header and in lc form */
 void rgw_add_amz_meta_header(
-  std::map<std::string, std::string>& x_meta_map,
+  meta_map_t& x_meta_map,
   const std::string& k,
   const std::string& v);
 
index 4085f739707038c43ce80aa5ecb19ef7c7f46786..a8e6fd7b2bf6550583b73152c7ced0eda1c3649b 100644 (file)
@@ -18,7 +18,7 @@ void RGWLoadGenRequestEnv::set_date(utime_t& tm)
 
 int RGWLoadGenRequestEnv::sign(RGWAccessKey& access_key)
 {
-  map<string, string> meta_map;
+  meta_map_t meta_map;
   map<string, string> sub_resources;
 
   string canonical_header;
@@ -29,7 +29,7 @@ int RGWLoadGenRequestEnv::sign(RGWAccessKey& access_key)
                                  content_type.c_str(),
                                  date_str.c_str(),
                                  meta_map,
-                                map<string, string>{},
+                                                meta_map_t{},
                                  uri.c_str(),
                                  sub_resources,
                                  canonical_header);
index 11ca0256462da3779983f179bad15606cf51f8b9..0c41b679a1170e4bdad5e1133eb6aae57987fe36 100644 (file)
@@ -40,6 +40,8 @@ void populate_record_from_request(const req_state *s,
   record.bucket_id = s->bucket.bucket_id;
   // pass meta data
   record.x_meta_map = s->info.x_meta_map;
+  // pass tags
+  record.tags = s->tagset.get_tags();
 }
 
 bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType event) {
@@ -52,6 +54,9 @@ bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType
   if (!::match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) {
     return false;
   }
+  if (!::match(filter.s3_filter.tag_filter, s->tagset.get_tags())) {
+    return false;
+  }
   return true;
 }
 
index ef6e7b0d9bd4be30b42befa28a8dc98726ca28a4..504973417b3aefe453d6802f100c632f7bb2cc14 100644 (file)
@@ -980,6 +980,26 @@ int RGWGetObj::verify_permission()
   return 0;
 }
 
+// cache the objects tags into the requests
+// use inside try/catch as "decode()" may throw
+void populate_tags_in_request(req_state* s, const std::map<std::string, bufferlist>& attrs) {
+  const auto attr_iter = attrs.find(RGW_ATTR_TAGS);
+  if (attr_iter != attrs.end()) {
+    auto bliter = attr_iter->second.cbegin();
+    decode(s->tagset, bliter);
+  }
+}
+
+// cache the objects metadata into the request
+void populate_metadata_in_request(req_state* s, std::map<std::string, bufferlist>& attrs) {
+  for (auto& attr : attrs) {
+    if (boost::algorithm::starts_with(attr.first, RGW_ATTR_META_PREFIX)) {
+      std::string_view key(attr.first);
+      key.remove_prefix(sizeof(RGW_ATTR_PREFIX)-1);
+      s->info.x_meta_map.emplace(key, attr.second.c_str());
+    }
+  }
+}
 
 int RGWOp::verify_op_mask()
 {
@@ -4818,6 +4838,15 @@ void RGWDeleteObj::execute()
     if (op_ret == -ERR_PRECONDITION_FAILED && no_precondition_error) {
       op_ret = 0;
     }
+
+    // cache the objects tags and metadata into the requests
+    // so it could be used in the notification mechanism
+    try {
+      populate_tags_in_request(s, attrs);
+    } catch (buffer::error& err) {
+      ldpp_dout(this, 5) << "WARNING: failed to populate delete request with object tags: " << err.what() << dendl;
+    }
+    populate_metadata_in_request(s, attrs);
   } else {
     op_ret = -EINVAL;
   }
@@ -5109,33 +5138,33 @@ void RGWCopyObj::execute()
   }
 
   op_ret = store->getRados()->copy_obj(obj_ctx,
-                          s->user->user_id,
-                          &s->info,
-                          source_zone,
-                          dst_obj,
-                          src_obj,
-                          dest_bucket_info,
-                          src_bucket_info,
-                           s->dest_placement,
-                          &src_mtime,
-                          &mtime,
-                          mod_ptr,
-                          unmod_ptr,
-                           high_precision_time,
-                          if_match,
-                          if_nomatch,
-                          attrs_mod,
-                           copy_if_newer,
-                          attrs, RGWObjCategory::Main,
-                          olh_epoch,
-                          (delete_at ? *delete_at : real_time()),
-                          (version_id.empty() ? NULL : &version_id),
-                          &s->req_id, /* use req_id as tag */
-                          &etag,
-                          copy_obj_progress_cb, (void *)this, 
-                           this,
-                           s->yield);
-  
+          s->user->user_id,
+          &s->info,
+          source_zone,
+          dst_obj,
+          src_obj,
+          dest_bucket_info,
+          src_bucket_info,
+          s->dest_placement,
+          &src_mtime,
+          &mtime,
+          mod_ptr,
+          unmod_ptr,
+          high_precision_time,
+          if_match,
+          if_nomatch,
+          attrs_mod,
+          copy_if_newer,
+          attrs, RGWObjCategory::Main,
+          olh_epoch,
+          (delete_at ? *delete_at : real_time()),
+          (version_id.empty() ? NULL : &version_id),
+          &s->req_id, /* use req_id as tag */
+          &etag,
+          copy_obj_progress_cb, (void *)this,
+            this,
+            s->yield);
+
   const auto ret = rgw::notify::publish(s, mtime, etag, rgw::notify::ObjectCreatedCopy, store);
   if (ret < 0) {
     ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
index 4fac3ae07272c84878891df1efdbdeb864ef8b71..3236d525f4913c190bbe60a1945da6d6142a589f 100644 (file)
@@ -76,8 +76,8 @@ bool rgw_s3_key_filter::has_content() const {
     return !(prefix_rule.empty() && suffix_rule.empty() && regex_rule.empty());
 }
 
-bool rgw_s3_metadata_filter::decode_xml(XMLObj* obj) {
-  metadata.clear();
+bool rgw_s3_key_value_filter::decode_xml(XMLObj* obj) {
+  kvl.clear();
   XMLObjIter iter = obj->find("FilterRule");
   XMLObj *o;
 
@@ -89,13 +89,13 @@ bool rgw_s3_metadata_filter::decode_xml(XMLObj* obj) {
   while ((o = iter.get_next())) {
     RGWXMLDecoder::decode_xml("Name", key, o, throw_if_missing);
     RGWXMLDecoder::decode_xml("Value", value, o, throw_if_missing);
-    metadata.emplace(key, value);
+    kvl.emplace(key, value);
   }
   return true;
 }
 
-void rgw_s3_metadata_filter::dump_xml(Formatter *f) const {
-  for (const auto& key_value : metadata) {
+void rgw_s3_key_value_filter::dump_xml(Formatter *f) const {
+  for (const auto& key_value : kvl) {
     f->open_object_section("FilterRule");
     ::encode_xml("Name", key_value.first, f);
     ::encode_xml("Value", key_value.second, f);
@@ -103,13 +103,14 @@ void rgw_s3_metadata_filter::dump_xml(Formatter *f) const {
   }
 }
 
-bool rgw_s3_metadata_filter::has_content() const {
-    return !metadata.empty();
+bool rgw_s3_key_value_filter::has_content() const {
+    return !kvl.empty();
 }
 
 bool rgw_s3_filter::decode_xml(XMLObj* obj) {
     RGWXMLDecoder::decode_xml("S3Key", key_filter, obj);
     RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter, obj);
+    RGWXMLDecoder::decode_xml("S3Tags", tag_filter, obj);
   return true;
 }
 
@@ -120,11 +121,15 @@ void rgw_s3_filter::dump_xml(Formatter *f) const {
   if (metadata_filter.has_content()) {
       ::encode_xml("S3Metadata", metadata_filter, f);
   }
+  if (tag_filter.has_content()) {
+      ::encode_xml("S3Tags", tag_filter, f);
+  }
 }
 
 bool rgw_s3_filter::has_content() const {
     return key_filter.has_content()  ||
-           metadata_filter.has_content();
+           metadata_filter.has_content() ||
+           tag_filter.has_content();
 }
 
 bool match(const rgw_s3_key_filter& filter, const std::string& key) {
@@ -161,10 +166,10 @@ bool match(const rgw_s3_key_filter& filter, const std::string& key) {
   return true;
 }
 
-bool match(const rgw_s3_metadata_filter& filter, const Metadata& metadata) {
-  // all filter pairs must exist with the same value in the object's metadata
-  // object metadata may include items not in the filter
-  return std::includes(metadata.begin(), metadata.end(), filter.metadata.begin(), filter.metadata.end());
+bool match(const rgw_s3_key_value_filter& filter, const KeyValueList& kvl) {
+  // all filter pairs must exist with the same value in the object's metadata/tags
+  // object metadata/tags may include items not in the filter
+  return std::includes(kvl.begin(), kvl.end(), filter.kvl.begin(), filter.kvl.end());
 }
 
 bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event) {
@@ -273,6 +278,7 @@ void rgw_pubsub_s3_record::dump(Formatter *f) const {
         encode_json("versionId", object_versionId, f);
         encode_json("sequencer", object_sequencer, f);
         encode_json("metadata", x_meta_map, f);
+        encode_json("tags", tags, f);
     }
   }
   encode_json("eventId", id, f);
index a6f97ea6ff1102a9efe65c851e87434942afa8f1..0cc7b05b07dc667e57baf092dbc3b6b8b7765945 100644 (file)
@@ -10,6 +10,7 @@
 #include "rgw_zone.h"
 #include "rgw_rados.h"
 #include "rgw_notify_event_type.h"
+#include <boost/container/flat_map.hpp>
 
 class XMLObj;
 
@@ -41,10 +42,10 @@ struct rgw_s3_key_filter {
 };
 WRITE_CLASS_ENCODER(rgw_s3_key_filter)
 
-using Metadata = std::map<std::string, std::string>;
+using KeyValueList = boost::container::flat_map<std::string, std::string>;
 
-struct rgw_s3_metadata_filter {
-  Metadata metadata;
+struct rgw_s3_key_value_filter {
+  KeyValueList kvl;
   
   bool has_content() const;
   
@@ -53,20 +54,21 @@ struct rgw_s3_metadata_filter {
   
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
-    encode(metadata, bl);
+    encode(kvl, bl);
     ENCODE_FINISH(bl);
   }
   void decode(bufferlist::const_iterator& bl) {
     DECODE_START(1, bl);
-    decode(metadata, bl);
+    decode(kvl, bl);
     DECODE_FINISH(bl);
   }
 };
-WRITE_CLASS_ENCODER(rgw_s3_metadata_filter)
+WRITE_CLASS_ENCODER(rgw_s3_key_value_filter)
 
 struct rgw_s3_filter {
   rgw_s3_key_filter key_filter;
-  rgw_s3_metadata_filter metadata_filter;
+  rgw_s3_key_value_filter metadata_filter;
+  rgw_s3_key_value_filter tag_filter;
 
   bool has_content() const;
   
@@ -74,16 +76,20 @@ struct rgw_s3_filter {
   void dump_xml(Formatter *f) const;
   
   void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 1, bl);
     encode(key_filter, bl);
     encode(metadata_filter, bl);
+    encode(tag_filter, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
+    DECODE_START(2, bl);
     decode(key_filter, bl);
     decode(metadata_filter, bl);
+    if (struct_v >= 2) {
+        decode(tag_filter, bl);
+    }
     DECODE_FINISH(bl);
   }
 };
@@ -109,6 +115,12 @@ class rgw_pubsub_topic_filter;
           <Value></Value>
         </FilterRule>
       </S3Metadata>
+      <S3Tags>
+        <FilterRule>
+          <Name></Name>
+          <Value></Value>
+        </FilterRule>
+      </S3Tags>
     </Filter>
     <Id>notification1</Id>
     <Topic>arn:aws:sns:<region>:<account>:<topic></Topic>
@@ -132,13 +144,13 @@ struct rgw_pubsub_s3_notification {
 
   rgw_pubsub_s3_notification() = default;
   // construct from rgw_pubsub_topic_filter (used by get/list notifications)
-  rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter);
+  explicit rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter);
 };
 
 // return true if the key matches the prefix/suffix/regex rules of the key filter
 bool match(const rgw_s3_key_filter& filter, const std::string& key);
-// return true if the key matches the metadata rules of the metadata filter
-bool match(const rgw_s3_metadata_filter& filter, const Metadata& metadata);
+// return true if the key matches the metadata/tags rules of the metadata/tags filter
+bool match(const rgw_s3_key_value_filter& filter, const KeyValueList& kvl);
 // return true if the event type matches (equal or contained in) one of the events in the list
 bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event);
 
@@ -186,6 +198,7 @@ struct rgw_pubsub_s3_notifications {
         "versionId":"",
         "sequencer": "",
         "metadata": ""
+        "tags": ""
       }
     },
     "eventId":"",
@@ -238,10 +251,12 @@ struct rgw_pubsub_s3_record {
   // this is an rgw extension holding the internal bucket id
   std::string bucket_id;
   // meta data
-  std::map<std::string, std::string> x_meta_map;
+  KeyValueList x_meta_map;
+  // tags
+  KeyValueList tags;
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(2, 1, bl);
+    ENCODE_START(3, 1, bl);
     encode(eventVersion, bl);
     encode(eventSource, bl);
     encode(awsRegion, bl);
@@ -264,11 +279,12 @@ struct rgw_pubsub_s3_record {
     encode(id, bl);
     encode(bucket_id, bl);
     encode(x_meta_map, bl);
+    encode(tags, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(2, bl);
+    DECODE_START(3, bl);
     decode(eventVersion, bl);
     decode(eventSource, bl);
     decode(awsRegion, bl);
@@ -293,6 +309,9 @@ struct rgw_pubsub_s3_record {
         decode(bucket_id, bl);
         decode(x_meta_map, bl);
     }
+    if (struct_v >= 3) {
+        decode(tags, bl);
+    }
     DECODE_FINISH(bl);
   }
 
@@ -604,7 +623,7 @@ public:
     // read the list of topics associated with a bucket and populate into result
     // return 0 on success or if no topic was associated with the bucket, error code otherwise
     int get_topics(rgw_pubsub_bucket_topics *result);
-    // adds a topic + filter (event list, and possibly name and metadata filters) to a bucket
+    // adds a topic + filter (event list, and possibly name metadata or tags filters) to a bucket
     // assigning a notification name is optional (needed for S3 compatible notifications)
     // if the topic already exist on the bucket, the filter event list may be updated
     // for S3 compliant notifications the version with: s3_filter and notif_name should be used
index de34de6837b3e08ea3cfcb33a2088e1cbe5abb04..eab3f7a5321024c92f67348e79a4fea28aebb3b3 100644 (file)
@@ -137,7 +137,7 @@ int RGWRESTSimpleRequest::execute(RGWAccessKey& key, const char *_method, const
   headers.push_back(pair<string, string>("HTTP_DATE", date_str));
 
   string canonical_header;
-  map<string, string> meta_map;
+  meta_map_t meta_map;
   map<string, string> sub_resources;
 
   rgw_create_s3_canonical_header(method.c_str(), NULL, NULL, date_str.c_str(),
@@ -292,7 +292,7 @@ int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, siz
     headers.emplace_back(kv);
   }
 
-  map<string, string>& meta_map = new_info.x_meta_map;
+  meta_map_t& meta_map = new_info.x_meta_map;
   for (const auto& kv: meta_map) {
     headers.emplace_back(kv);
   }
@@ -426,7 +426,7 @@ static void grants_by_type_add_perm(map<int, string>& grants_by_type, int perm,
   }
 }
 
-static void add_grants_headers(map<int, string>& grants, RGWEnv& env, map<string, string>& meta_map)
+static void add_grants_headers(map<int, string>& grants, RGWEnv& env, meta_map_t& meta_map)
 {
   struct grant_type_to_header *t;
 
index bedd189bc630804a47a49d04b659d0d49aa37ade..5133c785275c964816888412da14d4ec7f27cd54 100644 (file)
@@ -20,10 +20,19 @@ from .tests import get_realm, \
     gen_bucket_name, \
     get_user, \
     get_tenant
-from .zone_ps import PSTopic, PSTopicS3, PSNotification, PSSubscription, PSNotificationS3, print_connection_info, delete_all_s3_topics
+from .zone_ps import PSTopic, \
+    PSTopicS3, \
+    PSNotification, \
+    PSSubscription, \
+    PSNotificationS3, \
+    print_connection_info, \
+    delete_all_s3_topics, \
+    put_object_tagging, \
+    get_object_tagging
 from multisite import User
 from nose import SkipTest
 from nose.tools import assert_not_equal, assert_equal
+import boto.s3.tagging
 
 # configure logging for the tests module
 log = logging.getLogger(__name__)
@@ -2399,30 +2408,152 @@ def test_ps_s3_metadata_on_master():
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
+    meta_key = 'meta1'
+    meta_value = 'This is my metadata value'
+    meta_prefix = 'x-amz-meta-'
     topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
-                        'Events': ['s3:ObjectCreated:*']
-                       }]
+        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
+        'Filter': {
+            'Metadata': {
+                'FilterRules': [{'Name': meta_prefix+meta_key, 'Value': meta_value}]
+            }
+        }
+    }]
 
     s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
     # create objects in the bucket
-    key = bucket.new_key('foo')
-    key.set_metadata('meta1', 'This is my metadata value')
+    key_name = 'foo'
+    key = bucket.new_key(key_name)
+    key.set_metadata(meta_key, meta_value)
     key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
-    keys = list(bucket.list())
+    
+    # create objects in the bucket using COPY
+    bucket.copy_key('copy_of_foo', bucket.name, key.name)
+    # create objects in the bucket using multi-part upload
+    fp = tempfile.TemporaryFile(mode='w')
+    fp.write('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
+    fp.close()
+    uploader = bucket.initiate_multipart_upload('multipart_foo', 
+            metadata={meta_key: meta_value})
+    fp = tempfile.TemporaryFile(mode='r')
+    uploader.upload_part_from_file(fp, 1)
+    uploader.complete_upload()
+    fp.close()
     print('wait for 5sec for the messages...')
     time.sleep(5)
     # check amqp receiver
-    receiver.verify_s3_events(keys, exact_match=True)
+    event_count = 0
+    for event in receiver.get_and_reset_events():
+        assert_equal(event['s3']['object']['metadata'][0]['key'], meta_prefix+meta_key)
+        assert_equal(event['s3']['object']['metadata'][0]['val'], meta_value)
+        event_count +=1
+
+    # only PUT and POST has the metadata value
+    assert_equal(event_count, 2)
+
+    # delete objects
+    for key in bucket.list():
+        key.delete()
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+    # check amqp receiver
+    event_count = 0
+    for event in receiver.get_and_reset_events():
+        assert_equal(event['s3']['object']['metadata'][0]['key'], meta_prefix+meta_key)
+        assert_equal(event['s3']['object']['metadata'][0]['val'], meta_value)
+        event_count +=1
+
+    # all 3 object has metadata when deleted
+    assert_equal(event_count, 3)
 
     # cleanup
     stop_amqp_receiver(receiver, task)
     s3_notification_conf.del_config()
     topic_conf.del_config()
+    # delete the bucket
+    zones[0].delete_bucket(bucket_name)
+    clean_rabbitmq(proc)
+
+
+def test_ps_s3_tags_on_master():
+    """ test s3 notification of tags on master """
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    hostname = get_ip()
+    proc = init_rabbitmq()
+    if proc is  None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
+    zones, _  = init_env(require_ps=False)
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = zones[0].create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # start amqp receiver
+    exchange = 'ex1'
+    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+    task.start()
+
+    # create s3 topic
+    endpoint_address = 'amqp://' + hostname
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
+        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
+        'Filter': {
+            'Tags': {
+                'FilterRules': [{'Name': 'hello', 'Value': 'world'}]
+            }
+        }
+    }]
+
+    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket with tags
+    tags = 'hello=world&ka=boom'
+    key_name1 = 'key1'
+    put_object_tagging(zones[0].conn, bucket_name, key_name1, tags)
+    tags = 'foo=bar&ka=boom'
+    key_name2 = 'key2'
+    put_object_tagging(zones[0].conn, bucket_name, key_name2, tags)
+    key_name3 = 'key3'
+    key = bucket.new_key(key_name3)
+    key.set_contents_from_string('bar')
+    # create objects in the bucket using COPY
+    bucket.copy_key('copy_of_'+key_name1, bucket.name, key_name1)
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+    expected_tags = [{'val': 'world', 'key': 'hello'}, {'val': 'boom', 'key': 'ka'}]
+    # check amqp receiver
+    for event in receiver.get_and_reset_events():
+        obj_tags =  event['s3']['object']['tags']
+        assert_equal(obj_tags[0], expected_tags[0])
+
+    # delete the objects
     for key in bucket.list():
         key.delete()
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+    # check amqp receiver
+    for event in receiver.get_and_reset_events():
+        obj_tags =  event['s3']['object']['tags']
+        assert_equal(obj_tags[0], expected_tags[0])
+
+    # cleanup
+    stop_amqp_receiver(receiver, task)
+    s3_notification_conf.del_config()
+    topic_conf.del_config()
     # delete the bucket
     zones[0].delete_bucket(bucket_name)
     clean_rabbitmq(proc)
index cfdf480ee0b5d1d5f52bfb3f2a9b3b1663522495..260260628b4bf066a0d3113ff86074970b818fe6 100644 (file)
@@ -13,6 +13,26 @@ from botocore.client import Config
 
 log = logging.getLogger('rgw_multi.tests')
 
+def put_object_tagging(conn, bucket_name, key, tags):
+    client = boto3.client('s3',
+            endpoint_url='http://'+conn.host+':'+str(conn.port),
+            aws_access_key_id=conn.aws_access_key_id,
+            aws_secret_access_key=conn.aws_secret_access_key,
+            config=Config(signature_version='s3'))
+    return client.put_object(Body='aaaaaaaaaaa', Bucket=bucket_name, Key=key, Tagging=tags)
+
+
+def get_object_tagging(conn, bucket, object_key):
+    client = boto3.client('s3',
+            endpoint_url='http://'+conn.host+':'+str(conn.port),
+            aws_access_key_id=conn.aws_access_key_id,
+            aws_secret_access_key=conn.aws_secret_access_key,
+            config=Config(signature_version='s3'))
+    return client.get_object_tagging(
+                Bucket=bucket, 
+                Key=object_key
+            )
+
 
 class PSZone(Zone):  # pylint: disable=too-many-ancestors
     """ PubSub zone class """