]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notifications: version id was not sent in versioned buckets 34958/head
authorYuval Lifshitz <ylifshit@redhat.com>
Fri, 8 May 2020 07:09:32 +0000 (10:09 +0300)
committerYuval Lifshitz <ylifshit@redhat.com>
Fri, 8 May 2020 07:09:32 +0000 (10:09 +0300)
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
Fixes: https://tracker.ceph.com/issues/45440
src/rgw/rgw_op.cc
src/test/rgw/rgw_multi/tests_ps.py

index 6e28396586ed07f8163c0811a9573c79fe88cdf9..9fe44e76d81a0b0f9235c2a48f3dd0bbe4141b03 100644 (file)
@@ -4180,7 +4180,7 @@ void RGWPutObj::execute()
   }
 
   // send request to notification manager
-  const auto ret = rgw::notify::publish(s, s->object, s->obj_size, mtime, etag, rgw::notify::ObjectCreatedPut, store);
+  const auto ret = rgw::notify::publish(s, obj.key, s->obj_size, mtime, etag, rgw::notify::ObjectCreatedPut, store);
   if (ret < 0) {
     ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
        // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
index 4dfa873e192f4d17d3761f452619e14224867c2d..ac106cd4dc651ea19869d33a83221bab4b020dbe 100644 (file)
@@ -3041,6 +3041,81 @@ def test_ps_s3_tags_on_master():
     clean_rabbitmq(proc)
 
 
+def test_ps_s3_versioning_on_master():
+    """ test s3 notification of object versions """
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    hostname = get_ip()
+    proc = init_rabbitmq()
+    if proc is None:
+        return SkipTest('end2end amqp tests require rabbitmq-server installed')
+    master_zone, _ = init_env(require_ps=False)
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = master_zone.create_bucket(bucket_name)
+    bucket.configure_versioning(True)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # start amqp receiver
+    exchange = 'ex1'
+    task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+    task.start()
+
+    # create s3 topic
+    endpoint_address = 'amqp://' + hostname
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+    # create notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+                        'Events': []
+                       }]
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
+    _, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket
+    key_value = 'foo'
+    key = bucket.new_key(key_value)
+    key.set_contents_from_string('hello')
+    ver1 = key.version_id
+    key.set_contents_from_string('world')
+    ver2 = key.version_id
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+
+    # check amqp receiver
+    events = receiver.get_and_reset_events()
+    num_of_versions = 0
+    for event_list in events:
+        for event in event_list['Records']:
+            assert_equal(event['s3']['object']['key'], key_value)
+            version = event['s3']['object']['versionId']
+            num_of_versions += 1
+            if version not in (ver1, ver2):
+                print('version mismatch: '+version+' not in: ('+ver1+', '+ver2+')')
+                assert_equal(1, 0)
+            else:
+                print('version ok: '+version+' in: ('+ver1+', '+ver2+')')
+
+    assert_equal(num_of_versions, 2)
+
+    # cleanup
+    stop_amqp_receiver(receiver, task)
+    s3_notification_conf.del_config()
+    topic_conf.del_config()
+    # delete the bucket
+    bucket.delete_key(key.name, version_id=ver2)
+    bucket.delete_key(key.name, version_id=ver1)
+    master_zone.delete_bucket(bucket_name)
+    clean_rabbitmq(proc)
+
+
 def test_ps_s3_versioned_deletion_on_master():
     """ test s3 notification of deletion markers on master """
     if skip_push_tests:
@@ -3071,7 +3146,6 @@ def test_ps_s3_versioned_deletion_on_master():
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
-    # TODO use s3:ObjectRemoved:DeleteMarkerCreated once supported in the code
     topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn,
                         'Events': ['s3:ObjectRemoved:*']
                        },