]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
fio: generalize for other ObjectStores
authorCasey Bodley <cbodley@redhat.com>
Tue, 29 Sep 2015 18:40:39 +0000 (14:40 -0400)
committerCasey Bodley <cbodley@redhat.com>
Tue, 13 Sep 2016 15:02:17 +0000 (11:02 -0400)
* renamed target to fio_ceph_objectstore
* moved into src/test/fio subdirectory
* added to cmake build
* added support for DDIR_READ
* added required fio option 'conf' to load a ceph configuration file
* added multiple collections per job to simulate parallelism from pgs
* creates objects of the appropriate size on setup
* added support for multiple jobs that share an ObjectStore instance

Signed-off-by: Casey Bodley <cbodley@redhat.com>
CMakeLists.txt
cmake/modules/Findfio.cmake [new file with mode: 0644]
src/common/config_opts.h
src/test/CMakeLists.txt
src/test/fio/CMakeLists.txt [new file with mode: 0644]
src/test/fio/ceph-objectstore.fio [new file with mode: 0644]
src/test/fio/fio_ceph_objectstore.cc [new file with mode: 0644]
src/test/fio_ceph_filestore.cc [deleted file]

index 7aeb4ae6acd5c96fbfb98f850cdbe8afac650634..0c19e2463e73a1c65ca877e8b9e860c555085aa6 100644 (file)
@@ -373,6 +373,11 @@ option(PG_DEBUG_REFS "PG Ref debugging is enabled" OFF)
 
 option(WITH_TESTS "enable the build of ceph-test package scripts/binaries" ON)
 
+option(WITH_FIO "Support for fio engines" OFF)
+if(WITH_FIO)
+  find_package(fio REQUIRED)
+endif(WITH_FIO)
+
 if(LINUX)
   add_definitions(-D__linux__)
 endif(LINUX)
diff --git a/cmake/modules/Findfio.cmake b/cmake/modules/Findfio.cmake
new file mode 100644 (file)
index 0000000..194f919
--- /dev/null
@@ -0,0 +1,12 @@
+# - Find Fio
+# Find the fio includes
+#
+# FIO_INCLUDE_DIR - where to find fio.h
+# FIO_FOUND - True if fio is found.
+
+find_path(FIO_INCLUDE_DIR NAMES fio.h HINTS ${FIO_ROOT_DIR})
+
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(fio DEFAULT_MSG FIO_INCLUDE_DIR)
+
+mark_as_advanced(FIO_INCLUDE_DIR)
index b601f300be7a3bb9212dc212780ff3b37d79e252..acfae3a4a9e625470489ba324c0fc1798935f588 100644 (file)
@@ -1184,6 +1184,8 @@ OPTION(journal_zero_on_create, OPT_BOOL, false)
 OPTION(journal_ignore_corruption, OPT_BOOL, false) // assume journal is not corrupt
 OPTION(journal_discard, OPT_BOOL, false) //using ssd disk as journal, whether support discard nouse journal-data.
 
+OPTION(fio_dir, OPT_STR, "/tmp/fio") // fio data directory for fio-objectstore
+
 OPTION(rados_mon_op_timeout, OPT_DOUBLE, 0) // how many seconds to wait for a response from the monitor before returning an error from a rados operation. 0 means on limit.
 OPTION(rados_osd_op_timeout, OPT_DOUBLE, 0) // how many seconds to wait for a response from osds before returning an error from a rados operation. 0 means no limit.
 OPTION(rados_tracing, OPT_BOOL, false) // true if LTTng-UST tracepoints should be enabled
index 8c1c2814ad4bcd6614717fc0e2011b8f96cef138..d9230720b6c32a03d1ed08ebb85305a10b04f0e6 100644 (file)
@@ -48,6 +48,9 @@ if(WITH_RBD)
   add_subdirectory(rbd_mirror)
 endif(WITH_RBD)
 add_subdirectory(system)
