ceph_rgw_multiparser \
ceph_scratchtool \
ceph_scratchtoolpp \
- ceph_smalliobench \
- ceph_smalliobenchdumb \
- ceph_smalliobenchfs \
- ceph_smalliobenchrbd \
ceph_test_* \
- ceph_tpbench \
ceph_xattr_bench \
ceph-coverage \
ceph-monstore-tool \
%{_bindir}/ceph_rgw_multiparser
%{_bindir}/ceph_scratchtool
%{_bindir}/ceph_scratchtoolpp
-%{_bindir}/ceph_smalliobench
-%{_bindir}/ceph_smalliobenchdumb
-%{_bindir}/ceph_smalliobenchfs
-%{_bindir}/ceph_smalliobenchrbd
%{_bindir}/ceph_test_*
-%{_bindir}/ceph_tpbench
%{_bindir}/ceph_xattr_bench
%{_bindir}/ceph-coverage
%{_bindir}/ceph-debugpack
usr/bin/ceph_rgw_multiparser
usr/bin/ceph_scratchtool
usr/bin/ceph_scratchtoolpp
-usr/bin/ceph_smalliobench
-usr/bin/ceph_smalliobenchdumb
-usr/bin/ceph_smalliobenchfs
-usr/bin/ceph_smalliobenchrbd
usr/bin/ceph_test_*
-usr/bin/ceph_tpbench
usr/bin/ceph_xattr_bench
usr/lib/ceph/ceph-monstore-update-crush.sh
usr/share/java/libcephfs-test.jar
+++ /dev/null
-#!/bin/sh
-
-NUM="$1"
-GAP="$2"
-DUR="$3"
-
-[ -z "$NUM" ] && NUM=30
-[ -z "$GAP" ] && GAP=5
-[ -z "$DUR" ] && DUR=30
-
-for n in `seq 1 $NUM`; do
- echo "Starting $n of $NUM ..."
- ceph_smalliobenchrbd --pool rbd --duration $DUR --disable-detailed-ops 1 &
- sleep $GAP
-done
-echo "Waiting..."
-wait
-echo "OK"
set_source_files_properties(unit.cc PROPERTIES
COMPILE_FLAGS ${UNITTEST_CXX_FLAGS})
-add_subdirectory(bench)
add_subdirectory(cls_hello)
add_subdirectory(cls_lock)
add_subdirectory(cls_log)
+++ /dev/null
-# smalliobench
-set(smalliobench_srcs
- small_io_bench.cc
- rados_backend.cc
- detailed_stat_collector.cc
- bencher.cc
- )
-add_executable(ceph_smalliobench
- ${smalliobench_srcs}
- )
-target_link_libraries(ceph_smalliobench librados Boost::program_options global
- ${BLKID_LIBRARIES} ${CMAKE_DL_LIBS})
-
-# ceph_smalliobenchrbd
-if(WITH_RBD)
- set(smalliobenchrbd_srcs
- small_io_bench_rbd.cc
- rbd_backend.cc
- detailed_stat_collector.cc
- bencher.cc
- )
- add_executable(ceph_smalliobenchrbd
- ${smalliobenchrbd_srcs}
- $<TARGET_OBJECTS:common_texttable_obj>)
- target_link_libraries(ceph_smalliobenchrbd
- librbd
- librados
- os
- global
- Boost::program_options
- ${BLKID_LIBRARIES}
- ${CMAKE_DL_LIBS}
- )
- add_dependencies(ceph_smalliobenchrbd
- cls_rbd
- cls_journal
- cls_lock)
- install(TARGETS
- ceph_smalliobenchrbd
- DESTINATION bin)
-endif(WITH_RBD)
-
-# ceph_smalliobenchfs
-set(ceph_smalliobenchfs_srcs
- small_io_bench_fs.cc
- testfilestore_backend.cc
- detailed_stat_collector.cc
- bencher.cc
- )
-add_executable(ceph_smalliobenchfs
- ${ceph_smalliobenchfs_srcs}
- )
-target_link_libraries(ceph_smalliobenchfs librados Boost::program_options os global
- ${BLKID_LIBRARIES} ${CMAKE_DL_LIBS})
-
-# ceph_smalliobenchdumb
-set(smalliobenchdumb_srcs
- small_io_bench_dumb.cc
- dumb_backend.cc
- detailed_stat_collector.cc
- bencher.cc
- )
-add_executable(ceph_smalliobenchdumb
- ${smalliobenchdumb_srcs}
- )
-target_link_libraries(ceph_smalliobenchdumb librados Boost::program_options os global
- ${BLKID_LIBRARIES} ${CMAKE_DL_LIBS})
-
-# ceph_tpbench
-set(tpbench_srcs
- tp_bench.cc
- detailed_stat_collector.cc)
-add_executable(ceph_tpbench
- ${tpbench_srcs}
- )
-target_link_libraries(ceph_tpbench librados Boost::program_options global
- ${BLKID_LIBRARIES} ${CMAKE_DL_LIBS})
-
-install(TARGETS
- ceph_smalliobench
- ceph_smalliobenchfs
- ceph_smalliobenchdumb
- ceph_tpbench
- DESTINATION bin)
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-
-#ifndef BACKENDH
-#define BACKENDH
-
-#include "include/Context.h"
-
-class Backend {
-public:
- virtual void write(
- const string &oid,
- uint64_t offset,
- const bufferlist &bl,
- Context *on_applied,
- Context *on_commit) = 0;
-
- virtual void read(
- const string &oid,
- uint64_t offset,
- uint64_t length,
- bufferlist *bl,
- Context *on_complete) = 0;
- virtual ~Backend() {}
-};
-
-#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-
-#include "bencher.h"
-#include "include/utime.h"
-#include <unistd.h>
-#include "include/memory.h"
-#include "common/Mutex.h"
-#include "common/Cond.h"
-
-template<typename T>
-struct C_Holder : public Context {
- T obj;
- explicit C_Holder(
- T obj)
- : obj(obj) {}
- void finish(int r) override {
- return;
- }
-};
-
-struct OnDelete {
- Context *c;
- explicit OnDelete(Context *c) : c(c) {}
- ~OnDelete() { c->complete(0); }
-};
-
-struct Cleanup : public Context {
- Bencher *bench;
- explicit Cleanup(Bencher *bench) : bench(bench) {}
- void finish(int r) override {
- bench->complete_op();
- }
-};
-
-struct OnWriteApplied : public Context {
- Bencher *bench;
- uint64_t seq;
- ceph::shared_ptr<OnDelete> on_delete;
- OnWriteApplied(
- Bencher *bench, uint64_t seq,
- ceph::shared_ptr<OnDelete> on_delete
- ) : bench(bench), seq(seq), on_delete(on_delete) {}
- void finish(int r) override {
- bench->stat_collector->write_applied(seq);
- }
-};
-
-struct OnWriteCommit : public Context {
- Bencher *bench;
- uint64_t seq;
- ceph::shared_ptr<OnDelete> on_delete;
- OnWriteCommit(
- Bencher *bench, uint64_t seq,
- ceph::shared_ptr<OnDelete> on_delete
- ) : bench(bench), seq(seq), on_delete(on_delete) {}
- void finish(int r) override {
- bench->stat_collector->write_committed(seq);
- }
-};
-
-struct OnReadComplete : public Context {
- Bencher *bench;
- uint64_t seq;
- boost::scoped_ptr<bufferlist> bl;
- OnReadComplete(Bencher *bench, uint64_t seq, bufferlist *bl) :
- bench(bench), seq(seq), bl(bl) {}
- void finish(int r) override {
- bench->stat_collector->read_complete(seq);
- bench->complete_op();
- }
-};
-
-void Bencher::start_op() {
- Mutex::Locker l(lock);
- while (open_ops >= max_in_flight)
- open_ops_cond.Wait(lock);
- ++open_ops;
-}
-
-void Bencher::drain_ops() {
- Mutex::Locker l(lock);
- while (open_ops)
- open_ops_cond.Wait(lock);
-}
-
-void Bencher::complete_op() {
- Mutex::Locker l(lock);
- assert(open_ops > 0);
- --open_ops;
- open_ops_cond.Signal();
-}
-
-struct OnFinish {
- bool *done;
- Mutex *lock;
- Cond *cond;
- OnFinish(
- bool *done,
- Mutex *lock,
- Cond *cond) :
- done(done), lock(lock), cond(cond) {}
- ~OnFinish() {
- Mutex::Locker l(*lock);
- *done = true;
- cond->Signal();
- }
-};
-
-void Bencher::init(
- const set<std::string> &objects,
- uint64_t size,
- std::ostream *out
- )
-{
- bufferlist bl;
- for (uint64_t i = 0; i < size; ++i) {
- bl.append(0);
- }
- Mutex lock("init_lock");
- Cond cond;
- bool done = 0;
- {
- ceph::shared_ptr<OnFinish> on_finish(
- new OnFinish(&done, &lock, &cond));
- uint64_t num = 0;
- for (set<std::string>::const_iterator i = objects.begin();
- i != objects.end();
- ++i, ++num) {
- if (!(num % 20))
- *out << "Creating " << num << "/" << objects.size() << std::endl;
- backend->write(
- *i,
- 0,
- bl,
- new C_Holder<ceph::shared_ptr<OnFinish> >(on_finish),
- new C_Holder<ceph::shared_ptr<OnFinish> >(on_finish)
- );
- }
- }
- {
- Mutex::Locker l(lock);
- while (!done)
- cond.Wait(lock);
- }
-}
-
-void Bencher::run_bench()
-{
- time_t end = time(0) + max_duration;
- uint64_t ops = 0;
-
- bufferlist bl;
-
- while ((!max_duration || time(0) < end) && (!max_ops || ops < max_ops)) {
- start_op();
- uint64_t seq = stat_collector->next_seq();
- boost::tuple<std::string, uint64_t, uint64_t, OpType> next =
- (*op_dist)();
- string obj_name = next.get<0>();
- uint64_t offset = next.get<1>();
- uint64_t length = next.get<2>();
- OpType op_type = next.get<3>();
- switch (op_type) {
- case WRITE: {
- ceph::shared_ptr<OnDelete> on_delete(
- new OnDelete(new Cleanup(this)));
- stat_collector->start_write(seq, length);
- while (bl.length() < length) {
- bl.append(rand());
- }
- backend->write(
- obj_name,
- offset,
- bl,
- new OnWriteApplied(
- this, seq, on_delete),
- new OnWriteCommit(
- this, seq, on_delete)
- );
- break;
- }
- case READ: {
- stat_collector->start_read(seq, length);
- bufferlist *read_bl = new bufferlist;
- backend->read(
- obj_name,
- offset,
- length,
- read_bl,
- new OnReadComplete(
- this, seq, read_bl)
- );
- break;
- }
- default: {
- ceph_abort();
- }
- }
- ops++;
- }
- drain_ops();
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-
-#ifndef BENCHERH
-#define BENCHERH
-
-#include <utility>
-#include "distribution.h"
-#include "stat_collector.h"
-#include "backend.h"
-#include <boost/scoped_ptr.hpp>
-#include "common/Mutex.h"
-#include "common/Cond.h"
-#include "common/Thread.h"
-
-struct OnWriteApplied;
-struct OnWriteCommit;
-struct OnReadComplete;
-struct Cleanup;
-
-class Bencher : public Thread {
-public:
- enum OpType {
- WRITE,
- READ
- };
-
-private:
- boost::scoped_ptr<
- Distribution<boost::tuple<std::string,uint64_t,uint64_t, OpType> > > op_dist;
- ceph::shared_ptr<StatCollector> stat_collector;
- boost::scoped_ptr<Backend> backend;
- const uint64_t max_in_flight;
- const uint64_t max_duration;
- const uint64_t max_ops;
-
- Mutex lock;
- Cond open_ops_cond;
- uint64_t open_ops;
- void start_op();
- void drain_ops();
- void complete_op();
-public:
- Bencher(
- Distribution<boost::tuple<std::string, uint64_t, uint64_t, OpType> > *op_gen,
- ceph::shared_ptr<StatCollector> stat_collector,
- Backend *backend,
- uint64_t max_in_flight,
- uint64_t max_duration,
- uint64_t max_ops) :
- op_dist(op_gen),
- stat_collector(stat_collector),
- backend(backend),
- max_in_flight(max_in_flight),
- max_duration(max_duration),
- max_ops(max_ops),
- lock("Bencher::lock"),
- open_ops(0)
- {}
- Bencher(
- Distribution<boost::tuple<std::string, uint64_t, uint64_t, OpType> > *op_gen,
- StatCollector *stat_collector,
- Backend *backend,
- uint64_t max_in_flight,
- uint64_t max_duration,
- uint64_t max_ops) :
- op_dist(op_gen),
- stat_collector(stat_collector),
- backend(backend),
- max_in_flight(max_in_flight),
- max_duration(max_duration),
- max_ops(max_ops),
- lock("Bencher::lock"),
- open_ops(0)
- {}
- Bencher(
- Distribution<std::string> *object_gen,
- Distribution<uint64_t> *offset_gen,
- Distribution<uint64_t> *length_gen,
- Distribution<OpType> *op_type_gen,
- StatCollector *stat_collector,
- Backend *backend,
- uint64_t max_in_flight,
- uint64_t max_duration,
- uint64_t max_ops) :
- op_dist(
- new FourTupleDist<std::string, uint64_t, uint64_t, OpType>(
- object_gen, offset_gen, length_gen, op_type_gen)),
- stat_collector(stat_collector),
- backend(backend),
- max_in_flight(max_in_flight),
- max_duration(max_duration),
- max_ops(max_ops),
- lock("Bencher::lock"),
- open_ops(0)
- {}
-
- void init(
- const set<std::string> &objects,
- uint64_t size,
- std::ostream *out
- );
-
- void run_bench();
- void *entry() override {
- run_bench();
- return 0;
- }
- friend struct OnWriteApplied;
- friend struct OnWriteCommit;
- friend struct OnReadComplete;
- friend struct Cleanup;
-};
-
-class SequentialLoad :
- public Distribution<
- boost::tuple<string, uint64_t, uint64_t, Bencher::OpType> > {
- set<string> objects;
- uint64_t size;
- uint64_t length;
- set<string>::iterator object_pos;
- uint64_t cur_pos;
- boost::scoped_ptr<Distribution<Bencher::OpType> > op_dist;
- SequentialLoad(const SequentialLoad &other);
-public:
- SequentialLoad(
- const set<string> &_objects, uint64_t size,
- uint64_t length,
- Distribution<Bencher::OpType> *op_dist)
- : objects(_objects), size(size), length(length),
- object_pos(objects.begin()), cur_pos(0),
- op_dist(op_dist) {}
-
- boost::tuple<string, uint64_t, uint64_t, Bencher::OpType>
- operator()() override {
- boost::tuple<string, uint64_t, uint64_t, Bencher::OpType> ret =
- boost::make_tuple(*object_pos, cur_pos, length, (*op_dist)());
- cur_pos += length;
- if (cur_pos >= size) {
- cur_pos = 0;
- ++object_pos;
- }
- if (object_pos == objects.end())
- object_pos = objects.begin();
- return ret;
- }
-};
-#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-
-#include "detailed_stat_collector.h"
-#include <sys/time.h>
-#include <utility>
-#include <boost/tuple/tuple.hpp>
-
-void DetailedStatCollector::Op::dump(
- ostream *out,
- Formatter *f)
-{
- if (!out)
- return;
- f->open_object_section(type.c_str());
- f->dump_string("type", type);
- f->dump_float("start", start);
- f->dump_float("latency", latency);
- f->dump_int("size", size);
- f->dump_int("seq", seq);
- f->close_section();
- f->flush(*out);
- *out << std::endl;
-}
-
-static utime_t cur_time()
-{
- struct timeval tv;
- gettimeofday(&tv, 0);
- return utime_t(&tv);
-}
-
-DetailedStatCollector::Aggregator::Aggregator()
- : recent_size(0), total_size(0), recent_latency(0),
- total_latency(0), recent_ops(0), total_ops(0), started(false)
-{}
-
-void DetailedStatCollector::Aggregator::add(const Op &op)
-{
- if (!started) {
- last = first = op.start;
- started = true;
- }
- ++recent_ops;
- ++total_ops;
- recent_size += op.size;
- total_size += op.size;
- recent_latency += op.latency;
- total_latency += op.latency;
-}
-
-void DetailedStatCollector::Aggregator::dump(Formatter *f)
-{
- utime_t now = cur_time();
- f->dump_stream("time") << now;
- f->dump_float("avg_recent_latency", recent_latency / recent_ops);
- f->dump_float("avg_total_latency", total_latency / total_ops);
- f->dump_float("avg_recent_iops", recent_ops / (now - last));
- f->dump_float("avg_total_iops", total_ops / (now - first));
- f->dump_float("avg_recent_throughput", recent_size / (now - last));
- f->dump_float("avg_total_throughput", total_size / (now - first));
- f->dump_float("avg_recent_throughput_mb",
- (recent_size / (now - last)) / (1024*1024));
- f->dump_float("avg_total_throughput_mb",
- (total_size / (now - first)) / (1024*1024));
- f->dump_float("duration", now - last);
- last = now;
- recent_latency = 0;
- recent_size = 0;
- recent_ops = 0;
-}
-
-DetailedStatCollector::DetailedStatCollector(
- double bin_size,
- Formatter *formatter,
- ostream *out,
- ostream *summary_out,
- AdditionalPrinting *details
- ) : bin_size(bin_size), f(formatter), out(out),
- summary_out(summary_out), details(details),
- lock("Stat::lock"), cur_seq(0) {
- last_dump = cur_time();
-}
-
-uint64_t DetailedStatCollector::next_seq()
-{
- Mutex::Locker l(lock);
- if (summary_out && ((cur_time() - last_dump) > bin_size)) {
- f->open_object_section("stats");
- for (map<string, Aggregator>::iterator i = aggregators.begin();
- i != aggregators.end();
- ++i) {
- f->open_object_section(i->first.c_str());
- i->second.dump(f.get());
- f->close_section();
- }
- f->close_section();
- f->flush(*summary_out);
- *summary_out << std::endl;
- if (details) {
- (*details)(summary_out);
- *summary_out << std::endl;
- }
- last_dump = cur_time();
- }
- return cur_seq++;
-}
-
-void DetailedStatCollector::start_write(uint64_t seq, uint64_t length)
-{
- Mutex::Locker l(lock);
- utime_t now(cur_time());
- not_committed.insert(make_pair(seq, make_pair(length, now)));
- not_applied.insert(make_pair(seq, make_pair(length, now)));
-}
-
-void DetailedStatCollector::start_read(uint64_t seq, uint64_t length)
-{
- Mutex::Locker l(lock);
- utime_t now(cur_time());
- not_read.insert(make_pair(seq, make_pair(length, now)));
-}
-
-void DetailedStatCollector::write_applied(uint64_t seq)
-{
- Mutex::Locker l(lock);
- Op op(
- "write_applied",
- not_applied[seq].second,
- cur_time() - not_applied[seq].second,
- not_applied[seq].first,
- seq);
- op.dump(out, f.get());
- aggregators["write_applied"].add(op);
- not_applied.erase(seq);
-}
-
-void DetailedStatCollector::write_committed(uint64_t seq)
-{
- Mutex::Locker l(lock);
- Op op(
- "write_committed",
- not_committed[seq].second,
- cur_time() - not_committed[seq].second,
- not_committed[seq].first,
- seq);
- op.dump(out, f.get());
- aggregators["write_committed"].add(op);
- not_committed.erase(seq);
-}
-
-void DetailedStatCollector::read_complete(uint64_t seq)
-{
- Mutex::Locker l(lock);
- Op op(
- "read",
- not_read[seq].second,
- cur_time() - not_read[seq].second,
- not_read[seq].first,
- seq);
- op.dump(out, f.get());
- aggregators["read"].add(op);
- not_read.erase(seq);
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-
-#ifndef DETAILEDSTATCOLLECTERH
-#define DETAILEDSTATCOLLECTERH
-
-#include "stat_collector.h"
-#include "common/Formatter.h"
-#include <boost/scoped_ptr.hpp>
-#include "common/Mutex.h"
-#include "common/Cond.h"
-#include "include/utime.h"
-#include <list>
-#include <map>
-#include <boost/tuple/tuple.hpp>
-#include <ostream>
-
-class DetailedStatCollector : public StatCollector {
-public:
- class AdditionalPrinting {
- public:
- virtual void operator()(std::ostream *) = 0;
- virtual ~AdditionalPrinting() {}
- };
-private:
- struct Op {
- string type;
- utime_t start;
- double latency;
- uint64_t size;
- uint64_t seq;
- Op(
- string type,
- utime_t start,
- double latency,
- uint64_t size,
- uint64_t seq)
- : type(type), start(start), latency(latency),
- size(size), seq(seq) {}
- void dump(ostream *out, Formatter *f);
- };
- class Aggregator {
- uint64_t recent_size;
- uint64_t total_size;
- double recent_latency;
- double total_latency;
- utime_t last;
- utime_t first;
- uint64_t recent_ops;
- uint64_t total_ops;
- bool started;
- public:
- Aggregator();
-
- void add(const Op &op);
- void dump(Formatter *f);
- };
- const double bin_size;
- boost::scoped_ptr<Formatter> f;
- ostream *out;
- ostream *summary_out;
- boost::scoped_ptr<AdditionalPrinting> details;
- utime_t last_dump;
-
- Mutex lock;
- Cond cond;
-
- map<string, Aggregator> aggregators;
-
- map<uint64_t, pair<uint64_t, utime_t> > not_applied;
- map<uint64_t, pair<uint64_t, utime_t> > not_committed;
- map<uint64_t, pair<uint64_t, utime_t> > not_read;
-
- uint64_t cur_seq;
-
- void dump(
- const string &type,
- boost::tuple<utime_t, utime_t, uint64_t, uint64_t> stuff);
-public:
- DetailedStatCollector(
- double bin_size,
- Formatter *formatter,
- ostream *out,
- ostream *summary_out,
- AdditionalPrinting *details = 0
- );
-
- uint64_t next_seq() override;
- void start_write(uint64_t seq, uint64_t size) override;
- void start_read(uint64_t seq, uint64_t size) override;
- void write_applied(uint64_t seq) override;
- void write_committed(uint64_t seq) override;
- void read_complete(uint64_t seq) override;
-
-};
-
-#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-
-#ifndef DISTIRITBIONHPP
-#define DISTIRITBIONHPP
-
-#include <map>
-#include <set>
-#include <utility>
-#include <vector>
-#include <boost/random/mersenne_twister.hpp>
-#include <boost/random/uniform_int.hpp>
-#include <boost/random/uniform_real.hpp>
-#include <boost/scoped_ptr.hpp>
-#include <boost/tuple/tuple.hpp>
-
-typedef boost::mt11213b rngen_t;
-
-template <typename T>
-class Distribution {
-public:
- virtual T operator()() = 0;
- virtual ~Distribution() {}
-};
-
-template <typename T, typename U, typename V, typename W>
-class FourTupleDist : public Distribution<boost::tuple<T, U, V, W> > {
- boost::scoped_ptr<Distribution<T> > t;
- boost::scoped_ptr<Distribution<U> > u;
- boost::scoped_ptr<Distribution<V> > v;
- boost::scoped_ptr<Distribution<W> > w;
-public:
- FourTupleDist(
- Distribution<T> *t,
- Distribution<U> *u,
- Distribution<V> *v,
- Distribution<W> *w)
- : t(t), u(u), v(v), w(w) {}
- boost::tuple<T, U, V, W> operator()() override {
- return boost::make_tuple((*t)(), (*u)(), (*v)(), (*w)());
- }
-};
-
-template <typename T>
-class RandomDist : public Distribution<T> {
- rngen_t rng;
- std::map<uint64_t, T> contents;
-public:
- RandomDist(const rngen_t &rng, std::set<T> &initial) : rng(rng) {
- uint64_t count = 0;
- for (typename std::set<T>::iterator i = initial.begin();
- i != initial.end();
- ++i, ++count) {
- contents.insert(std::make_pair(count, *i));
- }
- }
- T operator()() override {
- assert(contents.size());
- boost::uniform_int<> value(0, contents.size() - 1);
- return contents.find(value(rng))->second;
- }
-};
-
-template <typename T>
-class WeightedDist : public Distribution<T> {
- rngen_t rng;
- double total;
- std::map<double, T> contents;
-public:
- WeightedDist(const rngen_t &rng, const std::set<std::pair<double, T> > &initial)
- : rng(rng), total(0) {
- for (typename std::set<std::pair<double, T> >::const_iterator i =
- initial.begin();
- i != initial.end();
- ++i) {
- total += i->first;
- contents.insert(std::make_pair(total, i->second));
- }
- }
- T operator()() override {
- return contents.lower_bound(
- boost::uniform_real<>(0, total)(rng))->second;
- }
-};
-
-template <typename T, typename U>
-class SequentialDist : public Distribution<T> {
- rngen_t rng;
- std::vector<T> contents;
- typename std::vector<T>::iterator cur;
-public:
- SequentialDist(rngen_t rng, U &initial) : rng(rng) {
- contents.insert(initial.begin(), initial.end());
- cur = contents.begin();
- }
- virtual T operator()() {
- assert(contents.size());
- if (cur == contents.end())
- cur = contents.begin();
- return *(cur++);
- }
-};
-
-class UniformRandom : public Distribution<uint64_t> {
- rngen_t rng;
- uint64_t min;
- uint64_t max;
-public:
- UniformRandom(const rngen_t &rng, uint64_t min, uint64_t max) :
- rng(rng), min(min), max(max) {}
- uint64_t operator()() override {
- return boost::uniform_int<uint64_t>(min, max)(rng);
- }
-};
-
-class Align : public Distribution<uint64_t> {
- boost::scoped_ptr<Distribution<uint64_t> > dist;
- uint64_t align;
-public:
- Align(Distribution<uint64_t> *dist, uint64_t align) :
- dist(dist), align(align) {}
- uint64_t operator()() override {
- uint64_t ret = (*dist)();
- return ret - (ret % align);
- }
-};
-
-class Uniform : public Distribution<uint64_t> {
- uint64_t val;
-public:
- explicit Uniform(uint64_t val) : val(val) {}
- uint64_t operator()() override {
- return val;
- }
-};
-
-#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-
-#include "acconfig.h"
-
-#include <unistd.h>
-#include "dumb_backend.h"
-
-string DumbBackend::get_full_path(const string &oid)
-{
- return path + "/" + oid;
-}
-
-void DumbBackend::_write(
- const string &oid,
- uint64_t offset,
- const bufferlist &bl,
- Context *on_applied,
- Context *on_commit)
-{
- string full_path(get_full_path(oid));
- int fd = ::open(
- full_path.c_str(), O_CREAT|O_WRONLY, 0777);
- if (fd < 0) {
- std::cout << full_path << ": errno is " << errno << std::endl;
- ceph_abort();
- }
-
- int r = ::lseek(fd, offset, SEEK_SET);
- if (r < 0) {
- r = errno;
- std::cout << "lseek failed, errno is: " << r << std::endl;
- ::close(fd);
- return;
- }
- bl.write_fd(fd);
- on_applied->complete(0);
- if (do_fsync)
- ::fsync(fd);
-#ifdef HAVE_SYNC_FILE_RANGE
- if (do_sync_file_range)
- ::sync_file_range(fd, offset, bl.length(),
- SYNC_FILE_RANGE_WAIT_AFTER);
-#else
-# warning "sync_file_range not supported!"
-#endif
-#ifdef HAVE_POSIX_FADVISE
- if (do_fadvise) {
- int fa_r = ::posix_fadvise(fd, offset, bl.length(), POSIX_FADV_DONTNEED);
- if (fa_r) {
- std::cout << "posix_fadvise failed, errno is: " << fa_r << std::endl;
- }
- }
-#else
-# warning "posix_fadvise not supported!"
-#endif
- ::close(fd);
- {
- Mutex::Locker l(pending_commit_mutex);
- pending_commits.insert(on_commit);
- }
- sem.Put();
-}
-
-void DumbBackend::read(
- const string &oid,
- uint64_t offset,
- uint64_t length,
- bufferlist *bl,
- Context *on_complete)
-{
- string full_path(get_full_path(oid));
- int fd = ::open(
- full_path.c_str(), 0, O_RDONLY);
- if (fd < 0) return;
-
- int r = ::lseek(fd, offset, SEEK_SET);
- if (r < 0) {
- r = errno;
- std::cout << "lseek failed, errno is: " << r << std::endl;
- ::close(fd);
- return;
- }
-
- bl->read_fd(fd, length);
- ::close(fd);
- on_complete->complete(0);
-}
-
-void DumbBackend::sync_loop()
-{
- while (1) {
- sleep(sync_interval);
- {
- Mutex::Locker l(sync_loop_mutex);
- if (sync_loop_stop != 0) {
- sync_loop_stop = 2;
- sync_loop_cond.Signal();
- break;
- }
- }
- tp.pause();
-#ifdef HAVE_SYS_SYNCFS
- ::syncfs(sync_fd);
-#else
- ::sync();
-#endif
- {
- Mutex::Locker l(pending_commit_mutex);
- for (set<Context*>::iterator i = pending_commits.begin();
- i != pending_commits.end();
- pending_commits.erase(i++)) {
- (*i)->complete(0);
- }
- }
- tp.unpause();
- }
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-
-#ifndef DUMBBACKEND
-#define DUMBBACKEND
-
-#include "backend.h"
-#include "include/Context.h"
-#include "os/ObjectStore.h"
-#include "common/WorkQueue.h"
-#include "common/Semaphore.h"
-
-#include <deque>
-
-class DumbBackend : public Backend {
- const string path;
-
- struct write_item {
- const string oid;
- bufferlist bl;
- uint64_t offset;
- Context *on_applied;
- Context *on_commit;
- write_item(
- const string &oid,
- const bufferlist &bl,
- uint64_t offset,
- Context *on_applied,
- Context *on_commit) :
- oid(oid), bl(bl), offset(offset), on_applied(on_applied),
- on_commit(on_commit) {}
- };
-
- Semaphore sem;
-
- bool do_fsync;
- bool do_sync_file_range;
- bool do_fadvise;
- unsigned sync_interval;
- int sync_fd;
- ThreadPool tp;
-
- class SyncThread : public Thread {
- DumbBackend *backend;
- public:
- explicit SyncThread(DumbBackend *backend) : backend(backend) {}
- void *entry() override {
- backend->sync_loop();
- return 0;
- }
- } thread;
- friend class SyncThread;
-
- Mutex sync_loop_mutex;
- Cond sync_loop_cond;
- int sync_loop_stop; // 0 for running, 1 for stopping, 2 for stopped
- void sync_loop();
-
- Mutex pending_commit_mutex;
- set<Context*> pending_commits;
-
- class WriteQueue : public ThreadPool::WorkQueue<write_item> {
- deque<write_item*> item_queue;
- DumbBackend *backend;
-
- public:
- WriteQueue(
- DumbBackend *_backend,
- time_t ti,
- ThreadPool *tp) :
- ThreadPool::WorkQueue<write_item>("DumbBackend::queue", ti, ti*10, tp),
- backend(_backend) {}
- bool _enqueue(write_item *item) override {
- item_queue.push_back(item);
- return true;
- }
- void _dequeue(write_item*) override { ceph_abort(); }
- write_item *_dequeue() override {
- if (item_queue.empty())
- return 0;
- write_item *retval = item_queue.front();
- item_queue.pop_front();
- return retval;
- }
- bool _empty() override {
- return item_queue.empty();
- }
- void _process(write_item *item, ThreadPool::TPHandle &) override {
- return backend->_write(
- item->oid,
- item->offset,
- item->bl,
- item->on_applied,
- item->on_commit);
- }
- void _clear() override {
- return item_queue.clear();
- }
- } queue;
- friend class WriteQueue;
-
- string get_full_path(const string &oid);
-
- void _write(
- const string &oid,
- uint64_t offset,
- const bufferlist &bl,
- Context *on_applied,
- Context *on_commit);
-
-public:
- DumbBackend(
- const string &path,
- bool do_fsync,
- bool do_sync_file_range,
- bool do_fadvise,
- unsigned sync_interval,
- int sync_fd,
- unsigned worker_threads,
- CephContext *cct)
- : path(path), do_fsync(do_fsync),
- do_sync_file_range(do_sync_file_range),
- do_fadvise(do_fadvise),
- sync_interval(sync_interval),
- sync_fd(sync_fd),
- tp(cct, "DumbBackend::tp", "tp_dumb_backend", worker_threads),
- thread(this),
- sync_loop_mutex("DumbBackend::sync_loop_mutex"),
- sync_loop_stop(0),
- pending_commit_mutex("DumbBackend::pending_commit_mutex"),
- queue(this, 20, &tp) {
- thread.create("thread");
- tp.start();
- for (unsigned i = 0; i < 10*worker_threads; ++i) {
- sem.Put();
- }
- }
- ~DumbBackend() override {
- {
- Mutex::Locker l(sync_loop_mutex);
- if (sync_loop_stop == 0)
- sync_loop_stop = 1;
- while (sync_loop_stop < 2)
- sync_loop_cond.Wait(sync_loop_mutex);
- }
- tp.stop();
- thread.join();
- }
- void write(
- const string &oid,
- uint64_t offset,
- const bufferlist &bl,
- Context *on_applied,
- Context *on_commit) override {
- sem.Get();
- queue.queue(
- new write_item(
- oid, bl, offset, on_applied, on_commit));
- }
-
- void read(
- const string &oid,
- uint64_t offset,
- uint64_t length,
- bufferlist *bl,
- Context *on_complete) override;
-};
-
-#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-
-#include "rados_backend.h"
-#include <boost/tuple/tuple.hpp>
-
-typedef boost::tuple<Context*, Context*, librados::AioCompletion*> arg_type;
-
-void on_applied(void *completion, void *_arg) {
- arg_type *arg = static_cast<arg_type*>(_arg);
- arg->get<1>()->complete(0);
-}
-
-void on_complete(void *completion, void *_arg) {
- arg_type *arg = static_cast<arg_type*>(_arg);
- arg->get<0>()->complete(0);
- arg->get<2>()->release();
- delete arg;
-}
-
-void RadosBackend::write(
- const string &oid,
- uint64_t offset,
- const bufferlist &bl,
- Context *on_write_applied,
- Context *on_commit)
-{
- librados::AioCompletion *completion = librados::Rados::aio_create_completion();
-
-
- void *arg = static_cast<void *>(new arg_type(on_commit, on_write_applied,
- completion));
-
- completion->set_safe_callback(
- arg,
- on_complete);
-
- completion->set_complete_callback(
- arg,
- on_applied);
-
- ioctx->aio_write(oid, completion, bl, bl.length(), offset);
-}
-
-void RadosBackend::read(
- const string &oid,
- uint64_t offset,
- uint64_t length,
- bufferlist *bl,
- Context *on_read_complete)
-{
- librados::AioCompletion *completion = librados::Rados::aio_create_completion();
-
-
- void *arg = static_cast<void *>(new arg_type(on_read_complete, 0,
- completion));
-
- completion->set_complete_callback(
- arg,
- on_complete);
-
- ioctx->aio_read(oid, completion, bl, length, offset);
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-
-#ifndef RADOSBACKENDH
-#define RADOSBACKENDH
-
-#include "backend.h"
-#include "include/Context.h"
-#include "include/rados/librados.hpp"
-
-class RadosBackend : public Backend {
- librados::IoCtx *ioctx;
-public:
- explicit RadosBackend(
- librados::IoCtx *ioctx)
- : ioctx(ioctx) {}
- void write(
- const string &oid,
- uint64_t offset,
- const bufferlist &bl,
- Context *on_applied,
- Context *on_commit) override;
-
- void read(
- const string &oid,
- uint64_t offset,
- uint64_t length,
- bufferlist *bl,
- Context *on_complete) override;
-};
-
-#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-
-#include "rbd_backend.h"
-#include <boost/tuple/tuple.hpp>
-
-typedef boost::tuple<Context*, Context*> arg_type;
-
-void on_complete(void *completion, void *_arg) {
- arg_type *arg = static_cast<arg_type*>(_arg);
- librbd::RBD::AioCompletion *comp =
- static_cast<librbd::RBD::AioCompletion *>(completion);
- ssize_t r = comp->get_return_value();
- assert(r >= 0);
- arg->get<0>()->complete(0);
- if (arg->get<1>())
- arg->get<1>()->complete(0);
- comp->release();
- delete arg;
-}
-
-void RBDBackend::write(
- const string &oid,
- uint64_t offset,
- const bufferlist &bl,
- Context *on_write_applied,
- Context *on_commit)
-{
- bufferlist &bl_non_const = const_cast<bufferlist&>(bl);
- ceph::shared_ptr<librbd::Image> image = (*m_images)[oid];
- void *arg = static_cast<void *>(new arg_type(on_commit, on_write_applied));
- librbd::RBD::AioCompletion *completion =
- new librbd::RBD::AioCompletion(arg, on_complete);
- int r = image->aio_write(offset, (size_t) bl_non_const.length(),
- bl_non_const, completion);
- assert(r >= 0);
-}
-
-void RBDBackend::read(
- const string &oid,
- uint64_t offset,
- uint64_t length,
- bufferlist *bl,
- Context *on_read_complete)
-{
- ceph::shared_ptr<librbd::Image> image = (*m_images)[oid];
- void *arg = static_cast<void *>(new arg_type(on_read_complete, NULL));
- librbd::RBD::AioCompletion *completion =
- new librbd::RBD::AioCompletion(arg, on_complete);
- int r = image->aio_read(offset, (size_t) length, *bl, completion);
- assert(r >= 0);
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-
-#ifndef CEPH_TEST_SMALLIOBENCH_RBD_BACKEND_H
-#define CEPH_TEST_SMALLIOBENCH_RBD_BACKEND_H
-
-#include "backend.h"
-#include "include/Context.h"
-#include "include/rbd/librbd.hpp"
-
-class RBDBackend : public Backend {
- map<string, ceph::shared_ptr<librbd::Image> > *m_images;
-public:
- explicit RBDBackend(map<string, ceph::shared_ptr<librbd::Image> > *images)
- : m_images(images) {}
- void write(
- const string &oid,
- uint64_t offset,
- const bufferlist &bl,
- Context *on_applied,
- Context *on_commit) override;
-
- void read(
- const string &oid,
- uint64_t offset,
- uint64_t length,
- bufferlist *bl,
- Context *on_complete) override;
-};
-
-#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-
-#include <boost/scoped_ptr.hpp>
-#include <boost/lexical_cast.hpp>
-#include <boost/program_options/option.hpp>
-#include <boost/program_options/options_description.hpp>
-#include <boost/program_options/variables_map.hpp>
-#include <boost/program_options/cmdline.hpp>
-#include <boost/program_options/parsers.hpp>
-#include <iostream>
-#include <set>
-#include <sstream>
-#include <stdlib.h>
-#include <fstream>
-
-#include "common/Formatter.h"
-
-#include "bencher.h"
-#include "rados_backend.h"
-#include "detailed_stat_collector.h"
-#include "distribution.h"
-
-namespace po = boost::program_options;
-using namespace std;
-
-int main(int argc, char **argv)
-{
- po::options_description desc("Allowed options");
- desc.add_options()
- ("help", "produce help message")
- ("num-concurrent-ops", po::value<unsigned>()->default_value(10),
- "set number of concurrent ops")
- ("num-objects", po::value<unsigned>()->default_value(500),
- "set number of objects to use")
- ("object-size", po::value<unsigned>()->default_value(4<<20),
- "set object size")
- ("io-size", po::value<unsigned>()->default_value(4<<10),
- "set io size")
- ("write-ratio", po::value<double>()->default_value(0.75),
- "set ratio of read to write")
- ("duration", po::value<unsigned>()->default_value(0),
- "set max duration, 0 for unlimited")
- ("max-ops", po::value<unsigned>()->default_value(0),
- "set max ops, 0 for unlimited")
- ("seed", po::value<unsigned>(),
- "seed")
- ("ceph-client-id", po::value<string>()->default_value("admin"),
- "set ceph client id")
- ("pool-name", po::value<string>()->default_value("data"),
- "set pool")
- ("op-dump-file", po::value<string>()->default_value(""),
- "set file for dumping op details, omit for stderr")
- ("init-only", po::value<bool>()->default_value(false),
- "populate object set")
- ("do-not-init", po::value<bool>()->default_value(false),
- "use existing object set")
- ("use-prefix", po::value<string>()->default_value(""),
- "use previously populated prefix")
- ("offset-align", po::value<unsigned>()->default_value(4096),
- "align offset by")
- ("sequential", po::value<bool>()->default_value(false),
- "use sequential access pattern")
- ("disable-detailed-ops", po::value<bool>()->default_value(false),
- "don't dump per op stats")
- ;
-
- po::variables_map vm;
- po::store(po::parse_command_line(argc, argv, desc), vm);
- po::notify(vm);
-
- if (vm.count("help")) {
- cout << desc << std::endl;
- return 1;
- }
-
- if (vm["do-not-init"].as<bool>() && !vm["use-prefix"].as<string>().size()) {
- cout << "Must supply prefix if do-not-init is specified" << std::endl;
- cout << desc << std::endl;
- return 1;
- }
-
- if (vm["init-only"].as<bool>() && !vm["use-prefix"].as<string>().size()) {
- cout << "Must supply prefix for init-only" << std::endl;
- cout << desc << std::endl;
- return 1;
- }
-
- string prefix;
- if (vm["use-prefix"].as<string>().size()) {
- prefix = vm["use-prefix"].as<string>();
- } else {
- char hostname_cstr[100];
- gethostname(hostname_cstr, 100);
- stringstream hostpid;
- hostpid << hostname_cstr << getpid() << "-";
- prefix = hostpid.str();
- }
-
- set<string> objects;
- for (unsigned i = 0; i < vm["num-objects"].as<unsigned>();
- ++i) {
- stringstream name;
- name << prefix << "-object_" << i;
- objects.insert(name.str());
- }
-
- rngen_t rng;
- if (vm.count("seed"))
- rng = rngen_t(vm["seed"].as<unsigned>());
-
- set<pair<double, Bencher::OpType> > ops;
- ops.insert(make_pair(vm["write-ratio"].as<double>(), Bencher::WRITE));
- ops.insert(make_pair(1-vm["write-ratio"].as<double>(), Bencher::READ));
-
- librados::Rados rados;
- librados::IoCtx ioctx;
- int r = rados.init(vm["ceph-client-id"].as<string>().c_str());
- if (r < 0) {
- cerr << "error in init r=" << r << std::endl;
- return -r;
- }
- r = rados.conf_read_file(NULL);
- if (r < 0) {
- cerr << "error in conf_read_file r=" << r << std::endl;
- return -r;
- }
- r = rados.conf_parse_env(NULL);
- if (r < 0) {
- cerr << "error in conf_parse_env r=" << r << std::endl;
- return -r;
- }
- r = rados.connect();
- if (r < 0) {
- cerr << "error in connect r=" << r << std::endl;
- return -r;
- }
- r = rados.ioctx_create(vm["pool-name"].as<string>().c_str(), ioctx);
- if (r < 0) {
- cerr << "error in ioctx_create r=" << r << std::endl;
- return -r;
- }
-
- ostream *detailed_ops = 0;
- ofstream myfile;
- if (vm["disable-detailed-ops"].as<bool>()) {
- detailed_ops = 0;
- } else if (vm["op-dump-file"].as<string>().size()) {
- myfile.open(vm["op-dump-file"].as<string>().c_str());
- detailed_ops = &myfile;
- } else {
- detailed_ops = &cerr;
- }
-
- Distribution<
- boost::tuple<string, uint64_t, uint64_t, Bencher::OpType> > *gen = 0;
- if (vm["sequential"].as<bool>()) {
- std::cout << "Using Sequential generator" << std::endl;
- gen = new SequentialLoad(
- objects,
- vm["object-size"].as<unsigned>(),
- vm["io-size"].as<unsigned>(),
- new WeightedDist<Bencher::OpType>(rng, ops)
- );
- } else {
- std::cout << "Using random generator" << std::endl;
- gen = new FourTupleDist<string, uint64_t, uint64_t, Bencher::OpType>(
- new RandomDist<string>(rng, objects),
- new Align(
- new UniformRandom(
- rng,
- 0,
- vm["object-size"].as<unsigned>() - vm["io-size"].as<unsigned>()),
- vm["offset-align"].as<unsigned>()
- ),
- new Uniform(vm["io-size"].as<unsigned>()),
- new WeightedDist<Bencher::OpType>(rng, ops)
- );
- }
-
- Bencher bencher(
- gen,
- new DetailedStatCollector(1, new JSONFormatter, detailed_ops, &cout),
- new RadosBackend(&ioctx),
- vm["num-concurrent-ops"].as<unsigned>(),
- vm["duration"].as<unsigned>(),
- vm["max-ops"].as<unsigned>());
-
- if (!vm["do-not-init"].as<bool>()) {
- bencher.init(objects, vm["object-size"].as<unsigned>(), &std::cout);
- cout << "Created objects..." << std::endl;
- } else {
- cout << "Not initing objects..." << std::endl;
- }
-
- if (!vm["init-only"].as<bool>()) {
- bencher.run_bench();
- } else {
- cout << "init-only" << std::endl;
- }
-
- rados.shutdown();
- if (vm["op-dump-file"].as<string>().size()) {
- myfile.close();
- }
- return 0;
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-
-#include "acconfig.h"
-
-#include <boost/scoped_ptr.hpp>
-#include <boost/lexical_cast.hpp>
-#include <boost/program_options/option.hpp>
-#include <boost/program_options/options_description.hpp>
-#include <boost/program_options/variables_map.hpp>
-#include <boost/program_options/cmdline.hpp>
-#include <boost/program_options/parsers.hpp>
-#include <iostream>
-#include <set>
-#include <sstream>
-#include <stdlib.h>
-#include <fstream>
-
-#include "common/Formatter.h"
-
-#include "bencher.h"
-#include "rados_backend.h"
-#include "detailed_stat_collector.h"
-#include "distribution.h"
-#include "global/global_init.h"
-#include "os/ObjectStore.h"
-#include "dumb_backend.h"
-
-namespace po = boost::program_options;
-using namespace std;
-
-int main(int argc, char **argv)
-{
- po::options_description desc("Allowed options");
- desc.add_options()
- ("help", "produce help message")
- ("num-concurrent-ops", po::value<unsigned>()->default_value(10),
- "set number of concurrent ops")
- ("num-objects", po::value<unsigned>()->default_value(500),
- "set number of objects to use")
- ("object-size", po::value<unsigned>()->default_value(4<<20),
- "set object size")
- ("io-size", po::value<unsigned>()->default_value(4<<10),
- "set io size")
- ("write-ratio", po::value<double>()->default_value(0.75),
- "set ratio of read to write")
- ("duration", po::value<unsigned>()->default_value(0),
- "set max duration, 0 for unlimited")
- ("max-ops", po::value<unsigned>()->default_value(0),
- "set max ops, 0 for unlimited")
- ("seed", po::value<unsigned>(),
- "seed")
- ("num-colls", po::value<unsigned>()->default_value(20),
- "number of collections")
- ("op-dump-file", po::value<string>()->default_value(""),
- "set file for dumping op details, omit for stderr")
- ("filestore-path", po::value<string>(),
- "path to filestore directory, mandatory")
- ("offset-align", po::value<unsigned>()->default_value(4096),
- "align offset by")
- ("fsync", po::value<bool>()->default_value(false),
- "fsync after each write")
- ("sync-file-range", po::value<bool>()->default_value(false),
- "sync-file-range after each write")
- ("fadvise", po::value<bool>()->default_value(false),
- "fadvise after each write")
- ("sync-interval", po::value<unsigned>()->default_value(30),
- "frequency to sync")
- ("sequential", po::value<bool>()->default_value(false),
- "use sequential access pattern")
- ("disable-detailed-ops", po::value<bool>()->default_value(false),
- "don't dump per op stats")
- ;
-
- vector<string> ceph_option_strings;
- po::variables_map vm;
- try {
- po::parsed_options parsed =
- po::command_line_parser(argc, argv).options(desc).allow_unregistered().run();
- po::store(
- parsed,
- vm);
- po::notify(vm);
-
- ceph_option_strings = po::collect_unrecognized(parsed.options,
- po::include_positional);
- } catch(po::error &e) {
- std::cerr << e.what() << std::endl;
- return 1;
- }
- vector<const char *> ceph_options, def_args;
- ceph_options.reserve(ceph_option_strings.size());
- for (vector<string>::iterator i = ceph_option_strings.begin();
- i != ceph_option_strings.end();
- ++i) {
- ceph_options.push_back(i->c_str());
- }
-
- auto cct = global_init(
- &def_args, ceph_options, CEPH_ENTITY_TYPE_CLIENT,
- CODE_ENVIRONMENT_UTILITY,
- CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
- common_init_finish(g_ceph_context);
- g_ceph_context->_conf->apply_changes(NULL);
-
- if (!vm.count("filestore-path")) {
- cout << "Must provide filestore-path" << std::endl
- << desc << std::endl;
- return 1;
- }
-
- if (vm.count("help")) {
- cout << desc << std::endl;
- return 1;
- }
-
- rngen_t rng;
- if (vm.count("seed"))
- rng = rngen_t(vm["seed"].as<unsigned>());
-
- set<pair<double, Bencher::OpType> > ops;
- ops.insert(make_pair(vm["write-ratio"].as<double>(), Bencher::WRITE));
- ops.insert(make_pair(1-vm["write-ratio"].as<double>(), Bencher::READ));
-
- cout << "Creating objects.." << std::endl;
- bufferlist bl;
- for (uint64_t i = 0; i < vm["object-size"].as<unsigned>(); ++i) {
- bl.append(0);
- }
- set<string> objects;
-
- for (uint64_t num = 0; num < vm["num-objects"].as<unsigned>(); ++num) {
- unsigned col_num = num % vm["num-colls"].as<unsigned>();
- stringstream coll, obj;
- coll << "collection_" << col_num;
- obj << "obj_" << num;
- if (num == col_num) {
- std::cout << "collection " << coll.str() << std::endl;
- string coll_str(
- vm["filestore-path"].as<string>() + string("/") + coll.str());
- int r = ::mkdir(
- coll_str.c_str(),
- 0777);
- if (r < 0) {
- std::cerr << "Error " << errno << " creating collection" << std::endl;
- return 1;
- }
- }
- objects.insert(coll.str() + "/" + obj.str());
- }
- string meta_str(vm["filestore-path"].as<string>() + string("/meta"));
- int r = ::mkdir(
- meta_str.c_str(),
- 0777);
- if (r < 0) {
- std::cerr << "Error " << errno << " creating collection" << std::endl;
- return 1;
- }
- r = ::open(meta_str.c_str(), 0);
- if (r < 0) {
- std::cerr << "Error " << errno << " opening meta" << std::endl;
- return 1;
- }
- int sync_fd = r;
-
- ostream *detailed_ops = 0;
- ofstream myfile;
- if (vm["disable-detailed-ops"].as<bool>()) {
- detailed_ops = 0;
- } else if (vm["op-dump-file"].as<string>().size()) {
- myfile.open(vm["op-dump-file"].as<string>().c_str());
- detailed_ops = &myfile;
- } else {
- detailed_ops = &cerr;
- }
-
- Distribution<
- boost::tuple<string, uint64_t, uint64_t, Bencher::OpType> > *gen = 0;
- if (vm["sequential"].as<bool>()) {
- std::cout << "Using Sequential generator" << std::endl;
- gen = new SequentialLoad(
- objects,
- vm["object-size"].as<unsigned>(),
- vm["io-size"].as<unsigned>(),
- new WeightedDist<Bencher::OpType>(rng, ops)
- );
- } else {
- std::cout << "Using random generator" << std::endl;
- gen = new FourTupleDist<string, uint64_t, uint64_t, Bencher::OpType>(
- new RandomDist<string>(rng, objects),
- new Align(
- new UniformRandom(
- rng,
- 0,
- vm["object-size"].as<unsigned>() - vm["io-size"].as<unsigned>()),
- vm["offset-align"].as<unsigned>()
- ),
- new Uniform(vm["io-size"].as<unsigned>()),
- new WeightedDist<Bencher::OpType>(rng, ops)
- );
- }
-
-#ifndef HAVE_SYNC_FILE_RANGE
- if (vm["sync-file-range"].as<bool>())
- std::cerr << "Warning: sync_file_range(2) not supported!" << std::endl;
-#endif
-
-#ifndef HAVE_POSIX_FADVISE
- if (vm["fadvise"].as<bool>())
- std::cerr << "Warning: posix_fadvise(2) not supported!" << std::endl;
-#endif
-
- Bencher bencher(
- gen,
- new DetailedStatCollector(1, new JSONFormatter, detailed_ops, &cout),
- new DumbBackend(
- vm["filestore-path"].as<string>(),
- vm["fsync"].as<bool>(),
- vm["sync-file-range"].as<bool>(),
- vm["fadvise"].as<bool>(),
- vm["sync-interval"].as<unsigned>(),
- sync_fd,
- 10,
- g_ceph_context),
- vm["num-concurrent-ops"].as<unsigned>(),
- vm["duration"].as<unsigned>(),
- vm["max-ops"].as<unsigned>());
-
- bencher.init(objects, vm["object-size"].as<unsigned>(), &std::cout);
- cout << "Created objects..." << std::endl;
-
- bencher.run_bench();
-
- if (vm["op-dump-file"].as<string>().size()) {
- myfile.close();
- }
- return 0;
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-
-#include <boost/scoped_ptr.hpp>
-#include <boost/lexical_cast.hpp>
-#include <boost/program_options/option.hpp>
-#include <boost/program_options/options_description.hpp>
-#include <boost/program_options/variables_map.hpp>
-#include <boost/program_options/cmdline.hpp>
-#include <boost/program_options/parsers.hpp>
-#include <iostream>
-#include <set>
-#include <sstream>
-#include <stdlib.h>
-#include <fstream>
-
-#include "common/Formatter.h"
-
-#include "bencher.h"
-#include "rados_backend.h"
-#include "detailed_stat_collector.h"
-#include "distribution.h"
-#include "global/global_init.h"
-#include "os/filestore/FileStore.h"
-#include "testfilestore_backend.h"
-#include "common/perf_counters.h"
-
-namespace po = boost::program_options;
-using namespace std;
-
-struct MorePrinting : public DetailedStatCollector::AdditionalPrinting {
- CephContext *cct;
- explicit MorePrinting(CephContext *cct) : cct(cct) {}
- void operator()(std::ostream *out) override {
- bufferlist bl;
- Formatter *f = Formatter::create("json-pretty");
- cct->get_perfcounters_collection()->dump_formatted(f, 0);
- f->flush(bl);
- delete f;
- bl.append('\0');
- *out << bl.c_str() << std::endl;
- }
-};
-
-int main(int argc, char **argv)
-{
- po::options_description desc("Allowed options");
- desc.add_options()
- ("help", "produce help message")
- ("num-concurrent-ops", po::value<unsigned>()->default_value(10),
- "set number of concurrent ops")
- ("num-objects", po::value<unsigned>()->default_value(500),
- "set number of objects to use")
- ("object-size", po::value<unsigned>()->default_value(4<<20),
- "set object size")
- ("io-size", po::value<unsigned>()->default_value(4<<10),
- "set io size")
- ("write-ratio", po::value<double>()->default_value(0.75),
- "set ratio of read to write")
- ("duration", po::value<unsigned>()->default_value(0),
- "set max duration, 0 for unlimited")
- ("max-ops", po::value<unsigned>()->default_value(0),
- "set max ops, 0 for unlimited")
- ("seed", po::value<unsigned>(),
- "seed")
- ("num-colls", po::value<unsigned>()->default_value(20),
- "number of collections")
- ("op-dump-file", po::value<string>()->default_value(""),
- "set file for dumping op details, omit for stderr")
- ("filestore-path", po::value<string>(),
- "path to filestore directory, mandatory")
- ("journal-path", po::value<string>(),
- "path to journal, mandatory")
- ("offset-align", po::value<unsigned>()->default_value(4096),
- "align offset by")
- ("write-infos", po::value<bool>()->default_value(false),
- "write info objects with main writes")
- ("sequential", po::value<bool>()->default_value(false),
- "do sequential writes like rbd")
- ("disable-detailed-ops", po::value<bool>()->default_value(false),
- "don't dump per op stats")
- ("num-writers", po::value<unsigned>()->default_value(1),
- "num write threads")
- ;
-
- vector<string> ceph_option_strings;
- po::variables_map vm;
- try {
- po::parsed_options parsed =
- po::command_line_parser(argc, argv).options(desc).allow_unregistered().run();
- po::store(
- parsed,
- vm);
- po::notify(vm);
-
- ceph_option_strings = po::collect_unrecognized(parsed.options,
- po::include_positional);
- } catch(po::error &e) {
- std::cerr << e.what() << std::endl;
- return 1;
- }
- vector<const char *> ceph_options, def_args;
- ceph_options.reserve(ceph_option_strings.size());
- for (vector<string>::iterator i = ceph_option_strings.begin();
- i != ceph_option_strings.end();
- ++i) {
- ceph_options.push_back(i->c_str());
- }
-
- auto cct = global_init(
- &def_args, ceph_options, CEPH_ENTITY_TYPE_CLIENT,
- CODE_ENVIRONMENT_UTILITY,
- CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
- common_init_finish(g_ceph_context);
- g_ceph_context->_conf->apply_changes(NULL);
-
- if (!vm.count("filestore-path") || !vm.count("journal-path")) {
- cout << "Must provide filestore-path and journal-path" << std::endl
- << desc << std::endl;
- return 1;
- }
-
- if (vm.count("help")) {
- cout << desc << std::endl;
- return 1;
- }
-
- rngen_t rng;
- if (vm.count("seed"))
- rng = rngen_t(vm["seed"].as<unsigned>());
-
- set<pair<double, Bencher::OpType> > ops;
- ops.insert(make_pair(vm["write-ratio"].as<double>(), Bencher::WRITE));
- ops.insert(make_pair(1-vm["write-ratio"].as<double>(), Bencher::READ));
-
- FileStore fs(g_ceph_context, vm["filestore-path"].as<string>(),
- vm["journal-path"].as<string>());
- ObjectStore::Sequencer osr(__func__);
-
- if (fs.mkfs() < 0) {
- cout << "mkfs failed" << std::endl;
- return 1;
- }
-
- if (fs.mount() < 0) {
- cout << "mount failed" << std::endl;
- return 1;
- }
-
- ostream *detailed_ops = 0;
- ofstream myfile;
- if (vm["disable-detailed-ops"].as<bool>()) {
- detailed_ops = 0;
- } else if (vm["op-dump-file"].as<string>().size()) {
- myfile.open(vm["op-dump-file"].as<string>().c_str());
- detailed_ops = &myfile;
- } else {
- detailed_ops = &cerr;
- }
-
- ceph::shared_ptr<StatCollector> col(
- new DetailedStatCollector(
- 1, new JSONFormatter, detailed_ops, &cout,
- new MorePrinting(g_ceph_context)));
-
- cout << "Creating objects.." << std::endl;
- bufferlist bl;
- for (uint64_t i = 0; i < vm["object-size"].as<unsigned>(); ++i) {
- bl.append(0);
- }
-
- for (uint64_t num = 0; num < vm["num-colls"].as<unsigned>(); ++num) {
- spg_t pgid(pg_t(num, 0), shard_id_t::NO_SHARD);
- std::cout << "collection " << pgid << std::endl;
- ObjectStore::Transaction t;
- t.create_collection(coll_t(pgid), 0);
- fs.apply_transaction(&osr, std::move(t));
- }
- {
- ObjectStore::Transaction t;
- t.create_collection(coll_t(), 0);
- fs.apply_transaction(&osr, std::move(t));
- }
-
- vector<ceph::shared_ptr<Bencher> > benchers(
- vm["num-writers"].as<unsigned>());
- for (vector<ceph::shared_ptr<Bencher> >::iterator i = benchers.begin();
- i != benchers.end();
- ++i) {
- set<string> objects;
- for (uint64_t num = 0; num < vm["num-objects"].as<unsigned>(); ++num) {
- unsigned col_num = num % vm["num-colls"].as<unsigned>();
- spg_t pgid(pg_t(col_num, 0), shard_id_t::NO_SHARD);
- stringstream obj;
- obj << "obj_" << num << "_bencher_" << (i - benchers.begin());
- objects.insert(coll_t(pgid).to_str() + string("/") + obj.str());
- }
- Distribution<
- boost::tuple<string, uint64_t, uint64_t, Bencher::OpType> > *gen = 0;
- if (vm["sequential"].as<bool>()) {
- std::cout << "Using Sequential generator" << std::endl;
- gen = new SequentialLoad(
- objects,
- vm["object-size"].as<unsigned>(),
- vm["io-size"].as<unsigned>(),
- new WeightedDist<Bencher::OpType>(rng, ops)
- );
- } else {
- std::cout << "Using random generator" << std::endl;
- gen = new FourTupleDist<string, uint64_t, uint64_t, Bencher::OpType>(
- new RandomDist<string>(rng, objects),
- new Align(
- new UniformRandom(
- rng,
- 0,
- vm["object-size"].as<unsigned>() - vm["io-size"].as<unsigned>()),
- vm["offset-align"].as<unsigned>()
- ),
- new Uniform(vm["io-size"].as<unsigned>()),
- new WeightedDist<Bencher::OpType>(rng, ops)
- );
- }
-
- Bencher *bencher = new Bencher(
- gen,
- col,
- new TestFileStoreBackend(&fs, vm["write-infos"].as<bool>()),
- vm["num-concurrent-ops"].as<unsigned>(),
- vm["duration"].as<unsigned>(),
- vm["max-ops"].as<unsigned>());
-
- bencher->init(objects, vm["object-size"].as<unsigned>(), &std::cout);
- cout << "Created objects..." << std::endl;
- (*i).reset(bencher);
- }
-
- for (vector<ceph::shared_ptr<Bencher> >::iterator i = benchers.begin();
- i != benchers.end();
- ++i) {
- (*i)->create("bencher");
- }
- for (vector<ceph::shared_ptr<Bencher> >::iterator i = benchers.begin();
- i != benchers.end();
- ++i) {
- (*i)->join();
- }
-
- fs.umount();
- if (vm["op-dump-file"].as<string>().size()) {
- myfile.close();
- }
- return 0;
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-
-#include <boost/lexical_cast.hpp>
-#include <boost/program_options/option.hpp>
-#include <boost/program_options/options_description.hpp>
-#include <boost/program_options/variables_map.hpp>
-#include <boost/program_options/cmdline.hpp>
-#include <boost/program_options/parsers.hpp>
-#include <iostream>
-#include <set>
-#include <sstream>
-#include <stdlib.h>
-#include <fstream>
-
-#include "common/Formatter.h"
-
-#include "bencher.h"
-#include "rbd_backend.h"
-#include "detailed_stat_collector.h"
-#include "distribution.h"
-
-namespace po = boost::program_options;
-using namespace std;
-
-int main(int argc, char **argv)
-{
- po::options_description desc("Allowed options");
- desc.add_options()
- ("help", "produce help message")
- ("num-concurrent-ops", po::value<unsigned>()->default_value(10),
- "set number of concurrent ops")
- ("num-images", po::value<unsigned>()->default_value(2),
- "set number of rbd images to use")
- ("image-size", po::value<unsigned>()->default_value(4096),
- "set image size in megabytes")
- ("order", po::value<unsigned>()->default_value(22),
- "set log_2(object size)")
- ("io-size", po::value<unsigned>()->default_value(4<<10),
- "set io size")
- ("write-ratio", po::value<double>()->default_value(0.25),
- "set ratio of read to write")
- ("duration", po::value<unsigned>()->default_value(0),
- "set max duration, 0 for unlimited")
- ("max-ops", po::value<unsigned>()->default_value(0),
- "set max ops, 0 for unlimited")
- ("seed", po::value<unsigned>(),
- "seed")
- ("ceph-client-id", po::value<string>()->default_value("admin"),
- "set ceph client id")
- ("pool-name", po::value<string>()->default_value("data"),
- "set pool")
- ("op-dump-file", po::value<string>()->default_value(""),
- "set file for dumping op details, omit for stderr")
- ("offset-align", po::value<unsigned>()->default_value(4096),
- "align offset by")
- ("sequential", po::value<bool>()->default_value(false),
- "use sequential access pattern")
- ("disable-detailed-ops", po::value<bool>()->default_value(false),
- "don't dump per op stats")
- ;
-
- po::variables_map vm;
- po::store(po::parse_command_line(argc, argv, desc), vm);
- po::notify(vm);
-
- if (vm.count("help")) {
- cout << desc << std::endl;
- return 1;
- }
-
- string prefix;
- char hostname_cstr[100];
- gethostname(hostname_cstr, 100);
- stringstream hostpid;
- hostpid << hostname_cstr << getpid() << "-";
- prefix = hostpid.str();
-
- set<string> image_names;
- for (unsigned i = 0; i < vm["num-images"].as<unsigned>();
- ++i) {
- stringstream name;
- name << prefix << "-image_" << i;
- image_names.insert(name.str());
- }
-
- rngen_t rng;
- if (vm.count("seed"))
- rng = rngen_t(vm["seed"].as<unsigned>());
-
- set<pair<double, Bencher::OpType> > ops;
- ops.insert(make_pair(vm["write-ratio"].as<double>(), Bencher::WRITE));
- ops.insert(make_pair(1-vm["write-ratio"].as<double>(), Bencher::READ));
-
- librados::Rados rados;
- librados::IoCtx ioctx;
- int r = rados.init(vm["ceph-client-id"].as<string>().c_str());
- if (r < 0) {
- cerr << "error in init r=" << r << std::endl;
- return -r;
- }
- r = rados.conf_read_file(NULL);
- if (r < 0) {
- cerr << "error in conf_read_file r=" << r << std::endl;
- return -r;
- }
- r = rados.conf_parse_env(NULL);
- if (r < 0) {
- cerr << "error in conf_parse_env r=" << r << std::endl;
- return -r;
- }
- r = rados.connect();
- if (r < 0) {
- cerr << "error in connect r=" << r << std::endl;
- return -r;
- }
- r = rados.ioctx_create(vm["pool-name"].as<string>().c_str(), ioctx);
- if (r < 0) {
- cerr << "error in ioctx_create r=" << r << std::endl;
- return -r;
- }
-
- ostream *detailed_ops = 0;
- ofstream myfile;
- if (vm["disable-detailed-ops"].as<bool>()) {
- detailed_ops = 0;
- } else if (vm["op-dump-file"].as<string>().size()) {
- myfile.open(vm["op-dump-file"].as<string>().c_str());
- detailed_ops = &myfile;
- } else {
- detailed_ops = &cerr;
- }
-
- librbd::RBD rbd;
- {
- map<string, ceph::shared_ptr<librbd::Image> > images;
- int order = vm["order"].as<unsigned>();
- uint64_t image_size = ((uint64_t)vm["image-size"].as<unsigned>()) << 20;
- for (set<string>::const_iterator i = image_names.begin();
- i != image_names.end(); ++i) {
- r = rbd.create(ioctx, i->c_str(), image_size, &order);
- if (r < 0) {
- cerr << "error creating image " << *i << " r=" << r << std::endl;
- return -r;
- }
- ceph::shared_ptr<librbd::Image> image(new librbd::Image());
- r = rbd.open(ioctx, *image, i->c_str());
- if (r < 0) {
- cerr << "error opening image " << *i << " r=" << r << std::endl;
- return -r;
- }
- images[*i] = image;
- }
-
- Distribution<
- boost::tuple<string, uint64_t, uint64_t, Bencher::OpType> > *gen = 0;
- if (vm["sequential"].as<bool>()) {
- std::cout << "Using Sequential generator" << std::endl;
- gen = new SequentialLoad(
- image_names,
- image_size,
- vm["io-size"].as<unsigned>(),
- new WeightedDist<Bencher::OpType>(rng, ops)
- );
- } else {
- std::cout << "Using random generator" << std::endl;
- gen = new FourTupleDist<string, uint64_t, uint64_t, Bencher::OpType>(
- new RandomDist<string>(rng, image_names),
- new Align(
- new UniformRandom(
- rng,
- 0,
- image_size - vm["io-size"].as<unsigned>()),
- vm["offset-align"].as<unsigned>()
- ),
- new Uniform(vm["io-size"].as<unsigned>()),
- new WeightedDist<Bencher::OpType>(rng, ops)
- );
- }
-
- Bencher bencher(
- gen,
- new DetailedStatCollector(1, new JSONFormatter, detailed_ops, &cout),
- new RBDBackend(&images),
- vm["num-concurrent-ops"].as<unsigned>(),
- vm["duration"].as<unsigned>(),
- vm["max-ops"].as<unsigned>());
-
- bencher.run_bench();
- }
-
- for (set<string>::const_iterator i = image_names.begin();
- i != image_names.end(); ++i) {
- rbd.remove(ioctx, i->c_str());
- }
- rados.shutdown();
- if (vm["op-dump-file"].as<string>().size()) {
- myfile.close();
- }
- return 0;
-}
+++ /dev/null
-import json
-import sys
-from pylab import hist
-import gzip
-import io
-
-def get_next_line(line, output):
- val = json.loads(line)
- if val['type'] not in output:
- output[val['type']] = {}
- for (name, value) in val.items():
- if name == "type":
- continue
- if name == "seq":
- continue
- if name not in output[val['type']]:
- output[val['type']][name] = []
- output[val['type']][name] += [float(value)]
-
-def wrapgz(gfilename):
- gfile = gzip.open(gfilename, 'rb')
- if sys.version_info[0] >= 3:
- gfile = io.TextIOWrapper(gfile)
- return gfile
-
-def read_all_input(filename):
- cur = {}
- openfn = open
- if ".gz" in filename:
- openfn = wrapgz
- with openfn(filename) as fh:
- for line in fh:
- get_next_line(line, cur)
- return cur
-
-def write_committed_latency(out, bins, **kwargs):
- hist(out['write_committed']['latency'], bins, **kwargs)
-
-def read_latency(out):
- hist(out['read']['latency'], 100)
-
-def com(out): return out['write_committed']['latency']
-def app(out): return out['write_applied']['latency']
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-
-#ifndef STATCOLLECTORH
-#define STATCOLLECTORH
-
-#include <stdint.h>
-
-class StatCollector {
-public:
- virtual uint64_t next_seq() = 0;
- virtual void start_write(uint64_t seq, uint64_t size) = 0;
- virtual void start_read(uint64_t seq, uint64_t size) = 0;
- virtual void write_applied(uint64_t seq) = 0;
- virtual void write_committed(uint64_t seq) = 0;
- virtual void read_complete(uint64_t seq) = 0;
- virtual ~StatCollector() {}
-};
-
-#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-
-#include "testfilestore_backend.h"
-#include "global/global_init.h"
-#include "os/ObjectStore.h"
-
-
-TestFileStoreBackend::TestFileStoreBackend(
- ObjectStore *os, bool write_infos)
- : os(os), finisher(g_ceph_context), write_infos(write_infos)
-{
- finisher.start();
-}
-
-void TestFileStoreBackend::write(
- const string &oid,
- uint64_t offset,
- const bufferlist &bl,
- Context *on_applied,
- Context *on_commit)
-{
- ObjectStore::Transaction *t = new ObjectStore::Transaction;
- size_t sep = oid.find("/");
- assert(sep != string::npos);
- assert(sep + 1 < oid.size());
- coll_t c;
- bool valid_coll = c.parse(oid.substr(0, sep));
- assert(valid_coll);
- string coll_str = c.to_str();
-
- if (!osrs.count(coll_str))
- osrs.insert(make_pair(coll_str, ObjectStore::Sequencer(coll_str)));
- ObjectStore::Sequencer *osr = &(osrs.find(coll_str)->second);
-
- hobject_t h(sobject_t(oid.substr(sep+1), 0));
- h.pool = 0;
- t->write(c, ghobject_t(h), offset, bl.length(), bl);
-
- if (write_infos) {
- bufferlist bl2;
- for (uint64_t j = 0; j < 128; ++j) bl2.append(0);
- coll_t meta;
- hobject_t info(sobject_t(string("info_")+coll_str, 0));
- t->write(meta, ghobject_t(info), 0, bl2.length(), bl2);
- }
-
- os->queue_transaction(
- osr,
- std::move(*t),
- on_applied,
- on_commit);
- delete t;
-}
-
-void TestFileStoreBackend::read(
- const string &oid,
- uint64_t offset,
- uint64_t length,
- bufferlist *bl,
- Context *on_complete)
-{
- size_t sep = oid.find("/");
- assert(sep != string::npos);
- assert(sep + 1 < oid.size());
- coll_t c;
- bool valid_coll = c.parse(oid.substr(0, sep));
- assert(valid_coll);
- hobject_t h(sobject_t(oid.substr(sep+1), 0));
- h.pool = 0;
- os->read(c, ghobject_t(h), offset, length, *bl);
- finisher.queue(on_complete);
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-
-#ifndef TESTFILESTOREBACKENDH
-#define TESTFILESTOREBACKENDH
-
-#include "common/Finisher.h"
-#include "backend.h"
-#include "include/Context.h"
-#include "os/ObjectStore.h"
-
-class TestFileStoreBackend : public Backend {
- ObjectStore *os;
- Finisher finisher;
- map<string, ObjectStore::Sequencer> osrs;
- const bool write_infos;
-
-public:
- TestFileStoreBackend(ObjectStore *os, bool write_infos);
- ~TestFileStoreBackend() override {
- finisher.stop();
- }
- void write(
- const string &oid,
- uint64_t offset,
- const bufferlist &bl,
- Context *on_applied,
- Context *on_commit) override;
-
- void read(
- const string &oid,
- uint64_t offset,
- uint64_t length,
- bufferlist *bl,
- Context *on_complete) override;
-};
-
-#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-
-#include <boost/scoped_ptr.hpp>
-#include <boost/lexical_cast.hpp>
-#include <boost/program_options/option.hpp>
-#include <boost/program_options/options_description.hpp>
-#include <boost/program_options/variables_map.hpp>
-#include <boost/program_options/cmdline.hpp>
-#include <boost/program_options/parsers.hpp>
-#include <iostream>
-#include <set>
-#include <sstream>
-#include <stdlib.h>
-#include <fstream>
-
-#include "common/Formatter.h"
-
-#include "bencher.h"
-#include "rados_backend.h"
-#include "detailed_stat_collector.h"
-#include "distribution.h"
-#include "global/global_init.h"
-#include "common/WorkQueue.h"
-#include "common/Semaphore.h"
-#include "common/Finisher.h"
-
-namespace po = boost::program_options;
-using namespace std;
-class Queueable {
-public:
- virtual void queue(unsigned *) = 0;
- virtual void start() = 0;
- virtual void stop() = 0;
- virtual ~Queueable() {};
-};
-class Base : public Queueable {
- DetailedStatCollector *col;
- Semaphore *sem;
-public:
- Base(DetailedStatCollector *col,
- Semaphore *sem) : col(col), sem(sem) {}
- void queue(unsigned *item) override {
- col->read_complete(*item);
- sem->Put();
- delete item;
- }
- void start() override {}
- void stop() override {}
-};
-class WQWrapper : public Queueable {
- boost::scoped_ptr<ThreadPool::WorkQueue<unsigned> > wq;
- boost::scoped_ptr<ThreadPool> tp;
-public:
- WQWrapper(ThreadPool::WorkQueue<unsigned> *wq, ThreadPool *tp):
- wq(wq), tp(tp) {}
- void queue(unsigned *item) override { wq->queue(item); }
- void start() override { tp->start(); }
- void stop() override { tp->stop(); }
-};
-class FinisherWrapper : public Queueable {
- class CB : public Context {
- Queueable *next;
- unsigned *item;
- public:
- CB(Queueable *next, unsigned *item) : next(next), item(item) {}
- void finish(int) override {
- next->queue(item);
- }
- };
- Finisher f;
- Queueable *next;
-public:
- FinisherWrapper(CephContext *cct, Queueable *next) :
- f(cct), next(next) {}
- void queue(unsigned *item) override {
- f.queue(new CB(next, item));
- }
- void start() override { f.start(); }
- void stop() override { f.stop(); }
-};
-class PassAlong : public ThreadPool::WorkQueue<unsigned> {
- Queueable *next;
- list<unsigned*> q;
- bool _enqueue(unsigned *item) override {
- q.push_back(item);
- return true;
- }
- void _dequeue(unsigned *item) override { ceph_abort(); }
- unsigned *_dequeue() override {
- if (q.empty())
- return 0;
- unsigned *val = q.front();
- q.pop_front();
- return val;
- }
- void _process(unsigned *item, ThreadPool::TPHandle &) override {
- next->queue(item);
- }
- void _clear() override { q.clear(); }
- bool _empty() override { return q.empty(); }
-public:
- PassAlong(ThreadPool *tp, Queueable *_next) :
- ThreadPool::WorkQueue<unsigned>("TestQueue", 100, 100, tp), next(_next) {}
-};
-
-int main(int argc, char **argv)
-{
- po::options_description desc("Allowed options");
- desc.add_options()
- ("help", "produce help message")
- ("num-threads", po::value<unsigned>()->default_value(10),
- "set number of threads")
- ("queue-size", po::value<unsigned>()->default_value(30),
- "queue size")
- ("num-items", po::value<unsigned>()->default_value(3000000),
- "num items")
- ("layers", po::value<string>()->default_value(""),
- "layer desc")
- ;
-
- vector<string> ceph_option_strings;
- po::variables_map vm;
- try {
- po::parsed_options parsed =
- po::command_line_parser(argc, argv).options(desc).allow_unregistered().run();
- po::store(
- parsed,
- vm);
- po::notify(vm);
-
- ceph_option_strings = po::collect_unrecognized(parsed.options,
- po::include_positional);
- } catch(po::error &e) {
- std::cerr << e.what() << std::endl;
- return 1;
- }
- vector<const char *> ceph_options, def_args;
- ceph_options.reserve(ceph_option_strings.size());
- for (vector<string>::iterator i = ceph_option_strings.begin();
- i != ceph_option_strings.end();
- ++i) {
- ceph_options.push_back(i->c_str());
- }
-
- auto cct = global_init(
- &def_args, ceph_options, CEPH_ENTITY_TYPE_CLIENT,
- CODE_ENVIRONMENT_UTILITY,
- CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
- common_init_finish(g_ceph_context);
- g_ceph_context->_conf->apply_changes(NULL);
-
- if (vm.count("help")) {
- cout << desc << std::endl;
- return 1;
- }
-
- DetailedStatCollector col(1, new JSONFormatter, 0, &cout);
- Semaphore sem;
- for (unsigned i = 0; i < vm["queue-size"].as<unsigned>(); ++i)
- sem.Put();
-
- typedef list<Queueable*> QQ;
- QQ wqs;
- wqs.push_back(
- new Base(&col, &sem));
- string layers(vm["layers"].as<string>());
- unsigned num = 0;
- for (string::reverse_iterator i = layers.rbegin();
- i != layers.rend(); ++i) {
- stringstream ss;
- ss << "Test " << num;
- if (*i == 'q') {
- ThreadPool *tp =
- new ThreadPool(
- g_ceph_context, ss.str(), "tp_test", vm["num-threads"].as<unsigned>(), 0);
- wqs.push_back(
- new WQWrapper(
- new PassAlong(tp, wqs.back()),
- tp
- ));
- } else if (*i == 'f') {
- wqs.push_back(
- new FinisherWrapper(
- g_ceph_context, wqs.back()));
- }
- ++num;
- }
-
- for (QQ::iterator i = wqs.begin();
- i != wqs.end();
- ++i) {
- (*i)->start();
- }
-
- for (uint64_t i = 0; i < vm["num-items"].as<unsigned>(); ++i) {
- sem.Get();
- unsigned *item = new unsigned(col.next_seq());
- col.start_read(*item, 1);
- wqs.back()->queue(item);
- }
-
- for (QQ::iterator i = wqs.begin();
- i != wqs.end();
- ++i) {
- (*i)->stop();
- }
- for (QQ::iterator i = wqs.begin(); i != wqs.end(); wqs.erase(i++)) {
- delete *i;
- }
- return 0;
-}