]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notifications: send correct size in case of delete marker creation 42355/head
authorYuval Lifshitz <ylifshit@redhat.com>
Thu, 15 Jul 2021 13:40:06 +0000 (16:40 +0300)
committerYuval Lifshitz <ylifshit@redhat.com>
Fri, 16 Jul 2021 11:12:10 +0000 (14:12 +0300)
Fixes: https://tracker.ceph.com/issues/51681
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
doc/radosgw/s3-notification-compatibility.rst
src/rgw/rgw_op.cc
src/test/rgw/bucket_notification/test_bn.py

index 6a6538aec402419c86674a2132b9b33e77701fdc..008c33a59bd8a64490734ac6df26152d46a802e0 100644 (file)
@@ -105,6 +105,10 @@ Event Types
 | ``s3:ReducedRedundancyLostObject``           | Not applicable to Ceph                                      |
 +----------------------------------------------+-----------------+-------------------------------------------+
 
+.. note:: 
+
+   The ``s3:ObjectRemoved:DeleteMarkerCreated`` event presents information on the latest version of the object
+
 Topic Configuration
 -------------------
 In the case of bucket notifications, the topics management API will be derived from `AWS Simple Notification Service API`_. 
index b868a18a93f2e869c2033a9ebc16dee4dfbb2c11..045748d89e807d6b88c8ad53b040ad8fa4ee796e 100644 (file)
@@ -4810,57 +4810,69 @@ void RGWDeleteObj::execute(optional_yield y)
   }
   s->object->set_bucket(s->bucket.get());
 
