// populate record from request
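+ // key and size are passed in explicitly, rather than read from req_state,
+ // so that ops handling multiple objects per request (e.g. multi-object delete)
+ // can publish a record per object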
void populate_record_from_request(const req_state *s,
+ const rgw_obj_key& key,
+ uint64_t size,
const ceph::real_time& mtime,
const std::string& etag,
EventType event_type,
record.bucket_name = s->bucket_name;
record.bucket_ownerIdentity = s->bucket_owner.get_id().id;
record.bucket_arn = to_string(rgw::ARN(s->bucket));
- record.object_key = s->object.name;
- record.object_size = s->obj_size;
+ record.object_key = key.name;
+ record.object_size = size;
record.object_etag = etag;
- record.object_versionId = s->object.instance;
+ record.object_versionId = key.instance;
// use timestamp as per key sequence id (hex encoded)
const utime_t ts(real_clock::now());
boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
}
int publish(const req_state* s,
+ const rgw_obj_key& key,
+ uint64_t size,
const ceph::real_time& mtime,
const std::string& etag,
EventType event_type,
return rc;
}
rgw_pubsub_s3_record record;
- populate_record_from_request(s, mtime, etag, event_type, record);
+ populate_record_from_request(s, key, size, mtime, etag, event_type, record);
bool event_handled = false;
bool event_should_be_handled = false;
for (const auto& bucket_topic : bucket_topics.topics) {
// forward declarations
class RGWRados;
class req_state;
+struct rgw_obj_key;
namespace rgw::notify {
// publish notification
int publish(const req_state* s,
+ const rgw_obj_key& key,
+ uint64_t size,
const ceph::real_time& mtime,
const std::string& etag,
EventType event_type,
}
// send request to notification manager
- const auto ret = rgw::notify::publish(s, mtime, etag, rgw::notify::ObjectCreatedPut, store);
+ const auto ret = rgw::notify::publish(s, s->object, s->obj_size, mtime, etag, rgw::notify::ObjectCreatedPut, store);
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
// TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
}
} while (is_next_file_to_upload());
- const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedPost, store);
+ const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedPost, store);
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
// TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
op_ret = -EINVAL;
}
- const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(),
+ const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(),
delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete,
store);
if (ret < 0) {
copy_obj_progress_cb, (void *)this
);
- const auto ret = rgw::notify::publish(s, mtime, etag, rgw::notify::ObjectCreatedCopy, store);
+ const auto ret = rgw::notify::publish(s, s->object, s->obj_size, mtime, etag, rgw::notify::ObjectCreatedCopy, store);
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
// TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
op_ret = obj_op.write_meta(bl.length(), 0, attrs);
} while (op_ret == -EEXIST);
- const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(), rgw::notify::ObjectCreatedPost, store);
+ const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), attrs[RGW_ATTR_ETAG].to_str(), rgw::notify::ObjectCreatedPost, store);
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
// TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << dendl;
}
- const auto ret = rgw::notify::publish(s, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedCompleteMultipartUpload, store);
+ const auto ret = rgw::notify::publish(s, s->object, s->obj_size, ceph::real_clock::now(), etag, rgw::notify::ObjectCreatedCompleteMultipartUpload, store);
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
// TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
send_partial_response(*iter, del_op.result.delete_marker,
del_op.result.version_id, op_ret);
+
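+ // the etag is read from the object state, since a multi-object delete
+ // request carries no per-object etag; fall back to an empty etag if the
+ // attribute is missing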
+ const auto obj_state = obj_ctx->get_state(obj);
+ bufferlist etag_bl;
+ const auto etag = obj_state->get_attr(RGW_ATTR_ETAG, etag_bl) ? etag_bl.to_str() : "";
+
+ const auto ret = rgw::notify::publish(s, obj.key, obj_state->size, ceph::real_clock::now(), etag,
+ del_op.result.delete_marker && obj.key.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete,
+ store);
+ if (ret < 0) {
+ ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
+ // TODO: we should have a conf option to make "send" a blocking coroutine and reply with an error in case sending failed
+ // this should be a global conf (probably returning a different handler)
+ // so we don't need to read the configured values before we perform it
+ }
}
/* set the return code to zero, errors at this point will be
import socket
import time
import os
+from random import randint
from .tests import get_realm, \
ZonegroupConns, \
zonegroup_meta_checkpoint, \
print_connection_info, \
delete_all_s3_topics, \
put_object_tagging, \
- get_object_tagging
+ get_object_tagging, \
+ delete_all_objects
from multisite import User
from nose import SkipTest
from nose.tools import assert_not_equal, assert_equal
for worker in self.workers:
worker.close()
worker.join()
+
+ def get_and_reset_events(self):
+ events = []
+ for worker in self.workers:
+ events += worker.get_events()
+ worker.reset_events()
+ return events
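For illustration, a minimal sketch of how a test could drain events between phases with the new helper; the event layout (a parsed S3 notification with a 'Records' list) is assumed from how these tests verify events, so treat it as a sketch rather than part of the patch:

    # hypothetical usage: collect everything received so far and reset the workers
    events = http_server.get_and_reset_events()
    for event in events:
        for record in event['Records']:
            print(record['eventName'] + ': ' + record['s3']['object']['key'])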
# AMQP endpoint functions
kafka_security('SSL_SASL')
+def test_ps_s3_notification_multi_delete_on_master():
+ """ test deletion of multiple keys on master """
+ if skip_push_tests:
+ return SkipTest("PubSub push tests don't run in teuthology")
+ zones, _ = init_env(require_ps=False)
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+
+ # create random port for the http server
+ host = get_ip()
+ port = randint(10000, 20000)
+ number_of_objects = 10
+ # start an http server in a separate thread
+ http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = zones[0].create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topic
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address
+ topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectRemoved:*']
+ }]
+ s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket
+ client_threads = []
+ for i in range(number_of_objects):
+ obj_size = randint(1, 1024)
+ content = str(os.urandom(obj_size))
+ key = bucket.new_key(str(i))
+ thr = threading.Thread(target=set_contents_from_string, args=(key, content))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ keys = list(bucket.list())
+
+ start_time = time.time()
+ delete_all_objects(zones[0].conn, bucket_name)
+ time_diff = time.time() - start_time
+ print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ print('wait for 5sec for the messages...')
+ time.sleep(5)
+
+ # check http receiver
+ http_server.verify_s3_events(keys, exact_match=True, deletions=True)
+
+ # cleanup
+ topic_conf.del_config()
+ s3_notification_conf.del_config(notification=notification_name)
+ # delete the bucket
+ zones[0].delete_bucket(bucket_name)
+ http_server.close()
+
+
def test_ps_s3_notification_push_http_on_master():
""" test pushing http s3 notification on master """
if skip_push_tests:
print 'failed to do topic cleanup: ' + str(err)
+def delete_all_objects(conn, bucket_name):
+ client = boto3.client('s3',
+ endpoint_url='http://'+conn.host+':'+str(conn.port),
+ aws_access_key_id=conn.aws_access_key_id,
+ aws_secret_access_key=conn.aws_secret_access_key)
+
+ # list the objects in the bucket; list_objects returns at most 1000 keys,
+ # which is enough for these tests, and an empty bucket yields no 'Contents'
+ objects = [{'Key': obj['Key']} for obj in client.list_objects(Bucket=bucket_name).get('Contents', [])]
+ # delete objects from the bucket
+ response = client.delete_objects(Bucket=bucket_name,
+ Delete={'Objects': objects})
+ return response
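A minimal usage sketch for the helper above, assuming the boto-style conn and bucket naming used elsewhere in these tests:

    # hypothetical usage: multi-delete everything in the bucket, then inspect the response
    response = delete_all_objects(zones[0].conn, bucket_name)
    # boto3's delete_objects lists removed keys under 'Deleted'
    print('deleted ' + str(len(response.get('Deleted', []))) + ' objects')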
+
+
class PSTopicS3:
"""class to set/list/get/delete a topic
POST ?Action=CreateTopic&Name=<topic name>[&OpaqueData=<data>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]]