From: Yuval Lifshitz Date: Fri, 8 May 2020 07:09:32 +0000 (+0300) Subject: rgw/notifications: version id was not sent in versioned buckets X-Git-Tag: v17.0.0~2404^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F34958%2Fhead;p=ceph.git rgw/notifications: version id was not sent in versioned buckets Signed-off-by: Yuval Lifshitz Fixes: https://tracker.ceph.com/issues/45440 --- diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 6e28396586ed0..9fe44e76d81a0 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -4180,7 +4180,7 @@ void RGWPutObj::execute() } // send request to notification manager - const auto ret = rgw::notify::publish(s, s->object, s->obj_size, mtime, etag, rgw::notify::ObjectCreatedPut, store); + const auto ret = rgw::notify::publish(s, obj.key, s->obj_size, mtime, etag, rgw::notify::ObjectCreatedPut, store); if (ret < 0) { ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl; // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed diff --git a/src/test/rgw/rgw_multi/tests_ps.py b/src/test/rgw/rgw_multi/tests_ps.py index 4dfa873e192f4..ac106cd4dc651 100644 --- a/src/test/rgw/rgw_multi/tests_ps.py +++ b/src/test/rgw/rgw_multi/tests_ps.py @@ -3041,6 +3041,81 @@ def test_ps_s3_tags_on_master(): clean_rabbitmq(proc) +def test_ps_s3_versioning_on_master(): + """ test s3 notification of object versions """ + if skip_push_tests: + return SkipTest("PubSub push tests don't run in teuthology") + hostname = get_ip() + proc = init_rabbitmq() + if proc is None: + return SkipTest('end2end amqp tests require rabbitmq-server installed') + master_zone, _ = init_env(require_ps=False) + realm = get_realm() + zonegroup = realm.master_zonegroup() + + # create bucket + bucket_name = gen_bucket_name() + bucket = master_zone.create_bucket(bucket_name) + bucket.configure_versioning(True) + topic_name = bucket_name + TOPIC_SUFFIX + + # start amqp receiver + exchange = 'ex1' + task, receiver = create_amqp_receiver_thread(exchange, topic_name) + task.start() + + # create s3 topic + endpoint_address = 'amqp://' + hostname + endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker' + topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) + topic_arn = topic_conf.set_config() + # create notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, + 'Events': [] + }] + s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) + _, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + + # create objects in the bucket + key_value = 'foo' + key = bucket.new_key(key_value) + key.set_contents_from_string('hello') + ver1 = key.version_id + key.set_contents_from_string('world') + ver2 = key.version_id + + print('wait for 5sec for the messages...') + time.sleep(5) + + # check amqp receiver + events = receiver.get_and_reset_events() + num_of_versions = 0 + for event_list in events: + for event in event_list['Records']: + assert_equal(event['s3']['object']['key'], key_value) + version = event['s3']['object']['versionId'] + num_of_versions += 1 + if version not in (ver1, ver2): + print('version mismatch: '+version+' not in: ('+ver1+', '+ver2+')') + assert_equal(1, 0) + else: + print('version ok: '+version+' in: ('+ver1+', '+ver2+')') + + assert_equal(num_of_versions, 2) + + # cleanup + stop_amqp_receiver(receiver, task) + s3_notification_conf.del_config() + topic_conf.del_config() + # delete the bucket + bucket.delete_key(key.name, version_id=ver2) + bucket.delete_key(key.name, version_id=ver1) + master_zone.delete_bucket(bucket_name) + clean_rabbitmq(proc) + + def test_ps_s3_versioned_deletion_on_master(): """ test s3 notification of deletion markers on master """ if skip_push_tests: @@ -3071,7 +3146,6 @@ def test_ps_s3_versioned_deletion_on_master(): topic_arn = topic_conf.set_config() # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX - # TODO use s3:ObjectRemoved:DeleteMarkerCreated once supported in the code topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn, 'Events': ['s3:ObjectRemoved:*'] },