]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
test/fio: add options to insert attributes/omaps into write transaction.
authorIgor Fedotov <ifedotov@suse.com>
Mon, 20 Nov 2017 18:59:04 +0000 (21:59 +0300)
committerIgor Fedotov <ifedotov@suse.com>
Thu, 30 Nov 2017 13:57:03 +0000 (16:57 +0300)
Signed-off-by: Igor Fedotov <ifedotov@suse.com>
src/test/fio/ceph-bluestore.fio
src/test/fio/fio_ceph_objectstore.cc

index 6b348944296e8e731458f590de2a01c19d6ea8fa..221ad563fcb0af6f5d5b6c4d0035aa7aaeb95db7 100644 (file)
@@ -5,6 +5,28 @@ ioengine=libfio_ceph_objectstore.so # must be found in your LD_LIBRARY_PATH
 conf=ceph-bluestore.conf # must point to a valid ceph configuration file
 directory=/mnt/fio-bluestore # directory for osd_data
 
+#oi_attr_len=350-4000 # specifies OI(aka '_') attribute length range to couple
+                      # writes with. Default: 0 (disabled)
+
+#snapset_attr_len=35  # specifies snapset attribute length range to couple
+                      # writes with. Default: 0 (disabled)
+
+#_fastinfo_omap_len=186 # specifies _fastinfo omap entry length range to
+                        # couple writes with. Default: 0 (disabled)
+
+#pglog_simulation=1   # couples write and omap generation in OSD PG log manner.
+                      # Ceph's osd_min_pg_log_entries, osd_pg_log_trim_min,
+                                         # osd_pg_log_dups_tracked settings control cyclic
+                                         # omap keys creation/removal.
+                      # Following additional FIO pglog_ settings to apply too:
+
+#pglog_omap_len=173   # specifies PG log entry length range to couple
+                      # writes with. Default: 0 (disabled)
+
+#pglog_dup_omap_len=57 # specifies duplicate PG log entry length range
+                       # to couple writes with. Default: 0 (disabled)
+
+
 rw=randwrite
 iodepth=16
 
index e832a481ddaf0bc94323a635559deb428b48b2b7..531b73fb957b63c78b49b1fb2cb039a92f7411e5 100644 (file)
 #include "common/errno.h"
 #include "include/intarith.h"
 #include "include/stringify.h"
+#include "include/random.h"
 #include "common/perf_counters.h"
 
 #include <fio.h>
 #include <optgroup.h>
 
 #include "include/assert.h" // fio.h clobbers our assert.h
+#include <algorithm>
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_
@@ -33,6 +35,18 @@ namespace {
 struct Options {
   thread_data* td;
   char* conf;
+  unsigned long long
+    oi_attr_len_low,
+    oi_attr_len_high,
+    snapset_attr_len_low,
+    snapset_attr_len_high,
+    pglog_omap_len_low,
+    pglog_omap_len_high,
+    pglog_dup_omap_len_low,
+    pglog_dup_omap_len_high,
+    _fastinfo_omap_len_low,
+    _fastinfo_omap_len_high;
+  bool simulate_pglog;
 };
 
 template <class Func> // void Func(fio_option&)
@@ -54,10 +68,67 @@ static std::vector<fio_option> ceph_options{
     o.help   = "Path to a ceph configuration file";
     o.off1   = offsetof(Options, conf);
   }),
