From 627c466915abae53559f6995b54ab9e84a0ac8a9 Mon Sep 17 00:00:00 2001
From: Pere Diaz Bou
Date: Tue, 23 Jul 2024 15:19:43 +0200
Subject: [PATCH] test/allocsim: replayer merge files by timestamp

Accept multiple input files via --input-files, parse each file as before,
merge the resulting ops into a single vector, and sort the merged ops by
timestamp before replaying them.

Signed-off-by: Pere Diaz Bou
---
 src/test/objectstore/allocsim/ops_replayer.cc | 112 +++++++++++-------
 1 file changed, 67 insertions(+), 45 deletions(-)

diff --git a/src/test/objectstore/allocsim/ops_replayer.cc b/src/test/objectstore/allocsim/ops_replayer.cc
index 6f4d9501ee9e1..fd947f5c4547a 100644
--- a/src/test/objectstore/allocsim/ops_replayer.cc
+++ b/src/test/objectstore/allocsim/ops_replayer.cc
@@ -13,8 +13,8 @@
 #include
 #include
 #include
-#include
-#include
+#include "include/rados/buffer_fwd.h"
+#include "include/rados/librados.hpp"
 #include
 #include
 #include
@@ -321,6 +321,11 @@ void worker_thread_entry(uint64_t id, uint64_t nworker_threads, vector<Op> &ops,
   }
 }
 
+
+int op_comparison_by_date(const Op& lhs, const Op& rhs) {
+  return lhs.at < rhs.at;
+}
+
 void usage(po::options_description &desc) {
   cout << desc << std::endl;
 }
@@ -343,7 +348,7 @@ int main(int argc, char** argv) {
   po::options_description po_options("Options");
   po_options.add_options()
     ("help,h", "produce help message")
-    (",i", po::value(&file)->default_value("input.txt"), "Input file (output of op_scraper.py)")
+    ("input-files,i", po::value<vector<string>>()->multitoken(), "List of input files (output of op_scraper.py). Multiple files will be merged and sorted by time order")
     ("ceph-conf", po::value(&ceph_conf_path)->default_value("ceph.conf"), "Path to ceph conf")
     ("io-depth", po::value(&io_depth)->default_value(64), "I/O depth")
     ("parser-threads", po::value(&nparser_threads)->default_value(16), "Number of parser threads")
@@ -359,53 +364,73 @@ int main(int argc, char** argv) {
   po::parsed_options parsed = po::command_line_parser(argc, argv).options(po_all).allow_unregistered().run();
   po::store( parsed, vm);
   po::notify(vm);
+
   if (vm.count("help")) {
     usage(po_all);
     exit(EXIT_SUCCESS);
   }
-
-  // Parse input file
-  vector<std::thread> parser_threads;
-  vector> parser_contexts;
-  int fd = open(file.c_str(), O_RDONLY);
-  if (fd == -1) {
-    cout << "Error opening file" << endl;
-  }
-  struct stat file_stat;
-  fstat(fd, &file_stat);
-  char* mapped_buffer = (char*)mmap(NULL, file_stat.st_size, PROT_READ, MAP_SHARED, fd, 0);
-  if (mapped_buffer == nullptr) {
-    cout << "error mapping buffer" << endl;
-  }
-  uint64_t start_offset = 0;
-  uint64_t step_size = file_stat.st_size / nparser_threads;
-  for (int i = 0; i < nparser_threads; i++) {
-    char* end = mapped_buffer + start_offset + step_size;
-    while(*end != '\n') {
-      end--;
+
+  assert(vm.count("input-files") > 0);
+
+  vector<string> input_files = vm["input-files"].as<vector<string>>();
+
+  vector> complete_parser_contexts; // list of ALL parser contexts so that shared_ptrs do not die.
+  for (auto &file : input_files) {
+    // Parse input file
+    vector<std::thread> parser_threads;
+    vector> parser_contexts;
+    cout << fmt::format("parsing file {}", file) << endl;
+    int fd = open(file.c_str(), O_RDONLY);
+    if (fd == -1) {
+      cout << "Error opening file" << endl;
+      exit(EXIT_FAILURE);
     }
-    if (i == nparser_threads - 1) {
-      end = mapped_buffer + file_stat.st_size;
+    struct stat file_stat;
+    fstat(fd, &file_stat);
+    char* mapped_buffer = (char*)mmap(NULL, file_stat.st_size, PROT_READ, MAP_SHARED, fd, 0);
+    if (mapped_buffer == nullptr) {
+      cout << "error mapping buffer" << endl;
+      exit(EXIT_FAILURE);
     }
-    shared_ptr context = make_shared();
-    context->start = mapped_buffer + start_offset;
-    context->end = end;
-    context->max_buffer_size = 0;
-    parser_contexts.push_back(context);
-    parser_threads.push_back(std::thread(parse_entry_point, context));
-    start_offset += (end - mapped_buffer - start_offset);
-  }
-  for (auto& t : parser_threads) {
-    t.join();
+    uint64_t start_offset = 0;
+    uint64_t step_size = file_stat.st_size / nparser_threads;
+    for (int i = 0; i < nparser_threads; i++) {
+      char* end = mapped_buffer + start_offset + step_size;
+      while(*end != '\n') {
+        end--;
+      }
+      if (i == nparser_threads - 1) {
+        end = mapped_buffer + file_stat.st_size;
+      }
+      shared_ptr context = make_shared();
+      context->start = mapped_buffer + start_offset;
+      context->end = end;
+      context->max_buffer_size = 0;
+      parser_contexts.push_back(context);
+      parser_threads.push_back(std::thread(parse_entry_point, context));
+      start_offset += (end - mapped_buffer - start_offset);
+    }
+    for (auto& t : parser_threads) {
+      t.join();
+    }
+    // reduce
+    for (auto context : parser_contexts) {
+      ops.insert(ops.end(), context->ops.begin(), context->ops.end());
+      max_buffer_size = max(context->max_buffer_size, max_buffer_size);
+      // context->ops.clear();
+    }
+    munmap(mapped_buffer, file_stat.st_size);
+    complete_parser_contexts.insert(complete_parser_contexts.end(), parser_contexts.begin(), parser_contexts.end());
   }
-  // reduce
-  for (auto context : parser_contexts) {
-    ops.insert(ops.end(), context->ops.begin(), context->ops.end());
-    max_buffer_size = max(context->max_buffer_size, max_buffer_size);
-    // context->ops.clear();
+
+  cout << "Sorting ops by date..." << endl;
+  sort(ops.begin(), ops.end(), op_comparison_by_date);
+  cout << "Sorting ops by date done" << endl;
+
+  if (skip_do_ops) {
+    return EXIT_SUCCESS;
   }
-
-
+
   int ret = cluster.init2("client.admin", "ceph", 0);
   if (ret < 0) {
     std::cerr << "Couldn't init ceph! error " << ret << std::endl;
@@ -433,9 +458,6 @@ int main(int argc, char** argv) {
 
   std::cout << fmt::format("pool {} ready", pool) << std::endl;
 
-  if (skip_do_ops) {
-    return EXIT_SUCCESS;
-  }
   // process ops
   vector<std::thread> worker_threads;
   for (int i = 0; i < nworker_threads; i++) {
-- 
2.39.5