if (!src_obj->get_bucket()) {
src_obj->set_bucket(res.bucket);
}
- if (src_obj->get_obj_attrs(res.yield, res.dpp) < 0) {
+ const auto ret = src_obj->get_obj_attrs(res.yield, res.dpp);
+ if (ret < 0) {
+ ldpp_dout(res.dpp, 20) << "failed to get attributes from object: " <<
+ src_obj->get_key() << ". ret = " << ret << dendl;
return nullptr;
}
}
if (!src_obj) {
return;
}
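+ // mark the reservation so the attributes are not fetched again when the event is populated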
+ res.metadata_fetched_from_attributes = true;
for (auto& attr : src_obj->get_attrs()) {
if (boost::algorithm::starts_with(attr.first, RGW_ATTR_META_PREFIX)) {
std::string_view key(attr.first);
set_event_id(event.id, etag, ts);
event.bucket_id = res.bucket->get_bucket_id();
// pass meta data
- if (res.x_meta_map.empty()) {
- // no metadata cached:
+ if (!res.metadata_fetched_from_attributes) {
// either no metadata exist or no metadata filter was used
metadata_from_attributes(res, obj);
- } else {
- event.x_meta_map = res.x_meta_map;
}
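+ // the metadata is always copied into the event, whether it came from the request or from the object attributes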
+ event.x_meta_map = res.x_meta_map;
// pass tags
if (!res.tagset ||
(*res.tagset).get_tags().empty()) {
object_name(_object_name),
tagset(_s->tagset),
x_meta_map(_s->info.x_meta_map),
+ metadata_fetched_from_attributes(false),
user_id(_s->user->get_id().id),
user_tenant(_s->user->get_id().tenant),
req_id(_s->req_id),
# delete the bucket
conn.delete_bucket(bucket_name)
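+# prefix used by S3 for user-defined object metadata keys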
+META_PREFIX = 'x-amz-meta-'
@attr('amqp_test')
-def test_ps_s3_metadata_on_master():
+def test_ps_s3_metadata_filter_on_master():
""" test s3 notification of metadata on master """
hostname = get_ip()
# create bucket
bucket_name = gen_bucket_name()
bucket = conn.create_bucket(bucket_name)
- topic_name = bucket_name + TOPIC_SUFFIX
+ topic_name = bucket_name + TOPIC_SUFFIX
- # start amqp receiver
+ # start amqp receivers
exchange = 'ex1'
task, receiver = create_amqp_receiver_thread(exchange, topic_name)
task.start()
notification_name = bucket_name + NOTIFICATION_SUFFIX
meta_key = 'meta1'
meta_value = 'This is my metadata value'
- meta_prefix = 'x-amz-meta-'
- topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
'Filter': {
'Metadata': {
- 'FilterRules': [{'Name': meta_prefix+meta_key, 'Value': meta_value}]
+ 'FilterRules': [{'Name': META_PREFIX+meta_key, 'Value': meta_value}]
}
}
}]
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
- response, status = s3_notification_conf.set_config()
+ _, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
expected_keys = []
# but override the metadata value
key_name = 'another_copy_of_foo'
bucket.copy_key(key_name, bucket.name, key.name, metadata={meta_key: 'kaboom'})
+ # this key is not in the expected keys due to the different meta value
# create objects in the bucket using multi-part upload
fp = tempfile.NamedTemporaryFile(mode='w+b')
time.sleep(5)
# check amqp receiver
events = receiver.get_and_reset_events()
- assert_equal(len(events), 3) # PUT, COPY, Multipart Complete
+ assert_equal(len(events), len(expected_keys))
for event in events:
assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
print('wait for 5sec for the messages...')
time.sleep(5)
# check amqp receiver
- #assert_equal(len(receiver.get_and_reset_events()), len(expected_keys))
+ events = receiver.get_and_reset_events()
+ assert_equal(len(events), len(expected_keys))
+ for event in events:
+ assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
+
+ # cleanup
+ stop_amqp_receiver(receiver, task)
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+
+
+@attr('amqp_test')
+def test_ps_s3_metadata_on_master():
+ """ test s3 notification of metadata on master """
+
+ hostname = get_ip()
+ conn = connection()
+ zonegroup = 'default'
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # start amqp receivers
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
+
+ # create s3 topic
+ endpoint_address = 'amqp://' + hostname
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ meta_key = 'meta1'
+ meta_value = 'This is my metadata value'
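+ # note: unlike test_ps_s3_metadata_filter_on_master, no metadata filter is configured, so notifications are expected for all objects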
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ _, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket
+ key_name = 'foo'
+ key = bucket.new_key(key_name)
+ key.set_metadata(meta_key, meta_value)
+ key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
+ # update the object
+ another_meta_key = 'meta2'
+ key.set_metadata(another_meta_key, meta_value)
+ key.set_contents_from_string('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
+
+ # create objects in the bucket using COPY
+ key_name = 'copy_of_foo'
+ bucket.copy_key(key_name, bucket.name, key.name)
+
+ # create objects in the bucket using multi-part upload
+ fp = tempfile.NamedTemporaryFile(mode='w+b')
+ chunk_size = 1024*1024*5 # 5MB
+ object_size = 10*chunk_size
+ content = bytearray(os.urandom(object_size))
+ fp.write(content)
+ fp.flush()
+ fp.seek(0)
+ key_name = 'multipart_foo'
+ uploader = bucket.initiate_multipart_upload(key_name,
+ metadata={meta_key: meta_value})
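+ # metadata set when the upload is initiated should be reported in the CompleteMultipartUpload notification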
+ for i in range(1,5):
+ uploader.upload_part_from_file(fp, i, size=chunk_size)
+ fp.seek(i*chunk_size)
+ uploader.complete_upload()
+ fp.close()
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ # check amqp receiver
+ events = receiver.get_and_reset_events()
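+ # each notification record should carry the user metadata under the x-amz-meta- prefixed key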
+ for event in events:
+ value = [x['val'] for x in event['Records'][0]['s3']['object']['metadata'] if x['key'] == META_PREFIX+meta_key]
+ assert_equal(value[0], meta_value)
+
+ # delete objects
+ for key in bucket.list():
+ key.delete()
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+ # check amqp receiver
+ events = receiver.get_and_reset_events()
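+ # delete notifications should also carry the object metadata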
+ for event in events:
+ value = [x['val'] for x in event['Records'][0]['s3']['object']['metadata'] if x['key'] == META_PREFIX+meta_key]
+ assert_equal(value[0], meta_value)
# cleanup
stop_amqp_receiver(receiver, task)