]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/tools/store_nbd: add log emulation to fs_driver
authorSamuel Just <sjust@redhat.com>
Fri, 27 Aug 2021 21:41:22 +0000 (14:41 -0700)
committerSamuel Just <sjust@redhat.com>
Sat, 28 Aug 2021 04:57:02 +0000 (04:57 +0000)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/tools/store_nbd/block_driver.h
src/crimson/tools/store_nbd/fs_driver.cc
src/crimson/tools/store_nbd/fs_driver.h

index 54da206cb95519289cd2de3c6bace08b8d20818a..7f86d31dd5d8949c8df02113e2a1d0e6eaf6860f 100644 (file)
@@ -28,9 +28,12 @@ public:
   struct config_t {
     std::string type;
     bool mkfs = false;
-    unsigned num_collections = 128;
+    unsigned num_pgs = 128;
+    unsigned log_size = 1000;
     unsigned object_size = 4<<20 /* 4MB, rbd default */;
     unsigned oi_size = 1<<9 /* 512b */;
+    unsigned log_entry_size = 1<<9 /* 512b */;
+    bool prepopulate_log = false;
     std::optional<std::string> path;
 
     bool is_futurized_store() const {
@@ -46,6 +49,14 @@ public:
       return oi_size > 0;
     }
 
+    bool log_enabled() const {
+      return log_entry_size > 0 && log_size > 0;
+    }
+
+    bool prepopulate_log_enabled() const {
+      return prepopulate_log;
+    }
+
     void populate_options(
       boost::program_options::options_description &desc)
     {
@@ -63,10 +74,28 @@ public:
         ->notifier([this](auto s) { path = s; }),
         "Path to device for backend"
        )
-       ("num-collections",
+       ("num-pgs",
         po::value<unsigned>()
-        ->notifier([this](auto s) { num_collections = s; }),
-        "Number of collections to use for futurized_store backends"
+        ->notifier([this](auto s) { num_pgs = s; }),
+        "Number of pgs to use for futurized_store backends"
+       )
+       ("log-size",
+        po::value<unsigned>()
+        ->notifier([this](auto s) { log_size = s; }),
+        "Number of log entries per pg to use for futurized_store backends"
+        ", 0 to disable"
+       )
+       ("log-entry-size",
+        po::value<unsigned>()
+        ->notifier([this](auto s) { log_entry_size = s; }),
+        "Size of each log entry per pg to use for futurized_store backends"
+        ", 0 to disable"
+       )
+       ("prepopulate-log",
+        po::value<bool>()
+        ->notifier([this](auto s) { prepopulate_log = s; }),
+        "Prepopulate log on mount"
+       )
        ("object-info-size",
         po::value<unsigned>()
         ->notifier([this](auto s) { log_entry_size = s; }),
index 1f3909ba07b2db4e32a47864e5ac850cc931d9da..5d34d529a1ba010c7b37cb9ff62e329a4d49eb4a 100644 (file)
@@ -2,6 +2,7 @@
 // vim: ts=8 sw=2 smarttab
 
 #include <boost/iterator/counting_iterator.hpp>
+#include <fmt/format.h>
 
 #include "os/Transaction.h"
 #include "fs_driver.h"
@@ -13,16 +14,94 @@ coll_t get_coll(unsigned num) {
   return coll_t(spg_t(pg_t(0, num)));
 }
 
+ghobject_t get_log_object(unsigned coll)
+{
+  return ghobject_t(
+    shard_id_t::NO_SHARD,
+    0,
+    (coll << 16),
+    "",
+    "",
+    0,
+    ghobject_t::NO_GEN);
+}
+
+std::string make_log_key(
+  unsigned i)
+{
+  return fmt::format("log_entry_{}", i);
+}
+
+void add_log_entry(
+  unsigned i,
+  unsigned entry_size,
+  std::map<std::string, ceph::buffer::list> *omap)
+{
+  assert(omap);
+  bufferlist bl;
+  bl.append(ceph::buffer::create('0', entry_size));
+
+  omap->emplace(std::make_pair(make_log_key(i), bl));
+}
+
+void populate_log(
+  ceph::os::Transaction &t,
+  FSDriver::pg_analogue_t &pg,
+  unsigned entry_size,
+  unsigned entries)
+{
+  t.touch(pg.collection->get_cid(), pg.log_object);
+  // omap_clear not yet implemented, TODO
+  // t.omap_clear(pg.collection->get_cid(), pg.log_object);
+
+  std::map<std::string, ceph::buffer::list> omap;
+  for (unsigned i = 0; i < entries; ++i) {
+    add_log_entry(i, entry_size, &omap);
+  }
+
+  t.omap_setkeys(
+    pg.collection->get_cid(),
+    pg.log_object,
+    omap);
+
+  pg.log_head = entries;
+}
+
+void update_log(
+  ceph::os::Transaction &t,
+  FSDriver::pg_analogue_t &pg,
+  unsigned entry_size,
+  unsigned entries)
+{
+  ++pg.log_head;
+  std::map<std::string, ceph::buffer::list> key;
+  add_log_entry(pg.log_head, entry_size, &key);
+
+  t.omap_setkeys(
+    pg.collection->get_cid(),
+    pg.log_object,
+    key);
+
+
+  while ((pg.log_head - pg.log_tail) > entries) {
+    t.omap_rmkey(
+      pg.collection->get_cid(),
+      pg.log_object,
+      make_log_key(pg.log_tail));
+    ++pg.log_tail;
+  }
+}
+
 FSDriver::offset_mapping_t FSDriver::map_offset(off_t offset)
 {
   uint32_t objid = offset / config.object_size;
-  uint32_t collid = objid % config.num_collections;
+  uint32_t collid = objid % config.num_pgs;
   return offset_mapping_t{
     collections[collid],
     ghobject_t(
       shard_id_t::NO_SHARD,
       0,
-      (collid << 16) | objid,
+      (collid << 16) | (objid + 1),
       "",
       "",
       0,
@@ -41,7 +120,7 @@ seastar::future<> FSDriver::write(
   bufferlist bl;
   bl.append(ptr);
   t.write(
-    mapping.chandle->get_cid(), 
+    mapping.pg.collection->get_cid(),
     mapping.object,
     mapping.offset,
     ptr.length(),
@@ -52,14 +131,22 @@ seastar::future<> FSDriver::write(
     bufferlist attr;
     attr.append(ceph::buffer::create(config.oi_size, '0'));
     t.setattr(
-      mapping.chandle->get_cid(),
+      mapping.pg.collection->get_cid(),
       mapping.object,
       "_",
       attr);
   }
 
+  if (config.log_enabled()) {
+    update_log(
+      t,
+      mapping.pg,
+      config.log_entry_size,
+      config.log_size);
+  }
+
   return fs->do_transaction(
-    mapping.chandle,
+    mapping.pg.collection,
     std::move(t));
 }
 
@@ -70,7 +157,7 @@ seastar::future<bufferlist> FSDriver::read(
   auto mapping = map_offset(offset);
   ceph_assert((mapping.offset + size) <= config.object_size);
   return fs->read(
-    mapping.chandle,
+    mapping.pg.collection,
     mapping.object,
     mapping.offset,
     size,
@@ -109,7 +196,7 @@ seastar::future<> FSDriver::mkfs()
   }).then([this] {
     return seastar::do_for_each(
       boost::counting_iterator<unsigned>(0),
-      boost::counting_iterator<unsigned>(config.num_collections),
+      boost::counting_iterator<unsigned>(config.num_pgs),
       [this](auto i) {
        return fs->create_new_collection(get_coll(i)
        ).then([this, i](auto coll) {
@@ -141,12 +228,27 @@ seastar::future<> FSDriver::mount()
   }).then([this] {
     return seastar::do_for_each(
       boost::counting_iterator<unsigned>(0),
-      boost::counting_iterator<unsigned>(config.num_collections),
+      boost::counting_iterator<unsigned>(config.num_pgs),
       [this](auto i) {
        return fs->open_collection(get_coll(i)
        ).then([this, i](auto ref) {
-         collections[i] = ref;
-         return seastar::now();
+         collections[i].collection = ref;
+         collections[i].log_object = get_log_object(i);
+         if (config.log_enabled()) {
+           ceph::os::Transaction t;
+           if (config.prepopulate_log_enabled()) {
+             populate_log(
+               t,
+               collections[i],
+               config.log_entry_size,
+               config.log_size);
+           }
+           return fs->do_transaction(
+             collections[i].collection,
+             std::move(t));
+         } else {
+           return seastar::now();
+         }
        });
       });
   }).then([this] {
index ee4b7fb0b4a55a01219f685bd712b1eba43417c5..24abf63baad5d7f5de05ab9384cc36dbe9697c96 100644 (file)
@@ -44,10 +44,18 @@ private:
   const config_t config;
   seastar::alien::instance& alien;
   std::unique_ptr<crimson::os::FuturizedStore> fs;
-  std::map<unsigned, crimson::os::CollectionRef> collections;
+
+  struct pg_analogue_t {
+    crimson::os::CollectionRef collection;
+
+    ghobject_t log_object;
+    unsigned log_tail = 0;
+    unsigned log_head = 0;
+  };
+  std::map<unsigned, pg_analogue_t> collections;
 
   struct offset_mapping_t {
-    crimson::os::CollectionRef chandle;
+    pg_analogue_t &pg;
     ghobject_t object;
     off_t offset;
   };
@@ -55,4 +63,16 @@ private:
 
   seastar::future<> mkfs();
   void init();
+
+  friend void populate_log(
+    ceph::os::Transaction &,
+    pg_analogue_t &,
+    unsigned,
+    unsigned);
+
+  friend void update_log(
+    ceph::os::Transaction &,
+    FSDriver::pg_analogue_t &,
+    unsigned,
+    unsigned);
 };