]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notifications: version id was not sent in versioned buckets 35181/head
authorYuval Lifshitz <ylifshit@redhat.com>
Fri, 8 May 2020 07:09:32 +0000 (10:09 +0300)
committerNathan Cutler <ncutler@suse.com>
Thu, 21 May 2020 17:44:28 +0000 (19:44 +0200)
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
Fixes: https://tracker.ceph.com/issues/45440
(cherry picked from commit 973b0b1ed8767767f0c4446d97248c1b8002f1d3)

src/rgw/rgw_op.cc
src/test/rgw/rgw_multi/tests_ps.py

index ae666a726f5101fad68966e9f8e9f83bc10eb79e..a0c0e81801e5176dd0d2faa4ad2d6f2fafd06ba9 100644 (file)
@@ -4003,7 +4003,7 @@ void RGWPutObj::execute()
   }
 
   // send request to notification manager
-  const auto ret = rgw::notify::publish(s, s->object, s->obj_size, mtime, etag, rgw::notify::ObjectCreatedPut, store);
+  const auto ret = rgw::notify::publish(s, obj.key, s->obj_size, mtime, etag, rgw::notify::ObjectCreatedPut, store);
   if (ret < 0) {
     ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
        // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
index 5e5fa8f8345399d546f69daf9f37f5192033f069..dc3da5b57df10272bf981dd6e88193d7800ed02c 100644 (file)
@@ -2987,6 +2987,81 @@ def test_ps_s3_tags_on_master():
     clean_rabbitmq(proc)
 
 
+def test_ps_s3_versioning_on_master():
+    """ test s3 notification of object versions """
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    hostname = get_ip()
+    proc = init_rabbitmq()
+    if proc is None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
+    master_zone, _ = init_env(require_ps=False)
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = master_zone.create_bucket(bucket_name)
+    bucket.configure_versioning(True)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # start amqp receiver
+    exchange = 'ex1'
+    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+    task.start()
+
+    # create s3 topic
+    endpoint_address = 'amqp://' + hostname
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+    # create notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+                        'Events': []
+                       }]
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
+    _, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket
+    key_value = 'foo'
+    key = bucket.new_key(key_value)
+    key.set_contents_from_string('hello')
+    ver1 = key.version_id
+    key.set_contents_from_string('world')
+    ver2 = key.version_id
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+
+    # check amqp receiver
+    events = receiver.get_and_reset_events()
+    num_of_versions = 0
+    for event_list in events:
+        for event in event_list['Records']:
+            assert_equal(event['s3']['object']['key'], key_value)
+            version = event['s3']['object']['versionId']
+            num_of_versions += 1
+            if version not in (ver1, ver2):
+                print('version mismatch: '+version+' not in: ('+ver1+', '+ver2+')')
+                assert_equal(1, 0)
+            else:
+                print('version ok: '+version+' in: ('+ver1+', '+ver2+')')
+
+    assert_equal(num_of_versions, 2)
+
+    # cleanup
+    stop_amqp_receiver(receiver, task)
+    s3_notification_conf.del_config()
+    topic_conf.del_config()
+    # delete the bucket
+    bucket.delete_key(key.name, version_id=ver2)
+    bucket.delete_key(key.name, version_id=ver1)
+    master_zone.delete_bucket(bucket_name)
+    clean_rabbitmq(proc)
+
+
 def test_ps_s3_versioned_deletion_on_master():
     """ test s3 notification of deletion markers on master """
     if skip_push_tests:
@@ -3017,7 +3092,6 @@ def test_ps_s3_versioned_deletion_on_master():
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
-    # TODO use s3:ObjectRemoved:DeleteMarkerCreated once supported in the code
     topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn,
                         'Events': ['s3:ObjectRemoved:*']
                        },