using queues_t = std::set<std::string>;
using entries_persistency_tracker = ceph::unordered_map<std::string, persistency_tracker>;
using queues_persistency_tracker = ceph::unordered_map<std::string, entries_persistency_tracker>;
+using rgw::persistent_topic_counters::CountersManager;
// use mmap/mprotect to allocate 128k coroutine stacks
auto make_stack_allocator() {
spawn::spawn(io_context, [this, queue_name](yield_context yield) {
cleanup_queue(queue_name, yield);
}, make_stack_allocator());
-
+
+ CountersManager queue_counters_container(queue_name, this->get_cct());
+
while (true) {
// if queue was empty the last time, sleep for idle timeout
if (is_idle) {
}
}
}
+
+ // updating perfcounters with topic stats
+ uint64_t entries_size;
+ uint32_t entries_number;
+ const auto ret = cls_2pc_queue_get_topic_stats(rados_ioctx, queue_name, entries_number, entries_size);
+ if (ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: topic stats for topic: " << queue_name << ". error: " << ret << dendl;
+ } else {
+ queue_counters_container.set(l_rgw_persistent_topic_len, entries_number);
+ queue_counters_container.set(l_rgw_persistent_topic_size, entries_size);
+ }
}
}
using namespace ceph::perf_counters;
using namespace rgw::op_counters;
+using namespace rgw::persistent_topic_counters;
PerfCounters *perfcounter = NULL;
lpcb->add_time_avg(l_rgw_op_list_buckets_lat, "list_buckets_lat", "List buckets latency");
}
+void add_rgw_topic_counters(PerfCountersBuilder *lpcb) {
+ lpcb->set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
+
+ lpcb->add_u64(l_rgw_persistent_topic_len, "persistent_topic_len", "Persistent topic queue length");
+ lpcb->add_u64(l_rgw_persistent_topic_size, "persistent_topic_size", "Persistent topic queue size");
+
+}
+
void frontend_counters_init(CephContext *cct) {
PerfCountersBuilder pcb(cct, "rgw", l_rgw_first, l_rgw_last);
add_rgw_frontend_counters(&pcb);
} // namespace rgw::op_counters
+namespace rgw::persistent_topic_counters {
+
+const std::string rgw_topic_counters_key = "rgw_topic";
+
+CountersManager::CountersManager(const std::string& topic_name, CephContext *cct)
+ : cct(cct)
+{
+ const std::string topic_key = ceph::perf_counters::key_create(rgw_topic_counters_key, {{"Topic", topic_name}});
+ PerfCountersBuilder pcb(cct, topic_key, l_rgw_topic_first, l_rgw_topic_last);
+ add_rgw_topic_counters(&pcb);
+ topic_counters = std::unique_ptr<PerfCounters>(pcb.create_perf_counters());
+ cct->get_perfcounters_collection()->add(topic_counters.get());
+}
+
+void CountersManager::set(int idx, uint64_t v) {
+ topic_counters->set(idx, v);
+}
+
+CountersManager::~CountersManager() {
+ cct->get_perfcounters_collection()->remove(topic_counters.get());
+}
+
+} // namespace rgw::persistent_topic_counters
+
int rgw_perf_start(CephContext *cct)
{
frontend_counters_init(cct);
l_rgw_op_last
};
+enum {
+ l_rgw_topic_first = 17000,
+
+ l_rgw_persistent_topic_len,
+ l_rgw_persistent_topic_size,
+
+ l_rgw_topic_last
+};
+
namespace rgw::op_counters {
struct CountersContainer {
void tinc(const CountersContainer &counters, int idx, ceph::timespan amt);
} // namespace rgw::op_counters
+
+namespace rgw::persistent_topic_counters {
+
+class CountersManager {
+ std::unique_ptr<PerfCounters> topic_counters;
+ CephContext *cct;
+
+public:
+ CountersManager(const std::string& name, CephContext *cct);
+
+ void set(int idx, uint64_t v);
+
+ ~CountersManager();
+
+};
+
+} // namespace rgw::persistent_topic_counters