]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
kv/RocksDBStore: Fixes dump of batch commited to rocksdb
authorAdam Kupczyk <akupczyk@redhat.com>
Fri, 29 May 2020 10:40:33 +0000 (12:40 +0200)
committerAdam Kupczyk <akupczyk@redhat.com>
Fri, 29 May 2020 10:43:17 +0000 (12:43 +0200)
Adapts dumping of rocksdb batch for operations on column families.

Signed-off-by: Adam Kupczyk <akupczyk@redhat.com>
src/kv/RocksDBStore.cc
src/kv/RocksDBStore.h

index 6360536c2c4a74d76c557f877ee55211d42300d8..d5baacc1344c8183424fa275232d69388a6af75e 100644 (file)
@@ -516,6 +516,7 @@ void RocksDBStore::add_column_family(const std::string& cf_name, uint32_t hash_l
   if (column.handles.size() <= shard_idx)
     column.handles.resize(shard_idx + 1);
   column.handles[shard_idx] = handle;
+  cf_ids_to_prefix.emplace(handle->GetID(), cf_name);
 }
 
 bool RocksDBStore::is_column_family(const std::string& prefix) {
@@ -1209,6 +1210,71 @@ void RocksDBStore::get_statistics(Formatter *f)
   }
 }
 
+struct RocksDBStore::RocksWBHandler: public rocksdb::WriteBatch::Handler {
+  RocksWBHandler(const RocksDBStore& db) : db(db) {}
+  const RocksDBStore& db;
+  std::stringstream seen;
+  int num_seen = 0;
+
+  void dump(const char* op_name,
+           uint32_t column_family_id,
+           const rocksdb::Slice& key_in,
+           const rocksdb::Slice* value = nullptr) {
+    string prefix;
+    string key;
+    ssize_t size = value ? value->size() : -1;
+    seen << std::endl << op_name << "(";
+
+    if (column_family_id == 0) {
+      db.split_key(key_in, &prefix, &key);
+    } else {
+      auto it = db.cf_ids_to_prefix.find(column_family_id);
+      ceph_assert(it != db.cf_ids_to_prefix.end());
+      prefix = it->second;
+      key = key_in.ToString();
+    }
+    seen << " prefix = " << prefix;
+    seen << " key = " << pretty_binary_string(key);
+    if (size != -1)
+      seen << " value size = " << std::to_string(size);
+    seen << ")";
+    num_seen++;
+  }
+  void Put(const rocksdb::Slice& key,
+          const rocksdb::Slice& value) override {
+    dump("Put", 0, key, &value);
+  }
+  rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice& key,
+                       const rocksdb::Slice& value) override {
+    dump("PutCF", column_family_id, key, &value);
+    return rocksdb::Status::OK();
+  }
+  void SingleDelete(const rocksdb::Slice& key) override {
+    dump("SingleDelete", 0, key);
+  }
+  rocksdb::Status SingleDeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override {
+    dump("SingleDeleteCF", column_family_id, key);
+    return rocksdb::Status::OK();
+  }
+  void Delete(const rocksdb::Slice& key) override {
+    dump("Delete", 0, key);
+  }
+  rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override {
+    dump("DeleteCF", column_family_id, key);
+    return rocksdb::Status::OK();
+  }
+  void Merge(const rocksdb::Slice& key,
+            const rocksdb::Slice& value) override {
+    dump("Merge", 0, key, &value);
+  }
+  rocksdb::Status MergeCF(uint32_t column_family_id, const rocksdb::Slice& key,
+                         const rocksdb::Slice& value) override {
+    dump("MergeCF", column_family_id, key, &value);
+    return rocksdb::Status::OK();
+  }
+  bool Continue() override { return num_seen < 50; }
+};
+
 int RocksDBStore::submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t) 
 {
   // enable rocksdb breakdown
@@ -1222,16 +1288,16 @@ int RocksDBStore::submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Tra
     static_cast<RocksDBTransactionImpl *>(t.get());
   woptions.disableWAL = disableWAL;
   lgeneric_subdout(cct, rocksdb, 30) << __func__;
-  RocksWBHandler bat_txc;
+  RocksWBHandler bat_txc(*this);
   _t->bat.Iterate(&bat_txc);
-  *_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;
+  *_dout << " Rocksdb transaction: " << bat_txc.seen.str() << dendl;
   
   rocksdb::Status s = db->Write(woptions, &_t->bat);
   if (!s.ok()) {
-    RocksWBHandler rocks_txc;
+    RocksWBHandler rocks_txc(*this);
     _t->bat.Iterate(&rocks_txc);
     derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
-         << " Rocksdb transaction: " << rocks_txc.seen << dendl;
+         << " Rocksdb transaction: " << rocks_txc.seen.str() << dendl;
   }
 
   if (cct->_conf->rocksdb_perf) {
index f1117909e49cb97a392ba6bdd0b2f240e88ccd0a..4b7735224f745f27ba674c3e1f96b7d4990e3471 100644 (file)
@@ -116,6 +116,7 @@ private:
     std::vector<rocksdb::ColumnFamilyHandle *> handles;
   };
   std::unordered_map<std::string, prefix_shards> cf_handles;
+  std::unordered_map<uint32_t, std::string> cf_ids_to_prefix;
 
   void add_column_family(const std::string& cf_name, uint32_t hash_l, uint32_t hash_h,
                         size_t shard_idx, rocksdb::ColumnFamilyHandle *handle);
@@ -261,50 +262,7 @@ public:
 
   int64_t estimate_prefix_size(const std::string& prefix,
                               const std::string& key_prefix) override;
-
-  struct  RocksWBHandler: public rocksdb::WriteBatch::Handler {
-    std::string seen ;
-    int num_seen = 0;
-    void Put(const rocksdb::Slice& key,
-                    const rocksdb::Slice& value) override {
-      std::string prefix ((key.ToString()).substr(0,1));
-      std::string key_to_decode ((key.ToString()).substr(2, std::string::npos));
-      uint64_t size = (value.ToString()).size();
-      seen += "\nPut( Prefix = " + prefix + " key = " 
-            + pretty_binary_string(key_to_decode) 
-            + " Value size = " + std::to_string(size) + ")";
-      num_seen++;
-    }
-    void SingleDelete(const rocksdb::Slice& key) override {
-      std::string prefix ((key.ToString()).substr(0,1));
-      std::string key_to_decode ((key.ToString()).substr(2, std::string::npos));
-      seen += "\nSingleDelete(Prefix = "+ prefix + " Key = " 
-            + pretty_binary_string(key_to_decode) + ")";
-      num_seen++;
-    }
-    void Delete(const rocksdb::Slice& key) override {
-      std::string prefix ((key.ToString()).substr(0,1));
-      std::string key_to_decode ((key.ToString()).substr(2, std::string::npos));
-      seen += "\nDelete( Prefix = " + prefix + " key = " 
-            + pretty_binary_string(key_to_decode) + ")";
-
-      num_seen++;
-    }
-    void Merge(const rocksdb::Slice& key,
-                      const rocksdb::Slice& value) override {
-      std::string prefix ((key.ToString()).substr(0,1));
-      std::string key_to_decode ((key.ToString()).substr(2, std::string::npos));
-      uint64_t size = (value.ToString()).size();
-      seen += "\nMerge( Prefix = " + prefix + " key = " 
-            + pretty_binary_string(key_to_decode) + " Value size = " 
-            + std::to_string(size) + ")";
-
-      num_seen++;
-    }
-    bool Continue() override { return num_seen < 50; }
-
-  };
-
+  struct RocksWBHandler;
   class RocksDBTransactionImpl : public KeyValueDB::TransactionImpl {
   public:
     rocksdb::WriteBatch bat;