]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test/allocsim: ops scrapper arg parsing 58423/head
authorPere Diaz Bou <pere-altea@hotmail.com>
Thu, 4 Jul 2024 10:20:07 +0000 (12:20 +0200)
committerPere Diaz Bou <pere-altea@hotmail.com>
Fri, 5 Jul 2024 08:36:16 +0000 (10:36 +0200)
```
fedora :: pere/scrapper-replayer/build 130 ยป ./bin/replayer -h
All options:

Options:
  -h [ --help ]                 produce help message
  -i arg (=input.txt)           Input file (output of op_scraper.py)
  --ceph-conf arg (=ceph.conf)  Path to ceph conf
  --io-depth arg (=64)          I/O depth
  --parser-threads arg (=16)    Number of parser threads
  --worker-threads arg (=16)    Number of I/O worker threads
  --pool arg (=test_pool)       Pool to use for I/O
```

Signed-off-by: Pere Diaz Bou <pere-altea@hotmail.com>
src/test/objectstore/CMakeLists.txt
src/test/objectstore/allocsim/CMakeLists.txt [new file with mode: 0644]
src/test/objectstore/allocsim/ops_replayer.cc

index 8a6a0e7ed91439f804fb8c576fc0507e4f1a157e..bddff3f6727625805713d5e4e3e6b5c31124b19c 100644 (file)
@@ -24,6 +24,8 @@ target_link_libraries(ceph_test_objectstore
 install(TARGETS ceph_test_objectstore
   DESTINATION ${CMAKE_INSTALL_BINDIR})
 
+add_subdirectory(allocsim)
+
 add_executable(ceph_test_keyvaluedb
   test_kv.cc)
 target_link_libraries(ceph_test_keyvaluedb
diff --git a/src/test/objectstore/allocsim/CMakeLists.txt b/src/test/objectstore/allocsim/CMakeLists.txt
new file mode 100644 (file)
index 0000000..cbfbc69
--- /dev/null
@@ -0,0 +1,10 @@
+
+
+add_executable(replayer ops_replayer.cc)
+
+target_link_libraries(replayer
+    PRIVATE
+    fmt
+    librados
+    Boost::program_options
+)
index d92630763454c05faf02ec42cfa529f9a9549661..cafffa1ee0e1956c21098fc6e015cc11b151d21c 100644 (file)
@@ -1,5 +1,6 @@
 #include <algorithm>
 #include <cassert>
+#include <cstdlib>
 #include <fcntl.h>
 #include <sys/mman.h>
 #include <sys/stat.h>
 #include <iostream>
 #include <vector>
 
+#include <boost/program_options/variables_map.hpp>
+#include <boost/program_options/parsers.hpp>
+
+namespace po = boost::program_options;
+
 
 using namespace std;
 using namespace ceph;
@@ -111,176 +117,202 @@ void completion_cb(librados::completion_t cb, void *arg) {
 }
 
 void parse_entry_point(shared_ptr<ParserContext> context) {
-    string date, time, who, type, range, object, collection;
-    MemoryInputStream fstream(context->start, context->end);
-    const char* date_format_first_column = "%Y-%m-%d";
-    // we expect this input:
-    // 2024-05-10 12:06:24.990831+00:00 client.607247697.0:5632274 write 4096~4096 2:d03a455a:::08b0f2fd5f20f504e76c2dd3d24683a1:head 2.1c0b
-    while (fstream >> date){
-      // cout << date << endl;
-      tm t;
-      char* res = strptime(date.c_str(), date_format_first_column, &t);
-      if (res == nullptr) {
-        fstream.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
-        continue;
-      }
-      fstream >> time >> who >> type >> range >> object >> collection;
-
-      date += " " + time;
-      // cout << date << endl;
-      // FIXME: this is wrong  but it returns a reasonable bad timestamp :P
-      const char* date_format_full = "%Y-%m-%d %H:%M:%S.%f%z";
-      res = strptime(date.c_str(), date_format_full, &t);
-      time_t at = mktime(&t);
-
-      // cout << fmt::format("{} {} {} {} {} {} {}", date, at, who, type, range, object, collection) << endl;
-
-      shared_ptr<string> who_ptr = make_shared<string>(who);
-      auto who_it = string_cache.find(who);
-      if (who_it == string_cache.end()) {
-        string_cache.insert({ who, who_ptr });
-      } else {
-        who_ptr = who_it->second;
-      }
+  cout << fmt::format("Starting parser thread start={:p} end={:p}", context->start, context->end) << endl;
+
+  string date, time, who, type, range, object, collection;
+  MemoryInputStream fstream(context->start, context->end);
+  const char* date_format_first_column = "%Y-%m-%d";
+  // we expect this input:
+  // 2024-05-10 12:06:24.990831+00:00 client.607247697.0:5632274 write 4096~4096 2:d03a455a:::08b0f2fd5f20f504e76c2dd3d24683a1:head 2.1c0b
+  while (fstream >> date){
+    // cout << date << endl;
+    tm t;
+    char* res = strptime(date.c_str(), date_format_first_column, &t);
+    if (res == nullptr) {
+      fstream.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
+      continue;
+    }
+    fstream >> time >> who >> type >> range >> object >> collection;
+
+    date += " " + time;
+    // cout << date << endl;
+    // FIXME: this is wrong  but it returns a reasonable bad timestamp :P
+    const char* date_format_full = "%Y-%m-%d %H:%M:%S.%f%z";
+    res = strptime(date.c_str(), date_format_full, &t);
+    time_t at = mktime(&t);
+
+    // cout << fmt::format("{} {} {} {} {} {} {}", date, at, who, type, range, object, collection) << endl;
+
+    shared_ptr<string> who_ptr = make_shared<string>(who);
+    auto who_it = string_cache.find(who);
+    if (who_it == string_cache.end()) {
+      string_cache.insert({ who, who_ptr });
+    } else {
+      who_ptr = who_it->second;
+    }
 
-      shared_ptr<string> object_ptr = make_shared<string>(object);
-      auto object_it = string_cache.find(object);
-      if (object_it == string_cache.end()) {
-        string_cache.insert({ object, object_ptr });
-      } else {
-        object_ptr = object_it->second;
-      }
+    shared_ptr<string> object_ptr = make_shared<string>(object);
+    auto object_it = string_cache.find(object);
+    if (object_it == string_cache.end()) {
+      string_cache.insert({ object, object_ptr });
+    } else {
+      object_ptr = object_it->second;
+    }
 
-      op_type ot;
-      if (type == "write") {
-          ot = Write;
-      } else if (type == "writefull") {
-          ot = WriteFull;
-      } else if (type == "read") {
-          ot = Read;
-      } else if (type == "sparse-read") {
-          ot = Read;
-      } else if (type == "truncate") {
-          ot = Truncate;
-      } else if (type == "zero") {
-          ot = Zero;
-      } else {
-          cout << "invalid type " << type << endl;
-          exit(1);
-      }
+    op_type ot;
+    if (type == "write") {
+        ot = Write;
+    } else if (type == "writefull") {
+        ot = WriteFull;
+    } else if (type == "read") {
+        ot = Read;
+    } else if (type == "sparse-read") {
+        ot = Read;
+    } else if (type == "truncate") {
+        ot = Truncate;
+    } else if (type == "zero") {
+        ot = Zero;
+    } else {
+        cout << "invalid type " << type << endl;
+        exit(1);
+    }
 
-      shared_ptr<string> collection_ptr = make_shared<string>(collection);
-      auto collection_it = string_cache.find(collection);
-      if (collection_it == string_cache.end()) {
-        string_cache.insert({ collection, collection_ptr });
-      } else {
-        collection_ptr = collection_it->second;
-      }
+    shared_ptr<string> collection_ptr = make_shared<string>(collection);
+    auto collection_it = string_cache.find(collection);
+    if (collection_it == string_cache.end()) {
+      string_cache.insert({ collection, collection_ptr });
+    } else {
+      collection_ptr = collection_it->second;
+    }
 
-      uint64_t offset = 0, length = 0;
-      stringstream range_stream(range);
-      string offset_str, length_str;
-      getline(range_stream, offset_str, '~');
-      offset = stoll(offset_str);
+    uint64_t offset = 0, length = 0;
+    stringstream range_stream(range);
+    string offset_str, length_str;
+    getline(range_stream, offset_str, '~');
+    offset = stoll(offset_str);
 
-      if (ot != Truncate) {
-          // Truncate doesn't only has one number
-          getline(range_stream, length_str, '~');
-          length = stoll(length_str);
-      }
+    if (ot != Truncate) {
+        // Truncate only has one number
+        getline(range_stream, length_str, '~');
+        length = stoll(length_str);
+    }
 
-      context->max_buffer_size = max(length, context->max_buffer_size);
+    context->max_buffer_size = max(length, context->max_buffer_size);
 
-      context->ops.push_back(Op(at, ot, offset, length, object_ptr, collection_ptr, who_ptr));
-    }
+    context->ops.push_back(Op(at, ot, offset, length, object_ptr, collection_ptr, who_ptr));
+  }
 }
 
 void worker_thread_entry(uint64_t id, uint64_t nworker_threads, vector<Op> &ops, uint64_t max_buffer_size, uint64_t io_depth, librados::IoCtx* io) {
-
-    bufferlist bl;
-    gen_buffer(bl, max_buffer_size);
-    hash<string> hasher;
-
-    cout << "starting thread " << io_depth << endl;
-    for (auto &op : ops) {
-      {
-        std::unique_lock<std::mutex> lock(in_flight_mutex);
-        cv.wait(lock, [&io_depth] { return in_flight_ops < io_depth; });
-
+  // Create a buffer big enough for every operation. We will take enoguh bytes from it for every operation
+  bufferlist bl;
+  gen_buffer(bl, max_buffer_size);
+  hash<string> hasher;
+
+  cout << fmt::format("Starting thread {} with io_depth={} max_buffer_size={}", id, io_depth, max_buffer_size) << endl;
+  for (auto &op : ops) {
+    {
+      std::unique_lock<std::mutex> lock(in_flight_mutex);
+      cv.wait(lock, [&io_depth] { return in_flight_ops < io_depth; });
+    }
+    size_t key = hasher(*op.who) % nworker_threads;
+    if (key != id) {
+        continue;
+    }
+    // cout << fmt::format("Running op {} object={} range={}~{}", op.type, *op.object, op.offset, op.length) << endl;
+    op.completion = librados::Rados::aio_create_completion(static_cast<void*>(&op), completion_cb);
+    switch (op.type) {
+      case Write: {
+        bufferlist trimmed;
+        trimmed.substr_of(bl, 0, op.length);
+        int ret = io->aio_write(*op.object, op.completion, trimmed, op.length, op.offset);
+        if (ret != 0) {
+          cout << fmt::format("Error writing ecode={}", ret) << endl;;
+        }
+        break;
       }
-      size_t key = hasher(*op.who) % nworker_threads;
-      if (key != id) {
-          continue;
+      case WriteFull: {
+        bufferlist trimmed;
+        trimmed.substr_of(bl, 0, op.length);
+        int ret = io->aio_write_full(*op.object, op.completion, trimmed);
+        if (ret != 0) {
+          cout << fmt::format("Error writing full ecode={}", ret) << endl;;
+        }
+        break;
       }
-      // cout << fmt::format("Running op {} object={} range={}~{}", op.type, *op.object, op.offset, op.length) << endl;
-      op.completion = librados::Rados::aio_create_completion(static_cast<void*>(&op), completion_cb);
-      switch (op.type) {
-        case Write: {
-          bufferlist trimmed;
-          trimmed.substr_of(bl, 0, op.length);
-          int ret = io->aio_write(*op.object, op.completion, trimmed, op.length, op.offset);
-          if (ret != 0) {
-            cout << fmt::format("Error writing ecode={}", ret) << endl;;
-          }
-          break;
+      case Read: {
+        bufferlist read;
+        int ret = io->aio_read(*op.object, op.completion, &op.read_bl, op.length, op.offset);
+        if (ret != 0) {
+          cout << fmt::format("Error reading ecode={}", ret) << endl;;
         }
-        case WriteFull: {
-          bufferlist trimmed;
-          trimmed.substr_of(bl, 0, op.length);
-          int ret = io->aio_write_full(*op.object, op.completion, trimmed);
+        break;
+      }
+      case Truncate: {
+          librados::ObjectWriteOperation write_operation;
+          write_operation.truncate(op.offset);
+          int ret = io->aio_operate(*op.object, op.completion, &write_operation);
           if (ret != 0) {
-            cout << fmt::format("Error writing full ecode={}", ret) << endl;;
+            cout << fmt::format("Error truncating ecode={}", ret) << endl;;
           }
           break;
-        }
-        case Read: {
-          bufferlist read;
-          int ret = io->aio_read(*op.object, op.completion, &op.read_bl, op.length, op.offset);
+      }
+      case Zero: {
+          librados::ObjectWriteOperation write_operation;
+          write_operation.zero(op.offset, op.length);
+          int ret = io->aio_operate(*op.object, op.completion, &write_operation);
           if (ret != 0) {
-            cout << fmt::format("Error reading ecode={}", ret) << endl;;
+            cout << fmt::format("Error zeroing ecode={}", ret) << endl;;
           }
           break;
-        }
-        case Truncate: {
-            librados::ObjectWriteOperation write_operation;
-            write_operation.truncate(op.offset);
-            int ret = io->aio_operate(*op.object, op.completion, &write_operation);
-            if (ret != 0) {
-              cout << fmt::format("Error truncating ecode={}", ret) << endl;;
-            }
-            break;
-        }
-        case Zero: {
-            librados::ObjectWriteOperation write_operation;
-            write_operation.zero(op.offset, op.length);
-            int ret = io->aio_operate(*op.object, op.completion, &write_operation);
-            if (ret != 0) {
-              cout << fmt::format("Error zeroing ecode={}", ret) << endl;;
-            }
-            break;
-        }
       }
-      in_flight_ops++;
     }
+    in_flight_ops++;
+  }
+}
+
+void usage(po::options_description &desc) {
+  cout << desc << std::endl;
 }
 
 int main(int argc, char** argv) {
   vector<Op> ops;
   librados::Rados cluster;
   librados::IoCtx io;
-  uint64_t max_buffer_size = 0;
-  uint64_t io_depth = 8;
-  string file;
-  std::filesystem::path ceph_conf_path;
+  uint64_t max_buffer_size = 0; // We can use a single buffer for writes and trim it at will. The buffer will be the size of the maximum length op.
 
-  if (argc < 3) {
-    cout << fmt::format("usage: ops_replayer file ceph.conf") << endl;
+  // options
+  uint64_t io_depth = 8;
+  uint64_t nparser_threads = 16;
+  uint64_t nworker_threads = 16;
+  string file("input.txt");
+  string ceph_conf_path("./ceph.conf");
+  string pool("test_pool");
+
+  po::options_description po_options("Options");
+  po_options.add_options()
+    ("help,h", "produce help message")
+    (",i", po::value<string>(&file)->default_value("input.txt"), "Input file (output of op_scraper.py)")
+    ("ceph-conf", po::value<string>(&ceph_conf_path)->default_value("ceph.conf"), "Path to ceph conf")
+    ("io-depth", po::value<uint64_t>(&io_depth)->default_value(64), "I/O depth")
+    ("parser-threads", po::value<uint64_t>(&nparser_threads)->default_value(16), "Number of parser threads")
+    ("worker-threads", po::value<uint64_t>(&nworker_threads)->default_value(16), "Number of I/O worker threads")
+    ("pool", po::value<string>(&pool)->default_value("test_pool"), "Pool to use for I/O")
+    ;
+
+  po::options_description po_all("All options");
+  po_all.add(po_options);
+
+  po::variables_map vm;
+  po::parsed_options parsed = po::command_line_parser(argc, argv).options(po_all).allow_unregistered().run();
+  po::store( parsed, vm);
+  po::notify(vm);
+  if (vm.count("help")) {
+    usage(po_all);
+    exit(EXIT_SUCCESS);
   }
-  file = argv[1];
-  ceph_conf_path = argv[2];
-  cout << file << endl;
 
-  uint64_t nthreads = 16;
+  // Parse input file
   vector<std::thread> parser_threads;
   vector<shared_ptr<ParserContext>> parser_contexts;
   int fd = open(file.c_str(), O_RDONLY);
@@ -294,13 +326,13 @@ int main(int argc, char** argv) {
       cout << "error mapping buffer" << endl;
   }
   uint64_t start_offset = 0;
-  uint64_t step_size = file_stat.st_size / nthreads;
-  for (int i = 0; i < nthreads; i++) {
+  uint64_t step_size = file_stat.st_size / nparser_threads;
+  for (int i = 0; i < nparser_threads; i++) {
     char* end = mapped_buffer + start_offset + step_size;
     while(*end != '\n') {
         end--;
     }
-    if (i == nthreads-1) {
+    if (i == nparser_threads - 1) {
         end = mapped_buffer + file_stat.st_size;
     }
     shared_ptr<ParserContext> context = make_shared<ParserContext>();
@@ -314,6 +346,7 @@ int main(int argc, char** argv) {
   for (auto& t : parser_threads) {
       t.join();
   }
+  // reduce
   for (auto context : parser_contexts) {
       string_cache.insert(context->string_cache.begin(), context->string_cache.end());
       ops.insert(ops.end(), context->ops.begin(), context->ops.end());
@@ -341,18 +374,16 @@ int main(int argc, char** argv) {
     return EXIT_FAILURE;
   }
   std::cout << "cluster connect ready" << std::endl;
-  cluster.ioctx_create("test_pool", io);
+  cluster.ioctx_create(pool.c_str(), io);
   if (ret < 0) {
     std::cerr << "Couldn't set up ioctx! error " << ret << std::endl;
     exit(EXIT_FAILURE);
   }
-  std::cout << "test-pool ready" << std::endl;
+  std::cout << fmt::format("pool {} ready", pool) << std::endl;
 
 
   // process ops
-  // Create a buffer big enough for every operation. We will take enoguh bytes from it for every operation
   vector<thread> worker_threads;
-  uint64_t nworker_threads = 16;
   for (int i = 0; i < nworker_threads; i++) {
       worker_threads.push_back(thread(worker_thread_entry, i, nworker_threads, std::ref(ops), max_buffer_size, io_depth, &io));
   }
@@ -362,7 +393,6 @@ int main(int argc, char** argv) {
   while (in_flight_ops > 0) {
     std::this_thread::sleep_for(std::chrono::milliseconds(100));
   }
-  // io.write(const std::string &oid, bufferlist &bl, size_t len, uint64_t off)
 
   cout << ops.size() << endl;
   return 0;