--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright contributors to the Ceph project
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "topic_migration.h"
+#include "services/svc_zone.h"
+#include "rgw_sal_rados.h"
+
+namespace rgwrados::topic_migration {
+
+namespace {
+
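+// split a v1 bucket notifications oid of the form
+// "pubsub.{tenant}.bucket.{name}/{marker}" into its components,
+// e.g. "pubsub.acme.bucket.photos/mark1" -> ("acme", "photos", "mark1")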
+int deconstruct_topics_oid(const std::string& bucket_topics_oid, std::string& tenant, std::string& bucket_name,
+ std::string& marker, const DoutPrefixProvider* dpp) {
+ auto pos = bucket_topics_oid.find(rgw::sal::pubsub_bucket_oid_infix);
+ if (pos == std::string::npos) {
+ ldpp_dout(dpp, 1) << "ERROR: bucket_topics_oid:" << bucket_topics_oid << " doesn't contain " << rgw::sal::pubsub_bucket_oid_infix
+ << " after tenant name!" << dendl;
+ return -EINVAL;
+ }
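+  // the tenant name sits between the oid prefix and the bucket infix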
+ const size_t prefix_len = rgw::sal::pubsub_oid_prefix.size();
+ tenant = bucket_topics_oid.substr(prefix_len, pos - prefix_len);
+
+ auto bucket_name_marker = bucket_topics_oid.substr(pos + rgw::sal::pubsub_bucket_oid_infix.size());
+ pos = bucket_name_marker.find('/');
+ if (pos == std::string::npos) {
+ ldpp_dout(dpp, 1) << "ERROR: bucket_topics_oid:" << bucket_topics_oid << " doesn't contain / after bucket name!" << dendl;
+ return -EINVAL;
+ }
+ bucket_name = bucket_name_marker.substr(0, pos);
+ marker = bucket_name_marker.substr(pos + 1);
+
+ return 0;
+}
+
+// migrate v1 notification metadata for a single bucket
+int migrate_notification(const DoutPrefixProvider* dpp, optional_yield y,
+ rgw::sal::RadosStore* driver, const rgw_raw_obj& obj)
+{
+  // parse bucket name and marker out of "pubsub.{tenant}.bucket.{name}/{marker}"
+ auto* rados = driver->getRados()->get_rados_handle();
+ std::string tenant;
+ std::string bucket_name;
+ std::string marker;
+ int r = deconstruct_topics_oid(obj.oid, tenant, bucket_name, marker, dpp);
+ if (r < 0) {
+ const std::string s = fmt::format("failed to read tenant, bucket name and marker from: {}. error: {}. {}",
+ obj.to_str(), cpp_strerror(r), "expected format pubsub.{tenant}.bucket.{name}/{marker}!");
+ ldpp_dout(dpp, 1) << "ERROR: " << s << dendl;
+ rgw_clog_warn(rados, s);
+ return r;
+ }
+
+ // migrate the notifications
+ rgw_pubsub_bucket_topics v1_bucket_topics;
+ rgw_bucket rgw_bucket_info(tenant, bucket_name);
+ rgw_bucket_info.marker = marker;
+ rgw::sal::RadosBucket rados_bucket(driver, rgw_bucket_info);
+ RGWObjVersionTracker bucket_topics_objv;
+ r = rados_bucket.read_topics(v1_bucket_topics, &bucket_topics_objv, y, dpp);
+ if (r == -ENOENT) {
+ return 0; // ok, someone else already migrated
+ }
+ if (r < 0) {
+ const std::string s = fmt::format("failed to read v1 bucket notifications from: {}. error: {}",
+ obj.to_str(), cpp_strerror(r));
+ ldpp_dout(dpp, 1) << "ERROR: " << s << dendl;
+ rgw_clog_warn(rados, s);
+ return r;
+ }
+
+  if (v1_bucket_topics.topics.empty()) {
+ ldpp_dout(dpp, 20) << "INFO: v1 notifications object is empty, nothing to migrate" << dendl;
+ // delete v1 notification obj with Bucket::remove_topics()
+ r = rados_bucket.remove_topics(&bucket_topics_objv, y, dpp);
+ if (r == -ECANCELED || r == -ENOENT) {
+ ldpp_dout(dpp, 20) << "INFO: v1 notifications object: " << obj.to_str() << " already migrated" << dendl;
+ return 0; // ok, someone else already migrated
+ }
+ if (r < 0) {
+ const std::string s = fmt::format("failed to remove migrated v1 bucket notifications obj: {}. error: {}",
+ obj.to_str(), cpp_strerror(-r));
+ ldpp_dout(dpp, 1) << "ERROR: " << s << dendl;
+ rgw_clog_warn(rados, s);
+ return r;
+ }
+ return 0;
+ }
+
+ // in a for-loop that retries ECANCELED errors:
+ // {
+ // load the corresponding bucket by name
+ // break if marker doesn't match loaded bucket's
+ // merge with existing RGW_ATTR_BUCKET_NOTIFICATION topics (don't override existing v2)
+ // write RGW_ATTR_BUCKET_NOTIFICATION xattr
+ // }
+ std::unique_ptr<rgw::sal::Bucket> bucket;
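+  // prime r so the retry loop below is entered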
+ r = -ECANCELED;
+ for (auto i = 0u; i < 15u && r == -ECANCELED; ++i) {
+ r = driver->load_bucket(dpp, rgw_bucket_info, &bucket, y);
+ if (r == -ENOENT) {
+ break; // bucket is deleted, we should delete the v1 notification
+ }
+ if (r < 0) {
+ const std::string s = fmt::format("failed to load the bucket from: {}. error: {}",
+ obj.to_str(), cpp_strerror(r));
+ ldpp_dout(dpp, 1) << "ERROR: " << s << dendl;
+ rgw_clog_warn(rados, s);
+ return r;
+ }
+
+ if (bucket->get_marker() != marker) {
+ break;
+ }
+
+ rgw::sal::Attrs& attrs = bucket->get_attrs();
+
+ rgw_pubsub_bucket_topics v2_bucket_topics;
+ if (const auto iter = attrs.find(RGW_ATTR_BUCKET_NOTIFICATION); iter != attrs.end()) {
+ // bucket notification v2 already exists
+ try {
+ const auto& bl = iter->second;
+ auto biter = bl.cbegin();
+ v2_bucket_topics.decode(biter);
+ } catch (buffer::error& err) {
+ const std::string s = fmt::format("failed to decode v2 bucket notifications of bucket: {}. error: {}",
+ bucket->get_name(), err.what());
+ ldpp_dout(dpp, 1) << "ERROR: " << s << dendl;
+ rgw_clog_warn(rados, s);
+ return -EIO;
+ }
+ }
+ const auto original_size = v2_bucket_topics.topics.size();
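+    // std::map::merge() only moves in keys that are missing from
+    // v2_bucket_topics, so existing v2 notifications are never overridden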
+ v2_bucket_topics.topics.merge(v1_bucket_topics.topics);
+ if (original_size == v2_bucket_topics.topics.size()) {
+ // nothing changed after the merge
+ break;
+ }
+ bufferlist bl;
+ v2_bucket_topics.encode(bl);
+ attrs[RGW_ATTR_BUCKET_NOTIFICATION] = std::move(bl);
+
+ r = bucket->merge_and_store_attrs(dpp, attrs, y);
+ if (r != -ECANCELED && r < 0) {
+ const std::string s = fmt::format("failed writing migrated notifications to bucket: {}. error: {}",
+ bucket->get_name(), cpp_strerror(-r));
+ ldpp_dout(dpp, 1) << "ERROR: " << s << dendl;
+ rgw_clog_warn(rados, s);
+ return r;
+ }
+ }
+ if (r == -ECANCELED) {
+ // we exhausted the 15 retries
+ ldpp_dout(dpp, 5) << "WARNING: giving up on writing migrated notifications to bucket: " << bucket->get_name() <<
+ ". will retry later" << dendl;
+ return r;
+ }
+
+ // delete v1 notification obj with Bucket::remove_topics()
+ r = rados_bucket.remove_topics(&bucket_topics_objv, y, dpp);
+ if (r == -ECANCELED || r == -ENOENT) {
+ ldpp_dout(dpp, 20) << "INFO: v1 notifications object: " << obj.to_str() << " already removed" << dendl;
+ return 0; // ok, someone else already migrated
+ }
+ if (r < 0) {
+ const std::string s = fmt::format("failed to remove migrated v1 bucket notifications obj: {}. error: {}",
+ obj.to_str(), cpp_strerror(-r));
+ ldpp_dout(dpp, 1) << "ERROR: " << s << dendl;
+ rgw_clog_warn(rados, s);
+ return r;
+ }
+
+ return 0;
+}
+
+// migrate topics for a given tenant
+int migrate_topics(const DoutPrefixProvider* dpp, optional_yield y,
+ rgw::sal::RadosStore* driver,
+ const rgw_raw_obj& topics_obj)
+{
+ // parse tenant name out of topics_obj "pubsub.{tenant}"
+ auto* rados = driver->getRados()->get_rados_handle();
+ std::string tenant;
+ const auto& topics_obj_oid = topics_obj.oid;
+  if (auto pos = topics_obj_oid.find(rgw::sal::pubsub_oid_prefix); pos != std::string::npos) {
+    tenant = topics_obj_oid.substr(pos + rgw::sal::pubsub_oid_prefix.size());
+ } else {
+ const std::string s = fmt::format("failed to read tenant from name from oid: {}. error: {}",
+ topics_obj_oid, cpp_strerror(-EINVAL));
+ ldpp_dout(dpp, 1) << "ERROR: " << s << dendl;
+ rgw_clog_warn(rados, s);
+ return -EINVAL;
+ }
+
+ // migrate the topics
+ rgw_pubsub_topics topics;
+ RGWObjVersionTracker topics_objv;
+ int r = driver->read_topics(tenant, topics, &topics_objv, y, dpp);
+ if (r == -ENOENT) {
+ ldpp_dout(dpp, 20) << "INFO: v1 topics object: " << topics_obj.to_str() << " does not exists. already migrated" << dendl;
+ return 0; // ok, someone else already migrated
+ }
+ if (r < 0) {
+ const std::string s = fmt::format("failed to read v1 topics from: {}. error: {}",
+ topics_obj.to_str(), cpp_strerror(-r));
+ ldpp_dout(dpp, 1) << "ERROR: " << s << dendl;
+ rgw_clog_warn(rados, s);
+ return r;
+ }
+
+ constexpr bool exclusive = true; // don't overwrite any existing v2 metadata
+ for (const auto& [name, topic] : topics.topics) {
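+    // v1 auto-generated (per-notification) topics have a name that differs
+    // from their arn_topic; only user-created topics are migrated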
+ if (topic.name != topic.dest.arn_topic) {
+ ldpp_dout(dpp, 20) << "INFO: auto-generated topic: " << topic.name << " will not be migrated" << dendl;
+ continue;
+ }
+ // write the v2 topic
+ RGWObjVersionTracker objv;
+ objv.generate_new_write_ver(dpp->get_cct());
+ r = driver->write_topic_v2(topic, exclusive, objv, y, dpp);
+ if (r == -EEXIST) {
+ ldpp_dout(dpp, 20) << "INFO: v1 topics object: " << topics_obj.to_str() << " already migrated. no need to write v2 object" << dendl;
+ continue; // ok, someone else already migrated
+ }
+ if (r < 0) {
+ const std::string s = fmt::format("v1 topic migration for: {}. failed with: {}",
+ topic.name, cpp_strerror(r));
+ ldpp_dout(dpp, 1) << "ERROR: " << s << dendl;
+ rgw_clog_warn(rados, s);
+ return r;
+ }
+ }
+
+ // remove the v1 topics metadata (this destroys the lock too)
+ r = driver->remove_topics(tenant, &topics_objv, y, dpp);
+ if (r == -ECANCELED || r == -ENOENT) {
+ ldpp_dout(dpp, 20) << "INFO: v1 topics object: " << topics_obj.to_str() << " already migrated. no need to remove" << dendl;
+ return 0; // ok, someone else already migrated
+ }
+ if (r < 0) {
+ const std::string s = fmt::format("failed to remove migrated v1 topics obj: {}. error: {} ",
+ topics_obj.to_str(), cpp_strerror(r));
+ ldpp_dout(dpp, 1) << "ERROR: " << s << dendl;
+ rgw_clog_warn(rados, s);
+ return r;
+ }
+ return r;
+}
+
+} // anonymous namespace
+
+int migrate(const DoutPrefixProvider* dpp,
+ rgw::sal::RadosStore* driver,
+ boost::asio::io_context& context,
+ spawn::yield_context yield)
+{
+ auto y = optional_yield{context, yield};
+
+ ldpp_dout(dpp, 1) << "starting v1 topic migration.." << dendl;
+
+ librados::Rados* rados = driver->getRados()->get_rados_handle();
+ const rgw_pool& pool = driver->svc()->zone->get_zone_params().log_pool;
+ librados::IoCtx ioctx;
+ int r = rgw_init_ioctx(dpp, rados, pool, ioctx);
+ if (r < 0) {
+ ldpp_dout(dpp, 1) << "failed to initialize log pool for listing with: "
+ << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ // loop over all objects with oid prefix "pubsub."
+ auto filter = rgw::AccessListFilterPrefix(rgw::sal::pubsub_oid_prefix);
+ constexpr uint32_t max = 100;
+ std::string marker;
+ bool truncated = false;
+
+ std::vector<std::string> oids;
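+  // bucket notification objects are migrated in this pass; topic objects are
+  // collected and migrated in a second pass below, only after all of their
+  // bucket notifications were handled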
+ std::vector<std::string> topics_oid;
+ do {
+ oids.clear();
+ r = rgw_list_pool(dpp, ioctx, max, filter, marker, &oids, &truncated);
+ if (r == -ENOENT) {
+ r = 0;
+ break;
+ }
+ if (r < 0) {
+ ldpp_dout(dpp, 1) << "failed to list v1 topic metadata with: "
+ << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ std::string msg;
+ for (const std::string& oid : oids) {
+ if (oid.find(rgw::sal::pubsub_bucket_oid_infix) != oid.npos) {
+ const auto obj = rgw_raw_obj{pool, oid};
+ ldpp_dout(dpp, 4) << "migrating v1 bucket notifications " << oid << dendl;
+ r = migrate_notification(dpp, y, driver, obj);
+ ldpp_dout(dpp, 4) << "migrating v1 bucket notifications " << oid << " completed with: "
+ << ((r == 0)? "successful": cpp_strerror(r)) << dendl;
+ } else {
+ // topics will be migrated after we complete migrating the notifications
+ topics_oid.push_back(oid);
+ }
+ }
+ if (!oids.empty()) {
+ marker = oids.back(); // update marker for next listing
+ }
+ } while (truncated);
+
+ for (const std::string& oid : topics_oid) {
+ const auto obj = rgw_raw_obj{pool, oid};
+ ldpp_dout(dpp, 4) << "migrating v1 topics " << oid << dendl;
+ r = migrate_topics(dpp, y, driver, obj);
+ ldpp_dout(dpp, 4) << "migrating v1 topics " << oid << " completed with: "
+ << ((r == 0) ? "successful" : cpp_strerror(r)) << dendl;
+ }
+
+ ldpp_dout(dpp, 1) << "finished v1 topic migration" << dendl;
+ return 0;
+}
+
+} // rgwrados::topic_migration
def test_ps_s3_notification_push_kafka_security_sasl_scram():
kafka_security('SASL_PLAINTEXT', mechanism='SCRAM-SHA-256')
+
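+# Every notification_v2 feature toggle in the tests below is followed by a
+# 'period update' + 'period commit' so the zonegroup change takes effect.
+# A minimal helper sketch (hypothetical, not an existing test utility) that
+# the repeated admin() sequences could share:
+#
+#   def set_notification_v2(enabled):
+#       flag = ('--enable' if enabled else '--disable') + '-feature=notification_v2'
+#       for args in (['zonegroup', 'modify', flag], ['period', 'update'], ['period', 'commit']):
+#           assert_equal(admin(args, get_config_cluster())[1], 0)
+#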
+@attr('data_path_v2_test')
+def test_persistent_ps_s3_data_path_v2_migration():
+ """ test data path v2 persistent migration """
+ conn = connection()
+ zonegroup = get_config_zonegroup()
+
+ # create random port for the http server
+ host = get_ip()
+ http_port = random.randint(10000, 20000)
+
+ # disable v2 notification
+ result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'update'], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'commit'], get_config_cluster())
+ assert_equal(result[1], 0)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topic
+ endpoint_address = 'http://'+host+':'+str(http_port)
+ endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # topic stats
+ result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
+ parsed_result = json.loads(result[0])
+ assert_equal(parsed_result['Topic Stats']['Entries'], 0)
+ assert_equal(result[1], 0)
+
+ # create objects in the bucket (async)
+ number_of_objects = 10
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key('key-'+str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ time_diff = time.time() - start_time
+ print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ http_server = None
+ try:
+ # topic stats
+ result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
+ parsed_result = json.loads(result[0])
+ assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
+ assert_equal(result[1], 0)
+
+ # create topic to poll on
+ topic_name_1 = topic_name + '_1'
+ topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args)
+
+ # enable v2 notification
+ result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'update'], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'commit'], get_config_cluster())
+ assert_equal(result[1], 0)
+
+ # poll on topic_1
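+        # 'topic rm' is expected to keep failing while the background
+        # migration is running (the v1 topics object stays locked until the
+        # migration removes it), so a successful removal is used as the
+        # signal that migration finished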
+ result = 1
+ while result != 0:
+ time.sleep(1)
+ result = admin(['topic', 'rm', '--topic', topic_name_1], get_config_cluster())[1]
+
+ # topic stats
+ result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
+ parsed_result = json.loads(result[0])
+ assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
+ assert_equal(result[1], 0)
+
+ # create more objects in the bucket (async)
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key('key-'+str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ time_diff = time.time() - start_time
+ print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ # topic stats
+ result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
+ parsed_result = json.loads(result[0])
+ assert_equal(parsed_result['Topic Stats']['Entries'], 2*number_of_objects)
+ assert_equal(result[1], 0)
+
+ # start an http server in a separate thread
+ http_server = StreamingHTTPServer(host, http_port, num_workers=number_of_objects)
+
+ delay = 30
+ print('wait for '+str(delay)+'sec for the messages...')
+ time.sleep(delay)
+
+ # topic stats
+ result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
+ parsed_result = json.loads(result[0])
+ assert_equal(parsed_result['Topic Stats']['Entries'], 0)
+ assert_equal(result[1], 0)
+ # verify events
+ keys = list(bucket.list())
+ http_server.verify_s3_events(keys, exact_match=False)
+
+ except Exception as e:
+ assert False, str(e)
+ finally:
+ # cleanup
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete objects from the bucket
+ client_threads = []
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ if http_server:
+ http_server.close()
+
+
+@attr('data_path_v2_test')
+def test_ps_s3_data_path_v2_migration():
+ """ test data path v2 migration """
+ conn = connection()
+ zonegroup = get_config_zonegroup()
+
+ # create random port for the http server
+ host = get_ip()
+ http_port = random.randint(10000, 20000)
+
+ # start an http server in a separate thread
+ number_of_objects = 10
+ http_server = StreamingHTTPServer(host, http_port, num_workers=number_of_objects)
+
+ # disable v2 notification
+ result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'update'], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'commit'], get_config_cluster())
+ assert_equal(result[1], 0)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topic
+ endpoint_address = 'http://'+host+':'+str(http_port)
+ endpoint_args = 'push-endpoint='+endpoint_address
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket (async)
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key('key-'+str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ time_diff = time.time() - start_time
+ print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ try:
+ # verify events
+ keys = list(bucket.list())
+ http_server.verify_s3_events(keys, exact_match=False)
+
+ # create topic to poll on
+ topic_name_1 = topic_name + '_1'
+ topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args)
+
+ # enable v2 notification
+ result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'update'], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'commit'], get_config_cluster())
+ assert_equal(result[1], 0)
+
+ # poll on topic_1
+ result = 1
+ while result != 0:
+ time.sleep(1)
+ result = admin(['topic', 'rm', '--topic', topic_name_1], get_config_cluster())[1]
+
+ # create more objects in the bucket (async)
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key('key-'+str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ time_diff = time.time() - start_time
+ print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ # verify events
+ keys = list(bucket.list())
+ http_server.verify_s3_events(keys, exact_match=True)
+
+ except Exception as e:
+ assert False, str(e)
+ finally:
+ # cleanup
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete objects from the bucket
+ client_threads = []
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ http_server.close()
+
+
+@attr('data_path_v2_test')
+def test_ps_s3_data_path_v2_large_migration():
+ """ test data path v2 large migration """
+ conn = connection()
+ connections_list = []
+ connections_list.append(conn)
+ zonegroup = get_config_zonegroup()
+ tenants_list = []
+ tenants_list.append('')
+ for i in ['1', '2']:
+ access_key = str(time.time())
+ secret_key = str(time.time())
+ uid = UID_PREFIX + str(time.time())
+ tenant_id = 'kaboom_' + i
+ _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant_id, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
+ assert_equal(result, 0)
+ tenants_list.append(tenant_id)
+ conn = S3Connection(aws_access_key_id=access_key,
+ aws_secret_access_key=secret_key,
+ is_secure=False, port=get_config_port(), host=get_config_host(),
+ calling_format='boto.s3.connection.OrdinaryCallingFormat')
+ connections_list.append(conn)
+
+ # disable v2 notification
+ result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'update'], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'commit'], get_config_cluster())
+ assert_equal(result[1], 0)
+
+ # create random port for the http server
+ host = get_ip()
+ http_port = random.randint(10000, 20000)
+
+ # create s3 topic
+ buckets_list = []
+ topics_conf_list = []
+ s3_notification_conf_list = []
+ num_of_s3_notifications = 110
+ for conn in connections_list:
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ buckets_list.append(bucket)
+ topic_name = bucket_name + TOPIC_SUFFIX
+ # create s3 topic
+ endpoint_address = 'http://' + host + ':' + str(http_port)
+ endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topics_conf_list.append(topic_conf)
+ topic_arn = topic_conf.set_config()
+        # create 110 s3 notifications
+ s3_notification_list = []
+ for i in range(num_of_s3_notifications):
+ notification_name = bucket_name + NOTIFICATION_SUFFIX + '_' + str(i + 1)
+ s3_notification_list.append({'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': []
+ })
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, s3_notification_list)
+ s3_notification_conf_list.append(s3_notification_conf)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status / 100, 2)
+
+ # create topic to poll on
+ polling_topics_conf = []
+ for conn, bucket in zip(connections_list, buckets_list):
+ topic_name = bucket.name + TOPIC_SUFFIX + '_1'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ polling_topics_conf.append(topic_conf)
+
+ # enable v2 notification
+ result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'update'], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'commit'], get_config_cluster())
+ assert_equal(result[1], 0)
+
+ # poll on topic_1
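+    # as in the tests above, a successful 'topic rm' signals that the
+    # background migration finished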
+ for tenant, topic_conf in zip(tenants_list, polling_topics_conf):
+ while True:
+ if tenant == '':
+ result = admin(['topic', 'rm', '--topic', topic_conf.topic_name], get_config_cluster())
+ else:
+ result = admin(['topic', 'rm', '--topic', topic_conf.topic_name, '--tenant', tenant], get_config_cluster())
+
+ if result[1] != 0:
+                print('migration in progress... error: '+str(result[1]))
+ else:
+ break
+
+ time.sleep(1)
+
+ # check if we migrated all the topics
+ for tenant in tenants_list:
+ if tenant == '':
+ topics_result = admin(['topic', 'list'], get_config_cluster())
+ else:
+ topics_result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
+ topics_json = json.loads(topics_result[0])
+ assert_equal(len(topics_json['topics']), 1)
+
+ # check if we migrated all the notifications
+ for tenant, bucket in zip(tenants_list, buckets_list):
+ if tenant == '':
+ result = admin(['notification', 'list', '--bucket', bucket.name], get_config_cluster())
+ else:
+ result = admin(['notification', 'list', '--bucket', bucket.name, '--tenant', tenant], get_config_cluster())
+ parsed_result = json.loads(result[0])
+ assert_equal(len(parsed_result['notifications']), num_of_s3_notifications)
+
+ # cleanup
+ for s3_notification_conf in s3_notification_conf_list:
+ s3_notification_conf.del_config()
+ for topic_conf in topics_conf_list:
+ topic_conf.del_config()
+ # delete the bucket
+ for conn, bucket in zip(connections_list, buckets_list):
+ conn.delete_bucket(bucket.name)
+
+
+@attr('data_path_v2_test')
+def test_ps_s3_data_path_v2_mixed_migration():
+ """ test data path v2 mixed migration """
+ conn = connection()
+ connections_list = []
+ connections_list.append(conn)
+ zonegroup = get_config_zonegroup()
+ tenants_list = []
+ tenants_list.append('')
+
+ # make sure that we start at v2
+ result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'update'], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'commit'], get_config_cluster())
+ assert_equal(result[1], 0)
+
+ for i in ['1', '2']:
+ access_key = str(time.time())
+ secret_key = str(time.time())
+ uid = UID_PREFIX + str(time.time())
+ tenant_id = 'kaboom_' + i
+ _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant_id, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
+ assert_equal(result, 0)
+ tenants_list.append(tenant_id)
+ conn = S3Connection(aws_access_key_id=access_key,
+ aws_secret_access_key=secret_key,
+ is_secure=False, port=get_config_port(), host=get_config_host(),
+ calling_format='boto.s3.connection.OrdinaryCallingFormat')
+ connections_list.append(conn)
+
+ # create random port for the http server
+ host = get_ip()
+ http_port = random.randint(10000, 20000)
+
+ # create s3 topic
+ buckets_list = []
+ topics_conf_list = []
+ s3_notification_conf_list = []
+ topic_arn_list = []
+ created_version = '_created_v2'
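+    # first round: topics/notifications created while v2 is enabled; a second
+    # round is created below after downgrading to v1, so after migration both
+    # rounds must coexist (2 topics and 2 notifications per bucket)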
+ for conn in connections_list:
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ buckets_list.append(bucket)
+ topic_name = bucket_name + TOPIC_SUFFIX + created_version
+ # create s3 topic
+ endpoint_address = 'http://' + host + ':' + str(http_port)
+ endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topics_conf_list.append(topic_conf)
+ topic_arn = topic_conf.set_config()
+ topic_arn_list.append(topic_arn)
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX + created_version
+ s3_notification_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, s3_notification_list)
+ s3_notification_conf_list.append(s3_notification_conf)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status / 100, 2)
+
+ # disable v2 notification
+ result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'update'], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'commit'], get_config_cluster())
+ assert_equal(result[1], 0)
+
+ # create s3 topic
+ created_version = '_created_v1'
+ for conn, bucket in zip(connections_list, buckets_list):
+        # reuse the bucket created in the v2 round
+ bucket_name = bucket.name
+ topic_name = bucket_name + TOPIC_SUFFIX + created_version
+ # create s3 topic
+ endpoint_address = 'http://' + host + ':' + str(http_port)
+ endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topics_conf_list.append(topic_conf)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX + created_version
+ s3_notification_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, s3_notification_list)
+ s3_notification_conf_list.append(s3_notification_conf)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status / 100, 2)
+
+ # create topic to poll on
+ polling_topics_conf = []
+ for conn, bucket in zip(connections_list, buckets_list):
+ topic_name = bucket.name + TOPIC_SUFFIX + '_1'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ polling_topics_conf.append(topic_conf)
+
+ # enable v2 notification
+ result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'update'], get_config_cluster())
+ assert_equal(result[1], 0)
+ result = admin(['period', 'commit'], get_config_cluster())
+ assert_equal(result[1], 0)
+
+ # poll on topic_1
+ for tenant, topic_conf in zip(tenants_list, polling_topics_conf):
+ while True:
+ if tenant == '':
+ result = admin(['topic', 'rm', '--topic', topic_conf.topic_name], get_config_cluster())
+ else:
+ result = admin(['topic', 'rm', '--topic', topic_conf.topic_name, '--tenant', tenant], get_config_cluster())
+
+ if result[1] != 0:
+                print('migration in progress... error: '+str(result[1]))
+ else:
+ break
+
+ time.sleep(1)
+
+ # check if we migrated all the topics
+ for tenant in tenants_list:
+ if tenant == '':
+ topics_result = admin(['topic', 'list'], get_config_cluster())
+ else:
+ topics_result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
+ topics_json = json.loads(topics_result[0])
+ assert_equal(len(topics_json['topics']), 2)
+
+ # check if we migrated all the notifications
+ for tenant, bucket in zip(tenants_list, buckets_list):
+ if tenant == '':
+ notifications_result = admin(['notification', 'list', '--bucket', bucket.name], get_config_cluster())
+ else:
+ notifications_result = admin(['notification', 'list', '--bucket', bucket.name, '--tenant', tenant], get_config_cluster())
+ notifications_json = json.loads(notifications_result[0])
+ assert_equal(len(notifications_json['notifications']), 2)
+
+ # cleanup
+ for s3_notification_conf in s3_notification_conf_list:
+ s3_notification_conf.del_config()
+ for topic_conf in topics_conf_list:
+ topic_conf.del_config()
+ # delete the bucket
+ for conn, bucket in zip(connections_list, buckets_list):
+ conn.delete_bucket(bucket.name)
+