]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
RGW: Add observability over the persistent topics queue
authorAli Masarwa <ali.saed.masarwa@gmail.com>
Tue, 6 Jun 2023 13:20:40 +0000 (16:20 +0300)
committerAli Masarwa <ali.saed.masarwa@gmail.com>
Thu, 6 Jul 2023 08:47:32 +0000 (11:47 +0300)
Signed-off-by: Ali Masarwa <ali.saed.masarwa@gmail.com>
src/cls/2pc_queue/cls_2pc_queue.cc
src/cls/2pc_queue/cls_2pc_queue_client.cc
src/cls/2pc_queue/cls_2pc_queue_client.h
src/cls/2pc_queue/cls_2pc_queue_const.h
src/cls/2pc_queue/cls_2pc_queue_types.h
src/cls/queue/cls_queue_ops.h
src/rgw/driver/rados/rgw_notify.cc
src/rgw/driver/rados/rgw_notify.h
src/rgw/rgw_admin.cc
src/test/rgw/bucket_notification/test_bn.py

index fba76395542f33280bba83a6365c0588b9237cf7..45710f9abe3d9005ecf8c2732c227c0fbbfe9fe8 100644 (file)
@@ -55,6 +55,36 @@ static int cls_2pc_queue_get_capacity(cls_method_context_t hctx, bufferlist *in,
   return 0;
 }
 
+static int cls_2pc_queue_get_topic_stats(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  cls_queue_get_stats_ret op_ret;
+
+  // get head
+  cls_queue_head head;
+  auto ret = queue_read_head(hctx, head);
+  if (ret < 0) {
+    return ret;
+  }
+  const auto remaining_size = (head.tail.offset >= head.front.offset) ?
+                              (head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size) :
+                              head.front.offset - head.tail.offset;
+  op_ret.queue_size = head.queue_size - head.max_head_size - remaining_size;
+
+  cls_2pc_urgent_data urgent_data;
+  try {
+    auto in_iter = head.bl_urgent_data.cbegin();
+    decode(urgent_data, in_iter);
+  } catch (ceph::buffer::error& err) {
+    CLS_LOG(1, "ERROR: cls_2pc_queue_get_committed_entries: failed to decode header of queue: %s", err.what());
+    return -EINVAL;
+  }
+  op_ret.queue_entries = urgent_data.committed_entries;
+
+  encode(op_ret, *out);
+
+  return 0;
+}
+
 static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
   cls_2pc_queue_reserve_op res_op;
   try {
@@ -112,7 +142,7 @@ static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, buff
   cls_2pc_reservations::iterator last_reservation;
   std::tie(last_reservation, result) = urgent_data.reservations.emplace(std::piecewise_construct,
           std::forward_as_tuple(urgent_data.last_id),
-          std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now()));
+          std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now(), res_op.entries));
   if (!result) {
     // an old reservation that was never committed or aborted is in the map
     // caller should try again assuming other IDs are ok
@@ -148,7 +178,7 @@ static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, buff
     }
     std::tie(std::ignore, result) = xattr_reservations.emplace(std::piecewise_construct,
           std::forward_as_tuple(urgent_data.last_id),
-          std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now()));
+          std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now(), res_op.entries));
     if (!result) {
       // an old reservation that was never committed or aborted is in the map
       // caller should try again assuming other IDs are ok
@@ -268,6 +298,7 @@ static int cls_2pc_queue_commit(cls_method_context_t hctx, bufferlist *in, buffe
   }
 
   urgent_data.reserved_size -= res.size;
+  urgent_data.committed_entries += res.entries;
 
   if (xattr_reservations.empty()) {
     // remove the reservation from urgent data
@@ -577,6 +608,7 @@ CLS_INIT(2pc_queue)
   cls_handle_t h_class;
   cls_method_handle_t h_2pc_queue_init;
   cls_method_handle_t h_2pc_queue_get_capacity;
+  cls_method_handle_t h_2pc_queue_get_topic_stats;
   cls_method_handle_t h_2pc_queue_reserve;
   cls_method_handle_t h_2pc_queue_commit;
   cls_method_handle_t h_2pc_queue_abort;
@@ -589,6 +621,7 @@ CLS_INIT(2pc_queue)
 
   cls_register_cxx_method(h_class, TPC_QUEUE_INIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_init, &h_2pc_queue_init);
   cls_register_cxx_method(h_class, TPC_QUEUE_GET_CAPACITY, CLS_METHOD_RD, cls_2pc_queue_get_capacity, &h_2pc_queue_get_capacity);
+  cls_register_cxx_method(h_class, TPC_QUEUE_GET_TOPIC_STATS, CLS_METHOD_RD, cls_2pc_queue_get_topic_stats, &h_2pc_queue_get_topic_stats);
   cls_register_cxx_method(h_class, TPC_QUEUE_RESERVE, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_reserve, &h_2pc_queue_reserve);
   cls_register_cxx_method(h_class, TPC_QUEUE_COMMIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_commit, &h_2pc_queue_commit);
   cls_register_cxx_method(h_class, TPC_QUEUE_ABORT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_abort, &h_2pc_queue_abort);
index 6868b2b6f83ee88a2eeb6fc5eca4ba25562437f4..42632cba61aa5c58eef84178edb3eaea6638f5c0 100644 (file)
@@ -31,6 +31,21 @@ int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size) {
   return 0;
 }
 
+int cls_2pc_queue_get_topic_stats_result(const bufferlist& bl, uint32_t& committed_entries, uint64_t& size) {
+  cls_queue_get_stats_ret op_ret;
+  auto iter = bl.cbegin();
+  try {
+    decode(op_ret, iter);
+  } catch (buffer::error& err) {
+    return -EIO;
+  }
+
+  committed_entries = op_ret.queue_entries;
+  size = op_ret.queue_size;
+
+  return 0;
+}
+
 #ifndef CLS_CLIENT_HIDE_IOCTX
 int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const std::string& queue_name, uint64_t& size) {
   bufferlist in, out;
@@ -44,12 +59,31 @@ int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const std::string& queue_name, uin
 #endif
 
 // optionally async method for getting capacity (bytes) 
-// after answer is received, call cls_2pc_queue_get_capacity_result() to prase the results
+// after answer is received, call cls_2pc_queue_get_capacity_result() to parse the results
 void cls_2pc_queue_get_capacity(ObjectReadOperation& op, bufferlist* obl, int* prval) {
   bufferlist in;
   op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, obl, prval);
 }
 
