src/omapbench
src/kvstorebench
ar-lib
+src/smalliobench
+src/smalliobenchdumb
+src/smalliobenchfs
# temporary directory used by e.g. "make distcheck", e.g. ceph-0.42
/ceph-[0-9]*/
testrados_LDADD = librados.la $(LIBGLOBAL_LDA)
bin_DEBUGPROGRAMS += testrados
+smalliobench_SOURCES = test/bench/small_io_bench.cc test/bench/rados_backend.cc test/bench/detailed_stat_collector.cc test/bench/bencher.cc
+smalliobench_LDADD = librados.la -lboost_program_options $(LIBGLOBAL_LDA)
+bin_DEBUGPROGRAMS += smalliobench
+
+smalliobenchfs_SOURCES = test/bench/small_io_bench_fs.cc test/bench/filestore_backend.cc test/bench/detailed_stat_collector.cc test/bench/bencher.cc
+smalliobenchfs_LDADD = librados.la -lboost_program_options $(LIBOS_LDA) $(LIBGLOBAL_LDA)
+bin_DEBUGPROGRAMS += smalliobenchfs
+
+smalliobenchdumb_SOURCES = test/bench/small_io_bench_dumb.cc test/bench/dumb_backend.cc test/bench/detailed_stat_collector.cc test/bench/bencher.cc
+smalliobenchdumb_LDADD = librados.la -lboost_program_options $(LIBOS_LDA) $(LIBGLOBAL_LDA)
+bin_DEBUGPROGRAMS += smalliobenchdumb
+
omapbench_SOURCES = test/omap_bench.cc
omapbench_LDADD = librados.la $(LIBGLOBAL_LDA)
bin_DEBUGPROGRAMS += omapbench
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#ifndef BACKENDH
+#define BACKENDH
+
+#include "include/Context.h"
+
+class Backend {
+public:
+ virtual void write(
+ const string &oid,
+ uint64_t offset,
+ const bufferlist &bl,
+ Context *on_applied,
+ Context *on_commit) = 0;
+
+ virtual void read(
+ const string &oid,
+ uint64_t offset,
+ uint64_t length,
+ bufferlist *bl,
+ Context *on_complete) = 0;
+ virtual ~Backend() {}
+};
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include "bencher.h"
+#include "include/utime.h"
+#include <unistd.h>
+#include <tr1/memory>
+#include "common/Mutex.h"
+#include "common/Cond.h"
+
+template<typename T>
+struct C_Holder : public Context {
+ T obj;
+ C_Holder(
+ T obj)
+ : obj(obj) {}
+ void finish(int r) {
+ return;
+ }
+};
+
+struct OnDelete {
+ Context *c;
+ OnDelete(Context *c) : c(c) {}
+ ~OnDelete() { c->complete(0); }
+};
+
+struct Cleanup : public Context {
+ Bencher *bench;
+ Cleanup(Bencher *bench) : bench(bench) {}
+ void finish(int r) {
+ bench->complete_op();
+ }
+};
+
+struct OnWriteApplied : public Context {
+ Bencher *bench;
+ uint64_t seq;
+ std::tr1::shared_ptr<OnDelete> on_delete;
+ OnWriteApplied(
+ Bencher *bench, uint64_t seq,
+ std::tr1::shared_ptr<OnDelete> on_delete
+ ) : bench(bench), seq(seq), on_delete(on_delete) {}
+ void finish(int r) {
+ bench->stat_collector->write_applied(seq);
+ }
+};
+
+struct OnWriteCommit : public Context {
+ Bencher *bench;
+ uint64_t seq;
+ std::tr1::shared_ptr<OnDelete> on_delete;
+ OnWriteCommit(
+ Bencher *bench, uint64_t seq,
+ std::tr1::shared_ptr<OnDelete> on_delete
+ ) : bench(bench), seq(seq), on_delete(on_delete) {}
+ void finish(int r) {
+ bench->stat_collector->write_committed(seq);
+ }
+};
+
+struct OnReadComplete : public Context {
+ Bencher *bench;
+ uint64_t seq;
+ boost::scoped_ptr<bufferlist> bl;
+ OnReadComplete(Bencher *bench, uint64_t seq, bufferlist *bl) :
+ bench(bench), seq(seq), bl(bl) {}
+ void finish(int r) {
+ bench->stat_collector->read_complete(seq);
+ bench->complete_op();
+ }
+};
+
+void Bencher::start_op() {
+ Mutex::Locker l(lock);
+ while (open_ops >= max_in_flight)
+ open_ops_cond.Wait(lock);
+ ++open_ops;
+}
+
+void Bencher::drain_ops() {
+ Mutex::Locker l(lock);
+ while (open_ops)
+ open_ops_cond.Wait(lock);
+}
+
+void Bencher::complete_op() {
+ Mutex::Locker l(lock);
+ assert(open_ops > 0);
+ --open_ops;
+ open_ops_cond.Signal();
+}
+
+struct OnFinish {
+ bool *done;
+ Mutex *lock;
+ Cond *cond;
+ OnFinish(
+ bool *done,
+ Mutex *lock,
+ Cond *cond) :
+ done(done), lock(lock), cond(cond) {}
+ ~OnFinish() {
+ Mutex::Locker l(*lock);
+ *done = true;
+ cond->Signal();
+ }
+};
+
+void Bencher::init(
+ const set<std::string> &objects,
+ uint64_t size,
+ std::ostream *out
+ )
+{
+ bufferlist bl;
+ for (uint64_t i = 0; i < size; ++i) {
+ bl.append(0);
+ }
+ Mutex lock("init_lock");
+ Cond cond;
+ bool done = 0;
+ {
+ std::tr1::shared_ptr<OnFinish> on_finish(
+ new OnFinish(&done, &lock, &cond));
+ uint64_t num = 0;
+ for (set<std::string>::const_iterator i = objects.begin();
+ i != objects.end();
+ ++i, ++num) {
+ if (!(num % 20))
+ *out << "Creating " << num << "/" << objects.size() << std::endl;
+ backend->write(
+ *i,
+ 0,
+ bl,
+ new C_Holder<std::tr1::shared_ptr<OnFinish> >(on_finish),
+ new C_Holder<std::tr1::shared_ptr<OnFinish> >(on_finish)
+ );
+ }
+ }
+ {
+ Mutex::Locker l(lock);
+ while (!done)
+ cond.Wait(lock);
+ }
+}
+
+void Bencher::run_bench()
+{
+ time_t end = time(0) + max_duration;
+ uint64_t ops = 0;
+
+ bufferlist bl;
+
+ while ((!max_duration || time(0) < end) && (!max_ops || ops < max_ops)) {
+ start_op();
+ uint64_t seq = stat_collector->next_seq();
+ boost::tuple<std::string, uint64_t, uint64_t, OpType> next =
+ (*op_dist)();
+ string obj_name = next.get<0>();
+ uint64_t offset = next.get<1>();
+ uint64_t length = next.get<2>();
+ OpType op_type = next.get<3>();
+ switch (op_type) {
+ case WRITE: {
+ std::tr1::shared_ptr<OnDelete> on_delete(
+ new OnDelete(new Cleanup(this)));
+ stat_collector->start_write(seq, length);
+ while (bl.length() < length) {
+ bl.append(rand());
+ }
+ backend->write(
+ obj_name,
+ offset,
+ bl,
+ new OnWriteApplied(
+ this, seq, on_delete),
+ new OnWriteCommit(
+ this, seq, on_delete)
+ );
+ break;
+ }
+ case READ: {
+ stat_collector->start_read(seq, length);
+ bufferlist *bl = new bufferlist;
+ backend->read(
+ obj_name,
+ offset,
+ length,
+ bl,
+ new OnReadComplete(
+ this, seq, bl)
+ );
+ break;
+ }
+ default: {
+ assert(0);
+ }
+ }
+ }
+ drain_ops();
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#ifndef BENCHERH
+#define BENCHERH
+
+#include <utility>
+#include "distribution.h"
+#include "stat_collector.h"
+#include "backend.h"
+#include <boost/scoped_ptr.hpp>
+#include "common/Mutex.h"
+#include "common/Cond.h"
+#include "common/Thread.h"
+
+class OnWriteApplied;
+class OnWriteCommit;
+class OnReadComplete;
+class Clenaup;
+
+class Bencher : public Thread {
+public:
+ enum OpType {
+ WRITE,
+ READ
+ };
+
+private:
+ boost::scoped_ptr<
+ Distribution<boost::tuple<std::string,uint64_t,uint64_t, OpType> > > op_dist;
+ std::tr1::shared_ptr<StatCollector> stat_collector;
+ boost::scoped_ptr<Backend> backend;
+ const uint64_t max_in_flight;
+ const uint64_t max_duration;
+ const uint64_t max_ops;
+
+ Mutex lock;
+ Cond open_ops_cond;
+ uint64_t open_ops;
+ void start_op();
+ void drain_ops();
+ void complete_op();
+public:
+ Bencher(
+ Distribution<boost::tuple<std::string, uint64_t, uint64_t, OpType> > *op_gen,
+ std::tr1::shared_ptr<StatCollector> stat_collector,
+ Backend *backend,
+ uint64_t max_in_flight,
+ uint64_t max_duration,
+ uint64_t max_ops) :
+ op_dist(op_gen),
+ stat_collector(stat_collector),
+ backend(backend),
+ max_in_flight(max_in_flight),
+ max_duration(max_duration),
+ max_ops(max_ops),
+ lock("Bencher::lock"),
+ open_ops(0)
+ {}
+ Bencher(
+ Distribution<boost::tuple<std::string, uint64_t, uint64_t, OpType> > *op_gen,
+ StatCollector *stat_collector,
+ Backend *backend,
+ uint64_t max_in_flight,
+ uint64_t max_duration,
+ uint64_t max_ops) :
+ op_dist(op_gen),
+ stat_collector(stat_collector),
+ backend(backend),
+ max_in_flight(max_in_flight),
+ max_duration(max_duration),
+ max_ops(max_ops),
+ lock("Bencher::lock"),
+ open_ops(0)
+ {}
+ Bencher(
+ Distribution<std::string> *object_gen,
+ Distribution<uint64_t> *offset_gen,
+ Distribution<uint64_t> *length_gen,
+ Distribution<OpType> *op_type_gen,
+ StatCollector *stat_collector,
+ Backend *backend,
+ uint64_t max_in_flight,
+ uint64_t max_duration,
+ uint64_t max_ops) :
+ op_dist(
+ new FourTupleDist<std::string, uint64_t, uint64_t, OpType>(
+ object_gen, offset_gen, length_gen, op_type_gen)),
+ stat_collector(stat_collector),
+ backend(backend),
+ max_in_flight(max_in_flight),
+ max_duration(max_duration),
+ max_ops(max_ops),
+ lock("Bencher::lock"),
+ open_ops(0)
+ {}
+
+ void init(
+ const set<std::string> &objects,
+ uint64_t size,
+ std::ostream *out
+ );
+
+ void run_bench();
+ void *entry() {
+ run_bench();
+ return 0;
+ }
+ friend class OnWriteApplied;
+ friend class OnWriteCommit;
+ friend class OnReadComplete;
+ friend class Cleanup;
+};
+
+class SequentialLoad :
+ public Distribution<
+ boost::tuple<string, uint64_t, uint64_t, Bencher::OpType> > {
+ set<string> objects;
+ uint64_t size;
+ uint64_t length;
+ set<string>::iterator object_pos;
+ uint64_t cur_pos;
+ boost::scoped_ptr<Distribution<Bencher::OpType> > op_dist;
+ SequentialLoad(const SequentialLoad &other);
+public:
+ SequentialLoad(
+ const set<string> &_objects, uint64_t size,
+ uint64_t length,
+ Distribution<Bencher::OpType> *op_dist)
+ : objects(_objects), size(size), length(length),
+ object_pos(objects.begin()), cur_pos(0),
+ op_dist(op_dist) {}
+
+ boost::tuple<string, uint64_t, uint64_t, Bencher::OpType>
+ operator()() {
+ boost::tuple<string, uint64_t, uint64_t, Bencher::OpType> ret =
+ boost::make_tuple(*object_pos, cur_pos, length, (*op_dist)());
+ cur_pos += length;
+ if (cur_pos > size) {
+ cur_pos = 0;
+ object_pos++;
+ }
+ if (object_pos == objects.end())
+ object_pos = objects.begin();
+ return ret;
+ }
+};
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include "detailed_stat_collector.h"
+#include <sys/time.h>
+#include <utility>
+#include <boost/tuple/tuple.hpp>
+
+void DetailedStatCollector::Op::dump(
+ ostream *out,
+ Formatter *f)
+{
+ if (!out)
+ return;
+ f->open_object_section(type.c_str());
+ f->dump_string("type", type);
+ f->dump_float("start", start);
+ f->dump_float("latency", latency);
+ f->dump_int("size", size);
+ f->dump_int("seq", seq);
+ f->close_section();
+ f->flush(*out);
+ *out << std::endl;
+}
+
+static utime_t cur_time()
+{
+ struct timeval tv;
+ gettimeofday(&tv, 0);
+ return utime_t(&tv);
+}
+
+DetailedStatCollector::Aggregator::Aggregator()
+ : recent_size(0), total_size(0), recent_latency(0),
+ total_latency(0), recent_ops(0), total_ops(0), started(false)
+{}
+
+void DetailedStatCollector::Aggregator::add(const Op &op)
+{
+ if (!started) {
+ last = first = op.start;
+ started = true;
+ }
+ ++recent_ops;
+ ++total_ops;
+ recent_size += op.size;
+ total_size += op.size;
+ recent_latency += op.latency;
+ total_latency += op.latency;
+}
+
+void DetailedStatCollector::Aggregator::dump(Formatter *f)
+{
+ utime_t now = cur_time();
+ f->dump_stream("time") << now;
+ f->dump_float("avg_recent_latency", recent_latency / recent_ops);
+ f->dump_float("avg_total_latency", total_latency / total_ops);
+ f->dump_float("avg_recent_iops", recent_ops / (now - last));
+ f->dump_float("avg_total_iops", total_ops / (now - first));
+ f->dump_float("avg_recent_throughput", recent_size / (now - last));
+ f->dump_float("avg_total_throughput", total_size / (now - first));
+ f->dump_float("avg_recent_throughput_mb",
+ (recent_size / (now - last)) / (1024*1024));
+ f->dump_float("avg_total_throughput_mb",
+ (total_size / (now - first)) / (1024*1024));
+ f->dump_float("duration", now - last);
+ last = now;
+ recent_latency = 0;
+ recent_size = 0;
+ recent_ops = 0;
+}
+
+DetailedStatCollector::DetailedStatCollector(
+ double bin_size,
+ Formatter *formatter,
+ ostream *out,
+ ostream *summary_out,
+ AdditionalPrinting *details
+ ) : bin_size(bin_size), f(formatter), out(out),
+ summary_out(summary_out), details(details),
+ lock("Stat::lock"), cur_seq(0) {
+ last_dump = cur_time();
+}
+
+uint64_t DetailedStatCollector::next_seq()
+{
+ Mutex::Locker l(lock);
+ if (summary_out && ((cur_time() - last_dump) > bin_size)) {
+ f->open_object_section("stats");
+ for (map<string, Aggregator>::iterator i = aggregators.begin();
+ i != aggregators.end();
+ ++i) {
+ f->open_object_section(i->first.c_str());
+ i->second.dump(f.get());
+ f->close_section();
+ }
+ f->close_section();
+ f->flush(*summary_out);
+ *summary_out << std::endl;
+ if (details) {
+ (*details)(summary_out);
+ *summary_out << std::endl;
+ }
+ last_dump = cur_time();
+ }
+ return cur_seq++;
+}
+
+void DetailedStatCollector::start_write(uint64_t seq, uint64_t length)
+{
+ Mutex::Locker l(lock);
+ utime_t now(cur_time());
+ not_committed.insert(make_pair(seq, make_pair(length, now)));
+ not_applied.insert(make_pair(seq, make_pair(length, now)));
+}
+
+void DetailedStatCollector::start_read(uint64_t seq, uint64_t length)
+{
+ Mutex::Locker l(lock);
+ utime_t now(cur_time());
+ not_read.insert(make_pair(seq, make_pair(length, now)));
+}
+
+void DetailedStatCollector::write_applied(uint64_t seq)
+{
+ Mutex::Locker l(lock);
+ Op op(
+ "write_applied",
+ not_applied[seq].second,
+ cur_time() - not_applied[seq].second,
+ not_applied[seq].first,
+ seq);
+ op.dump(out, f.get());
+ aggregators["write_applied"].add(op);
+ not_applied.erase(seq);
+}
+
+void DetailedStatCollector::write_committed(uint64_t seq)
+{
+ Mutex::Locker l(lock);
+ Op op(
+ "write_committed",
+ not_committed[seq].second,
+ cur_time() - not_committed[seq].second,
+ not_committed[seq].first,
+ seq);
+ op.dump(out, f.get());
+ aggregators["write_committed"].add(op);
+ not_committed.erase(seq);
+}
+
+void DetailedStatCollector::read_complete(uint64_t seq)
+{
+ Mutex::Locker l(lock);
+ Op op(
+ "read",
+ not_read[seq].second,
+ cur_time() - not_read[seq].second,
+ not_read[seq].first,
+ seq);
+ op.dump(out, f.get());
+ aggregators["read"].add(op);
+ not_read.erase(seq);
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#ifndef DETAILEDSTATCOLLECTERH
+#define DETAILEDSTATCOLLECTERH
+
+#include "stat_collector.h"
+#include "common/Formatter.h"
+#include <boost/scoped_ptr.hpp>
+#include "common/Mutex.h"
+#include "common/Cond.h"
+#include "include/utime.h"
+#include <list>
+#include <map>
+#include <boost/tuple/tuple.hpp>
+#include <ostream>
+
+class DetailedStatCollector : public StatCollector {
+public:
+ class AdditionalPrinting {
+ public:
+ virtual void operator()(std::ostream *) = 0;
+ virtual ~AdditionalPrinting() {}
+ };
+private:
+ struct Op {
+ string type;
+ utime_t start;
+ double latency;
+ uint64_t size;
+ uint64_t seq;
+ Op(
+ string type,
+ utime_t start,
+ double latency,
+ uint64_t size,
+ uint64_t seq)
+ : type(type), start(start), latency(latency),
+ size(size), seq(seq) {}
+ void dump(ostream *out, Formatter *f);
+ };
+ class Aggregator {
+ uint64_t recent_size;
+ uint64_t total_size;
+ double recent_latency;
+ double total_latency;
+ utime_t last;
+ utime_t first;
+ uint64_t recent_ops;
+ uint64_t total_ops;
+ bool started;
+ public:
+ Aggregator();
+
+ void add(const Op &op);
+ void dump(Formatter *f);
+ };
+ const double bin_size;
+ boost::scoped_ptr<Formatter> f;
+ ostream *out;
+ ostream *summary_out;
+ boost::scoped_ptr<AdditionalPrinting> details;
+ utime_t last_dump;
+
+ Mutex lock;
+ Cond cond;
+
+ map<string, Aggregator> aggregators;
+
+ map<uint64_t, pair<uint64_t, utime_t> > not_applied;
+ map<uint64_t, pair<uint64_t, utime_t> > not_committed;
+ map<uint64_t, pair<uint64_t, utime_t> > not_read;
+
+ uint64_t cur_seq;
+
+ void dump(
+ const string &type,
+ boost::tuple<utime_t, utime_t, uint64_t, uint64_t> stuff);
+public:
+ DetailedStatCollector(
+ double bin_size,
+ Formatter *formatter,
+ ostream *out,
+ ostream *summary_out,
+ AdditionalPrinting *details = 0
+ );
+
+ uint64_t next_seq();
+ void start_write(uint64_t seq, uint64_t size);
+ void start_read(uint64_t seq, uint64_t size);
+ void write_applied(uint64_t seq);
+ void write_committed(uint64_t seq);
+ void read_complete(uint64_t seq);
+
+};
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#ifndef DISTIRITBIONHPP
+#define DISTIRITBIONHPP
+
+#include <map>
+#include <set>
+#include <utility>
+#include <vector>
+#include <boost/random/mersenne_twister.hpp>
+#include <boost/random/uniform_int.hpp>
+#include <boost/random/uniform_real.hpp>
+#include <boost/scoped_ptr.hpp>
+#include <boost/tuple/tuple.hpp>
+
+typedef boost::mt11213b rngen_t;
+
+template <typename T>
+class Distribution {
+public:
+ virtual T operator()() = 0;
+ virtual ~Distribution() {}
+};
+
+template <typename T, typename U, typename V, typename W>
+class FourTupleDist : public Distribution<boost::tuple<T, U, V, W> > {
+ boost::scoped_ptr<Distribution<T> > t;
+ boost::scoped_ptr<Distribution<U> > u;
+ boost::scoped_ptr<Distribution<V> > v;
+ boost::scoped_ptr<Distribution<W> > w;
+public:
+ FourTupleDist(
+ Distribution<T> *t,
+ Distribution<U> *u,
+ Distribution<V> *v,
+ Distribution<W> *w)
+ : t(t), u(u), v(v), w(w) {}
+ boost::tuple<T, U, V, W> operator()() {
+ return boost::make_tuple((*t)(), (*u)(), (*v)(), (*w)());
+ }
+};
+
+template <typename T>
+class RandomDist : public Distribution<T> {
+ rngen_t rng;
+ std::map<uint64_t, T> contents;
+public:
+ RandomDist(rngen_t rng, std::set<T> &initial) : rng(rng) {
+ uint64_t count = 0;
+ for (typename std::set<T>::iterator i = initial.begin();
+ i != initial.end();
+ ++i, ++count) {
+ contents.insert(std::make_pair(count, *i));
+ }
+ }
+ virtual T operator()() {
+ assert(contents.size());
+ boost::uniform_int<> value(0, contents.size() - 1);
+ return contents.find(value(rng))->second;
+ }
+};
+
+template <typename T>
+class WeightedDist : public Distribution<T> {
+ rngen_t rng;
+ double total;
+ std::map<double, T> contents;
+public:
+ WeightedDist(rngen_t rng, const std::set<std::pair<double, T> > &initial)
+ : rng(rng), total(0) {
+ for (typename std::set<std::pair<double, T> >::const_iterator i =
+ initial.begin();
+ i != initial.end();
+ ++i) {
+ total += i->first;
+ contents.insert(std::make_pair(total, i->second));
+ }
+ }
+ virtual T operator()() {
+ return contents.lower_bound(
+ boost::uniform_real<>(0, total)(rng))->second;
+ }
+};
+
+template <typename T, typename U>
+class SequentialDist : public Distribution<T> {
+ rngen_t rng;
+ std::vector<T> contents;
+ typename std::vector<T>::iterator cur;
+public:
+ SequentialDist(rngen_t rng, U &initial) : rng(rng) {
+ contents.insert(initial.begin(), initial.end());
+ cur = contents.begin();
+ }
+ virtual T operator()() {
+ assert(contents.size());
+ if (cur == contents.end())
+ cur = contents.begin();
+ return *(cur++);
+ }
+};
+
+class UniformRandom : public Distribution<uint64_t> {
+ rngen_t rng;
+ uint64_t min;
+ uint64_t max;
+public:
+ UniformRandom(rngen_t rng, uint64_t min, uint64_t max) :
+ rng(rng), min(min), max(max) {}
+ virtual uint64_t operator()() {
+ return boost::uniform_int<>(min, max)(rng);
+ }
+};
+
+class Align : public Distribution<uint64_t> {
+ boost::scoped_ptr<Distribution<uint64_t> > dist;
+ uint64_t align;
+public:
+ Align(Distribution<uint64_t> *dist, uint64_t align) :
+ dist(dist), align(align) {}
+ virtual uint64_t operator()() {
+ uint64_t ret = (*dist)();
+ return ret - (ret % align);
+ }
+};
+
+class Uniform : public Distribution<uint64_t> {
+ uint64_t val;
+public:
+ Uniform(uint64_t val) : val(val) {}
+ virtual uint64_t operator()() {
+ return val;
+ }
+};
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include <unistd.h>
+#include "dumb_backend.h"
+
+string DumbBackend::get_full_path(const string &oid)
+{
+ return path + "/" + oid;
+}
+
+void DumbBackend::_write(
+ const string &oid,
+ uint64_t offset,
+ const bufferlist &bl,
+ Context *on_applied,
+ Context *on_commit)
+{
+ string full_path(get_full_path(oid));
+ int fd = ::open(
+ full_path.c_str(), O_CREAT|O_WRONLY, 0777);
+ if (fd < 0) {
+ std::cout << full_path << ": errno is " << errno << std::endl;
+ assert(0);
+ }
+ ::lseek(fd, offset, SEEK_SET);
+ bl.write_fd(fd);
+ on_applied->complete(0);
+ if (do_fsync)
+ ::fsync(fd);
+ if (do_sync_file_range)
+ ::sync_file_range(fd, offset, bl.length(),
+ SYNC_FILE_RANGE_WAIT_AFTER);
+ if (do_fadvise)
+ ::posix_fadvise(fd, offset, bl.length(),
+ POSIX_FADV_DONTNEED);
+ ::close(fd);
+ {
+ Mutex::Locker l(pending_commit_mutex);
+ pending_commits.insert(on_commit);
+ }
+ sem.Put();
+}
+
+void DumbBackend::read(
+ const string &oid,
+ uint64_t offset,
+ uint64_t length,
+ bufferlist *bl,
+ Context *on_complete)
+{
+ string full_path(get_full_path(oid));
+ int fd = ::open(
+ full_path.c_str(), 0, O_RDONLY);
+ if (fd < 0) return;
+
+ int r = ::lseek(fd, offset, SEEK_SET);
+ if (r < 0) {
+ ::close(fd);
+ return;
+ }
+
+ bl->read_fd(fd, length);
+ ::close(fd);
+ on_complete->complete(0);
+}
+
+void DumbBackend::sync_loop()
+{
+ while (1) {
+ sleep(sync_interval);
+ {
+ Mutex::Locker l(sync_loop_mutex);
+ if (sync_loop_stop != 0) {
+ sync_loop_stop = 2;
+ sync_loop_cond.Signal();
+ break;
+ }
+ }
+ tp.pause();
+#ifdef HAVE_SYS_SYNCFS
+ ::syncfs(sync_fd);
+#else
+ ::sync();
+#endif
+ {
+ Mutex::Locker l(pending_commit_mutex);
+ for (set<Context*>::iterator i = pending_commits.begin();
+ i != pending_commits.end();
+ pending_commits.erase(i++)) {
+ (*i)->complete(0);
+ }
+ }
+ tp.unpause();
+ }
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#ifndef DUMBBACKEND
+#define DUMBBACKEND
+
+#include "backend.h"
+#include "include/Context.h"
+#include "os/ObjectStore.h"
+#include "common/WorkQueue.h"
+#include "common/Semaphore.h"
+
+#include <deque>
+
+class DumbBackend : public Backend {
+ const string path;
+
+ struct write_item {
+ const string oid;
+ bufferlist bl;
+ uint64_t offset;
+ Context *on_applied;
+ Context *on_commit;
+ write_item(
+ const string &oid,
+ const bufferlist &bl,
+ uint64_t offset,
+ Context *on_applied,
+ Context *on_commit) :
+ oid(oid), bl(bl), offset(offset), on_applied(on_applied),
+ on_commit(on_commit) {}
+ };
+
+ Semaphore sem;
+
+ bool do_fsync;
+ bool do_sync_file_range;
+ bool do_fadvise;
+ unsigned sync_interval;
+ int sync_fd;
+ ThreadPool tp;
+
+ class SyncThread : public Thread {
+ DumbBackend *backend;
+ public:
+ SyncThread(DumbBackend *backend) : backend(backend) {}
+ void *entry() {
+ backend->sync_loop();
+ return 0;
+ }
+ } thread;
+ friend class SyncThread;
+
+ Mutex sync_loop_mutex;
+ Cond sync_loop_cond;
+ int sync_loop_stop; // 0 for running, 1 for stopping, 2 for stopped
+ void sync_loop();
+
+ Mutex pending_commit_mutex;
+ set<Context*> pending_commits;
+
+ class WriteQueue : public ThreadPool::WorkQueue<write_item> {
+ deque<write_item*> item_queue;
+ DumbBackend *backend;
+
+ public:
+ WriteQueue(
+ DumbBackend *backend,
+ time_t ti,
+ ThreadPool *tp) :
+ ThreadPool::WorkQueue<write_item>("DumbBackend::queue", ti, ti*10, tp),
+ backend(backend) {}
+ bool _enqueue(write_item *item) {
+ item_queue.push_back(item);
+ return true;
+ }
+ void _dequeue(write_item*) { assert(0); }
+ write_item *_dequeue() {
+ if (item_queue.empty())
+ return 0;
+ write_item *retval = item_queue.front();
+ item_queue.pop_front();
+ return retval;
+ }
+ bool _empty() {
+ return item_queue.empty();
+ }
+ void _process(write_item *item) {
+ return backend->_write(
+ item->oid,
+ item->offset,
+ item->bl,
+ item->on_applied,
+ item->on_commit);
+ delete item;
+ }
+ void _clear() {
+ return item_queue.clear();
+ }
+ } queue;
+ friend class WriteQueue;
+
+ string get_full_path(const string &oid);
+
+ void _write(
+ const string &oid,
+ uint64_t offset,
+ const bufferlist &bl,
+ Context *on_applied,
+ Context *on_commit);
+
+public:
+ DumbBackend(
+ const string &path,
+ bool do_fsync,
+ bool do_sync_file_range,
+ bool do_fadvise,
+ unsigned sync_interval,
+ int sync_fd,
+ unsigned worker_threads,
+ CephContext *cct)
+ : path(path), do_fsync(do_fsync),
+ do_sync_file_range(do_sync_file_range),
+ do_fadvise(do_fadvise),
+ sync_interval(sync_interval),
+ sync_fd(sync_fd),
+ tp(cct, "DumbBackend::tp", worker_threads),
+ thread(this),
+ sync_loop_mutex("DumbBackend::sync_loop_mutex"),
+ sync_loop_stop(0),
+ pending_commit_mutex("DumbBackend::pending_commit_mutex"),
+ queue(this, 20, &tp) {
+ thread.create();
+ tp.start();
+ for (unsigned i = 0; i < 10*worker_threads; ++i) {
+ sem.Put();
+ }
+ }
+ ~DumbBackend() {
+ {
+ Mutex::Locker l(sync_loop_mutex);
+ if (sync_loop_stop == 0)
+ sync_loop_stop = 1;
+ while (sync_loop_stop < 2)
+ sync_loop_cond.Wait(sync_loop_mutex);
+ }
+ tp.stop();
+ thread.join();
+ }
+ void write(
+ const string &oid,
+ uint64_t offset,
+ const bufferlist &bl,
+ Context *on_applied,
+ Context *on_commit) {
+ sem.Get();
+ queue.queue(
+ new write_item(
+ oid, bl, offset, on_applied, on_commit));
+ }
+
+ void read(
+ const string &oid,
+ uint64_t offset,
+ uint64_t length,
+ bufferlist *bl,
+ Context *on_complete);
+};
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include "filestore_backend.h"
+#include "global/global_init.h"
+#include "os/ObjectStore.h"
+
+struct C_DeleteTransWrapper : public Context {
+ Context *c;
+ ObjectStore::Transaction *t;
+ C_DeleteTransWrapper(
+ ObjectStore::Transaction *t,
+ Context *c) : c(c), t(t) {}
+ void finish(int r) {
+ c->complete(r);
+ delete t;
+ }
+};
+
+FileStoreBackend::FileStoreBackend(
+ ObjectStore *os, bool write_infos)
+ : os(os), finisher(g_ceph_context), write_infos(write_infos)
+{
+ finisher.start();
+}
+
+void FileStoreBackend::write(
+ const string &oid,
+ uint64_t offset,
+ const bufferlist &bl,
+ Context *on_applied,
+ Context *on_commit)
+{
+ ObjectStore::Transaction *t = new ObjectStore::Transaction;
+ size_t sep = oid.find("/");
+ assert(sep != string::npos);
+ assert(sep + 1 < oid.size());
+ string coll_str(oid.substr(0, sep));
+
+ if (!osrs.count(coll_str))
+ osrs.insert(make_pair(coll_str, ObjectStore::Sequencer(coll_str)));
+ ObjectStore::Sequencer *osr = &(osrs.find(coll_str)->second);
+
+
+ coll_t c(coll_str);
+ hobject_t h(sobject_t(oid.substr(sep+1), 0));
+ t->write(c, h, offset, bl.length(), bl);
+
+ if (write_infos) {
+ bufferlist bl2;
+ for (uint64_t j = 0; j < 128; ++j) bl2.append(0);
+ coll_t meta("meta");
+ hobject_t info(sobject_t(string("info_")+coll_str, 0));
+ t->write(meta, info, 0, bl2.length(), bl2);
+ }
+
+ os->queue_transaction(
+ osr,
+ t,
+ new C_DeleteTransWrapper(t, on_applied),
+ on_commit);
+}
+
+void FileStoreBackend::read(
+ const string &oid,
+ uint64_t offset,
+ uint64_t length,
+ bufferlist *bl,
+ Context *on_complete)
+{
+ size_t sep = oid.find("/");
+ assert(sep != string::npos);
+ assert(sep + 1 < oid.size());
+ coll_t c(oid.substr(0, sep));
+ hobject_t h(sobject_t(oid.substr(sep+1), 0));
+ os->read(c, h, offset, length, *bl);
+ finisher.queue(on_complete);
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#ifndef FILESTOREBACKENDH
+#define FILESTOREBACKENDH
+
+#include "common/Finisher.h"
+#include "backend.h"
+#include "include/Context.h"
+#include "os/ObjectStore.h"
+
+class FileStoreBackend : public Backend {
+ ObjectStore *os;
+ Finisher finisher;
+ map<string, ObjectStore::Sequencer> osrs;
+ const bool write_infos;
+
+public:
+ FileStoreBackend(ObjectStore *os, bool write_infos);
+ ~FileStoreBackend() {
+ finisher.stop();
+ }
+ void write(
+ const string &oid,
+ uint64_t offset,
+ const bufferlist &bl,
+ Context *on_applied,
+ Context *on_commit);
+
+ void read(
+ const string &oid,
+ uint64_t offset,
+ uint64_t length,
+ bufferlist *bl,
+ Context *on_complete);
+};
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include "rados_backend.h"
+#include <boost/tuple/tuple.hpp>
+
+typedef boost::tuple<Context*, Context*, librados::AioCompletion*> arg_type;
+
+void on_applied(void *completion, void *_arg) {
+ arg_type *arg = static_cast<arg_type*>(_arg);
+ arg->get<1>()->complete(0);
+}
+
+void on_complete(void *completion, void *_arg) {
+ arg_type *arg = static_cast<arg_type*>(_arg);
+ arg->get<0>()->complete(0);
+ arg->get<2>()->release();
+ delete arg;
+}
+
+void RadosBackend::write(
+ const string &oid,
+ uint64_t offset,
+ const bufferlist &bl,
+ Context *on_write_applied,
+ Context *on_commit)
+{
+ librados::AioCompletion *completion = librados::Rados::aio_create_completion();
+
+
+ void *arg = static_cast<void *>(new arg_type(on_commit, on_write_applied,
+ completion));
+
+ completion->set_safe_callback(
+ arg,
+ on_complete);
+
+ completion->set_complete_callback(
+ arg,
+ on_applied);
+
+ ioctx->aio_write(oid, completion, bl, bl.length(), offset);
+}
+
+void RadosBackend::read(
+ const string &oid,
+ uint64_t offset,
+ uint64_t length,
+ bufferlist *bl,
+ Context *on_read_complete)
+{
+ librados::AioCompletion *completion = librados::Rados::aio_create_completion();
+
+
+ void *arg = static_cast<void *>(new arg_type(on_read_complete, 0,
+ completion));
+
+ completion->set_complete_callback(
+ arg,
+ on_complete);
+
+ ioctx->aio_read(oid, completion, bl, length, offset);
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#ifndef RADOSBACKENDH
+#define RADOSBACKENDH
+
+#include "backend.h"
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+
+class RadosBackend : public Backend {
+ librados::IoCtx *ioctx;
+public:
+ RadosBackend(
+ librados::IoCtx *ioctx)
+ : ioctx(ioctx) {}
+ void write(
+ const string &oid,
+ uint64_t offset,
+ const bufferlist &bl,
+ Context *on_applied,
+ Context *on_commit);
+
+ void read(
+ const string &oid,
+ uint64_t offset,
+ uint64_t length,
+ bufferlist *bl,
+ Context *on_complete);
+};
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/lexical_cast.hpp>
+#include <boost/program_options/option.hpp>
+#include <boost/program_options/options_description.hpp>
+#include <boost/program_options/variables_map.hpp>
+#include <boost/program_options/cmdline.hpp>
+#include <boost/program_options/parsers.hpp>
+#include <iostream>
+#include <set>
+#include <sstream>
+#include <stdlib.h>
+#include <fstream>
+#include <iostream>
+
+#include "common/Formatter.h"
+
+#include "bencher.h"
+#include "rados_backend.h"
+#include "detailed_stat_collector.h"
+#include "distribution.h"
+
+namespace po = boost::program_options;
+using namespace std;
+
+int main(int argc, char **argv)
+{
+ po::options_description desc("Allowed options");
+ desc.add_options()
+ ("help", "produce help message")
+ ("num-concurrent-ops", po::value<unsigned>()->default_value(10),
+ "set number of concurrent ops")
+ ("num-objects", po::value<unsigned>()->default_value(500),
+ "set number of objects to use")
+ ("object-size", po::value<unsigned>()->default_value(4<<20),
+ "set object size")
+ ("io-size", po::value<unsigned>()->default_value(4<<10),
+ "set io size")
+ ("write-ratio", po::value<double>()->default_value(0.75),
+ "set ratio of read to write")
+ ("duration", po::value<unsigned>()->default_value(0),
+ "set max duration, 0 for unlimited")
+ ("max-ops", po::value<unsigned>()->default_value(0),
+ "set max ops, 0 for unlimited")
+ ("seed", po::value<unsigned>(),
+ "seed")
+ ("ceph-client-id", po::value<string>()->default_value("admin"),
+ "set ceph client id")
+ ("pool-name", po::value<string>()->default_value("data"),
+ "set pool")
+ ("op-dump-file", po::value<string>()->default_value(""),
+ "set file for dumping op details, omit for stderr")
+ ("init-only", po::value<bool>()->default_value(false),
+ "populate object set")
+ ("use-prefix", po::value<string>()->default_value(""),
+ "use previously populated prefix")
+ ("offset-align", po::value<unsigned>()->default_value(4096),
+ "align offset by")
+ ("sequential", po::value<bool>()->default_value(false),
+ "use sequential access pattern")
+ ("disable-detailed-ops", po::value<bool>()->default_value(false),
+ "don't dump per op stats")
+ ;
+
+ po::variables_map vm;
+ po::store(po::parse_command_line(argc, argv, desc), vm);
+ po::notify(vm);
+
+ if (vm.count("help")) {
+ cout << desc << std::endl;
+ return 1;
+ }
+
+ if (vm["init-only"].as<bool>() && !vm["use-prefix"].as<string>().size()) {
+ cout << "Must supply prefix for init-only" << std::endl;
+ cout << desc << std::endl;
+ return 1;
+ }
+
+ string prefix;
+ if (vm["use-prefix"].as<string>().size()) {
+ prefix = vm["use_prefix"].as<string>();
+ } else {
+ char hostname_cstr[100];
+ gethostname(hostname_cstr, 100);
+ stringstream hostpid;
+ hostpid << hostname_cstr << getpid() << "-";
+ prefix = hostpid.str();
+ }
+
+ set<string> objects;
+ for (unsigned i = 0; i < vm["num-objects"].as<unsigned>();
+ ++i) {
+ stringstream name;
+ name << prefix << "-object_" << i;
+ objects.insert(name.str());
+ }
+
+ rngen_t rng;
+ if (vm.count("seed"))
+ rng = rngen_t(vm["seed"].as<unsigned>());
+
+ set<pair<double, Bencher::OpType> > ops;
+ ops.insert(make_pair(vm["write-ratio"].as<double>(), Bencher::WRITE));
+ ops.insert(make_pair(1-vm["write-ratio"].as<double>(), Bencher::READ));
+
+ librados::Rados rados;
+ librados::IoCtx ioctx;
+ int r = rados.init(vm["ceph-client-id"].as<string>().c_str());
+ if (r < 0) {
+ cerr << "error in init r=" << r << std::endl;
+ return -r;
+ }
+ r = rados.conf_read_file(NULL);
+ if (r < 0) {
+ cerr << "error in conf_read_file r=" << r << std::endl;
+ return -r;
+ }
+ r = rados.conf_parse_env(NULL);
+ if (r < 0) {
+ cerr << "error in conf_parse_env r=" << r << std::endl;
+ return -r;
+ }
+ r = rados.connect();
+ if (r < 0) {
+ cerr << "error in connect r=" << r << std::endl;
+ return -r;
+ }
+ r = rados.ioctx_create(vm["pool-name"].as<string>().c_str(), ioctx);
+ if (r < 0) {
+ cerr << "error in ioctx_create r=" << r << std::endl;
+ return -r;
+ }
+
+ ostream *detailed_ops = 0;
+ ofstream myfile;
+ if (vm["disable-detailed-ops"].as<bool>()) {
+ detailed_ops = 0;
+ } else if (vm["op-dump-file"].as<string>().size()) {
+ myfile.open(vm["op-dump-file"].as<string>().c_str());
+ detailed_ops = &myfile;
+ } else {
+ detailed_ops = &cerr;
+ }
+
+ Distribution<
+ boost::tuple<string, uint64_t, uint64_t, Bencher::OpType> > *gen = 0;
+ if (vm["sequential"].as<bool>()) {
+ std::cout << "Using Sequential generator" << std::endl;
+ gen = new SequentialLoad(
+ objects,
+ vm["object-size"].as<unsigned>(),
+ vm["io-size"].as<unsigned>(),
+ new WeightedDist<Bencher::OpType>(rng, ops)
+ );
+ } else {
+ std::cout << "Using random generator" << std::endl;
+ gen = new FourTupleDist<string, uint64_t, uint64_t, Bencher::OpType>(
+ new RandomDist<string>(rng, objects),
+ new Align(
+ new UniformRandom(
+ rng,
+ 0,
+ vm["object-size"].as<unsigned>() - vm["io-size"].as<unsigned>()),
+ vm["offset-align"].as<unsigned>()
+ ),
+ new Uniform(vm["io-size"].as<unsigned>()),
+ new WeightedDist<Bencher::OpType>(rng, ops)
+ );
+ }
+
+ Bencher bencher(
+ gen,
+ new DetailedStatCollector(1, new JSONFormatter, detailed_ops, &cout),
+ new RadosBackend(&ioctx),
+ vm["num-concurrent-ops"].as<unsigned>(),
+ vm["duration"].as<unsigned>(),
+ vm["max-ops"].as<unsigned>());
+
+ bencher.init(objects, vm["object-size"].as<unsigned>(), &std::cout);
+ cout << "Created objects..." << std::endl;
+
+ bencher.run_bench();
+
+ rados.shutdown();
+ if (vm["op-dump-file"].as<string>().size()) {
+ myfile.close();
+ }
+ return 0;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/lexical_cast.hpp>
+#include <boost/program_options/option.hpp>
+#include <boost/program_options/options_description.hpp>
+#include <boost/program_options/variables_map.hpp>
+#include <boost/program_options/cmdline.hpp>
+#include <boost/program_options/parsers.hpp>
+#include <iostream>
+#include <set>
+#include <sstream>
+#include <stdlib.h>
+#include <fstream>
+#include <iostream>
+
+#include "common/Formatter.h"
+
+#include "bencher.h"
+#include "rados_backend.h"
+#include "detailed_stat_collector.h"
+#include "distribution.h"
+#include "global/global_init.h"
+#include "os/FileStore.h"
+#include "dumb_backend.h"
+
+namespace po = boost::program_options;
+using namespace std;
+
+int main(int argc, char **argv)
+{
+ po::options_description desc("Allowed options");
+ desc.add_options()
+ ("help", "produce help message")
+ ("num-concurrent-ops", po::value<unsigned>()->default_value(10),
+ "set number of concurrent ops")
+ ("num-objects", po::value<unsigned>()->default_value(500),
+ "set number of objects to use")
+ ("object-size", po::value<unsigned>()->default_value(4<<20),
+ "set object size")
+ ("io-size", po::value<unsigned>()->default_value(4<<10),
+ "set io size")
+ ("write-ratio", po::value<double>()->default_value(0.75),
+ "set ratio of read to write")
+ ("duration", po::value<unsigned>()->default_value(0),
+ "set max duration, 0 for unlimited")
+ ("max-ops", po::value<unsigned>()->default_value(0),
+ "set max ops, 0 for unlimited")
+ ("seed", po::value<unsigned>(),
+ "seed")
+ ("num-colls", po::value<unsigned>()->default_value(20),
+ "number of collections")
+ ("op-dump-file", po::value<string>()->default_value(""),
+ "set file for dumping op details, omit for stderr")
+ ("filestore-path", po::value<string>(),
+ "path to filestore directory, mandatory")
+ ("offset-align", po::value<unsigned>()->default_value(4096),
+ "align offset by")
+ ("fsync", po::value<bool>()->default_value(false),
+ "fsync after each write")
+ ("sync-file-range", po::value<bool>()->default_value(false),
+ "sync-file-range after each write")
+ ("fadvise", po::value<bool>()->default_value(false),
+ "fadvise after each write")
+ ("sync-interval", po::value<unsigned>()->default_value(30),
+ "frequency to sync")
+ ("sequential", po::value<bool>()->default_value(false),
+ "use sequential access pattern")
+ ("disable-detailed-ops", po::value<bool>()->default_value(false),
+ "don't dump per op stats")
+ ;
+
+ po::variables_map vm;
+ po::parsed_options parsed =
+ po::command_line_parser(argc, argv).options(desc).allow_unregistered().run();
+ po::store(
+ parsed,
+ vm);
+ po::notify(vm);
+
+ vector<const char *> ceph_options, def_args;
+ vector<string> ceph_option_strings = po::collect_unrecognized(
+ parsed.options, po::include_positional);
+ ceph_options.reserve(ceph_option_strings.size());
+ for (vector<string>::iterator i = ceph_option_strings.begin();
+ i != ceph_option_strings.end();
+ ++i) {
+ ceph_options.push_back(i->c_str());
+ }
+
+ global_init(
+ &def_args, ceph_options, CEPH_ENTITY_TYPE_CLIENT,
+ CODE_ENVIRONMENT_UTILITY,
+ CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+ common_init_finish(g_ceph_context);
+ g_ceph_context->_conf->apply_changes(NULL);
+
+ if (!vm.count("filestore-path")) {
+ cout << "Must provide filestore-path" << std::endl
+ << desc << std::endl;
+ return 1;
+ }
+
+ if (vm.count("help")) {
+ cout << desc << std::endl;
+ return 1;
+ }
+
+ rngen_t rng;
+ if (vm.count("seed"))
+ rng = rngen_t(vm["seed"].as<unsigned>());
+
+ set<pair<double, Bencher::OpType> > ops;
+ ops.insert(make_pair(vm["write-ratio"].as<double>(), Bencher::WRITE));
+ ops.insert(make_pair(1-vm["write-ratio"].as<double>(), Bencher::READ));
+
+ cout << "Creating objects.." << std::endl;
+ bufferlist bl;
+ for (uint64_t i = 0; i < vm["object-size"].as<unsigned>(); ++i) {
+ bl.append(0);
+ }
+ set<string> objects;
+
+ for (uint64_t num = 0; num < vm["num-objects"].as<unsigned>(); ++num) {
+ unsigned col_num = num % vm["num-colls"].as<unsigned>();
+ stringstream coll, obj;
+ coll << "collection_" << col_num;
+ obj << "obj_" << num;
+ if (num == col_num) {
+ std::cout << "collection " << coll.str() << std::endl;
+ string coll_str(
+ vm["filestore-path"].as<string>() + string("/") + coll.str());
+ int r = ::mkdir(
+ coll_str.c_str(),
+ 0777);
+ if (r < 0) {
+ std::cerr << "Error " << errno << " creating collection" << std::endl;
+ return 1;
+ }
+ }
+ objects.insert(coll.str() + "/" + obj.str());
+ }
+ string meta_str(vm["filestore-path"].as<string>() + string("/meta"));
+ int r = ::mkdir(
+ meta_str.c_str(),
+ 0777);
+ if (r < 0) {
+ std::cerr << "Error " << errno << " creating collection" << std::endl;
+ return 1;
+ }
+ r = ::open(meta_str.c_str(), 0);
+ if (r < 0) {
+ std::cerr << "Error " << errno << " opening meta" << std::endl;
+ return 1;
+ }
+ int sync_fd = r;
+
+ ostream *detailed_ops = 0;
+ ofstream myfile;
+ if (vm["disable-detailed-ops"].as<bool>()) {
+ detailed_ops = 0;
+ } else if (vm["op-dump-file"].as<string>().size()) {
+ myfile.open(vm["op-dump-file"].as<string>().c_str());
+ detailed_ops = &myfile;
+ } else {
+ detailed_ops = &cerr;
+ }
+
+ Distribution<
+ boost::tuple<string, uint64_t, uint64_t, Bencher::OpType> > *gen = 0;
+ if (vm["sequential"].as<bool>()) {
+ std::cout << "Using Sequential generator" << std::endl;
+ gen = new SequentialLoad(
+ objects,
+ vm["object-size"].as<unsigned>(),
+ vm["io-size"].as<unsigned>(),
+ new WeightedDist<Bencher::OpType>(rng, ops)
+ );
+ } else {
+ std::cout << "Using random generator" << std::endl;
+ gen = new FourTupleDist<string, uint64_t, uint64_t, Bencher::OpType>(
+ new RandomDist<string>(rng, objects),
+ new Align(
+ new UniformRandom(
+ rng,
+ 0,
+ vm["object-size"].as<unsigned>() - vm["io-size"].as<unsigned>()),
+ vm["offset-align"].as<unsigned>()
+ ),
+ new Uniform(vm["io-size"].as<unsigned>()),
+ new WeightedDist<Bencher::OpType>(rng, ops)
+ );
+ }
+
+ Bencher bencher(
+ gen,
+ new DetailedStatCollector(1, new JSONFormatter, detailed_ops, &cout),
+ new DumbBackend(
+ vm["filestore-path"].as<string>(),
+ vm["fsync"].as<bool>(),
+ vm["sync-file-range"].as<bool>(),
+ vm["fadvise"].as<bool>(),
+ vm["sync-interval"].as<unsigned>(),
+ sync_fd,
+ 10,
+ g_ceph_context),
+ vm["num-concurrent-ops"].as<unsigned>(),
+ vm["duration"].as<unsigned>(),
+ vm["max-ops"].as<unsigned>());
+
+ bencher.init(objects, vm["object-size"].as<unsigned>(), &std::cout);
+ cout << "Created objects..." << std::endl;
+
+ bencher.run_bench();
+
+ if (vm["op-dump-file"].as<string>().size()) {
+ myfile.close();
+ }
+ return 0;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/lexical_cast.hpp>
+#include <boost/program_options/option.hpp>
+#include <boost/program_options/options_description.hpp>
+#include <boost/program_options/variables_map.hpp>
+#include <boost/program_options/cmdline.hpp>
+#include <boost/program_options/parsers.hpp>
+#include <iostream>
+#include <set>
+#include <sstream>
+#include <stdlib.h>
+#include <fstream>
+#include <iostream>
+
+#include "common/Formatter.h"
+
+#include "bencher.h"
+#include "rados_backend.h"
+#include "detailed_stat_collector.h"
+#include "distribution.h"
+#include "global/global_init.h"
+#include "os/FileStore.h"
+#include "filestore_backend.h"
+#include "common/perf_counters.h"
+
+namespace po = boost::program_options;
+using namespace std;
+
+struct MorePrinting : public DetailedStatCollector::AdditionalPrinting {
+ CephContext *cct;
+ MorePrinting(CephContext *cct) : cct(cct) {}
+ void operator()(std::ostream *out) {
+ bufferlist bl;
+ cct->get_perfcounters_collection()->write_json_to_buf(bl, 0);
+ bl.append('\0');
+ *out << bl.c_str() << std::endl;
+ }
+};
+
+int main(int argc, char **argv)
+{
+ po::options_description desc("Allowed options");
+ desc.add_options()
+ ("help", "produce help message")
+ ("num-concurrent-ops", po::value<unsigned>()->default_value(10),
+ "set number of concurrent ops")
+ ("num-objects", po::value<unsigned>()->default_value(500),
+ "set number of objects to use")
+ ("object-size", po::value<unsigned>()->default_value(4<<20),
+ "set object size")
+ ("io-size", po::value<unsigned>()->default_value(4<<10),
+ "set io size")
+ ("write-ratio", po::value<double>()->default_value(0.75),
+ "set ratio of read to write")
+ ("duration", po::value<unsigned>()->default_value(0),
+ "set max duration, 0 for unlimited")
+ ("max-ops", po::value<unsigned>()->default_value(0),
+ "set max ops, 0 for unlimited")
+ ("seed", po::value<unsigned>(),
+ "seed")
+ ("num-colls", po::value<unsigned>()->default_value(20),
+ "number of collections")
+ ("op-dump-file", po::value<string>()->default_value(""),
+ "set file for dumping op details, omit for stderr")
+ ("filestore-path", po::value<string>(),
+ "path to filestore directory, mandatory")
+ ("journal-path", po::value<string>(),
+ "path to journal, mandatory")
+ ("offset-align", po::value<unsigned>()->default_value(4096),
+ "align offset by")
+ ("write-infos", po::value<bool>()->default_value(false),
+ "write info objects with main writes")
+ ("sequential", po::value<bool>()->default_value(false),
+ "do sequential writes like rbd")
+ ("disable-detailed-ops", po::value<bool>()->default_value(false),
+ "don't dump per op stats")
+ ("num-writers", po::value<unsigned>()->default_value(1),
+ "num write threads")
+ ;
+
+ po::variables_map vm;
+ po::parsed_options parsed =
+ po::command_line_parser(argc, argv).options(desc).allow_unregistered().run();
+ po::store(
+ parsed,
+ vm);
+ po::notify(vm);
+
+ vector<const char *> ceph_options, def_args;
+ vector<string> ceph_option_strings = po::collect_unrecognized(
+ parsed.options, po::include_positional);
+ ceph_options.reserve(ceph_option_strings.size());
+ for (vector<string>::iterator i = ceph_option_strings.begin();
+ i != ceph_option_strings.end();
+ ++i) {
+ ceph_options.push_back(i->c_str());
+ }
+
+ global_init(
+ &def_args, ceph_options, CEPH_ENTITY_TYPE_CLIENT,
+ CODE_ENVIRONMENT_UTILITY,
+ CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+ common_init_finish(g_ceph_context);
+ g_ceph_context->_conf->apply_changes(NULL);
+
+ if (!vm.count("filestore-path") || !vm.count("journal-path")) {
+ cout << "Must provide filestore-path and journal-path" << std::endl
+ << desc << std::endl;
+ return 1;
+ }
+
+ if (vm.count("help")) {
+ cout << desc << std::endl;
+ return 1;
+ }
+
+ rngen_t rng;
+ if (vm.count("seed"))
+ rng = rngen_t(vm["seed"].as<unsigned>());
+
+ set<pair<double, Bencher::OpType> > ops;
+ ops.insert(make_pair(vm["write-ratio"].as<double>(), Bencher::WRITE));
+ ops.insert(make_pair(1-vm["write-ratio"].as<double>(), Bencher::READ));
+
+ FileStore fs(vm["filestore-path"].as<string>(),
+ vm["journal-path"].as<string>());
+ fs.mkfs();
+ fs.mount();
+
+ ostream *detailed_ops = 0;
+ ofstream myfile;
+ if (vm["disable-detailed-ops"].as<bool>()) {
+ detailed_ops = 0;
+ } else if (vm["op-dump-file"].as<string>().size()) {
+ myfile.open(vm["op-dump-file"].as<string>().c_str());
+ detailed_ops = &myfile;
+ } else {
+ detailed_ops = &cerr;
+ }
+
+ std::tr1::shared_ptr<StatCollector> col(
+ new DetailedStatCollector(
+ 1, new JSONFormatter, detailed_ops, &cout,
+ new MorePrinting(g_ceph_context)));
+
+ cout << "Creating objects.." << std::endl;
+ bufferlist bl;
+ for (uint64_t i = 0; i < vm["object-size"].as<unsigned>(); ++i) {
+ bl.append(0);
+ }
+
+ for (uint64_t num = 0; num < vm["num-colls"].as<unsigned>(); ++num) {
+ stringstream coll;
+ coll << "collection_" << num;
+ std::cout << "collection " << coll.str() << std::endl;
+ ObjectStore::Transaction t;
+ t.create_collection(coll_t(coll.str()));
+ fs.apply_transaction(t);
+ }
+ {
+ ObjectStore::Transaction t;
+ t.create_collection(coll_t(string("meta")));
+ fs.apply_transaction(t);
+ }
+
+ vector<std::tr1::shared_ptr<Bencher> > benchers(
+ vm["num-writers"].as<unsigned>());
+ for (vector<std::tr1::shared_ptr<Bencher> >::iterator i = benchers.begin();
+ i != benchers.end();
+ ++i) {
+ set<string> objects;
+ for (uint64_t num = 0; num < vm["num-objects"].as<unsigned>(); ++num) {
+ unsigned col_num = num % vm["num-colls"].as<unsigned>();
+ stringstream coll, obj;
+ coll << "collection_" << col_num;
+ obj << "obj_" << num << "_bencher_" << (i - benchers.begin());
+ objects.insert(coll.str() + string("/") + obj.str());
+ }
+ Distribution<
+ boost::tuple<string, uint64_t, uint64_t, Bencher::OpType> > *gen = 0;
+ if (vm["sequential"].as<bool>()) {
+ std::cout << "Using Sequential generator" << std::endl;
+ gen = new SequentialLoad(
+ objects,
+ vm["object-size"].as<unsigned>(),
+ vm["io-size"].as<unsigned>(),
+ new WeightedDist<Bencher::OpType>(rng, ops)
+ );
+ } else {
+ std::cout << "Using random generator" << std::endl;
+ gen = new FourTupleDist<string, uint64_t, uint64_t, Bencher::OpType>(
+ new RandomDist<string>(rng, objects),
+ new Align(
+ new UniformRandom(
+ rng,
+ 0,
+ vm["object-size"].as<unsigned>() - vm["io-size"].as<unsigned>()),
+ vm["offset-align"].as<unsigned>()
+ ),
+ new Uniform(vm["io-size"].as<unsigned>()),
+ new WeightedDist<Bencher::OpType>(rng, ops)
+ );
+ }
+
+ Bencher *bencher = new Bencher(
+ gen,
+ col,
+ new FileStoreBackend(&fs, vm["write-infos"].as<bool>()),
+ vm["num-concurrent-ops"].as<unsigned>(),
+ vm["duration"].as<unsigned>(),
+ vm["max-ops"].as<unsigned>());
+
+ bencher->init(objects, vm["object-size"].as<unsigned>(), &std::cout);
+ cout << "Created objects..." << std::endl;
+ (*i).reset(bencher);
+ }
+
+ for (vector<std::tr1::shared_ptr<Bencher> >::iterator i = benchers.begin();
+ i != benchers.end();
+ ++i) {
+ (*i)->create();
+ }
+ for (vector<std::tr1::shared_ptr<Bencher> >::iterator i = benchers.begin();
+ i != benchers.end();
+ ++i) {
+ (*i)->join();
+ }
+
+ fs.umount();
+ if (vm["op-dump-file"].as<string>().size()) {
+ myfile.close();
+ }
+ return 0;
+}
--- /dev/null
+import json
+import sys
+from pylab import hist
+import gzip
+
+def get_next_line(line, output):
+ val = json.loads(line)
+ if val['type'] not in output:
+ output[val['type']] = {}
+ for (name, value) in val.iteritems():
+ if name == "type":
+ continue
+ if name == "seq":
+ continue
+ if name not in output[val['type']]:
+ output[val['type']][name] = []
+ output[val['type']][name] += [float(value)]
+
+def wrapgz(gfilename):
+ def retval():
+ gfile = gzip.open(gfilename, 'rb')
+ gfile.__exit__ = lambda: gfile.close()
+ return gfile
+ return (gfilename, retval)
+
+def read_all_input(filename):
+ cur = {}
+ openfn = open
+ if ".gz" in filename:
+ openfn = wrapgz
+ with openfn(filename) as fh:
+ for line in fh:
+ get_next_line(line, cur);
+ return cur
+
+def write_committed_latency(out, bins, **kwargs):
+ hist(out['write_committed']['latency'], bins, **kwargs)
+
+def read_latency(out):
+ hist(out['read']['latency'], 100)
+
+def com(out): return out['write_committed']['latency']
+def app(out): return out['write_applied']['latency']
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#ifndef STATCOLLECTORH
+#define STATCOLLECTORH
+
+#include <stdint.h>
+
+class StatCollector {
+public:
+ virtual uint64_t next_seq() = 0;
+ virtual void start_write(uint64_t seq, uint64_t size) = 0;
+ virtual void start_read(uint64_t seq, uint64_t size) = 0;
+ virtual void write_applied(uint64_t seq) = 0;
+ virtual void write_committed(uint64_t seq) = 0;
+ virtual void read_complete(uint64_t seq) = 0;
+ virtual ~StatCollector() {}
+};
+
+#endif