return 0;
}
+static int cls_2pc_queue_get_topic_stats(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ cls_queue_get_stats_ret op_ret;
+
+ // get head
+ cls_queue_head head;
+ auto ret = queue_read_head(hctx, head);
+ if (ret < 0) {
+ return ret;
+ }
+ const auto remaining_size = (head.tail.offset >= head.front.offset) ?
+ (head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size) :
+ head.front.offset - head.tail.offset;
+ op_ret.queue_size = head.queue_size - head.max_head_size - remaining_size;
+
+ cls_2pc_urgent_data urgent_data;
+ try {
+ auto in_iter = head.bl_urgent_data.cbegin();
+ decode(urgent_data, in_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(1, "ERROR: cls_2pc_queue_get_committed_entries: failed to decode header of queue: %s", err.what());
+ return -EINVAL;
+ }
+ op_ret.queue_entries = urgent_data.committed_entries;
+
+ encode(op_ret, *out);
+
+ return 0;
+}
+
static int cls_2pc_queue_reserve(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
cls_2pc_queue_reserve_op res_op;
try {
cls_2pc_reservations::iterator last_reservation;
std::tie(last_reservation, result) = urgent_data.reservations.emplace(std::piecewise_construct,
std::forward_as_tuple(urgent_data.last_id),
- std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now()));
+ std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now(), res_op.entries));
if (!result) {
// an old reservation that was never committed or aborted is in the map
// caller should try again assuming other IDs are ok
}
std::tie(std::ignore, result) = xattr_reservations.emplace(std::piecewise_construct,
std::forward_as_tuple(urgent_data.last_id),
- std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now()));
+ std::forward_as_tuple(res_op.size, ceph::coarse_real_clock::now(), res_op.entries));
if (!result) {
// an old reservation that was never committed or aborted is in the map
// caller should try again assuming other IDs are ok
}
urgent_data.reserved_size -= res.size;
+ urgent_data.committed_entries += res.entries;
if (xattr_reservations.empty()) {
// remove the reservation from urgent data
cls_handle_t h_class;
cls_method_handle_t h_2pc_queue_init;
cls_method_handle_t h_2pc_queue_get_capacity;
+ cls_method_handle_t h_2pc_queue_get_topic_stats;
cls_method_handle_t h_2pc_queue_reserve;
cls_method_handle_t h_2pc_queue_commit;
cls_method_handle_t h_2pc_queue_abort;
cls_register_cxx_method(h_class, TPC_QUEUE_INIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_init, &h_2pc_queue_init);
cls_register_cxx_method(h_class, TPC_QUEUE_GET_CAPACITY, CLS_METHOD_RD, cls_2pc_queue_get_capacity, &h_2pc_queue_get_capacity);
+ cls_register_cxx_method(h_class, TPC_QUEUE_GET_TOPIC_STATS, CLS_METHOD_RD, cls_2pc_queue_get_topic_stats, &h_2pc_queue_get_topic_stats);
cls_register_cxx_method(h_class, TPC_QUEUE_RESERVE, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_reserve, &h_2pc_queue_reserve);
cls_register_cxx_method(h_class, TPC_QUEUE_COMMIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_commit, &h_2pc_queue_commit);
cls_register_cxx_method(h_class, TPC_QUEUE_ABORT, CLS_METHOD_RD | CLS_METHOD_WR, cls_2pc_queue_abort, &h_2pc_queue_abort);
return 0;
}
+int cls_2pc_queue_get_topic_stats_result(const bufferlist& bl, uint32_t& committed_entries, uint64_t& size) {
+ cls_queue_get_stats_ret op_ret;
+ auto iter = bl.cbegin();
+ try {
+ decode(op_ret, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+
+ committed_entries = op_ret.queue_entries;
+ size = op_ret.queue_size;
+
+ return 0;
+}
+
#ifndef CLS_CLIENT_HIDE_IOCTX
int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const std::string& queue_name, uint64_t& size) {
bufferlist in, out;
#endif
// optionally async method for getting capacity (bytes)
-// after answer is received, call cls_2pc_queue_get_capacity_result() to prase the results
+// after answer is received, call cls_2pc_queue_get_capacity_result() to parse the results
void cls_2pc_queue_get_capacity(ObjectReadOperation& op, bufferlist* obl, int* prval) {
bufferlist in;
op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, obl, prval);
}
+#ifndef CLS_CLIENT_HIDE_IOCTX
+int cls_2pc_queue_get_topic_stats(IoCtx& io_ctx, const std::string& queue_name, uint32_t& committed_entries, uint64_t& size) {
+ bufferlist in, out;
+ const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_GET_TOPIC_STATS, in, out);
+ if (r < 0 ) {
+ return r;
+ }
+
+ return cls_2pc_queue_get_topic_stats_result(out, committed_entries, size);
+}
+#endif
+
+// optionally async method for getting number of commited entries and size (bytes)
+// after answer is received, call cls_2pc_queue_get_topic_stats_result() to parse the results
+void cls_2pc_queue_get_topic_stats(ObjectReadOperation& op, bufferlist* obl, int* prval) {
+ bufferlist in;
+ op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_GET_TOPIC_STATS, in, obl, prval);
+}
+
int cls_2pc_queue_reserve_result(const bufferlist& bl, cls_2pc_reservation::id_t& res_id) {
cls_2pc_queue_reserve_ret op_ret;
#ifndef CLS_CLIENT_HIDE_IOCTX
// return capacity (bytes)
int cls_2pc_queue_get_capacity(librados::IoCtx& io_ctx, const std::string& queue_name, uint64_t& size);
+// return the number of committed entries and size (bytes)
+int cls_2pc_queue_get_topic_stats(librados::IoCtx& io_ctx, const std::string& queue_name, uint32_t& committed_entries, uint64_t& size);
// make a reservation on the queue (in bytes) and number of expected entries (to calculate overhead)
// return a reservation id if reservations is possible, 0 otherwise
// after answer is received, call cls_2pc_queue_get_capacity_result() to parse the results
void cls_2pc_queue_get_capacity(librados::ObjectReadOperation& op, bufferlist* obl, int* prval);
+// optionally async method for getting capacity (bytes)
+// after answer is received, call cls_2pc_queue_get_topic_stats_result() to parse the results
+void cls_2pc_queue_get_topic_stats(librados::ObjectReadOperation& op, bufferlist* obl, int* prval);
+
int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size);
+int cls_2pc_queue_get_topic_stats_result(const bufferlist& bl, uint32_t& committed_entries, uint64_t& size);
// optionally async method for making a reservation on the queue (in bytes) and number of expected entries (to calculate overhead)
// notes:
#define TPC_QUEUE_INIT "2pc_queue_init"
#define TPC_QUEUE_GET_CAPACITY "2pc_queue_get_capacity"
+#define TPC_QUEUE_GET_TOPIC_STATS "2pc_queue_get_topic_stats"
#define TPC_QUEUE_RESERVE "2pc_queue_reserve"
#define TPC_QUEUE_COMMIT "2pc_queue_commit"
#define TPC_QUEUE_ABORT "2pc_queue_abort"
{
using id_t = uint32_t;
inline static const id_t NO_ID{0};
- uint64_t size; // how many entries are reserved
+ uint64_t size; // how much size to reserve (bytes)
ceph::coarse_real_time timestamp; // when the reservation was done (used for cleaning stale reservations)
+ uint32_t entries; // how many entries are reserved
- cls_2pc_reservation(uint64_t _size, ceph::coarse_real_time _timestamp) :
- size(_size), timestamp(_timestamp) {}
+ cls_2pc_reservation(uint64_t _size, ceph::coarse_real_time _timestamp, uint32_t _entries) :
+ size(_size), timestamp(_timestamp), entries(_entries) {}
cls_2pc_reservation() = default;
void encode(ceph::buffer::list& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
encode(size, bl);
encode(timestamp, bl);
+ encode(entries, bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
decode(size, bl);
decode(timestamp, bl);
+ if (struct_v >= 2) {
+ decode(entries, bl);
+ }
DECODE_FINISH(bl);
}
};
cls_2pc_reservation::id_t last_id{cls_2pc_reservation::NO_ID}; // last allocated id
cls_2pc_reservations reservations; // reservation list (keyed by id)
bool has_xattrs{false};
+ uint32_t committed_entries{0}; // how many entries have been committed so far
void encode(ceph::buffer::list& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
encode(reserved_size, bl);
encode(last_id, bl);
encode(reservations, bl);
encode(has_xattrs, bl);
+ encode(committed_entries, bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
decode(reserved_size, bl);
decode(last_id, bl);
decode(reservations, bl);
decode(has_xattrs, bl);
+ if (struct_v >= 2) {
+ decode(committed_entries, bl);
+ }
DECODE_FINISH(bl);
}
};
};
WRITE_CLASS_ENCODER(cls_queue_get_capacity_ret)
+struct cls_queue_get_stats_ret {
+ uint64_t queue_size;
+ uint32_t queue_entries;
+
+ cls_queue_get_stats_ret() {}
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(queue_size, bl);
+ encode(queue_entries, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(queue_size, bl);
+ decode(queue_entries, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(cls_queue_get_stats_ret)
+
#endif /* CEPH_CLS_QUEUE_OPS_H */
return 0;
}
+int get_persistent_queue_stats_by_topic_name(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
+ const std::string &topic_name, rgw_topic_stats &stats, optional_yield y)
+{
+ cls_2pc_reservations reservations;
+ auto ret = cls_2pc_queue_list_reservations(rados_ioctx, topic_name, reservations);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to read queue list reservation: " << ret << dendl;
+ return ret;
+ }
+ stats.queue_reservations = reservations.size();
+
+ ret = cls_2pc_queue_get_topic_stats(rados_ioctx, topic_name, stats.queue_entries, stats.queue_size);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to get the queue size or the number of entries: " << ret << dendl;
+ return ret;
+ }
+
+ return 0;
+}
+
reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
rgw::sal::RadosStore* _store,
const req_state* _s,
publish_abort(*this);
}
+void rgw_topic_stats::dump(Formatter *f) const {
+ f->open_object_section("Topic Stats");
+ f->dump_int("Reservations", queue_reservations);
+ f->dump_int("Size", queue_size);
+ f->dump_int("Entries", queue_entries);
+ f->close_section();
+}
+
} // namespace rgw::notify
~reservation_t();
};
+
+
+struct rgw_topic_stats {
+ std::size_t queue_reservations; // number of reservations
+ uint64_t queue_size; // in bytes
+ uint32_t queue_entries; // number of entries
+
+ void dump(Formatter *f) const;
+};
+
// create a reservation on the 2-phase-commit queue
- int publish_reserve(const DoutPrefixProvider *dpp,
+int publish_reserve(const DoutPrefixProvider *dpp,
EventType event_type,
reservation_t& reservation,
const RGWObjTags* req_tags);
// cancel the reservation
int publish_abort(reservation_t& reservation);
+int get_persistent_queue_stats_by_topic_name(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
+ const std::string &topic_name, rgw_topic_stats &stats, optional_yield y);
+
}
PUBSUB_NOTIFICATION_LIST,
PUBSUB_NOTIFICATION_GET,
PUBSUB_NOTIFICATION_RM,
+ PUBSUB_TOPIC_STATS,
SCRIPT_PUT,
SCRIPT_GET,
SCRIPT_RM,
{ "notification list", OPT::PUBSUB_NOTIFICATION_LIST },
{ "notification get", OPT::PUBSUB_NOTIFICATION_GET },
{ "notification rm", OPT::PUBSUB_NOTIFICATION_RM },
+ { "topic stats", OPT::PUBSUB_TOPIC_STATS },
{ "script put", OPT::SCRIPT_PUT },
{ "script get", OPT::SCRIPT_GET },
{ "script rm", OPT::SCRIPT_RM },
OPT::PUBSUB_NOTIFICATION_LIST,
OPT::PUBSUB_TOPIC_GET,
OPT::PUBSUB_NOTIFICATION_GET,
+ OPT::PUBSUB_TOPIC_STATS ,
OPT::SCRIPT_GET,
};
&& opt_cmd != OPT::PUBSUB_TOPIC_GET
&& opt_cmd != OPT::PUBSUB_NOTIFICATION_GET
&& opt_cmd != OPT::PUBSUB_TOPIC_RM
- && opt_cmd != OPT::PUBSUB_NOTIFICATION_RM) {
+ && opt_cmd != OPT::PUBSUB_NOTIFICATION_RM
+ && opt_cmd != OPT::PUBSUB_TOPIC_STATS ) {
cerr << "ERROR: --tenant is set, but there's no user ID" << std::endl;
return EINVAL;
}
return EINVAL;
}
- ret = rgw::notify::remove_persistent_topic(dpp(), static_cast<rgw::sal::RadosStore*>(driver)->getRados()->get_notif_pool_ctx(), topic_name, null_yield);
+ ret = rgw::notify::remove_persistent_topic(
+ dpp(), static_cast<rgw::sal::RadosStore*>(driver)->getRados()->get_notif_pool_ctx(), topic_name, null_yield);
if (ret < 0) {
cerr << "ERROR: could not remove persistent topic: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
}
+ if (opt_cmd == OPT::PUBSUB_TOPIC_STATS) {
+ if (topic_name.empty()) {
+ cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
+ return EINVAL;
+ }
+
+ rgw::notify::rgw_topic_stats stats;
+ ret = rgw::notify::get_persistent_queue_stats_by_topic_name(
+ dpp(), static_cast<rgw::sal::RadosStore *>(driver)->getRados()->get_notif_pool_ctx(), topic_name,
+ stats, null_yield);
+ if (ret < 0) {
+ cerr << "ERROR: could not get persistent queue: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+ encode_json("", stats, formatter.get());
+ formatter->flush(cout);
+ }
+
if (opt_cmd == OPT::SCRIPT_PUT) {
if (!str_script_ctx) {
cerr << "ERROR: context was not provided (via --context)" << std::endl;
http_server.close()
+@attr('basic_test')
+def test_ps_s3_persistent_topic_stats():
+ """ test persistent topic stats """
+ conn = connection()
+ zonegroup = 'default'
+
+ # create random port for the http server
+ host = get_ip()
+ port = random.randint(10000, 20000)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topic
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # topic stats
+ result = admin(['topic', 'stats', '--topic', topic_name])
+ parsed_result = json.loads(result[0])
+ assert_equal(parsed_result['Topic Stats']['Entries'], 0)
+ assert_equal(result[1], 0)
+
+ # create objects in the bucket (async)
+ number_of_objects = 10
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key('key-'+str(i))
+ content = str(os.urandom(1024*1024))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+ time_diff = time.time() - start_time
+ print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ # topic stats
+ result = admin(['topic', 'stats', '--topic', topic_name])
+ parsed_result = json.loads(result[0])
+ assert_equal(parsed_result['Topic Stats']['Entries'], number_of_objects)
+ assert_equal(result[1], 0)
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ count = 0
+ for key in bucket.list():
+ count += 1
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ if count%100 == 0:
+ [thr.join() for thr in client_threads]
+ time_diff = time.time() - start_time
+ print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+ client_threads = []
+ start_time = time.time()
+
+ # topic stats
+ result = admin(['topic', 'stats', '--topic', topic_name])
+ parsed_result = json.loads(result[0])
+ assert_equal(parsed_result['Topic Stats']['Entries'], 2*number_of_objects)
+ assert_equal(result[1], 0)
+
+ # cleanup
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+
@attr('manual_test')
def test_ps_s3_persistent_notification_pushback():
""" test pushing persistent notification pushback """