test/bench: small io benchmarker
author    Samuel Just <sam.just@inktank.com>
          Sat, 6 Oct 2012 20:58:37 +0000 (13:58 -0700)
committer Samuel Just <sam.just@inktank.com>
          Tue, 30 Oct 2012 20:31:09 +0000 (13:31 -0700)
Precreates objects and issues writes to random offsets within
random objects.

Includes rados, filestore, and vanilla fs variants.

Signed-off-by: Samuel Just <sam.just@inktank.com>
19 files changed:
.gitignore
src/Makefile.am
src/test/bench/backend.h [new file with mode: 0644]
src/test/bench/bencher.cc [new file with mode: 0644]
src/test/bench/bencher.h [new file with mode: 0644]
src/test/bench/detailed_stat_collector.cc [new file with mode: 0644]
src/test/bench/detailed_stat_collector.h [new file with mode: 0644]
src/test/bench/distribution.h [new file with mode: 0644]
src/test/bench/dumb_backend.cc [new file with mode: 0644]
src/test/bench/dumb_backend.h [new file with mode: 0644]
src/test/bench/filestore_backend.cc [new file with mode: 0644]
src/test/bench/filestore_backend.h [new file with mode: 0644]
src/test/bench/rados_backend.cc [new file with mode: 0644]
src/test/bench/rados_backend.h [new file with mode: 0644]
src/test/bench/small_io_bench.cc [new file with mode: 0644]
src/test/bench/small_io_bench_dumb.cc [new file with mode: 0644]
src/test/bench/small_io_bench_fs.cc [new file with mode: 0644]
src/test/bench/smalliobenchprocessor.py [new file with mode: 0644]
src/test/bench/stat_collector.h [new file with mode: 0644]

index 502f0183260d16d56a17dd090203bc03e588f44b..4cb34b7e06f4296b6052d7af483e8b5772d06f99 100644 (file)
@@ -62,6 +62,9 @@ src/ocf/rbd
 src/omapbench
 src/kvstorebench
 ar-lib
+src/smalliobench
+src/smalliobenchdumb
+src/smalliobenchfs
 
 # temporary directory used by e.g. "make distcheck", e.g. ceph-0.42
 /ceph-[0-9]*/
index a72df3863d226ee73c51e56671730b32444958af..f9b860fd255f02273bf87709769940aa06b316b6 100644 (file)
@@ -220,6 +220,18 @@ testrados_SOURCES = test/osd/TestRados.cc test/osd/TestOpStat.cc test/osd/Object
 testrados_LDADD = librados.la $(LIBGLOBAL_LDA)
 bin_DEBUGPROGRAMS += testrados
 
+smalliobench_SOURCES = test/bench/small_io_bench.cc test/bench/rados_backend.cc test/bench/detailed_stat_collector.cc test/bench/bencher.cc
+smalliobench_LDADD = librados.la -lboost_program_options $(LIBGLOBAL_LDA)
+bin_DEBUGPROGRAMS += smalliobench
+
+smalliobenchfs_SOURCES = test/bench/small_io_bench_fs.cc test/bench/filestore_backend.cc test/bench/detailed_stat_collector.cc test/bench/bencher.cc
+smalliobenchfs_LDADD = librados.la -lboost_program_options $(LIBOS_LDA) $(LIBGLOBAL_LDA)
+bin_DEBUGPROGRAMS += smalliobenchfs
+
+smalliobenchdumb_SOURCES = test/bench/small_io_bench_dumb.cc test/bench/dumb_backend.cc test/bench/detailed_stat_collector.cc test/bench/bencher.cc
+smalliobenchdumb_LDADD = librados.la -lboost_program_options $(LIBOS_LDA) $(LIBGLOBAL_LDA)
+bin_DEBUGPROGRAMS += smalliobenchdumb
+
 omapbench_SOURCES = test/omap_bench.cc
 omapbench_LDADD = librados.la $(LIBGLOBAL_LDA)
 bin_DEBUGPROGRAMS += omapbench
diff --git a/src/test/bench/backend.h b/src/test/bench/backend.h
new file mode 100644 (file)
index 0000000..740e098
--- /dev/null
@@ -0,0 +1,26 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#ifndef BACKENDH
+#define BACKENDH
+
+#include "include/Context.h"
+
+class Backend {
+public:
+  virtual void write(
+    const string &oid,
+    uint64_t offset,
+    const bufferlist &bl,
+    Context *on_applied,
+    Context *on_commit) = 0;
+
+  virtual void read(
+    const string &oid,
+    uint64_t offset,
+    uint64_t length,
+    bufferlist *bl,
+    Context *on_complete) = 0;
+  virtual ~Backend() {}
+};
+
+#endif
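
The Backend interface above is asynchronous: write() must eventually complete both on_applied and on_commit exactly once, and read() must fill *bl and complete on_complete, since the Bencher's in-flight accounting hangs off those callbacks. A minimal sketch of a conforming implementation, not part of this commit (NullBackend is a made-up name; it completes everything synchronously and discards the data):

#include "backend.h"

class NullBackend : public Backend {
public:
  void write(
    const string &oid,
    uint64_t offset,
    const bufferlist &bl,
    Context *on_applied,
    Context *on_commit) {
    // a real backend would persist bl at offset within oid; this sketch
    // only exercises the callback contract
    on_applied->complete(0);
    on_commit->complete(0);
  }
  void read(
    const string &oid,
    uint64_t offset,
    uint64_t length,
    bufferlist *bl,
    Context *on_complete) {
    // hand back zeroed data of the requested length
    for (uint64_t i = 0; i < length; ++i)
      bl->append(0);
    on_complete->complete(0);
  }
};
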
diff --git a/src/test/bench/bencher.cc b/src/test/bench/bencher.cc
new file mode 100644 (file)
index 0000000..e1d3c0b
--- /dev/null
@@ -0,0 +1,201 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include "bencher.h"
+#include "include/utime.h"
+#include <unistd.h>
+#include <tr1/memory>
+#include "common/Mutex.h"
+#include "common/Cond.h"
+
+template<typename T>
+struct C_Holder : public Context {
+  T obj;
+  C_Holder(
+    T obj)
+    : obj(obj) {}
+  void finish(int r) {
+    return;
+  }
+};
+
+struct OnDelete {
+  Context *c;
+  OnDelete(Context *c) : c(c) {}
+  ~OnDelete() { c->complete(0); }
+};
+
+struct Cleanup : public Context {
+  Bencher *bench;
+  Cleanup(Bencher *bench) : bench(bench) {}
+  void finish(int r) {
+    bench->complete_op();
+  }
+};
+
+struct OnWriteApplied : public Context {
+  Bencher *bench;
+  uint64_t seq;
+  std::tr1::shared_ptr<OnDelete> on_delete;
+  OnWriteApplied(
+    Bencher *bench, uint64_t seq,
+    std::tr1::shared_ptr<OnDelete> on_delete
+    ) : bench(bench), seq(seq), on_delete(on_delete) {}
+  void finish(int r) {
+    bench->stat_collector->write_applied(seq);
+  }
+};
+
+struct OnWriteCommit : public Context {
+  Bencher *bench;
+  uint64_t seq;
+  std::tr1::shared_ptr<OnDelete> on_delete;
+  OnWriteCommit(
+    Bencher *bench, uint64_t seq,
+    std::tr1::shared_ptr<OnDelete> on_delete
+    ) : bench(bench), seq(seq), on_delete(on_delete) {}
+  void finish(int r) {
+    bench->stat_collector->write_committed(seq);
+  }
+};
+
+struct OnReadComplete : public Context {
+  Bencher *bench;
+  uint64_t seq;
+  boost::scoped_ptr<bufferlist> bl;
+  OnReadComplete(Bencher *bench, uint64_t seq, bufferlist *bl) :
+    bench(bench), seq(seq), bl(bl) {}
+  void finish(int r) {
+    bench->stat_collector->read_complete(seq);
+    bench->complete_op();
+  }
+};
+
+void Bencher::start_op() {
+  Mutex::Locker l(lock);
+  while (open_ops >= max_in_flight)
+    open_ops_cond.Wait(lock);
+  ++open_ops;
+}
+
+void Bencher::drain_ops() {
+  Mutex::Locker l(lock);
+  while (open_ops)
+    open_ops_cond.Wait(lock);
+}
+
+void Bencher::complete_op() {
+  Mutex::Locker l(lock);
+  assert(open_ops > 0);
+  --open_ops;
+  open_ops_cond.Signal();
+}
+
+struct OnFinish {
+  bool *done;
+  Mutex *lock;
+  Cond *cond;
+  OnFinish(
+    bool *done,
+    Mutex *lock,
+    Cond *cond) :
+    done(done), lock(lock), cond(cond) {}
+  ~OnFinish() {
+    Mutex::Locker l(*lock);
+    *done = true;
+    cond->Signal();
+  }
+};
+
+void Bencher::init(
+  const set<std::string> &objects,
+  uint64_t size,
+  std::ostream *out
+  )
+{
+  bufferlist bl;
+  for (uint64_t i = 0; i < size; ++i) {
+    bl.append(0);
+  }
+  Mutex lock("init_lock");
+  Cond cond;
+  bool done = false;
+  {
+    std::tr1::shared_ptr<OnFinish> on_finish(
+      new OnFinish(&done, &lock, &cond));
+    uint64_t num = 0;
+    for (set<std::string>::const_iterator i = objects.begin();
+        i != objects.end();
+        ++i, ++num) {
+      if (!(num % 20))
+       *out << "Creating " << num << "/" << objects.size() << std::endl;
+      backend->write(
+       *i,
+       0,
+       bl,
+       new C_Holder<std::tr1::shared_ptr<OnFinish> >(on_finish),
+       new C_Holder<std::tr1::shared_ptr<OnFinish> >(on_finish)
+       );
+    }
+  }
+  {
+    Mutex::Locker l(lock);
+    while (!done)
+      cond.Wait(lock);
+  }
+}
+
+void Bencher::run_bench()
+{
+  time_t end = time(0) + max_duration;
+  uint64_t ops = 0;
+
+  bufferlist bl;
+
+  while ((!max_duration || time(0) < end) && (!max_ops || ops < max_ops)) {
+    start_op();
+    uint64_t seq = stat_collector->next_seq();
+    boost::tuple<std::string, uint64_t, uint64_t, OpType> next =
+      (*op_dist)();
+    string obj_name = next.get<0>();
+    uint64_t offset = next.get<1>();
+    uint64_t length = next.get<2>();
+    OpType op_type = next.get<3>();
+    switch (op_type) {
+      case WRITE: {
+       std::tr1::shared_ptr<OnDelete> on_delete(
+         new OnDelete(new Cleanup(this)));
+       stat_collector->start_write(seq, length);
+       while (bl.length() < length) {
+         bl.append(rand());
+       }
+       backend->write(
+         obj_name,
+         offset,
+         bl,
+         new OnWriteApplied(
+           this, seq, on_delete),
+         new OnWriteCommit(
+           this, seq, on_delete)
+         );
+       break;
+      }
+      case READ: {
+       stat_collector->start_read(seq, length);
+       bufferlist *bl = new bufferlist;
+       backend->read(
+         obj_name,
+         offset,
+         length,
+         bl,
+         new OnReadComplete(
+           this, seq, bl)
+         );
+       break;
+      }
+      default: {
+       assert(0);
+      }
+    }
+  }
+  drain_ops();
+}
diff --git a/src/test/bench/bencher.h b/src/test/bench/bencher.h
new file mode 100644 (file)
index 0000000..9f48835
--- /dev/null
@@ -0,0 +1,147 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#ifndef BENCHERH
+#define BENCHERH
+
+#include <utility>
+#include "distribution.h"
+#include "stat_collector.h"
+#include "backend.h"
+#include <boost/scoped_ptr.hpp>
+#include "common/Mutex.h"
+#include "common/Cond.h"
+#include "common/Thread.h"
+
+class OnWriteApplied;
+class OnWriteCommit;
+class OnReadComplete;
+class Cleanup;
+
+class Bencher : public Thread {
+public:
+  enum OpType {
+    WRITE,
+    READ
+  };
+
+private:
+  boost::scoped_ptr<
+    Distribution<boost::tuple<std::string,uint64_t,uint64_t, OpType> > > op_dist;
+  std::tr1::shared_ptr<StatCollector> stat_collector;
+  boost::scoped_ptr<Backend> backend;
+  const uint64_t max_in_flight;
+  const uint64_t max_duration;
+  const uint64_t max_ops;
+
+  Mutex lock;
+  Cond open_ops_cond;
+  uint64_t open_ops;
+  void start_op();
+  void drain_ops();
+  void complete_op();
+public:
+  Bencher(
+    Distribution<boost::tuple<std::string, uint64_t, uint64_t, OpType> > *op_gen,
+    std::tr1::shared_ptr<StatCollector> stat_collector,
+    Backend *backend,
+    uint64_t max_in_flight,
+    uint64_t max_duration,
+    uint64_t max_ops) :
+    op_dist(op_gen),
+    stat_collector(stat_collector),
+    backend(backend),
+    max_in_flight(max_in_flight),
+    max_duration(max_duration),
+    max_ops(max_ops),
+    lock("Bencher::lock"),
+    open_ops(0)
+  {}
+  Bencher(
+    Distribution<boost::tuple<std::string, uint64_t, uint64_t, OpType> > *op_gen,
+    StatCollector *stat_collector,
+    Backend *backend,
+    uint64_t max_in_flight,
+    uint64_t max_duration,
+    uint64_t max_ops) :
+    op_dist(op_gen),
+    stat_collector(stat_collector),
+    backend(backend),
+    max_in_flight(max_in_flight),
+    max_duration(max_duration),
+    max_ops(max_ops),
+    lock("Bencher::lock"),
+    open_ops(0)
+  {}
+  Bencher(
+    Distribution<std::string> *object_gen,
+    Distribution<uint64_t> *offset_gen,
+    Distribution<uint64_t> *length_gen,
+    Distribution<OpType> *op_type_gen,
+    StatCollector *stat_collector,
+    Backend *backend,
+    uint64_t max_in_flight,
+    uint64_t max_duration,
+    uint64_t max_ops) :
+    op_dist(
+      new FourTupleDist<std::string, uint64_t, uint64_t, OpType>(
+       object_gen, offset_gen, length_gen, op_type_gen)),
+    stat_collector(stat_collector),
+    backend(backend),
+    max_in_flight(max_in_flight),
+    max_duration(max_duration),
+    max_ops(max_ops),
+    lock("Bencher::lock"),
+    open_ops(0)
+  {}
+
+  void init(
+    const set<std::string> &objects,
+    uint64_t size,
+    std::ostream *out
+    );
+
+  void run_bench();
+  void *entry() {
+    run_bench();
+    return 0;
+  }
+  friend class OnWriteApplied;
+  friend class OnWriteCommit;
+  friend class OnReadComplete;
+  friend class Cleanup;
+};
+
+class SequentialLoad :
+  public Distribution<
+  boost::tuple<string, uint64_t, uint64_t, Bencher::OpType> > {
+  set<string> objects;
+  uint64_t size;
+  uint64_t length;
+  set<string>::iterator object_pos;
+  uint64_t cur_pos;
+  boost::scoped_ptr<Distribution<Bencher::OpType> > op_dist;
+  SequentialLoad(const SequentialLoad &other);
+public:
+  SequentialLoad(
+    const set<string> &_objects, uint64_t size,
+    uint64_t length,
+    Distribution<Bencher::OpType> *op_dist)
+    : objects(_objects), size(size), length(length),
+      object_pos(objects.begin()), cur_pos(0),
+      op_dist(op_dist) {}
+
+  boost::tuple<string, uint64_t, uint64_t, Bencher::OpType>
+  operator()() {
+    boost::tuple<string, uint64_t, uint64_t, Bencher::OpType> ret =
+      boost::make_tuple(*object_pos, cur_pos, length, (*op_dist)());
+    cur_pos += length;
+    if (cur_pos > size) {
+      cur_pos = 0;
+      object_pos++;
+    }
+    if (object_pos == objects.end())
+      object_pos = objects.begin();
+    return ret;
+  }
+};
+#endif
diff --git a/src/test/bench/detailed_stat_collector.cc b/src/test/bench/detailed_stat_collector.cc
new file mode 100644 (file)
index 0000000..d3aceae
--- /dev/null
@@ -0,0 +1,163 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include "detailed_stat_collector.h"
+#include <sys/time.h>
+#include <utility>
+#include <boost/tuple/tuple.hpp>
+
+void DetailedStatCollector::Op::dump(
+  ostream *out,
+  Formatter *f)
+{
+  if (!out)
+    return;
+  f->open_object_section(type.c_str());
+  f->dump_string("type", type);
+  f->dump_float("start", start);
+  f->dump_float("latency", latency);
+  f->dump_int("size", size);
+  f->dump_int("seq", seq);
+  f->close_section();
+  f->flush(*out);
+  *out << std::endl;
+}
+
+static utime_t cur_time()
+{
+  struct timeval tv;
+  gettimeofday(&tv, 0);
+  return utime_t(&tv);
+}
+
+DetailedStatCollector::Aggregator::Aggregator()
+  : recent_size(0), total_size(0), recent_latency(0),
+    total_latency(0), recent_ops(0), total_ops(0), started(false)
+{}
+
+void DetailedStatCollector::Aggregator::add(const Op &op)
+{
+  if (!started) {
+    last = first = op.start;
+    started = true;
+  }
+  ++recent_ops;
+  ++total_ops;
+  recent_size += op.size;
+  total_size += op.size;
+  recent_latency += op.latency;
+  total_latency += op.latency;
+}
+
+void DetailedStatCollector::Aggregator::dump(Formatter *f)
+{
+  utime_t now = cur_time();
+  f->dump_stream("time") << now;
+  f->dump_float("avg_recent_latency", recent_latency / recent_ops);
+  f->dump_float("avg_total_latency", total_latency / total_ops);
+  f->dump_float("avg_recent_iops", recent_ops / (now - last));
+  f->dump_float("avg_total_iops", total_ops / (now - first));
+  f->dump_float("avg_recent_throughput", recent_size / (now - last));
+  f->dump_float("avg_total_throughput", total_size / (now - first));
+  f->dump_float("avg_recent_throughput_mb",
+               (recent_size / (now - last)) / (1024*1024));
+  f->dump_float("avg_total_throughput_mb",
+               (total_size / (now - first)) / (1024*1024));
+  f->dump_float("duration", now - last);
+  last = now;
+  recent_latency = 0;
+  recent_size = 0;
+  recent_ops = 0;
+}
+
+DetailedStatCollector::DetailedStatCollector(
+  double bin_size,
+  Formatter *formatter,
+  ostream *out,
+  ostream *summary_out,
+  AdditionalPrinting *details
+  ) : bin_size(bin_size), f(formatter), out(out),
+      summary_out(summary_out), details(details),
+      lock("Stat::lock"), cur_seq(0) {
+  last_dump = cur_time();
+}
+
+uint64_t DetailedStatCollector::next_seq()
+{
+  Mutex::Locker l(lock);
+  if (summary_out && ((cur_time() - last_dump) > bin_size)) {
+    f->open_object_section("stats");
+    for (map<string, Aggregator>::iterator i = aggregators.begin();
+        i != aggregators.end();
+        ++i) {
+      f->open_object_section(i->first.c_str());
+      i->second.dump(f.get());
+      f->close_section();
+    }
+    f->close_section();
+    f->flush(*summary_out);
+    *summary_out << std::endl;
+    if (details) {
+      (*details)(summary_out);
+      *summary_out << std::endl;
+    }
+    last_dump = cur_time();
+  }
+  return cur_seq++;
+}
+
+void DetailedStatCollector::start_write(uint64_t seq, uint64_t length)
+{
+  Mutex::Locker l(lock);
+  utime_t now(cur_time());
+  not_committed.insert(make_pair(seq, make_pair(length, now)));
+  not_applied.insert(make_pair(seq, make_pair(length, now)));
+}
+
+void DetailedStatCollector::start_read(uint64_t seq, uint64_t length)
+{
+  Mutex::Locker l(lock);
+  utime_t now(cur_time());
+  not_read.insert(make_pair(seq, make_pair(length, now)));
+}
+
+void DetailedStatCollector::write_applied(uint64_t seq)
+{
+  Mutex::Locker l(lock);
+  Op op(
+    "write_applied",
+    not_applied[seq].second,
+    cur_time() - not_applied[seq].second,
+    not_applied[seq].first,
+    seq);
+  op.dump(out, f.get());
+  aggregators["write_applied"].add(op);
+  not_applied.erase(seq);
+}
+
+void DetailedStatCollector::write_committed(uint64_t seq)
+{
+  Mutex::Locker l(lock);
+  Op op(
+    "write_committed",
+    not_committed[seq].second,
+    cur_time() - not_committed[seq].second,
+    not_committed[seq].first,
+    seq);
+  op.dump(out, f.get());
+  aggregators["write_committed"].add(op);
+  not_committed.erase(seq);
+}
+
+void DetailedStatCollector::read_complete(uint64_t seq)
+{
+  Mutex::Locker l(lock);
+  Op op(
+    "read",
+    not_read[seq].second,
+    cur_time() - not_read[seq].second,
+    not_read[seq].first,
+    seq);
+  op.dump(out, f.get());
+  aggregators["read"].add(op);
+  not_read.erase(seq);
+}
diff --git a/src/test/bench/detailed_stat_collector.h b/src/test/bench/detailed_stat_collector.h
new file mode 100644 (file)
index 0000000..60b7cab
--- /dev/null
@@ -0,0 +1,96 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#ifndef DETAILEDSTATCOLLECTERH
+#define DETAILEDSTATCOLLECTERH
+
+#include "stat_collector.h"
+#include "common/Formatter.h"
+#include <boost/scoped_ptr.hpp>
+#include "common/Mutex.h"
+#include "common/Cond.h"
+#include "include/utime.h"
+#include <list>
+#include <map>
+#include <boost/tuple/tuple.hpp>
+#include <ostream>
+
+class DetailedStatCollector : public StatCollector {
+public:
+  class AdditionalPrinting {
+  public:
+    virtual void operator()(std::ostream *) = 0;
+    virtual ~AdditionalPrinting() {}
+  };
+private:
+  struct Op {
+    string type;
+    utime_t start;
+    double latency;
+    uint64_t size;
+    uint64_t seq;
+    Op(
+      string type,
+      utime_t start,
+      double latency,
+      uint64_t size,
+      uint64_t seq)
+      : type(type), start(start), latency(latency),
+       size(size), seq(seq) {}
+    void dump(ostream *out, Formatter *f);
+  };
+  class Aggregator {
+    uint64_t recent_size;
+    uint64_t total_size;
+    double recent_latency;
+    double total_latency;
+    utime_t last;
+    utime_t first;
+    uint64_t recent_ops;
+    uint64_t total_ops;
+    bool started;
+  public:
+    Aggregator();
+
+    void add(const Op &op);
+    void dump(Formatter *f);
+  };
+  const double bin_size;
+  boost::scoped_ptr<Formatter> f;
+  ostream *out;
+  ostream *summary_out;
+  boost::scoped_ptr<AdditionalPrinting> details;
+  utime_t last_dump;
+
+  Mutex lock;
+  Cond cond;
+
+  map<string, Aggregator> aggregators;
+
+  map<uint64_t, pair<uint64_t, utime_t> > not_applied;
+  map<uint64_t, pair<uint64_t, utime_t> > not_committed;
+  map<uint64_t, pair<uint64_t, utime_t> > not_read;
+
+  uint64_t cur_seq;
+
+  void dump(
+    const string &type,
+    boost::tuple<utime_t, utime_t, uint64_t, uint64_t> stuff);
+public:
+  DetailedStatCollector(
+    double bin_size,
+    Formatter *formatter,
+    ostream *out,
+    ostream *summary_out,
+    AdditionalPrinting *details = 0
+    );
+
+  uint64_t next_seq();
+  void start_write(uint64_t seq, uint64_t size);
+  void start_read(uint64_t seq, uint64_t size);
+  void write_applied(uint64_t seq);
+  void write_committed(uint64_t seq);
+  void read_complete(uint64_t seq);
+
+};
+
+#endif
diff --git a/src/test/bench/distribution.h b/src/test/bench/distribution.h
new file mode 100644 (file)
index 0000000..56490ae
--- /dev/null
@@ -0,0 +1,136 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#ifndef DISTRIBUTIONHPP
+#define DISTRIBUTIONHPP
+
+#include <map>
+#include <set>
+#include <utility>
+#include <vector>
+#include <boost/random/mersenne_twister.hpp>
+#include <boost/random/uniform_int.hpp>
+#include <boost/random/uniform_real.hpp>
+#include <boost/scoped_ptr.hpp>
+#include <boost/tuple/tuple.hpp>
+
+typedef boost::mt11213b rngen_t;
+
+template <typename T>
+class Distribution {
+public:
+  virtual T operator()() = 0;
+  virtual ~Distribution() {}
+};
+
+template <typename T, typename U, typename V, typename W>
+class FourTupleDist : public Distribution<boost::tuple<T, U, V, W> > {
+  boost::scoped_ptr<Distribution<T> > t;
+  boost::scoped_ptr<Distribution<U> > u;
+  boost::scoped_ptr<Distribution<V> > v;
+  boost::scoped_ptr<Distribution<W> > w;
+public:
+  FourTupleDist(
+    Distribution<T> *t,
+    Distribution<U> *u,
+    Distribution<V> *v,
+    Distribution<W> *w)
+    : t(t), u(u), v(v), w(w) {}
+  boost::tuple<T, U, V, W> operator()() {
+    return boost::make_tuple((*t)(), (*u)(), (*v)(), (*w)());
+  }
+};
+
+template <typename T>
+class RandomDist : public Distribution<T> {
+  rngen_t rng;
+  std::map<uint64_t, T> contents;
+public:
+  RandomDist(rngen_t rng, std::set<T> &initial) : rng(rng) {
+    uint64_t count = 0;
+    for (typename std::set<T>::iterator i = initial.begin();
+        i != initial.end();
+        ++i, ++count) {
+      contents.insert(std::make_pair(count, *i));
+    }
+  }
+  virtual T operator()() {
+    assert(contents.size());
+    boost::uniform_int<> value(0, contents.size() - 1);
+    return contents.find(value(rng))->second;
+  }
+};
+
+template <typename T>
+class WeightedDist : public Distribution<T> {
+  rngen_t rng;
+  double total;
+  std::map<double, T> contents;
+public:
+  WeightedDist(rngen_t rng, const std::set<std::pair<double, T> > &initial)
+    : rng(rng), total(0) {
+    for (typename std::set<std::pair<double, T> >::const_iterator i =
+          initial.begin();
+        i != initial.end();
+        ++i) {
+      total += i->first;
+      contents.insert(std::make_pair(total, i->second));
+    }
+  }
+  virtual T operator()() {
+    return contents.lower_bound(
+      boost::uniform_real<>(0, total)(rng))->second;
+  }
+};
+
+template <typename T, typename U>
+class SequentialDist : public Distribution<T> {
+  rngen_t rng;
+  std::vector<T> contents;
+  typename std::vector<T>::iterator cur;
+public:
+  SequentialDist(rngen_t rng, U &initial) : rng(rng) {
+    contents.insert(contents.begin(), initial.begin(), initial.end());
+    cur = contents.begin();
+  }
+  virtual T operator()() {
+    assert(contents.size());
+    if (cur == contents.end())
+      cur = contents.begin();
+    return *(cur++);
+  }
+};
+
+class UniformRandom : public Distribution<uint64_t> {
+  rngen_t rng;
+  uint64_t min;
+  uint64_t max;
+public:
+  UniformRandom(rngen_t rng, uint64_t min, uint64_t max) :
+    rng(rng), min(min), max(max) {}
+  virtual uint64_t operator()() {
+    return boost::uniform_int<>(min, max)(rng);
+  }
+};
+
+class Align : public Distribution<uint64_t> {
+  boost::scoped_ptr<Distribution<uint64_t> > dist;
+  uint64_t align;
+public:
+  Align(Distribution<uint64_t> *dist, uint64_t align) :
+    dist(dist), align(align) {}
+  virtual uint64_t operator()() {
+    uint64_t ret = (*dist)();
+    return ret - (ret % align);
+  }
+};
+
+class Uniform : public Distribution<uint64_t> {
+  uint64_t val;
+public:
+  Uniform(uint64_t val) : val(val) {}
+  virtual uint64_t operator()() {
+    return val;
+  }
+};
+
+#endif
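
The combinators above compose into the four-tuple (object, offset, length, op type) generator that drives the Bencher; the random branch of small_io_bench.cc below builds exactly this shape. A sketch of the composition with hypothetical hard-coded values (4 MB objects, 4 KB IOs aligned to 4096, 75% writes); make_op_gen is a made-up helper, not part of this commit:

#include <set>
#include <string>
#include "distribution.h"
#include "bencher.h"  // for Bencher::OpType; FourTupleDist comes from distribution.h

Distribution<boost::tuple<std::string, uint64_t, uint64_t, Bencher::OpType> > *
make_op_gen(rngen_t rng, std::set<std::string> &objects)
{
  std::set<std::pair<double, Bencher::OpType> > ops;
  ops.insert(std::make_pair(0.75, Bencher::WRITE));   // weight for writes
  ops.insert(std::make_pair(0.25, Bencher::READ));    // weight for reads
  return new FourTupleDist<std::string, uint64_t, uint64_t, Bencher::OpType>(
    new RandomDist<std::string>(rng, objects),         // which object
    new Align(                                         // offset, aligned down
      new UniformRandom(rng, 0, (4<<20) - (4<<10)), 4096),
    new Uniform(4<<10),                                // fixed io size
    new WeightedDist<Bencher::OpType>(rng, ops));      // read or write
}
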
diff --git a/src/test/bench/dumb_backend.cc b/src/test/bench/dumb_backend.cc
new file mode 100644 (file)
index 0000000..a763c45
--- /dev/null
@@ -0,0 +1,95 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include <unistd.h>
+#include "dumb_backend.h"
+
+string DumbBackend::get_full_path(const string &oid)
+{
+       return path + "/" + oid;
+}
+
+void DumbBackend::_write(
+  const string &oid,
+  uint64_t offset,
+  const bufferlist &bl,
+  Context *on_applied,
+  Context *on_commit)
+{
+  string full_path(get_full_path(oid));
+  int fd = ::open(
+    full_path.c_str(), O_CREAT|O_WRONLY, 0777);
+  if (fd < 0) {
+    std::cout << full_path << ": errno is " << errno << std::endl;
+    assert(0);
+  }
+  ::lseek(fd, offset, SEEK_SET);
+  bl.write_fd(fd);
+  on_applied->complete(0);
+  if (do_fsync)
+    ::fsync(fd);
+  if (do_sync_file_range)
+    ::sync_file_range(fd, offset, bl.length(),
+                     SYNC_FILE_RANGE_WAIT_AFTER);
+  if (do_fadvise)
+    ::posix_fadvise(fd, offset, bl.length(),
+                   POSIX_FADV_DONTNEED);
+  ::close(fd);
+  {
+    Mutex::Locker l(pending_commit_mutex);
+    pending_commits.insert(on_commit);
+  }
+  sem.Put();
+}
+
+void DumbBackend::read(
+  const string &oid,
+  uint64_t offset,
+  uint64_t length,
+  bufferlist *bl,
+  Context *on_complete)
+{
+  string full_path(get_full_path(oid));
+  int fd = ::open(
+    full_path.c_str(), O_RDONLY);
+  if (fd < 0) {
+    on_complete->complete(-errno);
+    return;
+  }
+
+  int r = ::lseek(fd, offset, SEEK_SET);
+  if (r < 0) {
+    ::close(fd);
+    on_complete->complete(-errno);
+    return;
+  }
+
+  bl->read_fd(fd, length);
+  ::close(fd);
+  on_complete->complete(0);
+}
+
+void DumbBackend::sync_loop()
+{
+  while (1) {
+    sleep(sync_interval);
+    {
+      Mutex::Locker l(sync_loop_mutex);
+      if (sync_loop_stop != 0) {
+       sync_loop_stop = 2;
+       sync_loop_cond.Signal();
+       break;
+      }
+    }
+    tp.pause();
+#ifdef HAVE_SYS_SYNCFS
+    ::syncfs(sync_fd);
+#else
+    ::sync();
+#endif
+    {
+      Mutex::Locker l(pending_commit_mutex);
+      for (set<Context*>::iterator i = pending_commits.begin();
+          i != pending_commits.end();
+          pending_commits.erase(i++)) {
+       (*i)->complete(0);
+      }
+    }
+    tp.unpause();
+  }
+}
diff --git a/src/test/bench/dumb_backend.h b/src/test/bench/dumb_backend.h
new file mode 100644 (file)
index 0000000..bbf8650
--- /dev/null
@@ -0,0 +1,169 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#ifndef DUMBBACKEND
+#define DUMBBACKEND
+
+#include "backend.h"
+#include "include/Context.h"
+#include "os/ObjectStore.h"
+#include "common/WorkQueue.h"
+#include "common/Semaphore.h"
+
+#include <deque>
+
+class DumbBackend : public Backend {
+       const string path;
+
+  struct write_item {
+    const string oid;
+    bufferlist bl;
+    uint64_t offset;
+    Context *on_applied;
+    Context *on_commit;
+    write_item(
+      const string &oid,
+      const bufferlist &bl,
+      uint64_t offset,
+      Context *on_applied,
+      Context *on_commit) :
+      oid(oid), bl(bl), offset(offset), on_applied(on_applied),
+      on_commit(on_commit) {}
+  };
+
+  Semaphore sem;
+
+  bool do_fsync;
+  bool do_sync_file_range;
+  bool do_fadvise;
+  unsigned sync_interval;
+  int sync_fd;
+  ThreadPool tp;
+
+  class SyncThread : public Thread {
+    DumbBackend *backend;
+  public:
+    SyncThread(DumbBackend *backend) : backend(backend) {}
+    void *entry() {
+      backend->sync_loop();
+      return 0;
+    }
+  } thread;
+  friend class SyncThread;
+
+  Mutex sync_loop_mutex;
+  Cond sync_loop_cond;
+  int sync_loop_stop; // 0 for running, 1 for stopping, 2 for stopped
+  void sync_loop();
+
+  Mutex pending_commit_mutex;
+  set<Context*> pending_commits;
+
+  class WriteQueue : public ThreadPool::WorkQueue<write_item> {
+    deque<write_item*> item_queue;
+    DumbBackend *backend;
+
+  public:
+    WriteQueue(
+      DumbBackend *backend,
+      time_t ti,
+      ThreadPool *tp) :
+      ThreadPool::WorkQueue<write_item>("DumbBackend::queue", ti, ti*10, tp),
+      backend(backend) {}
+    bool _enqueue(write_item *item) {
+      item_queue.push_back(item);
+      return true;
+    }
+    void _dequeue(write_item*) { assert(0); }
+    write_item *_dequeue() {
+      if (item_queue.empty())
+       return 0;
+      write_item *retval = item_queue.front();
+      item_queue.pop_front();
+      return retval;
+    }
+    bool _empty() {
+      return item_queue.empty();
+    }
+    void _process(write_item *item) {
+      backend->_write(
+       item->oid,
+       item->offset,
+       item->bl,
+       item->on_applied,
+       item->on_commit);
+      delete item;
+    }
+    void _clear() {
+      return item_queue.clear();
+    }
+  } queue;
+  friend class WriteQueue;
+
+  string get_full_path(const string &oid);
+
+  void _write(
+    const string &oid,
+    uint64_t offset,
+    const bufferlist &bl,
+    Context *on_applied,
+    Context *on_commit);
+
+public:
+  DumbBackend(
+    const string &path,
+    bool do_fsync,
+    bool do_sync_file_range,
+    bool do_fadvise,
+    unsigned sync_interval,
+    int sync_fd,
+    unsigned worker_threads,
+    CephContext *cct)
+    : path(path), do_fsync(do_fsync),
+      do_sync_file_range(do_sync_file_range),
+      do_fadvise(do_fadvise),
+      sync_interval(sync_interval),
+      sync_fd(sync_fd),
+      tp(cct, "DumbBackend::tp", worker_threads),
+      thread(this),
+      sync_loop_mutex("DumbBackend::sync_loop_mutex"),
+      sync_loop_stop(0),
+      pending_commit_mutex("DumbBackend::pending_commit_mutex"),
+      queue(this, 20, &tp) {
+    thread.create();
+    tp.start();
+    for (unsigned i = 0; i < 10*worker_threads; ++i) {
+      sem.Put();
+    }
+  }
+  ~DumbBackend() {
+    {
+      Mutex::Locker l(sync_loop_mutex);
+      if (sync_loop_stop == 0)
+       sync_loop_stop = 1;
+      while (sync_loop_stop < 2)
+       sync_loop_cond.Wait(sync_loop_mutex);
+    }
+    tp.stop();
+    thread.join();
+  }
+  void write(
+    const string &oid,
+    uint64_t offset,
+    const bufferlist &bl,
+    Context *on_applied,
+    Context *on_commit) {
+    sem.Get();
+    queue.queue(
+      new write_item(
+       oid, bl, offset, on_applied, on_commit));
+  }
+
+  void read(
+    const string &oid,
+    uint64_t offset,
+    uint64_t length,
+    bufferlist *bl,
+    Context *on_complete);
+};
+
+#endif
diff --git a/src/test/bench/filestore_backend.cc b/src/test/bench/filestore_backend.cc
new file mode 100644 (file)
index 0000000..44abee9
--- /dev/null
@@ -0,0 +1,77 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include "filestore_backend.h"
+#include "global/global_init.h"
+#include "os/ObjectStore.h"
+
+struct C_DeleteTransWrapper : public Context {
+  Context *c;
+  ObjectStore::Transaction *t;
+  C_DeleteTransWrapper(
+    ObjectStore::Transaction *t,
+    Context *c) : c(c), t(t) {}
+  void finish(int r) {
+    c->complete(r);
+    delete t;
+  }
+};
+
+FileStoreBackend::FileStoreBackend(
+  ObjectStore *os, bool write_infos)
+  : os(os), finisher(g_ceph_context), write_infos(write_infos)
+{
+  finisher.start();
+}
+
+void FileStoreBackend::write(
+  const string &oid,
+  uint64_t offset,
+  const bufferlist &bl,
+  Context *on_applied,
+  Context *on_commit)
+{
+  ObjectStore::Transaction *t = new ObjectStore::Transaction;
+  size_t sep = oid.find("/");
+  assert(sep != string::npos);
+  assert(sep + 1 < oid.size());
+  string coll_str(oid.substr(0, sep));
+
+  if (!osrs.count(coll_str))
+    osrs.insert(make_pair(coll_str, ObjectStore::Sequencer(coll_str)));
+  ObjectStore::Sequencer *osr = &(osrs.find(coll_str)->second);
+
+
+  coll_t c(coll_str);
+  hobject_t h(sobject_t(oid.substr(sep+1), 0));
+  t->write(c, h, offset, bl.length(), bl);
+
+  if (write_infos) {
+    bufferlist bl2;
+    for (uint64_t j = 0; j < 128; ++j) bl2.append(0);
+    coll_t meta("meta");
+    hobject_t info(sobject_t(string("info_")+coll_str, 0));
+    t->write(meta, info, 0, bl2.length(), bl2);
+  }
+
+  os->queue_transaction(
+    osr,
+    t,
+    new C_DeleteTransWrapper(t, on_applied),
+    on_commit);
+}
+
+void FileStoreBackend::read(
+  const string &oid,
+  uint64_t offset,
+  uint64_t length,
+  bufferlist *bl,
+  Context *on_complete)
+{
+  size_t sep = oid.find("/");
+  assert(sep != string::npos);
+  assert(sep + 1 < oid.size());
+  coll_t c(oid.substr(0, sep));
+  hobject_t h(sobject_t(oid.substr(sep+1), 0));
+  os->read(c, h, offset, length, *bl);
+  finisher.queue(on_complete);
+}
diff --git a/src/test/bench/filestore_backend.h b/src/test/bench/filestore_backend.h
new file mode 100644 (file)
index 0000000..7142993
--- /dev/null
@@ -0,0 +1,37 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#ifndef FILESTOREBACKENDH
+#define FILESTOREBACKENDH
+
+#include "common/Finisher.h"
+#include "backend.h"
+#include "include/Context.h"
+#include "os/ObjectStore.h"
+
+class FileStoreBackend : public Backend {
+  ObjectStore *os;
+  Finisher finisher;
+  map<string, ObjectStore::Sequencer> osrs;
+  const bool write_infos;
+
+public:
+  FileStoreBackend(ObjectStore *os, bool write_infos);
+  ~FileStoreBackend() {
+    finisher.stop();
+  }
+  void write(
+    const string &oid,
+    uint64_t offset,
+    const bufferlist &bl,
+    Context *on_applied,
+    Context *on_commit);
+
+  void read(
+    const string &oid,
+    uint64_t offset,
+    uint64_t length,
+    bufferlist *bl,
+    Context *on_complete);
+};
+
+#endif
diff --git a/src/test/bench/rados_backend.cc b/src/test/bench/rados_backend.cc
new file mode 100644 (file)
index 0000000..89d808a
--- /dev/null
@@ -0,0 +1,62 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include "rados_backend.h"
+#include <boost/tuple/tuple.hpp>
+
+typedef boost::tuple<Context*, Context*, librados::AioCompletion*> arg_type;
+
+void on_applied(void *completion, void *_arg) {
+  arg_type *arg = static_cast<arg_type*>(_arg);
+  arg->get<1>()->complete(0);
+}
+
+void on_complete(void *completion, void *_arg) {
+  arg_type *arg = static_cast<arg_type*>(_arg);
+  arg->get<0>()->complete(0);
+  arg->get<2>()->release();
+  delete arg;
+}
+
+void RadosBackend::write(
+  const string &oid,
+  uint64_t offset,
+  const bufferlist &bl,
+  Context *on_write_applied,
+  Context *on_commit)
+{
+  librados::AioCompletion *completion = librados::Rados::aio_create_completion();
+
+
+  void *arg = static_cast<void *>(new arg_type(on_commit, on_write_applied,
+                                              completion));
+
+  completion->set_safe_callback(
+    arg,
+    on_complete);
+
+  completion->set_complete_callback(
+    arg,
+    on_applied);
+
+  ioctx->aio_write(oid, completion, bl, bl.length(), offset);
+}
+
+void RadosBackend::read(
+  const string &oid,
+  uint64_t offset,
+  uint64_t length,
+  bufferlist *bl,
+  Context *on_read_complete)
+{
+  librados::AioCompletion *completion = librados::Rados::aio_create_completion();
+
+
+  void *arg = static_cast<void *>(new arg_type(on_read_complete, 0,
+                                              completion));
+
+  completion->set_complete_callback(
+    arg,
+    on_complete);
+
+  ioctx->aio_read(oid, completion, bl, length, offset);
+}
diff --git a/src/test/bench/rados_backend.h b/src/test/bench/rados_backend.h
new file mode 100644 (file)
index 0000000..911d6c7
--- /dev/null
@@ -0,0 +1,31 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#ifndef RADOSBACKENDH
+#define RADOSBACKENDH
+
+#include "backend.h"
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+
+class RadosBackend : public Backend {
+  librados::IoCtx *ioctx;
+public:
+  RadosBackend(
+    librados::IoCtx *ioctx)
+    : ioctx(ioctx) {}
+  void write(
+    const string &oid,
+    uint64_t offset,
+    const bufferlist &bl,
+    Context *on_applied,
+    Context *on_commit);
+
+  void read(
+    const string &oid,
+    uint64_t offset,
+    uint64_t length,
+    bufferlist *bl,
+    Context *on_complete);
+};
+
+#endif
diff --git a/src/test/bench/small_io_bench.cc b/src/test/bench/small_io_bench.cc
new file mode 100644 (file)
index 0000000..45d5c5d
--- /dev/null
@@ -0,0 +1,191 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/lexical_cast.hpp>
+#include <boost/program_options/option.hpp>
+#include <boost/program_options/options_description.hpp>
+#include <boost/program_options/variables_map.hpp>
+#include <boost/program_options/cmdline.hpp>
+#include <boost/program_options/parsers.hpp>
+#include <iostream>
+#include <set>
+#include <sstream>
+#include <stdlib.h>
+#include <fstream>
+#include <iostream>
+
+#include "common/Formatter.h"
+
+#include "bencher.h"
+#include "rados_backend.h"
+#include "detailed_stat_collector.h"
+#include "distribution.h"
+
+namespace po = boost::program_options;
+using namespace std;
+
+int main(int argc, char **argv)
+{
+  po::options_description desc("Allowed options");
+  desc.add_options()
+    ("help", "produce help message")
+    ("num-concurrent-ops", po::value<unsigned>()->default_value(10),
+     "set number of concurrent ops")
+    ("num-objects", po::value<unsigned>()->default_value(500),
+     "set number of objects to use")
+    ("object-size", po::value<unsigned>()->default_value(4<<20),
+     "set object size")
+    ("io-size", po::value<unsigned>()->default_value(4<<10),
+     "set io size")
+    ("write-ratio", po::value<double>()->default_value(0.75),
+     "set ratio of read to write")
+    ("duration", po::value<unsigned>()->default_value(0),
+     "set max duration, 0 for unlimited")
+    ("max-ops", po::value<unsigned>()->default_value(0),
+     "set max ops, 0 for unlimited")
+    ("seed", po::value<unsigned>(),
+     "seed")
+    ("ceph-client-id", po::value<string>()->default_value("admin"),
+     "set ceph client id")
+    ("pool-name", po::value<string>()->default_value("data"),
+     "set pool")
+    ("op-dump-file", po::value<string>()->default_value(""),
+     "set file for dumping op details, omit for stderr")
+    ("init-only", po::value<bool>()->default_value(false),
+     "populate object set")
+    ("use-prefix", po::value<string>()->default_value(""),
+     "use previously populated prefix")
+    ("offset-align", po::value<unsigned>()->default_value(4096),
+     "align offset by")
+    ("sequential", po::value<bool>()->default_value(false),
+     "use sequential access pattern")
+    ("disable-detailed-ops", po::value<bool>()->default_value(false),
+     "don't dump per op stats")
+    ;
+
+  po::variables_map vm;
+  po::store(po::parse_command_line(argc, argv, desc), vm);
+  po::notify(vm);
+
+  if (vm.count("help")) {
+    cout << desc << std::endl;
+    return 1;
+  }
+
+  if (vm["init-only"].as<bool>() && !vm["use-prefix"].as<string>().size()) {
+    cout << "Must supply prefix for init-only" << std::endl;
+    cout << desc << std::endl;
+    return 1;
+  }
+
+  string prefix;
+  if (vm["use-prefix"].as<string>().size()) {
+    prefix = vm["use_prefix"].as<string>();
+  } else {
+    char hostname_cstr[100];
+    gethostname(hostname_cstr, 100);
+    stringstream hostpid;
+    hostpid << hostname_cstr << getpid() << "-";
+    prefix = hostpid.str();
+  }
+
+  set<string> objects;
+  for (unsigned i = 0; i < vm["num-objects"].as<unsigned>();
+       ++i) {
+    stringstream name;
+    name << prefix << "-object_" << i;
+    objects.insert(name.str());
+  }
+
+  rngen_t rng;
+  if (vm.count("seed"))
+    rng = rngen_t(vm["seed"].as<unsigned>());
+
+  set<pair<double, Bencher::OpType> > ops;
+  ops.insert(make_pair(vm["write-ratio"].as<double>(), Bencher::WRITE));
+  ops.insert(make_pair(1-vm["write-ratio"].as<double>(), Bencher::READ));
+
+  librados::Rados rados;
+  librados::IoCtx ioctx;
+  int r = rados.init(vm["ceph-client-id"].as<string>().c_str());
+  if (r < 0) {
+    cerr << "error in init r=" << r << std::endl;
+    return -r;
+  }
+  r = rados.conf_read_file(NULL);
+  if (r < 0) {
+    cerr << "error in conf_read_file r=" << r << std::endl;
+    return -r;
+  }
+  r = rados.conf_parse_env(NULL);
+  if (r < 0) {
+    cerr << "error in conf_parse_env r=" << r << std::endl;
+    return -r;
+  }
+  r = rados.connect();
+  if (r < 0) {
+    cerr << "error in connect r=" << r << std::endl;
+    return -r;
+  }
+  r = rados.ioctx_create(vm["pool-name"].as<string>().c_str(), ioctx);
+  if (r < 0) {
+    cerr << "error in ioctx_create r=" << r << std::endl;
+    return -r;
+  }
+
+  ostream *detailed_ops = 0;
+  ofstream myfile;
+  if (vm["disable-detailed-ops"].as<bool>()) {
+    detailed_ops = 0;
+  } else if (vm["op-dump-file"].as<string>().size()) {
+    myfile.open(vm["op-dump-file"].as<string>().c_str());
+    detailed_ops = &myfile;
+  } else {
+    detailed_ops = &cerr;
+  }
+
+  Distribution<
+    boost::tuple<string, uint64_t, uint64_t, Bencher::OpType> > *gen = 0;
+  if (vm["sequential"].as<bool>()) {
+    std::cout << "Using Sequential generator" << std::endl;
+    gen = new SequentialLoad(
+      objects,
+      vm["object-size"].as<unsigned>(),
+      vm["io-size"].as<unsigned>(),
+      new WeightedDist<Bencher::OpType>(rng, ops)
+      );
+  } else {
+    std::cout << "Using random generator" << std::endl;
+    gen = new FourTupleDist<string, uint64_t, uint64_t, Bencher::OpType>(
+      new RandomDist<string>(rng, objects),
+      new Align(
+       new UniformRandom(
+         rng,
+         0,
+         vm["object-size"].as<unsigned>() - vm["io-size"].as<unsigned>()),
+       vm["offset-align"].as<unsigned>()
+       ),
+      new Uniform(vm["io-size"].as<unsigned>()),
+      new WeightedDist<Bencher::OpType>(rng, ops)
+      );
+  }
+
+  Bencher bencher(
+    gen,
+    new DetailedStatCollector(1, new JSONFormatter, detailed_ops, &cout),
+    new RadosBackend(&ioctx),
+    vm["num-concurrent-ops"].as<unsigned>(),
+    vm["duration"].as<unsigned>(),
+    vm["max-ops"].as<unsigned>());
+
+  bencher.init(objects, vm["object-size"].as<unsigned>(), &std::cout);
+  cout << "Created objects..." << std::endl;
+
+  bencher.run_bench();
+
+  rados.shutdown();
+  if (vm["op-dump-file"].as<string>().size()) {
+    myfile.close();
+  }
+  return 0;
+}
diff --git a/src/test/bench/small_io_bench_dumb.cc b/src/test/bench/small_io_bench_dumb.cc
new file mode 100644 (file)
index 0000000..46bdd0f
--- /dev/null
@@ -0,0 +1,220 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/lexical_cast.hpp>
+#include <boost/program_options/option.hpp>
+#include <boost/program_options/options_description.hpp>
+#include <boost/program_options/variables_map.hpp>
+#include <boost/program_options/cmdline.hpp>
+#include <boost/program_options/parsers.hpp>
+#include <iostream>
+#include <set>
+#include <sstream>
+#include <stdlib.h>
+#include <fstream>
+#include <iostream>
+
+#include "common/Formatter.h"
+
+#include "bencher.h"
+#include "rados_backend.h"
+#include "detailed_stat_collector.h"
+#include "distribution.h"
+#include "global/global_init.h"
+#include "os/FileStore.h"
+#include "dumb_backend.h"
+
+namespace po = boost::program_options;
+using namespace std;
+
+int main(int argc, char **argv)
+{
+  po::options_description desc("Allowed options");
+  desc.add_options()
+    ("help", "produce help message")
+    ("num-concurrent-ops", po::value<unsigned>()->default_value(10),
+     "set number of concurrent ops")
+    ("num-objects", po::value<unsigned>()->default_value(500),
+     "set number of objects to use")
+    ("object-size", po::value<unsigned>()->default_value(4<<20),
+     "set object size")
+    ("io-size", po::value<unsigned>()->default_value(4<<10),
+     "set io size")
+    ("write-ratio", po::value<double>()->default_value(0.75),
+     "set ratio of read to write")
+    ("duration", po::value<unsigned>()->default_value(0),
+     "set max duration, 0 for unlimited")
+    ("max-ops", po::value<unsigned>()->default_value(0),
+     "set max ops, 0 for unlimited")
+    ("seed", po::value<unsigned>(),
+     "seed")
+    ("num-colls", po::value<unsigned>()->default_value(20),
+     "number of collections")
+    ("op-dump-file", po::value<string>()->default_value(""),
+     "set file for dumping op details, omit for stderr")
+    ("filestore-path", po::value<string>(),
+     "path to filestore directory, mandatory")
+    ("offset-align", po::value<unsigned>()->default_value(4096),
+     "align offset by")
+    ("fsync", po::value<bool>()->default_value(false),
+     "fsync after each write")
+    ("sync-file-range", po::value<bool>()->default_value(false),
+     "sync-file-range after each write")
+    ("fadvise", po::value<bool>()->default_value(false),
+     "fadvise after each write")
+    ("sync-interval", po::value<unsigned>()->default_value(30),
+     "frequency to sync")
+    ("sequential", po::value<bool>()->default_value(false),
+     "use sequential access pattern")
+    ("disable-detailed-ops", po::value<bool>()->default_value(false),
+     "don't dump per op stats")
+    ;
+
+  po::variables_map vm;
+  po::parsed_options parsed =
+    po::command_line_parser(argc, argv).options(desc).allow_unregistered().run();
+  po::store(
+    parsed,
+    vm);
+  po::notify(vm);
+
+  vector<const char *> ceph_options, def_args;
+  vector<string> ceph_option_strings = po::collect_unrecognized(
+    parsed.options, po::include_positional);
+  ceph_options.reserve(ceph_option_strings.size());
+  for (vector<string>::iterator i = ceph_option_strings.begin();
+       i != ceph_option_strings.end();
+       ++i) {
+    ceph_options.push_back(i->c_str());
+  }
+
+  global_init(
+    &def_args, ceph_options, CEPH_ENTITY_TYPE_CLIENT,
+    CODE_ENVIRONMENT_UTILITY,
+    CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+  common_init_finish(g_ceph_context);
+  g_ceph_context->_conf->apply_changes(NULL);
+
+  if (!vm.count("filestore-path")) {
+    cout << "Must provide filestore-path" << std::endl
+        << desc << std::endl;
+    return 1;
+  }
+
+  if (vm.count("help")) {
+    cout << desc << std::endl;
+    return 1;
+  }
+
+  rngen_t rng;
+  if (vm.count("seed"))
+    rng = rngen_t(vm["seed"].as<unsigned>());
+
+  set<pair<double, Bencher::OpType> > ops;
+  ops.insert(make_pair(vm["write-ratio"].as<double>(), Bencher::WRITE));
+  ops.insert(make_pair(1-vm["write-ratio"].as<double>(), Bencher::READ));
+
+  cout << "Creating objects.." << std::endl;
+  bufferlist bl;
+  for (uint64_t i = 0; i < vm["object-size"].as<unsigned>(); ++i) {
+    bl.append(0);
+  }
+  set<string> objects;
+
+  for (uint64_t num = 0; num < vm["num-objects"].as<unsigned>(); ++num) {
+    unsigned col_num = num % vm["num-colls"].as<unsigned>();
+    stringstream coll, obj;
+    coll << "collection_" << col_num;
+    obj << "obj_" << num;
+    if (num == col_num) {
+      std::cout << "collection " << coll.str() << std::endl;
+      string coll_str(
+       vm["filestore-path"].as<string>() + string("/") + coll.str());
+      int r = ::mkdir(
+       coll_str.c_str(),
+       0777);
+      if (r < 0) {
+       std::cerr << "Error " << errno << " creating collection" << std::endl;
+       return 1;
+      }
+    }
+    objects.insert(coll.str() + "/" + obj.str());
+  }
+  string meta_str(vm["filestore-path"].as<string>() + string("/meta"));
+  int r = ::mkdir(
+    meta_str.c_str(),
+    0777);
+  if (r < 0) {
+    std::cerr << "Error " << errno << " creating collection" << std::endl;
+    return 1;
+  }
+  r = ::open(meta_str.c_str(), 0);
+  if (r < 0) {
+    std::cerr << "Error " << errno << " opening meta" << std::endl;
+    return 1;
+  }
+  int sync_fd = r;
+
+  ostream *detailed_ops = 0;
+  ofstream myfile;
+  if (vm["disable-detailed-ops"].as<bool>()) {
+    detailed_ops = 0;
+  } else if (vm["op-dump-file"].as<string>().size()) {
+    myfile.open(vm["op-dump-file"].as<string>().c_str());
+    detailed_ops = &myfile;
+  } else {
+    detailed_ops = &cerr;
+  }
+
+  Distribution<
+    boost::tuple<string, uint64_t, uint64_t, Bencher::OpType> > *gen = 0;
+  if (vm["sequential"].as<bool>()) {
+    std::cout << "Using Sequential generator" << std::endl;
+    gen = new SequentialLoad(
+      objects,
+      vm["object-size"].as<unsigned>(),
+      vm["io-size"].as<unsigned>(),
+      new WeightedDist<Bencher::OpType>(rng, ops)
+      );
+  } else {
+    std::cout << "Using random generator" << std::endl;
+    gen = new FourTupleDist<string, uint64_t, uint64_t, Bencher::OpType>(
+      new RandomDist<string>(rng, objects),
+      new Align(
+       new UniformRandom(
+         rng,
+         0,
+         vm["object-size"].as<unsigned>() - vm["io-size"].as<unsigned>()),
+       vm["offset-align"].as<unsigned>()
+       ),
+      new Uniform(vm["io-size"].as<unsigned>()),
+      new WeightedDist<Bencher::OpType>(rng, ops)
+      );
+  }
+
+  Bencher bencher(
+    gen,
+    new DetailedStatCollector(1, new JSONFormatter, detailed_ops, &cout),
+    new DumbBackend(
+      vm["filestore-path"].as<string>(),
+      vm["fsync"].as<bool>(),
+      vm["sync-file-range"].as<bool>(),
+      vm["fadvise"].as<bool>(),
+      vm["sync-interval"].as<unsigned>(),
+      sync_fd,
+      10,
+      g_ceph_context),
+    vm["num-concurrent-ops"].as<unsigned>(),
+    vm["duration"].as<unsigned>(),
+    vm["max-ops"].as<unsigned>());
+
+  bencher.init(objects, vm["object-size"].as<unsigned>(), &std::cout);
+  cout << "Created objects..." << std::endl;
+
+  bencher.run_bench();
+
+  if (vm["op-dump-file"].as<string>().size()) {
+    myfile.close();
+  }
+  return 0;
+}
diff --git a/src/test/bench/small_io_bench_fs.cc b/src/test/bench/small_io_bench_fs.cc
new file mode 100644 (file)
index 0000000..6ce1394
--- /dev/null
@@ -0,0 +1,236 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/lexical_cast.hpp>
+#include <boost/program_options/option.hpp>
+#include <boost/program_options/options_description.hpp>
+#include <boost/program_options/variables_map.hpp>
+#include <boost/program_options/cmdline.hpp>
+#include <boost/program_options/parsers.hpp>
+#include <iostream>
+#include <set>
+#include <sstream>
+#include <stdlib.h>
+#include <fstream>
+#include <iostream>
+
+#include "common/Formatter.h"
+
+#include "bencher.h"
+#include "rados_backend.h"
+#include "detailed_stat_collector.h"
+#include "distribution.h"
+#include "global/global_init.h"
+#include "os/FileStore.h"
+#include "filestore_backend.h"
+#include "common/perf_counters.h"
+
+namespace po = boost::program_options;
+using namespace std;
+
+struct MorePrinting : public DetailedStatCollector::AdditionalPrinting {
+  CephContext *cct;
+  MorePrinting(CephContext *cct) : cct(cct) {}
+  void operator()(std::ostream *out) {
+    bufferlist bl;
+    cct->get_perfcounters_collection()->write_json_to_buf(bl, 0);
+    bl.append('\0');
+    *out << bl.c_str() << std::endl;
+  }
+};
+
+int main(int argc, char **argv)
+{
+  po::options_description desc("Allowed options");
+  desc.add_options()
+    ("help", "produce help message")
+    ("num-concurrent-ops", po::value<unsigned>()->default_value(10),
+     "set number of concurrent ops")
+    ("num-objects", po::value<unsigned>()->default_value(500),
+     "set number of objects to use")
+    ("object-size", po::value<unsigned>()->default_value(4<<20),
+     "set object size")
+    ("io-size", po::value<unsigned>()->default_value(4<<10),
+     "set io size")
+    ("write-ratio", po::value<double>()->default_value(0.75),
+     "set ratio of read to write")
+    ("duration", po::value<unsigned>()->default_value(0),
+     "set max duration, 0 for unlimited")
+    ("max-ops", po::value<unsigned>()->default_value(0),
+     "set max ops, 0 for unlimited")
+    ("seed", po::value<unsigned>(),
+     "seed")
+    ("num-colls", po::value<unsigned>()->default_value(20),
+     "number of collections")
+    ("op-dump-file", po::value<string>()->default_value(""),
+     "set file for dumping op details, omit for stderr")
+    ("filestore-path", po::value<string>(),
+     "path to filestore directory, mandatory")
+    ("journal-path", po::value<string>(),
+     "path to journal, mandatory")
+    ("offset-align", po::value<unsigned>()->default_value(4096),
+     "align offset by")
+    ("write-infos", po::value<bool>()->default_value(false),
+      "write info objects with main writes")
+    ("sequential", po::value<bool>()->default_value(false),
+     "do sequential writes like rbd")
+    ("disable-detailed-ops", po::value<bool>()->default_value(false),
+     "don't dump per op stats")
+    ("num-writers", po::value<unsigned>()->default_value(1),
+     "num write threads")
+    ;
+
+  po::variables_map vm;
+  po::parsed_options parsed =
+    po::command_line_parser(argc, argv).options(desc).allow_unregistered().run();
+  po::store(
+    parsed,
+    vm);
+  po::notify(vm);
+
+  vector<const char *> ceph_options, def_args;
+  vector<string> ceph_option_strings = po::collect_unrecognized(
+    parsed.options, po::include_positional);
+  ceph_options.reserve(ceph_option_strings.size());
+  for (vector<string>::iterator i = ceph_option_strings.begin();
+       i != ceph_option_strings.end();
+       ++i) {
+    ceph_options.push_back(i->c_str());
+  }
+
+  global_init(
+    &def_args, ceph_options, CEPH_ENTITY_TYPE_CLIENT,
+    CODE_ENVIRONMENT_UTILITY,
+    CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+  common_init_finish(g_ceph_context);
+  g_ceph_context->_conf->apply_changes(NULL);
+
+  if (!vm.count("filestore-path") || !vm.count("journal-path")) {
+    cout << "Must provide filestore-path and journal-path" << std::endl
+        << desc << std::endl;
+    return 1;
+  }
+
+  if (vm.count("help")) {
+    cout << desc << std::endl;
+    return 1;
+  }
+
+  rngen_t rng;
+  if (vm.count("seed"))
+    rng = rngen_t(vm["seed"].as<unsigned>());
+
+  set<pair<double, Bencher::OpType> > ops;
+  ops.insert(make_pair(vm["write-ratio"].as<double>(), Bencher::WRITE));
+  ops.insert(make_pair(1-vm["write-ratio"].as<double>(), Bencher::READ));
+
+  FileStore fs(vm["filestore-path"].as<string>(),
+              vm["journal-path"].as<string>());
+  fs.mkfs();
+  fs.mount();
+
+  ostream *detailed_ops = 0;
+  ofstream myfile;
+  if (vm["disable-detailed-ops"].as<bool>()) {
+    detailed_ops = 0;
+  } else if (vm["op-dump-file"].as<string>().size()) {
+    myfile.open(vm["op-dump-file"].as<string>().c_str());
+    detailed_ops = &myfile;
+  } else {
+    detailed_ops = &cerr;
+  }
+
+  std::tr1::shared_ptr<StatCollector> col(
+    new DetailedStatCollector(
+      1, new JSONFormatter, detailed_ops, &cout,
+      new MorePrinting(g_ceph_context)));
+
+  cout << "Creating objects.." << std::endl;
+  bufferlist bl;
+  for (uint64_t i = 0; i < vm["object-size"].as<unsigned>(); ++i) {
+    bl.append(0);
+  }
+
+  for (uint64_t num = 0; num < vm["num-colls"].as<unsigned>(); ++num) {
+    stringstream coll;
+    coll << "collection_" << num;
+    std::cout << "collection " << coll.str() << std::endl;
+    ObjectStore::Transaction t;
+    t.create_collection(coll_t(coll.str()));
+    fs.apply_transaction(t);
+  }
+  {
+    ObjectStore::Transaction t;
+    t.create_collection(coll_t(string("meta")));
+    fs.apply_transaction(t);
+  }
+
+  vector<std::tr1::shared_ptr<Bencher> > benchers(
+    vm["num-writers"].as<unsigned>());
+  for (vector<std::tr1::shared_ptr<Bencher> >::iterator i = benchers.begin();
+       i != benchers.end();
+       ++i) {
+    set<string> objects;
+    for (uint64_t num = 0; num < vm["num-objects"].as<unsigned>(); ++num) {
+      unsigned col_num = num % vm["num-colls"].as<unsigned>();
+      stringstream coll, obj;
+      coll << "collection_" << col_num;
+      obj << "obj_" << num << "_bencher_" << (i - benchers.begin());
+      objects.insert(coll.str() + string("/") + obj.str());
+    }
+    Distribution<
+      boost::tuple<string, uint64_t, uint64_t, Bencher::OpType> > *gen = 0;
+    if (vm["sequential"].as<bool>()) {
+      std::cout << "Using Sequential generator" << std::endl;
+      gen = new SequentialLoad(
+       objects,
+       vm["object-size"].as<unsigned>(),
+       vm["io-size"].as<unsigned>(),
+       new WeightedDist<Bencher::OpType>(rng, ops)
+       );
+    } else {
+      std::cout << "Using random generator" << std::endl;
+      gen = new FourTupleDist<string, uint64_t, uint64_t, Bencher::OpType>(
+       new RandomDist<string>(rng, objects),
+       new Align(
+         new UniformRandom(
+           rng,
+           0,
+           vm["object-size"].as<unsigned>() - vm["io-size"].as<unsigned>()),
+         vm["offset-align"].as<unsigned>()
+         ),
+       new Uniform(vm["io-size"].as<unsigned>()),
+       new WeightedDist<Bencher::OpType>(rng, ops)
+       );
+    }
+
+    Bencher *bencher = new Bencher(
+      gen,
+      col,
+      new FileStoreBackend(&fs, vm["write-infos"].as<bool>()),
+      vm["num-concurrent-ops"].as<unsigned>(),
+      vm["duration"].as<unsigned>(),
+      vm["max-ops"].as<unsigned>());
+
+    bencher->init(objects, vm["object-size"].as<unsigned>(), &std::cout);
+    cout << "Created objects..." << std::endl;
+    (*i).reset(bencher);
+  }
+
+  for (vector<std::tr1::shared_ptr<Bencher> >::iterator i = benchers.begin();
+       i != benchers.end();
+       ++i) {
+    (*i)->create();
+  }
+  for (vector<std::tr1::shared_ptr<Bencher> >::iterator i = benchers.begin();
+       i != benchers.end();
+       ++i) {
+    (*i)->join();
+  }
+
+  fs.umount();
+  if (vm["op-dump-file"].as<string>().size()) {
+    myfile.close();
+  }
+  return 0;
+}
diff --git a/src/test/bench/smalliobenchprocessor.py b/src/test/bench/smalliobenchprocessor.py
new file mode 100644 (file)
index 0000000..b0a7f08
--- /dev/null
@@ -0,0 +1,43 @@
+import json
+import sys
+from pylab import hist
+import gzip
+import contextlib
+
+def get_next_line(line, output):
+    val = json.loads(line)
+    if val['type'] not in output:
+        output[val['type']] = {}
+    for (name, value) in val.iteritems():
+        if name == "type":
+            continue
+        if name == "seq":
+            continue
+        if name not in output[val['type']]:
+            output[val['type']][name] = []
+        output[val['type']][name] += [float(value)]
+
+def wrapgz(gfilename):
+    # return a context manager for the gzip handle so it can be used by
+    # the 'with' block in read_all_input and closed on exit
+    return contextlib.closing(gzip.open(gfilename, 'rb'))
+
+def read_all_input(filename):
+    cur = {}
+    openfn = open
+    if ".gz" in filename:
+        openfn = wrapgz
+    with openfn(filename) as fh:
+        for line in fh:
+            get_next_line(line, cur)
+    return cur
+
+def write_committed_latency(out, bins, **kwargs):
+    hist(out['write_committed']['latency'], bins, **kwargs)
+
+def read_latency(out):
+    hist(out['read']['latency'], 100)
+
+def com(out): return out['write_committed']['latency']
+def app(out): return out['write_applied']['latency']
diff --git a/src/test/bench/stat_collector.h b/src/test/bench/stat_collector.h
new file mode 100644 (file)
index 0000000..4aef2bd
--- /dev/null
@@ -0,0 +1,19 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#ifndef STATCOLLECTORH
+#define STATCOLLECTORH
+
+#include <stdint.h>
+
+class StatCollector {
+public:
+  virtual uint64_t next_seq() = 0;
+  virtual void start_write(uint64_t seq, uint64_t size) = 0;
+  virtual void start_read(uint64_t seq, uint64_t size) = 0;
+  virtual void write_applied(uint64_t seq) = 0;
+  virtual void write_committed(uint64_t seq) = 0;
+  virtual void read_complete(uint64_t seq) = 0;
+  virtual ~StatCollector() {}
+};
+
+#endif
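
StatCollector is the instrumentation hook the Bencher drives for every op: next_seq() hands out a sequence number, start_write()/start_read() mark issue time, and write_applied()/write_committed()/read_complete() mark the completions (DetailedStatCollector above is the full implementation). A minimal alternative that only counts completions, as a sketch of the contract; CountingCollector is a made-up name, not part of this commit:

#include "stat_collector.h"
#include "common/Mutex.h"

class CountingCollector : public StatCollector {
  Mutex lock;
  uint64_t cur_seq;
  uint64_t writes_committed;
  uint64_t reads_completed;
public:
  CountingCollector()
    : lock("CountingCollector::lock"),
      cur_seq(0), writes_committed(0), reads_completed(0) {}
  uint64_t next_seq() { Mutex::Locker l(lock); return cur_seq++; }
  void start_write(uint64_t seq, uint64_t size) {}
  void start_read(uint64_t seq, uint64_t size) {}
  void write_applied(uint64_t seq) {}
  void write_committed(uint64_t seq) { Mutex::Locker l(lock); ++writes_committed; }
  void read_complete(uint64_t seq) { Mutex::Locker l(lock); ++reads_completed; }
};
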