+  make_option([] (fio_option& o) {
+    o.name   = "oi_attr_len";
+    o.lname  = "OI Attr length";
+    o.type   = FIO_OPT_STR_VAL;
+    o.help   = "Set OI(aka '_') attribute to specified length";
+    o.off1   = offsetof(Options, oi_attr_len_low);
+    o.off2   = offsetof(Options, oi_attr_len_high);
+    o.def    = 0;
+    o.minval = 0;
+  }),
+  make_option([] (fio_option& o) {
+    o.name   = "snapset_attr_len";
+    o.lname  = "Attr 'snapset' length";
+    o.type   = FIO_OPT_STR_VAL;
+    o.help   = "Set 'snapset' attribute to specified length";
+    o.off1   = offsetof(Options, snapset_attr_len_low);
+    o.off2   = offsetof(Options, snapset_attr_len_high);
+    o.def    = 0;
+    o.minval = 0;
+  }),
+  make_option([] (fio_option& o) {
+    o.name   = "_fastinfo_omap_len";
+    o.lname  = "'_fastinfo' omap entry length";
+    o.type   = FIO_OPT_STR_VAL;
+    o.help   = "Set '_fastinfo' OMAP attribute to specified length";
+    o.off1   = offsetof(Options, _fastinfo_omap_len_low);
+    o.off2   = offsetof(Options, _fastinfo_omap_len_high);
+    o.def    = 0;
+    o.minval = 0;
+  }),
+  make_option([] (fio_option& o) {
+    o.name   = "pglog_simulation";
+    o.lname  = "pglog behavior simulation";
+    o.type   = FIO_OPT_BOOL;
+    o.help   = "Enables PG Log simulation behavior";
+    o.off1   = offsetof(Options, simulate_pglog);
+    o.def    = "0";
+  }),
+  make_option([] (fio_option& o) {
+    o.name   = "pglog_omap_len";
+    o.lname  = "pglog omap entry length";
+    o.type   = FIO_OPT_STR_VAL;
+    o.help   = "Set pglog omap entry to specified length";
+    o.off1   = offsetof(Options, pglog_omap_len_low);
+    o.off2   = offsetof(Options, pglog_omap_len_high);
+    o.def    = 0;
+    o.minval = 0;
+  }),
+  make_option([] (fio_option& o) {
+    o.name   = "pglog_dup_omap_len";
+    o.lname  = "uplicate pglog omap entry length";
+    o.type   = FIO_OPT_STR_VAL;
+    o.help   = "Set duplicate pglog omap entry to specified length";
+    o.off1   = offsetof(Options, pglog_dup_omap_len_low);
+    o.off2   = offsetof(Options, pglog_dup_omap_len_high);
+    o.def    = 0;
+    o.minval = 0;
+  }),
   {} // fio expects a 'null'-terminated list
 };
 
