From: Pere Diaz Bou
Date: Thu, 4 Jul 2024 10:20:07 +0000 (+0200)
Subject: test/allocsim: ops scrapper arg parsing
X-Git-Tag: testing/wip-vshankar-testing-20240718.183435-debug~93^2
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=93888d3940bce6499d00666a42636c37a59c1501;p=ceph-ci.git

test/allocsim: ops scrapper arg parsing

```
fedora :: pere/scrapper-replayer/build 130 » ./bin/replayer -h
All options:

Options:
  -h [ --help ]                 produce help message
  -i arg (=input.txt)           Input file (output of op_scraper.py)
  --ceph-conf arg (=ceph.conf)  Path to ceph conf
  --io-depth arg (=64)          I/O depth
  --parser-threads arg (=16)    Number of parser threads
  --worker-threads arg (=16)    Number of I/O worker threads
  --pool arg (=test_pool)       Pool to use for I/O
```

Signed-off-by: Pere Diaz Bou
---

diff --git a/src/test/objectstore/CMakeLists.txt b/src/test/objectstore/CMakeLists.txt
index 8a6a0e7ed91..bddff3f6727 100644
--- a/src/test/objectstore/CMakeLists.txt
+++ b/src/test/objectstore/CMakeLists.txt
@@ -24,6 +24,8 @@ target_link_libraries(ceph_test_objectstore
 install(TARGETS ceph_test_objectstore
   DESTINATION ${CMAKE_INSTALL_BINDIR})

+add_subdirectory(allocsim)
+
 add_executable(ceph_test_keyvaluedb
   test_kv.cc)
 target_link_libraries(ceph_test_keyvaluedb
diff --git a/src/test/objectstore/allocsim/CMakeLists.txt b/src/test/objectstore/allocsim/CMakeLists.txt
new file mode 100644
index 00000000000..cbfbc698863
--- /dev/null
+++ b/src/test/objectstore/allocsim/CMakeLists.txt
@@ -0,0 +1,10 @@
+
+
+add_executable(replayer ops_replayer.cc)
+
+target_link_libraries(replayer
+  PRIVATE
+  fmt
+  librados
+  Boost::program_options
+)
diff --git a/src/test/objectstore/allocsim/ops_replayer.cc b/src/test/objectstore/allocsim/ops_replayer.cc
index d9263076345..cafffa1ee0e 100644
--- a/src/test/objectstore/allocsim/ops_replayer.cc
+++ b/src/test/objectstore/allocsim/ops_replayer.cc
@@ -1,5 +1,6 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -20,6 +21,11 @@
 #include
 #include
+#include
+#include
+
+namespace po = boost::program_options;
+
 using namespace std;
 using namespace ceph;

@@ -111,176 +117,202 @@ void completion_cb(librados::completion_t cb, void *arg) {
 }

 void parse_entry_point(shared_ptr context) {
-  string date, time, who, type, range, object, collection;
-  MemoryInputStream fstream(context->start, context->end);
-  const char* date_format_first_column = "%Y-%m-%d";
-  // we expect this input:
-  // 2024-05-10 12:06:24.990831+00:00 client.607247697.0:5632274 write 4096~4096 2:d03a455a:::08b0f2fd5f20f504e76c2dd3d24683a1:head 2.1c0b
-  while (fstream >> date){
-    // cout << date << endl;
-    tm t;
-    char* res = strptime(date.c_str(), date_format_first_column, &t);
-    if (res == nullptr) {
-      fstream.ignore(std::numeric_limits::max(), '\n');
-      continue;
-    }
-    fstream >> time >> who >> type >> range >> object >> collection;
-
-    date += " " + time;
-    // cout << date << endl;
-    // FIXME: this is wrong but it returns a reasonable bad timestamp :P
-    const char* date_format_full = "%Y-%m-%d %H:%M:%S.%f%z";
-    res = strptime(date.c_str(), date_format_full, &t);
-    time_t at = mktime(&t);
-
-    // cout << fmt::format("{} {} {} {} {} {} {}", date, at, who, type, range, object, collection) << endl;
-
-    shared_ptr who_ptr = make_shared(who);
-    auto who_it = string_cache.find(who);
-    if (who_it == string_cache.end()) {
-      string_cache.insert({ who, who_ptr });
-    } else {
-      who_ptr = who_it->second;
-    }
+  cout << fmt::format("Starting parser thread start={:p} end={:p}", context->start, context->end) << endl;
+
+  string date, 
time, who, type, range, object, collection; + MemoryInputStream fstream(context->start, context->end); + const char* date_format_first_column = "%Y-%m-%d"; + // we expect this input: + // 2024-05-10 12:06:24.990831+00:00 client.607247697.0:5632274 write 4096~4096 2:d03a455a:::08b0f2fd5f20f504e76c2dd3d24683a1:head 2.1c0b + while (fstream >> date){ + // cout << date << endl; + tm t; + char* res = strptime(date.c_str(), date_format_first_column, &t); + if (res == nullptr) { + fstream.ignore(std::numeric_limits::max(), '\n'); + continue; + } + fstream >> time >> who >> type >> range >> object >> collection; + + date += " " + time; + // cout << date << endl; + // FIXME: this is wrong but it returns a reasonable bad timestamp :P + const char* date_format_full = "%Y-%m-%d %H:%M:%S.%f%z"; + res = strptime(date.c_str(), date_format_full, &t); + time_t at = mktime(&t); + + // cout << fmt::format("{} {} {} {} {} {} {}", date, at, who, type, range, object, collection) << endl; + + shared_ptr who_ptr = make_shared(who); + auto who_it = string_cache.find(who); + if (who_it == string_cache.end()) { + string_cache.insert({ who, who_ptr }); + } else { + who_ptr = who_it->second; + } - shared_ptr object_ptr = make_shared(object); - auto object_it = string_cache.find(object); - if (object_it == string_cache.end()) { - string_cache.insert({ object, object_ptr }); - } else { - object_ptr = object_it->second; - } + shared_ptr object_ptr = make_shared(object); + auto object_it = string_cache.find(object); + if (object_it == string_cache.end()) { + string_cache.insert({ object, object_ptr }); + } else { + object_ptr = object_it->second; + } - op_type ot; - if (type == "write") { - ot = Write; - } else if (type == "writefull") { - ot = WriteFull; - } else if (type == "read") { - ot = Read; - } else if (type == "sparse-read") { - ot = Read; - } else if (type == "truncate") { - ot = Truncate; - } else if (type == "zero") { - ot = Zero; - } else { - cout << "invalid type " << type << endl; - exit(1); - } + op_type ot; + if (type == "write") { + ot = Write; + } else if (type == "writefull") { + ot = WriteFull; + } else if (type == "read") { + ot = Read; + } else if (type == "sparse-read") { + ot = Read; + } else if (type == "truncate") { + ot = Truncate; + } else if (type == "zero") { + ot = Zero; + } else { + cout << "invalid type " << type << endl; + exit(1); + } - shared_ptr collection_ptr = make_shared(collection); - auto collection_it = string_cache.find(collection); - if (collection_it == string_cache.end()) { - string_cache.insert({ collection, collection_ptr }); - } else { - collection_ptr = collection_it->second; - } + shared_ptr collection_ptr = make_shared(collection); + auto collection_it = string_cache.find(collection); + if (collection_it == string_cache.end()) { + string_cache.insert({ collection, collection_ptr }); + } else { + collection_ptr = collection_it->second; + } - uint64_t offset = 0, length = 0; - stringstream range_stream(range); - string offset_str, length_str; - getline(range_stream, offset_str, '~'); - offset = stoll(offset_str); + uint64_t offset = 0, length = 0; + stringstream range_stream(range); + string offset_str, length_str; + getline(range_stream, offset_str, '~'); + offset = stoll(offset_str); - if (ot != Truncate) { - // Truncate doesn't only has one number - getline(range_stream, length_str, '~'); - length = stoll(length_str); - } + if (ot != Truncate) { + // Truncate only has one number + getline(range_stream, length_str, '~'); + length = stoll(length_str); + } - 
context->max_buffer_size = max(length, context->max_buffer_size); + context->max_buffer_size = max(length, context->max_buffer_size); - context->ops.push_back(Op(at, ot, offset, length, object_ptr, collection_ptr, who_ptr)); - } + context->ops.push_back(Op(at, ot, offset, length, object_ptr, collection_ptr, who_ptr)); + } } void worker_thread_entry(uint64_t id, uint64_t nworker_threads, vector &ops, uint64_t max_buffer_size, uint64_t io_depth, librados::IoCtx* io) { - - bufferlist bl; - gen_buffer(bl, max_buffer_size); - hash hasher; - - cout << "starting thread " << io_depth << endl; - for (auto &op : ops) { - { - std::unique_lock lock(in_flight_mutex); - cv.wait(lock, [&io_depth] { return in_flight_ops < io_depth; }); - + // Create a buffer big enough for every operation. We will take enoguh bytes from it for every operation + bufferlist bl; + gen_buffer(bl, max_buffer_size); + hash hasher; + + cout << fmt::format("Starting thread {} with io_depth={} max_buffer_size={}", id, io_depth, max_buffer_size) << endl; + for (auto &op : ops) { + { + std::unique_lock lock(in_flight_mutex); + cv.wait(lock, [&io_depth] { return in_flight_ops < io_depth; }); + } + size_t key = hasher(*op.who) % nworker_threads; + if (key != id) { + continue; + } + // cout << fmt::format("Running op {} object={} range={}~{}", op.type, *op.object, op.offset, op.length) << endl; + op.completion = librados::Rados::aio_create_completion(static_cast(&op), completion_cb); + switch (op.type) { + case Write: { + bufferlist trimmed; + trimmed.substr_of(bl, 0, op.length); + int ret = io->aio_write(*op.object, op.completion, trimmed, op.length, op.offset); + if (ret != 0) { + cout << fmt::format("Error writing ecode={}", ret) << endl;; + } + break; } - size_t key = hasher(*op.who) % nworker_threads; - if (key != id) { - continue; + case WriteFull: { + bufferlist trimmed; + trimmed.substr_of(bl, 0, op.length); + int ret = io->aio_write_full(*op.object, op.completion, trimmed); + if (ret != 0) { + cout << fmt::format("Error writing full ecode={}", ret) << endl;; + } + break; } - // cout << fmt::format("Running op {} object={} range={}~{}", op.type, *op.object, op.offset, op.length) << endl; - op.completion = librados::Rados::aio_create_completion(static_cast(&op), completion_cb); - switch (op.type) { - case Write: { - bufferlist trimmed; - trimmed.substr_of(bl, 0, op.length); - int ret = io->aio_write(*op.object, op.completion, trimmed, op.length, op.offset); - if (ret != 0) { - cout << fmt::format("Error writing ecode={}", ret) << endl;; - } - break; + case Read: { + bufferlist read; + int ret = io->aio_read(*op.object, op.completion, &op.read_bl, op.length, op.offset); + if (ret != 0) { + cout << fmt::format("Error reading ecode={}", ret) << endl;; } - case WriteFull: { - bufferlist trimmed; - trimmed.substr_of(bl, 0, op.length); - int ret = io->aio_write_full(*op.object, op.completion, trimmed); + break; + } + case Truncate: { + librados::ObjectWriteOperation write_operation; + write_operation.truncate(op.offset); + int ret = io->aio_operate(*op.object, op.completion, &write_operation); if (ret != 0) { - cout << fmt::format("Error writing full ecode={}", ret) << endl;; + cout << fmt::format("Error truncating ecode={}", ret) << endl;; } break; - } - case Read: { - bufferlist read; - int ret = io->aio_read(*op.object, op.completion, &op.read_bl, op.length, op.offset); + } + case Zero: { + librados::ObjectWriteOperation write_operation; + write_operation.zero(op.offset, op.length); + int ret = io->aio_operate(*op.object, 
op.completion, &write_operation); if (ret != 0) { - cout << fmt::format("Error reading ecode={}", ret) << endl;; + cout << fmt::format("Error zeroing ecode={}", ret) << endl;; } break; - } - case Truncate: { - librados::ObjectWriteOperation write_operation; - write_operation.truncate(op.offset); - int ret = io->aio_operate(*op.object, op.completion, &write_operation); - if (ret != 0) { - cout << fmt::format("Error truncating ecode={}", ret) << endl;; - } - break; - } - case Zero: { - librados::ObjectWriteOperation write_operation; - write_operation.zero(op.offset, op.length); - int ret = io->aio_operate(*op.object, op.completion, &write_operation); - if (ret != 0) { - cout << fmt::format("Error zeroing ecode={}", ret) << endl;; - } - break; - } } - in_flight_ops++; } + in_flight_ops++; + } +} + +void usage(po::options_description &desc) { + cout << desc << std::endl; } int main(int argc, char** argv) { vector ops; librados::Rados cluster; librados::IoCtx io; - uint64_t max_buffer_size = 0; - uint64_t io_depth = 8; - string file; - std::filesystem::path ceph_conf_path; + uint64_t max_buffer_size = 0; // We can use a single buffer for writes and trim it at will. The buffer will be the size of the maximum length op. - if (argc < 3) { - cout << fmt::format("usage: ops_replayer file ceph.conf") << endl; + // options + uint64_t io_depth = 8; + uint64_t nparser_threads = 16; + uint64_t nworker_threads = 16; + string file("input.txt"); + string ceph_conf_path("./ceph.conf"); + string pool("test_pool"); + + po::options_description po_options("Options"); + po_options.add_options() + ("help,h", "produce help message") + (",i", po::value(&file)->default_value("input.txt"), "Input file (output of op_scraper.py)") + ("ceph-conf", po::value(&ceph_conf_path)->default_value("ceph.conf"), "Path to ceph conf") + ("io-depth", po::value(&io_depth)->default_value(64), "I/O depth") + ("parser-threads", po::value(&nparser_threads)->default_value(16), "Number of parser threads") + ("worker-threads", po::value(&nworker_threads)->default_value(16), "Number of I/O worker threads") + ("pool", po::value(&pool)->default_value("test_pool"), "Pool to use for I/O") + ; + + po::options_description po_all("All options"); + po_all.add(po_options); + + po::variables_map vm; + po::parsed_options parsed = po::command_line_parser(argc, argv).options(po_all).allow_unregistered().run(); + po::store( parsed, vm); + po::notify(vm); + if (vm.count("help")) { + usage(po_all); + exit(EXIT_SUCCESS); } - file = argv[1]; - ceph_conf_path = argv[2]; - cout << file << endl; - uint64_t nthreads = 16; + // Parse input file vector parser_threads; vector> parser_contexts; int fd = open(file.c_str(), O_RDONLY); @@ -294,13 +326,13 @@ int main(int argc, char** argv) { cout << "error mapping buffer" << endl; } uint64_t start_offset = 0; - uint64_t step_size = file_stat.st_size / nthreads; - for (int i = 0; i < nthreads; i++) { + uint64_t step_size = file_stat.st_size / nparser_threads; + for (int i = 0; i < nparser_threads; i++) { char* end = mapped_buffer + start_offset + step_size; while(*end != '\n') { end--; } - if (i == nthreads-1) { + if (i == nparser_threads - 1) { end = mapped_buffer + file_stat.st_size; } shared_ptr context = make_shared(); @@ -314,6 +346,7 @@ int main(int argc, char** argv) { for (auto& t : parser_threads) { t.join(); } + // reduce for (auto context : parser_contexts) { string_cache.insert(context->string_cache.begin(), context->string_cache.end()); ops.insert(ops.end(), context->ops.begin(), context->ops.end()); @@ 
-341,18 +374,16 @@ int main(int argc, char** argv) {
     return EXIT_FAILURE;
   }
   std::cout << "cluster connect ready" << std::endl;
-  cluster.ioctx_create("test_pool", io);
+  cluster.ioctx_create(pool.c_str(), io);
   if (ret < 0) {
     std::cerr << "Couldn't set up ioctx! error " << ret << std::endl;
     exit(EXIT_FAILURE);
   }
-  std::cout << "test-pool ready" << std::endl;
+  std::cout << fmt::format("pool {} ready", pool) << std::endl;

   // process ops
-  // Create a buffer big enough for every operation. We will take enoguh bytes from it for every operation
   vector worker_threads;
-  uint64_t nworker_threads = 16;
   for (int i = 0; i < nworker_threads; i++) {
     worker_threads.push_back(thread(worker_thread_entry, i, nworker_threads, std::ref(ops), max_buffer_size, io_depth, &io));
   }
@@ -362,7 +393,6 @@ int main(int argc, char** argv) {
   while (in_flight_ops > 0) {
     std::this_thread::sleep_for(std::chrono::milliseconds(100));
   }
-  // io.write(const std::string &oid, bufferlist &bl, size_t len, uint64_t off)

   cout << ops.size() << endl;
   return 0;
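
For reference, below is the option-parsing piece of this patch isolated into a minimal standalone sketch. It is not the committed file: the plain-text rendering of the diff above drops the template arguments of the po::value<...>() calls and the include names, so the string/uint64_t types here are assumptions inferred from the variables they bind to, and the main() wrapper exists only so the snippet compiles on its own against Boost.Program_options.

```
// Hypothetical, self-contained sketch of the CLI surface added by this patch.
// Assumes C++17 and Boost.Program_options; the types inside po::value<>() are inferred.
#include <cstdint>
#include <iostream>
#include <string>

#include <boost/program_options.hpp>

namespace po = boost::program_options;

int main(int argc, char** argv) {
  // Defaults mirror the patch; the registered default_value() is what --help displays.
  uint64_t io_depth = 8;
  uint64_t nparser_threads = 16;
  uint64_t nworker_threads = 16;
  std::string file("input.txt");
  std::string ceph_conf_path("ceph.conf");
  std::string pool("test_pool");

  po::options_description po_options("Options");
  po_options.add_options()
    ("help,h", "produce help message")
    // ",i" registers a short-only option, i.e. -i <file>
    (",i", po::value<std::string>(&file)->default_value("input.txt"),
     "Input file (output of op_scraper.py)")
    ("ceph-conf", po::value<std::string>(&ceph_conf_path)->default_value("ceph.conf"),
     "Path to ceph conf")
    ("io-depth", po::value<uint64_t>(&io_depth)->default_value(64), "I/O depth")
    ("parser-threads", po::value<uint64_t>(&nparser_threads)->default_value(16),
     "Number of parser threads")
    ("worker-threads", po::value<uint64_t>(&nworker_threads)->default_value(16),
     "Number of I/O worker threads")
    ("pool", po::value<std::string>(&pool)->default_value("test_pool"),
     "Pool to use for I/O");

  po::options_description po_all("All options");
  po_all.add(po_options);

  po::variables_map vm;
  // allow_unregistered() lets unrelated arguments pass through without throwing.
  po::parsed_options parsed =
      po::command_line_parser(argc, argv).options(po_all).allow_unregistered().run();
  po::store(parsed, vm);
  po::notify(vm);  // copies parsed values into the bound variables

  if (vm.count("help")) {
    std::cout << po_all << std::endl;  // same help text as ./bin/replayer -h
    return 0;
  }

  std::cout << "file=" << file << " ceph_conf=" << ceph_conf_path
            << " pool=" << pool << " io_depth=" << io_depth
            << " parser_threads=" << nparser_threads
            << " worker_threads=" << nworker_threads << std::endl;
  return 0;
}
```

An invocation matching the help output shown in the commit message would then look like:

```
./bin/replayer -i input.txt --ceph-conf ceph.conf --io-depth 64 --parser-threads 16 --worker-threads 16 --pool test_pool
```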