]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/pubsub: send notifications from multi-delete op 34107/head
authorYuval Lifshitz <yuvalif@yahoo.com>
Tue, 10 Dec 2019 18:22:43 +0000 (20:22 +0200)
committerYuval Lifshitz <ylifshit@redhat.com>
Sun, 26 Apr 2020 11:22:29 +0000 (14:22 +0300)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
(cherry picked from commit 754f7edbb8156727de45c17c03826c9a0df6dbb8)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
src/rgw/rgw_notify.cc
src/rgw/rgw_notify.h
src/rgw/rgw_op.cc
src/test/rgw/rgw_multi/tests_ps.py
src/test/rgw/rgw_multi/zone_ps.py

index 55da57799ae92ac506ec36f0212d7d7c7195db29..2104031a2fd7f678ff6000938d91b5e8260a61f1 100644 (file)
@@ -15,6 +15,8 @@ namespace rgw::notify {
 
 // populate record from request
 void populate_record_from_request(const req_state *s, 
+        const rgw_obj_key& key,
+        uint64_t size,
         const ceph::real_time& mtime, 
         const std::string& etag, 
         EventType event_type,
@@ -28,10 +30,10 @@ void populate_record_from_request(const req_state *s,
   record.bucket_name = s->bucket_name;
   record.bucket_ownerIdentity = s->bucket_owner.get_id().id;
   record.bucket_arn = to_string(rgw::ARN(s->bucket));
-  record.object_key = s->object.name;
-  record.object_size = s->obj_size;
+  record.object_key = key.name;
+  record.object_size = size;
   record.object_etag = etag;
-  record.object_versionId = s->object.instance;
+  record.object_versionId = key.instance;
   // use timestamp as per key sequence id (hex encoded)
   const utime_t ts(real_clock::now());
   boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), 
@@ -62,6 +64,8 @@ bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType
 }
 
 int publish(const req_state* s, 
+        const rgw_obj_key& key,
+        uint64_t size,
         const ceph::real_time& mtime, 
         const std::string& etag, 
         EventType event_type,
@@ -75,7 +79,7 @@ int publish(const req_state* s,
         return rc;
     }
     rgw_pubsub_s3_record record;
-    populate_record_from_request(s, mtime, etag, event_type, record);
+    populate_record_from_request(s, key, size, mtime, etag, event_type, record);
     bool event_handled = false;
     bool event_should_be_handled = false;
     for (const auto& bucket_topic : bucket_topics.topics) {
index 486e7dffa4c32a057a36011ab3b36a13742ad86b..5b480c0e39c49bb6be63d556dcf69be07f64cccd 100644 (file)
 // forward declarations
 class RGWRados;
 class req_state;
+struct rgw_obj_key;
 
 namespace rgw::notify {
 
 // publish notification
 int publish(const req_state* s, 
+        const rgw_obj_key& key,
+        uint64_t size,
         const ceph::real_time& mtime, 
         const std::string& etag, 
         EventType event_type,
index 5bc76c4d6b45f0c22538991362f0dc4dd6515693..72bf870a5736b8195d9b9926c5aef264694b60d8 100644 (file)
@@ -4003,7 +4003,7 @@ void RGWPutObj::execute()
   }
 
   // send request to notification manager
-  const auto ret = rgw::notify::publish(s, mtime, etag, rgw::notify::ObjectCreatedPut, store);
+  const auto ret = rgw::notify::publish(s, s->object, s->obj_size, mtime, etag, rgw::notify::ObjectCreatedPut, store);
   if (ret < 0) {
     ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
        // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
@@ -4239,7 +4239,7 @@ void RGWPostObj::execute()
     }
   } while (is_next_file_to_upload());
 
-  const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedPost, store);
+  const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedPost, store);
   if (ret < 0) {
     ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
        // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
@@ -4792,7 +4792,7 @@ void RGWDeleteObj::execute()
     op_ret = -EINVAL;
   }
 
-  const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(),
+  const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(),
           delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete,
           store);
   if (ret < 0) {
@@ -5102,7 +5102,7 @@ void RGWCopyObj::execute()
                           copy_obj_progress_cb, (void *)this
     );
 
-  const auto ret = rgw::notify::publish(s, mtime, etag, rgw::notify::ObjectCreatedCopy, store);
+  const auto ret = rgw::notify::publish(s, s->object, s->obj_size, mtime, etag, rgw::notify::ObjectCreatedCopy, store);
   if (ret < 0) {
     ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
        // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
@@ -5765,7 +5765,7 @@ void RGWInitMultipart::execute()
     op_ret = obj_op.write_meta(bl.length(), 0, attrs);
   } while (op_ret == -EEXIST);
   
-  const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(), rgw::notify::ObjectCreatedPost, store);
+  const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(), rgw::notify::ObjectCreatedPost, store);
   if (ret < 0) {
     ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
        // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
@@ -6079,7 +6079,7 @@ void RGWCompleteMultipart::execute()
     ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << dendl;
   }
   