-
 /// global engine state shared between all jobs within the process. this
 /// includes g_ceph_context and the ObjectStore instance
 struct Engine {
@@ -68,7 +139,7 @@ struct Engine {
   std::mutex lock;
   int ref_count;
 
-  Engine(const thread_data* td);
+  Engine(thread_data* td);
   ~Engine();
 
   static Engine* get_instance(thread_data* td) {
@@ -99,18 +170,19 @@ struct Engine {
       ostr << "Generate db histogram: ";
       os->generate_db_histogram(f);
       f->flush(ostr);
-
       delete f;
+
       os->umount();
       dout(0) <<  ostr.str() << dendl;
     }
   }
 };
 
-Engine::Engine(const thread_data* td) : ref_count(0)
+Engine::Engine(thread_data* td)
+  : ref_count(0)
 {
   // add the ceph command line arguments
-  auto o = static_cast<const Options*>(td->eo);
+  auto o = static_cast<Options*>(td->eo);
   if (!o->conf) {
     throw std::runtime_error("missing conf option for ceph configuration file");
   }
@@ -145,6 +217,17 @@ Engine::Engine(const thread_data* td) : ref_count(0)
     num_shards = g_conf->osd_op_num_shards_ssd;
   os->set_cache_shards(num_shards);
 
+  //normalize options
+  o->oi_attr_len_high = max(o->oi_attr_len_low, o->oi_attr_len_high);
+  o->snapset_attr_len_high = max(o->snapset_attr_len_low,
+                                     o->snapset_attr_len_high);
+  o->pglog_omap_len_high = max(o->pglog_omap_len_low,
+                                   o->pglog_omap_len_high);
+  o->pglog_dup_omap_len_high = max(o->pglog_dup_omap_len_low,
+                                       o->pglog_dup_omap_len_high);
+  o->_fastinfo_omap_len_high = max(o->_fastinfo_omap_len_low,
+                                       o->_fastinfo_omap_len_high);
+
   int r = os->mkfs();
   if (r < 0)
     throw std::system_error(-r, std::system_category(), "mkfs failed");
@@ -152,6 +235,7 @@ Engine::Engine(const thread_data* td) : ref_count(0)
   r = os->mount();
   if (r < 0)
     throw std::system_error(-r, std::system_category(), "mount failed");
+
 }
 
 Engine::~Engine()
@@ -165,12 +249,19 @@ struct Collection {
   coll_t cid;
   ObjectStore::Sequencer sequencer;
 
+  // Can't use mutex directly in vectors hence dynamic allocation
+  ceph::unique_ptr<std::mutex> lock;
+  uint64_t pglog_ver_head = 1;
+  uint64_t pglog_ver_tail = 1;
+  uint64_t pglog_dup_ver_tail = 1;
+
   // use big pool ids to avoid clashing with existing collections
   static constexpr int64_t MIN_POOL_ID = 0x0000ffffffffffff;
 
   Collection(const spg_t& pg)
-    : pg(pg), cid(pg), sequencer(stringify(pg)) {
-    sequencer.shard_hint = pg;
+    : pg(pg), cid(pg), sequencer(stringify(pg)),
+        lock(new std::mutex) {
+  sequencer.shard_hint = pg;
   }
 };
 
@@ -191,6 +282,9 @@ struct Job {
   std::vector<io_u*> events; //< completions for fio_ceph_os_event()
   const bool unlink; //< unlink objects on destruction
 
+  bufferptr one_for_all_data; //< preallocated buffer long enough
+                              //< to use for vairious operations
+
   Job(Engine* engine, const thread_data* td);
   ~Job();
 };
@@ -224,6 +318,14 @@ Job::Job(Engine* engine, const thread_data* td)
       t.create_collection(cid, split_bits);
   }
 
+  auto o = static_cast<Options*>(td->eo);
+  unsigned long long max_data = max(o->oi_attr_len_high,
+                                 o->snapset_attr_len_high);
+  max_data = max(max_data, o->pglog_omap_len_high);
+  max_data = max(max_data, o->pglog_dup_omap_len_high);
+  max_data = max(max_data, o->_fastinfo_omap_len_high);
+  one_for_all_data = buffer::create(max_data);
+
   const uint64_t file_size = td->o.size / max(1u, td->o.nr_files);
 
   // create an object for each file in the job
@@ -246,7 +348,7 @@ Job::Job(Engine* engine, const thread_data* td)
   ObjectStore::Sequencer sequencer("job init");
   int r = engine->os->apply_transaction(&sequencer, std::move(t));
   if (r) {
-   engine->deref();
+    engine->deref();
     throw std::system_error(r, std::system_category(), "job init");
   }
 }
