}
// populate event from request
-void populate_event_from_request(const req_state *s,
+void populate_event_from_request(const reservation_t& res,
rgw::sal::RGWObject* obj,
uint64_t size,
const ceph::real_time& mtime,
const std::string& etag,
EventType event_type,
- rgw_pubsub_s3_event& event) {
+ rgw_pubsub_s3_event& event) {
+ const auto s = res.s;
event.eventTime = mtime;
event.eventName = to_string(event_type);
event.userIdentity = s->user->get_id().id; // user that triggered the change
std::back_inserter(event.object_sequencer));
set_event_id(event.id, etag, ts);
event.bucket_id = s->bucket->get_bucket_id();
- // pass meta data
- if (s->info.x_meta_map.empty()) {
- // try to fetch the metadata from the attributes
+ // pass metadata
+ if (res.cached_metadata.empty()) {
+ // no metadata cached:
+ // either no metadata exist or no metadata filter was used
+ event.x_meta_map = s->info.x_meta_map;
metadata_from_attributes(s, obj, event.x_meta_map);
} else {
- event.x_meta_map = s->info.x_meta_map;
+ event.x_meta_map = std::move(res.cached_metadata);
}
// pass tags
if (s->tagset.get_tags().empty()) {
// opaque data will be filled from topic configuration
}
-bool notification_match(const rgw_pubsub_topic_filter& filter, const req_state* s, rgw::sal::RGWObject* obj,
- EventType event, const RGWObjTags* req_tags) {
- if (!match(filter.events, event)) {
+bool notification_match(reservation_t& res, const rgw_pubsub_topic_filter& filter, EventType event, const RGWObjTags* req_tags) {
+ if (!match(filter.events, event)) {
return false;
}
+ const auto obj = res.object;
if (!match(filter.s3_filter.key_filter, obj->get_name())) {
return false;
}
+ const auto s = res.s;
if (!filter.s3_filter.metadata_filter.kv.empty()) {
// metadata filter exists
- if (!s->info.x_meta_map.empty()) {
- // metadata was cached in req_state
- if (!match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) {
- return false;
- }
- } else {
- // try to fetch the metadata from the attributes
- KeyValueMap metadata;
- metadata_from_attributes(s, obj, metadata);
- if (!match(filter.s3_filter.metadata_filter, metadata)) {
- return false;
- }
+ res.cached_metadata = s->info.x_meta_map;
+ metadata_from_attributes(s, obj, res.cached_metadata);
+ if (!match(filter.s3_filter.metadata_filter, res.cached_metadata)) {
+ return false;
}
}
for (const auto& bucket_topic : bucket_topics.topics) {
const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
const rgw_pubsub_topic& topic_cfg = topic_filter.topic;
- if (!notification_match(topic_filter, res.s, res.object, event_type, req_tags)) {
+ if (!notification_match(res, topic_filter, event_type, req_tags)) {
// notification does not apply to req_state
continue;
}
continue;
}
event_entry_t event_entry;
- populate_event_from_request(res.s, obj, size, mtime, etag, event_type, event_entry.event);
+ populate_event_from_request(res, obj, size, mtime, etag, event_type, event_entry.event);
event_entry.event.configurationId = topic.configurationId;
event_entry.event.opaque_data = topic.cfg.opaque_data;
if (topic.cfg.dest.persistent) {
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
+ expected_keys = []
# create objects in the bucket
key_name = 'foo'
key = bucket.new_key(key_name)
key.set_metadata(meta_key, meta_value)
key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
+ expected_keys.append(key_name)
# create objects in the bucket using COPY
- bucket.copy_key('copy_of_foo', bucket.name, key.name)
+ key_name = 'copy_of_foo'
+ bucket.copy_key(key_name, bucket.name, key.name)
+ expected_keys.append(key_name)
+
+ # create another objects in the bucket using COPY
+ # but override the metadata value
+ key_name = 'another_copy_of_foo'
+ bucket.copy_key(key_name, bucket.name, key.name, metadata={meta_key: 'kaboom'})
# create objects in the bucket using multi-part upload
fp = tempfile.NamedTemporaryFile(mode='w+b')
- object_size = 1024
+ chunk_size = 1024*1024*5 # 5MB
+ object_size = 10*chunk_size
content = bytearray(os.urandom(object_size))
fp.write(content)
fp.flush()
fp.seek(0)
- uploader = bucket.initiate_multipart_upload('multipart_foo',
- metadata={meta_key: meta_value})
- uploader.upload_part_from_file(fp, 1)
+ key_name = 'multipart_foo'
+ uploader = bucket.initiate_multipart_upload(key_name,
+ metadata={meta_key: meta_value})
+ for i in range(1,5):
+ uploader.upload_part_from_file(fp, i, size=chunk_size)
+ fp.seek(i*chunk_size)
uploader.complete_upload()
fp.close()
+ expected_keys.append(key_name)
print('wait for 5sec for the messages...')
time.sleep(5)
# check amqp receiver
- event_count = 0
- for event in receiver.get_and_reset_events():
- s3_event = event['Records'][0]['s3']
- assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key)
- assert_equal(s3_event['object']['metadata'][0]['val'], meta_value)
- event_count +=1
-
- # only PUT and POST has the metadata value
- assert_equal(event_count, 2)
+ events = receiver.get_and_reset_events()
+ assert_equal(len(events), 4) # PUT, COPY, Multipart start, Multipart End
+ for event in events:
+ assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
# delete objects
for key in bucket.list():
print('wait for 5sec for the messages...')
time.sleep(5)
# check amqp receiver
- event_count = 0
- for event in receiver.get_and_reset_events():
- s3_event = event['Records'][0]['s3']
- assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key)
- assert_equal(s3_event['object']['metadata'][0]['val'], meta_value)
- event_count +=1
+ #assert_equal(len(receiver.get_and_reset_events()), len(expected_keys))
# all 3 object has metadata when deleted
assert_equal(event_count, 3)