struct Collection {
spg_t pg;
coll_t cid;
- ObjectStore::Sequencer sequencer;
+ ObjectStore::CollectionHandle ch;
// Can't use mutex directly in vectors hence dynamic allocation
ceph::unique_ptr<std::mutex> lock;
// use big pool ids to avoid clashing with existing collections
static constexpr int64_t MIN_POOL_ID = 0x0000ffffffffffff;
- Collection(const spg_t& pg)
- : pg(pg), cid(pg), sequencer(stringify(pg)),
+ Collection(const spg_t& pg, ObjectStore::CollectionHandle _ch)
+ : pg(pg), cid(pg), ch(_ch),
lock(new std::mutex) {
- sequencer.shard_hint = pg;
}
};
+int destroy_collections(
+ std::unique_ptr<ObjectStore>& os,
+ std::vector<Collection>& collections)
+{
+ ObjectStore::Transaction t;
+ bool failed = false;
+ // remove our collections
+ for (auto& coll : collections) {
+ ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
+ t.remove(coll.cid, pgmeta_oid);
+ t.remove_collection(coll.cid);
+ int r = os->apply_transaction(coll.ch, std::move(t));
+ if (r && !failed) {
+ derr << "Engine cleanup failed with " << cpp_strerror(-r) << dendl;
+ failed = true;
+ }
+ }
+ return 0;
+}
+
int init_collections(std::unique_ptr<ObjectStore>& os,
uint64_t pool,
std::vector<Collection>& collections,
const int split_bits = cbits(count - 1);
- ObjectStore::Transaction t;
for (uint32_t i = 0; i < count; i++) {
auto pg = spg_t{pg_t{i, pool}};
- collections.emplace_back(pg);
+ coll_t cid(pg);
+
+ bool exists = os->collection_exists(cid);
+ auto ch = exists ?
+ os->open_collection(cid) :
+ os->create_new_collection(cid) ;
+ collections.emplace_back(pg, ch);
+
+ ObjectStore::Transaction t;
auto& coll = collections.back();
- if (!os->collection_exists(coll.cid)) {
+ if (!exists) {
t.create_collection(coll.cid, split_bits);
ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
t.touch(coll.cid, pgmeta_oid);
+ int r = os->apply_transaction(coll.ch, std::move(t));
+ if (r) {
+ derr << "Engine init failed with " << cpp_strerror(-r) << dendl;
+ destroy_collections(os, collections);
+ return r;
+ }
}
}
- ObjectStore::Sequencer sequencer("Engine init");
- int r = os->apply_transaction(&sequencer, std::move(t));
- if (r)
- derr << "Engine init failed with " << cpp_strerror(-r) << dendl;
- return r;
-}
-
-int destroy_collections(
- std::unique_ptr<ObjectStore>& os,
- std::vector<Collection>& collections)
-{
- ObjectStore::Transaction t;
- // remove our collections
- for (auto& coll : collections) {
- ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
- t.remove(coll.cid, pgmeta_oid);
- t.remove_collection(coll.cid);
- }
- ObjectStore::Sequencer sequencer("Engine cleanup");
- int r = os->apply_transaction(&sequencer, std::move(t));
- if (r)
- derr << "Engine cleanup failed with " << cpp_strerror(-r) << dendl;
- return r;
+ return 0;
}
/// global engine state shared between all jobs within the process. this
objects.emplace_back(f->file_name, coll);
auto& oid = objects.back().oid;
-
t.touch(coll.cid, oid);
t.truncate(coll.cid, oid, file_size);
- }
-
- // apply the entire transaction synchronously
- ObjectStore::Sequencer sequencer("job init");
- int r = engine->os->apply_transaction(&sequencer, std::move(t));
- if (r) {
- engine->deref();
- throw std::system_error(r, std::system_category(), "job init");
+ int r = engine->os->apply_transaction(coll.ch, std::move(t));
+ if (r) {
+ engine->deref();
+ throw std::system_error(r, std::system_category(), "job init");
+ }
}
}
Job::~Job()
{
if (unlink) {
- destroy_collections(engine->os, collections);
ObjectStore::Transaction t;
+ bool failed = false;
// remove our objects
for (auto& obj : objects) {
t.remove(obj.coll.cid, obj.oid);
+ int r = engine->os->apply_transaction(obj.coll.ch, std::move(t));
+ if (r && !failed) {
+ derr << "job cleanup failed with " << cpp_strerror(-r) << dendl;
+ failed = true;
+ }
}
- ObjectStore::Sequencer sequencer("job cleanup");
- int r = engine->os->apply_transaction(&sequencer, std::move(t));
- if (r)
- derr << "job cleanup failed with " << cpp_strerror(-r) << dendl;
+ destroy_collections(engine->os, collections);
}
engine->deref();
}
-
int fio_ceph_os_setup(thread_data* td)
{
// if there are multiple jobs, they must run in the same process against a
map<string,bufferptr> attrset;
map<string, bufferlist> omaps;
- // enqueue a write transaction on the collection's sequencer
+ // enqueue a write transaction on the collection's handle
ObjectStore::Transaction t;
char ver_key[64];
ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
t.omap_setkeys(coll.cid, pgmeta_oid, omaps);
}
- os->queue_transaction(&coll.sequencer,
+ os->queue_transaction(coll.ch,
std::move(t),
nullptr,
new UnitComplete(u));
if (u->ddir == DDIR_READ) {
// ObjectStore reads are synchronous, so make the call and return COMPLETED
bufferlist bl;
- int r = os->read(coll.cid, object.oid, u->offset, u->xfer_buflen, bl);
+ int r = os->read(coll.ch, object.oid, u->offset, u->xfer_buflen, bl);
if (r < 0) {
u->error = r;
td_verror(td, u->error, "xfer");