}
} while (is_next_file_to_upload());
- const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedPost, store);
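+ // pass the write offset (ofs) rather than s->obj_size so the created-object event reports the uploaded size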
+ const auto ret = rgw::notify::publish(s, s->object, ofs, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedPost, store);
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
// TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
ldpp_dout(this, 5) << "WARNING: failed to populate delete request with object tags: " << err.what() << dendl;
}
populate_metadata_in_request(s, attrs);
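+ // read the object's state so the removal event can report its actual size and mtime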
+ const auto obj_state = obj_ctx->get_state(obj);
+
+ const auto ret = rgw::notify::publish(s, s->object, obj_state->size, obj_state->mtime, attrs[RGW_ATTR_ETAG].to_str(),
+ delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete,
+ store);
+ if (ret < 0) {
+ ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
+ // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
+ // this should be global conf (probably returning a different handler)
+ // so we don't need to read the configured values before we perform it
+ }
} else {
op_ret = -EINVAL;
}
-
- const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(),
- delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete,
- store);
- if (ret < 0) {
- ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
- // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
- // this should be global conf (probably returnign a different handler)
- // so we don't need to read the configured values before we perform it
- }
}
bool RGWCopyObj::parse_copy_location(const boost::string_view& url_src,
ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << dendl;
}
- const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedCompleteMultipartUpload, store);
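+ // publish the total size (ofs) and the final multipart etag in the completion event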
+ const auto ret = rgw::notify::publish(s, s->object, ofs, ceph::real_clock::now(), final_etag_str, rgw::notify::ObjectCreatedCompleteMultipartUpload, store);
+
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
// TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
bufferlist etag_bl;
const auto etag = obj_state->get_attr(RGW_ATTR_ETAG, etag_bl) ? etag_bl.to_str() : "";
- const auto ret = rgw::notify::publish(s, obj.key, obj_state->size, ceph::real_clock::now(), etag,
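+ // keep the stored object size and pass the object's mtime instead of the current time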
+ const auto ret = rgw::notify::publish(s, obj.key, obj_state->size, obj_state->mtime, etag,
del_op.result.delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete,
store);
if (ret < 0) {
self.sock.listen(num_workers)
self.workers = [HTTPServerThread(i, self.sock, addr) for i in range(num_workers)]
- def verify_s3_events(self, keys, exact_match=False, deletions=False):
+ def verify_s3_events(self, keys, exact_match=False, deletions=False, expected_sizes={}):
"""verify stored s3 records agains a list of keys"""
events = []
for worker in self.workers:
events += worker.get_events()
worker.reset_events()
- verify_s3_records_by_elements(events, keys, exact_match=exact_match, deletions=deletions)
+ verify_s3_records_by_elements(events, keys, exact_match=exact_match, deletions=deletions, expected_sizes=expected_sizes)
def verify_events(self, keys, exact_match=False, deletions=False):
"""verify stored events agains a list of keys"""
assert False, err
-def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False):
+def verify_s3_records_by_elements(records, keys, exact_match=False, deletions=False, expected_sizes={}):
""" verify there is at least one record per element """
err = ''
for key in keys:
key_found = False
+ object_size = 0
if type(records) is list:
for record_list in records:
if key_found:
record['s3']['object']['key'] == key.name:
if deletions and 'ObjectRemoved' in record['eventName']:
key_found = True
+ object_size = record['s3']['object']['size']
break
elif not deletions and 'ObjectCreated' in record['eventName']:
key_found = True
+ object_size = record['s3']['object']['size']
break
else:
for record in records['Records']:
record['s3']['object']['key'] == key.name:
if deletions and 'ObjectRemoved' in record['eventName']:
key_found = True
+ object_size = record['s3']['object']['size']
break
elif not deletions and 'ObjectCreated' in record['eventName']:
key_found = True
+ object_size = record['s3']['object']['size']
break
if not key_found:
err = 'no ' + ('deletion' if deletions else 'creation') + ' event found for key: ' + str(key)
- for record_list in records:
- for record in record_list['Records']:
- log.error(str(record['s3']['bucket']['name']) + ',' + str(record['s3']['object']['key']))
assert False, err
+ elif expected_sizes:
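+ # the size reported in the event record must match the size recorded when the object was uploaded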
+ assert_equal(object_size, expected_sizes.get(key.name))
if not len(records) == len(keys):
err = 'superfluous records are found'
if skip_push_tests:
return SkipTest("PubSub push tests don't run in teuthology")
hostname = get_ip()
- zones, _ = init_env(require_ps=False)
+ master_zone, _ = init_env(require_ps=False)
realm = get_realm()
zonegroup = realm.master_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
# create s3 topic
endpoint_address = 'http://'+host+':'+str(port)
endpoint_args = 'push-endpoint='+endpoint_address
- topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
'TopicArn': topic_arn,
'Events': ['s3:ObjectRemoved:*']
}]
- s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
# create objects in the bucket
client_threads = []
+ objects_size = {}
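+ # map each object name to its size so the notification events can be checked against it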
for i in range(number_of_objects):
- obj_size = randint(1, 1024)
- content = str(os.urandom(obj_size))
+ object_size = randint(1, 1024)
+ content = str(os.urandom(object_size))
key = bucket.new_key(str(i))
+ objects_size[key.name] = object_size
thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
thr.start()
client_threads.append(thr)
keys = list(bucket.list())
start_time = time.time()
- delete_all_objects(zones[0].conn, bucket_name)
+ delete_all_objects(master_zone.conn, bucket_name)
time_diff = time.time() - start_time
print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
print('wait for 5sec for the messages...')
time.sleep(5)
-
+
# check http receiver
- http_server.verify_s3_events(keys, exact_match=True, deletions=True)
+ http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size)
# cleanup
topic_conf.del_config()
s3_notification_conf.del_config(notification=notification_name)
# delete the bucket
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
http_server.close()
# create objects in the bucket
client_threads = []
+ objects_size = {}
start_time = time.time()
- content = 'bar'
for i in range(number_of_objects):
+ object_size = randint(1, 1024)
+ content = str(os.urandom(object_size))
key = bucket.new_key(str(i))
+ objects_size[key.name] = object_size
thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
thr.start()
client_threads.append(thr)
# check http receiver
keys = list(bucket.list())
- print('total number of objects: ' + str(len(keys)))
- http_server.verify_s3_events(keys, exact_match=True)
+ http_server.verify_s3_events(keys, exact_match=True, deletions=False, expected_sizes=objects_size)
# delete objects from the bucket
client_threads = []
time.sleep(5)
# check http receiver
- http_server.verify_s3_events(keys, exact_match=True, deletions=True)
+ http_server.verify_s3_events(keys, exact_match=True, deletions=True, expected_sizes=objects_size)
# cleanup
topic_conf.del_config()
# create objects in the bucket using multi-part upload
fp = tempfile.NamedTemporaryFile(mode='w+b')
- content = bytearray(os.urandom(1024*1024))
+ object_size = 10*1024*1024
+ content = bytearray(os.urandom(object_size))
fp.write(content)
fp.flush()
fp.seek(0)
assert_equal(len(events), 1)
assert_equal(events[0]['Records'][0]['eventName'], 's3:ObjectCreated:CompleteMultipartUpload')
assert_equal(events[0]['Records'][0]['s3']['configurationId'], notification_name+'_3')
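+ # log the object size reported in the multipart-completion event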
+ print(events[0]['Records'][0]['s3']['object']['size'])
# cleanup
stop_amqp_receiver(receiver1, task1)