int marker = 0;
uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
auto etags_iter = part_etags.begin();
- rgw::sal::Attrs attrs = target_obj->get_attrs();
+ rgw::sal::Attrs& attrs = target_obj->get_attrs();
do {
ldpp_dout(dpp, 20) << "DaosMultipartUpload::complete(): list_parts()" << dendl;
int marker = 0;
uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
auto etags_iter = part_etags.begin();
- rgw::sal::Attrs attrs = target_obj->get_attrs();
+ rgw::sal::Attrs& attrs = target_obj->get_attrs();
do {
ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): list_parts()" << dendl;
int marker = 0;
uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
auto etags_iter = part_etags.begin();
- rgw::sal::Attrs attrs = target_obj->get_attrs();
+ rgw::sal::Attrs& attrs = target_obj->get_attrs();
do {
ret = list_parts(dpp, cct, max_parts, marker, &marker, &truncated, y);
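The backend hunks above (and the matching rados hunk further down) are all the same one-line fix: complete() previously copied the object's attribute map into a local, so attributes written during completion, notably the final ETag, never made it back onto the object that the notification code reads afterwards. Binding a reference makes those writes stick. A standalone sketch of the difference, using toy types rather than the real rgw::sal classes:

#include <cassert>
#include <map>
#include <string>

struct Obj {
  std::map<std::string, std::string> attrs;
  std::map<std::string, std::string>& get_attrs() { return attrs; }
};

int main() {
  Obj target;

  // Old pattern: by-value copy; the write below is lost.
  std::map<std::string, std::string> copy = target.get_attrs();
  copy["user.rgw.etag"] = "\"abc-4\"";
  assert(target.attrs.count("user.rgw.etag") == 0);

  // New pattern: reference; the write lands on the object itself.
  std::map<std::string, std::string>& ref = target.get_attrs();
  ref["user.rgw.etag"] = "\"abc-4\"";
  assert(target.attrs.count("user.rgw.etag") == 1);
  return 0;
}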
RGWMultiCompleteUpload *parts;
RGWMultiXMLParser parser;
std::unique_ptr<rgw::sal::MultipartUpload> upload;
- off_t ofs = 0;
- std::unique_ptr<rgw::sal::Object> meta_obj;
- std::unique_ptr<rgw::sal::Object> target_obj;
uint64_t olh_epoch = 0;
op_ret = get_params(y);
// make reservation for notification if needed
- std::unique_ptr<rgw::sal::Notification> res
- = driver->get_notification(meta_obj.get(), nullptr, s, rgw::notify::ObjectCreatedCompleteMultipartUpload, y, &s->object->get_name());
+ res = driver->get_notification(meta_obj.get(), nullptr, s, rgw::notify::ObjectCreatedCompleteMultipartUpload, y,
+ &s->object->get_name());
op_ret = res->publish_reserve(this);
if (op_ret < 0) {
return;
}
- // remove the upload meta object ; the meta object is not versioned
- // when the bucket is, as that would add an unneeded delete marker
- int r = meta_obj->delete_object(this, y, true /* prevent versioning */);
- if (r >= 0) {
- /* serializer's exclusive lock is released */
- serializer->clear_locked();
- } else {
- ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << dendl;
- }
-
- // send request to notification manager
- int ret = res->publish_commit(this, ofs, upload->get_mtime(), etag, target_obj->get_instance());
- if (ret < 0) {
- ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
- // too late to rollback operation, hence op_ret is not set here
+ upload_time = upload->get_mtime();
+ int r = serializer->unlock();
+ if (r < 0) {
+ ldpp_dout(this, 0) << "WARNING: failed to unlock " << *serializer.get() << dendl;
}
} // RGWCompleteMultipart::execute
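With this hunk, execute() stops at recording the completion state (upload_time, plus the members added in the header hunk below); publishing the notification and removing the meta object are deferred to complete(). Since the old locals would have been destroyed at the end of execute(), they are promoted to class members so they survive until complete() runs. A condensed sketch of that two-phase shape, with hypothetical names rather than the real op classes:

#include <iostream>
#include <memory>
#include <string>

// Hypothetical two-phase op: execute() captures what the notification
// needs as members; complete() publishes only once op_ret is final.
struct TwoPhaseOp {
  int op_ret = 0;
  std::unique_ptr<std::string> target_obj;  // survives past execute()
  std::string etag;

  void execute() {
    target_obj = std::make_unique<std::string>("bucket/key");
    etag = "\"abc-4\"";
    // No publish here: on failure, complete() sees op_ret < 0
    // and never touches half-built state.
  }

  void complete() {
    if (op_ret >= 0 && target_obj) {
      std::cout << "commit notification for " << *target_obj
                << " etag=" << etag << "\n";
    }
    target_obj.reset();  // cleanup strictly after publishing
  }
};

int main() {
  TwoPhaseOp op;
  op.execute();
  op.complete();
  return 0;
}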
}
}
- etag = s->object->get_attrs()[RGW_ATTR_ETAG].to_str();
+ if (op_ret >= 0 && target_obj.get() != nullptr) {
+ s->object->set_attrs(target_obj->get_attrs());
+ etag = s->object->get_attrs()[RGW_ATTR_ETAG].to_str();
+ // send request to notification manager
+ if (res.get() != nullptr) {
+ int ret = res->publish_commit(this, ofs, upload_time, etag, target_obj->get_instance());
+ if (ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
+ // too late to rollback operation, hence op_ret is not set here
+ }
+ } else {
+ ldpp_dout(this, 1) << "ERROR: reservation is null" << dendl;
+ }
+ } else {
+ ldpp_dout(this, 1) << "ERROR: either op_ret is negative (execute failed) or target_obj is null, op_ret: "
+ << op_ret << dendl;
+ }
+
+ // remove the upload meta object; the meta object is not versioned
+ // when the bucket is, as that would add an unneeded delete marker.
+ // Moved to complete() to prevent a segmentation fault in publish_commit().
+ if (meta_obj.get() != nullptr) {
+ int ret = meta_obj->delete_object(this, null_yield, true /* prevent versioning */);
+ if (ret >= 0) {
+ /* serializer's exclusive lock is released */
+ serializer->clear_locked();
+ } else {
+ ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << ", ret: " << ret << dendl;
+ }
+ } else {
+ ldpp_dout(this, 0) << "WARNING: meta_obj is null" << dendl;
+ }
+
+ res.reset();
+ meta_obj.reset();
+ target_obj.reset();
send_response();
}
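The ordering inside complete() is the point of the fix: publish_commit() runs while target_obj and its attributes are still alive, the meta object is deleted only afterwards, and the members are reset before send_response(). A hedged sketch of that reserve/commit/reset lifecycle, with a toy Notification type standing in for rgw::sal::Notification:

#include <iostream>
#include <memory>

struct Notification {  // toy stand-in for rgw::sal::Notification
  bool reserved = false;
  int publish_reserve() { reserved = true; return 0; }
  int publish_commit() { return reserved ? 0 : -1; }
};

int main() {
  auto res = std::make_unique<Notification>();
  if (res->publish_reserve() < 0)        // execute(): reserve up front
    return 1;
  // ... multipart upload completes, op_ret becomes known ...
  if (res && res->publish_commit() < 0)  // complete(): commit late
    std::cout << "ERROR: publishing notification failed\n";
  res.reset();                           // drop state before send_response()
  return 0;
}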
bufferlist data;
std::unique_ptr<rgw::sal::MPSerializer> serializer;
jspan_ptr multipart_trace;
+ ceph::real_time upload_time;
+ std::unique_ptr<rgw::sal::Object> target_obj;
+ std::unique_ptr<rgw::sal::Notification> res;
+ std::unique_ptr<rgw::sal::Object> meta_obj;
+ off_t ofs = 0;
public:
RGWCompleteMultipart() {}
int marker = 0;
uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
auto etags_iter = part_etags.begin();
- rgw::sal::Attrs attrs = target_obj->get_attrs();
+ rgw::sal::Attrs& attrs = target_obj->get_attrs();
ofs = 0;
accounted_size = 0;
conn1.delete_bucket(Bucket=bucket_name)
+@attr('mpu_test')
+def test_ps_s3_multipart_on_master_http():
+ """ test http multipart object upload on master"""
+ conn = connection()
+ zonegroup = 'default'
+
+ # create random port for the http server
+ host = get_ip()
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ http_server = StreamingHTTPServer(host, port, num_workers=10)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topic
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address
+ opaque_data = 'http://1.2.3.4:8888'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket
+ client_threads = []
+ content = str(os.urandom(20*1024*1024))
+ key = bucket.new_key('obj')
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check http receiver
+ keys = list(bucket.list())
+ print('total number of objects: ' + str(len(keys)))
+ events = http_server.get_and_reset_events()
+ for event in events:
+ assert_equal(event['Records'][0]['opaqueData'], opaque_data)
+ assert_true(event['Records'][0]['s3']['object']['eTag'] != '')
+
+ # cleanup
+ for key in keys:
+ key.delete()
+ [thr.join() for thr in client_threads]
+ topic_conf.del_config()
+ s3_notification_conf.del_config(notification=notification_name)
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ http_server.close()
+
+
@attr('amqp_test')
def test_ps_s3_multipart_on_master():
""" test multipart object upload on master"""