From 754f7edbb8156727de45c17c03826c9a0df6dbb8 Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Tue, 10 Dec 2019 20:22:43 +0200 Subject: [PATCH] rgw/pubsub: send notifications from multi-delete op Signed-off-by: Yuval Lifshitz --- src/rgw/rgw_notify.cc | 12 +++-- src/rgw/rgw_notify.h | 3 ++ src/rgw/rgw_op.cc | 26 +++++++--- src/test/rgw/rgw_multi/tests_ps.py | 83 ++++++++++++++++++++++++++++-- src/test/rgw/rgw_multi/zone_ps.py | 15 ++++++ 5 files changed, 126 insertions(+), 13 deletions(-) diff --git a/src/rgw/rgw_notify.cc b/src/rgw/rgw_notify.cc index 79ac062df7a..25b5c533bfd 100644 --- a/src/rgw/rgw_notify.cc +++ b/src/rgw/rgw_notify.cc @@ -15,6 +15,8 @@ namespace rgw::notify { // populate record from request void populate_record_from_request(const req_state *s, + const rgw_obj_key& key, + uint64_t size, const ceph::real_time& mtime, const std::string& etag, EventType event_type, @@ -28,10 +30,10 @@ void populate_record_from_request(const req_state *s, record.bucket_name = s->bucket_name; record.bucket_ownerIdentity = s->bucket_owner.get_id().id; record.bucket_arn = to_string(rgw::ARN(s->bucket)); - record.object_key = s->object.name; - record.object_size = s->obj_size; + record.object_key = key.name; + record.object_size = size; record.object_etag = etag; - record.object_versionId = s->object.instance; + record.object_versionId = key.instance; // use timestamp as per key sequence id (hex encoded) const utime_t ts(real_clock::now()); boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), @@ -62,6 +64,8 @@ bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType } int publish(const req_state* s, + const rgw_obj_key& key, + uint64_t size, const ceph::real_time& mtime, const std::string& etag, EventType event_type, @@ -75,7 +79,7 @@ int publish(const req_state* s, return rc; } rgw_pubsub_s3_record record; - populate_record_from_request(s, mtime, etag, event_type, record); + populate_record_from_request(s, key, size, mtime, etag, event_type, record); bool event_handled = false; bool event_should_be_handled = false; for (const auto& bucket_topic : bucket_topics.topics) { diff --git a/src/rgw/rgw_notify.h b/src/rgw/rgw_notify.h index 4711c30aaa8..0dc666b365e 100644 --- a/src/rgw/rgw_notify.h +++ b/src/rgw/rgw_notify.h @@ -14,11 +14,14 @@ namespace rgw::sal { } class RGWRados; class req_state; +struct rgw_obj_key; namespace rgw::notify { // publish notification int publish(const req_state* s, + const rgw_obj_key& key, + uint64_t size, const ceph::real_time& mtime, const std::string& etag, EventType event_type, diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index dbffcfb1a6e..8ec3ec0f12e 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -4061,7 +4061,7 @@ void RGWPutObj::execute() } // send request to notification manager - const auto ret = rgw::notify::publish(s, mtime, etag, rgw::notify::ObjectCreatedPut, store); + const auto ret = rgw::notify::publish(s, s->object, s->obj_size, mtime, etag, rgw::notify::ObjectCreatedPut, store); if (ret < 0) { ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl; // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed @@ -4289,7 +4289,7 @@ void RGWPostObj::execute() } } while (is_next_file_to_upload()); - const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedPost, store); + const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedPost, store); if (ret < 0) { ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl; // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed @@ -4851,7 +4851,7 @@ void RGWDeleteObj::execute() op_ret = -EINVAL; } - const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(), + const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(), delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete, store); if (ret < 0) { @@ -5165,7 +5165,7 @@ void RGWCopyObj::execute() this, s->yield); - const auto ret = rgw::notify::publish(s, mtime, etag, rgw::notify::ObjectCreatedCopy, store); + const auto ret = rgw::notify::publish(s, s->object, s->obj_size, mtime, etag, rgw::notify::ObjectCreatedCopy, store); if (ret < 0) { ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl; // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed @@ -5843,7 +5843,7 @@ void RGWInitMultipart::execute() op_ret = obj_op.write_meta(bl.length(), 0, attrs, s->yield); } while (op_ret == -EEXIST); - const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(), rgw::notify::ObjectCreatedPost, store); + const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(), rgw::notify::ObjectCreatedPost, store); if (ret < 0) { ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl; // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed @@ -6157,7 +6157,7 @@ void RGWCompleteMultipart::execute() ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << dendl; } - const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedCompleteMultipartUpload, store); + const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedCompleteMultipartUpload, store); if (ret < 0) { ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl; // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed @@ -6513,6 +6513,20 @@ void RGWDeleteMultiObj::execute() send_partial_response(*iter, del_op.result.delete_marker, del_op.result.version_id, op_ret); + + const auto obj_state = obj_ctx->get_state(obj); + bufferlist etag_bl; + const auto etag = obj_state->get_attr(RGW_ATTR_ETAG, etag_bl) ? etag_bl.to_str() : ""; + + const auto ret = rgw::notify::publish(s, obj.key, obj_state->size, ceph::real_clock::now(), etag, + del_op.result.delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete, + store); + if (ret < 0) { + ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl; + // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed + // this should be global conf (probably returnign a different handler) + // so we don't need to read the configured values before we perform it + } } /* set the return code to zero, errors at this point will be diff --git a/src/test/rgw/rgw_multi/tests_ps.py b/src/test/rgw/rgw_multi/tests_ps.py index 4eb5bb89140..27916a70378 100644 --- a/src/test/rgw/rgw_multi/tests_ps.py +++ b/src/test/rgw/rgw_multi/tests_ps.py @@ -9,6 +9,7 @@ import subprocess import socket import time import os +from random import randint from .tests import get_realm, \ ZonegroupConns, \ zonegroup_meta_checkpoint, \ @@ -28,7 +29,8 @@ from .zone_ps import PSTopic, \ print_connection_info, \ delete_all_s3_topics, \ put_object_tagging, \ - get_object_tagging + get_object_tagging, \ + delete_all_objects from multisite import User from nose import SkipTest from nose.tools import assert_not_equal, assert_equal @@ -157,6 +159,13 @@ class StreamingHTTPServer: for worker in self.workers: worker.close() worker.join() + + def get_and_reset_events(self): + events = [] + for worker in self.workers: + events += worker.get_events() + worker.reset_events() + return events # AMQP endpoint functions @@ -1719,6 +1728,74 @@ def test_ps_s3_notification_push_kafka_security_ssl_sasl(): kafka_security('SSL_SASL') +def test_ps_s3_notification_multi_delete_on_master(): + """ test deletion of multiple keys on master """ + if skip_push_tests: + return SkipTest("PubSub push tests don't run in teuthology") + hostname = get_ip() + zones, _ = init_env(require_ps=False) + realm = get_realm() + zonegroup = realm.master_zonegroup() + + # create random port for the http server + host = get_ip() + port = random.randint(10000, 20000) + # start an http server in a separate thread + number_of_objects = 10 + http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) + + # create bucket + bucket_name = gen_bucket_name() + bucket = zones[0].create_bucket(bucket_name) + topic_name = bucket_name + TOPIC_SUFFIX + + # create s3 topic + endpoint_address = 'http://'+host+':'+str(port) + endpoint_args = 'push-endpoint='+endpoint_address + topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) + topic_arn = topic_conf.set_config() + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name, + 'TopicArn': topic_arn, + 'Events': ['s3:ObjectRemoved:*'] + }] + s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list) + response, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + + # create objects in the bucket + client_threads = [] + for i in range(number_of_objects): + obj_size = randint(1, 1024) + content = str(os.urandom(obj_size)) + key = bucket.new_key(str(i)) + thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + + keys = list(bucket.list()) + + start_time = time.time() + delete_all_objects(zones[0].conn, bucket_name) + time_diff = time.time() - start_time + print 'average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds' + + print('wait for 5sec for the messages...') + time.sleep(5) + + # check http receiver + http_server.verify_s3_events(keys, exact_match=True, deletions=True) + + # cleanup + topic_conf.del_config() + s3_notification_conf.del_config(notification=notification_name) + # delete the bucket + zones[0].delete_bucket(bucket_name) + http_server.close() + + def test_ps_s3_notification_push_http_on_master(): """ test pushing http s3 notification on master """ if skip_push_tests: @@ -2888,7 +2965,7 @@ def test_ps_s3_tags_on_master(): expected_tags = [{'val': 'world', 'key': 'hello'}, {'val': 'boom', 'key': 'ka'}] # check amqp receiver for event in receiver.get_and_reset_events(): - obj_tags = event['s3']['object']['tags'] + obj_tags = event['Records'][0]['s3']['object']['tags'] assert_equal(obj_tags[0], expected_tags[0]) # delete the objects @@ -2898,7 +2975,7 @@ def test_ps_s3_tags_on_master(): time.sleep(5) # check amqp receiver for event in receiver.get_and_reset_events(): - obj_tags = event['s3']['object']['tags'] + obj_tags = event['Records'][0]['s3']['object']['tags'] assert_equal(obj_tags[0], expected_tags[0]) # cleanup diff --git a/src/test/rgw/rgw_multi/zone_ps.py b/src/test/rgw/rgw_multi/zone_ps.py index 090fbc789fc..d5786d25fbf 100644 --- a/src/test/rgw/rgw_multi/zone_ps.py +++ b/src/test/rgw/rgw_multi/zone_ps.py @@ -171,6 +171,21 @@ def delete_all_s3_topics(zone, region): print 'failed to do topic cleanup: ' + str(err) +def delete_all_objects(conn, bucket_name): + client = boto3.client('s3', + endpoint_url='http://'+conn.host+':'+str(conn.port), + aws_access_key_id=conn.aws_access_key_id, + aws_secret_access_key=conn.aws_secret_access_key) + + objects = [] + for key in client.list_objects(Bucket=bucket_name)['Contents']: + objects.append({'Key': key['Key']}) + # delete objects from the bucket + response = client.delete_objects(Bucket=bucket_name, + Delete={'Objects': objects}) + return response + + class PSTopicS3: """class to set/list/get/delete a topic POST ?Action=CreateTopic&Name=[&OpaqueData=[&push-endpoint=&[=...]]] -- 2.39.5