+#ifndef CLS_CLIENT_HIDE_IOCTX
+int cls_2pc_queue_get_topic_stats(IoCtx& io_ctx, const std::string& queue_name, uint32_t& committed_entries, uint64_t& size) {
+  bufferlist in, out;
+  const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_GET_TOPIC_STATS, in, out);
+  if (r < 0 ) {
+    return r;
+  }
+
+  return cls_2pc_queue_get_topic_stats_result(out, committed_entries, size);
+}
+#endif
+
+// optionally async method for getting number of commited entries and size (bytes)
+// after answer is received, call cls_2pc_queue_get_topic_stats_result() to parse the results
+void cls_2pc_queue_get_topic_stats(ObjectReadOperation& op, bufferlist* obl, int* prval) {
+  bufferlist in;
+  op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_GET_TOPIC_STATS, in, obl, prval);
+}
+
 
 int cls_2pc_queue_reserve_result(const bufferlist& bl, cls_2pc_reservation::id_t& res_id) {
   cls_2pc_queue_reserve_ret op_ret;
index e0bdeafd59098c253c6a46edddebe6708ef1fe21..20043edd2009924966005b5c4de51f5b1f5b3571 100644 (file)
@@ -19,6 +19,8 @@ void cls_2pc_queue_init(librados::ObjectWriteOperation& op, const std::string& q
 #ifndef CLS_CLIENT_HIDE_IOCTX
 // return capacity (bytes)
 int cls_2pc_queue_get_capacity(librados::IoCtx& io_ctx, const std::string& queue_name, uint64_t& size);
+// return the number of committed entries and size (bytes)
+int cls_2pc_queue_get_topic_stats(librados::IoCtx& io_ctx, const std::string& queue_name, uint32_t& committed_entries, uint64_t& size);
 
 // make a reservation on the queue (in bytes) and number of expected entries (to calculate overhead)
 // return a reservation id if reservations is possible, 0 otherwise
@@ -37,7 +39,12 @@ int cls_2pc_queue_list_reservations(librados::IoCtx& io_ctx, const std::string&
 // after answer is received, call cls_2pc_queue_get_capacity_result() to parse the results
 void cls_2pc_queue_get_capacity(librados::ObjectReadOperation& op,  bufferlist* obl, int* prval);
 
+// optionally async method for getting capacity (bytes)
+// after answer is received, call cls_2pc_queue_get_topic_stats_result() to parse the results
+void cls_2pc_queue_get_topic_stats(librados::ObjectReadOperation& op,  bufferlist* obl, int* prval);
+
 int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size);
+int cls_2pc_queue_get_topic_stats_result(const bufferlist& bl, uint32_t& committed_entries, uint64_t& size);
 
 // optionally async method for making a reservation on the queue (in bytes) and number of expected entries (to calculate overhead)
 // notes: 
index 160c5b66e9f4c92a71b8450962d4ec6dabfef9c7..ea7afa943cabc59343ab927bde05ce58aadff2b2 100644 (file)
@@ -4,6 +4,7 @@
 
 #define TPC_QUEUE_INIT "2pc_queue_init"
 #define TPC_QUEUE_GET_CAPACITY "2pc_queue_get_capacity"
+#define TPC_QUEUE_GET_TOPIC_STATS "2pc_queue_get_topic_stats"
 #define TPC_QUEUE_RESERVE "2pc_queue_reserve"
 #define TPC_QUEUE_COMMIT "2pc_queue_commit"
 #define TPC_QUEUE_ABORT "2pc_queue_abort"
index 7c94cdebfe0372ceabb58daf65f8972b4847bb26..2413fd7043da0cb10d990fb4c1754663821ec134 100644 (file)
@@ -8,25 +8,30 @@ struct cls_2pc_reservation
 {
   using id_t = uint32_t;
   inline static const id_t NO_ID{0};
-  uint64_t size;        // how many entries are reserved
+  uint64_t size;                     // how much size to reserve (bytes)
   ceph::coarse_real_time timestamp;  // when the reservation was done (used for cleaning stale reservations)
+  uint32_t entries;                  // how many entries are reserved
 
-  cls_2pc_reservation(uint64_t _size, ceph::coarse_real_time _timestamp) :
-      size(_size), timestamp(_timestamp) {}
+  cls_2pc_reservation(uint64_t _size, ceph::coarse_real_time _timestamp, uint32_t _entries) :
+      size(_size), timestamp(_timestamp), entries(_entries) {}
 
   cls_2pc_reservation() = default;
 
   void encode(ceph::buffer::list& bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 1, bl);
     encode(size, bl);
     encode(timestamp, bl);
+    encode(entries, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(ceph::buffer::list::const_iterator& bl) {
-    DECODE_START(1, bl);
+    DECODE_START(2, bl);
     decode(size, bl);
     decode(timestamp, bl);
+    if (struct_v >= 2) {
+      decode(entries, bl);
+    }
     DECODE_FINISH(bl);
   }
 };
@@ -40,22 +45,27 @@ struct cls_2pc_urgent_data
   cls_2pc_reservation::id_t last_id{cls_2pc_reservation::NO_ID}; // last allocated id
   cls_2pc_reservations reservations; // reservation list (keyed by id)
   bool has_xattrs{false};
+  uint32_t committed_entries{0}; // how many entries have been committed so far
 
   void encode(ceph::buffer::list& bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 1, bl);
     encode(reserved_size, bl);
     encode(last_id, bl);
     encode(reservations, bl);
     encode(has_xattrs, bl);
+    encode(committed_entries, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(ceph::buffer::list::const_iterator& bl) {
-    DECODE_START(1, bl);
+    DECODE_START(2, bl);
     decode(reserved_size, bl);
     decode(last_id, bl);
     decode(reservations, bl);
     decode(has_xattrs, bl);
+    if (struct_v >= 2) {
+      decode(committed_entries, bl);
+    }
     DECODE_FINISH(bl);
   }
 };
index 64891cffb390959e4b4c98373c1ae58f6b4f04ce..8209659bda907b7492f6a6366e4762698fa0eec1 100644 (file)
@@ -136,4 +136,26 @@ struct cls_queue_get_capacity_ret {
 };
 WRITE_CLASS_ENCODER(cls_queue_get_capacity_ret)
 
+struct cls_queue_get_stats_ret {
+  uint64_t queue_size;
+  uint32_t queue_entries;
+
+  cls_queue_get_stats_ret() {}
+
+  void encode(ceph::buffer::list& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(queue_size, bl);
+    encode(queue_entries, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(ceph::buffer::list::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(queue_size, bl);
+    decode(queue_entries, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(cls_queue_get_stats_ret)
+
 #endif /* CEPH_CLS_QUEUE_OPS_H */
index b1835016ec0eef21b9adc01ace1e43c67f74fc50..324fb4460bb396f14c1eefd2609d11950f5d561b 100644 (file)
@@ -977,6 +977,26 @@ int publish_abort(reservation_t& res) {
   return 0;
 }
 
+int get_persistent_queue_stats_by_topic_name(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
+                                             const std::string &topic_name, rgw_topic_stats &stats, optional_yield y)
+{
+  cls_2pc_reservations reservations;
+  auto ret = cls_2pc_queue_list_reservations(rados_ioctx, topic_name, reservations);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to read queue list reservation: " << ret << dendl;
+    return ret;
+  }
+  stats.queue_reservations = reservations.size();
+
+  ret = cls_2pc_queue_get_topic_stats(rados_ioctx, topic_name, stats.queue_entries, stats.queue_size);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to get the queue size or the number of entries: " << ret << dendl;
+    return ret;
+  }
+
+  return 0;
+}
+
 reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
                             rgw::sal::RadosStore* _store,
                             const req_state* _s,
@@ -1020,4 +1040,12 @@ reservation_t::~reservation_t() {
   publish_abort(*this);
 }
 
+void rgw_topic_stats::dump(Formatter *f) const {
+  f->open_object_section("Topic Stats");
+  f->dump_int("Reservations", queue_reservations);
+  f->dump_int("Size", queue_size);
+  f->dump_int("Entries", queue_entries);
+  f->close_section();
+}
+
 } // namespace rgw::notify
index 9269611e4a6f3a1ae539ad6c2c54a2d72044bdf0..460a7bacb5dac5b777e0dc7ef66abe26f4659838 100644 (file)
@@ -98,8 +98,18 @@ struct reservation_t {
   ~reservation_t();
 };
 
+
+
+struct rgw_topic_stats {
+  std::size_t queue_reservations; // number of reservations
+  uint64_t queue_size;            // in bytes
+  uint32_t queue_entries;         // number of entries
+
+  void dump(Formatter *f) const;
+};
+
 // create a reservation on the 2-phase-commit queue
-  int publish_reserve(const DoutPrefixProvider *dpp,
+int publish_reserve(const DoutPrefixProvider *dpp,
                      EventType event_type,
                      reservation_t& reservation,
                      const RGWObjTags* req_tags);
@@ -117,5 +127,8 @@ int publish_commit(rgw::sal::Object* obj,
 // cancel the reservation
 int publish_abort(reservation_t& reservation);
 
+int get_persistent_queue_stats_by_topic_name(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
+                                             const std::string &topic_name, rgw_topic_stats &stats, optional_yield y);
+
 }
 
index 96742ad20ba347379e750985f16a5b76353d362c..dbf28c5a22a29538e5c73ed6d9f25e9765141382 100644 (file)
@@ -840,6 +840,7 @@ enum class OPT {
   PUBSUB_NOTIFICATION_LIST,
   PUBSUB_NOTIFICATION_GET,
   PUBSUB_NOTIFICATION_RM,
+  PUBSUB_TOPIC_STATS,
   SCRIPT_PUT,
   SCRIPT_GET,
   SCRIPT_RM,
@@ -1074,6 +1075,7 @@ static SimpleCmd::Commands all_cmds = {
   { "notification list", OPT::PUBSUB_NOTIFICATION_LIST },
   { "notification get", OPT::PUBSUB_NOTIFICATION_GET },
   { "notification rm", OPT::PUBSUB_NOTIFICATION_RM },
+  { "topic stats", OPT::PUBSUB_TOPIC_STATS },
   { "script put", OPT::SCRIPT_PUT },
   { "script get", OPT::SCRIPT_GET },
   { "script rm", OPT::SCRIPT_RM },
@@ -4198,6 +4200,7 @@ int main(int argc, const char **argv)
        OPT::PUBSUB_NOTIFICATION_LIST,
                         OPT::PUBSUB_TOPIC_GET,
        OPT::PUBSUB_NOTIFICATION_GET,
+       OPT::PUBSUB_TOPIC_STATS  ,
                         OPT::SCRIPT_GET,
     };
 
@@ -4284,7 +4287,8 @@ int main(int argc, const char **argv)
                           && opt_cmd != OPT::PUBSUB_TOPIC_GET
                           && opt_cmd != OPT::PUBSUB_NOTIFICATION_GET
                           && opt_cmd != OPT::PUBSUB_TOPIC_RM
-                          && opt_cmd != OPT::PUBSUB_NOTIFICATION_RM) {
+                          && opt_cmd != OPT::PUBSUB_NOTIFICATION_RM
+                          && opt_cmd != OPT::PUBSUB_TOPIC_STATS  ) {
         cerr << "ERROR: --tenant is set, but there's no user ID" << std::endl;
         return EINVAL;
       }
@@ -10513,7 +10517,8 @@ next:
       return EINVAL;
     }
 
-    ret = rgw::notify::remove_persistent_topic(dpp(), static_cast<rgw::sal::RadosStore*>(driver)->getRados()->get_notif_pool_ctx(), topic_name, null_yield);
+    ret = rgw::notify::remove_persistent_topic(
+        dpp(), static_cast<rgw::sal::RadosStore*>(driver)->getRados()->get_notif_pool_ctx(), topic_name, null_yield);
     if (ret < 0) {
       cerr << "ERROR: could not remove persistent topic: " << cpp_strerror(-ret) << std::endl;
       return -ret;
@@ -10558,6 +10563,24 @@ next:
     }
   }
 
+  if (opt_cmd == OPT::PUBSUB_TOPIC_STATS) {
+    if (topic_name.empty()) {
+      cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
+      return EINVAL;
+    }
+
+    rgw::notify::rgw_topic_stats stats;
+    ret = rgw::notify::get_persistent_queue_stats_by_topic_name(
+        dpp(), static_cast<rgw::sal::RadosStore *>(driver)->getRados()->get_notif_pool_ctx(), topic_name,
+        stats, null_yield);
+    if (ret < 0) {
+      cerr << "ERROR: could not get persistent queue: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+    encode_json("", stats, formatter.get());
+    formatter->flush(cout);
+  }
+
   if (opt_cmd == OPT::SCRIPT_PUT) {
     if (!str_script_ctx) {
       cerr << "ERROR: context was not provided (via --context)" << std::endl;
index 8a9b29321988c78ea56ea329e817c1980b675de9..3fd249f61f086e6a794dcba94d9fc64c58a03b43 100644 (file)
@@ -3023,6 +3023,90 @@ def test_ps_s3_persistent_cleanup():
     http_server.close()
 
 
+@attr('basic_test')
+def test_ps_s3_persistent_topic_stats():
+    """ test persistent topic stats """
+    conn = connection()
+    zonegroup = 'default'
+
+    # create random port for the http server
+    host = get_ip()
+    port = random.randint(10000, 20000)
+
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # create s3 topic
+    endpoint_address = 'http://'+host+':'+str(port)
+    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
+    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+                        'Events': []
+                        }]
+
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # topic stats
+    result = admin(['topic', 'stats', '--topic', topic_name])
+    parsed_result = json.loads(result[0])
+    assert_equal(parsed_result['Topic Stats']['Entries'], 0)
+    assert_equal(result[1], 0)
+
+    # create objects in the bucket (async)
+    number_of_objects = 10
+    client_threads = []
+    start_time = time.time()
+    for i in range(number_of_objects):
+        key = bucket.new_key('key-'+str(i))
+        content = str(os.urandom(1024*1024))
+        thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+        thr.start()
+        client_threads.append(thr)
+    [thr.join() for thr in client_threads]
+    time_diff = time.time() - start_time
+    print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+    # topic stats
+    result = admin(['topic', 'stats', '--topic', topic_name])
+    parsed_result = json.loads(result[0])
+    assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
+    assert_equal(result[1], 0)
+
+    # delete objects from the bucket
+    client_threads = []
+    start_time = time.time()
+    count = 0
+    for key in bucket.list():
+        count += 1
+        thr = threading.Thread(target = key.delete, args=())
+        thr.start()
+        client_threads.append(thr)
+        if count%100 == 0:
+            [thr.join() for thr in client_threads]
+            time_diff = time.time() - start_time
+            print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+            client_threads = []
+            start_time = time.time()
+
+    # topic stats
+    result = admin(['topic', 'stats', '--topic', topic_name])
+    parsed_result = json.loads(result[0])
+    assert_equal(parsed_result['Topic Stats']['Entries'], 2*number_of_objects)
+    assert_equal(result[1], 0)
+
+    # cleanup
+    s3_notification_conf.del_config()
+    topic_conf.del_config()
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+
 @attr('manual_test')
 def test_ps_s3_persistent_notification_pushback():
     """ test pushing persistent notification pushback """