-  rgw::sal::Attrs attrs;
-
-  bool check_obj_lock = s->object->have_instance() && s->bucket->get_info().obj_lock_enabled();
 
   if (!rgw::sal::Object::empty(s->object.get())) {
-    op_ret = s->object->get_obj_attrs(s->obj_ctx, s->yield, this);
-    if (op_ret < 0) {
-      if (need_object_expiration() || multipart_delete) {
-        return;
-      }
+    uint64_t obj_size = 0;
+    std::string etag;
+    RGWObjectCtx* obj_ctx = static_cast<RGWObjectCtx *>(s->obj_ctx);
+    {
+      RGWObjState* astate = nullptr;
+      bool check_obj_lock = s->object->have_instance() && s->bucket->get_info().obj_lock_enabled();
 
-      if (check_obj_lock) {
-        /* check if obj exists, read orig attrs */
-        if (op_ret == -ENOENT) {
-          /* object maybe delete_marker, skip check_obj_lock*/
-          check_obj_lock = false;
-        } else {
+      op_ret = s->object->get_obj_state(this, obj_ctx, &astate, s->yield, true);
+      if (op_ret < 0) {
+        if (need_object_expiration() || multipart_delete) {
           return;
         }
+
+        if (check_obj_lock) {
+          /* check if obj exists, read orig attrs */
+          if (op_ret == -ENOENT) {
+            /* object maybe delete_marker, skip check_obj_lock*/
+            check_obj_lock = false;
+          } else {
+            return;
+          }
+        }
+      } else {
+        obj_size = astate->size;
+        etag = astate->attrset[RGW_ATTR_ETAG].to_str();
       }
-    } else {
-      attrs = s->object->get_attrs();
-    }
 
-    // ignore return value from get_obj_attrs in all other cases
-    op_ret = 0;
+      // ignore return value from get_obj_attrs in all other cases
+      op_ret = 0;
 
-    if (check_obj_lock) {
-      int object_lock_response = verify_object_lock(this, attrs, bypass_perm, bypass_governance_mode);
-      if (object_lock_response != 0) {
-        op_ret = object_lock_response;
-       if (op_ret == -EACCES) {
-         s->err.message = "forbidden by object lock";
-       }
-        return;
+      if (check_obj_lock) {
+        ceph_assert(astate);
+        int object_lock_response = verify_object_lock(this, astate->attrset, bypass_perm, bypass_governance_mode);
+        if (object_lock_response != 0) {
+          op_ret = object_lock_response;
+          if (op_ret == -EACCES) {
+            s->err.message = "forbidden by object lock";
+          }
+          return;
+        }
       }
-    }
 
-    if (multipart_delete) {
-      const auto slo_attr = attrs.find(RGW_ATTR_SLO_MANIFEST);
+      if (multipart_delete) {
+        if (!astate) {
+          op_ret = -ERR_NOT_SLO_MANIFEST;
+          return;
+        }
+
+        const auto slo_attr = astate->attrset.find(RGW_ATTR_SLO_MANIFEST);
 
-      if (slo_attr != attrs.end()) {
-        op_ret = handle_slo_manifest(slo_attr->second, y);
-        if (op_ret < 0) {
-          ldpp_dout(this, 0) << "ERROR: failed to handle slo manifest ret=" << op_ret << dendl;
+        if (slo_attr != astate->attrset.end()) {
+          op_ret = handle_slo_manifest(slo_attr->second, y);
+          if (op_ret < 0) {
+            ldpp_dout(this, 0) << "ERROR: failed to handle slo manifest ret=" << op_ret << dendl;
+          }
+        } else {
+          op_ret = -ERR_NOT_SLO_MANIFEST;
         }
-      } else {
-        op_ret = -ERR_NOT_SLO_MANIFEST;
-      }
 
-      return;
+        return;
+      }
     }
 
     // make reservation for notification if needed
@@ -4874,9 +4886,8 @@ void RGWDeleteObj::execute(optional_yield y)
       return;
     }
 
-    RGWObjectCtx *obj_ctx = static_cast<RGWObjectCtx *>(s->obj_ctx);
     s->object->set_atomic(s->obj_ctx);
-
+    
     bool ver_restored = false;
     op_ret = s->object->swift_versioning_restore(s->obj_ctx, ver_restored, this);
     if (op_ret < 0) {
@@ -4924,10 +4935,8 @@ void RGWDeleteObj::execute(optional_yield y)
       op_ret = 0;
     }
 
-    const auto obj_state = obj_ctx->get_state(s->object->get_obj());
-
     // send request to notification manager
-    int ret = res->publish_commit(this, obj_state->size, obj_state->mtime, attrs[RGW_ATTR_ETAG].to_str(), version_id);
+    int ret = res->publish_commit(this, obj_size, ceph::real_clock::now(), etag, version_id);
     if (ret < 0) {
       ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
       // too late to rollback operation, hence op_ret is not set here
@@ -6879,29 +6888,38 @@ void RGWDeleteMultiObj::execute(optional_yield y)
       }
     }
 
-    // verify_object_lock
-    bool check_obj_lock = obj->have_instance() && bucket->get_info().obj_lock_enabled();
-    if (check_obj_lock) {
-      int get_attrs_response = obj->get_obj_attrs(s->obj_ctx, s->yield, this);
-      if (get_attrs_response < 0) {
-        if (get_attrs_response == -ENOENT) {
+    uint64_t obj_size = 0;
+    std::string etag;
+
+    if (!rgw::sal::Object::empty(obj.get())) {
+      RGWObjState* astate = nullptr;
+      bool check_obj_lock = obj->have_instance() && bucket->get_info().obj_lock_enabled();
+      const auto ret = obj->get_obj_state(this, obj_ctx, &astate, s->yield, true);
+
+      if (ret < 0) {
+        if (ret == -ENOENT) {
           // object maybe delete_marker, skip check_obj_lock
           check_obj_lock = false;
         } else {
           // Something went wrong.
-          send_partial_response(*iter, false, "", get_attrs_response);
+          send_partial_response(*iter, false, "", ret);
           continue;
         }
+      } else {
+        obj_size = astate->size;
+        etag = astate->attrset[RGW_ATTR_ETAG].to_str();
       }
-    }
 
-    if (check_obj_lock) {
-      int object_lock_response = verify_object_lock(this, obj->get_attrs(), bypass_perm, bypass_governance_mode);
-      if (object_lock_response != 0) {
-        send_partial_response(*iter, false, "", object_lock_response);
-        continue;
+      if (check_obj_lock) {
+        ceph_assert(astate);
+        int object_lock_response = verify_object_lock(this, astate->attrset, bypass_perm, bypass_governance_mode);
+        if (object_lock_response != 0) {
+          send_partial_response(*iter, false, "", object_lock_response);
+          continue;
+        }
       }
     }
+
     // make reservation for notification if needed
     const auto versioned_object = s->bucket->versioning_enabled();
     const auto event_type = versioned_object && obj->get_instance().empty() ? 
@@ -6929,12 +6947,8 @@ void RGWDeleteMultiObj::execute(optional_yield y)
 
     send_partial_response(*iter, obj->get_delete_marker(), del_op->result.version_id, op_ret);
 
-    const auto obj_state = obj_ctx->get_state(obj->get_obj());
-    bufferlist etag_bl;
-    const auto etag = obj_state->get_attr(RGW_ATTR_ETAG, etag_bl) ? etag_bl.to_str() : "";
-
     // send request to notification manager
-    int ret = res->publish_commit(this, obj_state->size, obj_state->mtime, etag, version_id);
+    int ret = res->publish_commit(this, obj_size, ceph::real_clock::now(), etag, version_id);
     if (ret < 0) {
       ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
       // too late to rollback operation, hence op_ret is not set here
index 62bad82f4b32e7663ffef3230dc197514cfed6d4..2a5d305bc59bf137dca0df11d17e39ade67ccfa3 100644 (file)
@@ -1981,9 +1981,13 @@ def test_ps_s3_versioned_deletion_on_master():
 
     # create objects in the bucket
     key = bucket.new_key('foo')
-    key.set_contents_from_string('bar')
+    content = str(os.urandom(512))
+    size1 = len(content)
+    key.set_contents_from_string(content)
     ver1 = key.version_id
-    key.set_contents_from_string('kaboom')
+    content = str(os.urandom(511))
+    size2 = len(content)
+    key.set_contents_from_string(content)
     ver2 = key.version_id
     # create delete marker (non versioned deletion)
     delete_marker_key = bucket.delete_key(key.name)
@@ -1994,7 +1998,6 @@ def test_ps_s3_versioned_deletion_on_master():
     # versioned deletion
     bucket.delete_key(key.name, version_id=ver2)
     bucket.delete_key(key.name, version_id=ver1)
-    delete_marker_key.delete()
 
     print('wait for 5sec for the messages...')
     time.sleep(5)
@@ -2006,6 +2009,7 @@ def test_ps_s3_versioned_deletion_on_master():
     for event_list in events:
         for event in event_list['Records']:
             version = event['s3']['object']['versionId']
+            size = event['s3']['object']['size']
             if version not in versions:
                 print('version mismatch: '+version+' not in: '+str(versions))
                 assert False 
@@ -2013,19 +2017,22 @@ def test_ps_s3_versioned_deletion_on_master():
                 print('version ok: '+version+' in: '+str(versions))
             if event['eventName'] == 'ObjectRemoved:Delete':
                 delete_events += 1
+                assert size in [size1, size2]
                 assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3']
             if event['eventName'] == 'ObjectRemoved:DeleteMarkerCreated':
                 delete_marker_create_events += 1
+                assert size == size2
                 assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2']
 
-    # 3 key versions were deleted (v1, v2 and the deletion marker)
+    # 2 key versions were deleted
     # notified over the same topic via 2 notifications (1,3)
-    assert_equal(delete_events, 3*2)
+    assert_equal(delete_events, 2*2)
     # 1 deletion marker was created
     # notified over the same topic over 2 notifications (1,2)
     assert_equal(delete_marker_create_events, 1*2)
 
     # cleanup
+    delete_marker_key.delete()
     stop_amqp_receiver(receiver, task)
     s3_notification_conf.del_config()
     topic_conf.del_config()