]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
test/fio: add single pool mode to share collections among multiple jobs 19101/head
authorIgor Fedotov <ifedotov@suse.com>
Thu, 30 Nov 2017 13:59:26 +0000 (16:59 +0300)
committerIgor Fedotov <ifedotov@suse.com>
Thu, 30 Nov 2017 13:59:26 +0000 (16:59 +0300)
Signed-off-by: Igor Fedotov ifedotov@suse.com
src/test/fio/fio_ceph_objectstore.cc

index 531b73fb957b63c78b49b1fb2cb039a92f7411e5..5f4414903a5e5c8a8d50760866f2489429aa89b9 100644 (file)
@@ -47,6 +47,7 @@ struct Options {
     _fastinfo_omap_len_low,
     _fastinfo_omap_len_high;
   bool simulate_pglog;
+  bool single_pool_mode;
 };
 
 template <class Func> // void Func(fio_option&)
@@ -126,9 +127,86 @@ static std::vector<fio_option> ceph_options{
     o.def    = 0;
     o.minval = 0;
   }),
+  make_option([] (fio_option& o) {
+    o.name   = "single_pool_mode";
+    o.lname  = "single(shared among jobs) pool mode";
+    o.type   = FIO_OPT_BOOL;
+    o.help   = "Enables the mode when all jobs run against the same pool";
+    o.off1   = offsetof(Options, single_pool_mode);
+    o.def    = "0";
+  }),
   {} // fio expects a 'null'-terminated list
 };
 
