]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kv/RocksDBStore: fix default CF handling 18049/head
authorSage Weil <sage@redhat.com>
Thu, 5 Oct 2017 13:31:04 +0000 (08:31 -0500)
committerSage Weil <sage@redhat.com>
Thu, 5 Oct 2017 19:35:52 +0000 (14:35 -0500)
We need to close the default CF handle *only* if we explicitly opened it.

Also, take care to use that handle wherever we can.  This is mostly
paranoia--I'm not sure if rocksdb cares if you mix the handle you opened
with DefaultColumnFamily(), which is different.

Signed-off-by: Sage Weil <sage@redhat.com>
src/kv/RocksDBStore.cc
src/kv/RocksDBStore.h

index d4d9c45845bd1045e37cbd1bbd1721bcccd8cc0b..49f903dbbda2c13df9b54e34286fd4db16812e41 100644 (file)
@@ -476,6 +476,7 @@ int RocksDBStore::do_open(ostream &out, bool create_if_missing,
        add_column_family(p.name, static_cast<void*>(cf));
       }
     }
+    default_cf = db->DefaultColumnFamily();
   } else {
     std::vector<string> existing_cfs;
     status = rocksdb::DB::ListColumnFamilies(
@@ -490,6 +491,7 @@ int RocksDBStore::do_open(ostream &out, bool create_if_missing,
        derr << status.ToString() << dendl;
        return -EINVAL;
       }
+      default_cf = db->DefaultColumnFamily();
     } else {
       // we cannot change column families for a created database.  so, map
       // what options we are given to whatever cf's already exist.
@@ -530,12 +532,16 @@ int RocksDBStore::do_open(ostream &out, bool create_if_missing,
        return -EINVAL;
       }
       for (unsigned i = 0; i < existing_cfs.size(); ++i) {
-       if (existing_cfs[i] != rocksdb::kDefaultColumnFamilyName) {
+       if (existing_cfs[i] == rocksdb::kDefaultColumnFamilyName) {
+         default_cf = handles[i];
+         must_close_default_cf = true;
+       } else {
          add_column_family(existing_cfs[i], static_cast<void*>(handles[i]));
        }
       }
     }
   }
+  assert(default_cf != nullptr);
   
   PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last);
   plb.add_u64_counter(l_rocksdb_gets, "get", "Gets");
@@ -586,6 +592,11 @@ RocksDBStore::~RocksDBStore()
       static_cast<rocksdb::ColumnFamilyHandle*>(p.second));
     p.second = nullptr;
   }
+  if (must_close_default_cf) {
+    db->DestroyColumnFamilyHandle(default_cf);
+    must_close_default_cf = false;
+  }
+  default_cf = nullptr;
   delete db;
   db = nullptr;
 
@@ -785,7 +796,7 @@ void RocksDBStore::RocksDBTransactionImpl::set(
     put_bat(bat, cf, k, to_set_bl);
   } else {
     string key = combine_strings(prefix, k);
-    put_bat(bat, db->db->DefaultColumnFamily(), key, to_set_bl);
+    put_bat(bat, db->default_cf, key, to_set_bl);
   }
 }
 
@@ -812,7 +823,7 @@ void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
   if (cf) {
     bat.Delete(cf, rocksdb::Slice(k));
   } else {
-    bat.Delete(combine_strings(prefix, k));
+    bat.Delete(db->default_cf, combine_strings(prefix, k));
   }
 }
 
@@ -826,7 +837,7 @@ void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
   } else {
     string key;
     combine_strings(prefix, k, keylen, &key);
-    bat.Delete(rocksdb::Slice(key));
+    bat.Delete(db->default_cf, rocksdb::Slice(key));
   }
 }
 
@@ -837,7 +848,7 @@ void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix,
   if (cf) {
     bat.SingleDelete(cf, k);
   } else {
-    bat.SingleDelete(combine_strings(prefix, k));
+    bat.SingleDelete(db->default_cf, combine_strings(prefix, k));
   }
 }
 
