To send notifications for events on a specific bucket, a notification entity needs to be created. A
notification can be created for a subset of event types, or for all event types (the default).
-The notification may also filter out events based on preffix/suffix and/or regular expression matching of the keys. As well as,
-on the metadata attributes attached to the object.
+The notification may also filter out events based on prefix/suffix and/or regular expression matching of the keys, as well as
+on the metadata attributes attached to the object, or the object tags.
There can be multiple notifications for any specific topic, and the same topic could be used for multiple notifications.
A REST API has been defined to provide configuration and control interfaces for the bucket notification
"eTag":"",
"versionId":"",
"sequencer": "",
- "metadata":[]
+ "metadata":[],
+ "tags":[]
}
},
"eventId":"",
- s3.object.version: object version in case of versioned bucket
- s3.object.sequencer: monotonically increasing identifier of the change per object (hexadecimal format)
- s3.object.metadata: any metadata set on the object sent as: ``x-amz-meta-`` (an extension to the S3 notification API)
+- s3.object.tags: any tags set on the object (an extension to the S3 notification API)
- s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API)
.. _PubSub Module : ../pubsub-module
the associated subscription will not be deleted automatically (any events of the deleted bucket could still be accessed),
and will have to be deleted explicitly with the subscription deletion API
- Filtering based on metadata (which is an extension to S3) is not supported, and such rules will be ignored
+ - Filtering based on tags (which is an extension to S3) is not supported, and such rules will be ignored
Non S3-Compliant Notifications
"eTag":"",
"versionId":"",
"sequencer":"",
- "metadata":[]
+ "metadata":[],
+ "tags":[]
}
},
"eventId":"",
- s3.object.version: object version in case of versioned bucket
- s3.object.sequencer: monotonically increasing identifier of the change per object (hexadecimal format)
- s3.object.metadata: not supported (an extension to the S3 notification API)
+- s3.object.tags: not supported (an extension to the S3 notification API)
- s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API)
If the subscription was not created via a non S3-compatible notification,
- Filtering based on metadata attributes attached to the object
+ - Filtering based on object tags
+
- Overlapping filters are allowed, so that the same event could be sent as part of different notifications (see the example following the configuration schema below)
<Name></Name>
<Value></Value>
</FilterRule>
- </s3Metadata>
+ </S3Metadata>
+ <S3Tags>
+ <FilterRule>
+ <Name></Name>
+ <Value></Value>
+ </FilterRule>
+ </S3Tags>
</Filter>
</TopicConfiguration>
</NotificationConfiguration>
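
For example, a notification configured to fire only for objects whose keys start with ``images/`` and that carry the tag ``color=blue`` could look like this (topic and notification names are illustrative)::

   <NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
       <TopicConfiguration>
           <Id>notif1</Id>
           <Topic>arn:aws:sns:<region>:<account>:<topic></Topic>
           <Event>s3:ObjectCreated:*</Event>
           <Filter>
               <S3Key>
                   <FilterRule>
                       <Name>prefix</Name>
                       <Value>images/</Value>
                   </FilterRule>
               </S3Key>
               <S3Tags>
                   <FilterRule>
                       <Name>color</Name>
                       <Value>blue</Value>
                   </FilterRule>
               </S3Tags>
           </Filter>
       </TopicConfiguration>
   </NotificationConfiguration>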
| ``Event``                     | String    | List of supported events, see: `S3 Notification Compatibility`_. Multiple ``Event``  | No       |
| | | entities can be used. If omitted, all events are handled | |
+-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
-| ``Filter`` | Container | Holding ``S3Key`` and ``S3Metadata`` entities | No |
+| ``Filter`` | Container | Holding ``S3Key``, ``S3Metadata`` and ``S3Tags`` entities | No |
+-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
| ``S3Key`` | Container | Holding a list of ``FilterRule`` entities, for filtering based on object key. | No |
|                               |           | At most 3 entities may be in the list, with ``Name`` being ``prefix``, ``suffix`` or |          |
| | | ``regex``. All filter rules in the list must match for the filter to match. | |
+-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
| ``S3Metadata`` | Container | Holding a list of ``FilterRule`` entities, for filtering based on object metadata. | No |
-| | | All filter rules in the list must match the ones defined on the object. The object, | |
-| | | have other metadata entitied not listed in the filter. | |
+| | | All filter rules in the list must match the metadata defined on the object. However, | |
+|                               |           | the object still matches if it has other metadata entries not listed in the filter.  |          |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``S3Tags`` | Container | Holding a list of ``FilterRule`` entities, for filtering based on object tags. | No |
+| | | All filter rules in the list must match the tags defined on the object. However, | |
+|                               |           | the object still matches if it has other tags not listed in the filter.              |          |
+-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
| ``S3Key.FilterRule`` | Container | Holding ``Name`` and ``Value`` entities. ``Name`` would be: ``prefix``, ``suffix`` | Yes |
|                               |           | or ``regex``. The ``Value`` would hold the key prefix, key suffix or a regular       |          |
|                               |           | expression.                                                                          |          |
+-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
| ``S3Metadata.FilterRule`` | Container | Holding ``Name`` and ``Value`` entities. ``Name`` would be the name of the metadata | Yes |
| | | attribute (e.g. ``x-amz-meta-xxx``). The ``Value`` would be the expected value for | |
-| | | this attribute | |
+| | | this attribute. | |
++-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
+| ``S3Tags.FilterRule`` | Container | Holding ``Name`` and ``Value`` entities. ``Name`` would be the tag key, | Yes |
+| | | and ``Value`` would be the tag value. | |
+-------------------------------+-----------+--------------------------------------------------------------------------------------+----------+
<Name></Name>
<Value></Value>
</FilterRule>
- </s3Metadata>
+ </S3Metadata>
+ <S3Tags>
+ <FilterRule>
+ <Name></Name>
+ <Value></Value>
+ </FilterRule>
+ </S3Tags>
</Filter>
</TopicConfiguration>
</NotificationConfiguration>
| ``404`` | NoSuchKey | The notification does not exist (if provided) |
+---------------+-----------------------+----------------------------------------------------------+
-.. _S3 Notification Compatibility: ../s3-notification-compatibility
+.. _S3 Notification Compatibility: ../../s3-notification-compatibility
#define CEPH_JSON_H
#include <include/types.h>
+#include <boost/container/flat_map.hpp>
#ifdef _ASSERT_H
#define NEED_ASSERT_H
}
}
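+// decode a flat_map encoded as a JSON array of {"key": ..., "val": ...} objects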
+template<class K, class V>
+void decode_json_obj(boost::container::flat_map<K, V>& m, JSONObj *obj)
+{
+ m.clear();
+
+ JSONObjIter iter = obj->find_first();
+
+ for (; !iter.end(); ++iter) {
+ K key;
+ V val;
+ JSONObj *o = *iter;
+ JSONDecoder::decode_json("key", key, o);
+ JSONDecoder::decode_json("val", val, o);
+ m[key] = val;
+ }
+}
template<class C>
void decode_json_obj(C& container, void (*cb)(C&, JSONObj *obj), JSONObj *obj)
{
}
f->close_section();
}
+
template<class T>
static void encode_json(const char *name, const std::deque<T>& l, ceph::Formatter *f)
{
}
f->close_section();
}
+
template<class T, class Compare = std::less<T> >
static void encode_json(const char *name, const std::set<T, Compare>& l, ceph::Formatter *f)
{
f->close_section();
}
-template<class K, class V, class C = std::less<K> >
+template<class K, class V, class C = std::less<K>>
static void encode_json(const char *name, const std::map<K, V, C>& m, ceph::Formatter *f)
{
f->open_array_section(name);
}
f->close_section();
}
+
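+// encode a flat_map as a JSON array of {"key": ..., "val": ...} objects (the format decode_json_obj above consumes)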
+template<class K, class V>
+static void encode_json(const char *name, const boost::container::flat_map<K, V>& m, ceph::Formatter *f)
+{
+ f->open_array_section(name);
+ for (auto i = m.begin(); i != m.end(); ++i) {
+ f->open_object_section("entry");
+ encode_json("key", i->first, f);
+ encode_json("val", i->second, f);
+ f->close_section();
+ }
+ f->close_section();
+}
+
template<class K, class V>
void encode_json_map(const char *name, const map<K, V>& m, ceph::Formatter *f)
{
*/
static std::string
-get_canon_amz_hdr(const std::map<std::string, std::string>& meta_map)
+get_canon_amz_hdr(const meta_map_t& meta_map)
{
std::string dest;
const char* const content_md5,
const char* const content_type,
const char* const date,
- const std::map<std::string, std::string>& meta_map,
- const std::map<std::string, std::string>& qs_map,
+ const meta_map_t& meta_map,
+ const meta_map_t& qs_map,
const char* const request_uri,
const std::map<std::string, std::string>& sub_resources,
std::string& dest_str)
}
static inline void get_v2_qs_map(const req_info& info,
- std::map<std::string, std::string>& qs_map) {
+ meta_map_t& qs_map) {
const auto& params = const_cast<RGWHTTPArgs&>(info.args).get_params();
for (const auto& elt : params) {
std::string k = boost::algorithm::to_lower_copy(elt.first);
const char *content_type = info.env->get("CONTENT_TYPE");
std::string date;
- std::map<std::string, std::string> qs_map;
+ meta_map_t qs_map;
if (qsr) {
get_v2_qs_map(info, qs_map); // handle qs metadata
const char *content_md5,
const char *content_type,
const char *date,
- const std::map<std::string, std::string>& meta_map,
- const std::map<std::string, std::string>& qs_map,
+ const meta_map_t& meta_map,
+ const meta_map_t& qs_map,
const char *request_uri,
const std::map<std::string, std::string>& sub_resources,
std::string& dest_str);
}
}
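+// flat_map keeps its entries sorted by key; both the canonical-header signing and the
+// notification filter matching (std::includes) rely on sorted iteration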
+using meta_map_t = boost::container::flat_map<std::string, std::string>;
struct req_info {
const RGWEnv *env;
RGWHTTPArgs args;
- map<string, string> x_meta_map;
+ meta_map_t x_meta_map;
string host;
const char *method;
/* implement combining step, S3 header canonicalization; k is a
* valid header and in lc form */
static inline void add_amz_meta_header(
- std::map<std::string, std::string>& x_meta_map,
+ meta_map_t& x_meta_map,
const std::string& k,
const std::string& v)
{
int RGWLoadGenRequestEnv::sign(RGWAccessKey& access_key)
{
- map<string, string> meta_map;
+ meta_map_t meta_map;
map<string, string> sub_resources;
string canonical_header;
content_type.c_str(),
date_str.c_str(),
meta_map,
- map<string, string>{},
+ meta_map_t{},
uri.c_str(),
sub_resources,
canonical_header);
record.bucket_id = s->bucket.bucket_id;
// pass meta data
record.x_meta_map = s->info.x_meta_map;
+ // pass tags
+ record.tags = s->tagset.get_tags();
}
bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType event) {
if (!::match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) {
return false;
}
+ if (!::match(filter.s3_filter.tag_filter, s->tagset.get_tags())) {
+ return false;
+ }
return true;
}
return 0;
}
+// cache the object's tags in the request
+// use inside try/catch, as "decode()" may throw
+void populate_tags_in_request(req_state* s, const std::map<std::string, bufferlist>& attrs) {
+ const auto attr_iter = attrs.find(RGW_ATTR_TAGS);
+ if (attr_iter != attrs.end()) {
+ auto bliter = attr_iter->second.cbegin();
+ decode(s->tagset, bliter);
+ }
+}
+
+// cache the object's metadata in the request
+void populate_metadata_in_request(req_state* s, std::map<std::string, bufferlist>& attrs) {
+ for (auto& attr : attrs) {
+ if (boost::algorithm::starts_with(attr.first, RGW_ATTR_META_PREFIX)) {
+ std::string_view key(attr.first);
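+      // strip only RGW_ATTR_PREFIX ("user.rgw."), so the key keeps its "x-amz-meta-" prefix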
+ key.remove_prefix(sizeof(RGW_ATTR_PREFIX)-1);
+ s->info.x_meta_map.emplace(key, attr.second.c_str());
+ }
+ }
+}
int RGWOp::verify_op_mask()
{
bool check_obj_lock = obj.key.have_instance() && s->bucket_info.obj_lock_enabled();
if (!s->object.empty()) {
+ /* check if obj exists, read orig attrs */
+ op_ret = get_obj_attrs(store, s, obj, attrs);
+
if (need_object_expiration() || multipart_delete) {
- /* check if obj exists, read orig attrs */
- op_ret = get_obj_attrs(store, s, obj, attrs);
if (op_ret < 0) {
+ // failed to get attributes
return;
}
}
if (check_obj_lock) {
- /* check if obj exists, read orig attrs */
- op_ret = get_obj_attrs(store, s, obj, attrs);
if (op_ret < 0) {
if (op_ret == -ENOENT) {
/* object maybe delete_marker, skip check_obj_lock*/
check_obj_lock = false;
} else {
+ // failed to get attributes and check_obj_lock is needed
return;
}
}
if (op_ret == -ERR_PRECONDITION_FAILED && no_precondition_error) {
op_ret = 0;
}
+
+  // cache the object's tags and metadata in the request
+  // so they can be used by the notification mechanism
+ try {
+ populate_tags_in_request(s, attrs);
+ } catch (buffer::error& err) {
+ ldpp_dout(this, 5) << "WARNING: failed to populate delete request with object tags: " << err.what() << dendl;
+ }
+ populate_metadata_in_request(s, attrs);
} else {
op_ret = -EINVAL;
}
return !(prefix_rule.empty() && suffix_rule.empty() && regex_rule.empty());
}
-bool rgw_s3_metadata_filter::decode_xml(XMLObj* obj) {
- metadata.clear();
+bool rgw_s3_key_value_filter::decode_xml(XMLObj* obj) {
+ kvl.clear();
XMLObjIter iter = obj->find("FilterRule");
XMLObj *o;
while ((o = iter.get_next())) {
RGWXMLDecoder::decode_xml("Name", key, o, throw_if_missing);
RGWXMLDecoder::decode_xml("Value", value, o, throw_if_missing);
- metadata.emplace(key, value);
+ kvl.emplace(key, value);
}
return true;
}
-void rgw_s3_metadata_filter::dump_xml(Formatter *f) const {
- for (const auto& key_value : metadata) {
+void rgw_s3_key_value_filter::dump_xml(Formatter *f) const {
+ for (const auto& key_value : kvl) {
f->open_object_section("FilterRule");
::encode_xml("Name", key_value.first, f);
::encode_xml("Value", key_value.second, f);
}
}
-bool rgw_s3_metadata_filter::has_content() const {
- return !metadata.empty();
+bool rgw_s3_key_value_filter::has_content() const {
+ return !kvl.empty();
}
bool rgw_s3_filter::decode_xml(XMLObj* obj) {
RGWXMLDecoder::decode_xml("S3Key", key_filter, obj);
RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter, obj);
+ RGWXMLDecoder::decode_xml("S3Tags", tag_filter, obj);
return true;
}
if (metadata_filter.has_content()) {
::encode_xml("S3Metadata", metadata_filter, f);
}
+ if (tag_filter.has_content()) {
+ ::encode_xml("S3Tags", tag_filter, f);
+ }
}
bool rgw_s3_filter::has_content() const {
return key_filter.has_content() ||
- metadata_filter.has_content();
+ metadata_filter.has_content() ||
+ tag_filter.has_content();
}
bool match(const rgw_s3_key_filter& filter, const std::string& key) {
return true;
}
-bool match(const rgw_s3_metadata_filter& filter, const Metadata& metadata) {
- // all filter pairs must exist with the same value in the object's metadata
- // object metadata may include items not in the filter
- return std::includes(metadata.begin(), metadata.end(), filter.metadata.begin(), filter.metadata.end());
+bool match(const rgw_s3_key_value_filter& filter, const KeyValueList& kvl) {
+ // all filter pairs must exist with the same value in the object's metadata/tags
+ // object metadata/tags may include items not in the filter
+ return std::includes(kvl.begin(), kvl.end(), filter.kvl.begin(), filter.kvl.end());
}
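
A note on the subset check above: ``std::includes`` requires both of its input ranges to be sorted, which ``boost::container::flat_map`` guarantees (entries are kept ordered by key, and since keys are unique the sequence is also ordered as pairs). A minimal standalone sketch of the semantics, not part of the patch::

   #include <algorithm>
   #include <cassert>
   #include <string>
   #include <utility>
   #include <boost/container/flat_map.hpp>

   using KeyValueList = boost::container::flat_map<std::string, std::string>;

   int main() {
     // every filter pair must appear, with an equal value, among the object's pairs
     const KeyValueList filter{{"color", "blue"}};
     const KeyValueList tags{{"color", "blue"}, {"owner", "alice"}};   // extra pairs are allowed
     const KeyValueList other{{"color", "red"}, {"owner", "alice"}};   // same key, different value

     assert(std::includes(tags.begin(), tags.end(), filter.begin(), filter.end()));
     assert(!std::includes(other.begin(), other.end(), filter.begin(), filter.end()));
     return 0;
   }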
bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event) {
encode_json("versionId", object_versionId, f);
encode_json("sequencer", object_sequencer, f);
encode_json("metadata", x_meta_map, f);
+ encode_json("tags", tags, f);
}
}
encode_json("eventId", id, f);
#include "rgw_rados.h"
#include "rgw_notify_event_type.h"
#include "services/svc_sys_obj.h"
+#include <boost/container/flat_map.hpp>
class XMLObj;
};
WRITE_CLASS_ENCODER(rgw_s3_key_filter)
-using Metadata = std::map<std::string, std::string>;
+using KeyValueList = boost::container::flat_map<std::string, std::string>;
-struct rgw_s3_metadata_filter {
- Metadata metadata;
+struct rgw_s3_key_value_filter {
+ KeyValueList kvl;
bool has_content() const;
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
- encode(metadata, bl);
+ encode(kvl, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
DECODE_START(1, bl);
- decode(metadata, bl);
+ decode(kvl, bl);
DECODE_FINISH(bl);
}
};
-WRITE_CLASS_ENCODER(rgw_s3_metadata_filter)
+WRITE_CLASS_ENCODER(rgw_s3_key_value_filter)
struct rgw_s3_filter {
rgw_s3_key_filter key_filter;
- rgw_s3_metadata_filter metadata_filter;
+ rgw_s3_key_value_filter metadata_filter;
+ rgw_s3_key_value_filter tag_filter;
bool has_content() const;
void dump_xml(Formatter *f) const;
void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
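+    // struct version 2 adds tag_filter; compat stays at 1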
encode(key_filter, bl);
encode(metadata_filter, bl);
+ encode(tag_filter, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
decode(key_filter, bl);
decode(metadata_filter, bl);
+ if (struct_v >= 2) {
+ decode(tag_filter, bl);
+ }
DECODE_FINISH(bl);
}
};
<Value></Value>
</FilterRule>
</S3Metadata>
+ <S3Tags>
+ <FilterRule>
+ <Name></Name>
+ <Value></Value>
+ </FilterRule>
+ </S3Tags>
</Filter>
<Id>notification1</Id>
<Topic>arn:aws:sns:<region>:<account>:<topic></Topic>
rgw_pubsub_s3_notification() = default;
// construct from rgw_pubsub_topic_filter (used by get/list notifications)
- rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter);
+ explicit rgw_pubsub_s3_notification(const rgw_pubsub_topic_filter& topic_filter);
};
// return true if the key matches the prefix/suffix/regex rules of the key filter
bool match(const rgw_s3_key_filter& filter, const std::string& key);
-// return true if the key matches the metadata rules of the metadata filter
-bool match(const rgw_s3_metadata_filter& filter, const Metadata& metadata);
+// return true if the object's metadata/tags match the rules of the metadata/tags filter
+bool match(const rgw_s3_key_value_filter& filter, const KeyValueList& kvl);
// return true if the event type matches (equal or contained in) one of the events in the list
bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event);
"versionId":"",
"sequencer": "",
"metadata": ""
+ "tags": ""
}
},
"eventId":"",
// this is an rgw extension holding the internal bucket id
std::string bucket_id;
// meta data
- std::map<std::string, std::string> x_meta_map;
+ KeyValueList x_meta_map;
+ // tags
+ KeyValueList tags;
void encode(bufferlist& bl) const {
- ENCODE_START(2, 1, bl);
+ ENCODE_START(3, 1, bl);
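+    // struct version 3 adds the object tags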
encode(eventVersion, bl);
encode(eventSource, bl);
encode(awsRegion, bl);
encode(id, bl);
encode(bucket_id, bl);
encode(x_meta_map, bl);
+ encode(tags, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
- DECODE_START(2, bl);
+ DECODE_START(3, bl);
decode(eventVersion, bl);
decode(eventSource, bl);
decode(awsRegion, bl);
decode(bucket_id, bl);
decode(x_meta_map, bl);
}
+ if (struct_v >= 3) {
+ decode(tags, bl);
+ }
DECODE_FINISH(bl);
}
// read the list of topics associated with a bucket and populate into result
// return 0 on success or if no topic was associated with the bucket, error code otherwise
int get_topics(rgw_pubsub_bucket_topics *result);
- // adds a topic + filter (event list, and possibly name and metadata filters) to a bucket
+  // adds a topic + filter (event list, and possibly name, metadata, or tag filters) to a bucket
// assigning a notification name is optional (needed for S3 compatible notifications)
// if the topic already exists on the bucket, the filter event list may be updated
// for S3-compliant notifications, the version with s3_filter and notif_name should be used
headers.push_back(pair<string, string>("HTTP_DATE", date_str));
string canonical_header;
- map<string, string> meta_map;
+ meta_map_t meta_map;
map<string, string> sub_resources;
rgw_create_s3_canonical_header(method.c_str(), NULL, NULL, date_str.c_str(),
headers.emplace_back(kv);
}
- map<string, string>& meta_map = new_info.x_meta_map;
+ meta_map_t& meta_map = new_info.x_meta_map;
for (const auto& kv: meta_map) {
headers.emplace_back(kv);
}
}
}
-static void add_grants_headers(map<int, string>& grants, RGWEnv& env, map<string, string>& meta_map)
+static void add_grants_headers(map<int, string>& grants, RGWEnv& env, meta_map_t& meta_map)
{
struct grant_type_to_header *t;
gen_bucket_name, \
get_user, \
get_tenant
-from .zone_ps import PSTopic, PSTopicS3, PSNotification, PSSubscription, PSNotificationS3, print_connection_info, delete_all_s3_topics
+from .zone_ps import PSTopic, \
+ PSTopicS3, \
+ PSNotification, \
+ PSSubscription, \
+ PSNotificationS3, \
+ print_connection_info, \
+ delete_all_s3_topics, \
+ put_object_tagging, \
+ get_object_tagging
from multisite import User
from nose import SkipTest
from nose.tools import assert_not_equal, assert_equal
+import boto.s3.tagging
+import tempfile
# configure logging for the tests module
log = logging.getLogger(__name__)
topic_arn = topic_conf.set_config()
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
+ meta_key = 'meta1'
+ meta_value = 'This is my metadata value'
+ meta_prefix = 'x-amz-meta-'
topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
- 'Events': ['s3:ObjectCreated:*']
- }]
+ 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
+ 'Filter': {
+ 'Metadata': {
+ 'FilterRules': [{'Name': meta_prefix+meta_key, 'Value': meta_value}]
+ }
+ }
+ }]
s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
# create objects in the bucket
- key = bucket.new_key('foo')
- key.set_metadata('meta1', 'This is my metadata value')
+ key_name = 'foo'
+ key = bucket.new_key(key_name)
+ key.set_metadata(meta_key, meta_value)
key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
- keys = list(bucket.list())
- print 'wait for 5sec for the messages...'
+
+ # create objects in the bucket using COPY
+ bucket.copy_key('copy_of_foo', bucket.name, key.name)
+ # create objects in the bucket using multi-part upload
+    fp = tempfile.TemporaryFile(mode='w+')
+    fp.write('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
+    # keep the file open and rewind it, so the written content is actually uploaded
+    fp.seek(0)
+    uploader = bucket.initiate_multipart_upload('multipart_foo',
+                                                metadata={meta_key: meta_value})
+    uploader.upload_part_from_file(fp, 1)
+    uploader.complete_upload()
+    fp.close()
+ print('wait for 5sec for the messages...')
time.sleep(5)
# check amqp receiver
- receiver.verify_s3_events(keys, exact_match=True)
+ event_count = 0
+ for event in receiver.get_and_reset_events():
+ assert_equal(event['Records'][0]['s3']['object']['metadata'][0]['key'], meta_prefix+meta_key)
+ assert_equal(event['Records'][0]['s3']['object']['metadata'][0]['val'], meta_value)
+        event_count += 1
+
+    # only the PUT and the multipart-upload POST carry the metadata value
+ assert_equal(event_count, 2)
+
+ # delete objects
+ for key in bucket.list():
+ key.delete()
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ # check amqp receiver
+ event_count = 0
+ for event in receiver.get_and_reset_events():
+ assert_equal(event['Records'][0]['s3']['object']['metadata'][0]['key'], meta_prefix+meta_key)
+ assert_equal(event['Records'][0]['s3']['object']['metadata'][0]['val'], meta_value)
+        event_count += 1
+
+    # all 3 objects have metadata when deleted
+ assert_equal(event_count, 3)
# cleanup
stop_amqp_receiver(receiver, task)
s3_notification_conf.del_config()
topic_conf.del_config()
+ # delete the bucket
+ zones[0].delete_bucket(bucket_name)
+ clean_rabbitmq(proc)
+
+
+def test_ps_s3_tags_on_master():
+ """ test s3 notification of tags on master """
+ if skip_push_tests:
+ return SkipTest("PubSub push tests don't run in teuthology")
+ hostname = get_ip()
+ proc = init_rabbitmq()
+ if proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
+ zones, _ = init_env(require_ps=False)
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = zones[0].create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # start amqp receiver
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
+
+ # create s3 topic
+ endpoint_address = 'amqp://' + hostname
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+ topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
+ 'Filter': {
+ 'Tags': {
+ 'FilterRules': [{'Name': 'hello', 'Value': 'world'}]
+ }
+ }
+ }]
+
+ s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket with tags
+ tags = 'hello=world&ka=boom'
+ key_name1 = 'key1'
+ put_object_tagging(zones[0].conn, bucket_name, key_name1, tags)
+ tags = 'foo=bar&ka=boom'
+ key_name2 = 'key2'
+ put_object_tagging(zones[0].conn, bucket_name, key_name2, tags)
+ key_name3 = 'key3'
+ key = bucket.new_key(key_name3)
+ key.set_contents_from_string('bar')
+ # create objects in the bucket using COPY
+ bucket.copy_key('copy_of_'+key_name1, bucket.name, key_name1)
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ expected_tags = [{'val': 'world', 'key': 'hello'}, {'val': 'boom', 'key': 'ka'}]
+ # check amqp receiver
+ for event in receiver.get_and_reset_events():
+ obj_tags = event['Records'][0]['s3']['object']['tags']
+ assert_equal(obj_tags[0], expected_tags[0])
+
+ # delete the objects
for key in bucket.list():
key.delete()
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ # check amqp receiver
+ for event in receiver.get_and_reset_events():
+ obj_tags = event['Records'][0]['s3']['object']['tags']
+ assert_equal(obj_tags[0], expected_tags[0])
+
+ # cleanup
+ stop_amqp_receiver(receiver, task)
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
# delete the bucket
zones[0].delete_bucket(bucket_name)
clean_rabbitmq(proc)
log = logging.getLogger('rgw_multi.tests')
+def put_object_tagging(conn, bucket_name, key, tags):
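+    # note: despite its name, this helper creates the object (via put_object) with the tags attached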
+ client = boto3.client('s3',
+ endpoint_url='http://'+conn.host+':'+str(conn.port),
+ aws_access_key_id=conn.aws_access_key_id,
+ aws_secret_access_key=conn.aws_secret_access_key,
+ config=Config(signature_version='s3'))
+ return client.put_object(Body='aaaaaaaaaaa', Bucket=bucket_name, Key=key, Tagging=tags)
+
+
+def get_object_tagging(conn, bucket, object_key):
+ client = boto3.client('s3',
+ endpoint_url='http://'+conn.host+':'+str(conn.port),
+ aws_access_key_id=conn.aws_access_key_id,
+ aws_secret_access_key=conn.aws_secret_access_key,
+ config=Config(signature_version='s3'))
+ return client.get_object_tagging(
+ Bucket=bucket,
+ Key=object_key
+ )
+
class PSZone(Zone): # pylint: disable=too-many-ancestors
""" PubSub zone class """