From: Igor Fedotov <ifedotov@suse.com>
Date: Thu, 30 Nov 2017 13:59:26 +0000 (+0300)
Subject: test/fio: add single pool mode to share collections among multiple jobs
X-Git-Tag: v13.0.2~866^2
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9d7291f7ae6769307cbf0192361bdde53fd73b0e;p=ceph.git

test/fio: add single pool mode to share collections among multiple jobs

Signed-off-by: Igor Fedotov <ifedotov@suse.com>
---
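Notes:

With single_pool_mode=1, all jobs in a run share the collections of one
pool created by the Engine (their number is capped by
osd_pool_default_pg_num and by the job's number of files); with the
default single_pool_mode=0, each job creates a private pool keyed by its
fio thread number, as before. A minimal job-file sketch follows; the
engine library name and the conf option follow the existing examples
under src/test/fio/, while the config file name and job parameters here
are only illustrative:

  [global]
  ioengine=external:libfio_ceph_objectstore.so
  ; ceph config for the ObjectStore under test (file name illustrative)
  conf=ceph-bluestore.conf
  ; all jobs in this file run against one shared pool
  single_pool_mode=1
  rw=randwrite
  size=256m
  ; objects per job, spread round-robin over the shared collections
  nrfiles=8

  [job0]
  [job1]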
diff --git a/src/test/fio/fio_ceph_objectstore.cc b/src/test/fio/fio_ceph_objectstore.cc
index 531b73fb957b..5f4414903a5e 100644
--- a/src/test/fio/fio_ceph_objectstore.cc
+++ b/src/test/fio/fio_ceph_objectstore.cc
@@ -47,6 +47,7 @@ struct Options {
     _fastinfo_omap_len_low, _fastinfo_omap_len_high;
   bool simulate_pglog;
+  bool single_pool_mode;
 };
 
 template <class Func> // void Func(fio_option&)
@@ -126,9 +127,86 @@ static std::vector<fio_option> ceph_options{
     o.def = 0;
     o.minval = 0;
   }),
+  make_option([] (fio_option& o) {
+    o.name = "single_pool_mode";
+    o.lname = "single(shared among jobs) pool mode";
+    o.type = FIO_OPT_BOOL;
+    o.help = "Enables the mode when all jobs run against the same pool";
+    o.off1 = offsetof(Options, single_pool_mode);
+    o.def = "0";
+  }),
 
   {} // fio expects a 'null'-terminated list
 };
+
+struct Collection {
+  spg_t pg;
+  coll_t cid;
+  ObjectStore::Sequencer sequencer;
+
+  // Can't use mutex directly in vectors hence dynamic allocation
+  ceph::unique_ptr<std::mutex> lock;
+  uint64_t pglog_ver_head = 1;
+  uint64_t pglog_ver_tail = 1;
+  uint64_t pglog_dup_ver_tail = 1;
+
+  // use big pool ids to avoid clashing with existing collections
+  static constexpr int64_t MIN_POOL_ID = 0x0000ffffffffffff;
+
+  Collection(const spg_t& pg)
+    : pg(pg), cid(pg), sequencer(stringify(pg)),
+      lock(new std::mutex) {
+    sequencer.shard_hint = pg;
+  }
+};
+
+int init_collections(std::unique_ptr<ObjectStore>& os,
+                     uint64_t pool,
+                     std::vector<Collection>& collections,
+                     uint64_t count)
+{
+  assert(count > 0);
+  collections.reserve(count);
+
+  const int split_bits = cbits(count - 1);
+
+  ObjectStore::Transaction t;
+  for (uint32_t i = 0; i < count; i++) {
+    auto pg = spg_t{pg_t{i, pool}};
+    collections.emplace_back(pg);
+
+    auto& coll = collections.back();
+    if (!os->collection_exists(coll.cid)) {
+      t.create_collection(coll.cid, split_bits);
+      ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
+      t.touch(coll.cid, pgmeta_oid);
+    }
+  }
+  ObjectStore::Sequencer sequencer("Engine init");
+  int r = os->apply_transaction(&sequencer, std::move(t));
+  if (r)
+    derr << "Engine init failed with " << cpp_strerror(-r) << dendl;
+  return r;
+}
+
+int destroy_collections(
+  std::unique_ptr<ObjectStore>& os,
+  std::vector<Collection>& collections)
+{
+  ObjectStore::Transaction t;
+  // remove our collections
+  for (auto& coll : collections) {
+    ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
+    t.remove(coll.cid, pgmeta_oid);
+    t.remove_collection(coll.cid);
+  }
+  ObjectStore::Sequencer sequencer("Engine cleanup");
+  int r = os->apply_transaction(&sequencer, std::move(t));
+  if (r)
+    derr << "Engine cleanup failed with " << cpp_strerror(-r) << dendl;
+  return r;
+}
+
 /// global engine state shared between all jobs within the process. this
 /// includes g_ceph_context and the ObjectStore instance
 struct Engine {
@@ -136,8 +214,11 @@ struct Engine {
   boost::intrusive_ptr<CephContext> cct;
   std::unique_ptr<ObjectStore> os;
 
+  std::vector<Collection> collections; //< shared collections to spread objects over
+  std::mutex lock;
   int ref_count;
+  const bool unlink; //< unlink objects on destruction
 
   Engine(thread_data* td);
   ~Engine();
@@ -172,6 +253,9 @@ struct Engine {
       f->flush(ostr);
       delete f;
 
+      if (unlink) {
+        destroy_collections(os, collections);
+      }
       os->umount();
       dout(0) << ostr.str() << dendl;
     }
@@ -179,7 +263,8 @@ struct Engine {
 };
 
 Engine::Engine(thread_data* td)
-  : ref_count(0)
+  : ref_count(0),
+    unlink(td->o.unlink)
 {
   // add the ceph command line arguments
   auto o = static_cast<Options*>(td->eo);
@@ -236,6 +321,13 @@ Engine::Engine(thread_data* td)
   if (r < 0)
     throw std::system_error(-r, std::system_category(), "mount failed");
+
+  // create shared collections up to osd_pool_default_pg_num
+  if (o->single_pool_mode) {
+    uint64_t count = g_conf->get_val<uint64_t>("osd_pool_default_pg_num");
+    if (count > td->o.nr_files)
+      count = td->o.nr_files;
+    init_collections(os, Collection::MIN_POOL_ID, collections, count);
+  }
 }
 
 Engine::~Engine()
@@ -243,28 +335,6 @@ Engine::~Engine()
   assert(!ref_count);
 }
 
-
-struct Collection {
-  spg_t pg;
-  coll_t cid;
-  ObjectStore::Sequencer sequencer;
-
-  // Can't use mutex directly in vectors hence dynamic allocation
-  ceph::unique_ptr<std::mutex> lock;
-  uint64_t pglog_ver_head = 1;
-  uint64_t pglog_ver_tail = 1;
-  uint64_t pglog_dup_ver_tail = 1;
-
-  // use big pool ids to avoid clashing with existing collections
-  static constexpr int64_t MIN_POOL_ID = 0x0000ffffffffffff;
-
-  Collection(const spg_t& pg)
-    : pg(pg), cid(pg), sequencer(stringify(pg)),
-      lock(new std::mutex) {
-    sequencer.shard_hint = pg;
-  }
-};
-
 struct Object {
   ghobject_t oid;
   Collection& coll;
@@ -274,10 +344,11 @@ struct Object {
       coll(coll) {}
 };
 
-/// treat each fio job like a separate pool with its own collections and objects
+/// treat each fio job either like a separate pool with its own collections and objects
+/// or just a client using its own objects from the shared pool
 struct Job {
   Engine* engine; //< shared ptr to the global Engine
-  std::vector<Collection> collections; //< spread objects over collections
+  std::vector<Collection> collections; //< job's private collections to spread objects over
   std::vector<Object> objects; //< associate an object with each fio_file
   std::vector<io_u*> events; //< completions for fio_ceph_os_event()
   const bool unlink; //< unlink objects on destruction
@@ -295,29 +366,6 @@ Job::Job(Engine* engine, const thread_data* td)
     unlink(td->o.unlink)
 {
   engine->ref();
-  // use the fio thread_number for our unique pool id
-  const uint64_t pool = Collection::MIN_POOL_ID + td->thread_number;
-
-  // create a collection for each object, up to osd_pool_default_pg_num
-  uint64_t count = g_conf->get_val<uint64_t>("osd_pool_default_pg_num");
-  if (count > td->o.nr_files)
-    count = td->o.nr_files;
-
-  assert(count > 0);
-  collections.reserve(count);
-
-  const int split_bits = cbits(count - 1);
-
-  ObjectStore::Transaction t;
-  for (uint32_t i = 0; i < count; i++) {
-    auto pg = spg_t{pg_t{i, pool}};
-    collections.emplace_back(pg);
-
-    auto& cid = collections.back().cid;
-    if (!engine->os->collection_exists(cid))
-      t.create_collection(cid, split_bits);
-  }
-
   auto o = static_cast<Options*>(td->eo);
   unsigned long long max_data = max(o->oi_attr_len_high,
                                     o->snapset_attr_len_high);
@@ -326,7 +374,21 @@ Job::Job(Engine* engine, const thread_data* td)
   max_data = max(max_data, o->_fastinfo_omap_len_high);
   one_for_all_data = buffer::create(max_data);
+  std::vector<Collection>* colls;
+  // create private collections up to osd_pool_default_pg_num
+  if (!o->single_pool_mode) {
+    uint64_t count = g_conf->get_val<uint64_t>("osd_pool_default_pg_num");
+    if (count > td->o.nr_files)
+      count = td->o.nr_files;
+    // use the fio thread_number for our unique pool id
+    const uint64_t pool = Collection::MIN_POOL_ID + td->thread_number + 1;
+    init_collections(engine->os, pool, collections, count);
+    colls = &collections;
+  } else {
+    colls = &engine->collections;
+  }
   const uint64_t file_size = td->o.size / max(1u, td->o.nr_files);
+  ObjectStore::Transaction t;
 
   // create an object for each file in the job
   for (uint32_t i = 0; i < td->o.nr_files; i++) {
@@ -334,8 +396,8 @@ Job::Job(Engine* engine, const thread_data* td)
     f->real_file_size = file_size;
     f->engine_pos = i;
 
-    // associate each object with a collection in a round-robin fashion
-    auto& coll = collections[i % collections.size()];
+    // associate each object with a collection in a round-robin fashion.
+    auto& coll = (*colls)[i % colls->size()];
     objects.emplace_back(f->file_name, coll);
     auto& oid = objects.back().oid;
@@ -356,15 +418,12 @@ Job::Job(Engine* engine, const thread_data* td)
 Job::~Job()
 {
   if (unlink) {
+    destroy_collections(engine->os, collections);
     ObjectStore::Transaction t;
     // remove our objects
     for (auto& obj : objects) {
       t.remove(obj.coll.cid, obj.oid);
     }
-    // remove our collections
-    for (auto& coll : collections) {
-      t.remove_collection(coll.cid);
-    }
     ObjectStore::Sequencer sequencer("job cleanup");
     int r = engine->os->apply_transaction(&sequencer, std::move(t));
     if (r)