From: Yuval Lifshitz Date: Mon, 20 Apr 2020 14:37:17 +0000 (+0300) Subject: rgw/notifications: fix zero size in notifications X-Git-Tag: v16.1.0~2462^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f04f01ad429ff158f2da8346f884335a600c0e41;p=ceph.git rgw/notifications: fix zero size in notifications delete notifications and multipart upload complete notifications send the correct object size Signed-off-by: Yuval Lifshitz Fixes: https://tracker.ceph.com/issues/45150 --- diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index fadba7c73dbe..6e28396586ed 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -4408,7 +4408,7 @@ void RGWPostObj::execute() } } while (is_next_file_to_upload()); - const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedPost, store); + const auto ret = rgw::notify::publish(s, s->object, ofs, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedPost, store); if (ret < 0) { ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl; // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed @@ -4966,19 +4966,20 @@ void RGWDeleteObj::execute() ldpp_dout(this, 5) << "WARNING: failed to populate delete request with object tags: " << err.what() << dendl; } populate_metadata_in_request(s, attrs); + const auto obj_state = obj_ctx->get_state(obj); + + const auto ret = rgw::notify::publish(s, s->object, obj_state->size , obj_state->mtime, attrs[RGW_ATTR_ETAG].to_str(), + delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete, + store); + if (ret < 0) { + ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl; + // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed + // this should be global conf (probably returnign a different handler) + // so we don't need to read the configured values before we perform it + } } else { op_ret = -EINVAL; } - - const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(), - delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete, - store); - if (ret < 0) { - ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl; - // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed - // this should be global conf (probably returnign a different handler) - // so we don't need to read the configured values before we perform it - } } bool RGWCopyObj::parse_copy_location(const boost::string_view& url_src, @@ -6270,7 +6271,8 @@ void RGWCompleteMultipart::execute() ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << dendl; } - const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedCompleteMultipartUpload, store); + const auto ret = rgw::notify::publish(s, s->object, ofs, ceph::real_clock::now(), final_etag_str, rgw::notify::ObjectCreatedCompleteMultipartUpload, store); + if (ret < 0) { ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl; // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed @@ -6631,7 +6633,7 @@ void RGWDeleteMultiObj::execute() bufferlist etag_bl; const auto etag = obj_state->get_attr(RGW_ATTR_ETAG, etag_bl) ? etag_bl.to_str() : ""; - const auto ret = rgw::notify::publish(s, obj.key, obj_state->size, ceph::real_clock::now(), etag, + const auto ret = rgw::notify::publish(s, obj.key, obj_state->size, obj_state->mtime, etag, del_op.result.delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete, store); if (ret < 0) { diff --git a/src/test/rgw/rgw_multi/tests_ps.py b/src/test/rgw/rgw_multi/tests_ps.py index b3aead9c38c1..79d2ef95b7fb 100644 --- a/src/test/rgw/rgw_multi/tests_ps.py +++ b/src/test/rgw/rgw_multi/tests_ps.py @@ -125,13 +125,13 @@ class StreamingHTTPServer: self.sock.listen(num_workers) self.workers = [HTTPServerThread(i, self.sock, addr) for i in range(num_workers)] - def verify_s3_events(self, keys, exact_match=False, deletions=False): + def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}): """verify stored s3 records agains a list of keys""" events = [] for worker in self.workers: events += worker.get_events() worker.reset_events() - verify_s3_records_by_elements(events, keys, exact_match=exact_match, deletions=deletions) + verify_s3_records_by_elements(events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes) def verify_events(self, keys, exact_match=False, deletions=False): """verify stored events agains a list of keys""" @@ -300,11 +300,12 @@ def verify_events_by_elements(events, keys, exact_match=False, deletions=False): assert False, err -def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False): +def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False, expected_sizes={}): """ verify there is at least one record per element """ err = '' for key in keys: key_found = False + object_size = 0 if type(records) is list: for record_list in records: if key_found: @@ -314,9 +315,11 @@ def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=Fa record['s3']['object']['key'] == key.name: if deletions and 'ObjectRemoved' in record['eventName']: key_found = True + object_size = record['s3']['object']['size'] break elif not deletions and 'ObjectCreated' in record['eventName']: key_found = True + object_size = record['s3']['object']['size'] break else: for record in records['Records']: @@ -324,17 +327,18 @@ def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=Fa record['s3']['object']['key'] == key.name: if deletions and 'ObjectRemoved' in record['eventName']: key_found = True + object_size = record['s3']['object']['size'] break elif not deletions and 'ObjectCreated' in record['eventName']: key_found = True + object_size = record['s3']['object']['size'] break if not key_found: err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key) - for record_list in records: - for record in record_list['Records']: - log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key'])) assert False, err + elif expected_sizes: + assert_equal(object_size, expected_sizes.get(key.name)) if not len(records) == len(keys): err = 'superfluous records are found' @@ -1724,7 +1728,7 @@ def test_ps_s3_notification_multi_delete_on_master(): if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") hostname = get_ip() - zones, _ = init_env(require_ps=False) + master_zone, _ = init_env(require_ps=False) realm = get_realm() zonegroup = realm.master_zonegroup() @@ -1737,13 +1741,13 @@ def test_ps_s3_notification_multi_delete_on_master(): # create bucket bucket_name = gen_bucket_name() - bucket = zones[0].create_bucket(bucket_name) + bucket = master_zone.create_bucket(bucket_name) topic_name = bucket_name + TOPIC_SUFFIX # create s3 topic endpoint_address = 'http://'+host+':'+str(port) endpoint_args = 'push-endpoint='+endpoint_address - topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) + topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX @@ -1751,16 +1755,18 @@ def test_ps_s3_notification_multi_delete_on_master(): 'TopicArn': topic_arn, 'Events': ['s3:ObjectRemoved:*'] }] - s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list) + s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list) response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket client_threads = [] + objects_size = {} for i in range(number_of_objects): - obj_size = randint(1, 1024) - content = str(os.urandom(obj_size)) + object_size = randint(1, 1024) + content = str(os.urandom(object_size)) key = bucket.new_key(str(i)) + objects_size[key.name] = object_size thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) thr.start() client_threads.append(thr) @@ -1769,21 +1775,21 @@ def test_ps_s3_notification_multi_delete_on_master(): keys = list(bucket.list()) start_time = time.time() - delete_all_objects(zones[0].conn, bucket_name) + delete_all_objects(master_zone.conn, bucket_name) time_diff = time.time() - start_time print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') print('wait for 5sec for the messages...') time.sleep(5) - + # check http receiver - http_server.verify_s3_events(keys, exact_match=True, deletions=True) + http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size) # cleanup topic_conf.del_config() s3_notification_conf.del_config(notification=notification_name) # delete the bucket - zones[0].delete_bucket(bucket_name) + master_zone.delete_bucket(bucket_name) http_server.close() @@ -1825,10 +1831,13 @@ def test_ps_s3_notification_push_http_on_master(): # create objects in the bucket client_threads = [] + objects_size = {} start_time = time.time() - content = 'bar' for i in range(number_of_objects): + object_size = randint(1, 1024) + content = str(os.urandom(object_size)) key = bucket.new_key(str(i)) + objects_size[key.name] = object_size thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) thr.start() client_threads.append(thr) @@ -1842,8 +1851,7 @@ def test_ps_s3_notification_push_http_on_master(): # check http receiver keys = list(bucket.list()) - print('total number of objects: ' + str(len(keys))) - http_server.verify_s3_events(keys, exact_match=True) + http_server.verify_s3_events(keys, exact_match=True, deletions=False, expected_sizes=objects_size) # delete objects from the bucket client_threads = [] @@ -1861,7 +1869,7 @@ def test_ps_s3_notification_push_http_on_master(): time.sleep(5) # check http receiver - http_server.verify_s3_events(keys, exact_match=True, deletions=True) + http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size) # cleanup topic_conf.del_config() @@ -2710,7 +2718,8 @@ def test_ps_s3_multipart_on_master(): # create objects in the bucket using multi-part upload fp = tempfile.NamedTemporaryFile(mode='w+b') - content = bytearray(os.urandom(1024*1024)) + object_size = 10*1024*1024 + content = bytearray(os.urandom(object_size)) fp.write(content) fp.flush() fp.seek(0) @@ -2735,6 +2744,7 @@ def test_ps_s3_multipart_on_master(): assert_equal(len(events), 1) assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:CompleteMultipartUpload') assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_3') + print events[0]['Records'][0]['s3']['object']['size'] # cleanup stop_amqp_receiver(receiver1, task1)