]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
RGW/Rados: Migrate topics to data path v2
authorAli Masarwa <amasarwa@redhat.com>
Tue, 9 Jan 2024 21:51:54 +0000 (23:51 +0200)
committerYuval Lifshitz <ylifshit@ibm.com>
Tue, 2 Apr 2024 19:28:00 +0000 (19:28 +0000)
also add migration tests

Signed-off-by: Ali Masarwa <amasarwa@redhat.com>
src/rgw/CMakeLists.txt
src/rgw/driver/rados/rgw_notify.cc
src/rgw/driver/rados/rgw_rados.cc
src/rgw/driver/rados/rgw_rados.h
src/rgw/driver/rados/rgw_sal_rados.cc
src/rgw/driver/rados/rgw_sal_rados.h
src/rgw/driver/rados/topic_migration.cc [new file with mode: 0644]
src/rgw/driver/rados/topic_migration.h [new file with mode: 0644]
src/rgw/rgw_obj_types.h
src/test/rgw/bucket_notification/test_bn.py

index 1bf433cb395108b3fbdd5dd73fbce412d1eea3e1..fd656502a782461389d1eb76604dca644da203a8 100644 (file)
@@ -194,7 +194,8 @@ set(librgw_common_srcs
   driver/rados/rgw_user.cc
   driver/rados/rgw_zone.cc
   driver/rados/sync_fairness.cc
-  driver/rados/topic.cc)
+  driver/rados/topic.cc
+  driver/rados/topic_migration.cc)
 
 list(APPEND librgw_common_srcs
   driver/immutable_config/store.cc
index 3dc187367f0b8fdfe06eb27424c6d49e67b0ec52..e862a44766aacb1618cd138a4378a428b858f6f8 100644 (file)
@@ -1086,7 +1086,7 @@ int publish_reserve(const DoutPrefixProvider* dpp,
       rgw_pubsub_topic result;
       const RGWPubSub ps(res.store, res.user_tenant, site);
       auto ret =
-          ps.get_topic(res.dpp, topic_cfg.name, result, res.yield, nullptr);
+          ps.get_topic(res.dpp, topic_cfg.dest.arn_topic, result, res.yield, nullptr);
       if (ret < 0) {
         ldpp_dout(res.dpp, 1)
             << "INFO: failed to load topic: " << topic_cfg.name
index b74f2d0798d51c6192a90904770b39bc22111e55..c9da60eff9edab73d4536dc27f25e3f58a15f702 100644 (file)
@@ -77,6 +77,7 @@
 #include "rgw_realm_watcher.h"
 #include "rgw_reshard.h"
 #include "rgw_cr_rados.h"
+#include "topic_migration.h"
 
 #include "services/svc_zone.h"
 #include "services/svc_zone_utils.h"
@@ -1100,6 +1101,7 @@ void RGWRados::finalize()
 
   if (run_notification_thread) {
     rgw::notify::shutdown();
+    v1_topic_migration.stop();
   }
 }
 
@@ -1357,6 +1359,17 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y)
     ret = rgw::notify::init(cct, driver, *svc.site, dpp);
     if (ret < 0 ) {
       ldpp_dout(dpp, 1) << "ERROR: failed to initialize notification manager" << dendl;
+      return ret;
+    }
+
+    using namespace rgw;
+    if (svc.site->is_meta_master() &&
+        all_zonegroups_support(*svc.site, zone_features::notification_v2)) {
+      spawn::spawn(v1_topic_migration, [this] (spawn::yield_context yield) {
+            DoutPrefix dpp{cct, dout_subsys, "v1 topic migration: "};
+            rgwrados::topic_migration::migrate(&dpp, driver, v1_topic_migration, yield);
+          });
+      v1_topic_migration.start(1);
     }
   }
 
index 3d7776b0fa0d04c087e95b2eb9fdad9f849806dd..f05b661b6fd22e39ac2ca65ddbe65fe0fa8944a2 100644 (file)
@@ -14,6 +14,7 @@
 #include "common/RefCountedObj.h"
 #include "common/ceph_time.h"
 #include "common/Timer.h"
+#include "common/async/context_pool.h"
 #include "rgw_common.h"
 #include "cls/rgw/cls_rgw_types.h"
 #include "cls/version/cls_version_types.h"
@@ -390,6 +391,8 @@ class RGWRados
   ceph::mutex meta_sync_thread_lock{ceph::make_mutex("meta_sync_thread_lock")};
   ceph::mutex data_sync_thread_lock{ceph::make_mutex("data_sync_thread_lock")};
 
+  ceph::async::io_context_pool v1_topic_migration;
+
   librados::IoCtx root_pool_ctx;      // .rgw
 
   ceph::mutex bucket_id_lock{ceph::make_mutex("rados_bucket_id")};
