From: Pere Diaz Bou Date: Wed, 3 Jul 2024 10:59:20 +0000 (+0200) Subject: test/allocsim: worker threads, op hashed by client X-Git-Tag: v20.0.0~1596^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=100e44dbb1fb145fc1ada9044b93c5f525e6089f;p=ceph.git test/allocsim: worker threads, op hashed by client Signed-off-by: Pere Diaz Bou --- diff --git a/src/test/objectstore/allocsim/ops_replayer.cc b/src/test/objectstore/allocsim/ops_replayer.cc index 9f8f56ccee5..18a1739e42f 100644 --- a/src/test/objectstore/allocsim/ops_replayer.cc +++ b/src/test/objectstore/allocsim/ops_replayer.cc @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -197,12 +198,77 @@ void parse_entry_point(shared_ptr context) { } } +void worker_thread_entry(uint64_t id, uint64_t nworker_threads, vector &ops, uint64_t max_buffer_size, uint64_t io_depth, librados::IoCtx* io) { + + bufferlist bl; + gen_buffer(bl, max_buffer_size); + hash hasher; + + cout << "starting thread " << io_depth << endl; + for (auto &op : ops) { + { + std::unique_lock lock(in_flight_mutex); + cv.wait(lock, [&io_depth] { return in_flight_ops < io_depth; }); + + } + size_t key = hasher(*op.who) % nworker_threads; + if (key != id) { + continue; + } + // cout << fmt::format("Running op {} object={} range={}~{}", op.type, *op.object, op.offset, op.length) << endl; + op.completion = librados::Rados::aio_create_completion(static_cast(&op), completion_cb); + switch (op.type) { + case Write: { + int ret = io->aio_write(*op.object, op.completion, bl, op.length, op.offset); + if (ret != 0) { + cout << fmt::format("Error writing ecode={}", ret) << endl;; + } + break; + } + case WriteFull: { + int ret = io->aio_write_full(*op.object, op.completion, bl); + if (ret != 0) { + cout << fmt::format("Error writing full ecode={}", ret) << endl;; + } + break; + } + case Read: { + bufferlist read; + int ret = io->aio_read(*op.object, op.completion, &op.read_bl, op.length, op.offset); + if (ret != 0) { + cout << fmt::format("Error reading ecode={}", ret) << endl;; + } + break; + } + case Truncate: { + librados::ObjectWriteOperation write_operation; + write_operation.truncate(op.offset); + int ret = io->aio_operate(*op.object, op.completion, &write_operation); + if (ret != 0) { + cout << fmt::format("Error truncating ecode={}", ret) << endl;; + } + break; + } + case Zero: { + librados::ObjectWriteOperation write_operation; + write_operation.zero(op.offset, op.length); + int ret = io->aio_operate(*op.object, op.completion, &write_operation); + if (ret != 0) { + cout << fmt::format("Error zeroing ecode={}", ret) << endl;; + } + break; + } + } + in_flight_ops++; + } +} + int main(int argc, char** argv) { vector ops; librados::Rados cluster; librados::IoCtx io; uint64_t max_buffer_size = 0; - uint64_t io_depth = 64; + uint64_t io_depth = 8; string file; std::filesystem::path ceph_conf_path; @@ -255,8 +321,6 @@ int main(int argc, char** argv) { context->ops.clear(); } - - int ret = cluster.init2("client.admin", "ceph", 0); if (ret < 0) { std::cerr << "Couldn't init ceph! error " << ret << std::endl; @@ -286,60 +350,13 @@ int main(int argc, char** argv) { // process ops // Create a buffer big enough for every operation. We will take enoguh bytes from it for every operation - bufferlist bl; - gen_buffer(bl, max_buffer_size); - - for (auto &op : ops) { - { - std::unique_lock lock(in_flight_mutex); - cv.wait(lock, [&io_depth] { return in_flight_ops < io_depth; }); - - } - // cout << fmt::format("Running op {} object={} range={}~{}", op.type, *op.object, op.offset, op.length) << endl; - op.completion = librados::Rados::aio_create_completion(static_cast(&op), completion_cb); - switch (op.type) { - case Write: { - int ret = io.aio_write(*op.object, op.completion, bl, op.length, op.offset); - if (ret != 0) { - cout << fmt::format("Error writing ecode={}", ret) << endl;; - } - break; - } - case WriteFull: { - int ret = io.aio_write_full(*op.object, op.completion, bl); - if (ret != 0) { - cout << fmt::format("Error writing full ecode={}", ret) << endl;; - } - break; - } - case Read: { - bufferlist read; - int ret = io.aio_read(*op.object, op.completion, &op.read_bl, op.length, op.offset); - if (ret != 0) { - cout << fmt::format("Error reading ecode={}", ret) << endl;; - } - break; - } - case Truncate: { - librados::ObjectWriteOperation write_operation; - write_operation.truncate(op.offset); - int ret = io.aio_operate(*op.object, op.completion, &write_operation); - if (ret != 0) { - cout << fmt::format("Error truncating ecode={}", ret) << endl;; - } - break; - } - case Zero: { - librados::ObjectWriteOperation write_operation; - write_operation.zero(op.offset, op.length); - int ret = io.aio_operate(*op.object, op.completion, &write_operation); - if (ret != 0) { - cout << fmt::format("Error zeroing ecode={}", ret) << endl;; - } - break; - } - } - in_flight_ops++; + vector worker_threads; + uint64_t nworker_threads = 16; + for (int i = 0; i < nworker_threads; i++) { + worker_threads.push_back(thread(worker_thread_entry, i, nworker_threads, std::ref(ops), max_buffer_size, io_depth, &io)); + } + for (auto& worker : worker_threads) { + worker.join(); } while (in_flight_ops > 0) { std::this_thread::sleep_for(std::chrono::milliseconds(100));