@@ -860,14 +871,15 @@ void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix
     if (db->enable_rmrange) {
       string endprefix = prefix;
       endprefix.push_back('\x01');
-      bat.DeleteRange(combine_strings(prefix, string()),
+      bat.DeleteRange(db->default_cf,
+                     combine_strings(prefix, string()),
                      combine_strings(endprefix, string()));
     } else {
       auto it = db->get_iterator(prefix);
       for (it->seek_to_first();
           it->valid();
           it->next()) {
-       bat.Delete(combine_strings(prefix, it->key()));
+       bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
       }
     }
   }
@@ -895,7 +907,7 @@ void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
   } else {
     if (db->enable_rmrange) {
       bat.DeleteRange(
-       db->db->DefaultColumnFamily(),
+       db->default_cf,
        rocksdb::Slice(combine_strings(prefix, start)),
        rocksdb::Slice(combine_strings(prefix, end)));
     } else {
@@ -905,7 +917,8 @@ void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
        if (it->key() >= end) {
          break;
        }
-       bat.Delete(combine_strings(prefix, it->key()));
+       bat.Delete(db->default_cf,
+                  combine_strings(prefix, it->key()));
        it->next();
       }
     }
@@ -937,7 +950,7 @@ void RocksDBStore::RocksDBTransactionImpl::merge(
     // bufferlist::c_str() is non-constant, so we can't call c_str()
     if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
       bat.Merge(
-       db->db->DefaultColumnFamily(),
+       db->default_cf,
        rocksdb::Slice(key),
        rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
     } else {
@@ -945,7 +958,7 @@ void RocksDBStore::RocksDBTransactionImpl::merge(
       rocksdb::Slice key_slice(key);
       vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
       bat.Merge(
-       db->db->DefaultColumnFamily(),
+       db->default_cf,
        rocksdb::SliceParts(&key_slice, 1),
        prepare_sliceparts(to_set_bl, &value_slices));
     }
@@ -977,7 +990,7 @@ int RocksDBStore::get(
       std::string value;
       string k = combine_strings(prefix, key);
       auto status = db->Get(rocksdb::ReadOptions(),
-                           db->DefaultColumnFamily(),
+                           default_cf,
                            rocksdb::Slice(k),
                            &value);
       if (status.ok()) {
@@ -1012,7 +1025,7 @@ int RocksDBStore::get(
   } else {
     string k = combine_strings(prefix, key);
     s = db->Get(rocksdb::ReadOptions(),
-               db->DefaultColumnFamily(),
+               default_cf,
                rocksdb::Slice(k),
                &value);
   }
@@ -1050,7 +1063,7 @@ int RocksDBStore::get(
     string k;
     combine_strings(prefix, key, keylen, &k);
     s = db->Get(rocksdb::ReadOptions(),
-               db->DefaultColumnFamily(),
+               default_cf,
                rocksdb::Slice(k),
                &value);
   }
@@ -1091,7 +1104,13 @@ void RocksDBStore::compact()
 {
   logger->inc(l_rocksdb_compact);
   rocksdb::CompactRangeOptions options;
-  db->CompactRange(options, nullptr, nullptr);
+  db->CompactRange(options, default_cf, nullptr, nullptr);
+  for (auto cf : cf_handles) {
+    db->CompactRange(
+      options,
+      static_cast<rocksdb::ColumnFamilyHandle*>(cf.second),
+      nullptr, nullptr);
+  }
 }
 
 
@@ -1303,7 +1322,7 @@ string RocksDBStore::past_prefix(const string &prefix)
 RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator()
 {
   return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
-        db->NewIterator(rocksdb::ReadOptions()));
+    db->NewIterator(rocksdb::ReadOptions(), default_cf));
 }
 
 class CFIteratorImpl : public KeyValueDB::IteratorImpl {
index 031f8a6d2147b038a6ac19fde6c6d111a9fbd33d..a552fef0178190a5122a3f6fa80d9bdf998885f7 100644 (file)
@@ -81,6 +81,9 @@ class RocksDBStore : public KeyValueDB {
   uint64_t cache_size = 0;
   bool set_cache_flag = false;
 
+  bool must_close_default_cf = false;
+  rocksdb::ColumnFamilyHandle *default_cf = nullptr;
+
   int submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t);
   int install_cf_mergeop(const string &cf_name, rocksdb::ColumnFamilyOptions *cf_opt);
   int create_db_dir();