From: Yuval Lifshitz
Date: Mon, 14 Nov 2022 15:08:19 +0000 (+0200)
Subject: rgw/notifications: sending metadata in COPY and CompleteMultipartUpload
X-Git-Tag: v18.1.0~812^2
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e4e05dde604830f6f2e58b28c4cb2e6d208b0064;p=ceph.git

rgw/notifications: sending metadata in COPY and CompleteMultipartUpload

when the metadata filter is not set.
This is a regression from: 35a4eb4410394a0014648dda7df92642f3b536d

Fixes: https://tracker.ceph.com/issues/58014

Signed-off-by: Yuval Lifshitz
---

diff --git a/src/rgw/rgw_notify.cc b/src/rgw/rgw_notify.cc
index c5940059e5cf..498349035441 100644
--- a/src/rgw/rgw_notify.cc
+++ b/src/rgw/rgw_notify.cc
@@ -624,7 +624,10 @@ rgw::sal::Object* get_object_with_atttributes(
     if (!src_obj->get_bucket()) {
       src_obj->set_bucket(res.bucket);
     }
-    if (src_obj->get_obj_attrs(res.yield, res.dpp) < 0) {
+    const auto ret = src_obj->get_obj_attrs(res.yield, res.dpp);
+    if (ret < 0) {
+      ldpp_dout(res.dpp, 20) << "failed to get attributes from object: " <<
+        src_obj->get_key() << ". ret = " << ret << dendl;
       return nullptr;
     }
   }
@@ -638,6 +641,7 @@ static inline void metadata_from_attributes(
   if (!src_obj) {
     return;
   }
+  res.metadata_fetched_from_attributes = true;
   for (auto& attr : src_obj->get_attrs()) {
     if (boost::algorithm::starts_with(attr.first, RGW_ATTR_META_PREFIX)) {
       std::string_view key(attr.first);
@@ -703,13 +707,11 @@ static inline void populate_event(reservation_t& res,
   set_event_id(event.id, etag, ts);
   event.bucket_id = res.bucket->get_bucket_id();
   // pass meta data
-  if (res.x_meta_map.empty()) {
-    // no metadata cached:
+  if (!res.metadata_fetched_from_attributes) {
     // either no metadata exist or no metadata filter was used
     metadata_from_attributes(res, obj);
-  } else {
-    event.x_meta_map = res.x_meta_map;
   }
+  event.x_meta_map = res.x_meta_map;
   // pass tags
   if (!res.tagset ||
       (*res.tagset).get_tags().empty()) {
@@ -974,6 +976,7 @@ reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
   object_name(_object_name),
   tagset(_s->tagset),
   x_meta_map(_s->info.x_meta_map),
+  metadata_fetched_from_attributes(false),
   user_id(_s->user->get_id().id),
   user_tenant(_s->user->get_id().tenant),
   req_id(_s->req_id),
diff --git a/src/rgw/rgw_notify.h b/src/rgw/rgw_notify.h
index 60c79e7f70f7..175dc11463d6 100644
--- a/src/rgw/rgw_notify.h
+++ b/src/rgw/rgw_notify.h
@@ -64,6 +64,7 @@ struct reservation_t {
   const std::string* const object_name;
   boost::optional<RGWObjTags> tagset;
   meta_map_t x_meta_map; // metadata cached by value
+  bool metadata_fetched_from_attributes;
   const std::string user_id;
   const std::string user_tenant;
   const std::string req_id;
diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py
index cb09d4aa38b7..e4af3e7d496a 100644
--- a/src/test/rgw/bucket_notification/test_bn.py
+++ b/src/test/rgw/bucket_notification/test_bn.py
@@ -2155,9 +2155,10 @@ def test_ps_s3_multipart_on_master():
     # delete the bucket
     conn.delete_bucket(bucket_name)
 
+META_PREFIX = 'x-amz-meta-'
 
 @attr('amqp_test')
-def test_ps_s3_metadata_on_master():
+def test_ps_s3_metadata_filter_on_master():
     """ test s3 notification of metadata on master """
 
     hostname = get_ip()
@@ -2167,9 +2168,9 @@ def test_ps_s3_metadata_on_master():
     # create bucket
     bucket_name = gen_bucket_name()
     bucket = conn.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
+    topic_name = bucket_name + TOPIC_SUFFIX
 
-    # start amqp receiver
+    # start amqp receivers
     exchange = 'ex1'
     task, receiver = create_amqp_receiver_thread(exchange, topic_name)
     task.start()
 
@@ -2183,18 +2184,17 @@ def test_ps_s3_metadata_on_master():
     notification_name = bucket_name + NOTIFICATION_SUFFIX
     meta_key = 'meta1'
     meta_value = 'This is my metadata value'
-    meta_prefix = 'x-amz-meta-'
-    topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
                         'Filter': {
                             'Metadata': {
-                                'FilterRules': [{'Name': meta_prefix+meta_key, 'Value': meta_value}]
+                                'FilterRules': [{'Name': META_PREFIX+meta_key, 'Value': meta_value}]
                             }
                         }
                         }]
 
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf.set_config()
+    _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
     expected_keys = []
@@ -2214,6 +2214,7 @@ def test_ps_s3_metadata_on_master():
     # but override the metadata value
     key_name = 'another_copy_of_foo'
     bucket.copy_key(key_name, bucket.name, key.name, metadata={meta_key: 'kaboom'})
+    # this key is not in the expected keys due to the different meta value
 
     # create objects in the bucket using multi-part upload
     fp = tempfile.NamedTemporaryFile(mode='w+b')
@@ -2237,7 +2238,7 @@ def test_ps_s3_metadata_on_master():
     time.sleep(5)
     # check amqp receiver
     events = receiver.get_and_reset_events()
-    assert_equal(len(events), 3) # PUT, COPY, Multipart Complete
+    assert_equal(len(events), len(expected_keys))
     for event in events:
         assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
 
@@ -2247,7 +2248,104 @@ def test_ps_s3_metadata_on_master():
     print('wait for 5sec for the messages...')
     time.sleep(5)
     # check amqp receiver
-    #assert_equal(len(receiver.get_and_reset_events()), len(expected_keys))
+    events = receiver.get_and_reset_events()
+    assert_equal(len(events), len(expected_keys))
+    for event in events:
+        assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
+
+    # cleanup
+    stop_amqp_receiver(receiver, task)
+    s3_notification_conf.del_config()
+    topic_conf.del_config()
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+
+
+@attr('amqp_test')
+def test_ps_s3_metadata_on_master():
+    """ test s3 notification of metadata on master """
+
+    hostname = get_ip()
+    conn = connection()
+    zonegroup = 'default'
+
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # start amqp receivers
+    exchange = 'ex1'
+    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+    task.start()
+
+    # create s3 topic
+    endpoint_address = 'amqp://' + hostname
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
+    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    meta_key = 'meta1'
+    meta_value = 'This is my metadata value'
+    meta_prefix = 'x-amz-meta-'
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
+                        }]
+
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    _, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket
+    key_name = 'foo'
+    key = bucket.new_key(key_name)
+    key.set_metadata(meta_key, meta_value)
+    key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
+    # update the object
+    another_meta_key = 'meta2'
+    key.set_metadata(another_meta_key, meta_value)
+    key.set_contents_from_string('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
+
+    # create objects in the bucket using COPY
+    key_name = 'copy_of_foo'
+    bucket.copy_key(key_name, bucket.name, key.name)
+
+    # create objects in the bucket using multi-part upload
+    fp = tempfile.NamedTemporaryFile(mode='w+b')
+    chunk_size = 1024*1024*5 # 5MB
+    object_size = 10*chunk_size
+    content = bytearray(os.urandom(object_size))
+    fp.write(content)
+    fp.flush()
+    fp.seek(0)
+    key_name = 'multipart_foo'
+    uploader = bucket.initiate_multipart_upload(key_name,
+                                                metadata={meta_key: meta_value})
+    for i in range(1,5):
+        uploader.upload_part_from_file(fp, i, size=chunk_size)
+        fp.seek(i*chunk_size)
+    uploader.complete_upload()
+    fp.close()
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+    # check amqp receiver
+    events = receiver.get_and_reset_events()
+    for event in events:
+        value = [x['val'] for x in event['Records'][0]['s3']['object']['metadata'] if x['key'] == META_PREFIX+meta_key]
+        assert_equal(value[0], meta_value)
+
+    # delete objects
+    for key in bucket.list():
+        key.delete()
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+    # check amqp receiver
+    events = receiver.get_and_reset_events()
+    for event in events:
+        value = [x['val'] for x in event['Records'][0]['s3']['object']['metadata'] if x['key'] == META_PREFIX+meta_key]
+        assert_equal(value[0], meta_value)
 
     # cleanup
     stop_amqp_receiver(receiver, task)
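
Note (not part of the patch): a minimal boto3 sketch of the scenario this fix addresses,
a notification configured with no Metadata filter followed by a server-side COPY. The
endpoint, credentials, bucket name and topic ARN below are placeholders; with the fix,
the delivered ObjectCreated:Copy event is expected to carry the source object's
x-amz-meta-* entries.

import boto3

# placeholders: point these at a test RGW endpoint and an already-created topic
s3 = boto3.client('s3',
                  endpoint_url='http://localhost:8000',
                  aws_access_key_id='access',
                  aws_secret_access_key='secret')
bucket = 'notify-bucket'
topic_arn = 'arn:aws:sns:default::mytopic'

s3.create_bucket(Bucket=bucket)
# notification without a Metadata filter
s3.put_bucket_notification_configuration(
    Bucket=bucket,
    NotificationConfiguration={'TopicConfigurations': [{
        'Id': 'notif1',
        'TopicArn': topic_arn,
        'Events': ['s3:ObjectCreated:*'],
    }]})

# PUT an object with user metadata, then COPY it (metadata is copied by default)
s3.put_object(Bucket=bucket, Key='foo', Body=b'data',
              Metadata={'meta1': 'This is my metadata value'})
s3.copy_object(Bucket=bucket, Key='copy_of_foo',
               CopySource={'Bucket': bucket, 'Key': 'foo'})
# the event published for 'copy_of_foo' should now include x-amz-meta-meta1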