struct config_t {
std::string type;
bool mkfs = false;
- unsigned num_collections = 128;
+ unsigned num_pgs = 128;
+ unsigned log_size = 1000;
unsigned object_size = 4<<20 /* 4MB, rbd default */;
unsigned oi_size = 1<<9 /* 512b */;
+ unsigned log_entry_size = 1<<9 /* 512b */;
+ bool prepopulate_log = false;
std::optional<std::string> path;
bool is_futurized_store() const {
return oi_size > 0;
}
+ bool log_enabled() const {
+ return log_entry_size > 0 && log_size > 0;
+ }
+
+ bool prepopulate_log_enabled() const {
+ return prepopulate_log;
+ }
+
void populate_options(
boost::program_options::options_description &desc)
{
->notifier([this](auto s) { path = s; }),
"Path to device for backend"
)
- ("num-collections",
+ ("num-pgs",
po::value<unsigned>()
- ->notifier([this](auto s) { num_collections = s; }),
- "Number of collections to use for futurized_store backends"
+ ->notifier([this](auto s) { num_pgs = s; }),
+ "Number of pgs to use for futurized_store backends"
+ )
+ ("log-size",
+ po::value<unsigned>()
+ ->notifier([this](auto s) { log_size = s; }),
+ "Number of log entries per pg to use for futurized_store backends"
+ ", 0 to disable"
+ )
+ ("log-entry-size",
+ po::value<unsigned>()
+ ->notifier([this](auto s) { log_entry_size = s; }),
+ "Size of each log entry per pg to use for futurized_store backends"
+ ", 0 to disable"
+ )
+ ("prepopulate-log",
+ po::value<bool>()
+ ->notifier([this](auto s) { prepopulate_log = s; }),
+ "Prepopulate log on mount"
+ )
("object-info-size",
po::value<unsigned>()
->notifier([this](auto s) { log_entry_size = s; }),
// vim: ts=8 sw=2 smarttab
#include <boost/iterator/counting_iterator.hpp>
+#include <fmt/format.h>
#include "os/Transaction.h"
#include "fs_driver.h"
return coll_t(spg_t(pg_t(0, num)));
}
+ghobject_t get_log_object(unsigned coll)
+{
+ return ghobject_t(
+ shard_id_t::NO_SHARD,
+ 0,
+ (coll << 16),
+ "",
+ "",
+ 0,
+ ghobject_t::NO_GEN);
+}
+
+std::string make_log_key(
+ unsigned i)
+{
+ return fmt::format("log_entry_{}", i);
+}
+
+void add_log_entry(
+ unsigned i,
+ unsigned entry_size,
+ std::map<std::string, ceph::buffer::list> *omap)
+{
+ assert(omap);
+ bufferlist bl;
+ bl.append(ceph::buffer::create('0', entry_size));
+
+ omap->emplace(std::make_pair(make_log_key(i), bl));
+}
+
+void populate_log(
+ ceph::os::Transaction &t,
+ FSDriver::pg_analogue_t &pg,
+ unsigned entry_size,
+ unsigned entries)
+{
+ t.touch(pg.collection->get_cid(), pg.log_object);
+ // omap_clear not yet implemented, TODO
+ // t.omap_clear(pg.collection->get_cid(), pg.log_object);
+
+ std::map<std::string, ceph::buffer::list> omap;
+ for (unsigned i = 0; i < entries; ++i) {
+ add_log_entry(i, entry_size, &omap);
+ }
+
+ t.omap_setkeys(
+ pg.collection->get_cid(),
+ pg.log_object,
+ omap);
+
+ pg.log_head = entries;
+}
+
+void update_log(
+ ceph::os::Transaction &t,
+ FSDriver::pg_analogue_t &pg,
+ unsigned entry_size,
+ unsigned entries)
+{
+ ++pg.log_head;
+ std::map<std::string, ceph::buffer::list> key;
+ add_log_entry(pg.log_head, entry_size, &key);
+
+ t.omap_setkeys(
+ pg.collection->get_cid(),
+ pg.log_object,
+ key);
+
+
+ while ((pg.log_head - pg.log_tail) > entries) {
+ t.omap_rmkey(
+ pg.collection->get_cid(),
+ pg.log_object,
+ make_log_key(pg.log_tail));
+ ++pg.log_tail;
+ }
+}
+
FSDriver::offset_mapping_t FSDriver::map_offset(off_t offset)
{
uint32_t objid = offset / config.object_size;
- uint32_t collid = objid % config.num_collections;
+ uint32_t collid = objid % config.num_pgs;
return offset_mapping_t{
collections[collid],
ghobject_t(
shard_id_t::NO_SHARD,
0,
- (collid << 16) | objid,
+ (collid << 16) | (objid + 1),
"",
"",
0,
bufferlist bl;
bl.append(ptr);
t.write(
- mapping.chandle->get_cid(),
+ mapping.pg.collection->get_cid(),
mapping.object,
mapping.offset,
ptr.length(),
bufferlist attr;
attr.append(ceph::buffer::create(config.oi_size, '0'));
t.setattr(
- mapping.chandle->get_cid(),
+ mapping.pg.collection->get_cid(),
mapping.object,
"_",
attr);
}
+ if (config.log_enabled()) {
+ update_log(
+ t,
+ mapping.pg,
+ config.log_entry_size,
+ config.log_size);
+ }
+
return fs->do_transaction(
- mapping.chandle,
+ mapping.pg.collection,
std::move(t));
}
auto mapping = map_offset(offset);
ceph_assert((mapping.offset + size) <= config.object_size);
return fs->read(
- mapping.chandle,
+ mapping.pg.collection,
mapping.object,
mapping.offset,
size,
}).then([this] {
return seastar::do_for_each(
boost::counting_iterator<unsigned>(0),
- boost::counting_iterator<unsigned>(config.num_collections),
+ boost::counting_iterator<unsigned>(config.num_pgs),
[this](auto i) {
return fs->create_new_collection(get_coll(i)
).then([this, i](auto coll) {
}).then([this] {
return seastar::do_for_each(
boost::counting_iterator<unsigned>(0),
- boost::counting_iterator<unsigned>(config.num_collections),
+ boost::counting_iterator<unsigned>(config.num_pgs),
[this](auto i) {
return fs->open_collection(get_coll(i)
).then([this, i](auto ref) {
- collections[i] = ref;
- return seastar::now();
+ collections[i].collection = ref;
+ collections[i].log_object = get_log_object(i);
+ if (config.log_enabled()) {
+ ceph::os::Transaction t;
+ if (config.prepopulate_log_enabled()) {
+ populate_log(
+ t,
+ collections[i],
+ config.log_entry_size,
+ config.log_size);
+ }
+ return fs->do_transaction(
+ collections[i].collection,
+ std::move(t));
+ } else {
+ return seastar::now();
+ }
});
});
}).then([this] {