]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
test/fio: introduce fio ioengine: fio_ceph_messenger
authorRoman Penyaev <rpenyaev@suse.de>
Tue, 16 Oct 2018 09:52:01 +0000 (11:52 +0200)
committerRoman Penyaev <rpenyaev@suse.de>
Wed, 16 Jan 2019 14:53:49 +0000 (15:53 +0100)
This patch introduces new FIO engine which main goal is to test
bare Ceph messenger transport layer performance.  Engine submits
requests in async manner and polls for completion.  Completions
are kept in lockless ring buffer so on hot path no muteces or
conditions are used in order to get maximum bandwidth and less
latency.

Signed-off-by: Roman Penyaev <rpenyaev@suse.de>
CMakeLists.txt
src/test/fio/CMakeLists.txt
src/test/fio/README.md
src/test/fio/ceph-messenger.conf [new file with mode: 0644]
src/test/fio/ceph-messenger.fio [new file with mode: 0644]
src/test/fio/fio_ceph_messenger.cc [new file with mode: 0644]
src/test/fio/ring_buffer.h [new file with mode: 0644]

index d2935a36f8d4ac5a7d847cb254155005316612b4..f414087c82a79291094241049547840270d61c38 100644 (file)
@@ -580,7 +580,10 @@ option(WITH_SYSTEM_FIO "require and build with system fio" OFF)
 if(WITH_SYSTEM_FIO)
   find_package(fio REQUIRED)
 elseif(WITH_FIO)
-  set(FIO_INCLUDE_DIR ${CMAKE_BINARY_DIR}/src/fio)
+  if (NOT FIO_INCLUDE_DIR)
+    # Use local external fio if include directory is not set
+    set(FIO_INCLUDE_DIR ${CMAKE_BINARY_DIR}/src/fio)
+  endif()
   include(BuildFIO)
   build_fio()
 endif()
index 81783d45aabbefddaed01a0562f8530701df0092..4bfbaefe04d04f43de59b0566f9dd32da8940006 100644 (file)
@@ -1,6 +1,11 @@
+# ObjectStore
 add_library(fio_ceph_objectstore SHARED fio_ceph_objectstore.cc)
 target_include_directories(fio_ceph_objectstore SYSTEM PUBLIC ${FIO_INCLUDE_DIR})
 
+# Messenger
+add_library(fio_ceph_messenger SHARED fio_ceph_messenger.cc)
+target_include_directories(fio_ceph_messenger SYSTEM PUBLIC ${FIO_INCLUDE_DIR})
+
 # prevent fio from adding a 'typedef int bool'
 set(FIO_CFLAGS "-DCONFIG_HAVE_BOOL")
 
@@ -8,9 +13,17 @@ set(FIO_CFLAGS "-DCONFIG_HAVE_BOOL")
 set_target_properties(fio_ceph_objectstore PROPERTIES
   CXX_EXTENSIONS ON
   COMPILE_FLAGS "${FIO_CFLAGS}")
+set_target_properties(fio_ceph_messenger PROPERTIES
+  CXX_EXTENSIONS ON
+  COMPILE_FLAGS "${FIO_CFLAGS}")
 
 if(WITH_FIO)
   add_dependencies(fio_ceph_objectstore fio_ext)
+  add_dependencies(fio_ceph_messenger fio_ext)
 endif()
+
 target_link_libraries(fio_ceph_objectstore os global)
 install(TARGETS fio_ceph_objectstore DESTINATION lib)
