]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
RGW:pubsub publish commit with etag populated 56453/head
authorAli Masarwa <ali.saed.masarwa@gmail.com>
Sun, 19 Nov 2023 13:29:28 +0000 (15:29 +0200)
committerYuval Lifshitz <ylifshit@ibm.com>
Mon, 25 Mar 2024 12:06:38 +0000 (12:06 +0000)
Signed-off-by: Ali Masarwa <ali.saed.masarwa@gmail.com>
(cherry picked from commit 18c202c979aefe7c34de3688036586a070addff8)

 Conflicts:
src/rgw/driver/posix/rgw_sal_posix.cc
src/rgw/rgw_op.cc
src/rgw/rgw_op.h

src/rgw/driver/rados/rgw_sal_rados.cc
src/rgw/rgw_op.cc
src/rgw/rgw_op.h
src/rgw/rgw_sal_daos.cc
src/rgw/rgw_sal_dbstore.cc
src/rgw/rgw_sal_motr.cc
src/test/rgw/bucket_notification/test_bn.py

index 723d1ee96177cc61f016bf5f7862ad6c78204549..414f44bb8163b5e7e6233d93cc99c04765ec3c64 100644 (file)
@@ -2697,7 +2697,7 @@ int RadosMultipartUpload::complete(const DoutPrefixProvider *dpp,
   int marker = 0;
   uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
   auto etags_iter = part_etags.begin();
-  rgw::sal::Attrs attrs = target_obj->get_attrs();
+  rgw::sal::Attrs& attrs = target_obj->get_attrs();
 
   do {
     ret = list_parts(dpp, cct, max_parts, marker, &marker, &truncated);
index 64f24cc4499df55d8db6548d214447992a563d57..bb0bfdbfa7f5a4aa91272044f1015679ee544b68 100644 (file)
@@ -6455,9 +6455,6 @@ void RGWCompleteMultipart::execute(optional_yield y)
   RGWMultiCompleteUpload *parts;
   RGWMultiXMLParser parser;
   std::unique_ptr<rgw::sal::MultipartUpload> upload;
-  off_t ofs = 0;
-  std::unique_ptr<rgw::sal::Object> meta_obj;
-  std::unique_ptr<rgw::sal::Object> target_obj;
   uint64_t olh_epoch = 0;
 
   op_ret = get_params(y);
@@ -6546,8 +6543,8 @@ void RGWCompleteMultipart::execute(optional_yield y)
   
 
   // make reservation for notification if needed
-  std::unique_ptr<rgw::sal::Notification> res
-    = driver->get_notification(meta_obj.get(), nullptr, s, rgw::notify::ObjectCreatedCompleteMultipartUpload, y, &s->object->get_name());
+  res = driver->get_notification(meta_obj.get(), nullptr, s, rgw::notify::ObjectCreatedCompleteMultipartUpload, y,
+                                 &s->object->get_name());
   op_ret = res->publish_reserve(this);
   if (op_ret < 0) {
     return;
@@ -6570,21 +6567,10 @@ void RGWCompleteMultipart::execute(optional_yield y)
     return;
   }
 
-  // remove the upload meta object ; the meta object is not versioned
-  // when the bucket is, as that would add an unneeded delete marker
-  int r = meta_obj->delete_object(this, y, rgw::sal::FLAG_PREVENT_VERSIONING | rgw::sal::FLAG_LOG_OP);
-  if (r >= 0)  {
-    /* serializer's exclusive lock is released */
-    serializer->clear_locked();
-  } else {
-    ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << dendl;
-  }
-
-  // send request to notification manager
-  int ret = res->publish_commit(this, ofs, upload->get_mtime(), etag, target_obj->get_instance());
-  if (ret < 0) {
-    ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
-    // too late to rollback operation, hence op_ret is not set here
+  upload_time = upload->get_mtime();
+  int r = serializer->unlock();
+  if (r < 0) {
+    ldpp_dout(this, 0) << "WARNING: failed to unlock " << *serializer.get() << dendl;
   }
 } // RGWCompleteMultipart::execute
 
@@ -6637,7 +6623,42 @@ void RGWCompleteMultipart::complete()
     }
   }
 
-  etag = s->object->get_attrs()[RGW_ATTR_ETAG].to_str();
+  if (op_ret >= 0 && target_obj.get() != nullptr) {
+    s->object->set_attrs(target_obj->get_attrs());
+    etag = s->object->get_attrs()[RGW_ATTR_ETAG].to_str();
+    // send request to notification manager
+    if (res.get() != nullptr) {
+      int ret = res->publish_commit(this, ofs, upload_time, etag, target_obj->get_instance());
+      if (ret < 0) {
+        ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
+        // too late to rollback operation, hence op_ret is not set here
+      }
+    } else {
+      ldpp_dout(this, 1) << "ERROR: reservation is null" << dendl;
+    }
+  } else {
+    ldpp_dout(this, 1) << "ERROR: either op_ret is negative (execute failed) or target_obj is null, op_ret: "
+                       << op_ret << dendl;
+  }
+
+  // remove the upload meta object ; the meta object is not versioned
+  // when the bucket is, as that would add an unneeded delete marker
+  // moved to complete to prevent segmentation fault in publish commit
+  if (meta_obj.get() != nullptr) {
+    int ret = meta_obj->delete_object(this, null_yield, rgw::sal::FLAG_PREVENT_VERSIONING | rgw::sal::FLAG_LOG_OP);
+    if (ret >= 0) {
+      /* serializer's exclusive lock is released */
+      serializer->clear_locked();
+    } else {
+      ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << ", ret: " << ret << dendl;
+    }
+  } else {
+    ldpp_dout(this, 0) << "WARNING: meta_obj is null" << dendl;
+  }
+
+  res.reset();
+  meta_obj.reset();
+  target_obj.reset();
 
   send_response();
 }
