]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notifications: sending metadata in COPY and CompleteMultipartUpload 48875/head
authoryuval Lifshitz <ylifshit@redhat.com>
Mon, 14 Nov 2022 15:08:19 +0000 (17:08 +0200)
committeryuval Lifshitz <ylifshit@redhat.com>
Mon, 14 Nov 2022 15:11:20 +0000 (17:11 +0200)
when the metadata filter is not set.
this is a regression from: 35a4eb4410394a0014648dda7df92642f3b536d

Fixes: https://tracker.ceph.com/issues/58014
Signed-off-by: yuval Lifshitz <ylifshit@redhat.com>
src/rgw/rgw_notify.cc
src/rgw/rgw_notify.h
src/test/rgw/bucket_notification/test_bn.py

index c5940059e5cf576fc549e777b323cc9f58046dd2..498349035441de73130c680d6f56fa911e9105b1 100644 (file)
@@ -624,7 +624,10 @@ rgw::sal::Object* get_object_with_atttributes(
     if (!src_obj->get_bucket()) {
       src_obj->set_bucket(res.bucket);
     }
-    if (src_obj->get_obj_attrs(res.yield, res.dpp) < 0) {
+    const auto ret = src_obj->get_obj_attrs(res.yield, res.dpp);
+    if (ret < 0) {
+      ldpp_dout(res.dpp, 20) << "failed to get attributes from object: " << 
+        src_obj->get_key() << ". ret = " << ret << dendl;
       return nullptr;
     }
   }
@@ -638,6 +641,7 @@ static inline void metadata_from_attributes(
   if (!src_obj) {
     return;
   }
+  res.metadata_fetched_from_attributes = true;
   for (auto& attr : src_obj->get_attrs()) {
     if (boost::algorithm::starts_with(attr.first, RGW_ATTR_META_PREFIX)) {
       std::string_view key(attr.first);
@@ -703,13 +707,11 @@ static inline void populate_event(reservation_t& res,
   set_event_id(event.id, etag, ts);
   event.bucket_id = res.bucket->get_bucket_id();
   // pass meta data
-  if (res.x_meta_map.empty()) {
-    // no metadata cached:
+  if (!res.metadata_fetched_from_attributes) {
     // either no metadata exist or no metadata filter was used
     metadata_from_attributes(res, obj);
-  } else {
-      event.x_meta_map = res.x_meta_map;
   }
+  event.x_meta_map = res.x_meta_map;
   // pass tags
   if (!res.tagset ||
       (*res.tagset).get_tags().empty()) {
@@ -974,6 +976,7 @@ reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
   object_name(_object_name),
   tagset(_s->tagset),
   x_meta_map(_s->info.x_meta_map),
+  metadata_fetched_from_attributes(false),
   user_id(_s->user->get_id().id),
   user_tenant(_s->user->get_id().tenant),
   req_id(_s->req_id),
index 60c79e7f70f76ecbf974a213f2913e48672dbc02..175dc11463d65c3192c145a61b62af53e17a518d 100644 (file)
@@ -64,6 +64,7 @@ struct reservation_t {
   const std::string* const object_name;
   boost::optional<const RGWObjTags&> tagset;
   meta_map_t x_meta_map; // metadata cached by value
+  bool metadata_fetched_from_attributes;
   const std::string user_id;
   const std::string user_tenant;
   const std::string req_id;
index cb09d4aa38b74549a22a7b26b6bd1f16298436ec..e4af3e7d496a4d083561f1e460265326baee10f7 100644 (file)
@@ -2155,9 +2155,10 @@ def test_ps_s3_multipart_on_master():
     # delete the bucket
     conn.delete_bucket(bucket_name)
 
+META_PREFIX = 'x-amz-meta-'
 
 @attr('amqp_test')
-def test_ps_s3_metadata_on_master():
+def test_ps_s3_metadata_filter_on_master():
     """ test s3 notification of metadata on master """
 
     hostname = get_ip()
@@ -2167,9 +2168,9 @@ def test_ps_s3_metadata_on_master():
     # create bucket
     bucket_name = gen_bucket_name()
     bucket = conn.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
+    topic_name = bucket_name + TOPIC_SUFFIX 
 
-    # start amqp receiver
+    # start amqp receivers
     exchange = 'ex1'
     task, receiver = create_amqp_receiver_thread(exchange, topic_name)
     task.start()
@@ -2183,18 +2184,17 @@ def test_ps_s3_metadata_on_master():
     notification_name = bucket_name + NOTIFICATION_SUFFIX
     meta_key = 'meta1'
     meta_value = 'This is my metadata value'
-    meta_prefix = 'x-amz-meta-'
-    topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
         'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
         'Filter': {
             'Metadata': {
-                'FilterRules': [{'Name': meta_prefix+meta_key, 'Value': meta_value}]
+                'FilterRules': [{'Name': META_PREFIX+meta_key, 'Value': meta_value}]
             }
         }
     }]
 
     s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
-    response, status = s3_notification_conf.set_config()
+    _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
     expected_keys = []
@@ -2214,6 +2214,7 @@ def test_ps_s3_metadata_on_master():
     # but override the metadata value
     key_name = 'another_copy_of_foo'
     bucket.copy_key(key_name, bucket.name, key.name, metadata={meta_key: 'kaboom'})
+    # this key is not in the expected keys due to the different meta value
 
     # create objects in the bucket using multi-part upload
     fp = tempfile.NamedTemporaryFile(mode='w+b')
@@ -2237,7 +2238,7 @@ def test_ps_s3_metadata_on_master():
     time.sleep(5)
     # check amqp receiver
     events = receiver.get_and_reset_events()
-    assert_equal(len(events), 3) # PUT, COPY, Multipart Complete
+    assert_equal(len(events), len(expected_keys))
     for event in events:
         assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
 
@@ -2247,7 +2248,104 @@ def test_ps_s3_metadata_on_master():
     print('wait for 5sec for the messages...')
     time.sleep(5)
     # check amqp receiver
-    #assert_equal(len(receiver.get_and_reset_events()), len(expected_keys))
+    events = receiver.get_and_reset_events()
+    assert_equal(len(events), len(expected_keys))
+    for event in events:
+        assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
+
+    # cleanup
+    stop_amqp_receiver(receiver, task)
+    s3_notification_conf.del_config()
+    topic_conf.del_config()
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+
+
+@attr('amqp_test')
+def test_ps_s3_metadata_on_master():
+    """ test s3 notification of metadata on master """
+
+    hostname = get_ip()
+    conn = connection()
+    zonegroup = 'default'
+
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # start amqp receivers
+    exchange = 'ex1'
+    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+    task.start()
+
+    # create s3 topic
+    endpoint_address = 'amqp://' + hostname
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
+    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    meta_key = 'meta1'
+    meta_value = 'This is my metadata value'
+    meta_prefix = 'x-amz-meta-'
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+        'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
+    }]
+
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    _, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket
+    key_name = 'foo'
+    key = bucket.new_key(key_name)
+    key.set_metadata(meta_key, meta_value)
+    key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
+    # update the object
+    another_meta_key = 'meta2'
+    key.set_metadata(another_meta_key, meta_value)
+    key.set_contents_from_string('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
+
+    # create objects in the bucket using COPY
+    key_name = 'copy_of_foo'
+    bucket.copy_key(key_name, bucket.name, key.name)
+    
+    # create objects in the bucket using multi-part upload
+    fp = tempfile.NamedTemporaryFile(mode='w+b')
+    chunk_size = 1024*1024*5 # 5MB
+    object_size = 10*chunk_size
+    content = bytearray(os.urandom(object_size))
+    fp.write(content)
+    fp.flush()
+    fp.seek(0)
+    key_name = 'multipart_foo'
+    uploader = bucket.initiate_multipart_upload(key_name,
+            metadata={meta_key: meta_value})
+    for i in range(1,5):
+        uploader.upload_part_from_file(fp, i, size=chunk_size)
+        fp.seek(i*chunk_size)
+    uploader.complete_upload()
+    fp.close()
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+    # check amqp receiver
+    events = receiver.get_and_reset_events()
+    for event in events:
+        value = [x['val'] for x in event['Records'][0]['s3']['object']['metadata'] if x['key'] == META_PREFIX+meta_key]
+        assert_equal(value[0], meta_value)
+
+    # delete objects
+    for key in bucket.list():
+        key.delete()
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+    # check amqp receiver
+    events = receiver.get_and_reset_events()
+    for event in events:
+        value = [x['val'] for x in event['Records'][0]['s3']['object']['metadata'] if x['key'] == META_PREFIX+meta_key]
+        assert_equal(value[0], meta_value)
 
     # cleanup
     stop_amqp_receiver(receiver, task)