}
// send request to notification manager
- const auto ret = rgw::notify::publish(s, s->object, s->obj_size, mtime, etag, rgw::notify::ObjectCreatedPut, store);
+ const auto ret = rgw::notify::publish(s, obj.key, s->obj_size, mtime, etag, rgw::notify::ObjectCreatedPut, store);
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
// TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
clean_rabbitmq(proc)
+def test_ps_s3_versioning_on_master():
+ """ test s3 notification of object versions """
+ if skip_push_tests:
+ return SkipTest("PubSub push tests don't run in teuthology")
+ hostname = get_ip()
+ proc = init_rabbitmq()
+ if proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
+ master_zone, _ = init_env(require_ps=False)
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = master_zone.create_bucket(bucket_name)
+ bucket.configure_versioning(True)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # start amqp receiver
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
+
+ # create s3 topic
+ endpoint_address = 'amqp://' + hostname
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+ topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
+ _, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket
+ key_value = 'foo'
+ key = bucket.new_key(key_value)
+ key.set_contents_from_string('hello')
+ ver1 = key.version_id
+ key.set_contents_from_string('world')
+ ver2 = key.version_id
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check amqp receiver
+ events = receiver.get_and_reset_events()
+ num_of_versions = 0
+ for event_list in events:
+ for event in event_list['Records']:
+ assert_equal(event['s3']['object']['key'], key_value)
+ version = event['s3']['object']['versionId']
+ num_of_versions += 1
+ if version not in (ver1, ver2):
+ print('version mismatch: '+version+' not in: ('+ver1+', '+ver2+')')
+ assert_equal(1, 0)
+ else:
+ print('version ok: '+version+' in: ('+ver1+', '+ver2+')')
+
+ assert_equal(num_of_versions, 2)
+
+ # cleanup
+ stop_amqp_receiver(receiver, task)
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ bucket.delete_key(key.name, version_id=ver2)
+ bucket.delete_key(key.name, version_id=ver1)
+ master_zone.delete_bucket(bucket_name)
+ clean_rabbitmq(proc)
+
+
def test_ps_s3_versioned_deletion_on_master():
""" test s3 notification of deletion markers on master """
if skip_push_tests:
topic_arn = topic_conf.set_config()
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
- # TODO use s3:ObjectRemoved:DeleteMarkerCreated once supported in the code
topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn,
'Events': ['s3:ObjectRemoved:*']
},