"seastore_journal_batch_flush_size"),
journal_segment_manager),
scanner(scanner)
-{}
+{
+ register_metrics();
+}
Journal::prep_replay_segments_fut
Journal::prep_replay_segments(
});
}
+void Journal::register_metrics()
+{
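+ // start from clean counters whenever the journal's metrics are registered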
+ record_submitter.reset_stats();
+ namespace sm = seastar::metrics;
+ metrics.add_group(
+ "journal",
+ {
+ sm::make_counter(
+ "record_num",
+ [this] {
+ return record_submitter.get_record_batch_stats().num_io;
+ },
+ sm::description("total number of records submitted")
+ ),
+ sm::make_counter(
+ "record_batch_num",
+ [this] {
+ return record_submitter.get_record_batch_stats().num_io_grouped;
+ },
+ sm::description("total number of records batched")
+ ),
+ sm::make_counter(
+ "io_num",
+ [this] {
+ return record_submitter.get_io_depth_stats().num_io;
+ },
+ sm::description("total number of io submitted")
+ ),
+ sm::make_counter(
+ "io_depth_num",
+ [this] {
+ return record_submitter.get_io_depth_stats().num_io_grouped;
+ },
+ sm::description("total number of io depth")
+ ),
+ }
+ );
+}
+
Journal::JournalSegmentManager::JournalSegmentManager(
SegmentManager& segment_manager)
: segment_manager{segment_manager}
bool flush)
{
assert(!p_current_batch->is_submitting());
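+ // count this submission: the incoming record plus those already batched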
+ record_batch_stats.increment(
+ p_current_batch->get_num_records() + 1);
auto write_fut = [this, flush, record=std::move(record), &rsize]() mutable {
if (flush && p_current_batch->is_empty()) {
// fast path with direct write
#include <seastar/core/circular_buffer.hh>
#include <seastar/core/future.hh>
+#include <seastar/core/metrics.hh>
#include <seastar/core/shared_future.hh>
#include "include/ceph_assert.h"
using close_ertr = crimson::errorator<
crimson::ct_error::input_output_error>;
close_ertr::future<> close() {
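+ // unregister the metric callbacks (they capture this) before closing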
+ metrics.clear();
return journal_segment_manager.close();
}
// OVERFLOW: outstanding_io > io_depth_limit is impossible
};
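+ // counts submissions (num_io) and the total items grouped into them
+ // (num_io_grouped); num_io_grouped / num_io is the average group size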
+ struct grouped_io_stats {
+ uint64_t num_io = 0;
+ uint64_t num_io_grouped = 0;
+
+ void increment(uint64_t num_grouped_io) {
+ ++num_io;
+ num_io_grouped += num_grouped_io;
+ }
+ };
+
public:
RecordSubmitter(std::size_t io_depth,
std::size_t batch_capacity,
std::size_t batch_flush_size,
JournalSegmentManager&);
+ grouped_io_stats get_record_batch_stats() const {
+ return record_batch_stats;
+ }
+
+ grouped_io_stats get_io_depth_stats() const {
+ return io_depth_stats;
+ }
+
+ void reset_stats() {
+ record_batch_stats = {};
+ io_depth_stats = {};
+ }
+
void set_write_pipeline(WritePipeline *_write_pipeline) {
write_pipeline = _write_pipeline;
}
void increment_io() {
++num_outstanding_io;
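+ // sample the outstanding depth, including this io, for the io_depth metrics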
+ io_depth_stats.increment(num_outstanding_io);
update_state();
}
RecordBatch* p_current_batch = nullptr;
seastar::circular_buffer<RecordBatch*> free_batch_ptrs;
std::optional<seastar::promise<> > wait_submit_promise;
+
+ grouped_io_stats record_batch_stats;
+ grouped_io_stats io_depth_stats;
};
SegmentProvider* segment_provider = nullptr;
JournalSegmentManager journal_segment_manager;
RecordSubmitter record_submitter;
ExtentReader& scanner;
+ seastar::metrics::metric_group metrics;
/// return ordered vector of segments to replay
using replay_segments_t = std::vector<
record_header_t header,
const bufferlist &bl);
-private:
/// replays records starting at start through end of segment
replay_ertr::future<>
replay_segment(
segment_header_t header, ///< [in] segment header
delta_handler_t &delta_handler ///< [in] processes deltas in order
);
+
+ void register_metrics();
};
using JournalRef = std::unique_ptr<Journal>;