+
+struct Collection {
+  spg_t pg;
+  coll_t cid;
+  ObjectStore::Sequencer sequencer;
+  // Can't use mutex directly in vectors hence dynamic allocation
+
+  ceph::unique_ptr<std::mutex> lock;
+  uint64_t pglog_ver_head = 1;
+  uint64_t pglog_ver_tail = 1;
+  uint64_t pglog_dup_ver_tail = 1;
+
+  // use big pool ids to avoid clashing with existing collections
+  static constexpr int64_t MIN_POOL_ID = 0x0000ffffffffffff;
+
+  Collection(const spg_t& pg)
+    : pg(pg), cid(pg), sequencer(stringify(pg)),
+        lock(new std::mutex) {
+    sequencer.shard_hint = pg;
+  }
+};
+
+int init_collections(std::unique_ptr<ObjectStore>& os,
+                     uint64_t pool,
+                     std::vector<Collection>& collections,
+                     uint64_t count)
+{
+  assert(count > 0);
+  collections.reserve(count);
+
+  const int split_bits = cbits(count - 1);
+
+  ObjectStore::Transaction t;
+  for (uint32_t i = 0; i < count; i++) {
+    auto pg = spg_t{pg_t{i, pool}};
+    collections.emplace_back(pg);
+
+    auto& coll = collections.back();
+    if (!os->collection_exists(coll.cid)) {
+      t.create_collection(coll.cid, split_bits);
+      ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
+      t.touch(coll.cid, pgmeta_oid);
+    }
+  }
+  ObjectStore::Sequencer sequencer("Engine init");
+  int r = os->apply_transaction(&sequencer, std::move(t));
+  if (r)
+    derr << "Engine init failed with " << cpp_strerror(-r) << dendl;
+  return r;
+}
+
+int destroy_collections(
+  std::unique_ptr<ObjectStore>& os,
+  std::vector<Collection>& collections)
+{
+  ObjectStore::Transaction t;
+  // remove our collections
+  for (auto& coll : collections) {
+    ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
+    t.remove(coll.cid, pgmeta_oid);
+    t.remove_collection(coll.cid);
+  }
+  ObjectStore::Sequencer sequencer("Engine cleanup");
+  int r = os->apply_transaction(&sequencer, std::move(t));
+  if (r)
+    derr << "Engine cleanup failed with " << cpp_strerror(-r) << dendl;
+  return r;
+}
+
 /// global engine state shared between all jobs within the process. this
 /// includes g_ceph_context and the ObjectStore instance
 struct Engine {
@@ -136,8 +214,11 @@ struct Engine {
   boost::intrusive_ptr<CephContext> cct;
   std::unique_ptr<ObjectStore> os;
 
+  std::vector<Collection> collections; //< shared collections to spread objects over
+
   std::mutex lock;
   int ref_count;
+  const bool unlink; //< unlink objects on destruction
 
   Engine(thread_data* td);
   ~Engine();
@@ -172,6 +253,9 @@ struct Engine {
       f->flush(ostr);
       delete f;
 
+      if (unlink) {
+       destroy_collections(os, collections);
+      }
       os->umount();
       dout(0) <<  ostr.str() << dendl;
     }
@@ -179,7 +263,8 @@ struct Engine {
 };
 
 Engine::Engine(thread_data* td)
-  : ref_count(0)
+  : ref_count(0),
+    unlink(td->o.unlink)
 {
   // add the ceph command line arguments
   auto o = static_cast<Options*>(td->eo);
@@ -236,6 +321,13 @@ Engine::Engine(thread_data* td)
   if (r < 0)
     throw std::system_error(-r, std::system_category(), "mount failed");
 
+  // create shared collections up to osd_pool_default_pg_num
+  if (o->single_pool_mode) {
+    uint64_t count = g_conf->get_val<uint64_t>("osd_pool_default_pg_num");
+    if (count > td->o.nr_files)
+      count = td->o.nr_files;
+    init_collections(os, Collection::MIN_POOL_ID, collections, count);
+  }
 }
 
 Engine::~Engine()
@@ -243,28 +335,6 @@ Engine::~Engine()
   assert(!ref_count);
 }
 
-
-struct Collection {
-  spg_t pg;
-  coll_t cid;
-  ObjectStore::Sequencer sequencer;
-
-  // Can't use mutex directly in vectors hence dynamic allocation
-  ceph::unique_ptr<std::mutex> lock;
-  uint64_t pglog_ver_head = 1;
-  uint64_t pglog_ver_tail = 1;
-  uint64_t pglog_dup_ver_tail = 1;
-
-  // use big pool ids to avoid clashing with existing collections
-  static constexpr int64_t MIN_POOL_ID = 0x0000ffffffffffff;
-
-  Collection(const spg_t& pg)
-    : pg(pg), cid(pg), sequencer(stringify(pg)),
-        lock(new std::mutex) {
-  sequencer.shard_hint = pg;
-  }
-};
-
 struct Object {
   ghobject_t oid;
   Collection& coll;
@@ -274,10 +344,11 @@ struct Object {
       coll(coll) {}
 };
 
-/// treat each fio job like a separate pool with its own collections and objects
+/// treat each fio job either like a separate pool with its own collections and objects
+/// or just a client using its own objects from the shared pool
 struct Job {
   Engine* engine; //< shared ptr to the global Engine
-  std::vector<Collection> collections; //< spread objects over collections
+  std::vector<Collection> collections; //< job's private collections to spread objects over
   std::vector<Object> objects; //< associate an object with each fio_file
   std::vector<io_u*> events; //< completions for fio_ceph_os_event()
   const bool unlink; //< unlink objects on destruction
@@ -295,29 +366,6 @@ Job::Job(Engine* engine, const thread_data* td)
     unlink(td->o.unlink)
 {
   engine->ref();
-  // use the fio thread_number for our unique pool id
-  const uint64_t pool = Collection::MIN_POOL_ID + td->thread_number;
-
-  // create a collection for each object, up to osd_pool_default_pg_num
-  uint64_t count = g_conf->get_val<uint64_t>("osd_pool_default_pg_num");
-  if (count > td->o.nr_files)
-    count = td->o.nr_files;
-
-  assert(count > 0);
-  collections.reserve(count);
-
-  const int split_bits = cbits(count - 1);
-
-  ObjectStore::Transaction t;
-  for (uint32_t i = 0; i < count; i++) {
-    auto pg = spg_t{pg_t{i, pool}};
-    collections.emplace_back(pg);
-
-    auto& cid = collections.back().cid;
-    if (!engine->os->collection_exists(cid))
-      t.create_collection(cid, split_bits);
-  }
-
   auto o = static_cast<Options*>(td->eo);
   unsigned long long max_data = max(o->oi_attr_len_high,
                                  o->snapset_attr_len_high);
@@ -326,7 +374,21 @@ Job::Job(Engine* engine, const thread_data* td)
   max_data = max(max_data, o->_fastinfo_omap_len_high);
   one_for_all_data = buffer::create(max_data);
 
+  std::vector<Collection>* colls;
+  // create private collections up to osd_pool_default_pg_num
+  if (!o->single_pool_mode) {
+    uint64_t count = g_conf->get_val<uint64_t>("osd_pool_default_pg_num");
+    if (count > td->o.nr_files)
+      count = td->o.nr_files;
+    // use the fio thread_number for our unique pool id
+    const uint64_t pool = Collection::MIN_POOL_ID + td->thread_number + 1;
+    init_collections(engine->os, pool, collections, count);
+    colls = &collections;
+  } else {
+    colls = &engine->collections;
+  }
   const uint64_t file_size = td->o.size / max(1u, td->o.nr_files);
+  ObjectStore::Transaction t;
 
   // create an object for each file in the job
   for (uint32_t i = 0; i < td->o.nr_files; i++) {
@@ -334,8 +396,8 @@ Job::Job(Engine* engine, const thread_data* td)
     f->real_file_size = file_size;
     f->engine_pos = i;
 
-    // associate each object with a collection in a round-robin fashion
-    auto& coll = collections[i % collections.size()];
+    // associate each object with a collection in a round-robin fashion.
+    auto& coll = (*colls)[i % colls->size()];
 
     objects.emplace_back(f->file_name, coll);
     auto& oid = objects.back().oid;
@@ -356,15 +418,12 @@ Job::Job(Engine* engine, const thread_data* td)
 Job::~Job()
 {
   if (unlink) {
+    destroy_collections(engine->os, collections);
     ObjectStore::Transaction t;
     // remove our objects
     for (auto& obj : objects) {
       t.remove(obj.coll.cid, obj.oid);
     }
-    // remove our collections
-    for (auto& coll : collections) {
-      t.remove_collection(coll.cid);
-    }
     ObjectStore::Sequencer sequencer("job cleanup");
     int r = engine->os->apply_transaction(&sequencer, std::move(t));
     if (r)