From 3f4ffe0703e8482d2784102ea3a3cb57b8364673 Mon Sep 17 00:00:00 2001
From: Pere Diaz Bou
Date: Tue, 2 Jul 2024 12:56:51 +0200
Subject: [PATCH] test/allocsim: osd op scraper replayer

For now this is a simple single-threaded replayer with an io depth of
64: we load all ops from a file and try to push them in order as soon
as we can.

Signed-off-by: Pere Diaz Bou
---
 src/test/objectstore/allocsim/ops_replayer.cc | 230 ++++++++++++++++++
 1 file changed, 230 insertions(+)
 create mode 100644 src/test/objectstore/allocsim/ops_replayer.cc

diff --git a/src/test/objectstore/allocsim/ops_replayer.cc b/src/test/objectstore/allocsim/ops_replayer.cc
new file mode 100644
index 00000000000..687d27a0d76
--- /dev/null
+++ b/src/test/objectstore/allocsim/ops_replayer.cc
@@ -0,0 +1,230 @@
+#include <algorithm>
+#include <atomic>
+#include <chrono>
+#include <climits>
+#include <condition_variable>
+#include <cstdint>
+#include <ctime>
+#include <filesystem>
+#include <fstream>
+#include <iostream>
+#include <limits>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <sstream>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <fmt/format.h>
+
+#include "include/rados/librados.hpp"
+
+
+using namespace std;
+using namespace ceph;
+
+
+static map<string, shared_ptr<string>> string_cache;
+static std::atomic<uint64_t> in_flight_ops(0);
+static std::condition_variable cv;
+static std::mutex in_flight_mutex;
+
+enum op_type {
+  Write,
+  Read
+};
+
+struct Op {
+  time_t at;
+  op_type type;
+  uint64_t offset;
+  uint64_t length;
+  shared_ptr<string> object;
+  shared_ptr<string> collection;
+  shared_ptr<string> who;
+  librados::AioCompletion *completion;
+  bufferlist read_bl;
+
+  Op(
+    time_t at,
+    op_type type,
+    uint64_t offset,
+    uint64_t length,
+    shared_ptr<string> object,
+    shared_ptr<string> collection,
+    shared_ptr<string> who
+  ) : at(at), type(type), offset(offset), length(length), object(object), collection(collection), who(who), completion(nullptr) {}
+
+};
+
+void gen_buffer(bufferlist& bl, uint64_t size) {
+  // fill a buffer of the requested size with random bytes
+  std::unique_ptr<char[]> buffer = std::make_unique<char[]>(size);
+  std::independent_bits_engine<std::default_random_engine, CHAR_BIT, unsigned char> e;
+  std::generate(buffer.get(), buffer.get() + size, std::ref(e));
+  bl.append(buffer.get(), size);
+}
+
+void completion_cb(librados::completion_t cb, void *arg) {
+  Op *op = static_cast<Op*>(arg);
+  // Process the completed operation here
+  std::cout << fmt::format("Completed op {} object={} range={}~{}", op->type, *op->object, op->offset, op->length) << std::endl;
+
+  delete op->completion;
+  op->completion = nullptr;
+  if (op->type == Read) {
+    op->read_bl.clear();
+  }
+
+  {
+    std::lock_guard<std::mutex> lock(in_flight_mutex);
+    in_flight_ops--;
+  }
+  cv.notify_one();
+}
+
+int main(int argc, char** argv) {
+  vector<Op> ops;
+  librados::Rados cluster;
+  librados::IoCtx io;
+  uint64_t max_buffer_size = 0;
+  uint64_t io_depth = 64;
+  string file;
+  std::filesystem::path ceph_conf_path;
+
+  if (argc < 3) {
+    cout << fmt::format("usage: ops_replayer file ceph.conf") << endl;
+    return EXIT_FAILURE;
+  }
+  file = argv[1];
+  ceph_conf_path = argv[2];
+  cout << file << endl;
+
+
+  string date, time, who, type, range, object, collection;
+  ifstream fstream(file, ifstream::in);
+  const char* date_format_first_column = "%Y-%m-%d";
+  // we expect this input:
+  // 2024-05-10 12:06:24.990831+00:00 client.607247697.0:5632274 write 4096~4096 2:d03a455a:::08b0f2fd5f20f504e76c2dd3d24683a1:head 2.1c0b
+  while (fstream >> date){
+    cout << date << endl;
+    tm t;
+    char* res = strptime(date.c_str(), date_format_first_column, &t);
+    if (res == nullptr) {
+      // skip lines that do not start with a date
+      fstream.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
+      continue;
+    }
+    fstream >> time >> who >> type >> range >> object >> collection;
+
+    date += " " + time;
+    cout << date << endl;
+    // FIXME: this is wrong but it returns a reasonable bad timestamp :P
+    const char* date_format_full = "%Y-%m-%d %H:%M:%S.%f%z";
+    res = strptime(date.c_str(), date_format_full, &t);
+    time_t at = mktime(&t);
+
+    cout << fmt::format("{} {} {} {} {} {} {}", date, at, who, type, range, object, collection) << endl;
+
+    shared_ptr<string> who_ptr = make_shared<string>(who);
+    auto who_it = string_cache.find(who);
+    if (who_it == string_cache.end()) {
+      string_cache.insert({who, who_ptr});
+    } else {
+      who_ptr = who_it->second;
+    }
+
+    shared_ptr<string> object_ptr = make_shared<string>(object);
+    auto object_it = string_cache.find(object);
+    if (object_it == string_cache.end()) {
+      string_cache.insert({object, object_ptr});
+    } else {
+      object_ptr = object_it->second;
+    }
+
+    shared_ptr<string> collection_ptr = make_shared<string>(collection);
+    auto collection_it = string_cache.find(collection);
+    if (collection_it == string_cache.end()) {
+      string_cache.insert({collection, collection_ptr});
+    } else {
+      collection_ptr = collection_it->second;
+    }
+
+    uint64_t offset = 0, length = 0;
+    stringstream range_stream(range);
+    string offset_str, length_str;
+    getline(range_stream, offset_str, '~');
+    getline(range_stream, length_str, '~');
+    offset = stoll(offset_str);
+    length = stoll(length_str);
+
+    max_buffer_size = max(length, max_buffer_size);
+
+    op_type ot = type == "write" ? Write : Read;
+    ops.push_back(Op(at, ot, offset, length, object_ptr, collection_ptr, who_ptr));
+  }
+
+  int ret = cluster.init2("client.admin", "ceph", 0);
+  if (ret < 0) {
+    std::cerr << "Couldn't init ceph! error " << ret << std::endl;
+    return EXIT_FAILURE;
+  }
+  std::cout << "cluster init ready" << std::endl;
+
+  ret = cluster.conf_read_file(ceph_conf_path.c_str());
+  if (ret < 0) {
+    std::cerr << "Couldn't read the Ceph configuration file! error " << ret << std::endl;
+    return EXIT_FAILURE;
+  }
+  std::cout << "cluster config ready" << std::endl;
+  ret = cluster.connect();
+  if (ret < 0) {
+    std::cerr << "Couldn't connect to cluster! error " << ret << std::endl;
+    return EXIT_FAILURE;
+  }
+  std::cout << "cluster connect ready" << std::endl;
+  ret = cluster.ioctx_create("test_pool", io);
+  if (ret < 0) {
+    std::cerr << "Couldn't set up ioctx! error " << ret << std::endl;
+    exit(EXIT_FAILURE);
+  }
+  std::cout << "test-pool ready" << std::endl;
+
+
+  // process ops
+  // Create a buffer big enough for every operation. We will take enough bytes from it for every operation
+  bufferlist bl;
+  gen_buffer(bl, max_buffer_size);
+
+  for (auto &op : ops) {
+    {
+      std::unique_lock<std::mutex> lock(in_flight_mutex);
+      cv.wait(lock, [&io_depth] { return in_flight_ops < io_depth; });
+    }
+    cout << fmt::format("Running op {} object={} range={}~{}", op.type, *op.object, op.offset, op.length) << endl;
+    op.completion = librados::Rados::aio_create_completion(static_cast<void*>(&op), completion_cb);
+    // count the op as in flight before submitting it so the completion
+    // callback cannot decrement the counter before we increment it
+    in_flight_ops++;
+    switch (op.type) {
+      case Write: {
+        int ret = io.aio_write(*op.object, op.completion, bl, op.length, op.offset);
+        if (ret != 0) {
+          cout << fmt::format("Error writing ecode={}", ret) << endl;
+        }
+        break;
+      }
+      case Read: {
+        int ret = io.aio_read(*op.object, op.completion, &op.read_bl, op.length, op.offset);
+        if (ret != 0) {
+          cout << fmt::format("Error reading ecode={}", ret) << endl;
+        }
+        break;
+      }
+    }
+  }
+  while (in_flight_ops > 0) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+  }
+  // io.write(const std::string &oid, bufferlist &bl, size_t len, uint64_t off)
+
+  cout << ops.size() << endl;
+  return 0;
+}
-- 
2.39.5
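Note for reviewers, outside the patch itself: the io depth bound mentioned in the commit message comes from pairing an atomic in-flight counter with a condition variable; each completion decrements the counter and wakes the submitter, which blocks whenever 64 ops are already outstanding. The standalone sketch below shows only that pattern, with no librados dependency; the op count and the detached worker threads standing in for the asynchronous backend are made up for illustration, not part of the patch.

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

static std::atomic<uint64_t> in_flight_ops(0);
static std::condition_variable cv;
static std::mutex in_flight_mutex;

// Stand-in for the replayer's completion callback: decrement the counter
// under the mutex and wake the submitter if it is waiting on the depth limit.
static void complete_one() {
  {
    std::lock_guard<std::mutex> lock(in_flight_mutex);
    in_flight_ops--;
  }
  cv.notify_one();
}

int main() {
  const uint64_t io_depth = 64;  // same bound the replayer uses
  const int total_ops = 256;     // hypothetical workload size
  std::vector<std::thread> workers;

  for (int i = 0; i < total_ops; i++) {
    // Block until fewer than io_depth ops are outstanding.
    {
      std::unique_lock<std::mutex> lock(in_flight_mutex);
      cv.wait(lock, [&] { return in_flight_ops < io_depth; });
    }
    in_flight_ops++;  // count the op before "submitting" it
    // Fake asynchronous backend: a worker completes the op after a short delay.
    workers.emplace_back([] {
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
      complete_one();
    });
  }

  for (auto& w : workers) {
    w.join();
  }
  std::cout << "all " << total_ops << " ops completed\n";
  return 0;
}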