index 597f4f1ccf29f003ad2be545f242ae88faa6061f..7239e289b6e4337c0e2c7e4614c34d29e198c626 100644 (file)
@@ -79,7 +79,8 @@ namespace rgw::sal {
 // default number of entries to list with each bucket listing call
 // (use marker to bridge between calls)
 static constexpr size_t listing_max_entries = 1000;
-static std::string pubsub_oid_prefix = "pubsub.";
+const std::string pubsub_oid_prefix = "pubsub.";
+const std::string pubsub_bucket_oid_infix  = ".bucket.";
 
 static int drain_aio(std::list<librados::AioCompletion*>& handles)
 {
@@ -869,7 +870,7 @@ int RadosBucket::abort_multiparts(const DoutPrefixProvider* dpp,
 }
 
 std::string RadosBucket::topics_oid() const {
-  return pubsub_oid_prefix + get_tenant() + ".bucket." + get_name() + "/" + get_marker();
+  return pubsub_oid_prefix + get_tenant() + pubsub_bucket_oid_infix + get_name() + "/" + get_marker();
 }
 
 int RadosBucket::read_topics(rgw_pubsub_bucket_topics& notifications,
index 4e71045cda1e5a7fa00217e0b7bf36aee0533cd8..71f7a83a74cdecf0ec033f8b1800a0f3c1a0b65b 100644 (file)
@@ -33,6 +33,9 @@ namespace rgw { namespace sal {
 
 class RadosMultipartUpload;
 
+extern const std::string pubsub_oid_prefix; // v1 topic metadata prefix
+extern const std::string pubsub_bucket_oid_infix;  // v1 notification in-fix
+
 class RadosPlacementTier: public StorePlacementTier {
   RadosStore* store;
   RGWZoneGroupPlacementTier tier;
diff --git a/src/rgw/driver/rados/topic_migration.cc b/src/rgw/driver/rados/topic_migration.cc
new file mode 100644 (file)
index 0000000..c7dcfc3
--- /dev/null
@@ -0,0 +1,334 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright contributors to the Ceph project
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "topic_migration.h"
+#include "services/svc_zone.h"
+#include "rgw_sal_rados.h"
+
+namespace rgwrados::topic_migration {
+
+namespace {
+
+int deconstruct_topics_oid(const std::string& bucket_topics_oid, std::string& tenant, std::string& bucket_name,
+                           std::string& marker, const DoutPrefixProvider* dpp) {
+  auto pos = bucket_topics_oid.find(rgw::sal::pubsub_bucket_oid_infix);
+  if (pos == std::string::npos) {
+    ldpp_dout(dpp, 1) << "ERROR: bucket_topics_oid:" << bucket_topics_oid << " doesn't contain " << rgw::sal::pubsub_bucket_oid_infix
+                      << " after tenant name!" << dendl;
+    return -EINVAL;
+  }
+  const size_t prefix_len = rgw::sal::pubsub_oid_prefix.size();
+  tenant = bucket_topics_oid.substr(prefix_len, pos - prefix_len);
+
+  auto bucket_name_marker = bucket_topics_oid.substr(pos + rgw::sal::pubsub_bucket_oid_infix.size());
+  pos = bucket_name_marker.find('/');
+  if (pos == std::string::npos) {
+    ldpp_dout(dpp, 1) << "ERROR: bucket_topics_oid:" << bucket_topics_oid << " doesn't contain / after bucket name!" << dendl;
+    return -EINVAL;
+  }
+  bucket_name = bucket_name_marker.substr(0, pos);
+  marker = bucket_name_marker.substr(pos + 1);
+
+  return 0;
+}
+
+// migrate v1 notification metadata for a single bucket
+int migrate_notification(const DoutPrefixProvider* dpp, optional_yield y,
+                         rgw::sal::RadosStore* driver, const rgw_raw_obj& obj)
+{
+  // parse bucket name and marker of out "pubsub.{tenant}.bucket.{name}/{marker}"
+  auto* rados = driver->getRados()->get_rados_handle();
+  std::string tenant;
+  std::string bucket_name;
+  std::string marker;
+  int r = deconstruct_topics_oid(obj.oid, tenant, bucket_name, marker, dpp);
+  if (r < 0) {
+    const std::string s = fmt::format("failed to read tenant, bucket name and marker from: {}. error: {}. {}",
+        obj.to_str(), cpp_strerror(r), "expected format pubsub.{tenant}.bucket.{name}/{marker}!");
+    ldpp_dout(dpp, 1) << "ERROR: " << s << dendl;
+    rgw_clog_warn(rados, s);
+    return r;
+  }
+
+  // migrate the notifications
+  rgw_pubsub_bucket_topics v1_bucket_topics;
+  rgw_bucket rgw_bucket_info(tenant, bucket_name);
+  rgw_bucket_info.marker = marker;
+  rgw::sal::RadosBucket rados_bucket(driver, rgw_bucket_info);
+  RGWObjVersionTracker bucket_topics_objv;
+  r = rados_bucket.read_topics(v1_bucket_topics, &bucket_topics_objv, y, dpp);
+  if (r == -ENOENT) {
+    return 0; // ok, someone else already migrated
+  }
+  if (r < 0) {
+    const std::string s = fmt::format("failed to read v1 bucket notifications from: {}. error: {}", 
+        obj.to_str(), cpp_strerror(r));
+    ldpp_dout(dpp, 1) << "ERROR: " << s << dendl;
+    rgw_clog_warn(rados, s);
+    return r;
+  }
+
+  if (v1_bucket_topics.topics.size() == 0) {
+    ldpp_dout(dpp, 20) << "INFO: v1 notifications object is empty, nothing to migrate" << dendl;
+    // delete v1 notification obj with Bucket::remove_topics()
+    r = rados_bucket.remove_topics(&bucket_topics_objv, y, dpp);
+    if (r == -ECANCELED || r == -ENOENT) {
+      ldpp_dout(dpp, 20) << "INFO: v1 notifications object: " << obj.to_str() << " already migrated" << dendl;
+      return 0; // ok, someone else already migrated
+    }
+    if (r < 0) {
+      const std::string s = fmt::format("failed to remove migrated v1 bucket notifications obj: {}. error: {}",
+          obj.to_str(), cpp_strerror(-r));
+      ldpp_dout(dpp, 1) << "ERROR: " << s << dendl;
+      rgw_clog_warn(rados, s);
+      return r;
+    }
+    return 0;
+  }
+
+  // in a for-loop that retries ECANCELED errors:
+  // {
+  // load the corresponding bucket by name
+  // break if marker doesn't match loaded bucket's
+  // merge with existing RGW_ATTR_BUCKET_NOTIFICATION topics (don't override existing v2)
+  // write RGW_ATTR_BUCKET_NOTIFICATION xattr
+  // }
+  std::unique_ptr<rgw::sal::Bucket> bucket;
+  r = -ECANCELED;
+  for (auto i = 0u; i < 15u && r == -ECANCELED; ++i) {
+    r = driver->load_bucket(dpp, rgw_bucket_info, &bucket, y);
+    if (r == -ENOENT) {
+      break; // bucket is deleted, we should delete the v1 notification
+    }
+    if (r < 0) {
+      const std::string s = fmt::format("failed to load the bucket from: {}. error: {}",
+          obj.to_str(), cpp_strerror(r));
+      ldpp_dout(dpp, 1) << "ERROR: " << s << dendl;
+      rgw_clog_warn(rados, s);
+      return r;
+    }
+
+    if (bucket->get_marker() != marker) {
+      break;
+    }
+
+    rgw::sal::Attrs& attrs = bucket->get_attrs();
+
+    rgw_pubsub_bucket_topics v2_bucket_topics;
+    if (const auto iter = attrs.find(RGW_ATTR_BUCKET_NOTIFICATION); iter != attrs.end()) {
+      // bucket notification v2 already exists
+      try {
+        const auto& bl = iter->second;
+        auto biter = bl.cbegin();
+        v2_bucket_topics.decode(biter);
+      } catch (buffer::error& err) {
+        const std::string s = fmt::format("failed to decode v2 bucket notifications of bucket: {}. error: {}",
+            bucket->get_name(), err.what());
+        ldpp_dout(dpp, 1) << "ERROR: " << s << dendl;
+        rgw_clog_warn(rados, s);
+        return -EIO;
+      }
+    }
+    const auto original_size = v2_bucket_topics.topics.size();
+    v2_bucket_topics.topics.merge(v1_bucket_topics.topics);
+    if (original_size == v2_bucket_topics.topics.size()) {
+      // nothing changed after the merge
+      break;
+    }
+    bufferlist bl;
+    v2_bucket_topics.encode(bl);
+    attrs[RGW_ATTR_BUCKET_NOTIFICATION] = std::move(bl);
+
+    r = bucket->merge_and_store_attrs(dpp, attrs, y);
+    if (r != -ECANCELED && r < 0) {
+      const std::string s = fmt::format("failed writing migrated notifications to bucket: {}. error: {}", 
+          bucket->get_name(), cpp_strerror(-r));
+      ldpp_dout(dpp, 1) << "ERROR: " << s << dendl;
+      rgw_clog_warn(rados, s);
+      return r;
+    }
+  }
+  if (r == -ECANCELED) {
+    // we exhausted the 15 retries
+    ldpp_dout(dpp, 5) << "WARNING: giving up on writing migrated notifications to bucket: " << bucket->get_name() <<
+      ". will retry later" << dendl;
+    return r;
+  }
+
+  // delete v1 notification obj with Bucket::remove_topics()
+  r = rados_bucket.remove_topics(&bucket_topics_objv, y, dpp);
+  if (r == -ECANCELED || r == -ENOENT) {
+    ldpp_dout(dpp, 20) << "INFO: v1 notifications object: " << obj.to_str() << " already removed" << dendl;
+    return 0; // ok, someone else already migrated
+  }
+  if (r < 0) {
+    const std::string s = fmt::format("failed to remove migrated v1 bucket notifications obj: {}. error: {}",
+        obj.to_str(), cpp_strerror(-r));
+    ldpp_dout(dpp, 1) << "ERROR: " << s << dendl;
+    rgw_clog_warn(rados, s);
+    return r;
+  }
+
+  return 0;
+}
+
+// migrate topics for a given tenant
+int migrate_topics(const DoutPrefixProvider* dpp, optional_yield y,
+                   rgw::sal::RadosStore* driver,
+                   const rgw_raw_obj& topics_obj)
+{
+  // parse tenant name out of topics_obj "pubsub.{tenant}"
+  auto* rados = driver->getRados()->get_rados_handle();
+  std::string tenant;
+  const auto& topics_obj_oid = topics_obj.oid;
+  if (auto pos = topics_obj_oid.find(rgw::sal::pubsub_oid_prefix); pos != std::string::npos) {
+    tenant = topics_obj_oid.substr(std::string(rgw::sal::pubsub_oid_prefix).size());
+  } else {
+    const std::string s = fmt::format("failed to read tenant from name from oid: {}. error: {}",
+        topics_obj_oid, cpp_strerror(-EINVAL));
+    ldpp_dout(dpp, 1) << "ERROR: " << s << dendl;
+    rgw_clog_warn(rados, s);
+    return -EINVAL;
+  }
+
+  // migrate the topics
+  rgw_pubsub_topics topics;
+  RGWObjVersionTracker topics_objv;
+  int r = driver->read_topics(tenant, topics, &topics_objv, y, dpp);
+  if (r == -ENOENT) {
+    ldpp_dout(dpp, 20) << "INFO: v1 topics object: " << topics_obj.to_str() << " does not exists. already migrated" << dendl;
+    return 0; // ok, someone else already migrated
+  }
+  if (r < 0) {
+    const std::string s = fmt::format("failed to read v1 topics from: {}. error: {}",
+        topics_obj.to_str(), cpp_strerror(-r));
+    ldpp_dout(dpp, 1) << "ERROR: " << s << dendl;
+    rgw_clog_warn(rados, s);
+    return r;
+  }
+
+  constexpr bool exclusive = true; // don't overwrite any existing v2 metadata
+  for (const auto& [name, topic] : topics.topics) {
+    if (topic.name != topic.dest.arn_topic) {
+      ldpp_dout(dpp, 20) << "INFO: auto-generated topic: " << topic.name << " will not be migrated" << dendl;
+      continue;
+    }
+    // write the v2 topic
+    RGWObjVersionTracker objv;
+    objv.generate_new_write_ver(dpp->get_cct());
+    r = driver->write_topic_v2(topic, exclusive, objv, y, dpp);
+    if (r == -EEXIST) {
+      ldpp_dout(dpp, 20) << "INFO: v1 topics object: " << topics_obj.to_str() << " already migrated. no need to write v2 object" << dendl;
+      continue; // ok, someone else already migrated
+    }
+    if (r < 0) {
+      const std::string s = fmt::format("v1 topic migration for: {}.  failed with: {}",
+          topic.name, cpp_strerror(r));
+      ldpp_dout(dpp, 1) << "ERROR: " << s << dendl;
+      rgw_clog_warn(rados, s);
+      return r;
+    }
+  }
+
+  // remove the v1 topics metadata (this destroys the lock too)
+  r = driver->remove_topics(tenant, &topics_objv, y, dpp);
+  if (r == -ECANCELED || r == -ENOENT) {
+    ldpp_dout(dpp, 20) << "INFO: v1 topics object: " << topics_obj.to_str() << " already migrated. no need to remove" << dendl;
+    return 0; // ok, someone else already migrated
+  }
+  if (r < 0) {
+    const std::string s = fmt::format("failed to remove migrated v1 topics obj: {}. error: {} ",
+        topics_obj.to_str(), cpp_strerror(r));
+    ldpp_dout(dpp, 1) << "ERROR: " << s << dendl;
+    rgw_clog_warn(rados, s);
+    return r;
+  }
+  return r;
+}
+
+} // anonymous namespace
+
+int migrate(const DoutPrefixProvider* dpp,
+            rgw::sal::RadosStore* driver,
+            boost::asio::io_context& context,
+            spawn::yield_context yield)
+{
+  auto y = optional_yield{context, yield};
+
+  ldpp_dout(dpp, 1) << "starting v1 topic migration.." << dendl;
+
+  librados::Rados* rados = driver->getRados()->get_rados_handle();
+  const rgw_pool& pool = driver->svc()->zone->get_zone_params().log_pool;
+  librados::IoCtx ioctx;
+  int r = rgw_init_ioctx(dpp, rados, pool, ioctx);
+  if (r < 0) {
+    ldpp_dout(dpp, 1) << "failed to initialize log pool for listing with: "
+        << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  // loop over all objects with oid prefix "pubsub."
+  auto filter = rgw::AccessListFilterPrefix(rgw::sal::pubsub_oid_prefix);
+  constexpr uint32_t max = 100;
+  std::string marker;
+  bool truncated = false;
+
+  std::vector<std::string> oids;
+  std::vector<std::string> topics_oid;
+  do {
+    oids.clear();
+    r = rgw_list_pool(dpp, ioctx, max, filter, marker, &oids, &truncated);
+    if (r == -ENOENT) {
+      r = 0;
+      break;
+    }
+    if (r < 0) {
+      ldpp_dout(dpp, 1) << "failed to list v1 topic metadata with: "
+          << cpp_strerror(r) << dendl;
+      return r;
+    }
+
+    std::string msg;
+    for (const std::string& oid : oids) {
+      if (oid.find(rgw::sal::pubsub_bucket_oid_infix) != oid.npos) {
+        const auto obj = rgw_raw_obj{pool, oid};
+        ldpp_dout(dpp, 4) << "migrating v1 bucket notifications " << oid << dendl;
+        r = migrate_notification(dpp, y, driver, obj);
+        ldpp_dout(dpp, 4) << "migrating v1 bucket notifications " << oid << " completed with: "
+                          << ((r == 0)? "successful": cpp_strerror(r)) << dendl;
+      } else {
+        // topics will be migrated after we complete migrating the notifications
+        topics_oid.push_back(oid);
+      }
+    }
+    if (!oids.empty()) {
+      marker = oids.back(); // update marker for next listing
+    }
+  } while (truncated);
+
+
+  for (const std::string& oid : topics_oid) {
+    const auto obj = rgw_raw_obj{pool, oid};
+    ldpp_dout(dpp, 4) << "migrating v1 topics " << oid << dendl;
+    r = migrate_topics(dpp, y, driver, obj);
+    ldpp_dout(dpp, 4) << "migrating v1 topics " << oid << " completed with: "
+                      << ((r == 0) ? "successful" : cpp_strerror(r)) << dendl;
+  }
+
+  ldpp_dout(dpp, 1) << "finished v1 topic migration" << dendl;
+  return 0;
+}
+
+} // rgwrados::topic_migration
diff --git a/src/rgw/driver/rados/topic_migration.h b/src/rgw/driver/rados/topic_migration.h
new file mode 100644 (file)
index 0000000..9545fd6
--- /dev/null
@@ -0,0 +1,34 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright contributors to the Ceph project
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <boost/asio/io_context.hpp>
+#include <spawn/spawn.hpp>
+
+class DoutPrefixProvider;
+namespace rgw::sal { class RadosStore; }
+
+// the squid release changes the format of topic/notification metadata. once the
+// notification_v2 feature gets enabled, this migration logic runs on startup to
+// convert all v1 metadata to the v2 format
+namespace rgwrados::topic_migration {
+
+int migrate(const DoutPrefixProvider* dpp,
+            rgw::sal::RadosStore* driver,
+            boost::asio::io_context& context,
+            spawn::yield_context yield);
+
+} // rgwrados::topic_migration
index a092e5ccdab69f498834a0a2aaf405d00aa680e3..5dac66086e6bbaf197176f69556885f50680977e 100644 (file)
@@ -477,6 +477,10 @@ struct rgw_raw_obj {
   void dump(Formatter *f) const;
   static void generate_test_instances(std::list<rgw_raw_obj*>& o);
   void decode_json(JSONObj *obj);
+
+  inline std::string to_str() const {
+    return pool.to_str() + ":" + oid;
+  }
 };
 WRITE_CLASS_ENCODER(rgw_raw_obj)
 
index c9049dfd1f8faac6fa9b1c075b5308e1c8cf022f..c10c413a2a7db5cf68b31c406d5c1f196e81da73 100644 (file)
@@ -4690,3 +4690,540 @@ def test_ps_s3_notification_push_kafka_security_ssl_sasl_scram():
 def test_ps_s3_notification_push_kafka_security_sasl_scram():
     kafka_security('SASL_PLAINTEXT', mechanism='SCRAM-SHA-256')
 
+
+@attr('data_path_v2_test')
+def test_persistent_ps_s3_data_path_v2_migration():
+    """ test data path v2 persistent migration """
+    conn = connection()
+    zonegroup = get_config_zonegroup()
+
+    # create random port for the http server
+    host = get_ip()
+    http_port = random.randint(10000, 20000)
+
+    # disable v2 notification
+    result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster())
+    assert_equal(result[1], 0)
+    result = admin(['period', 'update'], get_config_cluster())
+    assert_equal(result[1], 0)
+    result = admin(['period', 'commit'], get_config_cluster())
+    assert_equal(result[1], 0)
+
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # create s3 topic
+    endpoint_address = 'http://'+host+':'+str(http_port)
+    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
+    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+                        'Events': []
+                        }]
+
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # topic stats
+    result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
+    parsed_result = json.loads(result[0])
+    assert_equal(parsed_result['Topic Stats']['Entries'], 0)
+    assert_equal(result[1], 0)
+
+    # create objects in the bucket (async)
+    number_of_objects = 10
+    client_threads = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key('key-'+str(i))
+        content = str(os.urandom(1024*1024))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+    time_diff = time.time() - start_time
+    print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    http_server = None
+    try:
+        # topic stats
+        result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
+        parsed_result = json.loads(result[0])
+        assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
+        assert_equal(result[1], 0)
+
+        # create topic to poll on
+        topic_name_1 = topic_name + '_1'
+        topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args)
+
+        # enable v2 notification
+        result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
+        assert_equal(result[1], 0)
+        result = admin(['period', 'update'], get_config_cluster())
+        assert_equal(result[1], 0)
+        result = admin(['period', 'commit'], get_config_cluster())
+        assert_equal(result[1], 0)
+
+        # poll on topic_1
+        result = 1
+        while result != 0:
+            time.sleep(1)
+            result = admin(['topic', 'rm', '--topic', topic_name_1], get_config_cluster())[1]
+
+        # topic stats
+        result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
+        parsed_result = json.loads(result[0])
+        assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
+        assert_equal(result[1], 0)
+
+        # create more objects in the bucket (async)
+        client_threads = []
+        start_time = time.time()
+        for i in range(number_of_objects):
+            key = bucket.new_key('key-'+str(i))
+            content = str(os.urandom(1024*1024))
+            thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+            thr.start()
+            client_threads.append(thr)
+        [thr.join() for thr in client_threads]
+        time_diff = time.time() - start_time
+        print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+        # topic stats
+        result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
+        parsed_result = json.loads(result[0])
+        assert_equal(parsed_result['Topic Stats']['Entries'], 2*number_of_objects)
+        assert_equal(result[1], 0)
+
+        # start an http server in a separate thread
+        http_server = StreamingHTTPServer(host, http_port, num_workers=number_of_objects)
+
+        delay = 30
+        print('wait for '+str(delay)+'sec for the messages...')
+        time.sleep(delay)
+
+        # topic stats
+        result = admin(['topic', 'stats', '--topic', topic_name], get_config_cluster())
+        parsed_result = json.loads(result[0])
+        assert_equal(parsed_result['Topic Stats']['Entries'], 0)
+        assert_equal(result[1], 0)
+        # verify events
+        keys = list(bucket.list())
+        http_server.verify_s3_events(keys, exact_match=False)
+
+    except Exception as e:
+        assert False, str(e)
+    finally:
+        # cleanup
+        s3_notification_conf.del_config()
+        topic_conf.del_config()
+        # delete objects from the bucket
+        client_threads = []
+        for key in bucket.list():
+            thr = threading.Thread(target = key.delete, args=())
+            thr.start()
+            client_threads.append(thr)
+        [thr.join() for thr in client_threads]
+        # delete the bucket
+        conn.delete_bucket(bucket_name)
+        if http_server:
+            http_server.close()
+
+
+@attr('data_path_v2_test')
+def test_ps_s3_data_path_v2_migration():
+    """ test data path v2 migration """
+    conn = connection()
+    zonegroup = get_config_zonegroup()
+
+    # create random port for the http server
+    host = get_ip()
+    http_port = random.randint(10000, 20000)
+
+    # start an http server in a separate thread
+    number_of_objects = 10
+    http_server = StreamingHTTPServer(host, http_port, num_workers=number_of_objects)
+
+    # disable v2 notification
+    result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster())
+    assert_equal(result[1], 0)
+    result = admin(['period', 'update'], get_config_cluster())
+    assert_equal(result[1], 0)
+    result = admin(['period', 'commit'], get_config_cluster())
+    assert_equal(result[1], 0)
+
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # create s3 topic
+    endpoint_address = 'http://'+host+':'+str(http_port)
+    endpoint_args = 'push-endpoint='+endpoint_address
+    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+                        'Events': []
+                        }]
+
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # create objects in the bucket (async)
+    client_threads = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key('key-'+str(i))
+        content = str(os.urandom(1024*1024))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+    time_diff = time.time() - start_time
+    print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    try:
+        # verify events
+        keys = list(bucket.list())
+        http_server.verify_s3_events(keys, exact_match=False)
+
+        # create topic to poll on
+        topic_name_1 = topic_name + '_1'
+        topic_conf_1 = PSTopicS3(conn, topic_name_1, zonegroup, endpoint_args=endpoint_args)
+
+        # enable v2 notification
+        result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
+        assert_equal(result[1], 0)
+        result = admin(['period', 'update'], get_config_cluster())
+        assert_equal(result[1], 0)
+        result = admin(['period', 'commit'], get_config_cluster())
+        assert_equal(result[1], 0)
+
+
+        # poll on topic_1
+        result = 1
+        while result != 0:
+            time.sleep(1)
+            result = admin(['topic', 'rm', '--topic', topic_name_1], get_config_cluster())[1]
+
+
+        # create more objects in the bucket (async)
+        client_threads = []
+        start_time = time.time()
+        for i in range(number_of_objects):
+            key = bucket.new_key('key-'+str(i))
+            content = str(os.urandom(1024*1024))
+            thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+            thr.start()
+            client_threads.append(thr)
+        [thr.join() for thr in client_threads]
+        time_diff = time.time() - start_time
+        print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+        # verify events
+        keys = list(bucket.list())
+        http_server.verify_s3_events(keys, exact_match=True)
+
+    except Exception as e:
+        assert False, str(e)
+    finally:
+        # cleanup
+        s3_notification_conf.del_config()
+        topic_conf.del_config()
+        # delete objects from the bucket
+        client_threads = []
+        for key in bucket.list():
+            thr = threading.Thread(target = key.delete, args=())
+            thr.start()
+            client_threads.append(thr)
+        [thr.join() for thr in client_threads]
+        # delete the bucket
+        conn.delete_bucket(bucket_name)
+        http_server.close()
+
+
+@attr('data_path_v2_test')
+def test_ps_s3_data_path_v2_large_migration():
+    """ test data path v2 large migration """
+    conn = connection()
+    connections_list = []
+    connections_list.append(conn)
+    zonegroup = get_config_zonegroup()
+    tenants_list = []
+    tenants_list.append('')
+    for i in ['1', '2']:
+        access_key = str(time.time())
+        secret_key = str(time.time())
+        uid = UID_PREFIX + str(time.time())
+        tenant_id = 'kaboom_' + i
+        _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant_id, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
+        assert_equal(result, 0)
+        tenants_list.append(tenant_id)
+        conn = S3Connection(aws_access_key_id=access_key,
+                            aws_secret_access_key=secret_key,
+                            is_secure=False, port=get_config_port(), host=get_config_host(),
+                            calling_format='boto.s3.connection.OrdinaryCallingFormat')
+        connections_list.append(conn)
+
+    # disable v2 notification
+    result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster())
+    assert_equal(result[1], 0)
+    result = admin(['period', 'update'], get_config_cluster())
+    assert_equal(result[1], 0)
+    result = admin(['period', 'commit'], get_config_cluster())
+    assert_equal(result[1], 0)
+
+    # create random port for the http server
+    host = get_ip()
+    http_port = random.randint(10000, 20000)
+
+    # create s3 topic
+    buckets_list = []
+    topics_conf_list = []
+    s3_notification_conf_list = []
+    num_of_s3_notifications = 110
+    for conn in connections_list:
+        # create bucket
+        bucket_name = gen_bucket_name()
+        bucket = conn.create_bucket(bucket_name)
+        buckets_list.append(bucket)
+        topic_name = bucket_name + TOPIC_SUFFIX
+        # create s3 topic
+        endpoint_address = 'http://' + host + ':' + str(http_port)
+        endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
+        topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+        topics_conf_list.append(topic_conf)
+        topic_arn = topic_conf.set_config()
+        # create s3 110 notifications
+        s3_notification_list = []
+        for i in range(num_of_s3_notifications):
+            notification_name = bucket_name + NOTIFICATION_SUFFIX + '_' + str(i + 1)
+            s3_notification_list.append({'Id': notification_name, 'TopicArn': topic_arn,
+                                    'Events': []
+                                    })
+
+        s3_notification_conf = PSNotificationS3(conn, bucket_name, s3_notification_list)
+        s3_notification_conf_list.append(s3_notification_conf)
+        response, status = s3_notification_conf.set_config()
+        assert_equal(status / 100, 2)
+
+    # create topic to poll on
+    polling_topics_conf = []
+    for conn, bucket in zip(connections_list, buckets_list):
+        topic_name = bucket.name + TOPIC_SUFFIX + '_1'
+        topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+        polling_topics_conf.append(topic_conf)
+
+    # enable v2 notification
+    result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
+    assert_equal(result[1], 0)
+    result = admin(['period', 'update'], get_config_cluster())
+    assert_equal(result[1], 0)
+    result = admin(['period', 'commit'], get_config_cluster())
+    assert_equal(result[1], 0)
+
+    # poll on topic_1
+    for tenant, topic_conf in zip(tenants_list, polling_topics_conf):
+        while True:
+            if tenant == '':
+                result = admin(['topic', 'rm', '--topic', topic_conf.topic_name], get_config_cluster())
+            else:
+                result = admin(['topic', 'rm', '--topic', topic_conf.topic_name, '--tenant', tenant], get_config_cluster())
+
+            if result[1] != 0:
+                print('migration in process... error: '+str(result[1]))
+            else:
+                break
+
+            time.sleep(1)
+
+    # check if we migrated all the topics
+    for tenant in tenants_list:
+        if tenant == '':
+            topics_result = admin(['topic', 'list'], get_config_cluster())
+        else:
+            topics_result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
+        topics_json = json.loads(topics_result[0])
+        assert_equal(len(topics_json['topics']), 1)
+
+    # check if we migrated all the notifications
+    for tenant, bucket in zip(tenants_list, buckets_list):
+        if tenant == '':
+            result = admin(['notification', 'list', '--bucket', bucket.name], get_config_cluster())
+        else:
+            result = admin(['notification', 'list', '--bucket', bucket.name, '--tenant', tenant], get_config_cluster())
+        parsed_result = json.loads(result[0])
+        assert_equal(len(parsed_result['notifications']), num_of_s3_notifications)
+
+    # cleanup
+    for s3_notification_conf in s3_notification_conf_list:
+        s3_notification_conf.del_config()
+    for topic_conf in topics_conf_list:
+        topic_conf.del_config()
+    # delete the bucket
+    for conn, bucket in zip(connections_list, buckets_list):
+        conn.delete_bucket(bucket.name)
+
+
+@attr('data_path_v2_test')
+def test_ps_s3_data_path_v2_mixed_migration():
+    """ test data path v2 mixed migration """
+    conn = connection()
+    connections_list = []
+    connections_list.append(conn)
+    zonegroup = get_config_zonegroup()
+    tenants_list = []
+    tenants_list.append('')
+    
+    # make sure that we start at v2
+    result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
+    assert_equal(result[1], 0)
+    result = admin(['period', 'update'], get_config_cluster())
+    assert_equal(result[1], 0)
+    result = admin(['period', 'commit'], get_config_cluster())
+    assert_equal(result[1], 0)
+
+    for i in ['1', '2']:
+        access_key = str(time.time())
+        secret_key = str(time.time())
+        uid = UID_PREFIX + str(time.time())
+        tenant_id = 'kaboom_' + i
+        _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant_id, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'], get_config_cluster())
+        assert_equal(result, 0)
+        tenants_list.append(tenant_id)
+        conn = S3Connection(aws_access_key_id=access_key,
+                            aws_secret_access_key=secret_key,
+                            is_secure=False, port=get_config_port(), host=get_config_host(),
+                            calling_format='boto.s3.connection.OrdinaryCallingFormat')
+        connections_list.append(conn)
+
+    # create random port for the http server
+    host = get_ip()
+    http_port = random.randint(10000, 20000)
+
+    # create s3 topic
+    buckets_list = []
+    topics_conf_list = []
+    s3_notification_conf_list = []
+    topic_arn_list = []
+    created_version = '_created_v2'
+    for conn in connections_list:
+        # create bucket
+        bucket_name = gen_bucket_name()
+        bucket = conn.create_bucket(bucket_name)
+        buckets_list.append(bucket)
+        topic_name = bucket_name + TOPIC_SUFFIX + created_version
+        # create s3 topic
+        endpoint_address = 'http://' + host + ':' + str(http_port)
+        endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
+        topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+        topics_conf_list.append(topic_conf)
+        topic_arn = topic_conf.set_config()
+        topic_arn_list.append(topic_arn)
+        # create s3 notification
+        notification_name = bucket_name + NOTIFICATION_SUFFIX + created_version
+        s3_notification_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+                                 'Events': []
+                                 }]
+
+        s3_notification_conf = PSNotificationS3(conn, bucket_name, s3_notification_list)
+        s3_notification_conf_list.append(s3_notification_conf)
+        response, status = s3_notification_conf.set_config()
+        assert_equal(status / 100, 2)
+
+    # disable v2 notification
+    result = admin(['zonegroup', 'modify', '--disable-feature=notification_v2'], get_config_cluster())
+    assert_equal(result[1], 0)
+    result = admin(['period', 'update'], get_config_cluster())
+    assert_equal(result[1], 0)
+    result = admin(['period', 'commit'], get_config_cluster())
+    assert_equal(result[1], 0)
+
+    # create s3 topic
+    created_version = '_created_v1'
+    for conn, bucket in zip(connections_list, buckets_list):
+        # create bucket
+        bucket_name = bucket.name
+        topic_name = bucket_name + TOPIC_SUFFIX + created_version
+        # create s3 topic
+        endpoint_address = 'http://' + host + ':' + str(http_port)
+        endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
+        topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+        topics_conf_list.append(topic_conf)
+        topic_arn = topic_conf.set_config()
+        # create s3 notification
+        notification_name = bucket_name + NOTIFICATION_SUFFIX + created_version
+        s3_notification_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+                                 'Events': []
+                                 }]
+
+        s3_notification_conf = PSNotificationS3(conn, bucket_name, s3_notification_list)
+        s3_notification_conf_list.append(s3_notification_conf)
+        response, status = s3_notification_conf.set_config()
+        assert_equal(status / 100, 2)
+
+    # create topic to poll on
+    polling_topics_conf = []
+    for conn, bucket in zip(connections_list, buckets_list):
+        topic_name = bucket.name + TOPIC_SUFFIX + '_1'
+        topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+        polling_topics_conf.append(topic_conf)
+
+    # enable v2 notification
+    result = admin(['zonegroup', 'modify', '--enable-feature=notification_v2'], get_config_cluster())
+    assert_equal(result[1], 0)
+    result = admin(['period', 'update'], get_config_cluster())
+    assert_equal(result[1], 0)
+    result = admin(['period', 'commit'], get_config_cluster())
+    assert_equal(result[1], 0)
+
+    # poll on topic_1
+    for tenant, topic_conf in zip(tenants_list, polling_topics_conf):
+        while True:
+            if tenant == '':
+                result = admin(['topic', 'rm', '--topic', topic_conf.topic_name], get_config_cluster())
+            else:
+                result = admin(['topic', 'rm', '--topic', topic_conf.topic_name, '--tenant', tenant], get_config_cluster())
+
+            if result[1] != 0:
+                print(result)
+            else:
+                break
+
+            time.sleep(1)
+
+    # check if we migrated all the topics
+    for tenant in tenants_list:
+        if tenant == '':
+            topics_result = admin(['topic', 'list'], get_config_cluster())
+        else:
+            topics_result = admin(['topic', 'list', '--tenant', tenant], get_config_cluster())
+        topics_json = json.loads(topics_result[0])
+        assert_equal(len(topics_json['topics']), 2)
+
+    # check if we migrated all the notifications
+    for tenant, bucket in zip(tenants_list, buckets_list):
+        if tenant == '':
+            notifications_result = admin(['notification', 'list', '--bucket', bucket.name], get_config_cluster())
+        else:
+            notifications_result = admin(['notification', 'list', '--bucket', bucket.name, '--tenant', tenant], get_config_cluster())
+        notifications_json = json.loads(notifications_result[0])
+        assert_equal(len(notifications_json['notifications']), 2)
+
+    # cleanup
+    for s3_notification_conf in s3_notification_conf_list:
+        s3_notification_conf.del_config()
+    for topic_conf in topics_conf_list:
+        topic_conf.del_config()
+    # delete the bucket
+    for conn, bucket in zip(connections_list, buckets_list):
+        conn.delete_bucket(bucket.name)
+