]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test/allocsim: worker threads, op hashed by client
authorPere Diaz Bou <pere-altea@hotmail.com>
Wed, 3 Jul 2024 10:59:20 +0000 (12:59 +0200)
committerPere Diaz Bou <pere-altea@hotmail.com>
Wed, 3 Jul 2024 10:59:20 +0000 (12:59 +0200)
Signed-off-by: Pere Diaz Bou <pere-altea@hotmail.com>
src/test/objectstore/allocsim/ops_replayer.cc

index 9f8f56ccee5d8d45b438d7a4cca7f984fdf3bf8c..18a1739e42f2b6cd92169e7f8510458d70eea5ef 100644 (file)
@@ -1,6 +1,7 @@
 #include <algorithm>
 #include <cassert>
 #include <fcntl.h>
+#include <ranges>
 #include <string_view>
 #include <sys/mman.h>
 #include <sys/stat.h>
@@ -197,12 +198,77 @@ void parse_entry_point(shared_ptr<ParserContext> context) {
     }
 }
 
+void worker_thread_entry(uint64_t id, uint64_t nworker_threads, vector<Op> &ops, uint64_t max_buffer_size, uint64_t io_depth, librados::IoCtx* io) {
+
+    bufferlist bl;
+    gen_buffer(bl, max_buffer_size);
+    hash<string> hasher;
+
+    cout << "starting thread " << io_depth << endl;
+    for (auto &op : ops) {
+      {
+        std::unique_lock<std::mutex> lock(in_flight_mutex);
+        cv.wait(lock, [&io_depth] { return in_flight_ops < io_depth; });
+
+      }
+      size_t key = hasher(*op.who) % nworker_threads;
+      if (key != id) {
+          continue;
+      }
+      // cout << fmt::format("Running op {} object={} range={}~{}", op.type, *op.object, op.offset, op.length) << endl;
+      op.completion = librados::Rados::aio_create_completion(static_cast<void*>(&op), completion_cb);
+      switch (op.type) {
+        case Write: {
+          int ret = io->aio_write(*op.object, op.completion, bl, op.length, op.offset);
+          if (ret != 0) {
+            cout << fmt::format("Error writing ecode={}", ret) << endl;;
+          }
+          break;
+        }
+        case WriteFull: {
+          int ret = io->aio_write_full(*op.object, op.completion, bl);
+          if (ret != 0) {
+            cout << fmt::format("Error writing full ecode={}", ret) << endl;;
+          }
+          break;
+        }
+        case Read: {
+          bufferlist read;
+          int ret = io->aio_read(*op.object, op.completion, &op.read_bl, op.length, op.offset);
+          if (ret != 0) {
+            cout << fmt::format("Error reading ecode={}", ret) << endl;;
+          }
+          break;
+        }
+        case Truncate: {
+            librados::ObjectWriteOperation write_operation;
+            write_operation.truncate(op.offset);
+            int ret = io->aio_operate(*op.object, op.completion, &write_operation);
+            if (ret != 0) {
+              cout << fmt::format("Error truncating ecode={}", ret) << endl;;
+            }
+            break;
+        }
+        case Zero: {
+            librados::ObjectWriteOperation write_operation;
+            write_operation.zero(op.offset, op.length);
+            int ret = io->aio_operate(*op.object, op.completion, &write_operation);
+            if (ret != 0) {
+              cout << fmt::format("Error zeroing ecode={}", ret) << endl;;
+            }
+            break;
+        }
+      }
+      in_flight_ops++;
+    }
+}
+
 int main(int argc, char** argv) {
   vector<Op> ops;
   librados::Rados cluster;
   librados::IoCtx io;
   uint64_t max_buffer_size = 0;
-  uint64_t io_depth = 64;
+  uint64_t io_depth = 8;
   string file;
   std::filesystem::path ceph_conf_path;
 
@@ -255,8 +321,6 @@ int main(int argc, char** argv) {
       context->ops.clear();
   }
 
-
-
   int ret = cluster.init2("client.admin", "ceph", 0);
   if (ret < 0) {
     std::cerr << "Couldn't init ceph! error " << ret << std::endl;
@@ -286,60 +350,13 @@ int main(int argc, char** argv) {
 
   // process ops
   // Create a buffer big enough for every operation. We will take enoguh bytes from it for every operation
-  bufferlist bl;
-  gen_buffer(bl, max_buffer_size);
-
-  for (auto &op : ops) {
-    {
-      std::unique_lock<std::mutex> lock(in_flight_mutex);
-      cv.wait(lock, [&io_depth] { return in_flight_ops < io_depth; });
-
-    }
-    // cout << fmt::format("Running op {} object={} range={}~{}", op.type, *op.object, op.offset, op.length) << endl;
-    op.completion = librados::Rados::aio_create_completion(static_cast<void*>(&op), completion_cb);
-    switch (op.type) {
-      case Write: {
-        int ret = io.aio_write(*op.object, op.completion, bl, op.length, op.offset);
-        if (ret != 0) {
-          cout << fmt::format("Error writing ecode={}", ret) << endl;;
-        }
-        break;
-      }
-      case WriteFull: {
-        int ret = io.aio_write_full(*op.object, op.completion, bl);
-        if (ret != 0) {
-          cout << fmt::format("Error writing full ecode={}", ret) << endl;;
-        }
-        break;
-      }
-      case Read: {
-        bufferlist read;
-        int ret = io.aio_read(*op.object, op.completion, &op.read_bl, op.length, op.offset);
-        if (ret != 0) {
-          cout << fmt::format("Error reading ecode={}", ret) << endl;;
-        }
-        break;
-      }
-      case Truncate: {
-          librados::ObjectWriteOperation write_operation;
-          write_operation.truncate(op.offset);
-          int ret = io.aio_operate(*op.object, op.completion, &write_operation);
-          if (ret != 0) {
-            cout << fmt::format("Error truncating ecode={}", ret) << endl;;
-          }
-          break;
-      }
-      case Zero: {
-          librados::ObjectWriteOperation write_operation;
-          write_operation.zero(op.offset, op.length);
-          int ret = io.aio_operate(*op.object, op.completion, &write_operation);
-          if (ret != 0) {
-            cout << fmt::format("Error zeroing ecode={}", ret) << endl;;
-          }
-          break;
-      }
-    }
-    in_flight_ops++;
+  vector<thread> worker_threads;
+  uint64_t nworker_threads = 16;
+  for (int i = 0; i < nworker_threads; i++) {
+      worker_threads.push_back(thread(worker_thread_entry, i, nworker_threads, std::ref(ops), max_buffer_size, io_depth, &io));
+  }
+  for (auto& worker : worker_threads) {
+      worker.join();
   }
   while (in_flight_ops > 0) {
     std::this_thread::sleep_for(std::chrono::milliseconds(100));