_fastinfo_omap_len_low,
_fastinfo_omap_len_high;
bool simulate_pglog;
+ bool single_pool_mode;
};
template <class Func> // void Func(fio_option&)
o.def = 0;
o.minval = 0;
}),
+ make_option([] (fio_option& o) {
+ o.name = "single_pool_mode";
+ o.lname = "single(shared among jobs) pool mode";
+ o.type = FIO_OPT_BOOL;
+ o.help = "Enables the mode when all jobs run against the same pool";
+ o.off1 = offsetof(Options, single_pool_mode);
+ o.def = "0";
+ }),
{} // fio expects a 'null'-terminated list
};
+
+struct Collection {
+ spg_t pg;
+ coll_t cid;
+ ObjectStore::Sequencer sequencer;
+ // Can't use mutex directly in vectors hence dynamic allocation
+
+ ceph::unique_ptr<std::mutex> lock;
+ uint64_t pglog_ver_head = 1;
+ uint64_t pglog_ver_tail = 1;
+ uint64_t pglog_dup_ver_tail = 1;
+
+ // use big pool ids to avoid clashing with existing collections
+ static constexpr int64_t MIN_POOL_ID = 0x0000ffffffffffff;
+
+ Collection(const spg_t& pg)
+ : pg(pg), cid(pg), sequencer(stringify(pg)),
+ lock(new std::mutex) {
+ sequencer.shard_hint = pg;
+ }
+};
+
+int init_collections(std::unique_ptr<ObjectStore>& os,
+ uint64_t pool,
+ std::vector<Collection>& collections,
+ uint64_t count)
+{
+ assert(count > 0);
+ collections.reserve(count);
+
+ const int split_bits = cbits(count - 1);
+
+ ObjectStore::Transaction t;
+ for (uint32_t i = 0; i < count; i++) {
+ auto pg = spg_t{pg_t{i, pool}};
+ collections.emplace_back(pg);
+
+ auto& coll = collections.back();
+ if (!os->collection_exists(coll.cid)) {
+ t.create_collection(coll.cid, split_bits);
+ ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
+ t.touch(coll.cid, pgmeta_oid);
+ }
+ }
+ ObjectStore::Sequencer sequencer("Engine init");
+ int r = os->apply_transaction(&sequencer, std::move(t));
+ if (r)
+ derr << "Engine init failed with " << cpp_strerror(-r) << dendl;
+ return r;
+}
+
+int destroy_collections(
+ std::unique_ptr<ObjectStore>& os,
+ std::vector<Collection>& collections)
+{
+ ObjectStore::Transaction t;
+ // remove our collections
+ for (auto& coll : collections) {
+ ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
+ t.remove(coll.cid, pgmeta_oid);
+ t.remove_collection(coll.cid);
+ }
+ ObjectStore::Sequencer sequencer("Engine cleanup");
+ int r = os->apply_transaction(&sequencer, std::move(t));
+ if (r)
+ derr << "Engine cleanup failed with " << cpp_strerror(-r) << dendl;
+ return r;
+}
+
/// global engine state shared between all jobs within the process. this
/// includes g_ceph_context and the ObjectStore instance
struct Engine {
boost::intrusive_ptr<CephContext> cct;
std::unique_ptr<ObjectStore> os;
+ std::vector<Collection> collections; //< shared collections to spread objects over
+
std::mutex lock;
int ref_count;
+ const bool unlink; //< unlink objects on destruction
Engine(thread_data* td);
~Engine();
f->flush(ostr);
delete f;
+ if (unlink) {
+ destroy_collections(os, collections);
+ }
os->umount();
dout(0) << ostr.str() << dendl;
}
};
Engine::Engine(thread_data* td)
- : ref_count(0)
+ : ref_count(0),
+ unlink(td->o.unlink)
{
// add the ceph command line arguments
auto o = static_cast<Options*>(td->eo);
if (r < 0)
throw std::system_error(-r, std::system_category(), "mount failed");
+ // create shared collections up to osd_pool_default_pg_num
+ if (o->single_pool_mode) {
+ uint64_t count = g_conf->get_val<uint64_t>("osd_pool_default_pg_num");
+ if (count > td->o.nr_files)
+ count = td->o.nr_files;
+ init_collections(os, Collection::MIN_POOL_ID, collections, count);
+ }
}
Engine::~Engine()
assert(!ref_count);
}
-
-struct Collection {
- spg_t pg;
- coll_t cid;
- ObjectStore::Sequencer sequencer;
-
- // Can't use mutex directly in vectors hence dynamic allocation
- ceph::unique_ptr<std::mutex> lock;
- uint64_t pglog_ver_head = 1;
- uint64_t pglog_ver_tail = 1;
- uint64_t pglog_dup_ver_tail = 1;
-
- // use big pool ids to avoid clashing with existing collections
- static constexpr int64_t MIN_POOL_ID = 0x0000ffffffffffff;
-
- Collection(const spg_t& pg)
- : pg(pg), cid(pg), sequencer(stringify(pg)),
- lock(new std::mutex) {
- sequencer.shard_hint = pg;
- }
-};
-
struct Object {
ghobject_t oid;
Collection& coll;
coll(coll) {}
};
-/// treat each fio job like a separate pool with its own collections and objects
+/// treat each fio job either like a separate pool with its own collections and objects
+/// or just a client using its own objects from the shared pool
struct Job {
Engine* engine; //< shared ptr to the global Engine
- std::vector<Collection> collections; //< spread objects over collections
+ std::vector<Collection> collections; //< job's private collections to spread objects over
std::vector<Object> objects; //< associate an object with each fio_file
std::vector<io_u*> events; //< completions for fio_ceph_os_event()
const bool unlink; //< unlink objects on destruction
unlink(td->o.unlink)
{
engine->ref();
- // use the fio thread_number for our unique pool id
- const uint64_t pool = Collection::MIN_POOL_ID + td->thread_number;
-
- // create a collection for each object, up to osd_pool_default_pg_num
- uint64_t count = g_conf->get_val<uint64_t>("osd_pool_default_pg_num");
- if (count > td->o.nr_files)
- count = td->o.nr_files;
-
- assert(count > 0);
- collections.reserve(count);
-
- const int split_bits = cbits(count - 1);
-
- ObjectStore::Transaction t;
- for (uint32_t i = 0; i < count; i++) {
- auto pg = spg_t{pg_t{i, pool}};
- collections.emplace_back(pg);
-
- auto& cid = collections.back().cid;
- if (!engine->os->collection_exists(cid))
- t.create_collection(cid, split_bits);
- }
-
auto o = static_cast<Options*>(td->eo);
unsigned long long max_data = max(o->oi_attr_len_high,
o->snapset_attr_len_high);
max_data = max(max_data, o->_fastinfo_omap_len_high);
one_for_all_data = buffer::create(max_data);
+ std::vector<Collection>* colls;
+ // create private collections up to osd_pool_default_pg_num
+ if (!o->single_pool_mode) {
+ uint64_t count = g_conf->get_val<uint64_t>("osd_pool_default_pg_num");
+ if (count > td->o.nr_files)
+ count = td->o.nr_files;
+ // use the fio thread_number for our unique pool id
+ const uint64_t pool = Collection::MIN_POOL_ID + td->thread_number + 1;
+ init_collections(engine->os, pool, collections, count);
+ colls = &collections;
+ } else {
+ colls = &engine->collections;
+ }
const uint64_t file_size = td->o.size / max(1u, td->o.nr_files);
+ ObjectStore::Transaction t;
// create an object for each file in the job
for (uint32_t i = 0; i < td->o.nr_files; i++) {
f->real_file_size = file_size;
f->engine_pos = i;
- // associate each object with a collection in a round-robin fashion
- auto& coll = collections[i % collections.size()];
+ // associate each object with a collection in a round-robin fashion.
+ auto& coll = (*colls)[i % colls->size()];
objects.emplace_back(f->file_name, coll);
auto& oid = objects.back().oid;
Job::~Job()
{
if (unlink) {
+ destroy_collections(engine->os, collections);
ObjectStore::Transaction t;
// remove our objects
for (auto& obj : objects) {
t.remove(obj.coll.cid, obj.oid);
}
- // remove our collections
- for (auto& coll : collections) {
- t.remove_collection(coll.cid);
- }
ObjectStore::Sequencer sequencer("job cleanup");
int r = engine->os->apply_transaction(&sequencer, std::move(t));
if (r)