+
+target_link_libraries(fio_ceph_messenger os global)
+install(TARGETS fio_ceph_messenger DESTINATION lib)
index a2f67a3880180d0a3ace3554247d0a18bdd56e52..91a98af9e23183b8ca5723f7b68a80bc2585239a 100644 (file)
@@ -86,3 +86,31 @@ If you installed ceph in any other place (cmake -DCMAKE_INSTALL_PREFIX=${CEPH_IN
     rados=yes make
 
 "-ltcmalloc" is necessary if ceph was compiled with tcmalloc.
+
+Messenger
+---------
+
+This fio engine allows you to test CEPH messenger transport layer, without
+any disk activities involved.
+
+To build fio_ceph_messenger:
+```
+  ./do_cmake.sh -DWITH_FIO=ON
+  cd build
+  make fio_ceph_messenger
+```
+If you install the ceph libraries to a location that isn't in your
+LD_LIBRARY_PATH, be sure to add it:
+
+    export LD_LIBRARY_PATH=/path/to/install/lib
+
+To view the fio options specific to the messenger engine:
+
+    ./fio --enghelp=libfio_ceph_messenger.so
+
+The ceph_conf_file= option requires a ceph configuration file (ceph.conf),
+see ceph-messenger.conf and ceph-messenger.fio for details.
+
+To run:
+
+    ./fio ./ceph-messenger.fio
diff --git a/src/test/fio/ceph-messenger.conf b/src/test/fio/ceph-messenger.conf
new file mode 100644 (file)
index 0000000..8d83d36
--- /dev/null
@@ -0,0 +1,7 @@
+[global]
+
+ms_type=async+posix
+ms_crc_data=false
+ms_crc_header=false
+ms_dispatch_throttle_bytes=0
+debug_ms=0/0
diff --git a/src/test/fio/ceph-messenger.fio b/src/test/fio/ceph-messenger.fio
new file mode 100644 (file)
index 0000000..97ca6fa
--- /dev/null
@@ -0,0 +1,20 @@
+[global]
+bs=4k
+size=1g
+iodepth=128
+
+ioengine=libfio_ceph_messenger.so
+#ceph_conf_file=ceph-messenger.conf
+
+hostname=127.0.0.1
+port=5555
+
+ms_type=async+posix # or async+dpdk or async+rdma
+
+[client]
+receiver=0
+rw=write
+
+[server]
+receiver=1
+rw=read
diff --git a/src/test/fio/fio_ceph_messenger.cc b/src/test/fio/fio_ceph_messenger.cc
new file mode 100644 (file)
index 0000000..b30da49
--- /dev/null
@@ -0,0 +1,692 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ *  CEPH messenger engine
+ *
+ *  FIO engine which uses ceph messenger as a transport.  See corresponding
+ *  FIO client and server jobs for details.
+ */
+
+#include "global/global_init.h"
+#include "msg/Messenger.h"
+#include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
+#include "common/perf_counters.h"
+#include "ring_buffer.h"
+
+#include <fio.h>
+#include <flist.h>
+#include <optgroup.h>
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_
+
+enum ceph_msgr_type {
+  CEPH_MSGR_TYPE_UNDEF,
+  CEPH_MSGR_TYPE_POSIX,
+  CEPH_MSGR_TYPE_DPDK,
+  CEPH_MSGR_TYPE_RDMA,
+};
+
+const char *ceph_msgr_types[] = { "undef", "async+posix",
+                                 "async+dpdk", "async+rdma" };
+
+struct ceph_msgr_options {
+  struct thread_data *td__;
+  unsigned int is_receiver;
+  unsigned int is_single;
+  unsigned int port;
+  const char *hostname;
+  const char *conffile;
+  enum ceph_msgr_type ms_type;
+};
+
+class FioDispatcher;
+
+struct ceph_msgr_data {
+  ceph_msgr_data(struct ceph_msgr_options *o_, unsigned iodepth) :
+    o(o_) {
+    INIT_FLIST_HEAD(&io_inflight_list);
+    INIT_FLIST_HEAD(&io_pending_list);
+    ring_buffer_init(&io_completed_q, iodepth);
+    pthread_spin_init(&spin, PTHREAD_PROCESS_PRIVATE);
+  }
+
+  struct ceph_msgr_options *o;
+  Messenger *msgr = NULL;
+  FioDispatcher *disp = NULL;
+  pthread_spinlock_t spin;
+  struct ring_buffer io_completed_q;
+  struct flist_head io_inflight_list;
+  struct flist_head io_pending_list;
+  unsigned int io_inflight_nr = 0;
+  unsigned int io_pending_nr  = 0;
+};
+
+struct ceph_msgr_io {
+  struct flist_head list;
+  struct ceph_msgr_data *data;
+  struct io_u *io_u;
+  MOSDOp *req_msg; /** Cached request, valid only for sender */
+};
+
+struct ceph_msgr_reply_io {
+  struct flist_head list;
+  MOSDOpReply *rep;
+};
+
+static void *str_to_ptr(const std::string &str)
+{
+  return (void *)strtoul(str.c_str(), NULL, 16);
+}
+
+static std::string ptr_to_str(void *ptr)
+{
+  char buf[32];
+
+  snprintf(buf, sizeof(buf), "%llx", (unsigned long long)ptr);
+  return std::string(buf);
+}
+
+/*
+ * Used for refcounters print on the last context put, almost duplicates
+ * global context refcounter, sigh.
+ */
+static std::atomic<int> ctx_ref(1);
+
+static void create_or_get_ceph_context(struct ceph_msgr_options *o)
+{
+  if (g_ceph_context) {
+    g_ceph_context->get();
+    ctx_ref++;
+    return;
+  }
+
+  boost::intrusive_ptr<CephContext> cct;
+  vector<const char*> args;
+
+  if (o->conffile)
+    args = { "--conf", o->conffile };
+
+  cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
+                   CODE_ENVIRONMENT_UTILITY,
+                   CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+  /* Will use g_ceph_context instead */
+  cct.detach();
+
+  common_init_finish(g_ceph_context);
+  g_ceph_context->_conf.apply_changes(NULL);
+}
+
+static void put_ceph_context(void)
+{
+  if (--ctx_ref == 0) {
+    ostringstream ostr;
+    Formatter* f;
+
+    f = Formatter::create("json-pretty");
+    g_ceph_context->get_perfcounters_collection()->dump_formatted(f, false);
+    ostr << ">>>>>>>>>>>>> PERFCOUNTERS BEGIN <<<<<<<<<<<<" << std::endl;
+    f->flush(ostr);
+    ostr << ">>>>>>>>>>>>>  PERFCOUNTERS END  <<<<<<<<<<<<" << std::endl;
+
+    delete f;
+    dout(0) <<  ostr.str() << dendl;
+  }
+
+  g_ceph_context->put();
+}
+
+static void ceph_msgr_sender_on_reply(const object_t &oid)
+{
+  struct ceph_msgr_data *data;
+  struct ceph_msgr_io *io;
+
+  /*
+   * Here we abuse object and use it as a raw pointer. Since this is
+   * only for benchmarks and testing we do not care about anything
+   * but performance.  So no need to use global structure in order
+   * to search for reply, just send a pointer and get it back.
+   */
+
+  io = (decltype(io))str_to_ptr(oid.name);
+  data = io->data;
+  ring_buffer_enqueue(&data->io_completed_q, (void *)io);
+}
+
+
+class ReplyCompletion : public Message::CompletionHook {
+  struct ceph_msgr_io *m_io;
+
+public:
+  ReplyCompletion(MOSDOpReply *rep, struct ceph_msgr_io *io) :
+    Message::CompletionHook(rep),
+    m_io(io) {
+  }
+  void finish(int err) override {
+    struct ceph_msgr_data *data = m_io->data;
+
+    ring_buffer_enqueue(&data->io_completed_q, (void *)m_io);
+  }
+};
+
+static void ceph_msgr_receiver_on_request(struct ceph_msgr_data *data,
+                                         MOSDOp *req)
+{
+  MOSDOpReply *rep;
+
+  rep = new MOSDOpReply(req, 0, 0, 0, false);
+  rep->set_connection(req->get_connection());
+
+  pthread_spin_lock(&data->spin);
+  if (data->io_inflight_nr) {
+    struct ceph_msgr_io *io;
+
+    data->io_inflight_nr--;
+    io = flist_first_entry(&data->io_inflight_list,
+                          struct ceph_msgr_io, list);
+    flist_del(&io->list);
+    pthread_spin_unlock(&data->spin);
+
+    rep->set_completion_hook(new ReplyCompletion(rep, io));
+    rep->get_connection()->send_message(rep);
+  } else {
+    struct ceph_msgr_reply_io *rep_io;
+
+    rep_io = (decltype(rep_io))malloc(sizeof(*rep_io));
+    rep_io->rep = rep;
+
+    data->io_pending_nr++;
+    flist_add_tail(&rep_io->list, &data->io_pending_list);
+    pthread_spin_unlock(&data->spin);
+  }
+}
+
+class FioDispatcher : public Dispatcher {
+  struct ceph_msgr_data *m_data;
+
+public:
+  FioDispatcher(struct ceph_msgr_data *data):
+    Dispatcher(g_ceph_context),
+    m_data(data) {
+    require_authorizer = false;
+  }
+  bool ms_can_fast_dispatch_any() const override {
+    return true;
+  }
+  bool ms_can_fast_dispatch(const Message *m) const override {
+    switch (m->get_type()) {
+    case CEPH_MSG_OSD_OP:
+      return m_data->o->is_receiver;
+    case CEPH_MSG_OSD_OPREPLY:
+      return !m_data->o->is_receiver;
+    default:
+      return false;
+    }
+  }
+  void ms_handle_fast_connect(Connection *con) override {
+  }
+  void ms_handle_fast_accept(Connection *con) override {
+  }
+  bool ms_dispatch(Message *m) override {
+    return true;
+  }
+  void ms_fast_dispatch(Message *m) override {
+    if (m_data->o->is_receiver) {
+      MOSDOp *req;
+
+      /*
+       * Server side, handle request.
+       */
+
+      req = static_cast<MOSDOp*>(m);
+      req->finish_decode();
+
+      ceph_msgr_receiver_on_request(m_data, req);
+    } else {
+      MOSDOpReply *rep;
+
+      /*
+       * Client side, get reply, extract objid and mark
+       * IO as completed.
+       */
+
+      rep = static_cast<MOSDOpReply*>(m);
+      ceph_msgr_sender_on_reply(rep->get_oid());
+    }
+    m->put();
+  }
+  bool ms_handle_reset(Connection *con) override {
+    return true;
+  }
+  void ms_handle_remote_reset(Connection *con) override {
+  }
+  bool ms_handle_refused(Connection *con) override {
+    return false;
+  }
+  int ms_handle_authentication(Connection *con) override {
+    return 1;
+  }
+};
+
+static entity_addr_t hostname_to_addr(struct ceph_msgr_options *o)
+{
+  entity_addr_t addr;
+
+  addr.parse(o->hostname);
+  addr.set_port(o->port);
+  addr.set_nonce(0);
+
+  return addr;
+}
+
+static Messenger *create_messenger(struct ceph_msgr_options *o)
+{
+  entity_name_t ename = o->is_receiver ?
+    entity_name_t::OSD(0) : entity_name_t::CLIENT(0);
+  std::string lname = o->is_receiver ?
+    "receiver" : "sender";
+
+  /* Does anybody really uses those flags in messenger? Seems not. */
+  unsigned flags = o->is_receiver ?
+    Messenger::HAS_HEAVY_TRAFFIC |
+    Messenger::HAS_MANY_CONNECTIONS : 0;
+
+  std::string ms_type = o->ms_type != CEPH_MSGR_TYPE_UNDEF ?
+    ceph_msgr_types[o->ms_type] :
+    g_ceph_context->_conf.get_val<std::string>("ms_type");
+
+  Messenger *msgr = Messenger::create(g_ceph_context, ms_type.c_str(),
+                                     ename, lname, 0, flags);
+  if (o->is_receiver) {
+    msgr->set_default_policy(Messenger::Policy::stateless_server(0));
+    msgr->bind(hostname_to_addr(o));
+  } else {
+    msgr->set_default_policy(Messenger::Policy::lossless_client(0));
+  }
+  msgr->start();
+
+  return msgr;
+}
+
+static Messenger *single_msgr;
+static std::atomic<int> single_msgr_ref;
+static vector<FioDispatcher *> single_msgr_disps;
+
+static void init_messenger(struct ceph_msgr_data *data)
+{
+  struct ceph_msgr_options *o = data->o;
+  FioDispatcher *disp;
+  Messenger *msgr;
+
+  disp = new FioDispatcher(data);
+  if (o->is_single) {
+    /*
+     * Single messenger instance for the whole FIO
+     */
+
+    if (!single_msgr) {
+      msgr = create_messenger(o);
+      single_msgr = msgr;
+    } else {
+      msgr = single_msgr;
+    }
+    single_msgr_disps.push_back(disp);
+    single_msgr_ref++;
+  } else {
+    /*
+     * Messenger instance per FIO thread
+     */
+    msgr = create_messenger(o);
+  }
+  msgr->add_dispatcher_head(disp);
+
+  data->disp = disp;
+  data->msgr = msgr;
+}
+
+static void free_messenger(struct ceph_msgr_data *data)
+{
+  data->msgr->shutdown();
+  data->msgr->wait();
+  delete data->msgr;
+}
+
+static void put_messenger(struct ceph_msgr_data *data)
+{
+  struct ceph_msgr_options *o = data->o;
+
+  if (o->is_single) {
+    if (--single_msgr_ref == 0) {
+      free_messenger(data);
+      /*
+       * In case of a single messenger instance we have to
+       * free dispatchers after actual messenger destruction.
+       */
+      for (auto disp : single_msgr_disps)
+       delete disp;
+      single_msgr = NULL;
+    }
+  } else {
+    free_messenger(data);
+    delete data->disp;
+  }
+  data->disp = NULL;
+  data->msgr = NULL;
+}
+
+static int fio_ceph_msgr_setup(struct thread_data *td)
+{
+  struct ceph_msgr_options *o = (decltype(o))td->eo;
+  ceph_msgr_data *data;
+
+  /* We have to manage global resources so we use threads */
+  td->o.use_thread = 1;
+
+  create_or_get_ceph_context(o);
+
+  if (!td->io_ops_data) {
+    data = new ceph_msgr_data(o, td->o.iodepth);
+    init_messenger(data);
+    td->io_ops_data = (void *)data;
+  }
+
+  return 0;
+}
+
+static void fio_ceph_msgr_cleanup(struct thread_data *td)
+{
+  struct ceph_msgr_data *data;
+  unsigned nr;
+
+  data = (decltype(data))td->io_ops_data;
+  put_messenger(data);
+
+  nr = ring_buffer_used_size(&data->io_completed_q);
+  if (nr)
+    fprintf(stderr, "fio: io_completed_nr==%d, but should be zero\n",
+           nr);
+  if (data->io_inflight_nr)
+    fprintf(stderr, "fio: io_inflight_nr==%d, but should be zero\n",
+           data->io_inflight_nr);
+  if (data->io_pending_nr)
+    fprintf(stderr, "fio: io_pending_nr==%d, but should be zero\n",
+           data->io_pending_nr);
+  if (!flist_empty(&data->io_inflight_list))
+    fprintf(stderr, "fio: io_inflight_list is not empty\n");
+  if (!flist_empty(&data->io_pending_list))
+    fprintf(stderr, "fio: io_pending_list is not empty\n");
+
+  ring_buffer_deinit(&data->io_completed_q);
+  delete data;
+  put_ceph_context();
+}
+
+static int fio_ceph_msgr_io_u_init(struct thread_data *td, struct io_u *io_u)
+{
+  struct ceph_msgr_options *o = (decltype(o))td->eo;
+  struct ceph_msgr_io *io;
+  MOSDOp *req_msg = NULL;
+
+  io = (decltype(io))malloc(sizeof(*io));
+  io->io_u = io_u;
+  io->data = (decltype(io->data))td->io_ops_data;
+
+  if (!o->is_receiver) {
+    object_t oid(ptr_to_str(io));
+    pg_t pgid;
+    object_locator_t oloc;
+    hobject_t hobj(oid, oloc.key, CEPH_NOSNAP, pgid.ps(),
+                  pgid.pool(), oloc.nspace);
+    spg_t spgid(pgid);
+    entity_inst_t dest(entity_name_t::OSD(0), hostname_to_addr(o));
+
+    Messenger *msgr = io->data->msgr;
+    ConnectionRef con = msgr->connect_to(dest.name.type(),
+                                        entity_addrvec_t(dest.addr));
+
+    req_msg = new MOSDOp(0, 0, hobj, spgid, 0, 0, 0);
+    req_msg->set_connection(con);
+  }
+
+  io->req_msg = req_msg;
+  io_u->engine_data = (void *)io;
+
+  return 0;
+}
+
+static void fio_ceph_msgr_io_u_free(struct thread_data *td, struct io_u *io_u)
+{
+  struct ceph_msgr_io *io;
+
+  io = (decltype(io))io_u->engine_data;
+  if (io) {
+    io_u->engine_data = NULL;
+    if (io->req_msg)
+      io->req_msg->put();
+    free(io);
+  }
+}
+
+static enum fio_q_status ceph_msgr_sender_queue(struct thread_data *td,
+                                               struct io_u *io_u)
+{
+  struct ceph_msgr_data *data;
+  struct ceph_msgr_io *io;
+
+  bufferlist buflist = bufferlist::static_from_mem(
+    (char *)io_u->buf, io_u->buflen);
+
+  io = (decltype(io))io_u->engine_data;
+  data = (decltype(data))td->io_ops_data;
+
+  /* No handy method to clear ops before reusage? Ok */
+  io->req_msg->ops.clear();
+
+  /* Here we do not care about direction, always send as write */
+  io->req_msg->write(0, io_u->buflen, buflist);
+  /* Keep message alive */
+  io->req_msg->get();
+  io->req_msg->get_connection()->send_message(io->req_msg);
+
+  return FIO_Q_QUEUED;
+}
+
+static int fio_ceph_msgr_getevents(struct thread_data *td, unsigned int min,
+                                  unsigned int max, const struct timespec *ts)
+{
+  struct ceph_msgr_data *data;
+  unsigned int nr;
+
+  data = (decltype(data))td->io_ops_data;
+
+  /*
+   * Check io_u.c : if min == 0 -> ts is valid and equal to zero,
+   * if min != 0 -> ts is NULL.
+   */
+  assert(!min ^ !ts);
+
+  nr = ring_buffer_used_size(&data->io_completed_q);
+  if (nr >= min)
+    /* We got something */
+    return min(nr, max);
+
+  /* Here we are only if min != 0 and ts == NULL */
+  assert(min && !ts);
+
+  while ((nr = ring_buffer_used_size(&data->io_completed_q)) < min &&
+        !td->terminate) {
+    /* Poll, no disk IO, so we expect response immediately. */
+    usleep(10);
+  }
+
+  return min(nr, max);
+}
+
+static struct io_u *fio_ceph_msgr_event(struct thread_data *td, int event)
+{
+  struct ceph_msgr_data *data;
+  struct ceph_msgr_io *io;
+
+  data = (decltype(data))td->io_ops_data;
+  io = (decltype(io))ring_buffer_dequeue(&data->io_completed_q);
+
+  return io->io_u;
+}
+
+static enum fio_q_status ceph_msgr_receiver_queue(struct thread_data *td,
+                                                 struct io_u *io_u)
+{
+  struct ceph_msgr_data *data;
+  struct ceph_msgr_io *io;
+
+  io = (decltype(io))io_u->engine_data;
+  data = io->data;
+  pthread_spin_lock(&data->spin);
+  if (data->io_pending_nr) {
+    struct ceph_msgr_reply_io *rep_io;
+    MOSDOpReply *rep;
+
+    data->io_pending_nr--;
+    rep_io = flist_first_entry(&data->io_pending_list,
+                              struct ceph_msgr_reply_io,
+                              list);
+    flist_del(&rep_io->list);
+    rep = rep_io->rep;
+    pthread_spin_unlock(&data->spin);
+    free(rep_io);
+
+    rep->set_completion_hook(new ReplyCompletion(rep, io));
+    rep->get_connection()->send_message(rep);
+  } else {
+    data->io_inflight_nr++;
+    flist_add_tail(&io->list, &data->io_inflight_list);
+    pthread_spin_unlock(&data->spin);
+  }
+
+  return FIO_Q_QUEUED;
+}
+
+static enum fio_q_status fio_ceph_msgr_queue(struct thread_data *td,
+                                            struct io_u *io_u)
+{
+  struct ceph_msgr_options *o = (decltype(o))td->eo;
+
+  if (o->is_receiver)
+    return ceph_msgr_receiver_queue(td, io_u);
+  else
+    return ceph_msgr_sender_queue(td, io_u);
+}
+
+static int fio_ceph_msgr_open_file(struct thread_data *td, struct fio_file *f)
+{
+  return 0;
+}
+
+static int fio_ceph_msgr_close_file(struct thread_data *, struct fio_file *)
+{
+  return 0;
+}
+
+template <class Func>
+fio_option make_option(Func&& func)
+{
+  auto o = fio_option{};
+  o.category = FIO_OPT_C_ENGINE;
+  func(std::ref(o));
+  return o;
+}
+
+static std::vector<fio_option> options {
+  make_option([] (fio_option& o) {
+    o.name  = "receiver";
+    o.lname = "CEPH messenger is receiver";
+    o.type  = FIO_OPT_BOOL;
+    o.off1  = offsetof(struct ceph_msgr_options, is_receiver);
+    o.help  = "CEPH messenger is sender or receiver";
+    o.def   = "0";
+  }),
+  make_option([] (fio_option& o) {
+    o.name  = "single_instance";
+    o.lname = "Single instance of CEPH messenger ";
+    o.type  = FIO_OPT_BOOL;
+    o.off1  = offsetof(struct ceph_msgr_options, is_single);
+    o.help  = "CEPH messenger is a created once for all threads";
+    o.def   = "0";
+  }),
+  make_option([] (fio_option& o) {
+    o.name  = "hostname";
+    o.lname = "CEPH messenger hostname";
+    o.type  = FIO_OPT_STR_STORE;
+    o.off1  = offsetof(struct ceph_msgr_options, hostname);
+    o.help  = "Hostname for CEPH messenger engine";
+  }),
+  make_option([] (fio_option& o) {
+    o.name   = "port";
+    o.lname  = "CEPH messenger engine port";
+    o.type   = FIO_OPT_INT;
+    o.off1   = offsetof(struct ceph_msgr_options, port);
+    o.maxval = 65535;
+    o.minval = 1;
+    o.help   = "Port to use for CEPH messenger";
+  }),
+  make_option([] (fio_option& o) {
+    o.name  = "ms_type";
+    o.lname = "CEPH messenger transport type: async+posix, async+dpdk, async+rdma";
+    o.type  = FIO_OPT_STR;
+    o.off1  = offsetof(struct ceph_msgr_options, ms_type);
+    o.help  = "Transport type for CEPH messenger, see 'ms async transport type' corresponding CEPH documentation page";
+    o.def   = "undef";
+
+    o.posval[0].ival = "undef";
+    o.posval[0].oval = CEPH_MSGR_TYPE_UNDEF;
+
+    o.posval[1].ival = "async+posix";
+    o.posval[1].oval = CEPH_MSGR_TYPE_POSIX;
+    o.posval[1].help = "POSIX API";
+
+    o.posval[2].ival = "async+dpdk";
+    o.posval[2].oval = CEPH_MSGR_TYPE_DPDK;
+    o.posval[2].help = "DPDK";
+
+    o.posval[3].ival = "async+rdma";
+    o.posval[3].oval = CEPH_MSGR_TYPE_RDMA;
+    o.posval[3].help = "RDMA";
+  }),
+  make_option([] (fio_option& o) {
+    o.name  = "ceph_conf_file";
+    o.lname = "CEPH configuration file";
+    o.type  = FIO_OPT_STR_STORE;
+    o.off1  = offsetof(struct ceph_msgr_options, conffile);
+    o.help  = "Path to CEPH configuration file";
+  }),
+  {} /* Last NULL */
+};
+
+static struct ioengine_ops ioengine;
+
+extern "C" {
+
+void get_ioengine(struct ioengine_ops** ioengine_ptr)
+{
+  /*
+   * Main ioengine structure
+   */
+  ioengine.name        = "ceph-msgr";
+  ioengine.version     = FIO_IOOPS_VERSION;
+  ioengine.flags       = FIO_DISKLESSIO | FIO_UNIDIR | FIO_PIPEIO;
+  ioengine.setup       = fio_ceph_msgr_setup;
+  ioengine.queue       = fio_ceph_msgr_queue;
+  ioengine.getevents   = fio_ceph_msgr_getevents;
+  ioengine.event       = fio_ceph_msgr_event;
+  ioengine.cleanup     = fio_ceph_msgr_cleanup;
+  ioengine.open_file   = fio_ceph_msgr_open_file;
+  ioengine.close_file  = fio_ceph_msgr_close_file;
+  ioengine.io_u_init   = fio_ceph_msgr_io_u_init;
+  ioengine.io_u_free   = fio_ceph_msgr_io_u_free;
+  ioengine.option_struct_size = sizeof(struct ceph_msgr_options);
+  ioengine.options     = options.data();
+
+  *ioengine_ptr = &ioengine;
+}
+} // extern "C"
diff --git a/src/test/fio/ring_buffer.h b/src/test/fio/ring_buffer.h
new file mode 100644 (file)
index 0000000..0e1eb62
--- /dev/null
@@ -0,0 +1,102 @@
+/*
+ * Very simple and fast lockless ring buffer implementatation for
+ * one producer and one consumer.
+ */
+
+#include <stdint.h>
+#include <stddef.h>
+
+/* Do not overcomplicate, choose generic x86 case */
+#define L1_CACHE_BYTES 64
+#define __cacheline_aligned __attribute__((__aligned__(L1_CACHE_BYTES)))
+
+struct ring_buffer
+{
+  unsigned int read_idx   __cacheline_aligned;
+  unsigned int write_idx  __cacheline_aligned;
+  unsigned int size;
+  unsigned int low_mask;
+  unsigned int high_mask;
+  unsigned int bit_shift;
+  void         *data_ptr;
+};
+
+static inline unsigned int upper_power_of_two(unsigned int v)
+{
+  v--;
+  v |= v >> 1;
+  v |= v >> 2;
+  v |= v >> 4;
+  v |= v >> 8;
+  v |= v >> 16;
+  v++;
+
+  return v;
+}
+
+static inline int ring_buffer_init(struct ring_buffer* rbuf, unsigned int size)
+{
+  /* Must be pow2 */
+  if (((size-1) & size))
+    size = upper_power_of_two(size);
+
+  size *= sizeof(void *);
+  rbuf->data_ptr = malloc(size);
+  rbuf->size = size;
+  rbuf->read_idx = 0;
+  rbuf->write_idx = 0;
+  rbuf->bit_shift = __builtin_ffs(sizeof(void *))-1;
+  rbuf->low_mask = rbuf->size - 1;
+  rbuf->high_mask = rbuf->size * 2 - 1;
+
+  return 0;
+}
+
+static inline void ring_buffer_deinit(struct ring_buffer* rbuf)
+{
+  free(rbuf->data_ptr);
+}
+
+static inline unsigned int ring_buffer_used_size(const struct ring_buffer* rbuf)
+{
+  __sync_synchronize();
+  return ((rbuf->write_idx - rbuf->read_idx) & rbuf->high_mask) >>
+    rbuf->bit_shift;
+}
+
+static inline void ring_buffer_enqueue(struct ring_buffer* rbuf, void *ptr)
+{
+
+  unsigned int idx;
+
+  /*
+   * Be aware: we do not check that buffer can be full,
+   * assume user of the ring buffer can't submit more.
+   */
+
+  idx = rbuf->write_idx & rbuf->low_mask;
+  *(void **)((uintptr_t)rbuf->data_ptr + idx) = ptr;
+  /* Barrier to be sure stored pointer will be seen properly */
+  __sync_synchronize();
+  rbuf->write_idx = (rbuf->write_idx + sizeof(ptr)) & rbuf->high_mask;
+}
+
+static inline void *ring_buffer_dequeue(struct ring_buffer* rbuf)
+{
+
+  unsigned idx;
+  void *ptr;
+
+  /*
+   * Be aware: we do not check that buffer can be empty,
+   * assume user of the ring buffer called ring_buffer_used_size(),
+   * which returns actual used size and introduces memory barrier
+   * explicitly.
+   */
+
+  idx = rbuf->read_idx & rbuf->low_mask;
+  ptr = *(void **)((uintptr_t)rbuf->data_ptr + idx);
+  rbuf->read_idx = (rbuf->read_idx + sizeof(ptr)) & rbuf->high_mask;
+
+  return ptr;
+}