]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test/allocsim: performance improvements, gotta go fast
authorPere Diaz Bou <pere-altea@hotmail.com>
Thu, 4 Jul 2024 13:40:12 +0000 (15:40 +0200)
committerPere Diaz Bou <pere-altea@hotmail.com>
Mon, 8 Jul 2024 12:02:45 +0000 (14:02 +0200)
Signed-off-by: Pere Diaz Bou <pere-altea@hotmail.com>
src/test/objectstore/allocsim/ops_replayer.cc

index cafffa1ee0e1956c21098fc6e015cc11b151d21c..9b16ca07cc03737edba1bab38f9799a96dea92ff 100644 (file)
@@ -1,5 +1,7 @@
 #include <algorithm>
+#include <boost/program_options/value_semantic.hpp>
 #include <cassert>
+#include <cctype>
 #include <cstdlib>
 #include <fcntl.h>
 #include <sys/mman.h>
@@ -8,6 +10,7 @@
 #include <condition_variable>
 #include <cstdint>
 #include <ctime>
+#include <fstream>
 #include <filesystem>
 #include <mutex>
 #include <rados/buffer_fwd.h>
@@ -68,7 +71,9 @@ struct Op {
 };
 
 struct ParserContext {
-    map<string, shared_ptr<string>> string_cache;
+    map<string, shared_ptr<string>> collection_cache;
+    map<string, shared_ptr<string>> object_cache;
+    map<string, shared_ptr<string>> who_cache;
     vector<Op> ops;
     char *start; // starts and ends in new line or eof
     char *end;
@@ -116,71 +121,107 @@ void completion_cb(librados::completion_t cb, void *arg) {
   cv.notify_one();
 }
 
+
+uint64_t timestamp_parser(std::string& date) {
+  uint64_t timestamp = 0;
+  uint64_t year, month, day, hour, minute, second;
+  // expeted format
+  // 2024-05-10 12:06:24.792232+00:00
+  // 0123456789012345678------------
+  year = std::stoull(date.substr(0, 4));
+  month = std::stoull(date.substr(5, 2));
+  day = std::stoull(date.substr(8, 2));
+  hour = std::stoull(date.substr(11, 2));
+  minute = std::stoull(date.substr(14, 2));
+  second = std::stoull(date.substr(17, 2));
+  //  SECONDS SINCE JAN 01 1970. (UTC), we don't care about timestamp timezone accuracy
+  timestamp += (year - 1970) * 365 * 24 * 60 * 60;
+  timestamp += (month * 30 * 24 * 60 * 60); // Yes, 30 day month is the best format ever and you cannot complain
+  timestamp += (day * 24 * 60 * 60);
+  timestamp += (hour * 60 * 60);
+  timestamp += (minute * 60);
+  timestamp += second;
+  return timestamp;
+}
+
 void parse_entry_point(shared_ptr<ParserContext> context) {
   cout << fmt::format("Starting parser thread start={:p} end={:p}", context->start, context->end) << endl;
 
   string date, time, who, type, range, object, collection;
   MemoryInputStream fstream(context->start, context->end);
-  const char* date_format_first_column = "%Y-%m-%d";
   // we expect this input:
   // 2024-05-10 12:06:24.990831+00:00 client.607247697.0:5632274 write 4096~4096 2:d03a455a:::08b0f2fd5f20f504e76c2dd3d24683a1:head 2.1c0b
   while (fstream >> date){
     // cout << date << endl;
-    tm t;
-    char* res = strptime(date.c_str(), date_format_first_column, &t);
-    if (res == nullptr) {
+    if (!(date.size() > 4 && isdigit(date[0]) && isdigit(date[1]) && isdigit(date[2]) && isdigit(date[3]) && date[4] == '-')) {
       fstream.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
       continue;
+
     }
     fstream >> time >> who >> type >> range >> object >> collection;
 
     date += " " + time;
     // cout << date << endl;
     // FIXME: this is wrong  but it returns a reasonable bad timestamp :P
-    const char* date_format_full = "%Y-%m-%d %H:%M:%S.%f%z";
-    res = strptime(date.c_str(), date_format_full, &t);
-    time_t at = mktime(&t);
+    // const char* date_format_full = "%Y-%m-%d %H:%M:%S.%f%z";
+    // res = strptime(date.c_str(), date_format_full, &t);
+    // time_t at = mktime(&t);
+    time_t at = timestamp_parser(date);
 
     // cout << fmt::format("{} {} {} {} {} {} {}", date, at, who, type, range, object, collection) << endl;
 
     shared_ptr<string> who_ptr = make_shared<string>(who);
-    auto who_it = string_cache.find(who);
-    if (who_it == string_cache.end()) {
-      string_cache.insert({ who, who_ptr });
+    auto who_it = context->who_cache.find(who);
+    if (who_it == context->who_cache.end()) {
+      context->who_cache.insert({ who, who_ptr });
     } else {
       who_ptr = who_it->second;
     }
 
     shared_ptr<string> object_ptr = make_shared<string>(object);
-    auto object_it = string_cache.find(object);
-    if (object_it == string_cache.end()) {
-      string_cache.insert({ object, object_ptr });
+    auto object_it = context->object_cache.find(object);
+    if (object_it == context->object_cache.end()) {
+      context->object_cache.insert({ object, object_ptr });
     } else {
       object_ptr = object_it->second;
     }
 
     op_type ot;
-    if (type == "write") {
-        ot = Write;
-    } else if (type == "writefull") {
-        ot = WriteFull;
-    } else if (type == "read") {
+    switch (type[0]) {
+      case 'r': {
         ot = Read;
-    } else if (type == "sparse-read") {
+        break;
+      }
+      case 's': {
         ot = Read;
-    } else if (type == "truncate") {
-        ot = Truncate;
-    } else if (type == "zero") {
+        break;
+      }
+      case 'z': {
         ot = Zero;
-    } else {
+        break;
+      }
+      case 't': {
+        ot = Truncate;
+        break;
+      }
+      case 'w': {
+        if (type.size() > 6) {
+          ot = WriteFull;
+        } else {
+          ot = Write;
+        }
+        break;
+      }
+      default: {
         cout << "invalid type " << type << endl;
         exit(1);
+      }
     }
 
     shared_ptr<string> collection_ptr = make_shared<string>(collection);
-    auto collection_it = string_cache.find(collection);
-    if (collection_it == string_cache.end()) {
-      string_cache.insert({ collection, collection_ptr });
+    auto collection_it = context->collection_cache.find(collection);
+    if (collection_it == context->collection_cache.end()) {
+      context->collection_cache.insert({ collection, collection_ptr });
     } else {
       collection_ptr = collection_it->second;
     }
@@ -288,6 +329,8 @@ int main(int argc, char** argv) {
   string file("input.txt");
   string ceph_conf_path("./ceph.conf");
   string pool("test_pool");
+  string input_ir_output("");
+  bool skip_do_ops = false;
 
   po::options_description po_options("Options");
   po_options.add_options()
@@ -298,6 +341,8 @@ int main(int argc, char** argv) {
     ("parser-threads", po::value<uint64_t>(&nparser_threads)->default_value(16), "Number of parser threads")
     ("worker-threads", po::value<uint64_t>(&nworker_threads)->default_value(16), "Number of I/O worker threads")
     ("pool", po::value<string>(&pool)->default_value("test_pool"), "Pool to use for I/O")
+    ("optimized-input-path", po::value<string>(&input_ir_output)->default_value(""), "Create a new input file that is optimzed for parsing. If not empty it will create it")
+    ("skip-do-ops", po::bool_switch(&skip_do_ops)->default_value(false), "Skip doing operations")
     ;
 
   po::options_description po_all("All options");
@@ -348,13 +393,23 @@ int main(int argc, char** argv) {
   }
   // reduce
   for (auto context : parser_contexts) {
-      string_cache.insert(context->string_cache.begin(), context->string_cache.end());
+      string_cache.insert(context->object_cache.begin(), context->object_cache.end());
+      string_cache.insert(context->collection_cache.begin(), context->collection_cache.end());
+      string_cache.insert(context->who_cache.begin(), context->who_cache.end());
       ops.insert(ops.end(), context->ops.begin(), context->ops.end());
       max_buffer_size = max(context->max_buffer_size, max_buffer_size);
-      context->string_cache.clear();
+      context->object_cache.clear();
+      context->collection_cache.clear();
+      context->who_cache.clear();
       context->ops.clear();
   }
 
+  if (!input_ir_output.empty()) {
+    // Create an optimized file for parsing
+    ofstream output(input_ir_output, ios::out);
+    output << "input-optimized" << endl;
+  }
+
   int ret = cluster.init2("client.admin", "ceph", 0);
   if (ret < 0) {
     std::cerr << "Couldn't init ceph! error " << ret << std::endl;
@@ -382,6 +437,9 @@ int main(int argc, char** argv) {
   std::cout << fmt::format("pool {} ready", pool) << std::endl;
 
 
+  if (skip_do_ops) {
+    return EXIT_SUCCESS;
+  }
   // process ops
   vector<thread> worker_threads;
   for (int i = 0; i < nworker_threads; i++) {
@@ -395,5 +453,5 @@ int main(int argc, char** argv) {
   }
 
   cout << ops.size() << endl;
-  return 0;
+  return EXIT_SUCCESS;
 }