]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
RGW:pubsub publish commit with etag populated
authorAli Masarwa <ali.saed.masarwa@gmail.com>
Sun, 19 Nov 2023 13:29:28 +0000 (15:29 +0200)
committerAli Masarwa <ali.saed.masarwa@gmail.com>
Thu, 14 Dec 2023 09:49:04 +0000 (11:49 +0200)
Signed-off-by: Ali Masarwa <ali.saed.masarwa@gmail.com>
src/rgw/driver/daos/rgw_sal_daos.cc
src/rgw/driver/motr/rgw_sal_motr.cc
src/rgw/driver/posix/rgw_sal_posix.cc
src/rgw/driver/rados/rgw_sal_rados.cc
src/rgw/rgw_op.cc
src/rgw/rgw_op.h
src/rgw/rgw_sal_dbstore.cc
src/test/rgw/bucket_notification/test_bn.py

index 6943205657973511b1e5a75e354e4a3507923299..21252c906daaf2d9cfbd400daa14a807c953d9a4 100644 (file)
@@ -1694,7 +1694,7 @@ int DaosMultipartUpload::complete(
   int marker = 0;
   uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
   auto etags_iter = part_etags.begin();
-  rgw::sal::Attrs attrs = target_obj->get_attrs();
+  rgw::sal::Attrs& attrs = target_obj->get_attrs();
 
   do {
     ldpp_dout(dpp, 20) << "DaosMultipartUpload::complete(): list_parts()"
index 08053b9b90c10d0469a0d0c41b572ec75969b2da..6a97ef2f01e8fa02305462722ebec310f8c4a5ad 100644 (file)
@@ -2683,7 +2683,7 @@ int MotrMultipartUpload::complete(const DoutPrefixProvider *dpp,
   int marker = 0;
   uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
   auto etags_iter = part_etags.begin();
-  rgw::sal::Attrs attrs = target_obj->get_attrs();
+  rgw::sal::Attrs& attrs = target_obj->get_attrs();
 
   do {
     ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): list_parts()" << dendl;
index 5c1e50ca5c93cf79375e8661fc0cc1753ead3054..f5b79f9b0933b3c4a3e8b8c618458a07a6117825 100644 (file)
@@ -2612,7 +2612,7 @@ int POSIXMultipartUpload::complete(const DoutPrefixProvider *dpp,
   int marker = 0;
   uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
   auto etags_iter = part_etags.begin();
-  rgw::sal::Attrs attrs = target_obj->get_attrs();
+  rgw::sal::Attrs& attrs = target_obj->get_attrs();
 
   do {
     ret = list_parts(dpp, cct, max_parts, marker, &marker, &truncated, y);
index 5ede8d44fa9b1ae4d260dc589500d418aa3794db..171978c68629a335d5c7af046afe9b12867236f2 100644 (file)
@@ -2422,7 +2422,7 @@ int RadosMultipartUpload::complete(const DoutPrefixProvider *dpp,
   int marker = 0;
   uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
   auto etags_iter = part_etags.begin();
-  rgw::sal::Attrs attrs = target_obj->get_attrs();
+  rgw::sal::Attrs& attrs = target_obj->get_attrs();
 
   do {
     ret = list_parts(dpp, cct, max_parts, marker, &marker, &truncated, y);
index 750ad7cb7739c7978f5ca8113be89360f561bbe7..57a21cbb55bf20b6ad171086c09d0c75a54d98ae 100644 (file)
@@ -6591,9 +6591,6 @@ void RGWCompleteMultipart::execute(optional_yield y)
   RGWMultiCompleteUpload *parts;
   RGWMultiXMLParser parser;
   std::unique_ptr<rgw::sal::MultipartUpload> upload;
-  off_t ofs = 0;
-  std::unique_ptr<rgw::sal::Object> meta_obj;
-  std::unique_ptr<rgw::sal::Object> target_obj;
   uint64_t olh_epoch = 0;
 
   op_ret = get_params(y);
@@ -6682,8 +6679,8 @@ void RGWCompleteMultipart::execute(optional_yield y)
   
 
   // make reservation for notification if needed
-  std::unique_ptr<rgw::sal::Notification> res
-    = driver->get_notification(meta_obj.get(), nullptr, s, rgw::notify::ObjectCreatedCompleteMultipartUpload, y, &s->object->get_name());
+  res = driver->get_notification(meta_obj.get(), nullptr, s, rgw::notify::ObjectCreatedCompleteMultipartUpload, y,
+                                 &s->object->get_name());
   op_ret = res->publish_reserve(this);
   if (op_ret < 0) {
     return;
@@ -6706,21 +6703,10 @@ void RGWCompleteMultipart::execute(optional_yield y)
     return;
   }
 
-  // remove the upload meta object ; the meta object is not versioned
-  // when the bucket is, as that would add an unneeded delete marker
-  int r = meta_obj->delete_object(this, y, true /* prevent versioning */);
-  if (r >= 0)  {
-    /* serializer's exclusive lock is released */
-    serializer->clear_locked();
-  } else {
-    ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << dendl;
-  }
-
-  // send request to notification manager
-  int ret = res->publish_commit(this, ofs, upload->get_mtime(), etag, target_obj->get_instance());
-  if (ret < 0) {
-    ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
-    // too late to rollback operation, hence op_ret is not set here
+  upload_time = upload->get_mtime();
+  int r = serializer->unlock();
+  if (r < 0) {
+    ldpp_dout(this, 0) << "WARNING: failed to unlock " << *serializer.get() << dendl;
   }
 } // RGWCompleteMultipart::execute
 
@@ -6773,7 +6759,42 @@ void RGWCompleteMultipart::complete()
     }
   }
 
-  etag = s->object->get_attrs()[RGW_ATTR_ETAG].to_str();
+  if (op_ret >= 0 && target_obj.get() != nullptr) {
+    s->object->set_attrs(target_obj->get_attrs());
+    etag = s->object->get_attrs()[RGW_ATTR_ETAG].to_str();
+    // send request to notification manager
+    if (res.get() != nullptr) {
+      int ret = res->publish_commit(this, ofs, upload_time, etag, target_obj->get_instance());
+      if (ret < 0) {
+        ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
+        // too late to rollback operation, hence op_ret is not set here
+      }
+    } else {
+      ldpp_dout(this, 1) << "ERROR: reservation is null" << dendl;
+    }
+  } else {
+    ldpp_dout(this, 1) << "ERROR: either op_ret is negative (execute failed) or target_obj is null, op_ret: "
+                       << op_ret << dendl;
+  }
+
+  // remove the upload meta object ; the meta object is not versioned
+  // when the bucket is, as that would add an unneeded delete marker
+  // moved to complete to prevent segmentation fault in publish commit
+  if (meta_obj.get() != nullptr) {
+    int ret = meta_obj->delete_object(this, null_yield, true /* prevent versioning */);
+    if (ret >= 0) {
+      /* serializer's exclusive lock is released */
+      serializer->clear_locked();
+    } else {
+      ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << ", ret: " << ret << dendl;
+    }
+  } else {
+    ldpp_dout(this, 0) << "WARNING: meta_obj is null" << dendl;
+  }
+
+  res.reset();
+  meta_obj.reset();
+  target_obj.reset();
 
   send_response();
 }
index 9314d454c791856e7e3d9b665e1670a7a0085715..fcfb24786e8f2e00dac0e06e6e8075387bcc829e 100644 (file)
@@ -1835,6 +1835,11 @@ protected:
   bufferlist data;
   std::unique_ptr<rgw::sal::MPSerializer> serializer;
   jspan_ptr multipart_trace;
+  ceph::real_time upload_time;
+  std::unique_ptr<rgw::sal::Object> target_obj;
+  std::unique_ptr<rgw::sal::Notification> res;
+  std::unique_ptr<rgw::sal::Object> meta_obj;
+  off_t ofs = 0;
 
 public:
   RGWCompleteMultipart() {}
index 308404c87b88bc3220fcca9532d24cdd153c1ab6..4edb30166dbd2da98afedf516d02d96c9f892a25 100644 (file)
@@ -932,7 +932,7 @@ namespace rgw::sal {
     int marker = 0;
     uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
     auto etags_iter = part_etags.begin();
-    rgw::sal::Attrs attrs = target_obj->get_attrs();
+    rgw::sal::Attrs& attrs = target_obj->get_attrs();
 
     ofs = 0;
     accounted_size = 0;
index 075d47466a2368da41fdcafe418cb207f9d81e30..d493a57e004e1c06ff17866dfdd5d54c218911c7 100644 (file)
@@ -2397,6 +2397,70 @@ def test_http_post_object_upload():
     conn1.delete_bucket(Bucket=bucket_name)
 
 
+@attr('mpu_test')
+def test_ps_s3_multipart_on_master_http():
+    """ test http multipart object upload on master"""
+    conn = connection()
+    zonegroup = 'default'
+
+    # create random port for the http server
+    host = get_ip()
+    port = random.randint(10000, 20000)
+    # start an http server in a separate thread
+    http_server = StreamingHTTPServer(host, port, num_workers=10)
+
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # create s3 topic
+    endpoint_address = 'http://'+host+':'+str(port)
+    endpoint_args = 'push-endpoint='+endpoint_address
+    opaque_data = 'http://1.2.3.4:8888'
+    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': []
+                        }]
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket
+    client_threads = []
+    content = str(os.urandom(20*1024*1024))
+    key = bucket.new_key('obj')
+    thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+    thr.start()
+    client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+
+    # check http receiver
+    keys = list(bucket.list())
+    print('total number of objects: ' + str(len(keys)))
+    events = http_server.get_and_reset_events()
+    for event in events:
+        assert_equal(event['Records'][0]['opaqueData'], opaque_data)
+        assert_true(event['Records'][0]['s3']['object']['eTag'] != '')
+
+    # cleanup
+    for key in keys:
+        key.delete()
+    [thr.join() for thr in client_threads]
+    topic_conf.del_config()
+    s3_notification_conf.del_config(notification=notification_name)
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+    http_server.close()
+
+
 @attr('amqp_test')
 def test_ps_s3_multipart_on_master():
     """ test multipart object upload on master"""