From 18c202c979aefe7c34de3688036586a070addff8 Mon Sep 17 00:00:00 2001 From: Ali Masarwa Date: Sun, 19 Nov 2023 15:29:28 +0200 Subject: [PATCH] RGW:pubsub publish commit with etag populated Signed-off-by: Ali Masarwa --- src/rgw/driver/daos/rgw_sal_daos.cc | 2 +- src/rgw/driver/motr/rgw_sal_motr.cc | 2 +- src/rgw/driver/posix/rgw_sal_posix.cc | 2 +- src/rgw/driver/rados/rgw_sal_rados.cc | 2 +- src/rgw/rgw_op.cc | 63 +++++++++++++------- src/rgw/rgw_op.h | 5 ++ src/rgw/rgw_sal_dbstore.cc | 2 +- src/test/rgw/bucket_notification/test_bn.py | 64 +++++++++++++++++++++ 8 files changed, 116 insertions(+), 26 deletions(-) diff --git a/src/rgw/driver/daos/rgw_sal_daos.cc b/src/rgw/driver/daos/rgw_sal_daos.cc index 6943205657973..21252c906daaf 100644 --- a/src/rgw/driver/daos/rgw_sal_daos.cc +++ b/src/rgw/driver/daos/rgw_sal_daos.cc @@ -1694,7 +1694,7 @@ int DaosMultipartUpload::complete( int marker = 0; uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size; auto etags_iter = part_etags.begin(); - rgw::sal::Attrs attrs = target_obj->get_attrs(); + rgw::sal::Attrs& attrs = target_obj->get_attrs(); do { ldpp_dout(dpp, 20) << "DaosMultipartUpload::complete(): list_parts()" diff --git a/src/rgw/driver/motr/rgw_sal_motr.cc b/src/rgw/driver/motr/rgw_sal_motr.cc index 08053b9b90c10..6a97ef2f01e8f 100644 --- a/src/rgw/driver/motr/rgw_sal_motr.cc +++ b/src/rgw/driver/motr/rgw_sal_motr.cc @@ -2683,7 +2683,7 @@ int MotrMultipartUpload::complete(const DoutPrefixProvider *dpp, int marker = 0; uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size; auto etags_iter = part_etags.begin(); - rgw::sal::Attrs attrs = target_obj->get_attrs(); + rgw::sal::Attrs& attrs = target_obj->get_attrs(); do { ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): list_parts()" << dendl; diff --git a/src/rgw/driver/posix/rgw_sal_posix.cc b/src/rgw/driver/posix/rgw_sal_posix.cc index 5c1e50ca5c93c..f5b79f9b0933b 100644 --- a/src/rgw/driver/posix/rgw_sal_posix.cc +++ b/src/rgw/driver/posix/rgw_sal_posix.cc @@ -2612,7 +2612,7 @@ int POSIXMultipartUpload::complete(const DoutPrefixProvider *dpp, int marker = 0; uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size; auto etags_iter = part_etags.begin(); - rgw::sal::Attrs attrs = target_obj->get_attrs(); + rgw::sal::Attrs& attrs = target_obj->get_attrs(); do { ret = list_parts(dpp, cct, max_parts, marker, &marker, &truncated, y); diff --git a/src/rgw/driver/rados/rgw_sal_rados.cc b/src/rgw/driver/rados/rgw_sal_rados.cc index 5ede8d44fa9b1..171978c68629a 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.cc +++ b/src/rgw/driver/rados/rgw_sal_rados.cc @@ -2422,7 +2422,7 @@ int RadosMultipartUpload::complete(const DoutPrefixProvider *dpp, int marker = 0; uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size; auto etags_iter = part_etags.begin(); - rgw::sal::Attrs attrs = target_obj->get_attrs(); + rgw::sal::Attrs& attrs = target_obj->get_attrs(); do { ret = list_parts(dpp, cct, max_parts, marker, &marker, &truncated, y); diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 750ad7cb7739c..57a21cbb55bf2 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -6591,9 +6591,6 @@ void RGWCompleteMultipart::execute(optional_yield y) RGWMultiCompleteUpload *parts; RGWMultiXMLParser parser; std::unique_ptr upload; - off_t ofs = 0; - std::unique_ptr meta_obj; - std::unique_ptr target_obj; uint64_t olh_epoch = 0; op_ret = get_params(y); @@ -6682,8 +6679,8 @@ void RGWCompleteMultipart::execute(optional_yield y) // make reservation for notification if needed - std::unique_ptr res - = driver->get_notification(meta_obj.get(), nullptr, s, rgw::notify::ObjectCreatedCompleteMultipartUpload, y, &s->object->get_name()); + res = driver->get_notification(meta_obj.get(), nullptr, s, rgw::notify::ObjectCreatedCompleteMultipartUpload, y, + &s->object->get_name()); op_ret = res->publish_reserve(this); if (op_ret < 0) { return; @@ -6706,21 +6703,10 @@ void RGWCompleteMultipart::execute(optional_yield y) return; } - // remove the upload meta object ; the meta object is not versioned - // when the bucket is, as that would add an unneeded delete marker - int r = meta_obj->delete_object(this, y, true /* prevent versioning */); - if (r >= 0) { - /* serializer's exclusive lock is released */ - serializer->clear_locked(); - } else { - ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << dendl; - } - - // send request to notification manager - int ret = res->publish_commit(this, ofs, upload->get_mtime(), etag, target_obj->get_instance()); - if (ret < 0) { - ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl; - // too late to rollback operation, hence op_ret is not set here + upload_time = upload->get_mtime(); + int r = serializer->unlock(); + if (r < 0) { + ldpp_dout(this, 0) << "WARNING: failed to unlock " << *serializer.get() << dendl; } } // RGWCompleteMultipart::execute @@ -6773,7 +6759,42 @@ void RGWCompleteMultipart::complete() } } - etag = s->object->get_attrs()[RGW_ATTR_ETAG].to_str(); + if (op_ret >= 0 && target_obj.get() != nullptr) { + s->object->set_attrs(target_obj->get_attrs()); + etag = s->object->get_attrs()[RGW_ATTR_ETAG].to_str(); + // send request to notification manager + if (res.get() != nullptr) { + int ret = res->publish_commit(this, ofs, upload_time, etag, target_obj->get_instance()); + if (ret < 0) { + ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl; + // too late to rollback operation, hence op_ret is not set here + } + } else { + ldpp_dout(this, 1) << "ERROR: reservation is null" << dendl; + } + } else { + ldpp_dout(this, 1) << "ERROR: either op_ret is negative (execute failed) or target_obj is null, op_ret: " + << op_ret << dendl; + } + + // remove the upload meta object ; the meta object is not versioned + // when the bucket is, as that would add an unneeded delete marker + // moved to complete to prevent segmentation fault in publish commit + if (meta_obj.get() != nullptr) { + int ret = meta_obj->delete_object(this, null_yield, true /* prevent versioning */); + if (ret >= 0) { + /* serializer's exclusive lock is released */ + serializer->clear_locked(); + } else { + ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << ", ret: " << ret << dendl; + } + } else { + ldpp_dout(this, 0) << "WARNING: meta_obj is null" << dendl; + } + + res.reset(); + meta_obj.reset(); + target_obj.reset(); send_response(); } diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index 9314d454c7918..fcfb24786e8f2 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -1835,6 +1835,11 @@ protected: bufferlist data; std::unique_ptr serializer; jspan_ptr multipart_trace; + ceph::real_time upload_time; + std::unique_ptr target_obj; + std::unique_ptr res; + std::unique_ptr meta_obj; + off_t ofs = 0; public: RGWCompleteMultipart() {} diff --git a/src/rgw/rgw_sal_dbstore.cc b/src/rgw/rgw_sal_dbstore.cc index 308404c87b88b..4edb30166dbd2 100644 --- a/src/rgw/rgw_sal_dbstore.cc +++ b/src/rgw/rgw_sal_dbstore.cc @@ -932,7 +932,7 @@ namespace rgw::sal { int marker = 0; uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size; auto etags_iter = part_etags.begin(); - rgw::sal::Attrs attrs = target_obj->get_attrs(); + rgw::sal::Attrs& attrs = target_obj->get_attrs(); ofs = 0; accounted_size = 0; diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 075d47466a236..d493a57e004e1 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -2397,6 +2397,70 @@ def test_http_post_object_upload(): conn1.delete_bucket(Bucket=bucket_name) +@attr('mpu_test') +def test_ps_s3_multipart_on_master_http(): + """ test http multipart object upload on master""" + conn = connection() + zonegroup = 'default' + + # create random port for the http server + host = get_ip() + port = random.randint(10000, 20000) + # start an http server in a separate thread + http_server = StreamingHTTPServer(host, port, num_workers=10) + + # create bucket + bucket_name = gen_bucket_name() + bucket = conn.create_bucket(bucket_name) + topic_name = bucket_name + TOPIC_SUFFIX + + # create s3 topic + endpoint_address = 'http://'+host+':'+str(port) + endpoint_args = 'push-endpoint='+endpoint_address + opaque_data = 'http://1.2.3.4:8888' + topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data) + topic_arn = topic_conf.set_config() + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name, + 'TopicArn': topic_arn, + 'Events': [] + }] + s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) + response, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + + # create objects in the bucket + client_threads = [] + content = str(os.urandom(20*1024*1024)) + key = bucket.new_key('obj') + thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + + print('wait for 5sec for the messages...') + time.sleep(5) + + # check http receiver + keys = list(bucket.list()) + print('total number of objects: ' + str(len(keys))) + events = http_server.get_and_reset_events() + for event in events: + assert_equal(event['Records'][0]['opaqueData'], opaque_data) + assert_true(event['Records'][0]['s3']['object']['eTag'] != '') + + # cleanup + for key in keys: + key.delete() + [thr.join() for thr in client_threads] + topic_conf.del_config() + s3_notification_conf.del_config(notification=notification_name) + # delete the bucket + conn.delete_bucket(bucket_name) + http_server.close() + + @attr('amqp_test') def test_ps_s3_multipart_on_master(): """ test multipart object upload on master""" -- 2.39.5