encode_delete_at_attr(delete_at, attrs);
+ // get src object size (cached in obj_ctx from verify_permission())
+ RGWObjState* astate = nullptr;
+ op_ret = src_object->get_obj_state(this, s->obj_ctx, &astate, s->yield, true);
+ if (op_ret < 0) {
+ return;
+ }
+
if (!s->system_request) { // no quota enforcement for system requests
- // get src object size (cached in obj_ctx from verify_permission())
- RGWObjState* astate = nullptr;
- op_ret = src_object->get_obj_state(this, s->obj_ctx, &astate, s->yield, true);
- if (op_ret < 0) {
- return;
- }
// enforce quota against the destination bucket owner
    op_ret = dest_bucket->check_quota(user_quota, bucket_quota,
                                      astate->accounted_size,
                                      s->yield);
// send request to notification manager
- int ret = res->publish_commit(this, s->obj_size, mtime, etag);
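+    // report the copied object's actual size, taken from the source obj state cached above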
+ int ret = res->publish_commit(this, astate->size, mtime, etag);
if (ret < 0) {
ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
// too late to rollback operation, hence op_ret is not set here
self.events.append(json.loads(body))
# TODO create a base class for the AMQP and HTTP cases
- def verify_s3_events(self, keys, exact_match=False, deletions=False):
+ def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}):
"""verify stored s3 records agains a list of keys"""
- verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions)
+ verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes)
self.events = []
def verify_events(self, keys, exact_match=False, deletions=False):
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
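+    # map each object name to its expected size, verified against the notification records below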
+ objects_size = {}
# create objects in the bucket using PUT
- key = bucket.new_key('put')
- key.set_contents_from_string('bar')
+ content = str(os.urandom(randint(1, 1024)))
+ key_name = 'put'
+ key = bucket.new_key(key_name)
+ objects_size[key_name] = len(content)
+ key.set_contents_from_string(content)
# create objects in the bucket using COPY
- bucket.copy_key('copy', bucket.name, key.name)
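+    # the copied object has the same content, hence the same size, as the 'put' object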
+ key_name = 'copy'
+ bucket.copy_key(key_name, bucket.name, key.name)
+ objects_size[key_name] = len(content)
# create objects in the bucket using multi-part upload
fp = tempfile.NamedTemporaryFile(mode='w+b')
- object_size = 10*1024*1024
- content = bytearray(os.urandom(object_size))
+ content = bytearray(os.urandom(10*1024*1024))
+ key_name = 'multipart'
+ objects_size[key_name] = len(content)
fp.write(content)
fp.flush()
fp.seek(0)
- uploader = bucket.initiate_multipart_upload('multipart')
+ uploader = bucket.initiate_multipart_upload(key_name)
uploader.upload_part_from_file(fp, 1)
uploader.complete_upload()
fp.close()
# check amqp receiver
keys = list(bucket.list())
- receiver.verify_s3_events(keys, exact_match=True)
+ receiver.verify_s3_events(keys, exact_match=True, expected_sizes=objects_size)
# cleanup
stop_amqp_receiver(receiver, task)