return std::includes(kv.begin(), kv.end(), filter.kv.begin(), filter.kv.end());
}
+bool match(const rgw_s3_key_value_filter& filter, const KeyMultiValueMap& kv) {
+ // all filter pairs must exist with the same value in the object's metadata/tags
+ // object metadata/tags may include items not in the filter
+ for (auto& filter : filter.kv) {
+ auto result = kv.equal_range(filter.first);
+ if (std::any_of(result.first, result.second, [&filter](const pair<string,string>& p) { return p.second == filter.second;}))
+ continue;
+ else
+ return false;
+ }
+ return true;
+}
+
bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event) {
// if event list exists, and none of the events in the list matches the event type, filter the message
if (!events.empty() && std::find(events.begin(), events.end(), event) == events.end()) {
WRITE_CLASS_ENCODER(rgw_s3_key_filter)
using KeyValueMap = boost::container::flat_map<std::string, std::string>;
+using KeyMultiValueMap = std::multimap<std::string, std::string>;
struct rgw_s3_key_value_filter {
KeyValueMap kv;
// return true if the key matches the prefix/suffix/regex rules of the key filter
bool match(const rgw_s3_key_filter& filter, const std::string& key);
-// return true if the key matches the metadata/tags rules of the metadata/tags filter
+
+// return true if the key matches the metadata rules of the metadata filter
bool match(const rgw_s3_key_value_filter& filter, const KeyValueMap& kv);
+
+// return true if the key matches the tag rules of the tag filter
+bool match(const rgw_s3_key_value_filter& filter, const KeyMultiValueMap& kv);
+
// return true if the event type matches (equal or contained in) one of the events in the list
bool match(const rgw::notify::EventTypeList& events, rgw::notify::EventType event);
// meta data
KeyValueMap x_meta_map;
// tags
- KeyValueMap tags;
+ KeyMultiValueMap tags;
// opaque data received from the topic
// could be used to identify the gateway
std::string opaque_data;
using namespace std;
-bool RGWObjTags::add_tag(const string&key, const string& val){
- return tag_map.emplace(std::make_pair(key,val)).second;
+void RGWObjTags::add_tag(const string& key, const string& val){
+ tag_map.emplace(std::make_pair(key,val));
}
-bool RGWObjTags::emplace_tag(std::string&& key, std::string&& val){
- return tag_map.emplace(std::move(key), std::move(val)).second;
+void RGWObjTags::emplace_tag(std::string&& key, std::string&& val){
+ tag_map.emplace(std::move(key), std::move(val));
}
int RGWObjTags::check_and_add_tag(const string&key, const string& val){
return -ERR_INVALID_TAG;
}
- // if we get a conflicting key, either the XML is malformed or the user
- // supplied an invalid string
- if (!add_tag(key,val))
- return -EINVAL;
+ add_tag(key,val);
return 0;
}
#include <string>
#include <include/types.h>
-#include <boost/container/flat_map.hpp>
+#include <map>
class RGWObjTags
{
public:
- using tag_map_t = boost::container::flat_map <std::string, std::string>;
+ using tag_map_t = std::multimap <std::string, std::string>;
protected:
tag_map_t tag_map;
}
void dump(Formatter *f) const;
- bool add_tag(const std::string& key, const std::string& val="");
- bool emplace_tag(std::string&& key, std::string&& val);
+ void add_tag(const std::string& key, const std::string& val="");
+ void emplace_tag(std::string&& key, std::string&& val);
int check_and_add_tag(const std::string& key, const std::string& val="");
size_t count() const {return tag_map.size();}
int set_from_string(const std::string& input);
'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
'Filter': {
'Tags': {
- 'FilterRules': [{'Name': 'hello', 'Value': 'world'}]
+ 'FilterRules': [{'Name': 'hello', 'Value': 'world'}, {'Name': 'ka', 'Value': 'boom'}]
}
}
}]
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
+ expected_keys = []
# create objects in the bucket with tags
- tags = 'hello=world&ka=boom'
+ # key 1 has all the tags in the filter
+ tags = 'hello=world&ka=boom&hello=helloworld'
key_name1 = 'key1'
put_object_tagging(conn, bucket_name, key_name1, tags)
- tags = 'foo=bar&ka=boom'
- key_name2 = 'key2'
- put_object_tagging(conn, bucket_name, key_name2, tags)
+ expected_keys.append(key_name1)
+ # key 2 has an additional tag not in the filter
+ tags = 'hello=world&foo=bar&ka=boom&hello=helloworld'
+ key_name = 'key2'
+ put_object_tagging(conn, bucket_name, key_name, tags)
+ expected_keys.append(key_name)
+ # key 3 has no tags
key_name3 = 'key3'
key = bucket.new_key(key_name3)
key.set_contents_from_string('bar')
+ # key 4 has the wrong of the multi value tags
+ tags = 'hello=helloworld&ka=boom'
+ key_name = 'key4'
+ put_object_tagging(conn, bucket_name, key_name, tags)
+ # key 5 has the right of the multi value tags
+ tags = 'hello=world&ka=boom'
+ key_name = 'key5'
+ put_object_tagging(conn, bucket_name, key_name, tags)
+ expected_keys.append(key_name)
+ # key 6 is missing a tag
+ tags = 'hello=world'
+ key_name = 'key6'
+ put_object_tagging(conn, bucket_name, key_name, tags)
# create objects in the bucket using COPY
- bucket.copy_key('copy_of_'+key_name1, bucket.name, key_name1)
+ key_name = 'copy_of_'+key_name1
+ bucket.copy_key(key_name, bucket.name, key_name1)
+ expected_keys.append(key_name)
+
print('wait for 5sec for the messages...')
time.sleep(5)
- expected_tags = [{'val': 'world', 'key': 'hello'}, {'val': 'boom', 'key': 'ka'}]
- # check amqp receiver
+ event_count = 0
+ expected_tags1 = [{'key': 'hello', 'val': 'world'}, {'key': 'hello', 'val': 'helloworld'}, {'key': 'ka', 'val': 'boom'}]
+ expected_tags1 = sorted(expected_tags1, key=lambda k: k['key']+k['val'])
for event in receiver.get_and_reset_events():
- obj_tags = event['Records'][0]['s3']['object']['tags']
- assert_equal(obj_tags[0], expected_tags[0])
+ key = event['Records'][0]['s3']['object']['key']
+ if (key == key_name1):
+ obj_tags = sorted(event['Records'][0]['s3']['object']['tags'], key=lambda k: k['key']+k['val'])
+ assert_equal(obj_tags, expected_tags1)
+ event_count += 1
+ assert(key in expected_keys)
+
+ assert_equal(event_count, len(expected_keys))
# delete the objects
for key in bucket.list():
key.delete()
print('wait for 5sec for the messages...')
time.sleep(5)
+ event_count = 0
# check amqp receiver
for event in receiver.get_and_reset_events():
- obj_tags = event['Records'][0]['s3']['object']['tags']
- assert_equal(obj_tags[0], expected_tags[0])
+ key = event['Records'][0]['s3']['object']['key']
+ if (key == key_name1):
+ obj_tags = sorted(event['Records'][0]['s3']['object']['tags'], key=lambda k: k['key']+k['val'])
+ assert_equal(obj_tags, expected_tags1)
+ event_count += 1
+ assert(key in expected_keys)
+
+ assert(event_count == len(expected_keys))
# cleanup
stop_amqp_receiver(receiver, task)
# delete the bucket
conn.delete_bucket(bucket_name)
-
@attr('amqp_test')
def test_ps_s3_versioning_on_master():
""" test s3 notification of object versions """