From 4a6c4caccd8a16bbf4fd872b057e116d37275d0c Mon Sep 17 00:00:00 2001 From: Ali Masarwa Date: Tue, 9 Jan 2024 23:51:54 +0200 Subject: [PATCH] RGW/Rados: Migrate topics to data path v2 also add migration tests Signed-off-by: Ali Masarwa (cherry picked from commit 1a7d1454801e3d7b888aa734c4e3b609febf417f) --- src/rgw/CMakeLists.txt | 3 +- src/rgw/driver/rados/rgw_notify.cc | 2 +- src/rgw/driver/rados/rgw_rados.cc | 13 + src/rgw/driver/rados/rgw_rados.h | 3 + src/rgw/driver/rados/rgw_sal_rados.cc | 5 +- src/rgw/driver/rados/rgw_sal_rados.h | 3 + src/rgw/driver/rados/topic_migration.cc | 334 ++++++++++++ src/rgw/driver/rados/topic_migration.h | 34 ++ src/rgw/rgw_obj_types.h | 4 + src/test/rgw/bucket_notification/test_bn.py | 537 ++++++++++++++++++++ 10 files changed, 934 insertions(+), 4 deletions(-) create mode 100644 src/rgw/driver/rados/topic_migration.cc create mode 100644 src/rgw/driver/rados/topic_migration.h diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 1bf433cb395..fd656502a78 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -194,7 +194,8 @@ set(librgw_common_srcs driver/rados/rgw_user.cc driver/rados/rgw_zone.cc driver/rados/sync_fairness.cc - driver/rados/topic.cc) + driver/rados/topic.cc + driver/rados/topic_migration.cc) list(APPEND librgw_common_srcs driver/immutable_config/store.cc diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc index b2e91e3a77a..31cae8daa31 100644 --- a/src/rgw/driver/rados/rgw_notify.cc +++ b/src/rgw/driver/rados/rgw_notify.cc @@ -1064,7 +1064,7 @@ int publish_reserve(const DoutPrefixProvider* dpp, rgw_pubsub_topic result; const RGWPubSub ps(res.store, res.user_tenant, site); auto ret = - ps.get_topic(res.dpp, topic_cfg.name, result, res.yield, nullptr); + ps.get_topic(res.dpp, topic_cfg.dest.arn_topic, result, res.yield, nullptr); if (ret < 0) { ldpp_dout(res.dpp, 1) << "INFO: failed to load topic: " << topic_cfg.name diff --git 
a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc index b74f2d0798d..c9da60eff9e 100644 --- a/src/rgw/driver/rados/rgw_rados.cc +++ b/src/rgw/driver/rados/rgw_rados.cc @@ -77,6 +77,7 @@ #include "rgw_realm_watcher.h" #include "rgw_reshard.h" #include "rgw_cr_rados.h" +#include "topic_migration.h" #include "services/svc_zone.h" #include "services/svc_zone_utils.h" @@ -1100,6 +1101,7 @@ void RGWRados::finalize() if (run_notification_thread) { rgw::notify::shutdown(); + v1_topic_migration.stop(); } } @@ -1357,6 +1359,17 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y) ret = rgw::notify::init(cct, driver, *svc.site, dpp); if (ret < 0 ) { ldpp_dout(dpp, 1) << "ERROR: failed to initialize notification manager" << dendl; + return ret; + } + + using namespace rgw; + if (svc.site->is_meta_master() && + all_zonegroups_support(*svc.site, zone_features::notification_v2)) { + spawn::spawn(v1_topic_migration, [this] (spawn::yield_context yield) { + DoutPrefix dpp{cct, dout_subsys, "v1 topic migration: "}; + rgwrados::topic_migration::migrate(&dpp, driver, v1_topic_migration, yield); + }); + v1_topic_migration.start(1); } } diff --git a/src/rgw/driver/rados/rgw_rados.h b/src/rgw/driver/rados/rgw_rados.h index 3d7776b0fa0..f05b661b6fd 100644 --- a/src/rgw/driver/rados/rgw_rados.h +++ b/src/rgw/driver/rados/rgw_rados.h @@ -14,6 +14,7 @@ #include "common/RefCountedObj.h" #include "common/ceph_time.h" #include "common/Timer.h" +#include "common/async/context_pool.h" #include "rgw_common.h" #include "cls/rgw/cls_rgw_types.h" #include "cls/version/cls_version_types.h" @@ -390,6 +391,8 @@ class RGWRados ceph::mutex meta_sync_thread_lock{ceph::make_mutex("meta_sync_thread_lock")}; ceph::mutex data_sync_thread_lock{ceph::make_mutex("data_sync_thread_lock")}; + ceph::async::io_context_pool v1_topic_migration; + librados::IoCtx root_pool_ctx; // .rgw ceph::mutex bucket_id_lock{ceph::make_mutex("rados_bucket_id")}; diff --git 
a/src/rgw/driver/rados/rgw_sal_rados.cc b/src/rgw/driver/rados/rgw_sal_rados.cc index 597f4f1ccf2..7239e289b6e 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.cc +++ b/src/rgw/driver/rados/rgw_sal_rados.cc @@ -79,7 +79,8 @@ namespace rgw::sal { // default number of entries to list with each bucket listing call // (use marker to bridge between calls) static constexpr size_t listing_max_entries = 1000; -static std::string pubsub_oid_prefix = "pubsub."; +const std::string pubsub_oid_prefix = "pubsub."; +const std::string pubsub_bucket_oid_infix = ".bucket."; static int drain_aio(std::list& handles) { @@ -869,7 +870,7 @@ int RadosBucket::abort_multiparts(const DoutPrefixProvider* dpp, } std::string RadosBucket::topics_oid() const { - return pubsub_oid_prefix + get_tenant() + ".bucket." + get_name() + "/" + get_marker(); + return pubsub_oid_prefix + get_tenant() + pubsub_bucket_oid_infix + get_name() + "/" + get_marker(); } int RadosBucket::read_topics(rgw_pubsub_bucket_topics& notifications, diff --git a/src/rgw/driver/rados/rgw_sal_rados.h b/src/rgw/driver/rados/rgw_sal_rados.h index 4e71045cda1..71f7a83a74c 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.h +++ b/src/rgw/driver/rados/rgw_sal_rados.h @@ -33,6 +33,9 @@ namespace rgw { namespace sal { class RadosMultipartUpload; +extern const std::string pubsub_oid_prefix; // v1 topic metadata prefix +extern const std::string pubsub_bucket_oid_infix; // v1 notification in-fix + class RadosPlacementTier: public StorePlacementTier { RadosStore* store; RGWZoneGroupPlacementTier tier; diff --git a/src/rgw/driver/rados/topic_migration.cc b/src/rgw/driver/rados/topic_migration.cc new file mode 100644 index 00000000000..c7dcfc37b44 --- /dev/null +++ b/src/rgw/driver/rados/topic_migration.cc @@ -0,0 +1,334 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * Copyright contributors to the Ceph project + * + * This is 
free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "topic_migration.h" +#include "services/svc_zone.h" +#include "rgw_sal_rados.h" + +namespace rgwrados::topic_migration { + +namespace { + +int deconstruct_topics_oid(const std::string& bucket_topics_oid, std::string& tenant, std::string& bucket_name, + std::string& marker, const DoutPrefixProvider* dpp) { + auto pos = bucket_topics_oid.find(rgw::sal::pubsub_bucket_oid_infix); + if (pos == std::string::npos) { + ldpp_dout(dpp, 1) << "ERROR: bucket_topics_oid:" << bucket_topics_oid << " doesn't contain " << rgw::sal::pubsub_bucket_oid_infix + << " after tenant name!" << dendl; + return -EINVAL; + } + const size_t prefix_len = rgw::sal::pubsub_oid_prefix.size(); + tenant = bucket_topics_oid.substr(prefix_len, pos - prefix_len); + + auto bucket_name_marker = bucket_topics_oid.substr(pos + rgw::sal::pubsub_bucket_oid_infix.size()); + pos = bucket_name_marker.find('/'); + if (pos == std::string::npos) { + ldpp_dout(dpp, 1) << "ERROR: bucket_topics_oid:" << bucket_topics_oid << " doesn't contain / after bucket name!" << dendl; + return -EINVAL; + } + bucket_name = bucket_name_marker.substr(0, pos); + marker = bucket_name_marker.substr(pos + 1); + + return 0; +} + +// migrate v1 notification metadata for a single bucket +int migrate_notification(const DoutPrefixProvider* dpp, optional_yield y, + rgw::sal::RadosStore* driver, const rgw_raw_obj& obj) +{ + // parse tenant, bucket name and marker out of "pubsub.{tenant}.bucket.{name}/{marker}" + auto* rados = driver->getRados()->get_rados_handle(); + std::string tenant; + std::string bucket_name; + std::string marker; + int r = deconstruct_topics_oid(obj.oid, tenant, bucket_name, marker, dpp); + if (r < 0) { + const std::string s = fmt::format("failed to read tenant, bucket name and marker from: {}. error: {}. 
{}", + obj.to_str(), cpp_strerror(r), "expected format pubsub.{tenant}.bucket.{name}/{marker}!"); + ldpp_dout(dpp, 1) << "ERROR: " << s << dendl; + rgw_clog_warn(rados, s); + return r; + } + + // migrate the notifications + rgw_pubsub_bucket_topics v1_bucket_topics; + rgw_bucket rgw_bucket_info(tenant, bucket_name); + rgw_bucket_info.marker = marker; + rgw::sal::RadosBucket rados_bucket(driver, rgw_bucket_info); + RGWObjVersionTracker bucket_topics_objv; + r = rados_bucket.read_topics(v1_bucket_topics, &bucket_topics_objv, y, dpp); + if (r == -ENOENT) { + return 0; // ok, someone else already migrated + } + if (r < 0) { + const std::string s = fmt::format("failed to read v1 bucket notifications from: {}. error: {}", + obj.to_str(), cpp_strerror(r)); + ldpp_dout(dpp, 1) << "ERROR: " << s << dendl; + rgw_clog_warn(rados, s); + return r; + } + + if (v1_bucket_topics.topics.size() == 0) { + ldpp_dout(dpp, 20) << "INFO: v1 notifications object is empty, nothing to migrate" << dendl; + // delete v1 notification obj with Bucket::remove_topics() + r = rados_bucket.remove_topics(&bucket_topics_objv, y, dpp); + if (r == -ECANCELED || r == -ENOENT) { + ldpp_dout(dpp, 20) << "INFO: v1 notifications object: " << obj.to_str() << " already migrated" << dendl; + return 0; // ok, someone else already migrated + } + if (r < 0) { + const std::string s = fmt::format("failed to remove migrated v1 bucket notifications obj: {}. 
error: {}", + obj.to_str(), cpp_strerror(-r)); + ldpp_dout(dpp, 1) << "ERROR: " << s << dendl; + rgw_clog_warn(rados, s); + return r; + } + return 0; + } + + // in a for-loop that retries ECANCELED errors: + // { + // load the corresponding bucket by name + // break if marker doesn't match loaded bucket's + // merge with existing RGW_ATTR_BUCKET_NOTIFICATION topics (don't override existing v2) + // write RGW_ATTR_BUCKET_NOTIFICATION xattr + // } + std::unique_ptr bucket; + r = -ECANCELED; + for (auto i = 0u; i < 15u && r == -ECANCELED; ++i) { + r = driver->load_bucket(dpp, rgw_bucket_info, &bucket, y); + if (r == -ENOENT) { + break; // bucket is deleted, we should delete the v1 notification + } + if (r < 0) { + const std::string s = fmt::format("failed to load the bucket from: {}. error: {}", + obj.to_str(), cpp_strerror(r)); + ldpp_dout(dpp, 1) << "ERROR: " << s << dendl; + rgw_clog_warn(rados, s); + return r; + } + + if (bucket->get_marker() != marker) { + break; + } + + rgw::sal::Attrs& attrs = bucket->get_attrs(); + + rgw_pubsub_bucket_topics v2_bucket_topics; + if (const auto iter = attrs.find(RGW_ATTR_BUCKET_NOTIFICATION); iter != attrs.end()) { + // bucket notification v2 already exists + try { + const auto& bl = iter->second; + auto biter = bl.cbegin(); + v2_bucket_topics.decode(biter); + } catch (buffer::error& err) { + const std::string s = fmt::format("failed to decode v2 bucket notifications of bucket: {}. 
error: {}", + bucket->get_name(), err.what()); + ldpp_dout(dpp, 1) << "ERROR: " << s << dendl; + rgw_clog_warn(rados, s); + return -EIO; + } + } + const auto original_size = v2_bucket_topics.topics.size(); + v2_bucket_topics.topics.insert(v1_bucket_topics.topics.begin(), v1_bucket_topics.topics.end()); // copy, don't splice: std::map::merge() would drain v1_bucket_topics, so a retry after ECANCELED would migrate nothing and still delete the v1 object; insert() keeps existing v2 entries just like merge() + if (original_size == v2_bucket_topics.topics.size()) { + // nothing changed after the merge + break; + } + bufferlist bl; + v2_bucket_topics.encode(bl); + attrs[RGW_ATTR_BUCKET_NOTIFICATION] = std::move(bl); + + r = bucket->merge_and_store_attrs(dpp, attrs, y); + if (r != -ECANCELED && r < 0) { + const std::string s = fmt::format("failed writing migrated notifications to bucket: {}. error: {}", + bucket->get_name(), cpp_strerror(-r)); + ldpp_dout(dpp, 1) << "ERROR: " << s << dendl; + rgw_clog_warn(rados, s); + return r; + } + } + if (r == -ECANCELED) { + // we exhausted the 15 retries + ldpp_dout(dpp, 5) << "WARNING: giving up on writing migrated notifications to bucket: " << bucket->get_name() << + ". will retry later" << dendl; + return r; + } + + // delete v1 notification obj with Bucket::remove_topics() + r = rados_bucket.remove_topics(&bucket_topics_objv, y, dpp); + if (r == -ECANCELED || r == -ENOENT) { + ldpp_dout(dpp, 20) << "INFO: v1 notifications object: " << obj.to_str() << " already removed" << dendl; + return 0; // ok, someone else already migrated + } + if (r < 0) { + const std::string s = fmt::format("failed to remove migrated v1 bucket notifications obj: {}. 
error: {}", + obj.to_str(), cpp_strerror(-r)); + ldpp_dout(dpp, 1) << "ERROR: " << s << dendl; + rgw_clog_warn(rados, s); + return r; + } + + return 0; +} + +// migrate topics for a given tenant +int migrate_topics(const DoutPrefixProvider* dpp, optional_yield y, + rgw::sal::RadosStore* driver, + const rgw_raw_obj& topics_obj) +{ + // parse tenant name out of topics_obj "pubsub.{tenant}" + auto* rados = driver->getRados()->get_rados_handle(); + std::string tenant; + const auto& topics_obj_oid = topics_obj.oid; + if (auto pos = topics_obj_oid.find(rgw::sal::pubsub_oid_prefix); pos != std::string::npos) { + tenant = topics_obj_oid.substr(std::string(rgw::sal::pubsub_oid_prefix).size()); + } else { + const std::string s = fmt::format("failed to read tenant from name from oid: {}. error: {}", + topics_obj_oid, cpp_strerror(-EINVAL)); + ldpp_dout(dpp, 1) << "ERROR: " << s << dendl; + rgw_clog_warn(rados, s); + return -EINVAL; + } + + // migrate the topics + rgw_pubsub_topics topics; + RGWObjVersionTracker topics_objv; + int r = driver->read_topics(tenant, topics, &topics_objv, y, dpp); + if (r == -ENOENT) { + ldpp_dout(dpp, 20) << "INFO: v1 topics object: " << topics_obj.to_str() << " does not exists. already migrated" << dendl; + return 0; // ok, someone else already migrated + } + if (r < 0) { + const std::string s = fmt::format("failed to read v1 topics from: {}. 
error: {}", + topics_obj.to_str(), cpp_strerror(-r)); + ldpp_dout(dpp, 1) << "ERROR: " << s << dendl; + rgw_clog_warn(rados, s); + return r; + } + + constexpr bool exclusive = true; // don't overwrite any existing v2 metadata + for (const auto& [name, topic] : topics.topics) { + if (topic.name != topic.dest.arn_topic) { + ldpp_dout(dpp, 20) << "INFO: auto-generated topic: " << topic.name << " will not be migrated" << dendl; + continue; + } + // write the v2 topic + RGWObjVersionTracker objv; + objv.generate_new_write_ver(dpp->get_cct()); + r = driver->write_topic_v2(topic, exclusive, objv, y, dpp); + if (r == -EEXIST) { + ldpp_dout(dpp, 20) << "INFO: v1 topics object: " << topics_obj.to_str() << " already migrated. no need to write v2 object" << dendl; + continue; // ok, someone else already migrated + } + if (r < 0) { + const std::string s = fmt::format("v1 topic migration for: {}. failed with: {}", + topic.name, cpp_strerror(r)); + ldpp_dout(dpp, 1) << "ERROR: " << s << dendl; + rgw_clog_warn(rados, s); + return r; + } + } + + // remove the v1 topics metadata (this destroys the lock too) + r = driver->remove_topics(tenant, &topics_objv, y, dpp); + if (r == -ECANCELED || r == -ENOENT) { + ldpp_dout(dpp, 20) << "INFO: v1 topics object: " << topics_obj.to_str() << " already migrated. no need to remove" << dendl; + return 0; // ok, someone else already migrated + } + if (r < 0) { + const std::string s = fmt::format("failed to remove migrated v1 topics obj: {}. error: {} ", + topics_obj.to_str(), cpp_strerror(r)); + ldpp_dout(dpp, 1) << "ERROR: " << s << dendl; + rgw_clog_warn(rados, s); + return r; + } + return r; +} + +} // anonymous namespace + +int migrate(const DoutPrefixProvider* dpp, + rgw::sal::RadosStore* driver, + boost::asio::io_context& context, + spawn::yield_context yield) +{ + auto y = optional_yield{context, yield}; + + ldpp_dout(dpp, 1) << "starting v1 topic migration.." 
<< dendl; + + librados::Rados* rados = driver->getRados()->get_rados_handle(); + const rgw_pool& pool = driver->svc()->zone->get_zone_params().log_pool; + librados::IoCtx ioctx; + int r = rgw_init_ioctx(dpp, rados, pool, ioctx); + if (r < 0) { + ldpp_dout(dpp, 1) << "failed to initialize log pool for listing with: " + << cpp_strerror(r) << dendl; + return r; + } + + // loop over all objects with oid prefix "pubsub." + auto filter = rgw::AccessListFilterPrefix(rgw::sal::pubsub_oid_prefix); + constexpr uint32_t max = 100; + std::string marker; + bool truncated = false; + + std::vector oids; + std::vector topics_oid; + do { + oids.clear(); + r = rgw_list_pool(dpp, ioctx, max, filter, marker, &oids, &truncated); + if (r == -ENOENT) { + r = 0; + break; + } + if (r < 0) { + ldpp_dout(dpp, 1) << "failed to list v1 topic metadata with: " + << cpp_strerror(r) << dendl; + return r; + } + + std::string msg; + for (const std::string& oid : oids) { + if (oid.find(rgw::sal::pubsub_bucket_oid_infix) != oid.npos) { + const auto obj = rgw_raw_obj{pool, oid}; + ldpp_dout(dpp, 4) << "migrating v1 bucket notifications " << oid << dendl; + r = migrate_notification(dpp, y, driver, obj); + ldpp_dout(dpp, 4) << "migrating v1 bucket notifications " << oid << " completed with: " + << ((r == 0)? "successful": cpp_strerror(r)) << dendl; + } else { + // topics will be migrated after we complete migrating the notifications + topics_oid.push_back(oid); + } + } + if (!oids.empty()) { + marker = oids.back(); // update marker for next listing + } + } while (truncated); + + + for (const std::string& oid : topics_oid) { + const auto obj = rgw_raw_obj{pool, oid}; + ldpp_dout(dpp, 4) << "migrating v1 topics " << oid << dendl; + r = migrate_topics(dpp, y, driver, obj); + ldpp_dout(dpp, 4) << "migrating v1 topics " << oid << " completed with: " + << ((r == 0) ? 
"successful" : cpp_strerror(r)) << dendl; + } + + ldpp_dout(dpp, 1) << "finished v1 topic migration" << dendl; + return 0; +} + +} // rgwrados::topic_migration diff --git a/src/rgw/driver/rados/topic_migration.h b/src/rgw/driver/rados/topic_migration.h new file mode 100644 index 00000000000..9545fd63c2e --- /dev/null +++ b/src/rgw/driver/rados/topic_migration.h @@ -0,0 +1,34 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * Copyright contributors to the Ceph project + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include + +class DoutPrefixProvider; +namespace rgw::sal { class RadosStore; } + +// the squid release changes the format of topic/notification metadata. once the +// notification_v2 feature gets enabled, this migration logic runs on startup to +// convert all v1 metadata to the v2 format +namespace rgwrados::topic_migration { + +int migrate(const DoutPrefixProvider* dpp, + rgw::sal::RadosStore* driver, + boost::asio::io_context& context, + spawn::yield_context yield); + +} // rgwrados::topic_migration diff --git a/src/rgw/rgw_obj_types.h b/src/rgw/rgw_obj_types.h index a092e5ccdab..5dac66086e6 100644 --- a/src/rgw/rgw_obj_types.h +++ b/src/rgw/rgw_obj_types.h @@ -477,6 +477,10 @@ struct rgw_raw_obj { void dump(Formatter *f) const; static void generate_test_instances(std::list& o); void decode_json(JSONObj *obj); + + inline std::string to_str() const { + return pool.to_str() + ":" + oid; + } }; WRITE_CLASS_ENCODER(rgw_raw_obj) diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index c9049dfd1f8..c10c413a2a7 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ 
b/src/test/rgw/bucket_notification/test_bn.py @@ -4690,3 +4690,540 @@ def test_ps_s3_notification_push_kafka_security_ssl_sasl_scram(): def test_ps_s3_notification_push_kafka_security_sasl_scram(): kafka_security('SASL_PLAINTEXT', mechanism='SCRAM-SHA-256') + +@attr('data_path_v2_test') +def test_persistent_ps_s3_data_path_v2_migration(): + """ test data path v2 persistent migration """ + conn = connection() + zonegroup = get_config_zonegroup() + + # create random port for the http server + host = get_ip() + http_port = random.randint(10000, 20000) + + # disable v2 notification + result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster()) + assert_equal(result[1], 0) + result = admin(['period', 'update'], get_config_cluster()) + assert_equal(result[1], 0) + result = admin(['period', 'commit'], get_config_cluster()) + assert_equal(result[1], 0) + + # create bucket + bucket_name = gen_bucket_name() + bucket = conn.create_bucket(bucket_name) + topic_name = bucket_name + TOPIC_SUFFIX + + # create s3 topic + endpoint_address = 'http://'+host+':'+str(http_port) + endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true' + topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) + topic_arn = topic_conf.set_config() + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, + 'Events': [] + }] + + s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) + response, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + + # topic stats + result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster()) + parsed_result = json.loads(result[0]) + assert_equal(parsed_result['Topic Stats']['Entries'], 0) + assert_equal(result[1], 0) + + # create objects in the bucket (async) + number_of_objects = 10 + client_threads = [] + start_time = time.time() + for i in 
range(number_of_objects): + key = bucket.new_key('key-'+str(i)) + content = str(os.urandom(1024*1024)) + thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + time_diff = time.time() - start_time + print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + + http_server = None + try: + # topic stats + result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster()) + parsed_result = json.loads(result[0]) + assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects) + assert_equal(result[1], 0) + + # create topic to poll on + topic_name_1 = topic_name + '_1' + topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args) + + # enable v2 notification + result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster()) + assert_equal(result[1], 0) + result = admin(['period', 'update'], get_config_cluster()) + assert_equal(result[1], 0) + result = admin(['period', 'commit'], get_config_cluster()) + assert_equal(result[1], 0) + + # poll on topic_1 + result = 1 + while result != 0: + time.sleep(1) + result = admin(['topic', 'rm', '--topic', topic_name_1], get_config_cluster())[1] + + # topic stats + result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster()) + parsed_result = json.loads(result[0]) + assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects) + assert_equal(result[1], 0) + + # create more objects in the bucket (async) + client_threads = [] + start_time = time.time() + for i in range(number_of_objects): + key = bucket.new_key('key-'+str(i)) + content = str(os.urandom(1024*1024)) + thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + time_diff = time.time() 
- start_time + print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + + # topic stats + result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster()) + parsed_result = json.loads(result[0]) + assert_equal(parsed_result['Topic Stats']['Entries'], 2*number_of_objects) + assert_equal(result[1], 0) + + # start an http server in a separate thread + http_server = StreamingHTTPServer(host, http_port, num_workers=number_of_objects) + + delay = 30 + print('wait for '+str(delay)+'sec for the messages...') + time.sleep(delay) + + # topic stats + result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster()) + parsed_result = json.loads(result[0]) + assert_equal(parsed_result['Topic Stats']['Entries'], 0) + assert_equal(result[1], 0) + # verify events + keys = list(bucket.list()) + http_server.verify_s3_events(keys, exact_match=False) + + except Exception as e: + assert False, str(e) + finally: + # cleanup + s3_notification_conf.del_config() + topic_conf.del_config() + # delete objects from the bucket + client_threads = [] + for key in bucket.list(): + thr = threading.Thread(target = key.delete, args=()) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + # delete the bucket + conn.delete_bucket(bucket_name) + if http_server: + http_server.close() + + +@attr('data_path_v2_test') +def test_ps_s3_data_path_v2_migration(): + """ test data path v2 migration """ + conn = connection() + zonegroup = get_config_zonegroup() + + # create random port for the http server + host = get_ip() + http_port = random.randint(10000, 20000) + + # start an http server in a separate thread + number_of_objects = 10 + http_server = StreamingHTTPServer(host, http_port, num_workers=number_of_objects) + + # disable v2 notification + result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster()) + assert_equal(result[1], 0) + result 
= admin(['period', 'update'], get_config_cluster()) + assert_equal(result[1], 0) + result = admin(['period', 'commit'], get_config_cluster()) + assert_equal(result[1], 0) + + # create bucket + bucket_name = gen_bucket_name() + bucket = conn.create_bucket(bucket_name) + topic_name = bucket_name + TOPIC_SUFFIX + + # create s3 topic + endpoint_address = 'http://'+host+':'+str(http_port) + endpoint_args = 'push-endpoint='+endpoint_address + topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) + topic_arn = topic_conf.set_config() + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, + 'Events': [] + }] + + s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) + response, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + + # create objects in the bucket (async) + client_threads = [] + start_time = time.time() + for i in range(number_of_objects): + key = bucket.new_key('key-'+str(i)) + content = str(os.urandom(1024*1024)) + thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + time_diff = time.time() - start_time + print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + + try: + # verify events + keys = list(bucket.list()) + http_server.verify_s3_events(keys, exact_match=False) + + # create topic to poll on + topic_name_1 = topic_name + '_1' + topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args) + + # enable v2 notification + result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster()) + assert_equal(result[1], 0) + result = admin(['period', 'update'], get_config_cluster()) + assert_equal(result[1], 0) + result = admin(['period', 'commit'], 
get_config_cluster()) + assert_equal(result[1], 0) + + + # poll on topic_1 + result = 1 + while result != 0: + time.sleep(1) + result = admin(['topic', 'rm', '--topic', topic_name_1], get_config_cluster())[1] + + + # create more objects in the bucket (async) + client_threads = [] + start_time = time.time() + for i in range(number_of_objects): + key = bucket.new_key('key-'+str(i)) + content = str(os.urandom(1024*1024)) + thr = threading.Thread(target = set_contents_from_string, args=(key, content,)) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + time_diff = time.time() - start_time + print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + + # verify events + keys = list(bucket.list()) + http_server.verify_s3_events(keys, exact_match=True) + + except Exception as e: + assert False, str(e) + finally: + # cleanup + s3_notification_conf.del_config() + topic_conf.del_config() + # delete objects from the bucket + client_threads = [] + for key in bucket.list(): + thr = threading.Thread(target = key.delete, args=()) + thr.start() + client_threads.append(thr) + [thr.join() for thr in client_threads] + # delete the bucket + conn.delete_bucket(bucket_name) + http_server.close() + + +@attr('data_path_v2_test') +def test_ps_s3_data_path_v2_large_migration(): + """ test data path v2 large migration """ + conn = connection() + connections_list = [] + connections_list.append(conn) + zonegroup = get_config_zonegroup() + tenants_list = [] + tenants_list.append('') + for i in ['1', '2']: + access_key = str(time.time()) + secret_key = str(time.time()) + uid = UID_PREFIX + str(time.time()) + tenant_id = 'kaboom_' + i + _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant_id, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster()) + assert_equal(result, 0) + tenants_list.append(tenant_id) + conn = 
S3Connection(aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + is_secure=False, port=get_config_port(), host=get_config_host(), + calling_format='boto.s3.connection.OrdinaryCallingFormat') + connections_list.append(conn) + + # disable v2 notification + result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster()) + assert_equal(result[1], 0) + result = admin(['period', 'update'], get_config_cluster()) + assert_equal(result[1], 0) + result = admin(['period', 'commit'], get_config_cluster()) + assert_equal(result[1], 0) + + # create random port for the http server + host = get_ip() + http_port = random.randint(10000, 20000) + + # create s3 topic + buckets_list = [] + topics_conf_list = [] + s3_notification_conf_list = [] + num_of_s3_notifications = 110 + for conn in connections_list: + # create bucket + bucket_name = gen_bucket_name() + bucket = conn.create_bucket(bucket_name) + buckets_list.append(bucket) + topic_name = bucket_name + TOPIC_SUFFIX + # create s3 topic + endpoint_address = 'http://' + host + ':' + str(http_port) + endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true' + topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) + topics_conf_list.append(topic_conf) + topic_arn = topic_conf.set_config() + # create s3 110 notifications + s3_notification_list = [] + for i in range(num_of_s3_notifications): + notification_name = bucket_name + NOTIFICATION_SUFFIX + '_' + str(i + 1) + s3_notification_list.append({'Id': notification_name, 'TopicArn': topic_arn, + 'Events': [] + }) + + s3_notification_conf = PSNotificationS3(conn, bucket_name, s3_notification_list) + s3_notification_conf_list.append(s3_notification_conf) + response, status = s3_notification_conf.set_config() + assert_equal(status / 100, 2) + + # create topic to poll on + polling_topics_conf = [] + for conn, bucket in zip(connections_list, buckets_list): + topic_name = bucket.name + TOPIC_SUFFIX + 
'_1' + topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args) + polling_topics_conf.append(topic_conf) + + # enable v2 notification + result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster()) + assert_equal(result[1], 0) + result = admin(['period', 'update'], get_config_cluster()) + assert_equal(result[1], 0) + result = admin(['period', 'commit'], get_config_cluster()) + assert_equal(result[1], 0) + + # poll on topic_1 + for tenant, topic_conf in zip(tenants_list, polling_topics_conf): + while True: + if tenant == '': + result = admin(['topic', 'rm', '--topic', topic_conf.topic_name], get_config_cluster()) + else: + result = admin(['topic', 'rm', '--topic', topic_conf.topic_name, '--tenant', tenant], get_config_cluster()) + + if result[1] != 0: + print('migration in process... error: '+str(result[1])) + else: + break + + time.sleep(1) + + # check if we migrated all the topics + for tenant in tenants_list: + if tenant == '': + topics_result = admin(['topic', 'list'], get_config_cluster()) + else: + topics_result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster()) + topics_json = json.loads(topics_result[0]) + assert_equal(len(topics_json['topics']), 1) + + # check if we migrated all the notifications + for tenant, bucket in zip(tenants_list, buckets_list): + if tenant == '': + result = admin(['notification', 'list', '--bucket', bucket.name], get_config_cluster()) + else: + result = admin(['notification', 'list', '--bucket', bucket.name, '--tenant', tenant], get_config_cluster()) + parsed_result = json.loads(result[0]) + assert_equal(len(parsed_result['notifications']), num_of_s3_notifications) + + # cleanup + for s3_notification_conf in s3_notification_conf_list: + s3_notification_conf.del_config() + for topic_conf in topics_conf_list: + topic_conf.del_config() + # delete the bucket + for conn, bucket in zip(connections_list, buckets_list): + conn.delete_bucket(bucket.name) + + 
+@attr('data_path_v2_test')
+def test_ps_s3_data_path_v2_mixed_migration():
+    """ test data path v2 mixed migration
+
+    Flow, as driven below:
+    1. enable notification_v2 and create one user in each of two tenants
+       (in addition to the default connection, which uses the empty tenant)
+    2. per connection: create a bucket, a topic and a notification while
+       notification_v2 is enabled
+    3. disable notification_v2 and create a second topic and notification
+       on every bucket
+    4. re-enable notification_v2 and poll with 'topic rm' until it succeeds
+    5. verify that every tenant lists 2 topics and every bucket lists
+       2 notifications, then clean up
+    """
+    # upper bound for the polling phase, so that a migration that never
+    # completes fails the test instead of hanging it forever
+    # NOTE(review): 300 seconds is an arbitrary, generous bound - tune if needed
+    migration_timeout_sec = 300
+
+    def set_notification_v2(enabled):
+        """ toggle the notification_v2 zonegroup feature and commit a new period """
+        if enabled:
+            feature_flag = '--enable-feature=notification_v2'
+        else:
+            feature_flag = '--disable-feature=notification_v2'
+        for cmd in (['zonegroup', 'modify', feature_flag],
+                    ['period', 'update'],
+                    ['period', 'commit']):
+            result = admin(cmd, get_config_cluster())
+            assert_equal(result[1], 0)
+
+    def tenant_args(tenant):
+        """ extra admin command arguments that select a non-default tenant """
+        if tenant == '':
+            return []
+        return ['--tenant', tenant]
+
+    conn = connection()
+    connections_list = [conn]
+    zonegroup = get_config_zonegroup()
+    tenants_list = ['']
+
+    # make sure that we start at v2
+    set_notification_v2(True)
+
+    for i in ['1', '2']:
+        access_key = str(time.time())
+        secret_key = str(time.time())
+        uid = UID_PREFIX + str(time.time())
+        tenant_id = 'kaboom_' + i
+        _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant_id,
+                           '--access-key', access_key, '--secret-key', secret_key,
+                           '--display-name', '"Super Man"'], get_config_cluster())
+        assert_equal(result, 0)
+        tenants_list.append(tenant_id)
+        conn = S3Connection(aws_access_key_id=access_key,
+                            aws_secret_access_key=secret_key,
+                            is_secure=False, port=get_config_port(), host=get_config_host(),
+                            calling_format='boto.s3.connection.OrdinaryCallingFormat')
+        connections_list.append(conn)
+
+    # create random port for the http server
+    host = get_ip()
+    http_port = random.randint(10000, 20000)
+    # the same endpoint is shared by all the topics of this test
+    endpoint_address = 'http://' + host + ':' + str(http_port)
+    endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
+
+    buckets_list = []
+    topics_conf_list = []
+    s3_notification_conf_list = []
+
+    def create_topic_and_notification(conn, bucket_name, created_version):
+        """ create a topic and a notification pointing at it on the given bucket
+        both are named after the bucket, with created_version as suffix
+        and are recorded in the lists used by the cleanup phase """
+        # create s3 topic
+        topic_name = bucket_name + TOPIC_SUFFIX + created_version
+        topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+        topics_conf_list.append(topic_conf)
+        topic_arn = topic_conf.set_config()
+        # create s3 notification
+        notification_name = bucket_name + NOTIFICATION_SUFFIX + created_version
+        s3_notification_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+                                 'Events': []
+                                 }]
+        s3_notification_conf = PSNotificationS3(conn, bucket_name, s3_notification_list)
+        s3_notification_conf_list.append(s3_notification_conf)
+        _, status = s3_notification_conf.set_config()
+        # integer division: any 2xx status is a success
+        assert_equal(status // 100, 2)
+
+    # create bucket, topic and notification while v2 is enabled
+    for conn in connections_list:
+        bucket_name = gen_bucket_name()
+        bucket = conn.create_bucket(bucket_name)
+        buckets_list.append(bucket)
+        create_topic_and_notification(conn, bucket_name, '_created_v2')
+
+    # disable v2 notification
+    set_notification_v2(False)
+
+    # create a second topic and notification on the existing buckets while v2 is disabled
+    for conn, bucket in zip(connections_list, buckets_list):
+        create_topic_and_notification(conn, bucket.name, '_created_v1')
+
+    # create topic to poll on
+    # only the topic configuration object is built here, set_config() is not called
+    polling_topics_conf = []
+    for conn, bucket in zip(connections_list, buckets_list):
+        topic_name = bucket.name + TOPIC_SUFFIX + '_1'
+        topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+        polling_topics_conf.append(topic_conf)
+
+    # enable v2 notification
+    set_notification_v2(True)
+
+    # poll on topic_1
+    # presumably 'topic rm' keeps failing for as long as the migration is running - TODO confirm
+    for tenant, topic_conf in zip(tenants_list, polling_topics_conf):
+        deadline = time.monotonic() + migration_timeout_sec
+        while True:
+            result = admin(['topic', 'rm', '--topic', topic_conf.topic_name] + tenant_args(tenant),
+                           get_config_cluster())
+            if result[1] == 0:
+                break
+            print(result)
+            assert time.monotonic() < deadline, \
+                'topic migration did not finish within ' + str(migration_timeout_sec) + ' seconds'
+            time.sleep(1)
+
+    # check if we migrated all the topics
+    # each tenant owns one topic created under v2 and one created under v1
+    for tenant in tenants_list:
+        topics_result = admin(['topic', 'list'] + tenant_args(tenant), get_config_cluster())
+        topics_json = json.loads(topics_result[0])
+        assert_equal(len(topics_json['topics']), 2)
+
+    # check if we migrated all the notifications
+    # each bucket has one notification created under v2 and one created under v1
+    for tenant, bucket in zip(tenants_list, buckets_list):
+        notifications_result = admin(['notification', 'list', '--bucket', bucket.name] + tenant_args(tenant),
+                                     get_config_cluster())
+        notifications_json = json.loads(notifications_result[0])
+        assert_equal(len(notifications_json['notifications']), 2)
+
+    # cleanup
+    for s3_notification_conf in s3_notification_conf_list:
+        s3_notification_conf.del_config()
+    for topic_conf in topics_conf_list:
+        topic_conf.del_config()
+    # delete the bucket
+    for conn, bucket in zip(connections_list, buckets_list):
+        conn.delete_bucket(bucket.name)
+
-- 
2.39.5