index f398b5b15ba0764eb47fcfee254a8a5ee2e62aef..2bffcddcc8c462eb10b02462dddb0fd9c699a78d 100644 (file)
@@ -1876,6 +1876,11 @@ protected:
   bufferlist data;
   std::unique_ptr<rgw::sal::MPSerializer> serializer;
   jspan multipart_trace;
+  ceph::real_time upload_time;
+  std::unique_ptr<rgw::sal::Object> target_obj;
+  std::unique_ptr<rgw::sal::Notification> res;
+  std::unique_ptr<rgw::sal::Object> meta_obj;
+  off_t ofs = 0;
 
 public:
   RGWCompleteMultipart() {}
index 2647e18823a13088d1b6a3cfdba6ca4156f0866b..00e9157ac31253544fc8a74e7e24d0fab28f9c15 100644 (file)
@@ -1737,7 +1737,7 @@ int DaosMultipartUpload::complete(
   int marker = 0;
   uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
   auto etags_iter = part_etags.begin();
-  rgw::sal::Attrs attrs = target_obj->get_attrs();
+  rgw::sal::Attrs& attrs = target_obj->get_attrs();
 
   do {
     ldpp_dout(dpp, 20) << "DaosMultipartUpload::complete(): list_parts()"
index a6eca697538f2b4a95e822d2dae9cec0a5cb065d..ca1b3f33396ac3d620f0527615afe82ba872af8e 100644 (file)
@@ -1057,7 +1057,7 @@ namespace rgw::sal {
     int marker = 0;
     uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
     auto etags_iter = part_etags.begin();
-    rgw::sal::Attrs attrs = target_obj->get_attrs();
+    rgw::sal::Attrs& attrs = target_obj->get_attrs();
 
     ofs = 0;
     accounted_size = 0;
index a7a205e8e80e1439dfdf8040e775299bf4bde84e..1ae7565f002e1bd80953340bb6c513f1e02dc270 100644 (file)
@@ -2735,7 +2735,7 @@ int MotrMultipartUpload::complete(const DoutPrefixProvider *dpp,
   int marker = 0;
   uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
   auto etags_iter = part_etags.begin();
-  rgw::sal::Attrs attrs = target_obj->get_attrs();
+  rgw::sal::Attrs& attrs = target_obj->get_attrs();
 
   do {
     ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): list_parts()" << dendl;
index 87a2acb76a20c72830bb6b9cba4ec659997c331a..ee89d326d433c47e777e020dfc41f5d625bb785d 100644 (file)
@@ -2346,6 +2346,71 @@ def test_http_post_object_upload():
     conn1.delete_bucket(Bucket=bucket_name)
 
 
+@attr('mpu_test')
+def test_ps_s3_multipart_on_master_http():
+    """ test http multipart object upload on master"""
+    conn = connection()
+    zonegroup = 'default'
+
+    # create random port for the http server
+    host = get_ip()
+    port = random.randint(10000, 20000)
+    # start an http server in a separate thread
+    http_server = StreamingHTTPServer(host, port, num_workers=10)
+
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # create s3 topic
+    endpoint_address = 'http://'+host+':'+str(port)
+    endpoint_args = 'push-endpoint='+endpoint_address
+    opaque_data = 'http://1.2.3.4:8888'
+    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': []
+                        }]
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket
+    client_threads = []
+    content = str(os.urandom(20*1024*1024))
+    key = bucket.new_key('obj')
+    thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+    thr.start()
+    client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+
+    print('wait for 5sec for the messages...')
+    time.sleep(5)
+
+    # check http receiver
+    keys = list(bucket.list())
+    print('total number of objects: ' + str(len(keys)))
+    events = http_server.get_and_reset_events()
+    for event in events:
+        assert_equal(event['Records'][0]['opaqueData'], opaque_data)
+        assert_equal(event['Records'][0]['s3']['object']['eTag'] != '', True)
+        print(event['Records'][0]['s3']['object'])
+
+    # cleanup
+    for key in keys:
+        key.delete()
+    [thr.join() for thr in client_threads]
+    topic_conf.del_config()
+    s3_notification_conf.del_config(notification=notification_name)
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+    http_server.close()
+
+
 @attr('amqp_test')
 def test_ps_s3_multipart_on_master():
     """ test multipart object upload on master"""