};
class Manager : public DoutPrefixProvider {
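+ // set to true by stop(); long-running coroutines check it to exit their loops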
+ bool shutdown = false;
const size_t max_queue_size;
const uint32_t queues_update_period_ms;
const uint32_t queues_update_retry_ms;
// clean stale reservation from queue
void cleanup_queue(const std::string& queue_name, spawn::yield_context yield) {
- while (true) {
+ while (!shutdown) {
ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
const auto now = ceph::coarse_real_time::clock::now();
const auto stale_time = now - std::chrono::seconds(stale_reservations_period_s);
boost::system::error_code ec;
timer.async_wait(yield[ec]);
}
+ ldpp_dout(this, 5) << "INFO: manager stopped. done cleanup for queue: " << queue_name << dendl;
+ }
+
+ // unlock the queue (release ownership)
+ int unlock_queue(const std::string& queue_name, spawn::yield_context yield) {
+ librados::ObjectWriteOperation op;
+ op.assert_exists();
+ rados::cls::lock::unlock(&op, queue_name+"_lock", lock_cookie);
+ auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
+ const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+ if (ret == -ENOENT) {
+ ldpp_dout(this, 10) << "INFO: queue: " << queue_name
+ << ". was removed. nothing to unlock" << dendl;
+ return 0;
+ }
+ if (ret == -EBUSY) {
+ ldpp_dout(this, 10) << "INFO: queue: " << queue_name
+ << ". already owned by another RGW. no need to unlock" << dendl;
+ return 0;
+ }
+ return ret;
}
// processing of a specific queue
CountersManager queue_counters_container(queue_name, this->get_cct());
- while (true) {
+ while (!shutdown) {
// if queue was empty the last time, sleep for idle timeout
if (is_idle) {
Timer timer(io_context);
if (result == EntryProcessingResult::Successful || result == EntryProcessingResult::Expired
|| result == EntryProcessingResult::Migrating) {
ldpp_dout(this, 20) << "INFO: processing of entry: " << entry.marker
- << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name
+ << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " "
<< entryProcessingResultString[static_cast<unsigned int>(result)] << dendl;
remove_entries = true;
needs_migration_vector[entry_idx - 1] = (result == EntryProcessingResult::Migrating);
queue_counters_container.set(l_rgw_persistent_topic_size, entries_size);
}
}
+ ldpp_dout(this, 5) << "INFO: manager stopped. done processing for queue: " << queue_name << dendl;
}
// list of owned queues
void process_queues(spawn::yield_context yield) {
auto has_error = false;
owned_queues_t owned_queues;
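+ // number of queue coroutines currently running; used to wait for them during shutdown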
+ size_t processed_queue_count = 0;
// add randomness to the duration between queue checking
// to make sure that different daemons are not synced
std::vector<std::string> queue_gc;
std::mutex queue_gc_lock;
- while (true) {
+ auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
+ while (!shutdown) {
Timer timer(io_context);
const auto duration = (has_error ?
std::chrono::milliseconds(queues_update_retry_ms) : std::chrono::milliseconds(queues_update_period_ms)) +
failover_time,
LOCK_FLAG_MAY_RENEW);
- ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, optional_yield(io_context, yield));
+ ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
if (ret == -EBUSY) {
// lock is already taken by another RGW
ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl;
if (owned_queues.insert(queue_name).second) {
ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl;
// start processing this queue
- spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name](spawn::yield_context yield) {
+ spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name, &processed_queue_count](spawn::yield_context yield) {
+ ++processed_queue_count;
process_queue(queue_name, yield);
// if queue processing ended, it means that the queue was removed or not owned anymore
+ const auto ret = unlock_queue(queue_name, yield);
+ if (ret < 0) {
+ ldpp_dout(this, 5) << "WARNING: failed to unlock queue: " << queue_name << " with error: " <<
+ ret << " (ownership would still move if not renewed)" << dendl;
+ } else {
+ ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " not locked (ownership can move)" << dendl;
+ }
// mark it for deletion
std::lock_guard lock_guard(queue_gc_lock);
queue_gc.push_back(queue_name);
+ --processed_queue_count;
ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " marked for removal" << dendl;
}, make_stack_allocator());
} else {
std::for_each(queue_gc.begin(), queue_gc.end(), [this, &owned_queues](const std::string& queue_name) {
topics_persistency_tracker.erase(queue_name);
owned_queues.erase(queue_name);
- ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " removed" << dendl;
+ ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " was removed" << dendl;
});
queue_gc.clear();
}
}
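+ // manager stopped: wait for all in-flight queue coroutines to finish before returning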
+ Timer timer(io_context);
+ while (processed_queue_count > 0) {
+ ldpp_dout(this, 5) << "INFO: manager stopped. " << processed_queue_count << " queues are still being processed" << dendl;
+ timer.expires_from_now(std::chrono::milliseconds(queues_update_retry_ms));
+ boost::system::error_code ec;
+ timer.async_wait(yield[ec]);
+ }
+ ldpp_dout(this, 5) << "INFO: manager stopped. done processing all queues" << dendl;
}
public:
~Manager() {
+ }
+
+ void stop() {
+ shutdown = true;
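+ // releasing the work guard lets io_context.run() return once all remaining work is done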
work_guard.reset();
- io_context.stop();
std::for_each(workers.begin(), workers.end(), [] (auto& worker) { worker.join(); });
}
+ void init() {
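+ // spawn the coroutine that tracks the owned queues and processes them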
+ spawn::spawn(io_context, [this](spawn::yield_context yield) {
+ process_queues(yield);
+ }, make_stack_allocator());
+
+ // start the worker threads to do the actual queue processing
+ const std::string WORKER_THREAD_NAME = "notif-worker";
+ for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
+ workers.emplace_back([this]() {
+ try {
+ io_context.run();
+ } catch (const std::exception& err) {
+ ldpp_dout(this, 1) << "ERROR: notification worker failed with error: " << err.what() << dendl;
+ throw; // rethrow without slicing the exception type
+ }
+ });
+ const auto rc = ceph_pthread_setname(workers.back().native_handle(),
+ (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
+ ceph_assert(rc == 0);
+ }
+ ldpp_dout(this, 10) << "INfO: started notification manager with: " << worker_count << " workers" << dendl;
+ }
+
- // ctor: start all threads
+ // ctor: initialize members only; worker threads are started in init()
Manager(CephContext* _cct, uint32_t _max_queue_size, uint32_t _queues_update_period_ms,
uint32_t _queues_update_retry_ms, uint32_t _queue_idle_sleep_us, u_int32_t failover_time_ms,
reservations_cleanup_period_s(_reservations_cleanup_period_s),
site(site),
rados_store(*store)
- {
- spawn::spawn(io_context, [this](spawn::yield_context yield) {
- process_queues(yield);
- }, make_stack_allocator());
-
- // start the worker threads to do the actual queue processing
- const std::string WORKER_THREAD_NAME = "notif-worker";
- for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
- workers.emplace_back([this]() {
- try {
- io_context.run();
- } catch (const std::exception& err) {
- ldpp_dout(this, 10) << "Notification worker failed with error: " << err.what() << dendl;
- throw(err);
- }
- });
- const auto rc = ceph_pthread_setname(workers.back().native_handle(),
- (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
- ceph_assert(rc == 0);
- }
- ldpp_dout(this, 10) << "Started notification manager with: " << worker_count << " workers" << dendl;
- }
+ {}
int add_persistent_topic(const std::string& topic_queue, optional_yield y) {
if (topic_queue == Q_LIST_OBJECT_NAME) {
}
};
-// singleton manager
-// note that the manager itself is not a singleton, and multiple instances may co-exist
-// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
-static Manager* s_manager = nullptr;
+std::unique_ptr<Manager> s_manager;
constexpr size_t MAX_QUEUE_SIZE = 128*1000*1000; // 128MB
constexpr uint32_t Q_LIST_UPDATE_MSEC = 1000*30; // check queue list every 30 seconds
constexpr uint32_t STALE_RESERVATIONS_PERIOD_S = 120; // cleanup reservations that are more than 2 minutes old
constexpr uint32_t RESERVATIONS_CLEANUP_PERIOD_S = 30; // reservation cleanup every 30 seconds
-bool init(CephContext* cct, rgw::sal::RadosStore* store,
- const SiteConfig& site, const DoutPrefixProvider *dpp) {
+bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store,
+ const SiteConfig& site) {
if (s_manager) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to init notification manager: already exists" << dendl;
+ return false;
+ }
+ if (!RGWPubSubEndpoint::init_all(dpp->get_cct())) {
return false;
}
// TODO: take conf from CephContext
- s_manager = new Manager(cct, MAX_QUEUE_SIZE,
+ s_manager = std::make_unique<Manager>(dpp->get_cct(), MAX_QUEUE_SIZE,
Q_LIST_UPDATE_MSEC, Q_LIST_RETRY_MSEC,
IDLE_TIMEOUT_USEC, FAILOVER_TIME_MSEC,
STALE_RESERVATIONS_PERIOD_S, RESERVATIONS_CLEANUP_PERIOD_S,
WORKER_COUNT,
store, site);
+ s_manager->init();
return true;
}
void shutdown() {
- delete s_manager;
- s_manager = nullptr;
+ if (!s_manager) return;
+ RGWPubSubEndpoint::shutdown_all();
+ s_manager->stop();
+ s_manager.reset();
}
int add_persistent_topic(const std::string& topic_name, optional_yield y) {
if (!s_manager) {
return -EAGAIN;
}
- return remove_persistent_topic(s_manager, s_manager->rados_store.getRados()->get_notif_pool_ctx(), topic_queue, y);
+ return remove_persistent_topic(s_manager.get(), s_manager->rados_store.getRados()->get_notif_pool_ctx(), topic_queue, y);
}
rgw::sal::Object* get_object_with_attributes(
// initialize the notification manager
// notification manager is dequeuing the 2-phase-commit queues
// and send the notifications to the endpoints
-bool init(CephContext* cct, rgw::sal::RadosStore* store,
- const rgw::SiteConfig& site, const DoutPrefixProvider *dpp);
+bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store,
+ const rgw::SiteConfig& site);
// shutdown the notification manager
void shutdown();
#include <string>
#include <sstream>
#include <algorithm>
+#include <curl/curl.h>
#include "common/Formatter.h"
#include "common/iso_8601.h"
#include "common/async/completion.h"
#include <functional>
#include "rgw_perf_counters.h"
+#define dout_subsys ceph_subsys_rgw_notification
+
using namespace rgw;
template<typename EventType>
return value;
}
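+// guards the lifetime of the http manager: send() takes a shared lock,
+// init_http_manager()/shutdown_http_manager() take a unique lock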
+static std::unique_ptr<RGWHTTPManager> s_http_manager;
+static std::shared_mutex s_http_manager_mutex;
+
class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
private:
CephContext* const cct;
}
int send(const rgw_pubsub_s3_event& event, optional_yield y) override {
+ std::shared_lock lock(s_http_manager_mutex);
+ if (!s_http_manager) {
+ ldout(cct, 1) << "ERROR: send failed. http endpoint manager not running" << dendl;
+ return -ESRCH;
+ }
bufferlist read_bl;
RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
const auto post_data = json_format_pubsub_event(event);
request.set_send_length(post_data.length());
request.append_header("Content-Type", "application/json");
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
- const auto rc = RGWHTTP::process(&request, y);
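+ // hand the request to the shared http manager and wait for its completion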
+ auto rc = s_http_manager->add_request(&request);
+ if (rc == 0) {
+ rc = request.wait(y);
+ }
if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
// TODO: use read_bl to process return code and handle according to ack level
return rc;
return nullptr;
}
+bool init_http_manager(CephContext* cct) {
+ std::unique_lock lock(s_http_manager_mutex);
+ if (s_http_manager) return false;
+ s_http_manager = std::make_unique<RGWHTTPManager>(cct);
+ return (s_http_manager->start() == 0);
+}
+
+void shutdown_http_manager() {
+ std::unique_lock lock(s_http_manager_mutex);
+ if (s_http_manager) {
+ s_http_manager->stop();
+ s_http_manager.reset();
+ }
+}
+
+bool RGWPubSubEndpoint::init_all(CephContext* cct) {
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+ if (!amqp::init(cct)) {
+ ldout(cct, 1) << "ERROR: failed to init amqp endpoint manager" << dendl;
+ return false;
+ }
+#endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+ if (!kafka::init(cct)) {
+ ldout(cct, 1) << "ERROR: failed to init kafka endpoint manager" << dendl;
+ return false;
+ }
+#endif
+ if (!init_http_manager(cct)) {
+ ldout(cct, 1) << "ERROR: failed to init http endpoint manager" << dendl;
+ return false;
+ }
+ return true;
+}
+
+void RGWPubSubEndpoint::shutdown_all() {
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+ amqp::shutdown();
+#endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+ kafka::shutdown();
+#endif
+ shutdown_http_manager();
+}
+
configuration_error(const std::string& what_arg) :
std::logic_error("pubsub endpoint configuration error: " + what_arg) {}
};
+
+ // init all supported endpoints
+ static bool init_all(CephContext* cct);
+ // shutdown all supported endpoints
+ static void shutdown_all();
+
};
index_completion_manager = new RGWIndexCompletionManager(this);
if (run_notification_thread) {
- ret = rgw::notify::init(cct, driver, *svc.site, dpp);
- if (ret < 0 ) {
- ldpp_dout(dpp, 1) << "ERROR: failed to initialize notification manager" << dendl;
- return ret;
+ if (!rgw::notify::init(dpp, driver, *svc.site)) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to initialize notification manager" << dendl;
}
using namespace rgw;
// singleton manager
// note that the manager itself is not a singleton, and multiple instances may co-exist
-// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
static Manager* s_manager = nullptr;
+static std::shared_mutex s_manager_mutex;
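+// protects s_manager against concurrent init()/shutdown() and the accessors below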
static const size_t MAX_CONNECTIONS_DEFAULT = 256;
static const size_t MAX_INFLIGHT_DEFAULT = 8192;
static const unsigned RECONNECT_TIME_MS = 100;
bool init(CephContext* cct) {
+ std::unique_lock lock(s_manager_mutex);
if (s_manager) {
return false;
}
}
void shutdown() {
+ std::unique_lock lock(s_manager_mutex);
delete s_manager;
s_manager = nullptr;
}
bool connect(connection_id_t& conn_id, const std::string& url, const std::string& exchange, bool mandatory_delivery, bool verify_ssl,
boost::optional<const std::string&> ca_location) {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return false;
return s_manager->connect(conn_id, url, exchange, mandatory_delivery, verify_ssl, ca_location);
}
int publish(const connection_id_t& conn_id,
const std::string& topic,
const std::string& message) {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
return s_manager->publish(conn_id, topic, message);
}
const std::string& topic,
const std::string& message,
reply_callback_t cb) {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
return s_manager->publish_with_confirm(conn_id, topic, message, cb);
}
size_t get_connection_count() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return 0;
return s_manager->get_connection_count();
}
size_t get_inflight() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return 0;
return s_manager->get_inflight();
}
size_t get_queued() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return 0;
return s_manager->get_queued();
}
size_t get_dequeued() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return 0;
return s_manager->get_dequeued();
}
size_t get_max_connections() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return MAX_CONNECTIONS_DEFAULT;
return s_manager->max_connections;
}
size_t get_max_inflight() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return MAX_INFLIGHT_DEFAULT;
return s_manager->max_inflight;
}
size_t get_max_queue() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return MAX_QUEUE_DEFAULT;
return s_manager->max_queue;
}
#include "rgw_kmip_client_impl.h"
#include "rgw_perf_counters.h"
#include "rgw_signal.h"
-#ifdef WITH_RADOSGW_AMQP_ENDPOINT
-#include "rgw_amqp.h"
-#endif
-#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
-#include "rgw_kafka.h"
-#endif
#ifdef WITH_ARROW_FLIGHT
#include "rgw_flight_frontend.h"
#endif
tracing::rgw::tracer.init(dpp->get_cct(), "rgw");
} /* init_tracepoints() */
-void rgw::AppMain::init_notification_endpoints()
-{
-#ifdef WITH_RADOSGW_AMQP_ENDPOINT
- if (!rgw::amqp::init(dpp->get_cct())) {
- derr << "ERROR: failed to initialize AMQP manager" << dendl;
- }
-#endif
-#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
- if (!rgw::kafka::init(dpp->get_cct())) {
- derr << "ERROR: failed to initialize Kafka manager" << dendl;
- }
-#endif
-} /* init_notification_endpoints */
-
void rgw::AppMain::init_lua()
{
rgw::sal::Driver* driver = env.driver;
rgw::curl::cleanup_curl();
g_conf().remove_observer(implicit_tenant_context.get());
implicit_tenant_context.reset(); // deletes
-#ifdef WITH_RADOSGW_AMQP_ENDPOINT
- rgw::amqp::shutdown();
-#endif
-#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
- rgw::kafka::shutdown();
-#endif
rgw_perf_stop(g_ceph_context);
ratelimiter.reset(); // deletes--ensure this happens before we destruct
} /* AppMain::shutdown */
// singleton manager
// note that the manager itself is not a singleton, and multiple instances may co-exist
-// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
static Manager* s_manager = nullptr;
+static std::shared_mutex s_manager_mutex;
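+// protects s_manager against concurrent init()/shutdown() and the accessors below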
static const size_t MAX_CONNECTIONS_DEFAULT = 256;
static const size_t MAX_INFLIGHT_DEFAULT = 8192;
static const size_t MAX_QUEUE_DEFAULT = 8192;
bool init(CephContext* cct) {
+ std::unique_lock lock(s_manager_mutex);
if (s_manager) {
return false;
}
}
void shutdown() {
+ std::unique_lock lock(s_manager_mutex);
delete s_manager;
s_manager = nullptr;
}
boost::optional<const std::string&> mechanism,
boost::optional<const std::string&> user_name,
boost::optional<const std::string&> password) {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return false;
return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism, user_name, password);
}
int publish(const std::string& conn_name,
const std::string& topic,
const std::string& message) {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return STATUS_MANAGER_STOPPED;
return s_manager->publish(conn_name, topic, message);
}
const std::string& topic,
const std::string& message,
reply_callback_t cb) {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return STATUS_MANAGER_STOPPED;
return s_manager->publish_with_confirm(conn_name, topic, message, cb);
}
size_t get_connection_count() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return 0;
return s_manager->get_connection_count();
}
size_t get_inflight() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return 0;
return s_manager->get_inflight();
}
size_t get_queued() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return 0;
return s_manager->get_queued();
}
size_t get_dequeued() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return 0;
return s_manager->get_dequeued();
}
size_t get_max_connections() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return MAX_CONNECTIONS_DEFAULT;
return s_manager->max_connections;
}
size_t get_max_inflight() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return MAX_INFLIGHT_DEFAULT;
return s_manager->max_inflight;
}
size_t get_max_queue() {
+ std::shared_lock lock(s_manager_mutex);
if (!s_manager) return MAX_QUEUE_DEFAULT;
return s_manager->max_queue;
}
return r;
}
- main.init_notification_endpoints();
main.init_lua();
return 0;
main.shutdown();
return r;
}
- main.init_notification_endpoints();
#if defined(HAVE_SYS_PRCTL_H)
if (prctl(PR_SET_DUMPABLE, 1) == -1) {
void init_opslog();
int init_frontends2(RGWLib* rgwlib = nullptr);
void init_tracepoints();
- void init_notification_endpoints();
void init_lua();
bool have_http() {
parsed_result = json.loads(result[0])
entries = parsed_result['Topic Stats']['Entries']
retries += 1
+ time_diff = time.time() - start_time
+ log.info('queue %s has %d entries after %ds', topic_name, entries, time_diff)
if retries > 30:
- time_diff = time.time() - start_time
log.warning('queue %s still has %d entries after %ds', topic_name, entries, time_diff)
assert_equal(entries, 0)
time.sleep(5)
kafka_security('SASL_PLAINTEXT', mechanism='SCRAM-SHA-256')
+@attr('http_test')
+def test_persistent_ps_s3_reload():
+ """ do a realm reload while we send notifications """
+ conn = connection()
+ zonegroup = get_config_zonegroup()
+
+ # make sure there is nothing to migrate
+ print('delete all topics')
+ delete_all_topics(conn, '', get_config_cluster())
+
+ # create random port for the http server
+ host = get_ip()
+ http_port = random.randint(10000, 20000)
+ print('start http server')
+ http_server = HTTPServerWithEvents((host, http_port), delay=2)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name1 = bucket_name + TOPIC_SUFFIX + '_1'
+
+ # create s3 topics
+ endpoint_address = 'http://'+host+':'+str(http_port)
+ endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
+ topic_conf1 = PSTopicS3(conn, topic_name1, zonegroup, endpoint_args=endpoint_args)
+ topic_arn1 = topic_conf1.set_config()
+ # 2nd topic is unused
+ topic_name2 = bucket_name + TOPIC_SUFFIX + '_2'
+ topic_conf2 = PSTopicS3(conn, topic_name2, zonegroup, endpoint_args=endpoint_args)
+ topic_arn2 = topic_conf2.set_config()
+
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1,
+ 'Events': []
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # topic stats
+ result = admin(['topic', 'stats', '--topic', topic_name1], get_config_cluster())
+ parsed_result = json.loads(result[0])
+ assert_equal(parsed_result['Topic Stats']['Entries'], 0)
+ assert_equal(result[1], 0)
+
+ # create objects in the bucket (async)
+ number_of_objects = 10
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key('key-'+str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ time_diff = time.time() - start_time
+ print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ wait_for_queue_to_drain(topic_name1)
+
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key('another-key-'+str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ time_diff = time.time() - start_time
+ print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ # do a reload
+ print('do reload')
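+ # modifying the zonegroup and committing the period triggers a realm reload in the gateway,
+ # which restarts the notification manager while entries are still queued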
+ result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'update'], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'commit'], get_config_cluster())
+ assert_equal(result[1], 0)
+
+ wait_for_queue_to_drain(topic_name1)
+ # verify events
+ keys = list(bucket.list())
+ http_server.verify_s3_events(keys, exact_match=False)
+
+ # cleanup
+ s3_notification_conf.del_config()
+ topic_conf1.del_config()
+ topic_conf2.del_config()
+ # delete objects from the bucket
+ client_threads = []
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ http_server.close()
+
+
@attr('data_path_v2_test')
def test_persistent_ps_s3_data_path_v2_migration():
""" test data path v2 persistent migration """