+if(WITH_FIO)
+  add_subdirectory(fio)
+endif()
 
 # test_timers
 add_executable(ceph_test_timers
diff --git a/src/test/fio/CMakeLists.txt b/src/test/fio/CMakeLists.txt
new file mode 100644 (file)
index 0000000..f349e32
--- /dev/null
@@ -0,0 +1,18 @@
+add_library(fio_ceph_objectstore SHARED fio_ceph_objectstore.cc)
+target_include_directories(fio_ceph_objectstore PUBLIC ${FIO_INCLUDE_DIR})
+
+# prevent fio from adding a 'typedef int bool'
+set(FIO_CFLAGS "-DCONFIG_HAVE_BOOL")
+
+# fio headers use typeof(), which requires c++11 extensions
+if(CMAKE_VERSION VERSION_LESS "3.1")
+  set_target_properties(fio_ceph_objectstore PROPERTIES
+    COMPILE_FLAGS "-std=gnu++11 ${FIO_CFLAGS}")
+else()
+  set_target_properties(fio_ceph_objectstore PROPERTIES
+    CXX_EXTENSIONS ON
+    COMPILE_FLAGS "${FIO_CFLAGS}")
+endif()
+
+target_link_libraries(fio_ceph_objectstore os global)
+install(TARGETS fio_ceph_objectstore DESTINATION lib)
diff --git a/src/test/fio/ceph-objectstore.fio b/src/test/fio/ceph-objectstore.fio
new file mode 100644 (file)
index 0000000..b2f244f
--- /dev/null
@@ -0,0 +1,19 @@
+######################################################################
+# Example test for the external fio ioengine for ObjectStore.
+#
+# Runs a 4k random write test against a ObjectStore configuration.
+#
+######################################################################
+[global]
+ioengine=./.libs/libfio_ceph_objectstore.so
+invalidate=0   # mandatory
+rw=randwrite
+size=1g
+bs=4k
+
+[ceph_objectstore]
+iodepth=1
+objectstore=filestore
+#filestore_debug=20
+directory=/mnt/fio_ceph_filestore.XXXXXXX
+filestore_journal=/var/lib/ceph/osd/journal-ram/fio_ceph_filestore.XXXXXXX
diff --git a/src/test/fio/fio_ceph_objectstore.cc b/src/test/fio/fio_ceph_objectstore.cc
new file mode 100644 (file)
index 0000000..ca86ec3
--- /dev/null
@@ -0,0 +1,406 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ *  Ceph ObjectStore engine
+ *
+ * IO engine using Ceph's ObjectStore class to test low-level performance of
+ * Ceph OSDs.
+ *
+ */
+
+#include <memory>
+#include <system_error>
+#include <vector>
+
+#include "os/ObjectStore.h"
+#include "global/global_init.h"
+#include "common/errno.h"
+#include "include/intarith.h"
+#include "include/stringify.h"
+
+#include <fio.h>
+#include <optgroup.h>
+
+#include "include/assert.h" // fio.h clobbers our assert.h
+
+
+// enable boost::intrusive_ptr<CephContext>
+void intrusive_ptr_add_ref(CephContext* cct) { cct->get(); }
+void intrusive_ptr_release(CephContext* cct) { cct->put(); }
+
+namespace {
+
+/// fio configuration options read from the job file
+struct Options {
+  thread_data* td;
+  char* conf;
+};
+
+template <class Func> // void Func(fio_option&)
+fio_option make_option(Func&& func)
+{
+  // zero-initialize and set common defaults
+  auto o = fio_option{};
+  o.category = FIO_OPT_C_ENGINE;
+  o.group    = FIO_OPT_G_RBD;
+  func(std::ref(o));
+  return o;
+}
+
+static std::vector<fio_option> ceph_options{
+  make_option([] (fio_option& o) {
+    o.name   = "conf";
+    o.lname  = "ceph configuration file";
+    o.type   = FIO_OPT_STR_STORE;
+    o.help   = "Path to a ceph configuration file";
+    o.off1   = offsetof(Options, conf);
+  }),
+  {} // fio expects a 'null'-terminated list
+};
+
+
+/// global engine state shared between all jobs within the process. this
+/// includes g_ceph_context and the ObjectStore instance
+struct Engine {
+  /// the initial g_ceph_context reference to be dropped on destruction
+  boost::intrusive_ptr<CephContext> cct;
+  std::unique_ptr<ObjectStore> os;
+
+  Engine(const thread_data* td);
+  ~Engine();
+
+  static Engine* get_instance(thread_data* td) {
+    // note: creates an Engine with the options associated with the first job
+    static Engine engine(td);
+    return &engine;
+  }
+};
+
+Engine::Engine(const thread_data* td)
+{
+  // add the ceph command line arguments
+  auto o = static_cast<const Options*>(td->eo);
+  if (!o->conf) {
+    throw std::runtime_error("missing conf option for ceph configuration file");
+  }
+  std::vector<const char*> args{
+    "-i", "0", // identify as osd.0 for osd_data and osd_journal
+    "--conf", o->conf, // use the requested conf file
+  };
+  if (td->o.directory) { // allow conf files to use ${fio_dir} for data
+    args.emplace_back("--fio_dir");
+    args.emplace_back(td->o.directory);
+  }
+
+  global_init(nullptr, args, CEPH_ENTITY_TYPE_OSD, CODE_ENVIRONMENT_UTILITY, 0);
+  common_init_finish(g_ceph_context);
+
+  // claim the g_ceph_context reference and release it on destruction
+  cct = boost::intrusive_ptr<CephContext>(g_ceph_context, false);
+
+  // create the ObjectStore
+  os.reset(ObjectStore::create(g_ceph_context,
+                               g_conf->osd_objectstore,
+                               g_conf->osd_data,
+                               g_conf->osd_journal));
+  if (!os)
+    throw std::runtime_error("bad objectstore type " + g_conf->osd_objectstore);
+
+  os->set_cache_shards(g_conf->osd_op_num_shards);
+
+  int r = os->mkfs();
+  if (r < 0)
+    throw std::system_error(-r, std::system_category(), "mkfs failed");
+
+  r = os->mount();
+  if (r < 0)
+    throw std::system_error(-r, std::system_category(), "mount failed");
+}
+
+Engine::~Engine()
+{
+  os->umount();
+}
+
+
+struct Collection {
+  spg_t pg;
+  coll_t cid;
+  ObjectStore::Sequencer sequencer;
+
+  // use big pool ids to avoid clashing with existing collections
+  static constexpr int64_t MIN_POOL_ID = 0x0000ffffffffffff;
+
+  Collection(const spg_t& pg)
+    : pg(pg), cid(pg), sequencer(stringify(pg)) {}
+};
+
+struct Object {
+  ghobject_t oid;
+  Collection& coll;
+
+  Object(const char* name, Collection& coll)
+    : oid(hobject_t(name, "", CEPH_NOSNAP, coll.pg.ps(), coll.pg.pool(), "")),
+      coll(coll) {}
+};
+
+/// treat each fio job like a separate pool with its own collections and objects
+struct Job {
+  Engine* engine; //< shared ptr to the global Engine
+  std::vector<Collection> collections; //< spread objects over collections
+  std::vector<Object> objects; //< associate an object with each fio_file
+  std::vector<io_u*> events; //< completions for fio_ceph_os_event()
+  const bool unlink; //< unlink objects on destruction
+
+  Job(Engine* engine, const thread_data* td);
+  ~Job();
+};
+
+Job::Job(Engine* engine, const thread_data* td)
+  : engine(engine),
+    events(td->o.iodepth),
+    unlink(td->o.unlink)
+{
+  // use the fio thread_number for our unique pool id
+  const uint64_t pool = Collection::MIN_POOL_ID + td->thread_number;
+
+  // create a collection for each object, up to osd_pool_default_pg_num
+  uint32_t count = g_conf->osd_pool_default_pg_num;
+  if (count > td->o.nr_files)
+    count = td->o.nr_files;
+
+  assert(count > 0);
+  collections.reserve(count);
+
+  const int split_bits = cbits(count - 1);
+
+  ObjectStore::Transaction t;
+  for (uint32_t i = 0; i < count; i++) {
+    auto pg = spg_t{pg_t{i, pool}};
+    collections.emplace_back(pg);
+
+    auto& cid = collections.back().cid;
+    if (!engine->os->collection_exists(cid))
+      t.create_collection(cid, split_bits);
+  }
+
+  const uint64_t file_size = td->o.size / max(1u, td->o.nr_files);
+
+  // create an object for each file in the job
+  for (uint32_t i = 0; i < td->o.nr_files; i++) {
+    auto f = td->files[i];
+    f->real_file_size = file_size;
+    f->engine_data = i;
+
+    // associate each object with a collection in a round-robin fashion
+    auto& coll = collections[i % collections.size()];
+
+    objects.emplace_back(f->file_name, coll);
+    auto& oid = objects.back().oid;
+
+    t.touch(coll.cid, oid);
+    t.truncate(coll.cid, oid, file_size);
+  }
+
+  // apply the entire transaction synchronously
+  ObjectStore::Sequencer sequencer("job init");
+  int r = engine->os->apply_transaction(&sequencer, std::move(t));
+  if (r)
+    throw std::system_error(r, std::system_category(), "job init");
+}
+
+Job::~Job()
+{
+  if (unlink) {
+    ObjectStore::Transaction t;
+    // remove our objects
+    for (auto& obj : objects) {
+      t.remove(obj.coll.cid, obj.oid);
+    }
+    // remove our collections
+    for (auto& coll : collections) {
+      t.remove_collection(coll.cid);
+    }
+    ObjectStore::Sequencer sequencer("job cleanup");
+    int r = engine->os->apply_transaction(&sequencer, std::move(t));
+    if (r)
+      derr << "job cleanup failed with " << cpp_strerror(-r) << dendl;
+  }
+}
+
+
+int fio_ceph_os_setup(thread_data* td)
+{
+  // if there are multiple jobs, they must run in the same process against a
+  // single instance of the ObjectStore. explicitly disable fio's default
+  // job-per-process configuration
+  td->o.use_thread = 1;
+
+  try {
+    // get or create the global Engine instance
+    auto engine = Engine::get_instance(td);
+    // create a Job for this thread
+    td->io_ops_data = new Job(engine, td);
+  } catch (std::exception& e) {
+    std::cerr << "setup failed with " << e.what() << std::endl;
+    return -1;
+  }
+  return 0;
+}
+
+void fio_ceph_os_cleanup(thread_data* td)
+{
+  auto job = static_cast<Job*>(td->io_ops_data);
+  td->io_ops_data = nullptr;
+  delete job;
+}
+
+
+io_u* fio_ceph_os_event(thread_data* td, int event)
+{
+  // return the requested event from fio_ceph_os_getevents()
+  auto job = static_cast<Job*>(td->io_ops_data);
+  return job->events[event];
+}
+
+int fio_ceph_os_getevents(thread_data* td, unsigned int min,
+                          unsigned int max, const timespec* t)
+{
+  auto job = static_cast<Job*>(td->io_ops_data);
+  unsigned int events = 0;
+  io_u* u;
+  unsigned int i;
+
+  // loop through inflight ios until we find 'min' completions
+  do {
+    io_u_qiter(&td->io_u_all, u, i) {
+      if (!(u->flags & IO_U_F_FLIGHT))
+        continue;
+
+      if (u->engine_data) {
+        u->engine_data = nullptr;
+        job->events[events] = u;
+        events++;
+      }
+    }
+    if (events >= min)
+      break;
+    usleep(100);
+  } while (1);
+
+  return events;
+}
+
+/// completion context for ObjectStore::queue_transaction()
+class UnitComplete : public Context {
+  io_u* u;
+ public:
+  UnitComplete(io_u* u) : u(u) {}
+  void finish(int r) {
+    // mark the pointer to indicate completion for fio_ceph_os_getevents()
+    u->engine_data = reinterpret_cast<void*>(1ull);
+  }
+};
+
+int fio_ceph_os_queue(thread_data* td, io_u* u)
+{
+  fio_ro_check(td, u);
+
+  auto job = static_cast<Job*>(td->io_ops_data);
+  auto& object = job->objects[u->file->engine_data];
+  auto& coll = object.coll;
+  auto& os = job->engine->os;
+
+  if (u->ddir == DDIR_WRITE) {
+    // provide a hint if we're likely to read this data back
+    const int flags = td_rw(td) ? CEPH_OSD_OP_FLAG_FADVISE_WILLNEED : 0;
+
+    bufferlist bl;
+    bl.push_back(buffer::create_static(u->xfer_buflen,
+                                       static_cast<char*>(u->xfer_buf)));
+    // enqueue a write transaction on the collection's sequencer
+    ObjectStore::Transaction t;
+    t.write(coll.cid, object.oid, u->offset, u->xfer_buflen, bl, flags);
+    os->queue_transaction(&coll.sequencer, std::move(t), new UnitComplete(u));
+    return FIO_Q_QUEUED;
+  }
+
+  if (u->ddir == DDIR_READ) {
+    // ObjectStore reads are synchronous, so make the call and return COMPLETED
+    bufferlist bl;
+    int r = os->read(coll.cid, object.oid, u->offset, u->xfer_buflen, bl);
+    if (r < 0) {
+      u->error = r;
+      td_verror(td, u->error, "xfer");
+    } else {
+      bl.copy(0, bl.length(), static_cast<char*>(u->xfer_buf));
+      u->resid = u->xfer_buflen - r;
+    }
+    return FIO_Q_COMPLETED;
+  }
+
+  derr << "WARNING: Only DDIR_READ and DDIR_WRITE are supported!" << dendl;
+  u->error = -EINVAL;
+  td_verror(td, u->error, "xfer");
+  return FIO_Q_COMPLETED;
+}
+
+int fio_ceph_os_commit(thread_data* td)
+{
+  // commit() allows the engine to batch up queued requests to be submitted all
+  // at once. it would be natural for queue() to collect transactions in a list,
+  // and use commit() to pass them all to ObjectStore::queue_transactions(). but
+  // because we spread objects over multiple collections, we a) need to use a
+  // different sequencer for each collection, and b) are less likely to see a
+  // benefit from batching requests within a collection
+  return 0;
+}
+
+// open/close are noops. we set the FIO_DISKLESSIO flag in ioengine_ops to
+// prevent fio from creating the files
+int fio_ceph_os_open(thread_data* td, fio_file* f) { return 0; }
+int fio_ceph_os_close(thread_data* td, fio_file* f) { return 0; }
+
+int fio_ceph_os_io_u_init(thread_data* td, io_u* u)
+{
+  // no data is allocated, we just use the pointer as a boolean 'completed' flag
+  u->engine_data = nullptr;
+  return 0;
+}
+
+void fio_ceph_os_io_u_free(thread_data* td, io_u* u)
+{
+  u->engine_data = nullptr;
+}
+
+
+// ioengine_ops for get_ioengine()
+struct ceph_ioengine : public ioengine_ops {
+  ceph_ioengine() : ioengine_ops({}) {
+    name        = "ceph-os";
+    version     = FIO_IOOPS_VERSION;
+    flags       = FIO_DISKLESSIO;
+    setup       = fio_ceph_os_setup;
+    queue       = fio_ceph_os_queue;
+    commit      = fio_ceph_os_commit;
+    getevents   = fio_ceph_os_getevents;
+    event       = fio_ceph_os_event;
+    cleanup     = fio_ceph_os_cleanup;
+    open_file   = fio_ceph_os_open;
+    close_file  = fio_ceph_os_close;
+    io_u_init   = fio_ceph_os_io_u_init;
+    io_u_free   = fio_ceph_os_io_u_free;
+    options     = ceph_options.data();
+    option_struct_size = sizeof(struct Options);
+  }
+};
+
+} // anonymous namespace
+
+extern "C" {
+// the exported fio engine interface
+void get_ioengine(struct ioengine_ops** ioengine_ptr) {
+  static ceph_ioengine ioengine;
+  *ioengine_ptr = &ioengine;
+}
+} // extern "C"
diff --git a/src/test/fio_ceph_filestore.cc b/src/test/fio_ceph_filestore.cc
deleted file mode 100644 (file)
index 24b1ed9..0000000
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- *  Ceph FileStore engine
- *
- * IO engine using Ceph's FileStore class to test low-level performance of
- * Ceph OSDs.
- *
- */
-
-#include "os/FileStore.h"
-#include "global/global_init.h"
-
-#include <fio.h>
-
-struct fio_ceph_filestore_iou {
-       struct io_u *io_u;
-       int io_complete;
-};
-
-struct ceph_filestore_data {
-       struct io_u **aio_events;
-       char *osd_path;
-       char *journal_path;
-       ObjectStore *fs;
-};
-
-struct ceph_filestore_options {
-       struct thread_data *td;
-       char *ceph_filestore_name;
-       char *pool_name;
-       char *client_name;
-};
-
-#if 0
-static struct fio_option options[] = {
-       {
-        .name     = "ceph_filestorename",
-        .lname    = "ceph_filestore engine ceph_filestorename",
-        .type     = FIO_OPT_STR_STORE,
-        .help     = "RBD name for RBD engine",
-        .off1     = offsetof(struct ceph_filestore_options, ceph_filestore_name),
-        .category = FIO_OPT_C_ENGINE,
-        .group    = FIO_OPT_G_RBD,
-        },
-       {
-        .name     = "pool",
-        .lname    = "ceph_filestore engine pool",
-        .type     = FIO_OPT_STR_STORE,
-        .help     = "Name of the pool hosting the RBD for the RBD engine",
-        .off1     = offsetof(struct ceph_filestore_options, pool_name),
-        .category = FIO_OPT_C_ENGINE,
-        .group    = FIO_OPT_G_RBD,
-        },
-       {
-        .name     = "clientname",
-        .lname    = "ceph_filestore engine clientname",
-        .type     = FIO_OPT_STR_STORE,
-        .help     = "Name of the ceph client to access the RBD for the RBD engine",
-        .off1     = offsetof(struct ceph_filestore_options, client_name),
-        .category = FIO_OPT_C_ENGINE,
-        .group    = FIO_OPT_G_RBD,
-        },
-       {
-        .name = NULL,
-        },
-};
-#endif
-
-/////////////////////////////
-
-
-struct OnCommitted : public Context {
-  struct io_u *io_u;
-  OnCommitted(struct io_u* io_u) : io_u(io_u) {}
-  void finish(int r) {
-  }
-};
-
-struct OnApplied : public Context {
-  struct io_u *io_u;
-  ObjectStore::Transaction *t;
-  OnApplied(struct io_u* io_u, ObjectStore::Transaction *t) : io_u(io_u), t(t) {}
-  void finish(int r) {
-
-       struct fio_ceph_filestore_iou *fio_ceph_filestore_iou =
-           (struct fio_ceph_filestore_iou *)io_u->engine_data;
-
-       fio_ceph_filestore_iou->io_complete = 1;
-
-
-       delete t;
-  }
-};
-
-
-
-
-static int _fio_setup_ceph_filestore_data(struct thread_data *td,
-                              struct ceph_filestore_data **ceph_filestore_data_ptr)
-{
-       struct ceph_filestore_data *ceph_filestore_data;
-
-       if (td->io_ops->data)
-               return 0;
-
-       ceph_filestore_data = (struct ceph_filestore_data*) malloc(sizeof(struct ceph_filestore_data));
-       if (!ceph_filestore_data)
-               goto failed;
-
-       memset(ceph_filestore_data, 0, sizeof(struct ceph_filestore_data));
-
-       ceph_filestore_data->aio_events = (struct io_u **) malloc(td->o.iodepth * sizeof(struct io_u *));
-       if (!ceph_filestore_data->aio_events)
-               goto failed;
-
-       memset(ceph_filestore_data->aio_events, 0, td->o.iodepth * sizeof(struct io_u *));
-
-       *ceph_filestore_data_ptr = ceph_filestore_data;
-
-       return 0;
-
-failed:
-       return 1;
-
-}
-
-static struct io_u *fio_ceph_filestore_event(struct thread_data *td, int event)
-{
-       struct ceph_filestore_data *ceph_filestore_data = (struct ceph_filestore_data *) td->io_ops->data;
-
-       return ceph_filestore_data->aio_events[event];
-}
-
-static int fio_ceph_filestore_getevents(struct thread_data *td, unsigned int min,
-                            unsigned int max, struct timespec *t)
-{
-       struct ceph_filestore_data *ceph_filestore_data = (struct ceph_filestore_data *) td->io_ops->data;
-       unsigned int events = 0;
-       struct io_u *io_u;
-       unsigned int i;
-       struct fio_ceph_filestore_iou *fov;
-
-       do {
-               io_u_qiter(&td->io_u_all, io_u, i) {
-                       if (!(io_u->flags & IO_U_F_FLIGHT))
-                               continue;
-
-                       fov = (struct fio_ceph_filestore_iou *)io_u->engine_data;
-
-                       if (fov->io_complete) {
-                               fov->io_complete = 0;
-                               ceph_filestore_data->aio_events[events] = io_u;
-                               events++;
-                       }
-
-               }
-               if (events < min)
-                       usleep(100);
-               else
-                       break;
-
-       } while (1);
-
-       return events;
-}
-
-static int fio_ceph_filestore_queue(struct thread_data *td, struct io_u *io_u)
-{
-       int r = -1;
-       char buf[32];
-       struct ceph_filestore_data *ceph_filestore_data = (struct ceph_filestore_data *) td->io_ops->data;
-       uint64_t len = io_u->xfer_buflen;
-       uint64_t off = io_u->offset;
-       ObjectStore *fs = ceph_filestore_data->fs;
-       object_t poid(buf);
-
-       bufferlist data;
-       snprintf(buf, sizeof(buf), "XXX_%lu_%lu", io_u->start_time.tv_usec, io_u->start_time.tv_sec);
-       data.append((char *)io_u->xfer_buf, io_u->xfer_buflen);
-
-       fio_ro_check(td, io_u);
-
-
-       ObjectStore::Transaction *t = new ObjectStore::Transaction;
-       if (!t) {
-
-               cout << "ObjectStore Transcation allocation failed." << std::endl;
-               goto failed;
-       }
-
-
-        if (io_u->ddir == DDIR_WRITE) {
-               t->write(coll_t(), hobject_t(poid), off, len, data);
-               //cout << "QUEUING transaction " << io_u << std::endl;
-               fs->queue_transaction(NULL, t, new OnApplied(io_u, t), new OnCommitted(io_u));
-       } else {
-               cout << "WARNING: No DDIR beside DDIR_WRITE supported!" << std::endl;
-               return FIO_Q_COMPLETED;
-       }
-
-       return FIO_Q_QUEUED;
-
-failed:
-       io_u->error = r;
-       td_verror(td, io_u->error, "xfer");
-       return FIO_Q_COMPLETED;
-}
-
-static int fio_ceph_filestore_init(struct thread_data *td)
-{
-       vector<const char*> args;
-       struct ceph_filestore_data *ceph_filestore_data = (struct ceph_filestore_data *) td->io_ops->data;
-       ObjectStore::Transaction ft;
-
-       global_init(NULL, args, CEPH_ENTITY_TYPE_OSD, CODE_ENVIRONMENT_UTILITY, 0);
-       //g_conf->journal_dio = false;
-       common_init_finish(g_ceph_context);
-       //g_ceph_context->_conf->set_val("debug_filestore", "20");
-       //g_ceph_context->_conf->set_val("debug_throttle", "20");
-       g_ceph_context->_conf->apply_changes(NULL);
-
-       ceph_filestore_data->osd_path = strdup("/mnt/fio_ceph_filestore.XXXXXXX");
-       ceph_filestore_data->journal_path = strdup("/var/lib/ceph/osd/journal-ram/fio_ceph_filestore.XXXXXXX");
-
-       mkdtemp(ceph_filestore_data->osd_path);
-       //mktemp(ceph_filestore_data->journal_path); // NOSPC issue
-
-       ObjectStore *fs = new FileStore(ceph_filestore_data->osd_path, ceph_filestore_data->journal_path);
-       ceph_filestore_data->fs = fs;
-
-       if (fs->mkfs() < 0) {
-               cout << "mkfs failed" << std::endl;
-               goto failed;
-       }
-       
-       if (fs->mount() < 0) {
-               cout << "mount failed" << std::endl;
-               goto failed;
-       }
-
-       ft.create_collection(coll_t());
-       fs->apply_transaction(ft);
-
-
-       return 0;
-
-failed:
-       return 1;
-
-}
-
-static void fio_ceph_filestore_cleanup(struct thread_data *td)
-{
-       struct ceph_filestore_data *ceph_filestore_data = (struct ceph_filestore_data *) td->io_ops->data;
-
-       if (ceph_filestore_data) {
-               free(ceph_filestore_data->aio_events);
-               free(ceph_filestore_data->osd_path);
-               free(ceph_filestore_data->journal_path);
-               free(ceph_filestore_data);
-       }
-
-}
-
-static int fio_ceph_filestore_setup(struct thread_data *td)
-{
-       int r = 0;
-       struct fio_file *f;
-       struct ceph_filestore_data *ceph_filestore_data = NULL;
-
-       /* allocate engine specific structure to deal with libceph_filestore. */
-       r = _fio_setup_ceph_filestore_data(td, &ceph_filestore_data);
-       if (r) {
-               log_err("fio_setup_ceph_filestore_data failed.\n");
-               goto cleanup;
-       }
-       td->io_ops->data = ceph_filestore_data;
-
-       /* taken from "net" engine. Pretend we deal with files,
-        * even if we do not have any ideas about files.
-        * The size of the FileStore is set instead of a artificial file.
-        */
-       if (!td->files_index) {
-               add_file(td, td->o.filename ? : "ceph_filestore", 0, 0);
-               td->o.nr_files = td->o.nr_files ? : 1;
-       }
-       f = td->files[0];
-       f->real_file_size = 1024 * 1024; 
-
-       return 0;
-
-cleanup:
-       fio_ceph_filestore_cleanup(td);
-       return r;
-}
-
-static int fio_ceph_filestore_open(struct thread_data *td, struct fio_file *f)
-{
-       return 0;
-}
-
-static void fio_ceph_filestore_io_u_free(struct thread_data *td, struct io_u *io_u)
-{
-       struct fio_ceph_filestore_iou *o = (struct fio_ceph_filestore_iou *) io_u->engine_data;
-
-       if (o) {
-               io_u->engine_data = NULL;
-               free(o);
-       }
-}
-
-static int fio_ceph_filestore_io_u_init(struct thread_data *td, struct io_u *io_u)
-{
-       struct fio_ceph_filestore_iou *o;
-
-       o = (struct fio_ceph_filestore_iou *) malloc(sizeof(*o));
-       o->io_complete = 0;
-       o->io_u = io_u;
-       io_u->engine_data = o;
-       return 0;
-}
-
-extern "C" {
-void get_ioengine(struct ioengine_ops **ioengine_ptr) {
-       struct ioengine_ops *ioengine;
-       *ioengine_ptr = (struct ioengine_ops *) malloc(sizeof(struct ioengine_ops));
-       ioengine = *ioengine_ptr;
-
-       strcpy(ioengine->name, "ceph_filestore");
-       ioengine->version        = FIO_IOOPS_VERSION;
-       ioengine->setup          = fio_ceph_filestore_setup;
-       ioengine->init           = fio_ceph_filestore_init;
-       //ioengine->prep           = fio_ceph_filestore_prep;
-       ioengine->queue          = fio_ceph_filestore_queue;
-       //ioengine->cancel         = fio_ceph_filestore_cancel;
-       ioengine->getevents      = fio_ceph_filestore_getevents;
-       ioengine->event          = fio_ceph_filestore_event;
-       ioengine->cleanup        = fio_ceph_filestore_cleanup;
-       ioengine->open_file      = fio_ceph_filestore_open;
-       //ioengine->close_file     = fio_ceph_filestore_close;
-       ioengine->io_u_init      = fio_ceph_filestore_io_u_init;
-       ioengine->io_u_free      = fio_ceph_filestore_io_u_free;
-}
-}
-