]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
os/BlueStore: Adds improved per-store statistics including compression ones. 9486/head
authorIgor Fedotov <ifedotov@mirantis.com>
Mon, 6 Jun 2016 16:06:48 +0000 (19:06 +0300)
committerIgor Fedotov <ifedotov@mirantis.com>
Wed, 8 Jun 2016 14:16:05 +0000 (17:16 +0300)
Signed-off-by: Igor Fedotov <ifedotov@mirantis.com>
src/os/bluestore/BlueStore.cc
src/os/bluestore/BlueStore.h
src/test/objectstore/store_test.cc

index 668320f42b356cc32f9981233a7fc969b2820c18..149635124c1a4c447e62f7b3e28aa3ad99172702 100644 (file)
@@ -33,6 +33,7 @@
 #define dout_subsys ceph_subsys_bluestore
 
 const string PREFIX_SUPER = "S";   // field -> value
+const string PREFIX_STAT = "T";    // field -> value(int64 array)
 const string PREFIX_COLL = "C";    // collection name -> cnode_t
 const string PREFIX_OBJ = "O";     // object name -> onode_t
 const string PREFIX_OVERLAY = "V"; // u64 + offset -> data
@@ -565,6 +566,32 @@ void BlueStore::Cache::_audit_lru(const char *when)
 }
 #endif
 
+struct Int64ArrayMergeOperator : public KeyValueDB::MergeOperator {
+  virtual void merge_nonexistant(
+    const char *rdata, size_t rlen, std::string *new_value) override {
+    *new_value = std::string(rdata, rlen);
+  }
+  virtual void merge(
+    const char *ldata, size_t llen,
+    const char *rdata, size_t rlen,
+    std::string *new_value) {
+    assert(llen == rlen);
+    assert((rlen % 8) == 0);
+    new_value->resize(rlen);
+    const __le64* lv = (const __le64*)ldata;
+    const __le64* rv = (const __le64*)rdata;
+    __le64* nv = &(__le64&)new_value->at(0);
+    for (size_t i = 0; i < rlen >> 3; ++i) {
+      nv[i] = lv[i] + rv[i];
+    }
+  }
+  // We use each operator name and each prefix to construct the
+  // overall RocksDB operator name for consistency check at open time.
+  virtual string name() const {
+    return "int64_array";
+  }
+};
+
 // BufferSpace
 
 #undef dout_prefix
@@ -1524,6 +1551,7 @@ int BlueStore::_open_db(bool create)
   snprintf(fn, sizeof(fn), "%s/db", path.c_str());
   string options;
   stringstream err;
+  ceph::shared_ptr<Int64ArrayMergeOperator> merge_op(new Int64ArrayMergeOperator);
 
   string kv_backend;
   if (create) {
@@ -1744,8 +1772,9 @@ int BlueStore::_open_db(bool create)
     env = NULL;
     return -EIO;
   }
-  
+
   FreelistManager::setup_merge_operators(db);
+  db->set_merge_operator(PREFIX_STAT, merge_op);
 
   if (kv_backend == "rocksdb")
     options = g_conf->bluestore_rocksdb_options;
@@ -2894,6 +2923,27 @@ int BlueStore::statfs(struct store_statfs_t *buf)
   buf->bsize = block_size;
   buf->available = (alloc->get_free() - bluefs_len);
 
+  bufferlist bl;
+  int r = db->get(PREFIX_STAT, "bluestore_statfs", &bl);
+  if (r >= 0) {
+       TransContext::volatile_statfs vstatfs;
+     if (size_t(bl.length()) >= sizeof(vstatfs.values)) {
+       auto it = bl.begin();
+       vstatfs.decode(it);
+
+       buf->allocated = vstatfs.allocated();
+       buf->stored = vstatfs.stored();
+       buf->compressed = vstatfs.compressed();
+       buf->compressed_original = vstatfs.compressed_original();
+       buf->compressed_allocated = vstatfs.compressed_allocated();
+     } else {
+       dout(10) << __func__ << " store_statfs is corrupt, using empty" << dendl;
+     }
+  } else {
+    dout(10) << __func__ << " store_statfs missed, using empty" << dendl;
+  }
+
+
   dout(20) << __func__ << *buf << dendl;
   return 0;
 }
@@ -4263,6 +4313,18 @@ BlueStore::TransContext *BlueStore::_txc_create(OpSequencer *osr)
   return txc;
 }
 
