]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
kv/RocksDBStore: fix repair.
authorAdam Kupczyk <akupczyk@redhat.com>
Mon, 10 Feb 2020 15:25:44 +0000 (16:25 +0100)
committerAdam Kupczyk <akupczyk@redhat.com>
Fri, 17 Apr 2020 06:55:54 +0000 (08:55 +0200)
Now repair recovers sharding definition files.
If some column families were deleted, they are recreated on next open.

Signed-off-by: Adam Kupczyk <akupczyk@redhat.com>
src/kv/RocksDBStore.cc
src/kv/RocksDBStore.h

index 01aab1bdf33eff14eaf101d0aa02fd1fb8f5e1f0..caa1926b9f9ef591c3b94a3b1a4fcf9d6b2ad352 100644 (file)
@@ -49,7 +49,9 @@ using ceph::bufferlist;
 using ceph::bufferptr;
 using ceph::Formatter;
 
+static const char* sharding_def_dir = "sharding";
 static const char* sharding_def_file = "sharding/def";
+static const char* sharding_recreate = "sharding/recreate_columns";
 
 static bufferlist to_bufferlist(rocksdb::Slice in) {
   bufferlist bl;
@@ -570,8 +572,8 @@ bool RocksDBStore::parse_sharding_def(const std::string_view text_def_in,
   std::string error_msg_local;
   if (error_position == nullptr) {
     error_position = &error_position_local;
-    *error_position = nullptr;
   }
+  *error_position = nullptr;
   if (error_msg == nullptr) {
     error_msg = &error_msg_local;
     error_msg->clear();
@@ -663,7 +665,7 @@ void RocksDBStore::sharding_def_to_columns(const std::vector<ColumnFamily>& shar
 }
 
 int RocksDBStore::create_shards(const rocksdb::Options& opt,
-                               const vector<ColumnFamily>& sharding_def)
+                               const std::vector<ColumnFamily>& sharding_def)
 {
   for (auto& p : sharding_def) {
     // copy default CF settings, block cache, merge operators as
@@ -699,6 +701,147 @@ int RocksDBStore::create_shards(const rocksdb::Options& opt,
   return 0;
 }
 
+int RocksDBStore::apply_sharding(const rocksdb::Options& opt,
+                                const std::string& sharding_text)
+{
+  // create and open column families
+  if (!sharding_text.empty()) {
+    bool b;
+    int r;
+    rocksdb::Status status;
+    std::vector<ColumnFamily> sharding_def;
+    char const* error_position;
+    std::string error_msg;
+    b = parse_sharding_def(sharding_text, sharding_def, &error_position, &error_msg);
+    if (!b) {
+      dout(1) << __func__ << " bad sharding: " << dendl;
+      dout(1) << __func__ << sharding_text << dendl;
+      dout(1) << __func__ << std::string(error_position - &sharding_text[0], ' ') << "^" << error_msg << dendl;
+      return -EINVAL;
+    }
+    r = create_shards(opt, sharding_def);
+    if (r != 0 ) {
+      return r;
+    }
+    opt.env->CreateDir(sharding_def_dir);
+    status = rocksdb::WriteStringToFile(opt.env, sharding_text,
+                                       sharding_def_file, true);
+    if (!status.ok()) {
+      derr << __func__ << " cannot write to " << sharding_def_file << dendl;
+      return -EIO;
+    }
+  } else {
+    opt.env->DeleteFile(sharding_def_file);
+  }
+  return 0;
+}
+
+int RocksDBStore::verify_sharding(const rocksdb::Options& opt,
+                                 const std::string& sharding_text,
+                                 std::vector<rocksdb::ColumnFamilyDescriptor>& existing_cfs,
+                                 std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& existing_cfs_shard,
+                                 std::vector<rocksdb::ColumnFamilyDescriptor>& missing_cfs,
+                                 std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& missing_cfs_shard)
+{
+  rocksdb::Status status;
+  std::string stored_sharding_text;
+  status = opt.env->FileExists(sharding_def_file);
+  if (status.ok()) {
+    status = rocksdb::ReadFileToString(opt.env,
+                                      sharding_def_file,
+                                      &stored_sharding_text);
+    if(!status.ok()) {
+      derr << __func__ << " cannot read from " << sharding_def_file << dendl;
+      return -EIO;
+    }
+  } else {
+    //no "sharding_def" present
+  }
+  //check if sharding_def matches stored_sharding_def
+  std::vector<ColumnFamily> sharding_def;
+  std::vector<ColumnFamily> stored_sharding_def;
+  parse_sharding_def(sharding_text, sharding_def);
+  parse_sharding_def(stored_sharding_text, stored_sharding_def);
+
+  std::sort(sharding_def.begin(), sharding_def.end(),
+           [](ColumnFamily& a, ColumnFamily& b) { return a.name < b.name; } );
+  std::sort(stored_sharding_def.begin(), stored_sharding_def.end(),
+           [](ColumnFamily& a, ColumnFamily& b) { return a.name < b.name; } );
+
+  bool match = true;
+  if (sharding_def.size() != stored_sharding_def.size()) {
+    match = false;
+  } else {
+    for (size_t i = 0; i < sharding_def.size(); i++) {
+      auto& a = sharding_def[i];
+      auto& b = stored_sharding_def[i];
+      if ( (a.name != b.name) ||
+          (a.shard_cnt != b.shard_cnt) ||
+          (a.hash_l != b.hash_l) ||
+          (a.hash_h != b.hash_h) ) {
+       match = false;
+       break;
+      }
+    }
+  }
+  if (!match) {
+    derr << __func__ << " mismatch on sharding. requested = " << sharding_def
+        << " stored = " << stored_sharding_def << dendl;
+    return -EIO;
+  }
+  std::vector<string> rocksdb_cfs;
+  status = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(opt),
+                                          path, &rocksdb_cfs);
+  if (!status.ok()) {
+    return -EIO;
+  }
+  dout(5) << __func__ << " column families from rocksdb: " << rocksdb_cfs << dendl;
+
+  auto emplace_cf = [&] (const RocksDBStore::ColumnFamily& column,
+                        int32_t shard_id,
+                        const std::string& shard_name,
+                        const rocksdb::ColumnFamilyOptions& opt) {
+    if (std::find(rocksdb_cfs.begin(), rocksdb_cfs.end(), shard_name) != rocksdb_cfs.end()) {
+      existing_cfs.emplace_back(shard_name, opt);
+      existing_cfs_shard.emplace_back(shard_id, column);
+    } else {
+      missing_cfs.emplace_back(shard_name, opt);
+      missing_cfs_shard.emplace_back(shard_id, column);
+    }
+  };
+
+  for (auto& column : stored_sharding_def) {
+    rocksdb::ColumnFamilyOptions cf_opt(opt);
+    status = rocksdb::GetColumnFamilyOptionsFromString(
+                                                      cf_opt, column.options, &cf_opt);
+    if (!status.ok()) {
+      derr << __func__ << " invalid db column family options for CF '"
+          << column.name << "': " << column.options << dendl;
+      return -EINVAL;
+    }
+    install_cf_mergeop(column.name, &cf_opt);
+
+    if (column.shard_cnt == 1) {
+      emplace_cf(column, 0, column.name, cf_opt);
+    } else {
+      for (size_t i = 0; i < column.shard_cnt; i++) {
+       std::string cf_name = column.name + "-" + to_string(i);
+       emplace_cf(column, i, cf_name, cf_opt);
+      }
+    }
+  }
+  existing_cfs.emplace_back("default", opt);
+
+ if (existing_cfs.size() != rocksdb_cfs.size()) {
+   std::vector<std::string> columns_from_stored;
+   sharding_def_to_columns(stored_sharding_def, columns_from_stored);
+   derr << __func__ << " extra columns in rocksdb. rocksdb columns = " << rocksdb_cfs
+       << " target columns = " << columns_from_stored << dendl;
+   return -EIO;
+ }
+  return 0;
+}
+
 std::ostream& operator<<(std::ostream& out, const RocksDBStore::ColumnFamily& cf)
 {
   out << "(";
@@ -736,100 +879,35 @@ int RocksDBStore::do_open(ostream &out,
       derr << status.ToString() << dendl;
       return -EINVAL;
     }
-    // create and open column families
-    if (!sharding_text.empty()) {
-      bool b;
-      std::vector<ColumnFamily> sharding_def;
-      b = parse_sharding_def(sharding_text, sharding_def);
-      if (!b) {
-       dout(1) << __func__ << " bad sharding: " << sharding_text << dendl;
-       return -EINVAL;
-      }
-      r = create_shards(opt, sharding_def);
-      if (r != 0 ) {
-       return r;
-      }
-      opt.env->CreateDir("sharding");
-      status = rocksdb::WriteStringToFile(opt.env, sharding_text,
-                                         sharding_def_file, true);
-      if (!status.ok()) {
-       derr << __func__ << " cannot write to " << sharding_def_file << dendl;
-       return -EIO;
-      }
-    } else {
-      status = opt.env->DeleteFile(sharding_def_file);
+    r = apply_sharding(opt, sharding_text);
+    if (r < 0) {
+      return r;
     }
     default_cf = db->DefaultColumnFamily();
   } else {
-    std::string stored_sharding_text;
-    status = opt.env->FileExists(sharding_def_file);
-    if (status.ok()) {
-      status = rocksdb::ReadFileToString(opt.env,
-                                        sharding_def_file,
-                                        &stored_sharding_text);
-      if(!status.ok()) {
-       derr << __func__ << " cannot read from " << sharding_def_file << dendl;
-       return -EIO;
-      }
-    } else {
-      //no "sharding_def" present
-    }
-    //check if sharding_def matches stored_sharding_def
-    std::vector<ColumnFamily> sharding_def;
-    parse_sharding_def(sharding_text, sharding_def);
-    std::vector<ColumnFamily> stored_sharding_def;
-    parse_sharding_def(stored_sharding_text, stored_sharding_def);
-
-    std::sort(sharding_def.begin(), sharding_def.end(),
-             [](ColumnFamily& a, ColumnFamily&b) {return a.name < b.name;});
-    std::sort(stored_sharding_def.begin(), stored_sharding_def.end(),
-             [](ColumnFamily& a, ColumnFamily&b) {return a.name < b.name;});
-
-    bool match = true;
-    if (sharding_def.size() != stored_sharding_def.size()) {
-      match = false;
-    } else {
-      for (size_t i = 0; i < sharding_def.size(); i++) {
-       auto& a = sharding_def[i];
-       auto& b = stored_sharding_def[i];
-       if ( (a.name != b.name) ||
-            (a.shard_cnt != b.shard_cnt) ||
-            (a.hash_l != b.hash_l) ||
-            (a.hash_h != b.hash_h) ) {
-         match = false;
-         break;
-       }
-      }
-    }
-    if (!match) {
-      derr << __func__ << " mismatch on sharding. requested = " << sharding_def
-          << " stored = " << stored_sharding_def << dendl;
-      return -EIO;
+    std::vector<rocksdb::ColumnFamilyDescriptor> existing_cfs;
+    std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> > existing_cfs_shard;
+    std::vector<rocksdb::ColumnFamilyDescriptor> missing_cfs;
+    std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> > missing_cfs_shard;
+
+    r = verify_sharding(opt, sharding_text,
+                       existing_cfs, existing_cfs_shard,
+                       missing_cfs, missing_cfs_shard);
+    if (r < 0) {
+      return r;
     }
+    std::string sharding_recreate_text;
+    status = rocksdb::ReadFileToString(opt.env,
+                                      sharding_recreate,
+                                      &sharding_recreate_text);
+    bool recreate_mode = status.ok() && sharding_recreate_text == "1";
 
-    std::vector<string> existing_cfs;
-    status = rocksdb::DB::ListColumnFamilies(
-      rocksdb::DBOptions(opt),
-      path,
-      &existing_cfs);
-    dout(5) << __func__ << " column families from rocksdb: " << existing_cfs << dendl;
-
-    std::sort(existing_cfs.begin(), existing_cfs.end(),
-             [&](const std::string& a, const std::string&b) {
-               return a < b;}
-             );
-
-    std::vector<std::string> columns_from_stored;
-    sharding_def_to_columns(stored_sharding_def, columns_from_stored);
-    columns_from_stored.push_back(rocksdb::kDefaultColumnFamilyName);
-    std::sort(columns_from_stored.begin(), columns_from_stored.end());
-
-    if (existing_cfs != columns_from_stored) {
-      derr << __func__ << " mismatch on sharding. rocksdb columns = " << existing_cfs
-          << " stored columns = " << columns_from_stored << dendl;
+    if (recreate_mode == false && missing_cfs.size() != 0) {
+      derr << __func__ << " missing column families: " << missing_cfs_shard << dendl;
+      return -EIO;
     }
 
-    if (columns_from_stored.empty()) {
+    if (existing_cfs.empty()) {
       // no column families
       if (open_readonly) {
        status = rocksdb::DB::Open(opt, path, &db);
@@ -842,51 +920,52 @@ int RocksDBStore::do_open(ostream &out,
       }
       default_cf = db->DefaultColumnFamily();
     } else {
-      std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
-      for (auto& column : stored_sharding_def) {
-       rocksdb::ColumnFamilyOptions cf_opt(opt);
-       status = rocksdb::GetColumnFamilyOptionsFromString(
-         cf_opt, column.options, &cf_opt);
-       if (!status.ok()) {
-         derr << __func__ << " invalid db column family options for CF '"
-              << column.name << "': " << column.options << dendl;
-         return -EINVAL;
-       }
-       install_cf_mergeop(column.name, &cf_opt);
-       if (column.shard_cnt == 1) {
-         column_families.push_back(rocksdb::ColumnFamilyDescriptor(column.name, cf_opt));
-       } else {
-         for (size_t i = 0; i < column.shard_cnt; i++) {
-           std::string cf_name = column.name + "-" + to_string(i);
-           column_families.push_back(rocksdb::ColumnFamilyDescriptor(cf_name, cf_opt));
-         }
-       }
-      }
-      column_families.push_back(rocksdb::ColumnFamilyDescriptor("default",opt));
       std::vector<rocksdb::ColumnFamilyHandle*> handles;
       if (open_readonly) {
         status = rocksdb::DB::OpenForReadOnly(rocksdb::DBOptions(opt),
-                                             path, column_families,
+                                             path, existing_cfs,
                                              &handles, &db);
       } else {
         status = rocksdb::DB::Open(rocksdb::DBOptions(opt),
-                                  path, column_families, &handles, &db);
+                                  path, existing_cfs, &handles, &db);
       }
       if (!status.ok()) {
        derr << status.ToString() << dendl;
        return -EINVAL;
       }
+      ceph_assert(existing_cfs.size() == existing_cfs_shard.size() + 1);
+      ceph_assert(handles.size() == existing_cfs.size());
+      dout(10) << __func__ << " existing_cfs=" << existing_cfs.size() << dendl;
+      for (size_t i = 0; i < existing_cfs_shard.size(); i++) {
+       add_column_family(existing_cfs_shard[i].second.name,
+                         existing_cfs_shard[i].second.hash_l,
+                         existing_cfs_shard[i].second.hash_h,
+                         existing_cfs_shard[i].first,
+                         handles[i]);
+      }
+      default_cf = handles[handles.size() - 1];
+      must_close_default_cf = true;
 
-      size_t pos = 0;
-      for (auto& column : stored_sharding_def) {
-       for (size_t i = 0; i < column.shard_cnt; i++) {
-         add_column_family(column.name, column.hash_l, column.hash_h, i, handles[pos]);
-         pos++;
+      if (missing_cfs.size() > 0) {
+       dout(10) << __func__ << " missing_cfs=" << missing_cfs.size() << dendl;
+       ceph_assert(recreate_mode);
+       ceph_assert(missing_cfs.size() == missing_cfs_shard.size());
+       for (size_t i = 0; i < missing_cfs.size(); i++) {
+         rocksdb::ColumnFamilyHandle *cf;
+         status = db->CreateColumnFamily(missing_cfs[i].options, missing_cfs[i].name, &cf);
+         if (!status.ok()) {
+           derr << __func__ << " Failed to create rocksdb column family: "
+                << missing_cfs[i].name << dendl;
+           return -EINVAL;
+         }
+         add_column_family(missing_cfs_shard[i].second.name,
+                           missing_cfs_shard[i].second.hash_l,
+                           missing_cfs_shard[i].second.hash_h,
+                           missing_cfs_shard[i].first,
+                           cf);
        }
+       opt.env->DeleteFile(sharding_recreate);
       }
-      ceph_assert(pos == handles.size() - 1);
-      default_cf = handles[pos];
-      must_close_default_cf = true;
     }
   }
   ceph_assert(default_cf != nullptr);
@@ -977,6 +1056,7 @@ void RocksDBStore::close()
 
 int RocksDBStore::repair(std::ostream &out)
 {
+  rocksdb::Status status;
   rocksdb::Options opt;
   int r = load_rocksdb_options(false, opt);
   if (r) {
@@ -984,12 +1064,42 @@ int RocksDBStore::repair(std::ostream &out)
     out << "load rocksdb options failed" << std::endl;
     return r;
   }
-  rocksdb::Status status = rocksdb::RepairDB(path, opt);
+  //need to save sharding definition, repairDB will delete files it does not know
+  std::string stored_sharding_text;
+  status = opt.env->FileExists(sharding_def_file);
   if (status.ok()) {
+    status = rocksdb::ReadFileToString(opt.env,
+                                      sharding_def_file,
+                                      &stored_sharding_text);
+    if (!status.ok()) {
+      stored_sharding_text.clear();
+    }
+  }
+  dout(10) << __func__ << " stored_sharding: " << stored_sharding_text << dendl;
+  status = rocksdb::RepairDB(path, opt);
+  bool repaired = status.ok();
+  if (!stored_sharding_text.empty()) {
+    //recreate markers even if repair failed
+    opt.env->CreateDir(sharding_def_dir);
+    status = rocksdb::WriteStringToFile(opt.env, stored_sharding_text,
+                                       sharding_def_file, true);
+    if (!status.ok()) {
+      derr << __func__ << " cannot write to " << sharding_def_file << dendl;
+      return -1;
+    }
+    status = rocksdb::WriteStringToFile(opt.env, "1",
+                                         sharding_recreate, true);
+    if (!status.ok()) {
+      derr << __func__ << " cannot write to " << sharding_recreate << dendl;
+      return -1;
+    }
+  }
+
+  if (repaired && status.ok()) {
     return 0;
   } else {
     out << "repair rocksdb failed : " << status.ToString() << std::endl;
-    return 1;
+    return -1;
   }
 }
 
index f78b23bc55a71829829f4e76af82516ec185d249..ca36bdb7df19822044b8980c0de947f8266135ed 100644 (file)
@@ -16,6 +16,7 @@
 #include "rocksdb/iostats_context.h"
 #include "rocksdb/statistics.h"
 #include "rocksdb/table.h"
+#include "rocksdb/db.h"
 #include "kv/rocksdb_cache/BinnedLRUCache.h"
 #include <errno.h>
 #include "common/errno.h"
@@ -141,6 +142,15 @@ private:
                                      std::vector<std::string>& columns);
   int create_shards(const rocksdb::Options& opt,
                    const vector<ColumnFamily>& sharding_def);
+  int apply_sharding(const rocksdb::Options& opt,
+                    const std::string& sharding_text);
+  int verify_sharding(const rocksdb::Options& opt,
+                     const std::string& sharding_text,
+                     std::vector<rocksdb::ColumnFamilyDescriptor>& existing_cfs,
+                     std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& existing_cfs_shard,
+                     std::vector<rocksdb::ColumnFamilyDescriptor>& missing_cfs,
+                     std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& missing_cfs_shard);
+
   // manage async compactions
   ceph::mutex compact_queue_lock =
     ceph::make_mutex("RocksDBStore::compact_thread_lock");