]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test/allocsim: replayer merge files by timestmap 58748/head
authorPere Diaz Bou <pere-altea@hotmail.com>
Tue, 23 Jul 2024 13:19:43 +0000 (15:19 +0200)
committerPere Diaz Bou <pere-altea@hotmail.com>
Tue, 23 Jul 2024 13:19:43 +0000 (15:19 +0200)
Signed-off-by: Pere Diaz Bou <pere-altea@hotmail.com>
src/test/objectstore/allocsim/ops_replayer.cc

index 6f4d9501ee9e106f5d236fcc569389fbd1f980be..fd947f5c4547a6a3ac6d825a482a87d21f4d18bf 100644 (file)
@@ -13,8 +13,8 @@
 #include <fstream>
 #include <filesystem>
 #include <mutex>
-#include <rados/buffer_fwd.h>
-#include <rados/librados.hpp>
+#include "include/rados/buffer_fwd.h"
+#include "include/rados/librados.hpp"
 #include <atomic>
 #include <fmt/format.h>
 #include <map>
@@ -321,6 +321,11 @@ void worker_thread_entry(uint64_t id, uint64_t nworker_threads, vector<Op> &ops,
   }
 }
 
+
+int op_comparison_by_date(const Op& lhs, const Op& rhs) {
+  return lhs.at < rhs.at;
+}
+
 void usage(po::options_description &desc) {
   cout << desc << std::endl;
 }
@@ -343,7 +348,7 @@ int main(int argc, char** argv) {
   po::options_description po_options("Options");
   po_options.add_options()
     ("help,h", "produce help message")
-    (",i", po::value<string>(&file)->default_value("input.txt"), "Input file (output of op_scraper.py)")
+    ("input-files,i", po::value<vector<string>>()->multitoken(), "List of input files (output of op_scraper.py). Multiple files will be merged and sorted by time order")
     ("ceph-conf", po::value<string>(&ceph_conf_path)->default_value("ceph.conf"), "Path to ceph conf")
     ("io-depth", po::value<uint64_t>(&io_depth)->default_value(64), "I/O depth")
     ("parser-threads", po::value<uint64_t>(&nparser_threads)->default_value(16), "Number of parser threads")
@@ -359,53 +364,73 @@ int main(int argc, char** argv) {
   po::parsed_options parsed = po::command_line_parser(argc, argv).options(po_all).allow_unregistered().run();
   po::store( parsed, vm);
   po::notify(vm);
+  
   if (vm.count("help")) {
     usage(po_all);
     exit(EXIT_SUCCESS);
   }
-
-  // Parse input file
-  vector<std::thread> parser_threads;
-  vector<shared_ptr<ParserContext>> parser_contexts;
-  int fd = open(file.c_str(), O_RDONLY);
-  if (fd == -1) {
-      cout << "Error opening file" << endl;
-  }
-  struct stat file_stat;
-  fstat(fd, &file_stat);
-  char* mapped_buffer = (char*)mmap(NULL, file_stat.st_size, PROT_READ, MAP_SHARED, fd, 0);
-  if (mapped_buffer == nullptr) {
-      cout << "error mapping buffer" << endl;
-  }
-  uint64_t start_offset = 0;
-  uint64_t step_size = file_stat.st_size / nparser_threads;
-  for (int i = 0; i < nparser_threads; i++) {
-    char* end = mapped_buffer + start_offset + step_size;
-    while(*end != '\n') {
-        end--;
+  
+  assert(vm.count("input-files") > 0);
+  
+  vector<string> input_files = vm["input-files"].as<vector<string>>();
+
+  vector<shared_ptr<ParserContext>> complete_parser_contexts; // list of ALL parser contexts so that shared_ptrs do not die.
+  for (auto &file : input_files) {
+    // Parse input file
+    vector<std::thread> parser_threads;
+    vector<shared_ptr<ParserContext>> parser_contexts;
+    cout << fmt::format("parsing file {}", file) << endl;
+    int fd = open(file.c_str(), O_RDONLY);
+    if (fd == -1) {
+        cout << "Error opening file" << endl;
+        exit(EXIT_FAILURE);
     }
-    if (i == nparser_threads - 1) {
-        end = mapped_buffer + file_stat.st_size;
+    struct stat file_stat;
+    fstat(fd, &file_stat);
+    char* mapped_buffer = (char*)mmap(NULL, file_stat.st_size, PROT_READ, MAP_SHARED, fd, 0);
+    if (mapped_buffer == nullptr) {
+        cout << "error mapping buffer" << endl;
+        exit(EXIT_FAILURE);
     }
-    shared_ptr<ParserContext> context = make_shared<ParserContext>();
-    context->start = mapped_buffer + start_offset;
-    context->end = end;
-    context->max_buffer_size = 0;
-    parser_contexts.push_back(context);
-    parser_threads.push_back(std::thread(parse_entry_point, context));
-    start_offset += (end - mapped_buffer - start_offset);
-  }
-  for (auto& t : parser_threads) {
-      t.join();
+    uint64_t start_offset = 0;
+    uint64_t step_size = file_stat.st_size / nparser_threads;
+    for (int i = 0; i < nparser_threads; i++) {
+      char* end = mapped_buffer + start_offset + step_size;
+      while(*end != '\n') {
+          end--;
+      }
+      if (i == nparser_threads - 1) {
+          end = mapped_buffer + file_stat.st_size;
+      }
+      shared_ptr<ParserContext> context = make_shared<ParserContext>();
+      context->start = mapped_buffer + start_offset;
+      context->end = end;
+      context->max_buffer_size = 0;
+      parser_contexts.push_back(context);
+      parser_threads.push_back(std::thread(parse_entry_point, context));
+      start_offset += (end - mapped_buffer - start_offset);
+    }
+    for (auto& t : parser_threads) {
+        t.join();
+    }
+    // reduce
+    for (auto context : parser_contexts) {
+        ops.insert(ops.end(), context->ops.begin(), context->ops.end());
+        max_buffer_size = max(context->max_buffer_size, max_buffer_size);
+        // context->ops.clear();
+    }
+    munmap(mapped_buffer, file_stat.st_size);
+    complete_parser_contexts.insert(complete_parser_contexts.end(), parser_contexts.begin(), parser_contexts.end());
   }
-  // reduce
-  for (auto context : parser_contexts) {
-      ops.insert(ops.end(), context->ops.begin(), context->ops.end());
-      max_buffer_size = max(context->max_buffer_size, max_buffer_size);
-      // context->ops.clear();
+  
+  cout << "Sorting ops by date..." << endl;
+  sort(ops.begin(), ops.end(), op_comparison_by_date);
+  cout << "Sorting ops by date done" << endl;
+  
+  if (skip_do_ops) {
+    return EXIT_SUCCESS;
   }
-
-
+  
   int ret = cluster.init2("client.admin", "ceph", 0);
   if (ret < 0) {
     std::cerr << "Couldn't init ceph! error " << ret << std::endl;
@@ -433,9 +458,6 @@ int main(int argc, char** argv) {
   std::cout << fmt::format("pool {} ready", pool) << std::endl;
 
 
-  if (skip_do_ops) {
-    return EXIT_SUCCESS;
-  }
   // process ops
   vector<thread> worker_threads;
   for (int i = 0; i < nworker_threads; i++) {