+void BlueStore::_txc_update_store_statfs(TransContext *txc)
+{
+  if (txc->statfs_delta.is_empty())
+    return;
+
+  bufferlist bl;
+  txc->statfs_delta.encode(bl);
+
+  txc->t->merge(PREFIX_STAT, "bluestore_statfs", bl);
+  txc->statfs_delta.reset();
+}
+
 void BlueStore::_txc_state_proc(TransContext *txc)
 {
   while (true) {
@@ -4578,6 +4640,7 @@ void BlueStore::_txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t)
 
   txc->allocated.clear();
   txc->released.clear();
+  _txc_update_store_statfs(txc);
 }
 
 void BlueStore::_kv_sync_thread()
@@ -5655,6 +5718,7 @@ void BlueStore::_do_write_small(
        bluestore_lextent_t(blob, b_off + head_pad, length, 0);
       b->ref_map.get(lex.offset, lex.length);
       b->mark_used(lex.offset, lex.length);
+      txc->statfs_delta.stored() += lex.length;
       dout(20) << __func__ << "  lex 0x" << std::hex << offset << std::dec
               << ": " << lex << dendl;
       dout(20) << __func__ << "  old " << blob << ": " << *b << dendl;
@@ -5724,6 +5788,7 @@ void BlueStore::_do_write_small(
        bluestore_lextent_t(blob, offset - bstart, length, 0);
       b->ref_map.get(lex.offset, lex.length);
       b->mark_used(lex.offset, lex.length);
+      txc->statfs_delta.stored() += lex.length;
       dout(20) << __func__ << "  lex 0x" << std::hex << offset
               << std::dec << ": " << lex << dendl;
       dout(20) << __func__ << "  old " << blob << ": " << *b << dendl;
@@ -5748,6 +5813,8 @@ void BlueStore::_do_write_small(
   bluestore_lextent_t& lex = o->onode.extent_map[offset] =
     bluestore_lextent_t(blob, offset % min_alloc_size, length);
   b->ref_map.get(lex.offset, lex.length);
+  txc->statfs_delta.stored() += lex.length;
+
   dout(20) << __func__ << "  lex 0x" << std::hex << offset << std::dec
           << ": " << lex << dendl;
   dout(20) << __func__ << "  new " << blob << ": " << *b << dendl;
@@ -5781,6 +5848,7 @@ void BlueStore::_do_write_big(
     o->onode.punch_hole(offset, l, &wctx->lex_old);
     o->onode.extent_map[offset] = bluestore_lextent_t(blob, 0, l, 0);
     b->ref_map.get(0, l);
+    txc->statfs_delta.stored() += length;
     dout(20) << __func__ << "  lex 0x" << std::hex << offset << std::dec << ": "
             << o->onode.extent_map[offset] << dendl;
     dout(20) << __func__ << "  blob " << *b << dendl;
@@ -5840,7 +5908,9 @@ int BlueStore::_do_alloc_write(
                 << " -> 0x" << rawlen << " => 0x" << newlen
                 << " with " << chdr.type
                 << dec << dendl;
-       l = &compressed_bl;
+       txc->statfs_delta.compressed() += rawlen;
+       txc->statfs_delta.compressed_original() += l->length();
+       txc->statfs_delta.compressed_allocated() += newlen;     l = &compressed_bl;
        final_length = newlen;
        csum_length = newlen;
        b->set_compressed(rawlen);
@@ -5863,6 +5933,7 @@ int BlueStore::_do_alloc_write(
       need -= l;
       e.length = l;
       txc->allocated.insert(e.offset, e.length);
+      txc->statfs_delta.allocated() += e.length;
       b->extents.push_back(e);
       final_length -= e.length;
       hint = e.end();
@@ -5899,11 +5970,18 @@ void BlueStore::_wctx_finish(
     bluestore_blob_t *b = c->get_blob_ptr(o, l.blob);
     vector<bluestore_pextent_t> r;
     b->put_ref(l.offset, l.length, min_alloc_size, &r);
+    txc->statfs_delta.stored() -= l.length;
     for (auto e : r) {
       dout(20) << __func__ << " release " << e << dendl;
       txc->released.insert(e.offset, e.length);
+      txc->statfs_delta.allocated() -= e.length;
     }
     if (b->ref_map.empty()) {
+      if (b->is_compressed()) {
+       txc->statfs_delta.compressed() -= b->get_payload_length();
+       txc->statfs_delta.compressed_original() -= b->length;
+       txc->statfs_delta.compressed_allocated() -= b->get_ondisk_length();
+      }
       dout(20) << __func__ << " rm blob " << *b << dendl;
       if (l.blob >= 0) {
        o->onode.blob_map.erase(l.blob);
@@ -6470,6 +6548,7 @@ int BlueStore::_clone(TransContext *txc,
        newo->onode.extent_map[p.first] = p.second;
        e->blob_map[-p.second.blob].ref_map.get(p.second.offset,
                                                p.second.length);
+       txc->statfs_delta.stored() += p.second.length;
       }
       newo->bnode = e;
       _dump_onode(newo);
index a53ea05f8dcfb48003d6e8e941a8012439ee63fe..d2bcfe046c1f3707f40af4d021e59b07faf65f62 100644 (file)
@@ -588,6 +588,58 @@ public:
     vector<OnodeRef> wal_op_onodes;
 
     interval_set<uint64_t> allocated, released;
+    struct volatile_statfs{
+      enum {
+        STATFS_ALLOCATED = 0,
+        STATFS_STORED,
+        STATFS_COMPRESSED_ORIGINAL,
+        STATFS_COMPRESSED,
+        STATFS_COMPRESSED_ALLOCATED,
+        STATFS_LAST
+      };
+      int64_t values[STATFS_LAST];
+      volatile_statfs() {
+        memset(this, 0, sizeof(volatile_statfs));
+      }
+      void reset() {
+        *this = volatile_statfs();
+      }
+      int64_t& allocated() {
+        return values[STATFS_ALLOCATED];
+      }
+      int64_t& stored() {
+        return values[STATFS_STORED];
+      }
+      int64_t& compressed_original() {
+        return values[STATFS_COMPRESSED_ORIGINAL];
+      }
+      int64_t& compressed() {
+        return values[STATFS_COMPRESSED];
+      }
+      int64_t& compressed_allocated() {
+        return values[STATFS_COMPRESSED_ALLOCATED];
+      }
+      bool is_empty() {
+        return values[STATFS_ALLOCATED] == 0 &&
+          values[STATFS_STORED] == 0 &&
+          values[STATFS_COMPRESSED] == 0 &&
+          values[STATFS_COMPRESSED_ORIGINAL] == 0 &&
+          values[STATFS_COMPRESSED_ALLOCATED] == 0;
+      }
+      void decode(bufferlist::iterator& it) {
+        for (size_t i = 0; i < STATFS_LAST; i++) {
+          ::decode(values[i], it);
+        }
+      }
+
+      void encode(bufferlist& bl) {
+        for (size_t i = 0; i < STATFS_LAST; i++) {
+          //::encode(ceph_le64(values[i]), bl);
+          ::encode(values[i], bl);
+        }
+      }
+    } statfs_delta;
+
 
     IOContext ioc;
 
@@ -929,6 +981,7 @@ private:
   void _dump_bnode(BnodeRef b, int log_level=30);
 
   TransContext *_txc_create(OpSequencer *osr);
+  void _txc_update_store_statfs(TransContext *txc);
   void _txc_add_transaction(TransContext *txc, Transaction *t);
   void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t);
   void _txc_state_proc(TransContext *txc);
index cf4938b6ff68b588963f8b424f174cec03d2e02f..c0220d22db1c768e7f53773b75a5f83092bd5999 100644 (file)
@@ -1091,6 +1091,210 @@ TEST_P(StoreTest, SimpleObjectTest) {
   }
 }
 
+TEST_P(StoreTest, BluestoreStatFSTest) {
+  if(string(GetParam()) != "bluestore")
+    return;
+  g_conf->set_val("bluestore_compression", "force");
+  g_ceph_context->_conf->apply_changes(NULL);
+
+  ObjectStore::Sequencer osr("test");
+  int r;
+  coll_t cid;
+  ghobject_t hoid(hobject_t(sobject_t("Object 1", CEPH_NOSNAP)));
+  {
+    bufferlist in;
+    r = store->read(cid, hoid, 0, 5, in);
+    ASSERT_EQ(-ENOENT, r);
+  }
+  {
+    ObjectStore::Transaction t;
+    t.create_collection(cid, 0);
+    cerr << "Creating collection " << cid << std::endl;
+    r = apply_transaction(store, &osr, std::move(t));
+    ASSERT_EQ(r, 0);
+  }
+  {
+    bool exists = store->exists(cid, hoid);
+    ASSERT_TRUE(!exists);
+
+    ObjectStore::Transaction t;
+    t.touch(cid, hoid);
+    cerr << "Creating object " << hoid << std::endl;
+    r = apply_transaction(store, &osr, std::move(t));
+    ASSERT_EQ(r, 0);
+
+    exists = store->exists(cid, hoid);
+    ASSERT_EQ(true, exists);
+  }
+  uint64_t available0 = 0;
+  {
+    struct store_statfs_t statfs;
+    int r = store->statfs(&statfs);
+    ASSERT_EQ(r, 0);
+    ASSERT_EQ( 0u, statfs.allocated);
+    ASSERT_EQ( 0u, statfs.stored);
+    ASSERT_EQ(0x1000u, statfs.bsize);
+    ASSERT_EQ(g_conf->bluestore_block_size / 0x1000, statfs.blocks);
+    ASSERT_TRUE(statfs.available > 0u && statfs.available < g_conf->bluestore_block_size);
+    available0 = statfs.available;
+  }
+  {
+    ObjectStore::Transaction t;
+    bufferlist bl;
+    bl.append("abcde");
+    t.write(cid, hoid, 0, 5, bl);
+    cerr << "Append 5 bytes" << std::endl;
+    r = apply_transaction(store, &osr, std::move(t));
+    ASSERT_EQ(r, 0);
+
+    struct store_statfs_t statfs;
+    int r = store->statfs(&statfs);
+    ASSERT_EQ(r, 0);
+    ASSERT_EQ(5, statfs.stored);
+    ASSERT_EQ(0x10000, statfs.allocated);
+    ASSERT_EQ(available0 - 0x10000, statfs.available);
+    ASSERT_EQ(0, statfs.compressed);
+    ASSERT_EQ(0, statfs.compressed_original);
+    ASSERT_EQ(0, statfs.compressed_allocated);
+  }
+  {
+    ObjectStore::Transaction t;
+    std::string s(0x30000, 'a');
+    bufferlist bl;
+    bl.append(s);
+    t.write(cid, hoid, 0x10000, bl.length(), bl);
+    cerr << "Append 0x30000 compressible bytes" << std::endl;
+    r = apply_transaction(store, &osr, std::move(t));
+    ASSERT_EQ(r, 0);
+
+    struct store_statfs_t statfs;
+    int r = store->statfs(&statfs);
+    ASSERT_EQ(r, 0);
+    ASSERT_EQ(0x30005, statfs.stored);
+    ASSERT_EQ(0x20000, statfs.allocated);
+    ASSERT_EQ(available0 - 0x20000, statfs.available);
+    ASSERT_LE(statfs.compressed, 0x10000);
+    ASSERT_EQ(0x30000, statfs.compressed_original);
+    ASSERT_EQ(statfs.compressed_allocated, 0x10000);
+  }
+  {
+    ObjectStore::Transaction t;
+    t.zero(cid, hoid, 1, 3);
+    t.zero(cid, hoid, 0x20000, 9);
+    cerr << "Punch hole at 1~3, 0x20000~9" << std::endl;
+    r = apply_transaction(store, &osr, std::move(t));
+    ASSERT_EQ(r, 0);
+
+    struct store_statfs_t statfs;
+    int r = store->statfs(&statfs);
+    ASSERT_EQ(r, 0);
+    ASSERT_EQ(0x30005 - 3 - 9, statfs.stored);
+    ASSERT_EQ(0x20000, statfs.allocated);
+    ASSERT_EQ(available0 - 0x20000, statfs.available);
+    ASSERT_LE(statfs.compressed, 0x10000);
+    ASSERT_EQ(0x30000, statfs.compressed_original);
+    ASSERT_EQ(statfs.compressed_allocated, 0x10000);
+  }
+  {
+    ObjectStore::Transaction t;
+    std::string s(0x1000, 'b');
+    bufferlist bl;
+    bl.append(s);
+    t.write(cid, hoid, 1, bl.length(), bl);
+    t.write(cid, hoid, 0x10001, bl.length(), bl);
+    cerr << "Overwrite first and second(compressible) extents" << std::endl;
+    r = apply_transaction(store, &osr, std::move(t));
+    ASSERT_EQ(r, 0);
+
+    struct store_statfs_t statfs;
+    int r = store->statfs(&statfs);
+    ASSERT_EQ(r, 0);
+    ASSERT_EQ(0x30001 - 9 + 0x1000, statfs.stored);
+    ASSERT_EQ(0x30000, statfs.allocated);
+    ASSERT_EQ(available0 - 0x30000, statfs.available);
+    ASSERT_LE(statfs.compressed, 0x10000);
+    ASSERT_EQ(0x30000, statfs.compressed_original);
+    ASSERT_EQ(statfs.compressed_allocated, 0x10000);
+  }
+  {
+    ObjectStore::Transaction t;
+    std::string s(0x10000, 'c');
+    bufferlist bl;
+    bl.append(s);
+    t.write(cid, hoid, 0x10000, bl.length(), bl);
+    t.write(cid, hoid, 0x20000, bl.length(), bl);
+    t.write(cid, hoid, 0x30000, bl.length(), bl);
+    cerr << "Overwrite compressed extent with 3 uncompressible ones" << std::endl;
+    r = apply_transaction(store, &osr, std::move(t));
+    ASSERT_EQ(r, 0);
+
+    struct store_statfs_t statfs;
+    int r = store->statfs(&statfs);
+    ASSERT_EQ(r, 0);
+    ASSERT_EQ(0x30000 + 0x1001, statfs.stored);
+    ASSERT_EQ(0x40000, statfs.allocated);
+    ASSERT_LE(statfs.compressed, 0);
+    ASSERT_EQ(0, statfs.compressed_original);
+    ASSERT_EQ(0, statfs.compressed_allocated);
+  }
+  {
+    ObjectStore::Transaction t;
+    t.zero(cid, hoid, 0, 0x40000);
+    cerr << "Zero object" << std::endl;
+    r = apply_transaction(store, &osr, std::move(t));
+    ASSERT_EQ(r, 0);
+    struct store_statfs_t statfs;
+    int r = store->statfs(&statfs);
+    ASSERT_EQ(r, 0);
+    ASSERT_EQ(0u, statfs.allocated);
+    ASSERT_EQ(0u, statfs.stored);
+    ASSERT_EQ(0u, statfs.compressed_original);
+    ASSERT_EQ(0u, statfs.compressed);
+    ASSERT_EQ(0u, statfs.compressed_allocated);
+  }
+  {
+    ObjectStore::Transaction t;
+    std::string s(0x10000, 'c');
+    bufferlist bl;
+    bl.append(s);
+    bl.append(s);
+    bl.append(s);
+    bl.append(s.substr(0, 0x10000-2));
+    t.write(cid, hoid, 0, bl.length(), bl);
+    cerr << "Yet another compressible write" << std::endl;
+    r = apply_transaction(store, &osr, std::move(t));
+    ASSERT_EQ(r, 0);
+    struct store_statfs_t statfs;
+    r = store->statfs(&statfs);
+    ASSERT_EQ(r, 0);
+    ASSERT_EQ(0x40000 - 2, statfs.stored);
+    ASSERT_EQ(0x20000, statfs.allocated);
+    ASSERT_LE(statfs.compressed, 0x10000);
+    ASSERT_EQ(0x30000, statfs.compressed_original);
+    ASSERT_EQ(0x10000, statfs.compressed_allocated);
+  }
+
+  {
+    ObjectStore::Transaction t;
+    t.remove(cid, hoid);
+    t.remove_collection(cid);
+    cerr << "Cleaning" << std::endl;
+    r = apply_transaction(store, &osr, std::move(t));
+    ASSERT_EQ(r, 0);
+
+    struct store_statfs_t statfs;
+    r = store->statfs(&statfs);
+    ASSERT_EQ(r, 0);
+    ASSERT_EQ( 0u, statfs.allocated);
+    ASSERT_EQ( 0u, statfs.stored);
+    ASSERT_EQ( 0u, statfs.compressed_original);
+    ASSERT_EQ( 0u, statfs.compressed);
+    ASSERT_EQ( 0u, statfs.compressed_allocated);
+  }
+  g_conf->set_val("bluestore_compression", "none");
+  g_ceph_context->_conf->apply_changes(NULL);
+}
+
 TEST_P(StoreTest, ManySmallWrite) {
   ObjectStore::Sequencer osr("test");
   int r;