* Author: Tri Dao, daominhtri0503@gmail.com
*/
#include "common/ceph_argparse.h"
+#include "common/ceph_mutex.h"
#include "common/common_init.h"
#include "common/hobject.h"
+
#include "global/global_context.h"
#include "global/global_init.h"
+#include <gtest/gtest.h>
+
+#include "include/Context.h"
#include "include/buffer_fwd.h"
#include "os/ObjectStore.h"
#include "test/objectstore/ObjectStoreImitator.h"
#include <boost/random/uniform_int.hpp>
#include <fmt/core.h>
-#include <gtest/gtest.h>
-#include <iostream>
+#include <mutex>
#include <string>
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_test
+
constexpr uint64_t _1Kb = 1024;
constexpr uint64_t _1Mb = 1024 * _1Kb;
constexpr uint64_t _1Gb = 1024 * _1Mb;
+typedef boost::mt11213b gen_type;
+
static bufferlist make_bl(size_t len, char c) {
bufferlist bl;
if (len > 0) {
// --------- FragmentationSimulator ----------
class FragmentationSimulator : public ::testing::TestWithParam<std::string> {
+ // Context that takes an arbitrary callback
+ struct C_Callback : Context {
+ C_Callback(std::function<void()> cb) : cb(cb) {}
+
+ std::function<void()> cb;
+ void finish(int r) override { cb(); }
+ };
+
public:
struct WorkloadGenerator {
virtual int generate_txns(ObjectStore::CollectionHandle &ch,
- ObjectStoreImitator *os) = 0;
+ ObjectStore *os) = 0;
virtual std::string name() = 0;
+ void register_txn(ObjectStore::Transaction &t) {
+ std::unique_lock l(in_flight_lock);
+ in_flight_txns++;
+ t.register_on_commit(new C_Callback([this]() -> void {
+ std::unique_lock l(in_flight_lock);
+ if (--in_flight_txns == 0) {
+ continue_cond.notify_all();
+ }
+ }));
+ }
+
+ // wait_till_finish blocks until all in-flight txns are finished
+ void wait_till_finish() {
+ std::unique_lock l(in_flight_lock);
+ continue_cond.wait(l, [this]() -> bool { return !in_flight_txns; });
+ }
+
WorkloadGenerator() {}
virtual ~WorkloadGenerator() {}
+
+ private:
+ unsigned in_flight_txns{0};
+ ceph::condition_variable continue_cond;
+ ceph::mutex in_flight_lock =
+ ceph::make_mutex("WorkloadGenerator::in_flight_lock");
};
using WorkloadGeneratorRef = std::shared_ptr<WorkloadGenerator>;
void add_generator(WorkloadGeneratorRef gen);
- int begin_simulation_with_generators();
+ int begin_simulation_with_generators(unsigned iterations);
void init(const std::string &alloc_type, uint64_t size,
uint64_t min_alloc_size = 4096);
void FragmentationSimulator::init(const std::string &alloc_type, uint64_t size,
uint64_t min_alloc_size) {
- std::cout << std::endl;
- std::cout << "Initializing ObjectStoreImitator" << std::endl;
+ dout(0) << dendl;
+ dout(20) << "Initializing ObjectStoreImitator" << dendl;
os = new ObjectStoreImitator(g_ceph_context, "", min_alloc_size);
- std::cout << "Initializing allocator: " << alloc_type << ", size: 0x"
- << std::hex << size << std::dec << "\n"
- << std::endl;
+ dout(0) << "Initializing allocator: " << alloc_type << ", size: 0x"
+ << std::hex << size << std::dec << dendl;
os->init_alloc(alloc_type, size);
}
void FragmentationSimulator::add_generator(WorkloadGeneratorRef gen) {
- std::cout << "Generator: " << gen->name() << " added\n";
+ dout(5) << "Generator: " << gen->name() << " added" << dendl;
generators.push_back(gen);
}
-int FragmentationSimulator::begin_simulation_with_generators() {
- for (auto &g : generators) {
- ObjectStore::CollectionHandle ch =
- os->create_new_collection(coll_t::meta());
- ObjectStore::Transaction t;
- t.create_collection(ch->cid, 0);
- os->queue_transaction(ch, std::move(t));
+int FragmentationSimulator::begin_simulation_with_generators(
+ unsigned iterations) {
+ ObjectStore::CollectionHandle ch = os->create_new_collection(coll_t::meta());
+
+ ObjectStore::Transaction t;
+ t.create_collection(ch->cid, 0);
+ os->queue_transaction(ch, std::move(t));
- int r = g->generate_txns(ch, os);
+ gen_type rng(time(0));
+ boost::uniform_int<> generator_idx(0, generators.size() - 1);
+
+ while (iterations--) {
+ int r = generators[generator_idx(rng)]->generate_txns(ch, os);
if (r < 0)
return r;
}
os->print_per_object_fragmentation();
os->print_per_access_fragmentation();
os->print_allocator_profile();
+
return 0;
}
struct SimpleCWGenerator : public FragmentationSimulator::WorkloadGenerator {
std::string name() override { return "SimpleCW"; }
int generate_txns(ObjectStore::CollectionHandle &ch,
- ObjectStoreImitator *os) override {
+ ObjectStore *os) override {
std::vector<ghobject_t> objs;
for (unsigned i{0}; i < 100; ++i) {
for (unsigned i{0}; i < 100; ++i) {
ObjectStore::Transaction t1;
t1.create(ch->get_cid(), objs[i]);
+ register_txn(t1);
tls.emplace_back(std::move(t1));
ObjectStore::Transaction t2;
t2.write(ch->get_cid(), objs[i], 0, _1Mb, make_bl(_1Mb, 'c'));
+ register_txn(t2);
tls.emplace_back(std::move(t2));
}
os->queue_transactions(ch, tls);
- os->verify_objects(ch);
-
- // reapply
- os->queue_transactions(ch, tls);
- os->verify_objects(ch);
+ wait_till_finish();
tls.clear();
// Overwrite on object
}
os->queue_transactions(ch, tls);
- os->verify_objects(ch);
+ wait_till_finish();
tls.clear();
for (unsigned i{0}; i < 50; ++i) {
}
os->queue_transactions(ch, tls);
- os->verify_objects(ch);
+ wait_till_finish();
tls.clear();
return 0;
}
};
-typedef boost::mt11213b gen_type;
-
struct RandomCWGenerator : public FragmentationSimulator::WorkloadGenerator {
+ RandomCWGenerator() : WorkloadGenerator(), rng(time(0)) {}
std::string name() override { return "RandomCW"; }
int generate_txns(ObjectStore::CollectionHandle &ch,
- ObjectStoreImitator *os) override {
+ ObjectStore *os) override {
hobject_t h1;
h1.oid = fmt::format("obj1");
tls.emplace_back(std::move(t2));
os->queue_transactions(ch, tls);
- os->verify_objects(ch);
+ wait_till_finish();
- gen_type rng(time(0));
boost::uniform_int<> u_size(0, _1Mb * 4);
boost::uniform_int<> u_offset(0, _1Mb);
tls.emplace_back(std::move(t4));
os->queue_transactions(ch, tls);
- os->verify_objects(ch);
+ wait_till_finish();
bufferlist dummy;
tls.clear();
return 0;
}
+
+private:
+ gen_type rng;
};
// Testing the Imitator with multiple threads. We're mainly testing for
: public FragmentationSimulator::WorkloadGenerator {
std::string name() override { return "MultiThreadedCW"; }
int generate_txns(ObjectStore::CollectionHandle &ch,
- ObjectStoreImitator *os) override {
+ ObjectStore *os) override {
auto t1 = std::thread([&]() {
hobject_t h1;
t1.join();
t2.join();
+ wait_till_finish();
return 0;
}
};
// Replay ops from OSD on the Simulator
+// Not tested
struct OpsReplayer : public FragmentationSimulator::WorkloadGenerator {
std::string name() override { return "OpsReplayer"; }
int generate_txns(ObjectStore::CollectionHandle &ch,
- ObjectStoreImitator *os) override {
+ ObjectStore *os) override {
std::unordered_map<std::string, std::string> row;
std::vector<std::string> col_names;
std::string line, col;
// skipping over '---'
std::getline(f, line);
- std::vector<ObjectStore::Transaction> tls;
while (std::getline(f, line)) {
stream.str(line);
for (unsigned i{0}; stream >> col; i++) {
}
}
- if (op_type != "read")
- tls.emplace_back(std::move(t));
+ if (op_type != "read") {
+ os->queue_transaction(ch, std::move(t));
+ wait_till_finish();
+ }
}
- os->queue_transactions(ch, tls);
-
return 0;
}
TEST_P(FragmentationSimulator, SimpleCWGenerator) {
init(GetParam(), _1Gb);
add_generator(std::make_shared<SimpleCWGenerator>());
- begin_simulation_with_generators();
+ begin_simulation_with_generators(1);
}
TEST_P(FragmentationSimulator, RandomCWGenerator) {
init(GetParam(), _1Mb * 16);
add_generator(std::make_shared<RandomCWGenerator>());
- begin_simulation_with_generators();
+ begin_simulation_with_generators(1);
}
TEST_P(FragmentationSimulator, MultiThreadedCWGenerator) {
init(GetParam(), _1Mb * 4);
add_generator(std::make_shared<MultiThreadedCWGenerator>());
- begin_simulation_with_generators();
+ begin_simulation_with_generators(1);
}
// ----------- main -----------
INSTANTIATE_TEST_SUITE_P(Allocator, FragmentationSimulator,
- ::testing::Values("stupid", "bitmap", "avl", "btree"));
+ ::testing::Values("stupid", "bitmap", "avl", "btree",
+ "hybrid"));
int main(int argc, char **argv) {
auto args = argv_to_vec(argc, argv);
*/
#include "test/objectstore/ObjectStoreImitator.h"
#include "common/Clock.h"
+#include "common/Finisher.h"
#include "common/errno.h"
#include "include/ceph_assert.h"
#include "include/intarith.h"
#include <algorithm>
#include <cmath>
-#define dout_context cct
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_test
#define OBJECT_MAX_SIZE 0xffffffff // 32 bits
+
// ---------- Allocator ----------
void ObjectStoreImitator::release_alloc(PExtentVector &old_extents) {
uint64_t re_add_key{0};
bluestore_pextent_t re_add;
- // std::cout << "current extents:\n";
- // for (auto &[l_off, e] : extent_map) {
- // std::cout << "l_off " << l_off << ", off " << e.offset << ", len "
- // << e.length << std::endl;
- // }
+ dout(20) << "current extents:" << dendl;
+ for (auto &[l_off, e] : extent_map) {
+ dout(20) << "l_off " << l_off << ", off " << e.offset << ", len "
+ << e.length << dendl;
+ }
- // std::cout << "wants to punch: off " << offset << ", len " << length
- // << std::endl;
+ dout(20) << "wants to punch: off " << offset << ", len " << length << dendl;
auto it = extent_map.lower_bound(offset);
if ((it == extent_map.end() || it->first > offset) &&
// diff between where we need to punch and current position
auto diff = offset - it->first;
- // std::cout << "diff " << diff << " , p_off " << it->first <<
- // std::endl;
+ dout(20) << "diff " << diff << " , p_off " << it->first << dendl;
// offset will be inside this extent
// otherwise skip over this extent and assume 'offset' has been passed
re_add.offset = it->second.offset + diff + length;
re_add.length = it->second.length - diff - length;
- // std::cout << "re_add: off " << re_add.offset << ", len "
- // << re_add.length << std::endl;
+ dout(20) << "re_add: off " << re_add.offset << ", len " << re_add.length
+ << dendl;
}
// Modify the remaining extent's length
}
old_extents = to_be_punched;
- // std::cout << "to be deleted\n";
- // for (auto e : to_be_punched) {
- // std::cout << "off " << e.offset << ", len " << e.length << std::endl;
- // }
+ dout(20) << "to be deleted:" << dendl;
+ for (auto e : to_be_punched) {
+ dout(20) << "off " << e.offset << ", len " << e.length << dendl;
+ }
}
void ObjectStoreImitator::Object::append(PExtentVector &ext, uint64_t offset) {
for (auto &e : ext) {
ceph_assert(e.length > 0);
- // std::cout << "adding off " << offset << ", len " << e.length
- // << std::endl;
+ dout(20) << "adding off " << offset << ", len " << e.length << dendl;
extent_map[offset] = e;
offset += e.length;
}
}
void ObjectStoreImitator::Object::verify_extents() {
- // std::cout << "Verifying extents:\n";
+ dout(20) << "Verifying extents:" << dendl;
uint64_t prev{0};
for (auto &[l_off, ext] : extent_map) {
- // std::cout << "logical offset: " << l_off
- // << ", extent offset: " << ext.offset
- // << ", extent length: " << ext.length << std::endl;
+ dout(20) << "logical offset: " << l_off << ", extent offset: " << ext.offset
+ << ", extent length: " << ext.length << dendl;
ceph_assert(ext.is_valid());
ceph_assert(ext.length > 0);
}
void ObjectStoreImitator::print_status() {
- std::cout << std::hex
- << "Fragmentation score: " << alloc->get_fragmentation_score()
- << " , fragmentation: " << alloc->get_fragmentation()
- << ", allocator type " << alloc->get_type() << ", capacity 0x"
- << alloc->get_capacity() << ", block size 0x"
- << alloc->get_block_size() << ", free 0x" << alloc->get_free()
- << std::dec << std::endl;
+ dout(0) << std::hex
+ << "Fragmentation score: " << alloc->get_fragmentation_score()
+ << " , fragmentation: " << alloc->get_fragmentation()
+ << ", allocator type: " << alloc->get_type() << ", capacity 0x"
+ << alloc->get_capacity() << ", block size 0x"
+ << alloc->get_block_size() << ", free 0x" << alloc->get_free()
+ << std::dec << dendl;
}
void ObjectStoreImitator::verify_objects(CollectionHandle &ch) {
double coll_total{0};
for (auto &[id, obj] : coll_ref->objects) {
double frag_score{1};
- unsigned i{1};
+ unsigned i{2};
uint64_t ext_size = 0;
PExtentVector extents;
}
coll_total += frag_score;
- std::cout << "Object: " << id.hobj.oid.name
- << ", hash: " << id.hobj.get_hash()
- << " fragmentation score: " << frag_score << std::endl;
+ dout(5) << "Object: " << id.hobj.oid.name
+ << ", hash: " << id.hobj.get_hash()
+ << " fragmentation score: " << frag_score << dendl;
}
double avg = coll_total / coll_ref->objects.size();
- std::cout << "Average obj fragmentation " << avg << std::endl;
+ dout(0) << "Collection average obj fragmentation: " << avg
+ << ", coll: " << coll_ref->get_cid().to_str() << dendl;
}
}
void ObjectStoreImitator::print_per_access_fragmentation() {
for (auto &[_, coll_ref] : coll_map) {
+ double coll_blks_read{0}, coll_jmps{0};
for (auto &[id, read_ops] : coll_ref->read_ops) {
unsigned blks{0}, jmps{0};
for (auto &op : read_ops) {
jmps += op.jmps;
}
- double avg_total_blks = (double)blks / read_ops.size();
+ double avg_blks_read = (double)blks / read_ops.size();
+ coll_blks_read += avg_blks_read;
+
double avg_jmps = (double)jmps / read_ops.size();
+ coll_jmps += avg_jmps;
+
double avg_jmps_per_blk = (double)jmps / (double)blks;
- std::cout << "Object: " << id.hobj.oid.name
- << ", average total blks read: " << avg_total_blks
- << ", average total jumps: " << avg_jmps
- << ", average jumps per block: " << avg_jmps_per_blk
- << std::endl;
+ dout(5) << "Object: " << id.hobj.oid.name
+ << ", average blks read: " << avg_blks_read
+ << ", average jumps: " << avg_jmps
+ << ", average jumps per block: " << avg_jmps_per_blk << dendl;
}
+
+ double coll_avg_blks_read = coll_blks_read / coll_ref->objects.size();
+ double coll_avg_jumps = coll_jmps / coll_ref->objects.size();
+ double coll_avg_jmps_per_blk = coll_avg_jumps / coll_avg_blks_read;
+
+ dout(0) << "Collection average total blks: " << coll_avg_blks_read
+ << ", collection average jumps: " << coll_avg_jumps
+ << ", collection average jumps per block: " << coll_avg_jmps_per_blk
+ << ", coll: " << coll_ref->get_cid().to_str() << dendl;
}
}
void ObjectStoreImitator::print_allocator_profile() {
double avg = alloc_time / alloc_ops;
- std::cout << "Total alloc ops latency: " << alloc_time
- << ", total ops: " << alloc_ops
- << ", average alloc op latency: " << avg << std::endl;
+ dout(0) << "Total alloc ops latency: " << alloc_time
+ << ", total ops: " << alloc_ops
+ << ", average alloc op latency: " << avg << dendl;
}
// ------- Transactions -------
std::vector<Transaction> &tls,
TrackedOpRef op,
ThreadPool::TPHandle *handle) {
+ std::list<Context *> on_applied, on_commit, on_applied_sync;
+ ObjectStore::Transaction::collect_contexts(tls, &on_applied, &on_commit,
+ &on_applied_sync);
+ Collection *c = static_cast<Collection *>(ch.get());
+
for (std::vector<Transaction>::iterator p = tls.begin(); p != tls.end();
++p) {
_add_transaction(&(*p));
}
- if (handle)
- handle->suspend_tp_timeout();
-
if (handle)
handle->reset_tp_timeout();
+ // Immediately complete contexts
+ for (auto c : on_applied_sync) {
+ c->complete(0);
+ }
+
+ if (!on_applied.empty()) {
+ if (c->commit_queue) {
+ c->commit_queue->queue(on_applied);
+ } else {
+ for (auto c : on_applied) {
+ c->complete(0);
+ }
+ }
+ }
+
+ if (!on_commit.empty()) {
+ for (auto c : on_commit) {
+ c->complete(0);
+ }
+ }
+
+ verify_objects(ch);
return 0;
}
}
c->read_ops[o->oid].push_back(op);
- // std::cout << "blks: " << op.blks << ", jmps: " << op.jmps
- // << ", offset: " << op.offset << ", length: " << op.length
- // << std::endl;
+ dout(20) << "blks: " << op.blks << ", jmps: " << op.jmps
+ << ", offset: " << op.offset << ", length: " << op.length << dendl;
return bl.length();
}
uint64_t offset;
uint64_t length;
unsigned blks;
- unsigned
- jmps; // # of times we have to stop iterating over continuous extents
+ unsigned jmps; // # times having to stop iterating over continuous extents
ReadOp(uint64_t offset = 0, uint64_t length = 0, unsigned blks = 0,
unsigned jmps = 0)
: offset(offset), length(length), blks(blks), jmps(jmps) {}
std::unordered_map<ghobject_t, std::vector<ReadOp>> read_ops;
ceph::shared_mutex lock = ceph::make_shared_mutex(
- "FragmentationSimulator::Collection::lock", true, false);
+ "ObjectStoreImitator::Collection::lock", true, false);
// Lock for 'objects'
ceph::recursive_mutex obj_lock = ceph::make_recursive_mutex(
- "FragmentationSimulator::Collection::obj_lock");
+ "ObjectStoreImitator::Collection::obj_lock");
bool exists;
///< rwlock to protect coll_map/new_coll_map
ceph::shared_mutex coll_lock =
- ceph::make_shared_mutex("FragmentationSimulator::coll_lock");
+ ceph::make_shared_mutex("ObjectStoreImitator::coll_lock");
std::unordered_map<coll_t, CollectionRef> coll_map;
std::unordered_map<coll_t, CollectionRef>
new_coll_map; // store collections that is opened via open_new_collection