From: Igor Fedotov Date: Mon, 6 Jun 2016 16:06:48 +0000 (+0300) Subject: os/BlueStore: Adds improved per-store statistics including compression ones. X-Git-Tag: v11.0.0~271^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=975a31bda36fa590f5a197f9882ba2ca8d4e924a;p=ceph.git os/BlueStore: Adds improved per-store statistics including compression ones. Signed-off-by: Igor Fedotov --- diff --git a/src/os/bluestore/BlueStore.cc b/src/os/bluestore/BlueStore.cc index 668320f42b3..149635124c1 100644 --- a/src/os/bluestore/BlueStore.cc +++ b/src/os/bluestore/BlueStore.cc @@ -33,6 +33,7 @@ #define dout_subsys ceph_subsys_bluestore const string PREFIX_SUPER = "S"; // field -> value +const string PREFIX_STAT = "T"; // field -> value(int64 array) const string PREFIX_COLL = "C"; // collection name -> cnode_t const string PREFIX_OBJ = "O"; // object name -> onode_t const string PREFIX_OVERLAY = "V"; // u64 + offset -> data @@ -565,6 +566,32 @@ void BlueStore::Cache::_audit_lru(const char *when) } #endif +struct Int64ArrayMergeOperator : public KeyValueDB::MergeOperator { + virtual void merge_nonexistant( + const char *rdata, size_t rlen, std::string *new_value) override { + *new_value = std::string(rdata, rlen); + } + virtual void merge( + const char *ldata, size_t llen, + const char *rdata, size_t rlen, + std::string *new_value) { + assert(llen == rlen); + assert((rlen % 8) == 0); + new_value->resize(rlen); + const __le64* lv = (const __le64*)ldata; + const __le64* rv = (const __le64*)rdata; + __le64* nv = &(__le64&)new_value->at(0); + for (size_t i = 0; i < rlen >> 3; ++i) { + nv[i] = lv[i] + rv[i]; + } + } + // We use each operator name and each prefix to construct the + // overall RocksDB operator name for consistency check at open time. + virtual string name() const { + return "int64_array"; + } +}; + // BufferSpace #undef dout_prefix @@ -1524,6 +1551,7 @@ int BlueStore::_open_db(bool create) snprintf(fn, sizeof(fn), "%s/db", path.c_str()); string options; stringstream err; + ceph::shared_ptr merge_op(new Int64ArrayMergeOperator); string kv_backend; if (create) { @@ -1744,8 +1772,9 @@ int BlueStore::_open_db(bool create) env = NULL; return -EIO; } - + FreelistManager::setup_merge_operators(db); + db->set_merge_operator(PREFIX_STAT, merge_op); if (kv_backend == "rocksdb") options = g_conf->bluestore_rocksdb_options; @@ -2894,6 +2923,27 @@ int BlueStore::statfs(struct store_statfs_t *buf) buf->bsize = block_size; buf->available = (alloc->get_free() - bluefs_len); + bufferlist bl; + int r = db->get(PREFIX_STAT, "bluestore_statfs", &bl); + if (r >= 0) { + TransContext::volatile_statfs vstatfs; + if (size_t(bl.length()) >= sizeof(vstatfs.values)) { + auto it = bl.begin(); + vstatfs.decode(it); + + buf->allocated = vstatfs.allocated(); + buf->stored = vstatfs.stored(); + buf->compressed = vstatfs.compressed(); + buf->compressed_original = vstatfs.compressed_original(); + buf->compressed_allocated = vstatfs.compressed_allocated(); + } else { + dout(10) << __func__ << " store_statfs is corrupt, using empty" << dendl; + } + } else { + dout(10) << __func__ << " store_statfs missed, using empty" << dendl; + } + + dout(20) << __func__ << *buf << dendl; return 0; } @@ -4263,6 +4313,18 @@ BlueStore::TransContext *BlueStore::_txc_create(OpSequencer *osr) return txc; } +void BlueStore::_txc_update_store_statfs(TransContext *txc) +{ + if (txc->statfs_delta.is_empty()) + return; + + bufferlist bl; + txc->statfs_delta.encode(bl); + + txc->t->merge(PREFIX_STAT, "bluestore_statfs", bl); + txc->statfs_delta.reset(); +} + void BlueStore::_txc_state_proc(TransContext *txc) { while (true) { @@ -4578,6 +4640,7 @@ void BlueStore::_txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t) txc->allocated.clear(); txc->released.clear(); + _txc_update_store_statfs(txc); } void BlueStore::_kv_sync_thread() @@ -5655,6 +5718,7 @@ void BlueStore::_do_write_small( bluestore_lextent_t(blob, b_off + head_pad, length, 0); b->ref_map.get(lex.offset, lex.length); b->mark_used(lex.offset, lex.length); + txc->statfs_delta.stored() += lex.length; dout(20) << __func__ << " lex 0x" << std::hex << offset << std::dec << ": " << lex << dendl; dout(20) << __func__ << " old " << blob << ": " << *b << dendl; @@ -5724,6 +5788,7 @@ void BlueStore::_do_write_small( bluestore_lextent_t(blob, offset - bstart, length, 0); b->ref_map.get(lex.offset, lex.length); b->mark_used(lex.offset, lex.length); + txc->statfs_delta.stored() += lex.length; dout(20) << __func__ << " lex 0x" << std::hex << offset << std::dec << ": " << lex << dendl; dout(20) << __func__ << " old " << blob << ": " << *b << dendl; @@ -5748,6 +5813,8 @@ void BlueStore::_do_write_small( bluestore_lextent_t& lex = o->onode.extent_map[offset] = bluestore_lextent_t(blob, offset % min_alloc_size, length); b->ref_map.get(lex.offset, lex.length); + txc->statfs_delta.stored() += lex.length; + dout(20) << __func__ << " lex 0x" << std::hex << offset << std::dec << ": " << lex << dendl; dout(20) << __func__ << " new " << blob << ": " << *b << dendl; @@ -5781,6 +5848,7 @@ void BlueStore::_do_write_big( o->onode.punch_hole(offset, l, &wctx->lex_old); o->onode.extent_map[offset] = bluestore_lextent_t(blob, 0, l, 0); b->ref_map.get(0, l); + txc->statfs_delta.stored() += length; dout(20) << __func__ << " lex 0x" << std::hex << offset << std::dec << ": " << o->onode.extent_map[offset] << dendl; dout(20) << __func__ << " blob " << *b << dendl; @@ -5840,7 +5908,9 @@ int BlueStore::_do_alloc_write( << " -> 0x" << rawlen << " => 0x" << newlen << " with " << chdr.type << dec << dendl; - l = &compressed_bl; + txc->statfs_delta.compressed() += rawlen; + txc->statfs_delta.compressed_original() += l->length(); + txc->statfs_delta.compressed_allocated() += newlen; l = &compressed_bl; final_length = newlen; csum_length = newlen; b->set_compressed(rawlen); @@ -5863,6 +5933,7 @@ int BlueStore::_do_alloc_write( need -= l; e.length = l; txc->allocated.insert(e.offset, e.length); + txc->statfs_delta.allocated() += e.length; b->extents.push_back(e); final_length -= e.length; hint = e.end(); @@ -5899,11 +5970,18 @@ void BlueStore::_wctx_finish( bluestore_blob_t *b = c->get_blob_ptr(o, l.blob); vector r; b->put_ref(l.offset, l.length, min_alloc_size, &r); + txc->statfs_delta.stored() -= l.length; for (auto e : r) { dout(20) << __func__ << " release " << e << dendl; txc->released.insert(e.offset, e.length); + txc->statfs_delta.allocated() -= e.length; } if (b->ref_map.empty()) { + if (b->is_compressed()) { + txc->statfs_delta.compressed() -= b->get_payload_length(); + txc->statfs_delta.compressed_original() -= b->length; + txc->statfs_delta.compressed_allocated() -= b->get_ondisk_length(); + } dout(20) << __func__ << " rm blob " << *b << dendl; if (l.blob >= 0) { o->onode.blob_map.erase(l.blob); @@ -6470,6 +6548,7 @@ int BlueStore::_clone(TransContext *txc, newo->onode.extent_map[p.first] = p.second; e->blob_map[-p.second.blob].ref_map.get(p.second.offset, p.second.length); + txc->statfs_delta.stored() += p.second.length; } newo->bnode = e; _dump_onode(newo); diff --git a/src/os/bluestore/BlueStore.h b/src/os/bluestore/BlueStore.h index a53ea05f8dc..d2bcfe046c1 100644 --- a/src/os/bluestore/BlueStore.h +++ b/src/os/bluestore/BlueStore.h @@ -588,6 +588,58 @@ public: vector wal_op_onodes; interval_set allocated, released; + struct volatile_statfs{ + enum { + STATFS_ALLOCATED = 0, + STATFS_STORED, + STATFS_COMPRESSED_ORIGINAL, + STATFS_COMPRESSED, + STATFS_COMPRESSED_ALLOCATED, + STATFS_LAST + }; + int64_t values[STATFS_LAST]; + volatile_statfs() { + memset(this, 0, sizeof(volatile_statfs)); + } + void reset() { + *this = volatile_statfs(); + } + int64_t& allocated() { + return values[STATFS_ALLOCATED]; + } + int64_t& stored() { + return values[STATFS_STORED]; + } + int64_t& compressed_original() { + return values[STATFS_COMPRESSED_ORIGINAL]; + } + int64_t& compressed() { + return values[STATFS_COMPRESSED]; + } + int64_t& compressed_allocated() { + return values[STATFS_COMPRESSED_ALLOCATED]; + } + bool is_empty() { + return values[STATFS_ALLOCATED] == 0 && + values[STATFS_STORED] == 0 && + values[STATFS_COMPRESSED] == 0 && + values[STATFS_COMPRESSED_ORIGINAL] == 0 && + values[STATFS_COMPRESSED_ALLOCATED] == 0; + } + void decode(bufferlist::iterator& it) { + for (size_t i = 0; i < STATFS_LAST; i++) { + ::decode(values[i], it); + } + } + + void encode(bufferlist& bl) { + for (size_t i = 0; i < STATFS_LAST; i++) { + //::encode(ceph_le64(values[i]), bl); + ::encode(values[i], bl); + } + } + } statfs_delta; + IOContext ioc; @@ -929,6 +981,7 @@ private: void _dump_bnode(BnodeRef b, int log_level=30); TransContext *_txc_create(OpSequencer *osr); + void _txc_update_store_statfs(TransContext *txc); void _txc_add_transaction(TransContext *txc, Transaction *t); void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t); void _txc_state_proc(TransContext *txc); diff --git a/src/test/objectstore/store_test.cc b/src/test/objectstore/store_test.cc index cf4938b6ff6..c0220d22db1 100644 --- a/src/test/objectstore/store_test.cc +++ b/src/test/objectstore/store_test.cc @@ -1091,6 +1091,210 @@ TEST_P(StoreTest, SimpleObjectTest) { } } +TEST_P(StoreTest, BluestoreStatFSTest) { + if(string(GetParam()) != "bluestore") + return; + g_conf->set_val("bluestore_compression", "force"); + g_ceph_context->_conf->apply_changes(NULL); + + ObjectStore::Sequencer osr("test"); + int r; + coll_t cid; + ghobject_t hoid(hobject_t(sobject_t("Object 1", CEPH_NOSNAP))); + { + bufferlist in; + r = store->read(cid, hoid, 0, 5, in); + ASSERT_EQ(-ENOENT, r); + } + { + ObjectStore::Transaction t; + t.create_collection(cid, 0); + cerr << "Creating collection " << cid << std::endl; + r = apply_transaction(store, &osr, std::move(t)); + ASSERT_EQ(r, 0); + } + { + bool exists = store->exists(cid, hoid); + ASSERT_TRUE(!exists); + + ObjectStore::Transaction t; + t.touch(cid, hoid); + cerr << "Creating object " << hoid << std::endl; + r = apply_transaction(store, &osr, std::move(t)); + ASSERT_EQ(r, 0); + + exists = store->exists(cid, hoid); + ASSERT_EQ(true, exists); + } + uint64_t available0 = 0; + { + struct store_statfs_t statfs; + int r = store->statfs(&statfs); + ASSERT_EQ(r, 0); + ASSERT_EQ( 0u, statfs.allocated); + ASSERT_EQ( 0u, statfs.stored); + ASSERT_EQ(0x1000u, statfs.bsize); + ASSERT_EQ(g_conf->bluestore_block_size / 0x1000, statfs.blocks); + ASSERT_TRUE(statfs.available > 0u && statfs.available < g_conf->bluestore_block_size); + available0 = statfs.available; + } + { + ObjectStore::Transaction t; + bufferlist bl; + bl.append("abcde"); + t.write(cid, hoid, 0, 5, bl); + cerr << "Append 5 bytes" << std::endl; + r = apply_transaction(store, &osr, std::move(t)); + ASSERT_EQ(r, 0); + + struct store_statfs_t statfs; + int r = store->statfs(&statfs); + ASSERT_EQ(r, 0); + ASSERT_EQ(5, statfs.stored); + ASSERT_EQ(0x10000, statfs.allocated); + ASSERT_EQ(available0 - 0x10000, statfs.available); + ASSERT_EQ(0, statfs.compressed); + ASSERT_EQ(0, statfs.compressed_original); + ASSERT_EQ(0, statfs.compressed_allocated); + } + { + ObjectStore::Transaction t; + std::string s(0x30000, 'a'); + bufferlist bl; + bl.append(s); + t.write(cid, hoid, 0x10000, bl.length(), bl); + cerr << "Append 0x30000 compressible bytes" << std::endl; + r = apply_transaction(store, &osr, std::move(t)); + ASSERT_EQ(r, 0); + + struct store_statfs_t statfs; + int r = store->statfs(&statfs); + ASSERT_EQ(r, 0); + ASSERT_EQ(0x30005, statfs.stored); + ASSERT_EQ(0x20000, statfs.allocated); + ASSERT_EQ(available0 - 0x20000, statfs.available); + ASSERT_LE(statfs.compressed, 0x10000); + ASSERT_EQ(0x30000, statfs.compressed_original); + ASSERT_EQ(statfs.compressed_allocated, 0x10000); + } + { + ObjectStore::Transaction t; + t.zero(cid, hoid, 1, 3); + t.zero(cid, hoid, 0x20000, 9); + cerr << "Punch hole at 1~3, 0x20000~9" << std::endl; + r = apply_transaction(store, &osr, std::move(t)); + ASSERT_EQ(r, 0); + + struct store_statfs_t statfs; + int r = store->statfs(&statfs); + ASSERT_EQ(r, 0); + ASSERT_EQ(0x30005 - 3 - 9, statfs.stored); + ASSERT_EQ(0x20000, statfs.allocated); + ASSERT_EQ(available0 - 0x20000, statfs.available); + ASSERT_LE(statfs.compressed, 0x10000); + ASSERT_EQ(0x30000, statfs.compressed_original); + ASSERT_EQ(statfs.compressed_allocated, 0x10000); + } + { + ObjectStore::Transaction t; + std::string s(0x1000, 'b'); + bufferlist bl; + bl.append(s); + t.write(cid, hoid, 1, bl.length(), bl); + t.write(cid, hoid, 0x10001, bl.length(), bl); + cerr << "Overwrite first and second(compressible) extents" << std::endl; + r = apply_transaction(store, &osr, std::move(t)); + ASSERT_EQ(r, 0); + + struct store_statfs_t statfs; + int r = store->statfs(&statfs); + ASSERT_EQ(r, 0); + ASSERT_EQ(0x30001 - 9 + 0x1000, statfs.stored); + ASSERT_EQ(0x30000, statfs.allocated); + ASSERT_EQ(available0 - 0x30000, statfs.available); + ASSERT_LE(statfs.compressed, 0x10000); + ASSERT_EQ(0x30000, statfs.compressed_original); + ASSERT_EQ(statfs.compressed_allocated, 0x10000); + } + { + ObjectStore::Transaction t; + std::string s(0x10000, 'c'); + bufferlist bl; + bl.append(s); + t.write(cid, hoid, 0x10000, bl.length(), bl); + t.write(cid, hoid, 0x20000, bl.length(), bl); + t.write(cid, hoid, 0x30000, bl.length(), bl); + cerr << "Overwrite compressed extent with 3 uncompressible ones" << std::endl; + r = apply_transaction(store, &osr, std::move(t)); + ASSERT_EQ(r, 0); + + struct store_statfs_t statfs; + int r = store->statfs(&statfs); + ASSERT_EQ(r, 0); + ASSERT_EQ(0x30000 + 0x1001, statfs.stored); + ASSERT_EQ(0x40000, statfs.allocated); + ASSERT_LE(statfs.compressed, 0); + ASSERT_EQ(0, statfs.compressed_original); + ASSERT_EQ(0, statfs.compressed_allocated); + } + { + ObjectStore::Transaction t; + t.zero(cid, hoid, 0, 0x40000); + cerr << "Zero object" << std::endl; + r = apply_transaction(store, &osr, std::move(t)); + ASSERT_EQ(r, 0); + struct store_statfs_t statfs; + int r = store->statfs(&statfs); + ASSERT_EQ(r, 0); + ASSERT_EQ(0u, statfs.allocated); + ASSERT_EQ(0u, statfs.stored); + ASSERT_EQ(0u, statfs.compressed_original); + ASSERT_EQ(0u, statfs.compressed); + ASSERT_EQ(0u, statfs.compressed_allocated); + } + { + ObjectStore::Transaction t; + std::string s(0x10000, 'c'); + bufferlist bl; + bl.append(s); + bl.append(s); + bl.append(s); + bl.append(s.substr(0, 0x10000-2)); + t.write(cid, hoid, 0, bl.length(), bl); + cerr << "Yet another compressible write" << std::endl; + r = apply_transaction(store, &osr, std::move(t)); + ASSERT_EQ(r, 0); + struct store_statfs_t statfs; + r = store->statfs(&statfs); + ASSERT_EQ(r, 0); + ASSERT_EQ(0x40000 - 2, statfs.stored); + ASSERT_EQ(0x20000, statfs.allocated); + ASSERT_LE(statfs.compressed, 0x10000); + ASSERT_EQ(0x30000, statfs.compressed_original); + ASSERT_EQ(0x10000, statfs.compressed_allocated); + } + + { + ObjectStore::Transaction t; + t.remove(cid, hoid); + t.remove_collection(cid); + cerr << "Cleaning" << std::endl; + r = apply_transaction(store, &osr, std::move(t)); + ASSERT_EQ(r, 0); + + struct store_statfs_t statfs; + r = store->statfs(&statfs); + ASSERT_EQ(r, 0); + ASSERT_EQ( 0u, statfs.allocated); + ASSERT_EQ( 0u, statfs.stored); + ASSERT_EQ( 0u, statfs.compressed_original); + ASSERT_EQ( 0u, statfs.compressed); + ASSERT_EQ( 0u, statfs.compressed_allocated); + } + g_conf->set_val("bluestore_compression", "none"); + g_ceph_context->_conf->apply_changes(NULL); +} + TEST_P(StoreTest, ManySmallWrite) { ObjectStore::Sequencer osr("test"); int r;