#define dout_subsys ceph_subsys_bluestore
const string PREFIX_SUPER = "S"; // field -> value
+const string PREFIX_STAT = "T"; // field -> value(int64 array)
const string PREFIX_COLL = "C"; // collection name -> cnode_t
const string PREFIX_OBJ = "O"; // object name -> onode_t
const string PREFIX_OVERLAY = "V"; // u64 + offset -> data
}
#endif
+// KV merge operator that treats both operands as arrays of little-endian
+// int64 values and produces their element-wise sum.  Registered for the
+// PREFIX_STAT prefix so per-transaction statfs deltas can be applied
+// without a read-modify-write cycle on the stored counters.
+struct Int64ArrayMergeOperator : public KeyValueDB::MergeOperator {
+  // No existing value: the delta itself becomes the stored value.
+  virtual void merge_nonexistant(
+    const char *rdata, size_t rlen, std::string *new_value) override {
+    *new_value = std::string(rdata, rlen);
+  }
+  // Element-wise sum of two equal-length int64 arrays.
+  virtual void merge(
+    const char *ldata, size_t llen,
+    const char *rdata, size_t rlen,
+    std::string *new_value) override {
+    assert(llen == rlen);
+    assert((rlen % 8) == 0);
+    new_value->resize(rlen);
+    const __le64* lv = (const __le64*)ldata;
+    const __le64* rv = (const __le64*)rdata;
+    __le64* nv = &(__le64&)new_value->at(0);
+    for (size_t i = 0; i < rlen >> 3; ++i) {
+      nv[i] = lv[i] + rv[i];
+    }
+  }
+  // We use each operator name and each prefix to construct the
+  // overall RocksDB operator name for consistency check at open time.
+  virtual string name() const override {
+    return "int64_array";
+  }
+};
+
// BufferSpace
#undef dout_prefix
snprintf(fn, sizeof(fn), "%s/db", path.c_str());
string options;
stringstream err;
+ ceph::shared_ptr<Int64ArrayMergeOperator> merge_op(new Int64ArrayMergeOperator);
string kv_backend;
if (create) {
env = NULL;
return -EIO;
}
-
+
FreelistManager::setup_merge_operators(db);
+ db->set_merge_operator(PREFIX_STAT, merge_op);
if (kv_backend == "rocksdb")
options = g_conf->bluestore_rocksdb_options;
buf->bsize = block_size;
buf->available = (alloc->get_free() - bluefs_len);
+ bufferlist bl;
+ int r = db->get(PREFIX_STAT, "bluestore_statfs", &bl);
+ if (r >= 0) {
+ TransContext::volatile_statfs vstatfs;
+ if (size_t(bl.length()) >= sizeof(vstatfs.values)) {
+ auto it = bl.begin();
+ vstatfs.decode(it);
+
+ buf->allocated = vstatfs.allocated();
+ buf->stored = vstatfs.stored();
+ buf->compressed = vstatfs.compressed();
+ buf->compressed_original = vstatfs.compressed_original();
+ buf->compressed_allocated = vstatfs.compressed_allocated();
+ } else {
+ dout(10) << __func__ << " store_statfs is corrupt, using empty" << dendl;
+ }
+ } else {
+ dout(10) << __func__ << " store_statfs missed, using empty" << dendl;
+ }
+
+
dout(20) << __func__ << *buf << dendl;
return 0;
}
return txc;
}
+// Fold this transaction's accumulated statfs delta into the persistent
+// counters via a KV merge (summed by Int64ArrayMergeOperator), then
+// clear the in-memory delta.  No-op when nothing changed.
+void BlueStore::_txc_update_store_statfs(TransContext *txc)
+{
+  if (!txc->statfs_delta.is_empty()) {
+    bufferlist delta_bl;
+    txc->statfs_delta.encode(delta_bl);
+    txc->t->merge(PREFIX_STAT, "bluestore_statfs", delta_bl);
+    txc->statfs_delta.reset();
+  }
+}
+
void BlueStore::_txc_state_proc(TransContext *txc)
{
while (true) {
txc->allocated.clear();
txc->released.clear();
+ _txc_update_store_statfs(txc);
}
void BlueStore::_kv_sync_thread()
bluestore_lextent_t(blob, b_off + head_pad, length, 0);
b->ref_map.get(lex.offset, lex.length);
b->mark_used(lex.offset, lex.length);
+ txc->statfs_delta.stored() += lex.length;
dout(20) << __func__ << " lex 0x" << std::hex << offset << std::dec
<< ": " << lex << dendl;
dout(20) << __func__ << " old " << blob << ": " << *b << dendl;
bluestore_lextent_t(blob, offset - bstart, length, 0);
b->ref_map.get(lex.offset, lex.length);
b->mark_used(lex.offset, lex.length);
+ txc->statfs_delta.stored() += lex.length;
dout(20) << __func__ << " lex 0x" << std::hex << offset
<< std::dec << ": " << lex << dendl;
dout(20) << __func__ << " old " << blob << ": " << *b << dendl;
bluestore_lextent_t& lex = o->onode.extent_map[offset] =
bluestore_lextent_t(blob, offset % min_alloc_size, length);
b->ref_map.get(lex.offset, lex.length);
+ txc->statfs_delta.stored() += lex.length;
+
dout(20) << __func__ << " lex 0x" << std::hex << offset << std::dec
<< ": " << lex << dendl;
dout(20) << __func__ << " new " << blob << ": " << *b << dendl;
o->onode.punch_hole(offset, l, &wctx->lex_old);
o->onode.extent_map[offset] = bluestore_lextent_t(blob, 0, l, 0);
b->ref_map.get(0, l);
+ txc->statfs_delta.stored() += length;
dout(20) << __func__ << " lex 0x" << std::hex << offset << std::dec << ": "
<< o->onode.extent_map[offset] << dendl;
dout(20) << __func__ << " blob " << *b << dendl;
<< " -> 0x" << rawlen << " => 0x" << newlen
<< " with " << chdr.type
<< dec << dendl;
- l = &compressed_bl;
+ txc->statfs_delta.compressed() += rawlen;
+ txc->statfs_delta.compressed_original() += l->length();
+ txc->statfs_delta.compressed_allocated() += newlen; l = &compressed_bl;
final_length = newlen;
csum_length = newlen;
b->set_compressed(rawlen);
need -= l;
e.length = l;
txc->allocated.insert(e.offset, e.length);
+ txc->statfs_delta.allocated() += e.length;
b->extents.push_back(e);
final_length -= e.length;
hint = e.end();
bluestore_blob_t *b = c->get_blob_ptr(o, l.blob);
vector<bluestore_pextent_t> r;
b->put_ref(l.offset, l.length, min_alloc_size, &r);
+ txc->statfs_delta.stored() -= l.length;
for (auto e : r) {
dout(20) << __func__ << " release " << e << dendl;
txc->released.insert(e.offset, e.length);
+ txc->statfs_delta.allocated() -= e.length;
}
if (b->ref_map.empty()) {
+ if (b->is_compressed()) {
+ txc->statfs_delta.compressed() -= b->get_payload_length();
+ txc->statfs_delta.compressed_original() -= b->length;
+ txc->statfs_delta.compressed_allocated() -= b->get_ondisk_length();
+ }
dout(20) << __func__ << " rm blob " << *b << dendl;
if (l.blob >= 0) {
o->onode.blob_map.erase(l.blob);
newo->onode.extent_map[p.first] = p.second;
e->blob_map[-p.second.blob].ref_map.get(p.second.offset,
p.second.length);
+ txc->statfs_delta.stored() += p.second.length;
}
newo->bnode = e;
_dump_onode(newo);
vector<OnodeRef> wal_op_onodes;
interval_set<uint64_t> allocated, released;
+  // In-memory accumulator for per-transaction statfs deltas.  The five
+  // counters are persisted under PREFIX_STAT as a raw array of
+  // little-endian int64s so Int64ArrayMergeOperator can sum deltas from
+  // concurrent transactions at the KV layer; the encode() layout must
+  // therefore remain a fixed-width 8-byte-per-value array.
+  struct volatile_statfs{
+    enum {
+      STATFS_ALLOCATED = 0,
+      STATFS_STORED,
+      STATFS_COMPRESSED_ORIGINAL,
+      STATFS_COMPRESSED,
+      STATFS_COMPRESSED_ALLOCATED,
+      STATFS_LAST
+    };
+    int64_t values[STATFS_LAST];
+    volatile_statfs() {
+      // zero only the counter array; memset(this, ...) would silently
+      // corrupt the struct if a non-trivial member were ever added
+      memset(values, 0, sizeof(values));
+    }
+    void reset() {
+      *this = volatile_statfs();
+    }
+    // accessors return mutable references so callers can apply deltas
+    // directly with += / -=
+    int64_t& allocated() {
+      return values[STATFS_ALLOCATED];
+    }
+    int64_t& stored() {
+      return values[STATFS_STORED];
+    }
+    int64_t& compressed_original() {
+      return values[STATFS_COMPRESSED_ORIGINAL];
+    }
+    int64_t& compressed() {
+      return values[STATFS_COMPRESSED];
+    }
+    int64_t& compressed_allocated() {
+      return values[STATFS_COMPRESSED_ALLOCATED];
+    }
+    // true iff every counter is zero, i.e. nothing needs persisting
+    bool is_empty() const {
+      for (size_t i = 0; i < STATFS_LAST; i++) {
+        if (values[i] != 0)
+          return false;
+      }
+      return true;
+    }
+    void decode(bufferlist::iterator& it) {
+      for (size_t i = 0; i < STATFS_LAST; i++) {
+        ::decode(values[i], it);
+      }
+    }
+    void encode(bufferlist& bl) {
+      // ::encode(int64_t) emits fixed-width little-endian, matching the
+      // 8-byte stride Int64ArrayMergeOperator asserts on
+      for (size_t i = 0; i < STATFS_LAST; i++) {
+        ::encode(values[i], bl);
+      }
+    }
+  } statfs_delta;
+
IOContext ioc;
void _dump_bnode(BnodeRef b, int log_level=30);
TransContext *_txc_create(OpSequencer *osr);
+ void _txc_update_store_statfs(TransContext *txc);
void _txc_add_transaction(TransContext *txc, Transaction *t);
void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t);
void _txc_state_proc(TransContext *txc);
}
}
+// Exercise BlueStore's persistent statfs accounting: the allocated,
+// stored and compressed_* counters must track plain writes,
+// compressible writes, punch-holes, zeroing, overwrites and removal.
+TEST_P(StoreTest, BluestoreStatFSTest) {
+ if(string(GetParam()) != "bluestore")
+ return;
+ // force compression so the compressed_* counters are exercised
+ g_conf->set_val("bluestore_compression", "force");
+ g_ceph_context->_conf->apply_changes(NULL);
+
+ ObjectStore::Sequencer osr("test");
+ int r;
+ coll_t cid;
+ ghobject_t hoid(hobject_t(sobject_t("Object 1", CEPH_NOSNAP)));
+ {
+ bufferlist in;
+ r = store->read(cid, hoid, 0, 5, in);
+ ASSERT_EQ(-ENOENT, r);
+ }
+ {
+ ObjectStore::Transaction t;
+ t.create_collection(cid, 0);
+ cerr << "Creating collection " << cid << std::endl;
+ r = apply_transaction(store, &osr, std::move(t));
+ ASSERT_EQ(r, 0);
+ }
+ {
+ bool exists = store->exists(cid, hoid);
+ ASSERT_TRUE(!exists);
+
+ ObjectStore::Transaction t;
+ t.touch(cid, hoid);
+ cerr << "Creating object " << hoid << std::endl;
+ r = apply_transaction(store, &osr, std::move(t));
+ ASSERT_EQ(r, 0);
+
+ exists = store->exists(cid, hoid);
+ ASSERT_EQ(true, exists);
+ }
+ // baseline: empty store; remember available space for later deltas
+ uint64_t available0 = 0;
+ {
+ struct store_statfs_t statfs;
+ int r = store->statfs(&statfs);
+ ASSERT_EQ(r, 0);
+ ASSERT_EQ( 0u, statfs.allocated);
+ ASSERT_EQ( 0u, statfs.stored);
+ ASSERT_EQ(0x1000u, statfs.bsize);
+ ASSERT_EQ(g_conf->bluestore_block_size / 0x1000, statfs.blocks);
+ ASSERT_TRUE(statfs.available > 0u && statfs.available < g_conf->bluestore_block_size);
+ available0 = statfs.available;
+ }
+ // small uncompressible write: 5 bytes stored, one allocation unit used
+ {
+ ObjectStore::Transaction t;
+ bufferlist bl;
+ bl.append("abcde");
+ t.write(cid, hoid, 0, 5, bl);
+ cerr << "Append 5 bytes" << std::endl;
+ r = apply_transaction(store, &osr, std::move(t));
+ ASSERT_EQ(r, 0);
+
+ struct store_statfs_t statfs;
+ int r = store->statfs(&statfs);
+ ASSERT_EQ(r, 0);
+ ASSERT_EQ(5, statfs.stored);
+ ASSERT_EQ(0x10000, statfs.allocated);
+ ASSERT_EQ(available0 - 0x10000, statfs.available);
+ ASSERT_EQ(0, statfs.compressed);
+ ASSERT_EQ(0, statfs.compressed_original);
+ ASSERT_EQ(0, statfs.compressed_allocated);
+ }
+ // compressible write: full size in stored/compressed_original,
+ // on-disk footprint in compressed_allocated
+ {
+ ObjectStore::Transaction t;
+ std::string s(0x30000, 'a');
+ bufferlist bl;
+ bl.append(s);
+ t.write(cid, hoid, 0x10000, bl.length(), bl);
+ cerr << "Append 0x30000 compressible bytes" << std::endl;
+ r = apply_transaction(store, &osr, std::move(t));
+ ASSERT_EQ(r, 0);
+
+ struct store_statfs_t statfs;
+ int r = store->statfs(&statfs);
+ ASSERT_EQ(r, 0);
+ ASSERT_EQ(0x30005, statfs.stored);
+ ASSERT_EQ(0x20000, statfs.allocated);
+ ASSERT_EQ(available0 - 0x20000, statfs.available);
+ ASSERT_LE(statfs.compressed, 0x10000);
+ ASSERT_EQ(0x30000, statfs.compressed_original);
+ ASSERT_EQ(statfs.compressed_allocated, 0x10000);
+ }
+ // punch holes: stored shrinks by the hole sizes, allocation unchanged
+ {
+ ObjectStore::Transaction t;
+ t.zero(cid, hoid, 1, 3);
+ t.zero(cid, hoid, 0x20000, 9);
+ cerr << "Punch hole at 1~3, 0x20000~9" << std::endl;
+ r = apply_transaction(store, &osr, std::move(t));
+ ASSERT_EQ(r, 0);
+
+ struct store_statfs_t statfs;
+ int r = store->statfs(&statfs);
+ ASSERT_EQ(r, 0);
+ ASSERT_EQ(0x30005 - 3 - 9, statfs.stored);
+ ASSERT_EQ(0x20000, statfs.allocated);
+ ASSERT_EQ(available0 - 0x20000, statfs.available);
+ ASSERT_LE(statfs.compressed, 0x10000);
+ ASSERT_EQ(0x30000, statfs.compressed_original);
+ ASSERT_EQ(statfs.compressed_allocated, 0x10000);
+ }
+ {
+ ObjectStore::Transaction t;
+ std::string s(0x1000, 'b');
+ bufferlist bl;
+ bl.append(s);
+ t.write(cid, hoid, 1, bl.length(), bl);
+ t.write(cid, hoid, 0x10001, bl.length(), bl);
+ cerr << "Overwrite first and second(compressible) extents" << std::endl;
+ r = apply_transaction(store, &osr, std::move(t));
+ ASSERT_EQ(r, 0);
+
+ struct store_statfs_t statfs;
+ int r = store->statfs(&statfs);
+ ASSERT_EQ(r, 0);
+ ASSERT_EQ(0x30001 - 9 + 0x1000, statfs.stored);
+ ASSERT_EQ(0x30000, statfs.allocated);
+ ASSERT_EQ(available0 - 0x30000, statfs.available);
+ ASSERT_LE(statfs.compressed, 0x10000);
+ ASSERT_EQ(0x30000, statfs.compressed_original);
+ ASSERT_EQ(statfs.compressed_allocated, 0x10000);
+ }
+ // replace the compressed extent entirely: compressed_* must drop to zero
+ {
+ ObjectStore::Transaction t;
+ std::string s(0x10000, 'c');
+ bufferlist bl;
+ bl.append(s);
+ t.write(cid, hoid, 0x10000, bl.length(), bl);
+ t.write(cid, hoid, 0x20000, bl.length(), bl);
+ t.write(cid, hoid, 0x30000, bl.length(), bl);
+ cerr << "Overwrite compressed extent with 3 uncompressible ones" << std::endl;
+ r = apply_transaction(store, &osr, std::move(t));
+ ASSERT_EQ(r, 0);
+
+ struct store_statfs_t statfs;
+ int r = store->statfs(&statfs);
+ ASSERT_EQ(r, 0);
+ ASSERT_EQ(0x30000 + 0x1001, statfs.stored);
+ ASSERT_EQ(0x40000, statfs.allocated);
+ ASSERT_LE(statfs.compressed, 0);
+ ASSERT_EQ(0, statfs.compressed_original);
+ ASSERT_EQ(0, statfs.compressed_allocated);
+ }
+ // zeroing the whole object releases all space and storage
+ {
+ ObjectStore::Transaction t;
+ t.zero(cid, hoid, 0, 0x40000);
+ cerr << "Zero object" << std::endl;
+ r = apply_transaction(store, &osr, std::move(t));
+ ASSERT_EQ(r, 0);
+ struct store_statfs_t statfs;
+ int r = store->statfs(&statfs);
+ ASSERT_EQ(r, 0);
+ ASSERT_EQ(0u, statfs.allocated);
+ ASSERT_EQ(0u, statfs.stored);
+ ASSERT_EQ(0u, statfs.compressed_original);
+ ASSERT_EQ(0u, statfs.compressed);
+ ASSERT_EQ(0u, statfs.compressed_allocated);
+ }
+ {
+ ObjectStore::Transaction t;
+ std::string s(0x10000, 'c');
+ bufferlist bl;
+ bl.append(s);
+ bl.append(s);
+ bl.append(s);
+ bl.append(s.substr(0, 0x10000-2));
+ t.write(cid, hoid, 0, bl.length(), bl);
+ cerr << "Yet another compressible write" << std::endl;
+ r = apply_transaction(store, &osr, std::move(t));
+ ASSERT_EQ(r, 0);
+ struct store_statfs_t statfs;
+ r = store->statfs(&statfs);
+ ASSERT_EQ(r, 0);
+ ASSERT_EQ(0x40000 - 2, statfs.stored);
+ ASSERT_EQ(0x20000, statfs.allocated);
+ ASSERT_LE(statfs.compressed, 0x10000);
+ ASSERT_EQ(0x30000, statfs.compressed_original);
+ ASSERT_EQ(0x10000, statfs.compressed_allocated);
+ }
+
+ // removing the object and collection must return every counter to zero
+ {
+ ObjectStore::Transaction t;
+ t.remove(cid, hoid);
+ t.remove_collection(cid);
+ cerr << "Cleaning" << std::endl;
+ r = apply_transaction(store, &osr, std::move(t));
+ ASSERT_EQ(r, 0);
+
+ struct store_statfs_t statfs;
+ r = store->statfs(&statfs);
+ ASSERT_EQ(r, 0);
+ ASSERT_EQ( 0u, statfs.allocated);
+ ASSERT_EQ( 0u, statfs.stored);
+ ASSERT_EQ( 0u, statfs.compressed_original);
+ ASSERT_EQ( 0u, statfs.compressed);
+ ASSERT_EQ( 0u, statfs.compressed_allocated);
+ }
+ // restore the default compression setting for subsequent tests
+ g_conf->set_val("bluestore_compression", "none");
+ g_ceph_context->_conf->apply_changes(NULL);
+}
+
TEST_P(StoreTest, ManySmallWrite) {
ObjectStore::Sequencer osr("test");
int r;