From 6bd4b29fa1ed7faf5791ef7055c56d124d2b6ab8 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 3 Jul 2024 12:21:46 +0200 Subject: [PATCH] test/allocsim: mmap threaded parser Signed-off-by: Pere Diaz Bou --- src/test/objectstore/allocsim/ops_replayer.cc | 234 ++++++++++++------ 1 file changed, 153 insertions(+), 81 deletions(-) diff --git a/src/test/objectstore/allocsim/ops_replayer.cc b/src/test/objectstore/allocsim/ops_replayer.cc index 382ebdd948b..9f8f56ccee5 100644 --- a/src/test/objectstore/allocsim/ops_replayer.cc +++ b/src/test/objectstore/allocsim/ops_replayer.cc @@ -1,5 +1,9 @@ #include #include +#include +#include +#include +#include #include #include #include @@ -59,6 +63,30 @@ struct Op { }; +struct ParserContext { + map> string_cache; + vector ops; + char *start; // starts and ends in new line or eof + char *end; + uint64_t max_buffer_size; +}; + +class MemoryStreamBuf : public std::streambuf { +public: + MemoryStreamBuf(const char* start, const char* end) { + this->setg(const_cast(start), const_cast(start), const_cast(end)); + } +}; + +class MemoryInputStream : public std::istream { + MemoryStreamBuf _buffer; +public: + MemoryInputStream(const char* start, const char* end) + : std::istream(&_buffer), _buffer(start, end) { + rdbuf(&_buffer); + } +}; + void gen_buffer(bufferlist& bl, uint64_t size) { std::unique_ptr buffer = std::make_unique(size); std::independent_bits_engine e; @@ -69,7 +97,7 @@ void gen_buffer(bufferlist& bl, uint64_t size) { void completion_cb(librados::completion_t cb, void *arg) { Op *op = static_cast(arg); // Process the completed operation here - std::cout << fmt::format("Completed op {} object={} range={}~{}", op->type, *op->object, op->offset, op->length) << std::endl; + // std::cout << fmt::format("Completed op {} object={} range={}~{}", op->type, *op->object, op->offset, op->length) << std::endl; delete op->completion; op->completion = nullptr; @@ -84,6 +112,91 @@ void completion_cb(librados::completion_t cb, void *arg) { cv.notify_one(); } +void parse_entry_point(shared_ptr context) { + string date, time, who, type, range, object, collection; + MemoryInputStream fstream(context->start, context->end); + const char* date_format_first_column = "%Y-%m-%d"; + // we expect this input: + // 2024-05-10 12:06:24.990831+00:00 client.607247697.0:5632274 write 4096~4096 2:d03a455a:::08b0f2fd5f20f504e76c2dd3d24683a1:head 2.1c0b + while (fstream >> date){ + // cout << date << endl; + tm t; + char* res = strptime(date.c_str(), date_format_first_column, &t); + if (res == nullptr) { + fstream.ignore(std::numeric_limits::max(), '\n'); + continue; + } + fstream >> time >> who >> type >> range >> object >> collection; + + date += " " + time; + // cout << date << endl; + // FIXME: this is wrong but it returns a reasonable bad timestamp :P + const char* date_format_full = "%Y-%m-%d %H:%M:%S.%f%z"; + res = strptime(date.c_str(), date_format_full, &t); + time_t at = mktime(&t); + + // cout << fmt::format("{} {} {} {} {} {} {}", date, at, who, type, range, object, collection) << endl; + + shared_ptr who_ptr = make_shared(who); + auto who_it = string_cache.find(who); + if (who_it == string_cache.end()) { + string_cache.insert({ who, who_ptr }); + } else { + who_ptr = who_it->second; + } + + shared_ptr object_ptr = make_shared(object); + auto object_it = string_cache.find(object); + if (object_it == string_cache.end()) { + string_cache.insert({ object, object_ptr }); + } else { + object_ptr = object_it->second; + } + + op_type ot; + if (type == "write") { + ot = Write; + } else if (type == "writefull") { + ot = WriteFull; + } else if (type == "read") { + ot = Read; + } else if (type == "sparse-read") { + ot = Read; + } else if (type == "truncate") { + ot = Truncate; + } else if (type == "zero") { + ot = Zero; + } else { + cout << "invalid type " << type << endl; + exit(1); + } + + shared_ptr collection_ptr = make_shared(collection); + auto collection_it = string_cache.find(collection); + if (collection_it == string_cache.end()) { + string_cache.insert({ collection, collection_ptr }); + } else { + collection_ptr = collection_it->second; + } + + uint64_t offset = 0, length = 0; + stringstream range_stream(range); + string offset_str, length_str; + getline(range_stream, offset_str, '~'); + offset = stoll(offset_str); + + if (ot != Truncate) { + // Truncate doesn't only has one number + getline(range_stream, length_str, '~'); + length = stoll(length_str); + } + + context->max_buffer_size = max(length, context->max_buffer_size); + + context->ops.push_back(Op(at, ot, offset, length, object_ptr, collection_ptr, who_ptr)); + } +} + int main(int argc, char** argv) { vector ops; librados::Rados cluster; @@ -100,90 +213,49 @@ int main(int argc, char** argv) { ceph_conf_path = argv[2]; cout << file << endl; - - - string date, time, who, type, range, object, collection; - ifstream fstream(file, ifstream::in); - const char* date_format_first_column = "%Y-%m-%d"; - // we expect this input: - // 2024-05-10 12:06:24.990831+00:00 client.607247697.0:5632274 write 4096~4096 2:d03a455a:::08b0f2fd5f20f504e76c2dd3d24683a1:head 2.1c0b - while (fstream >> date){ - // cout << date << endl; - tm t; - char* res = strptime(date.c_str(), date_format_first_column, &t); - if (res == nullptr) { - fstream.ignore(std::numeric_limits::max(), '\n'); - continue; - } - fstream >> time >> who >> type >> range >> object >> collection; - - date += " " + time; - // cout << date << endl; - // FIXME: this is wrong but it returns a reasonable bad timestamp :P - const char* date_format_full = "%Y-%m-%d %H:%M:%S.%f%z"; - res = strptime(date.c_str(), date_format_full, &t); - time_t at = mktime(&t); - - // cout << fmt::format("{} {} {} {} {} {} {}", date, at, who, type, range, object, collection) << endl; - - shared_ptr who_ptr = make_shared(who); - auto who_it = string_cache.find(who); - if (who_it == string_cache.end()) { - string_cache.insert({ who, who_ptr }); - } else { - who_ptr = who_it->second; - } - - shared_ptr object_ptr = make_shared(object); - auto object_it = string_cache.find(object); - if (object_it == string_cache.end()) { - string_cache.insert({ object, object_ptr }); - } else { - object_ptr = object_it->second; - } - - op_type ot; - if (type == "write") { - ot = Write; - } else if (type == "writefull") { - ot = WriteFull; - } else if (type == "read") { - ot = Read; - } else if (type == "sparse-read") { - ot = Read; - } else if (type == "truncate") { - ot = Truncate; - } else if (type == "zero") { - ot = Zero; - } else { - cout << "invalid type " << type << endl; - exit(1); - } - - shared_ptr collection_ptr = make_shared(collection); - auto collection_it = string_cache.find(collection); - if (collection_it == string_cache.end()) { - string_cache.insert({ collection, collection_ptr }); - } else { - collection_ptr = collection_it->second; + uint64_t nthreads = 16; + vector parser_threads; + vector> parser_contexts; + int fd = open(file.c_str(), O_RDONLY); + if (fd == -1) { + cout << "Error opening file" << endl; + } + struct stat file_stat; + fstat(fd, &file_stat); + char* mapped_buffer = (char*)mmap(NULL, file_stat.st_size, PROT_READ, MAP_SHARED, fd, 0); + if (mapped_buffer == nullptr) { + cout << "error mapping buffer" << endl; + } + uint64_t start_offset = 0; + uint64_t step_size = file_stat.st_size / nthreads; + for (int i = 0; i < nthreads; i++) { + char* end = mapped_buffer + start_offset + step_size; + while(*end != '\n') { + end--; } - - uint64_t offset = 0, length = 0; - stringstream range_stream(range); - string offset_str, length_str; - getline(range_stream, offset_str, '~'); - offset = stoll(offset_str); - - if (ot != Truncate) { - // Truncate doesn't only has one number - getline(range_stream, length_str, '~'); - length = stoll(length_str); + if (i == nthreads-1) { + end = mapped_buffer + file_stat.st_size; } + shared_ptr context = make_shared(); + context->start = mapped_buffer + start_offset; + context->end = end; + context->max_buffer_size = 0; + parser_contexts.push_back(context); + parser_threads.push_back(std::thread(parse_entry_point, context)); + start_offset += (end - mapped_buffer - start_offset); + } + for (auto& t : parser_threads) { + t.join(); + } + for (auto context : parser_contexts) { + string_cache.insert(context->string_cache.begin(), context->string_cache.end()); + ops.insert(ops.end(), context->ops.begin(), context->ops.end()); + max_buffer_size = max(context->max_buffer_size, max_buffer_size); + context->string_cache.clear(); + context->ops.clear(); + } - max_buffer_size = max(length, max_buffer_size); - ops.push_back(Op(at, ot, offset, length, object_ptr, collection_ptr, who_ptr)); - } int ret = cluster.init2("client.admin", "ceph", 0); if (ret < 0) { -- 2.39.5