rgw/notifications: fix zero size in notifications 34641/head
author Yuval Lifshitz <ylifshit@redhat.com>
Mon, 20 Apr 2020 14:37:17 +0000 (17:37 +0300)
committer Yuval Lifshitz <ylifshit@redhat.com>
Wed, 22 Apr 2020 15:06:16 +0000 (18:06 +0300)
Make delete notifications and multipart upload complete notifications
send the correct object size.

Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
Fixes: https://tracker.ceph.com/issues/45150
src/rgw/rgw_op.cc
src/test/rgw/rgw_multi/tests_ps.py
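
The updated Python tests below exercise this end to end by recording each uploaded object's size and passing it to the verification helpers. As a rough standalone illustration (not part of this commit), a receiver-side check of the size field could look like the sketch below; check_record_sizes, records, and expected_sizes are hypothetical names, records is assumed to be the list of parsed JSON record dicts delivered to the push endpoint, and expected_sizes maps an object key name to its size in bytes.

# Hedged sketch: assert that each delivered S3 notification record reports
# the expected object size (before this fix, delete and multipart-complete
# records carried a size of zero).
def check_record_sizes(records, expected_sizes, deletions=False):
    wanted = 'ObjectRemoved' if deletions else 'ObjectCreated'
    for record in records:
        if wanted not in record['eventName']:
            continue
        key = record['s3']['object']['key']
        size = record['s3']['object']['size']
        expected = expected_sizes.get(key)
        assert size == expected, \
            'size mismatch for ' + str(key) + ': got ' + str(size) + \
            ', expected ' + str(expected)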

index fadba7c73dbe3a637a9c4740a664fd9d65c89849..6e28396586ed07f8163c0811a9573c79fe88cdf9 100644
@@ -4408,7 +4408,7 @@ void RGWPostObj::execute()
     }
   } while (is_next_file_to_upload());
 
-  const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedPost, store);
+  const auto ret = rgw::notify::publish(s, s->object, ofs, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedPost, store);
   if (ret < 0) {
     ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
        // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
@@ -4966,19 +4966,20 @@ void RGWDeleteObj::execute()
       ldpp_dout(this, 5) << "WARNING: failed to populate delete request with object tags: " << err.what() << dendl;
     }
     populate_metadata_in_request(s, attrs);
+    const auto obj_state = obj_ctx->get_state(obj);
+
+    const auto ret = rgw::notify::publish(s, s->object, obj_state->size , obj_state->mtime, attrs[RGW_ATTR_ETAG].to_str(),
+        delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete,
+        store);
+    if (ret < 0) {
+      ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
+           // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
+           // this should be global conf (probably returnign a different handler)
+      // so we don't need to read the configured values before we perform it
+    }
   } else {
     op_ret = -EINVAL;
   }
-
-  const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(),
-          delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete,
-          store);
-  if (ret < 0) {
-    ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
-       // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
-       // this should be global conf (probably returnign a different handler)
-    // so we don't need to read the configured values before we perform it
-  }
 }
 
 bool RGWCopyObj::parse_copy_location(const boost::string_view& url_src,
@@ -6270,7 +6271,8 @@ void RGWCompleteMultipart::execute()
     ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << dendl;
   }
   
-  const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedCompleteMultipartUpload, store);
+  const auto ret = rgw::notify::publish(s, s->object, ofs, ceph::real_clock::now(), final_etag_str, rgw::notify::ObjectCreatedCompleteMultipartUpload, store);
+
   if (ret < 0) {
     ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
        // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
@@ -6631,7 +6633,7 @@ void RGWDeleteMultiObj::execute()
     bufferlist etag_bl;
     const auto etag = obj_state->get_attr(RGW_ATTR_ETAG, etag_bl) ? etag_bl.to_str() : "";
 
-    const auto ret = rgw::notify::publish(s, obj.key, obj_state->size, ceph::real_clock::now(), etag, 
+    const auto ret = rgw::notify::publish(s, obj.key, obj_state->size, obj_state->mtime, etag, 
             del_op.result.delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete,
             store);
     if (ret < 0) {
index b3aead9c38c1664ac4bf7ac70ff801ef35bfaefd..79d2ef95b7fb18431a537929673c00402e8ac839 100644
@@ -125,13 +125,13 @@ class StreamingHTTPServer:
         self.sock.listen(num_workers)
         self.workers = [HTTPServerThread(i, self.sock, addr) for i in range(num_workers)]
 
-    def verify_s3_events(self, keys, exact_match=False, deletions=False):
+    def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}):
         """verify stored s3 records agains a list of keys"""
         events = []
         for worker in self.workers:
             events += worker.get_events()
             worker.reset_events()
-        verify_s3_records_by_elements(events, keys, exact_match=exact_match, deletions=deletions)
+        verify_s3_records_by_elements(events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes)
 
     def verify_events(self, keys, exact_match=False, deletions=False):
         """verify stored events agains a list of keys"""
@@ -300,11 +300,12 @@ def verify_events_by_elements(events, keys, exact_match=False, deletions=False):
             assert False, err
 
 
-def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False):
+def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False, expected_sizes={}):
     """ verify there is at least one record per element """
     err = ''
     for key in keys:
         key_found = False
+        object_size = 0
         if type(records) is list:
             for record_list in records:
                 if key_found:
@@ -314,9 +315,11 @@ def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=Fa
                         record['s3']['object']['key'] == key.name:
                         if deletions and 'ObjectRemoved' in record['eventName']:
                             key_found = True
+                            object_size = record['s3']['object']['size']
                             break
                         elif not deletions and 'ObjectCreated' in record['eventName']:
                             key_found = True
+                            object_size = record['s3']['object']['size']
                             break
         else:
             for record in records['Records']:
@@ -324,17 +327,18 @@ def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=Fa
                     record['s3']['object']['key'] == key.name:
                     if deletions and 'ObjectRemoved' in record['eventName']:
                         key_found = True
+                        object_size = record['s3']['object']['size']
                         break
                     elif not deletions and 'ObjectCreated' in record['eventName']:
                         key_found = True
+                        object_size = record['s3']['object']['size']
                         break
 
         if not key_found:
             err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
-            for record_list in records:
-                for record in record_list['Records']:
-                    log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key']))
             assert False, err
+        elif expected_sizes:
+            assert_equal(object_size, expected_sizes.get(key.name))
 
     if not len(records) == len(keys):
         err = 'superfluous records are found'
@@ -1724,7 +1728,7 @@ def test_ps_s3_notification_multi_delete_on_master():
     if skip_push_tests:
         return SkipTest("PubSub push tests don't run in teuthology")
     hostname = get_ip()
-    zones, _  = init_env(require_ps=False)
+    master_zone, _  = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     
@@ -1737,13 +1741,13 @@ def test_ps_s3_notification_multi_delete_on_master():
     
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
 
     # create s3 topic
     endpoint_address = 'http://'+host+':'+str(port)
     endpoint_args = 'push-endpoint='+endpoint_address
-    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -1751,16 +1755,18 @@ def test_ps_s3_notification_multi_delete_on_master():
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectRemoved:*']
                        }]
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
     # create objects in the bucket
     client_threads = []
+    objects_size = {}
     for i in range(number_of_objects):
-        obj_size = randint(1, 1024)
-        content = str(os.urandom(obj_size))
+        object_size = randint(1, 1024)
+        content = str(os.urandom(object_size))
         key = bucket.new_key(str(i))
+        objects_size[key.name] = object_size
         thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
         thr.start()
         client_threads.append(thr)
@@ -1769,21 +1775,21 @@ def test_ps_s3_notification_multi_delete_on_master():
     keys = list(bucket.list())
 
     start_time = time.time()
-    delete_all_objects(zones[0].conn, bucket_name)
+    delete_all_objects(master_zone.conn, bucket_name)
     time_diff = time.time() - start_time
     print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
 
     print('wait for 5sec for the messages...')
     time.sleep(5)
-    
+   
     # check http receiver
-    http_server.verify_s3_events(keys, exact_match=True, deletions=True)
+    http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size)
     
     # cleanup
     topic_conf.del_config()
     s3_notification_conf.del_config(notification=notification_name)
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     http_server.close()
 
 
@@ -1825,10 +1831,13 @@ def test_ps_s3_notification_push_http_on_master():
 
     # create objects in the bucket
     client_threads = []
+    objects_size = {}
     start_time = time.time()
-    content = 'bar'
     for i in range(number_of_objects):
+        object_size = randint(1, 1024)
+        content = str(os.urandom(object_size))
         key = bucket.new_key(str(i))
+        objects_size[key.name] = object_size
         thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
         thr.start()
         client_threads.append(thr)
@@ -1842,8 +1851,7 @@ def test_ps_s3_notification_push_http_on_master():
     
     # check http receiver
     keys = list(bucket.list())
-    print('total number of objects: ' + str(len(keys)))
-    http_server.verify_s3_events(keys, exact_match=True)
+    http_server.verify_s3_events(keys, exact_match=True, deletions=False, expected_sizes=objects_size)
     
     # delete objects from the bucket
     client_threads = []
@@ -1861,7 +1869,7 @@ def test_ps_s3_notification_push_http_on_master():
     time.sleep(5)
     
     # check http receiver
-    http_server.verify_s3_events(keys, exact_match=True, deletions=True)
+    http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size)
     
     # cleanup
     topic_conf.del_config()
@@ -2710,7 +2718,8 @@ def test_ps_s3_multipart_on_master():
 
     # create objects in the bucket using multi-part upload
     fp = tempfile.NamedTemporaryFile(mode='w+b')
-    content = bytearray(os.urandom(1024*1024))
+    object_size = 10*1024*1024
+    content = bytearray(os.urandom(object_size))
     fp.write(content)
     fp.flush()
     fp.seek(0)
@@ -2735,6 +2744,7 @@ def test_ps_s3_multipart_on_master():
     assert_equal(len(events), 1)
     assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:CompleteMultipartUpload')
     assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_3')
+    print events[0]['Records'][0]['s3']['object']['size']
 
     # cleanup
     stop_amqp_receiver(receiver1, task1)