]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/pubsub: send notifications from multi-delete op
authorYuval Lifshitz <yuvalif@yahoo.com>
Tue, 10 Dec 2019 18:22:43 +0000 (20:22 +0200)
committerYuval Lifshitz <yuvalif@yahoo.com>
Tue, 21 Jan 2020 11:06:14 +0000 (13:06 +0200)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
src/rgw/rgw_notify.cc
src/rgw/rgw_notify.h
src/rgw/rgw_op.cc
src/test/rgw/rgw_multi/tests_ps.py
src/test/rgw/rgw_multi/zone_ps.py

index 79ac062df7a2691bc22bccc46a237dec319c3a93..25b5c533bfdfd5e444a4c3da137bb5e0fb35f0cb 100644 (file)
@@ -15,6 +15,8 @@ namespace rgw::notify {
 
 // populate record from request
 void populate_record_from_request(const req_state *s, 
+        const rgw_obj_key& key,
+        uint64_t size,
         const ceph::real_time& mtime, 
         const std::string& etag, 
         EventType event_type,
@@ -28,10 +30,10 @@ void populate_record_from_request(const req_state *s,
   record.bucket_name = s->bucket_name;
   record.bucket_ownerIdentity = s->bucket_owner.get_id().id;
   record.bucket_arn = to_string(rgw::ARN(s->bucket));
-  record.object_key = s->object.name;
-  record.object_size = s->obj_size;
+  record.object_key = key.name;
+  record.object_size = size;
   record.object_etag = etag;
-  record.object_versionId = s->object.instance;
+  record.object_versionId = key.instance;
   // use timestamp as per key sequence id (hex encoded)
   const utime_t ts(real_clock::now());
   boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), 
@@ -62,6 +64,8 @@ bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType
 }
 
 int publish(const req_state* s, 
+        const rgw_obj_key& key,
+        uint64_t size,
         const ceph::real_time& mtime, 
         const std::string& etag, 
         EventType event_type,
@@ -75,7 +79,7 @@ int publish(const req_state* s,
         return rc;
     }
     rgw_pubsub_s3_record record;
-    populate_record_from_request(s, mtime, etag, event_type, record);
+    populate_record_from_request(s, key, size, mtime, etag, event_type, record);
     bool event_handled = false;
     bool event_should_be_handled = false;
     for (const auto& bucket_topic : bucket_topics.topics) {
index 4711c30aaa860aacbea3a383c8ed72be4e6029c6..0dc666b365ed3d847157468581594fbc6aa5ec16 100644 (file)
@@ -14,11 +14,14 @@ namespace rgw::sal {
 }
 class RGWRados;
 class req_state;
+struct rgw_obj_key;
 
 namespace rgw::notify {
 
 // publish notification
 int publish(const req_state* s, 
+        const rgw_obj_key& key,
+        uint64_t size,
         const ceph::real_time& mtime, 
         const std::string& etag, 
         EventType event_type,
index dbffcfb1a6e5e1bbead8647496c642d7a2c70e02..8ec3ec0f12ea6e3b51abcbc9bd878d7c9b0a98e3 100644 (file)
@@ -4061,7 +4061,7 @@ void RGWPutObj::execute()
   }
 
   // send request to notification manager
-  const auto ret = rgw::notify::publish(s, mtime, etag, rgw::notify::ObjectCreatedPut, store);
+  const auto ret = rgw::notify::publish(s, s->object, s->obj_size, mtime, etag, rgw::notify::ObjectCreatedPut, store);
   if (ret < 0) {
     ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
        // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
@@ -4289,7 +4289,7 @@ void RGWPostObj::execute()
     }
   } while (is_next_file_to_upload());
 
-  const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedPost, store);
+  const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedPost, store);
   if (ret < 0) {
     ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
        // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
@@ -4851,7 +4851,7 @@ void RGWDeleteObj::execute()
     op_ret = -EINVAL;
   }
 
-  const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(),
+  const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(),
           delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete,
           store);
   if (ret < 0) {
@@ -5165,7 +5165,7 @@ void RGWCopyObj::execute()
           this,
           s->yield);
 
-  const auto ret = rgw::notify::publish(s, mtime, etag, rgw::notify::ObjectCreatedCopy, store);
+  const auto ret = rgw::notify::publish(s, s->object, s->obj_size, mtime, etag, rgw::notify::ObjectCreatedCopy, store);
   if (ret < 0) {
     ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
        // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
@@ -5843,7 +5843,7 @@ void RGWInitMultipart::execute()
     op_ret = obj_op.write_meta(bl.length(), 0, attrs, s->yield);
   } while (op_ret == -EEXIST);
   
-  const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(), rgw::notify::ObjectCreatedPost, store);
+  const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(), rgw::notify::ObjectCreatedPost, store);
   if (ret < 0) {
     ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
        // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
@@ -6157,7 +6157,7 @@ void RGWCompleteMultipart::execute()
     ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << dendl;
   }
   