@@ -349,6 +451,9 @@ int fio_ceph_os_queue(thread_data* td, io_u* u)
 {
   fio_ro_check(td, u);
 
+
+
+  auto o = static_cast<const Options*>(td->eo);
   auto job = static_cast<Job*>(td->io_ops_data);
   auto& object = job->objects[u->file->engine_pos];
   auto& coll = object.coll;
@@ -362,9 +467,116 @@ int fio_ceph_os_queue(thread_data* td, io_u* u)
     bl.push_back(buffer::copy(reinterpret_cast<char*>(u->xfer_buf),
                               u->xfer_buflen ) );
 
+    map<string,bufferptr> attrset;
+    map<string, bufferlist> omaps;
     // enqueue a write transaction on the collection's sequencer
     ObjectStore::Transaction t;
+    char ver_key[64];
+
+    // fill attrs if any
+    if (o->oi_attr_len_high) {
+      assert(o->oi_attr_len_high >= o->oi_attr_len_low);
+      // fill with the garbage as we do not care of the actual content...
+      job->one_for_all_data.set_length(
+        ceph::util::generate_random_number(
+         o->oi_attr_len_low, o->oi_attr_len_high));
+      attrset["_"] = job->one_for_all_data;
+    }
+    if (o->snapset_attr_len_high) {
+      assert(o->snapset_attr_len_high >= o->snapset_attr_len_low);
+      job->one_for_all_data.set_length(
+        ceph::util::generate_random_number
+         (o->snapset_attr_len_low, o->snapset_attr_len_high));
+      attrset["snapset"] = job->one_for_all_data;
+
+    }
+    if (o->_fastinfo_omap_len_high) {
+      assert(o->_fastinfo_omap_len_high >= o->_fastinfo_omap_len_low);
+      // fill with the garbage as we do not care of the actual content...
+      job->one_for_all_data.set_length(
+       ceph::util::generate_random_number(
+         o->_fastinfo_omap_len_low, o->_fastinfo_omap_len_high));
+      omaps["_fastinfo"].append(job->one_for_all_data);
+    }
+
+    uint64_t pglog_trim_head = 0, pglog_trim_tail = 0;
+    uint64_t pglog_dup_trim_head = 0, pglog_dup_trim_tail = 0;
+    if (o->simulate_pglog) {
+
+      uint64_t pglog_ver_cnt = 0;
+      {
+       std::lock_guard<std::mutex> l(*coll.lock);
+        pglog_ver_cnt = coll.pglog_ver_head++;
+       if (o->pglog_omap_len_high &&
+           pglog_ver_cnt >=
+             coll.pglog_ver_tail +
+               g_conf->osd_min_pg_log_entries + g_conf->osd_pg_log_trim_min) {
+         pglog_trim_tail = coll.pglog_ver_tail;
+         coll.pglog_ver_tail = pglog_trim_head =
+           pglog_trim_tail + g_conf->osd_pg_log_trim_min;
+
+         if (o->pglog_dup_omap_len_high &&
+             pglog_ver_cnt >=
+               coll.pglog_dup_ver_tail + g_conf->osd_pg_log_dups_tracked +
+                 g_conf->osd_pg_log_trim_min) {
+           pglog_dup_trim_tail = coll.pglog_dup_ver_tail;
+           coll.pglog_dup_ver_tail = pglog_dup_trim_head =
+             pglog_dup_trim_tail + g_conf->osd_pg_log_trim_min;
+         }
+       }
+      }
+
+      if (o->pglog_omap_len_high) {
+       assert(o->pglog_omap_len_high >= o->pglog_omap_len_low);
+       snprintf(ver_key, sizeof(ver_key),
+         "0000000011.%020llu", (unsigned long long)pglog_ver_cnt);
+       // fill with the garbage as we do not care of the actual content...
+        job->one_for_all_data.set_length(
+         ceph::util::generate_random_number(
+           o->pglog_omap_len_low, o->pglog_omap_len_high));
+       omaps[ver_key].append(job->one_for_all_data);
+      }
+      if (o->pglog_dup_omap_len_high) {
+       //insert dup
+       assert(o->pglog_dup_omap_len_high >= o->pglog_dup_omap_len_low);
+        for( auto i = pglog_trim_tail; i < pglog_trim_head; ++i) {
+         snprintf(ver_key, sizeof(ver_key),
+           "dup_0000000011.%020llu", (unsigned long long)i);
+         // fill with the garbage as we do not care of the actual content...
+         job->one_for_all_data.set_length(
+           ceph::util::generate_random_number(
+             o->pglog_dup_omap_len_low, o->pglog_dup_omap_len_high));
+         omaps[ver_key].append(job->one_for_all_data);
+       }
+      }
+    }
+
+    if (attrset.size()) {
+      t.setattrs(coll.cid, object.oid, attrset);
+    }
     t.write(coll.cid, object.oid, u->offset, u->xfer_buflen, bl, flags);
+
+    set<string> rmkeys;
+    for( auto i = pglog_trim_tail; i < pglog_trim_head; ++i) {
+       snprintf(ver_key, sizeof(ver_key),
+         "0000000011.%020llu", (unsigned long long)i);
+       rmkeys.emplace(ver_key);
+    }
+    for( auto i = pglog_dup_trim_tail; i < pglog_dup_trim_head; ++i) {
+       snprintf(ver_key, sizeof(ver_key),
+         "dup_0000000011.%020llu", (unsigned long long)i);
+       rmkeys.emplace(ver_key);
+    }
+
+    if (rmkeys.size()) {
+      ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
+      t.omap_rmkeys(coll.cid, pgmeta_oid, rmkeys);
+    }
+
+    if (omaps.size()) {
+      ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
+      t.omap_setkeys(coll.cid, pgmeta_oid, omaps);
+    }
     os->queue_transaction(&coll.sequencer,
                           std::move(t),
                           nullptr,