}
s->object->set_bucket(s->bucket.get());
- rgw::sal::RGWAttrs attrs;
+ if (!rgw::sal::RGWObject::empty(s->object.get())) {
+ uint64_t obj_size = 0;
+ std::string etag;
+ RGWObjectCtx* obj_ctx = static_cast<RGWObjectCtx *>(s->obj_ctx);
+ {
+ RGWObjState* astate = nullptr;
+ bool check_obj_lock = s->object->have_instance() && s->bucket->get_info().obj_lock_enabled();
- bool check_obj_lock = s->object->have_instance() && s->bucket->get_info().obj_lock_enabled();
+ op_ret = s->object->get_obj_state(this, obj_ctx, *s->bucket.get(), &astate, s->yield, true);
+ if (op_ret < 0) {
+ if (need_object_expiration() || multipart_delete) {
+ return;
+ }
- if (!rgw::sal::RGWObject::empty(s->object.get())) {
- op_ret = s->object->get_obj_attrs(s->obj_ctx, s->yield, this);
- if (op_ret < 0) {
- if (need_object_expiration() || multipart_delete) {
- return;
+ if (check_obj_lock) {
+ /* check if obj exists, read orig attrs */
+ if (op_ret == -ENOENT) {
+ /* object maybe delete_marker, skip check_obj_lock*/
+ check_obj_lock = false;
+ } else {
+ return;
+ }
+ }
+ } else {
+ obj_size = astate->size;
+ etag = astate->attrset[RGW_ATTR_ETAG].to_str();
}
+ // ignore return value from get_obj_attrs in all other cases
+ op_ret = 0;
+
if (check_obj_lock) {
- /* check if obj exists, read orig attrs */
- if (op_ret == -ENOENT) {
- /* object maybe delete_marker, skip check_obj_lock*/
- check_obj_lock = false;
- } else {
+ ceph_assert(astate);
+ int object_lock_response = verify_object_lock(this, astate->attrset, bypass_perm, bypass_governance_mode);
+ if (object_lock_response != 0) {
+ op_ret = object_lock_response;
+ if (op_ret == -EACCES) {
+ s->err.message = "forbidden by object lock";
+ }
return;
}
}
- } else {
- attrs = s->object->get_attrs();
- }
-
- // ignore return value from get_obj_attrs in all other cases
- op_ret = 0;
- if (check_obj_lock) {
- int object_lock_response = verify_object_lock(this, attrs, bypass_perm, bypass_governance_mode);
- if (object_lock_response != 0) {
- op_ret = object_lock_response;
- if (op_ret == -EACCES) {
- s->err.message = "forbidden by object lock";
- }
- return;
- }
- }
+ if (multipart_delete) {
+ if (!astate) {
+ op_ret = -ERR_NOT_SLO_MANIFEST;
+ return;
+ }
- if (multipart_delete) {
- const auto slo_attr = attrs.find(RGW_ATTR_SLO_MANIFEST);
+ const auto slo_attr = astate->attrset.find(RGW_ATTR_SLO_MANIFEST);
- if (slo_attr != attrs.end()) {
- op_ret = handle_slo_manifest(slo_attr->second, y);
- if (op_ret < 0) {
- ldpp_dout(this, 0) << "ERROR: failed to handle slo manifest ret=" << op_ret << dendl;
+ if (slo_attr != astate->attrset.end()) {
+ op_ret = handle_slo_manifest(slo_attr->second, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 0) << "ERROR: failed to handle slo manifest ret=" << op_ret << dendl;
+ }
+ } else {
+ op_ret = -ERR_NOT_SLO_MANIFEST;
}
- } else {
- op_ret = -ERR_NOT_SLO_MANIFEST;
- }
- return;
+ return;
+ }
}
// make reservation for notification if needed
return;
}
- RGWObjectCtx *obj_ctx = static_cast<RGWObjectCtx *>(s->obj_ctx);
s->object->set_atomic(s->obj_ctx);
-
+
bool ver_restored = false;
op_ret = s->object->swift_versioning_restore(s->obj_ctx, ver_restored, this);
if (op_ret < 0) {
op_ret = 0;
}
- const auto obj_state = obj_ctx->get_state(s->object->get_obj());
-
// send request to notification manager
- const auto ret = rgw::notify::publish_commit(s->object.get(), obj_state->size, obj_state->mtime, attrs[RGW_ATTR_ETAG].to_str(), event_type, res, this);
+ const auto ret = rgw::notify::publish_commit(s->object.get(), obj_size, ceph::real_clock::now(), etag, event_type, res, this);
if (ret < 0) {
ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
// too late to rollback operation, hence op_ret is not set here
}
}
- // verify_object_lock
- bool check_obj_lock = obj->have_instance() && bucket->get_info().obj_lock_enabled();
- if (check_obj_lock) {
- int get_attrs_response = obj->get_obj_attrs(s->obj_ctx, s->yield, this);
- if (get_attrs_response < 0) {
- if (get_attrs_response == -ENOENT) {
+ uint64_t obj_size = 0;
+ std::string etag;
+
+ if (!rgw::sal::RGWObject::empty(obj.get())) {
+ RGWObjState* astate = nullptr;
+ bool check_obj_lock = obj->have_instance() && bucket->get_info().obj_lock_enabled();
+ const auto ret = obj->get_obj_state(this, obj_ctx, *bucket, &astate, s->yield, true);
+
+ if (ret < 0) {
+ if (ret == -ENOENT) {
// object maybe delete_marker, skip check_obj_lock
check_obj_lock = false;
} else {
// Something went wrong.
- send_partial_response(*iter, false, "", get_attrs_response);
+ send_partial_response(*iter, false, "", ret);
continue;
}
+ } else {
+ obj_size = astate->size;
+ etag = astate->attrset[RGW_ATTR_ETAG].to_str();
}
- }
- if (check_obj_lock) {
- int object_lock_response = verify_object_lock(this, obj->get_attrs(), bypass_perm, bypass_governance_mode);
- if (object_lock_response != 0) {
- send_partial_response(*iter, false, "", object_lock_response);
- continue;
+ if (check_obj_lock) {
+ ceph_assert(astate);
+ int object_lock_response = verify_object_lock(this, astate->attrset, bypass_perm, bypass_governance_mode);
+ if (object_lock_response != 0) {
+ send_partial_response(*iter, false, "", object_lock_response);
+ continue;
+ }
}
}
+
// make reservation for notification if needed
const auto versioned_object = s->bucket->versioning_enabled();
rgw::notify::reservation_t res(this, store, s, obj.get());
send_partial_response(*iter, obj->get_delete_marker(), version_id, op_ret);
- const auto obj_state = obj_ctx->get_state(obj->get_obj());
- bufferlist etag_bl;
- const auto etag = obj_state->get_attr(RGW_ATTR_ETAG, etag_bl) ? etag_bl.to_str() : "";
-
// send request to notification manager
- const auto ret = rgw::notify::publish_commit(obj.get(), obj_state->size, obj_state->mtime, etag, event_type, res, this);
+ const auto ret = rgw::notify::publish_commit(obj.get(), obj_size, ceph::real_clock::now(), etag, event_type, res, this);
if (ret < 0) {
ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
// too late to rollback operation, hence op_ret is not set here
# create objects in the bucket
key = bucket.new_key('foo')
- key.set_contents_from_string('bar')
- v1 = key.version_id
- key.set_contents_from_string('kaboom')
- v2 = key.version_id
+ content = str(os.urandom(512))
+ size1 = len(content)
+ key.set_contents_from_string(content)
+ ver1 = key.version_id
+ content = str(os.urandom(511))
+ size2 = len(content)
+ key.set_contents_from_string(content)
+ ver2 = key.version_id
# create delete marker (non versioned deletion)
delete_marker_key = bucket.delete_key(key.name)
time.sleep(1)
# versioned deletion
- bucket.delete_key(key.name, version_id=v2)
- bucket.delete_key(key.name, version_id=v1)
- delete_marker_key.delete()
+ bucket.delete_key(key.name, version_id=ver2)
+ bucket.delete_key(key.name, version_id=ver1)
print('wait for 5sec for the messages...')
time.sleep(5)
delete_marker_create_events = 0
for event_list in events:
for event in event_list['Records']:
+ size = event['s3']['object']['size']
if event['eventName'] == 's3:ObjectRemoved:Delete':
delete_events += 1
+ assert size in [size1, size2]
assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_3']
if event['eventName'] == 's3:ObjectRemoved:DeleteMarkerCreated':
delete_marker_create_events += 1
+ assert size == size2
assert event['s3']['configurationId'] in [notification_name+'_1', notification_name+'_2']
- # 3 key versions were deleted (v1, v2 and the deletion marker)
+ # 2 key versions were deleted
# notified over the same topic via 2 notifications (1,3)
- assert_equal(delete_events, 3*2)
+ assert_equal(delete_events, 2*2)
# 1 deletion marker was created
# notified over the same topic over 2 notifications (1,2)
assert_equal(delete_marker_create_events, 1*2)
# cleanup
+ delete_marker_key.delete()
stop_amqp_receiver(receiver, task)
s3_notification_conf.del_config()
topic_conf.del_config()