-  const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedCompleteMultipartUpload, store);
+  const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedCompleteMultipartUpload, store);
   if (ret < 0) {
     ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
        // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
@@ -6513,6 +6513,20 @@ void RGWDeleteMultiObj::execute()
 
     send_partial_response(*iter, del_op.result.delete_marker,
                          del_op.result.version_id, op_ret);
+
+    const auto obj_state = obj_ctx->get_state(obj);
+    bufferlist etag_bl;
+    const auto etag = obj_state->get_attr(RGW_ATTR_ETAG, etag_bl) ? etag_bl.to_str() : "";
+
+    const auto ret = rgw::notify::publish(s, obj.key, obj_state->size, ceph::real_clock::now(), etag, 
+            del_op.result.delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete,
+            store);
+    if (ret < 0) {
+        ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
+           // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
+           // this should be global conf (probably returnign a different handler)
+        // so we don't need to read the configured values before we perform it
+    }
   }
 
   /*  set the return code to zero, errors at this point will be
index 4eb5bb8914018eb89b0d8ddd294595dc01c6c42f..27916a70378e5217fd3d4d024d5af6ef27486fd6 100644 (file)
@@ -9,6 +9,7 @@ import subprocess
 import socket
 import time
 import os
+from random import randint
 from .tests import get_realm, \
     ZonegroupConns, \
     zonegroup_meta_checkpoint, \
@@ -28,7 +29,8 @@ from .zone_ps import PSTopic, \
     print_connection_info, \
     delete_all_s3_topics, \
     put_object_tagging, \
-    get_object_tagging
+    get_object_tagging, \
+    delete_all_objects
 from multisite import User
 from nose import SkipTest
 from nose.tools import assert_not_equal, assert_equal
@@ -157,6 +159,13 @@ class StreamingHTTPServer:
         for worker in self.workers:
             worker.close()
             worker.join()
+    
+    def get_and_reset_events(self):
+        events = []
+        for worker in self.workers:
+            events += worker.get_events()
+            worker.reset_events()
+        return events
 
 
 # AMQP endpoint functions
@@ -1719,6 +1728,74 @@ def test_ps_s3_notification_push_kafka_security_ssl_sasl():
     kafka_security('SSL_SASL')
 
 
+def test_ps_s3_notification_multi_delete_on_master():
+    """ test deletion of multiple keys on master """
+    if skip_push_tests:
+        return SkipTest("PubSub push tests don't run in teuthology")
+    hostname = get_ip()
+    zones, _  = init_env(require_ps=False)
+    realm = get_realm()
+    zonegroup = realm.master_zonegroup()
+    
+    # create random port for the http server
+    host = get_ip()
+    port = random.randint(10000, 20000)
+    # start an http server in a separate thread
+    number_of_objects = 10
+    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+    
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = zones[0].create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # create s3 topic
+    endpoint_address = 'http://'+host+':'+str(port)
+    endpoint_args = 'push-endpoint='+endpoint_address
+    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectRemoved:*']
+                       }]
+    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket
+    client_threads = []
+    for i in range(number_of_objects):
+        obj_size = randint(1, 1024)
+        content = str(os.urandom(obj_size))
+        key = bucket.new_key(str(i))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads] 
+    
+    keys = list(bucket.list())
+
+    start_time = time.time()
+    delete_all_objects(zones[0].conn, bucket_name)
+    time_diff = time.time() - start_time
+    print 'average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+    
+    # check http receiver
+    http_server.verify_s3_events(keys, exact_match=True, deletions=True)
+    
+    # cleanup
+    topic_conf.del_config()
+    s3_notification_conf.del_config(notification=notification_name)
+    # delete the bucket
+    zones[0].delete_bucket(bucket_name)
+    http_server.close()
+
+
 def test_ps_s3_notification_push_http_on_master():
     """ test pushing http s3 notification on master """
     if skip_push_tests:
@@ -2888,7 +2965,7 @@ def test_ps_s3_tags_on_master():
     expected_tags = [{'val': 'world', 'key': 'hello'}, {'val': 'boom', 'key': 'ka'}]
     # check amqp receiver
     for event in receiver.get_and_reset_events():
-        obj_tags =  event['s3']['object']['tags']
+        obj_tags =  event['Records'][0]['s3']['object']['tags']
         assert_equal(obj_tags[0], expected_tags[0])
 
     # delete the objects
@@ -2898,7 +2975,7 @@ def test_ps_s3_tags_on_master():
     time.sleep(5)
     # check amqp receiver
     for event in receiver.get_and_reset_events():
-        obj_tags =  event['s3']['object']['tags']
+        obj_tags =  event['Records'][0]['s3']['object']['tags']
         assert_equal(obj_tags[0], expected_tags[0])
 
     # cleanup
index 090fbc789fc2bab29e5997cbe3b9de03c70673fe..d5786d25fbf5ef077ad5583ec077375b21ff2308 100644 (file)
@@ -171,6 +171,21 @@ def delete_all_s3_topics(zone, region):
         print 'failed to do topic cleanup: ' + str(err)
     
 
+def delete_all_objects(conn, bucket_name):
+    client = boto3.client('s3',
+                      endpoint_url='http://'+conn.host+':'+str(conn.port),
+                      aws_access_key_id=conn.aws_access_key_id,
+                      aws_secret_access_key=conn.aws_secret_access_key)
+
+    objects = []
+    for key in client.list_objects(Bucket=bucket_name)['Contents']:
+        objects.append({'Key': key['Key']})
+    # delete objects from the bucket
+    response = client.delete_objects(Bucket=bucket_name,
+            Delete={'Objects': objects})
+    return response
+
+
 class PSTopicS3:
     """class to set/list/get/delete a topic
     POST ?Action=CreateTopic&Name=<topic name>[&OpaqueData=<data>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]]