]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test/allocsim: mmap threaded parser
authorPere Diaz Bou <pere-altea@hotmail.com>
Wed, 3 Jul 2024 10:21:46 +0000 (12:21 +0200)
committerPere Diaz Bou <pere-altea@hotmail.com>
Wed, 3 Jul 2024 10:21:46 +0000 (12:21 +0200)
Signed-off-by: Pere Diaz Bou <pere-altea@hotmail.com>
src/test/objectstore/allocsim/ops_replayer.cc

index 382ebdd948b8a1969baf41bcafb6bf2752a91c3f..9f8f56ccee5d8d45b438d7a4cca7f984fdf3bf8c 100644 (file)
@@ -1,5 +1,9 @@
 #include <algorithm>
 #include <cassert>
+#include <fcntl.h>
+#include <string_view>
+#include <sys/mman.h>
+#include <sys/stat.h>
 #include <thread>
 #include <condition_variable>
 #include <cstdint>
@@ -59,6 +63,30 @@ struct Op {
 
 };
 
+struct ParserContext {
+    map<string, shared_ptr<string>> string_cache;
+    vector<Op> ops;
+    char *start; // starts and ends in new line or eof
+    char *end;
+    uint64_t max_buffer_size;
+};
+
+class MemoryStreamBuf : public std::streambuf {
+public:
+    MemoryStreamBuf(const char* start, const char* end) {
+        this->setg(const_cast<char*>(start), const_cast<char*>(start), const_cast<char*>(end));
+    }
+};
+
+class MemoryInputStream : public std::istream {
+    MemoryStreamBuf _buffer;
+public:
+    MemoryInputStream(const char* start, const char* end)
+        : std::istream(&_buffer), _buffer(start, end) {
+        rdbuf(&_buffer);
+    }
+};
+
 void gen_buffer(bufferlist& bl, uint64_t size) {
     std::unique_ptr<char[]> buffer = std::make_unique<char[]>(size);
     std::independent_bits_engine<std::default_random_engine, CHAR_BIT, unsigned char> e;
@@ -69,7 +97,7 @@ void gen_buffer(bufferlist& bl, uint64_t size) {
 void completion_cb(librados::completion_t cb, void *arg) {
   Op *op = static_cast<Op*>(arg);
   // Process the completed operation here
-  std::cout << fmt::format("Completed op {} object={} range={}~{}", op->type, *op->object, op->offset, op->length) << std::endl;
+  // std::cout << fmt::format("Completed op {} object={} range={}~{}", op->type, *op->object, op->offset, op->length) << std::endl;
 
   delete op->completion;
   op->completion = nullptr;
@@ -84,6 +112,91 @@ void completion_cb(librados::completion_t cb, void *arg) {
   cv.notify_one();
 }
 
+void parse_entry_point(shared_ptr<ParserContext> context) {
+    string date, time, who, type, range, object, collection;
+    MemoryInputStream fstream(context->start, context->end);
+    const char* date_format_first_column = "%Y-%m-%d";
+    // we expect this input:
+    // 2024-05-10 12:06:24.990831+00:00 client.607247697.0:5632274 write 4096~4096 2:d03a455a:::08b0f2fd5f20f504e76c2dd3d24683a1:head 2.1c0b
+    while (fstream >> date){
+      // cout << date << endl;
+      tm t;
+      char* res = strptime(date.c_str(), date_format_first_column, &t);
+      if (res == nullptr) {
+        fstream.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
+        continue;
+      }
+      fstream >> time >> who >> type >> range >> object >> collection;
+
+      date += " " + time;
+      // cout << date << endl;
+      // FIXME: this is wrong  but it returns a reasonable bad timestamp :P
+      const char* date_format_full = "%Y-%m-%d %H:%M:%S.%f%z";
+      res = strptime(date.c_str(), date_format_full, &t);
+      time_t at = mktime(&t);
+
+      // cout << fmt::format("{} {} {} {} {} {} {}", date, at, who, type, range, object, collection) << endl;
+
+      shared_ptr<string> who_ptr = make_shared<string>(who);
+      auto who_it = string_cache.find(who);
+      if (who_it == string_cache.end()) {
+        string_cache.insert({ who, who_ptr });
+      } else {
+        who_ptr = who_it->second;
+      }
+
+      shared_ptr<string> object_ptr = make_shared<string>(object);
+      auto object_it = string_cache.find(object);
+      if (object_it == string_cache.end()) {
+        string_cache.insert({ object, object_ptr });
+      } else {
+        object_ptr = object_it->second;
+      }
+
+      op_type ot;
+      if (type == "write") {
+          ot = Write;
+      } else if (type == "writefull") {
+          ot = WriteFull;
+      } else if (type == "read") {
+          ot = Read;
+      } else if (type == "sparse-read") {
+          ot = Read;
+      } else if (type == "truncate") {
+          ot = Truncate;
+      } else if (type == "zero") {
+          ot = Zero;
+      } else {
+          cout << "invalid type " << type << endl;
+          exit(1);
+      }
+
+      shared_ptr<string> collection_ptr = make_shared<string>(collection);
+      auto collection_it = string_cache.find(collection);
+      if (collection_it == string_cache.end()) {
+        string_cache.insert({ collection, collection_ptr });
+      } else {
+        collection_ptr = collection_it->second;
+      }
+
+      uint64_t offset = 0, length = 0;
+      stringstream range_stream(range);
+      string offset_str, length_str;
+      getline(range_stream, offset_str, '~');
+      offset = stoll(offset_str);
+
+      if (ot != Truncate) {
+          // Truncate doesn't only has one number
+          getline(range_stream, length_str, '~');
+          length = stoll(length_str);
+      }
+
+      context->max_buffer_size = max(length, context->max_buffer_size);
+
+      context->ops.push_back(Op(at, ot, offset, length, object_ptr, collection_ptr, who_ptr));
+    }
+}
+
 int main(int argc, char** argv) {
   vector<Op> ops;
   librados::Rados cluster;
@@ -100,90 +213,49 @@ int main(int argc, char** argv) {
   ceph_conf_path = argv[2];
   cout << file << endl;
 
-
-
-  string date, time, who, type, range, object, collection;
-  ifstream fstream(file, ifstream::in);
-  const char* date_format_first_column = "%Y-%m-%d";
-  // we expect this input:
-  // 2024-05-10 12:06:24.990831+00:00 client.607247697.0:5632274 write 4096~4096 2:d03a455a:::08b0f2fd5f20f504e76c2dd3d24683a1:head 2.1c0b
-  while (fstream >> date){
-    // cout << date << endl;
-    tm t;
-    char* res = strptime(date.c_str(), date_format_first_column, &t);
-    if (res == nullptr) {
-      fstream.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
-      continue;
-    }
-    fstream >> time >> who >> type >> range >> object >> collection;
-
-    date += " " + time;
-    // cout << date << endl;
-    // FIXME: this is wrong  but it returns a reasonable bad timestamp :P
-    const char* date_format_full = "%Y-%m-%d %H:%M:%S.%f%z";
-    res = strptime(date.c_str(), date_format_full, &t);
-    time_t at = mktime(&t);
-
-    // cout << fmt::format("{} {} {} {} {} {} {}", date, at, who, type, range, object, collection) << endl;
-
-    shared_ptr<string> who_ptr = make_shared<string>(who);
-    auto who_it = string_cache.find(who);
-    if (who_it == string_cache.end()) {
-      string_cache.insert({ who, who_ptr });
-    } else {
-      who_ptr = who_it->second;
-    }
-
-    shared_ptr<string> object_ptr = make_shared<string>(object);
-    auto object_it = string_cache.find(object);
-    if (object_it == string_cache.end()) {
-      string_cache.insert({ object, object_ptr });
-    } else {
-      object_ptr = object_it->second;
-    }
-
-    op_type ot;
-    if (type == "write") {
-        ot = Write;
-    } else if (type == "writefull") {
-        ot = WriteFull;
-    } else if (type == "read") {
-        ot = Read;
-    } else if (type == "sparse-read") {
-        ot = Read;
-    } else if (type == "truncate") {
-        ot = Truncate;
-    } else if (type == "zero") {
-        ot = Zero;
-    } else {
-        cout << "invalid type " << type << endl;
-        exit(1);
-    }
-
-    shared_ptr<string> collection_ptr = make_shared<string>(collection);
-    auto collection_it = string_cache.find(collection);
-    if (collection_it == string_cache.end()) {
-      string_cache.insert({ collection, collection_ptr });
-    } else {
-      collection_ptr = collection_it->second;
+  uint64_t nthreads = 16;
+  vector<std::thread> parser_threads;
+  vector<shared_ptr<ParserContext>> parser_contexts;
+  int fd = open(file.c_str(), O_RDONLY);
+  if (fd == -1) {
+      cout << "Error opening file" << endl;
+  }
+  struct stat file_stat;
+  fstat(fd, &file_stat);
+  char* mapped_buffer = (char*)mmap(NULL, file_stat.st_size, PROT_READ, MAP_SHARED, fd, 0);
+  if (mapped_buffer == nullptr) {
+      cout << "error mapping buffer" << endl;
+  }
+  uint64_t start_offset = 0;
+  uint64_t step_size = file_stat.st_size / nthreads;
+  for (int i = 0; i < nthreads; i++) {
+    char* end = mapped_buffer + start_offset + step_size;
+    while(*end != '\n') {
+        end--;
     }
-
-    uint64_t offset = 0, length = 0;
-    stringstream range_stream(range);
-    string offset_str, length_str;
-    getline(range_stream, offset_str, '~');
-    offset = stoll(offset_str);
-
-    if (ot != Truncate) {
-        // Truncate doesn't only has one number
-        getline(range_stream, length_str, '~');
-        length = stoll(length_str);
+    if (i == nthreads-1) {
+        end = mapped_buffer + file_stat.st_size;
     }
+    shared_ptr<ParserContext> context = make_shared<ParserContext>();
+    context->start = mapped_buffer + start_offset;
+    context->end = end;
+    context->max_buffer_size = 0;
+    parser_contexts.push_back(context);
+    parser_threads.push_back(std::thread(parse_entry_point, context));
+    start_offset += (end - mapped_buffer - start_offset);
+  }
+  for (auto& t : parser_threads) {
+      t.join();
+  }
+  for (auto context : parser_contexts) {
+      string_cache.insert(context->string_cache.begin(), context->string_cache.end());
+      ops.insert(ops.end(), context->ops.begin(), context->ops.end());
+      max_buffer_size = max(context->max_buffer_size, max_buffer_size);
+      context->string_cache.clear();
+      context->ops.clear();
+  }
 
-    max_buffer_size = max(length, max_buffer_size);
 
-    ops.push_back(Op(at, ot, offset, length, object_ptr, collection_ptr, who_ptr));
-  }
 
   int ret = cluster.init2("client.admin", "ceph", 0);
   if (ret < 0) {