-  const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedCompleteMultipartUpload, store);
+  const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedCompleteMultipartUpload, store);
   if (ret < 0) {
     ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
        // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
@@ -6435,6 +6435,20 @@ void RGWDeleteMultiObj::execute()
 
     send_partial_response(*iter, del_op.result.delete_marker,
                          del_op.result.version_id, op_ret);
+
+    const auto obj_state = obj_ctx->get_state(obj);
+    bufferlist etag_bl;
+    const auto etag = obj_state->get_attr(RGW_ATTR_ETAG, etag_bl) ? etag_bl.to_str() : "";
+
+    const auto ret = rgw::notify::publish(s, obj.key, obj_state->size, ceph::real_clock::now(), etag, 
+            del_op.result.delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete,
+            store);
+    if (ret < 0) {
+        ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
+           // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
+           // this should be global conf (probably returnign a different handler)
+        // so we don't need to read the configured values before we perform it
+    }
   }
 
   /*  set the return code to zero, errors at this point will be
index b5df8817cd6b40382e90868f994751404bfcb432..5e5fa8f8345399d546f69daf9f37f5192033f069 100644 (file)
@@ -9,6 +9,7 @@ import subprocess
 import socket
 import time
 import os
+from random import randint
 from .tests import get_realm, \
     ZonegroupConns, \
     zonegroup_meta_checkpoint, \
@@ -28,7 +29,8 @@ from .zone_ps import PSTopic, \
     print_connection_info, \
     delete_all_s3_topics, \
     put_object_tagging, \
-    get_object_tagging
+    get_object_tagging, \
+    delete_all_objects
 from multisite import User
 from nose import SkipTest
 from nose.tools import assert_not_equal, assert_equal
@@ -157,6 +159,13 @@ class StreamingHTTPServer:
         for worker in self.workers:
             worker.close()
             worker.join()
+    
+    def get_and_reset_events(self):
+        events = []
+        for worker in self.workers:
+            events += worker.get_events()
+            worker.reset_events()
+        return events
 
 
 # AMQP endpoint functions
@@ -1719,6 +1728,74 @@ def test_ps_s3_notification_push_kafka_security_ssl_sasl():
     kafka_security('SSL_SASL')
 
 
+def test_ps_s3_notification_multi_delete_on_master():
+    """ test deletion of multiple keys on master """
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    hostname = get_ip()
+    zones, _  = init_env(require_ps=False)
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    
+    # create random port for the http server
+    host = get_ip()
+    port = random.randint(10000, 20000)
+    # start an http server in a separate thread
+    number_of_objects = 10
+    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+    
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = zones[0].create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # create s3 topic
+    endpoint_address = 'http://'+host+':'+str(port)
+    endpoint_args = 'push-endpoint='+endpoint_address
+    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectRemoved:*']
+                       }]
+    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket
+    client_threads = []
+    for i in range(number_of_objects):
+        obj_size = randint(1, 1024)
+        content = str(os.urandom(obj_size))
+        key = bucket.new_key(str(i))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads] 
+    
+    keys = list(bucket.list())
+
+    start_time = time.time()
+    delete_all_objects(zones[0].conn, bucket_name)
+    time_diff = time.time() - start_time
+    print 'average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+    
+    # check http receiver
+    http_server.verify_s3_events(keys, exact_match=True, deletions=True)
+    
+    # cleanup
+    topic_conf.del_config()
+    s3_notification_conf.del_config(notification=notification_name)
+    # delete the bucket
+    zones[0].delete_bucket(bucket_name)
+    http_server.close()
+
+
 def test_ps_s3_notification_push_http_on_master():
     """ test pushing http s3 notification on master """
     if skip_push_tests:
index 4ccff3eb0ad4e5e335cba9b04cf85c75797dfe92..9fd272abba7d08195cbffd15b1db57e8def510f5 100644 (file)
@@ -171,6 +171,21 @@ def delete_all_s3_topics(zone, region):
         print 'failed to do topic cleanup: ' + str(err)
     
 
+def delete_all_objects(conn, bucket_name):
+    client = boto3.client('s3',
+                      endpoint_url='http://'+conn.host+':'+str(conn.port),
+                      aws_access_key_id=conn.aws_access_key_id,
+                      aws_secret_access_key=conn.aws_secret_access_key)
+
+    objects = []
+    for key in client.list_objects(Bucket=bucket_name)['Contents']:
+        objects.append({'Key': key['Key']})
+    # delete objects from the bucket
+    response = client.delete_objects(Bucket=bucket_name,
+            Delete={'Objects': objects})
+    return response
+
+
 class PSTopicS3:
     """class to set/list/get/delete a topic
     POST ?Action=CreateTopic&Name=<topic name>[&OpaqueData=<data>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]]