]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
kv/RocksDBStore: Add reshard ability to database.
authorAdam Kupczyk <akupczyk@redhat.com>
Thu, 9 Apr 2020 06:38:42 +0000 (08:38 +0200)
committerAdam Kupczyk <akupczyk@redhat.com>
Thu, 14 May 2020 16:09:58 +0000 (18:09 +0200)
Reshard ability allows to change current sharding schema, without change to data.

Signed-off-by: Adam Kupczyk <akupczyk@redhat.com>
src/kv/RocksDBStore.cc
src/kv/RocksDBStore.h
src/os/bluestore/BlueStore.cc
src/os/bluestore/BlueStore.h
src/os/bluestore/bluestore_tool.cc

index 62d7f93bad29fb172e31c7f8e0e038c9be3c0016..462939f002ec6a534eee2c5f57783f5e2a8bd7fa 100644 (file)
@@ -2666,3 +2666,377 @@ RocksDBStore::WholeSpaceIterator RocksDBStore::get_default_cf_iterator()
   return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
     db->NewIterator(rocksdb::ReadOptions(), default_cf));
 }
+
+int RocksDBStore::prepare_for_reshard(const std::string& new_sharding,
+                                     std::vector<std::string>& to_process_columns,
+                                     std::vector<rocksdb::ColumnFamilyHandle*>& to_process_handles)
+{
+  //1. list existing columns
+  //2. apply merge operator to (main + columns) opts
+  //3. prepare std::vector<rocksdb::ColumnFamilyDescriptor> existing_cfs
+  //4. open db, acquire existing column handles
+  //5. calculate missing columns
+  //6. create missing columns
+  //7. construct cf_handles according to new sharding
+  //8. check is all cf_handles are filled
+
+  bool b;
+  std::vector<ColumnFamily> new_sharding_def;
+  char const* error_position;
+  std::string error_msg;
+  b = parse_sharding_def(new_sharding, new_sharding_def, &error_position, &error_msg);
+  if (!b) {
+    dout(1) << __func__ << " bad sharding: " << dendl;
+    dout(1) << __func__ << new_sharding << dendl;
+    dout(1) << __func__ << std::string(error_position - &new_sharding[0], ' ') << "^" << error_msg << dendl;
+    return -EINVAL;
+  }
+
+  //1. list existing columns
+
+  rocksdb::Status status;
+  std::vector<std::string> existing_columns;
+  rocksdb::Options opt;
+  int r = load_rocksdb_options(false, opt);
+  if (r) {
+    dout(1) << __func__ << " load rocksdb options failed" << dendl;
+    return r;
+  }
+  status = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(opt), path, &existing_columns);
+  if (!status.ok()) {
+    derr << "Unable to list column families: " << status.ToString() << dendl;
+    return -EINVAL;
+  }
+  dout(5) << "existing columns = " << existing_columns << dendl;
+
+  //2. apply merge operator to (main + columns) opts
+  //3. prepare std::vector<rocksdb::ColumnFamilyDescriptor> cfs_to_open
+
+  std::vector<rocksdb::ColumnFamilyDescriptor> cfs_to_open;
+  for (const auto& full_name : existing_columns) {
+    //split col_name to <prefix>-<number>
+    std::string base_name;
+    size_t pos = full_name.find('-');
+    if (std::string::npos == pos)
+      base_name = full_name;
+    else
+      base_name = full_name.substr(0,pos);
+
+    rocksdb::ColumnFamilyOptions cf_opt(opt);
+    // search if we have options for this column
+    std::string options;
+    for (const auto& nsd : new_sharding_def) {
+      if (nsd.name == base_name) {
+       options = nsd.options;
+       break;
+      }
+    }
+    status = rocksdb::GetColumnFamilyOptionsFromString(cf_opt, options, &cf_opt);
+    if (!status.ok()) {
+      derr << __func__ << " failure parsing column options: " << options << dendl;
+      return -EINVAL;
+    }
+    if (base_name != rocksdb::kDefaultColumnFamilyName)
+      install_cf_mergeop(base_name, &cf_opt);
+    cfs_to_open.emplace_back(full_name, cf_opt);
+  }
+
+  //4. open db, acquire existing column handles
+  std::vector<rocksdb::ColumnFamilyHandle*> handles;
+  status = rocksdb::DB::Open(rocksdb::DBOptions(opt),
+                            path, cfs_to_open, &handles, &db);
+  if (!status.ok()) {
+    derr << status.ToString() << dendl;
+    return -EINVAL;
+  }
+  for (size_t i = 0; i < cfs_to_open.size(); i++) {
+    dout(10) << "column " << cfs_to_open[i].name << " handle " << (void*)handles[i] << dendl;
+  }
+
+  //5. calculate missing columns
+  std::vector<std::string> new_sharding_columns;
+  std::vector<std::string> missing_columns;
+  sharding_def_to_columns(new_sharding_def,
+                         new_sharding_columns);
+  dout(5) << "target columns = " << new_sharding_columns << dendl;
+  for (const auto& n : new_sharding_columns) {
+    bool found = false;
+    for (const auto& e : existing_columns) {
+      if (n == e) {
+       found = true;
+       break;
+      }
+    }
+    if (!found) {
+      missing_columns.push_back(n);
+    }
+  }
+  dout(5) << "missing columns = " << missing_columns << dendl;
+
+  //6. create missing columns
+  for (const auto& full_name : missing_columns) {
+    std::string base_name;
+    size_t pos = full_name.find('-');
+    if (std::string::npos == pos)
+      base_name = full_name;
+    else
+      base_name = full_name.substr(0,pos);
+
+    rocksdb::ColumnFamilyOptions cf_opt(opt);
+    // search if we have options for this column
+    std::string options;
+    for (const auto& nsd : new_sharding_def) {
+      if (nsd.name == base_name) {
+       options = nsd.options;
+       break;
+      }
+    }
+    status = rocksdb::GetColumnFamilyOptionsFromString(cf_opt, options, &cf_opt);
+    if (!status.ok()) {
+      derr << __func__ << " failure parsing column options: " << options << dendl;
+      return -EINVAL;
+    }
+    install_cf_mergeop(base_name, &cf_opt);
+    rocksdb::ColumnFamilyHandle *cf;
+    status = db->CreateColumnFamily(cf_opt, full_name, &cf);
+    if (!status.ok()) {
+      derr << __func__ << " Failed to create rocksdb column family: "
+          << full_name << dendl;
+      return -EINVAL;
+    }
+    dout(10) << "created column " << full_name << " handle = " << (void*)cf << dendl; 
+    existing_columns.push_back(full_name);
+    handles.push_back(cf);
+  }
+
+  //7. construct cf_handles according to new sharding
+  for (size_t i = 0; i < existing_columns.size(); i++) {
+    std::string full_name = existing_columns[i];
+    rocksdb::ColumnFamilyHandle *cf = handles[i];
+    std::string base_name;
+    size_t shard_idx = 0;
+    size_t pos = full_name.find('-');
+    dout(10) << "processing column " << full_name << dendl;
+    if (std::string::npos == pos) {
+      base_name = full_name;
+    } else {
+      base_name = full_name.substr(0,pos);
+      shard_idx = atoi(full_name.substr(pos+1).c_str());
+    }
+    if (rocksdb::kDefaultColumnFamilyName == base_name) {
+      default_cf = handles[i];
+      must_close_default_cf = true;
+    } else {
+      for (const auto& nsd : new_sharding_def) {
+       if (nsd.name == base_name) {
+         if (shard_idx < nsd.shard_cnt) {
+           add_column_family(base_name, nsd.hash_l, nsd.hash_h, shard_idx, cf);
+         } else {
+           //ignore columns with index larger then shard count
+         }
+         break;
+       }
+      }
+    }
+  }
+
+  //8. check if all cf_handles are filled
+  for (const auto& col : cf_handles) {
+    for (size_t i = 0; i < col.second.handles.size(); i++) {
+      if (col.second.handles[i] == nullptr) {
+       derr << "missing handle for column " << col.first << " shard " << i << dendl;
+       return -EIO;
+      }
+    }
+  }
+  to_process_columns = existing_columns;
+  to_process_handles = handles;
+  return 0;
+}
+
+int RocksDBStore::reshard_cleanup(const std::vector<std::string>& current_columns,
+                                 const std::vector<rocksdb::ColumnFamilyHandle*>& current_handles)
+{
+  std::vector<std::string> new_sharding_columns;
+  for (const auto& col: cf_handles) {
+    if (col.second.handles.size() == 1) {
+      new_sharding_columns.push_back(col.first);
+    } else {
+      for (size_t i = 0; i < col.second.handles.size(); i++) {
+       new_sharding_columns.push_back(col.first + "-" + to_string(i));
+      }
+    }
+  }
+
+  for (size_t i = 0; i < current_columns.size(); i++) {
+    bool found = false;
+    for (size_t j = 0; j < new_sharding_columns.size(); j++) {
+      if (current_columns[i] == new_sharding_columns[j]) {
+       found = true;
+       break;
+      }
+    }
+    if (found || current_columns[i] == rocksdb::kDefaultColumnFamilyName) {
+      dout(5) << "Column " << current_columns[i] << " is part of new sharding." << dendl;
+      continue;
+    }
+    dout(5) << "Column " << current_columns[i] << " not part of new sharding. Deleting." << dendl;
+
+    // verify that column is empty
+    rocksdb::Iterator* it;
+    it = db->NewIterator(rocksdb::ReadOptions(), current_handles[i]);
+    ceph_assert(it);
+    it->SeekToFirst();
+    ceph_assert(!it->Valid());
+    delete it;
+    rocksdb::Status status;
+    status = db->DropColumnFamily(current_handles[i]);
+    if (!status.ok()) {
+      derr << __func__ << " Failed to delete column: "
+          << current_columns[i] << dendl;
+      return -EINVAL;
+    }
+  }
+  return 0;
+}
+
+int RocksDBStore::reshard(const std::string& new_sharding)
+{
+  rocksdb::Status status;
+  int r;
+  std::vector<std::string> to_process_columns;
+  std::vector<rocksdb::ColumnFamilyHandle*> to_process_handles;
+
+  size_t bytes_in_batch = 0;
+  size_t keys_in_batch = 0;
+  size_t bytes_per_iterator = 0;
+  size_t keys_per_iterator = 0;
+
+  rocksdb::WriteBatch* bat = nullptr;
+
+  auto flush_batch = [&]() {
+    dout(10) << "flushing batch" << dendl;
+    rocksdb::WriteOptions woptions;
+    woptions.sync = true;
+    rocksdb::Status s = db->Write(woptions, bat);
+    ceph_assert(s.ok());
+    dout(25) << "processed " << keys_in_batch << " keys, for " << bytes_in_batch << " bytes" << dendl;
+    bytes_in_batch = 0;
+    keys_in_batch = 0;
+    delete bat;
+    bat = new rocksdb::WriteBatch();
+    ceph_assert(bat);
+  };
+
+  auto process_column = [&](rocksdb::ColumnFamilyHandle* handle,
+                           const std::string& fixed_prefix)
+  {
+    dout(10) << " column=" << (void*)handle << " prefix=" << fixed_prefix << dendl;
+    rocksdb::Iterator* it;
+    it = db->NewIterator(rocksdb::ReadOptions(), handle);
+    ceph_assert(it);
+    bat = new rocksdb::WriteBatch();
+    ceph_assert(bat);
+
+    for (it->SeekToFirst(); it->Valid(); it->Next()) {
+      rocksdb::Slice raw_key = it->key();
+      dout(30) << "key=" << pretty_binary_string(raw_key.ToString()) << dendl;
+      //check if need to refresh iterator
+      if (bytes_per_iterator > 10000000 ||
+         keys_per_iterator > 10000) {
+       dout(10) << "refreshing iterator" << dendl;
+       bytes_per_iterator = 0;
+       keys_per_iterator = 0;
+       std::string raw_key_str = raw_key.ToString();
+       delete it;
+       it = db->NewIterator(rocksdb::ReadOptions(), handle);
+       ceph_assert(it);
+       it->Seek(raw_key_str);
+       ceph_assert(it->Valid());
+      }
+      rocksdb::Slice value = it->value();
+      std::string prefix, key;
+      if (fixed_prefix.size() == 0) {
+       split_key(raw_key, &prefix, &key);
+      } else {
+       prefix = fixed_prefix;
+       key = raw_key.ToString();
+      }
+      std::string new_raw_key;
+      rocksdb::ColumnFamilyHandle* new_handle = get_cf_handle(prefix, key);
+      if (new_handle == nullptr) {
+       new_handle = default_cf;
+      }
+      if (handle == new_handle) {
+       continue;
+      }
+      if (new_handle == default_cf) {
+       new_raw_key = combine_strings(prefix, key);
+      } else {
+       new_raw_key = key;
+      }
+      bat->SingleDelete(handle, raw_key);
+      bat->Put(new_handle, new_raw_key, value);
+      dout(25) << "moving " << (void*)handle << "/" << pretty_binary_string(raw_key.ToString()) <<
+       " to " << (void*)new_handle << "/" << pretty_binary_string(new_raw_key) <<
+       " size " << value.size() << dendl;
+
+      bytes_in_batch += new_raw_key.size() * 2 + value.size();
+      keys_in_batch ++;
+      bytes_per_iterator += new_raw_key.size() * 2 + value.size();
+      keys_per_iterator ++;
+
+      //check if need to write batch
+      if (bytes_in_batch > 1000000 ||
+         keys_in_batch > 1000) {
+       flush_batch();
+      }
+    }
+    flush_batch();
+    delete it;
+    delete bat;
+  };
+
+  r = prepare_for_reshard(new_sharding, to_process_columns, to_process_handles);
+  if (r != 0) {
+    dout(1) << "failed to prepare db for reshard" << dendl;
+    goto cleanup;
+  }
+
+  ceph_assert(to_process_columns.size() == to_process_handles.size());
+  for (size_t idx = 0; idx < to_process_columns.size(); idx++) {
+    dout(5) << "Processing column=" << to_process_columns[idx] <<
+      " handle=" << to_process_handles[idx] << dendl;
+    if (to_process_columns[idx] == rocksdb::kDefaultColumnFamilyName) {
+      ceph_assert(to_process_handles[idx] == default_cf);
+      process_column(default_cf, std::string());
+    } else {
+      std::string fixed_prefix = to_process_columns[idx].substr(0, to_process_columns[idx].find('-'));
+      dout(10) << "Prefix: " << fixed_prefix << dendl;
+      process_column(to_process_handles[idx], fixed_prefix);
+    }
+  }
+
+  r = reshard_cleanup(to_process_columns, to_process_handles);
+  if (r != 0) {
+    dout(5) << "failed to cleanup after reshard" << dendl;
+    goto cleanup;
+  }
+  env->CreateDir(sharding_def_dir);
+  status = rocksdb::WriteStringToFile(env, new_sharding,
+                                     sharding_def_file, true);
+  if (!status.ok()) {
+    derr << __func__ << " cannot write to " << sharding_def_file << dendl;
+    r = -EIO;
+  }
+
+  cleanup:
+  //close column handles
+  for (const auto& col: cf_handles) {
+    for (size_t i = 0; i < col.second.handles.size(); i++) {
+      db->DestroyColumnFamilyHandle(col.second.handles[i]);
+    }
+  }
+  cf_handles.clear();
+  return r;
+}
index f1117909e49cb97a392ba6bdd0b2f240e88ccd0a..c6e1261325e12f08d7a88f404307d50ce43d34df 100644 (file)
@@ -520,6 +520,14 @@ err:
   WholeSpaceIterator get_wholespace_iterator(IteratorOpts opts = 0) override;
 private:
   WholeSpaceIterator get_default_cf_iterator();
+
+  int prepare_for_reshard(const std::string& new_sharding,
+                         std::vector<std::string>& to_process_columns,
+                         std::vector<rocksdb::ColumnFamilyHandle*>& to_process_handles);
+  int reshard_cleanup(const std::vector<std::string>& current_columns,
+                     const std::vector<rocksdb::ColumnFamilyHandle*>& current_handles);
+public:
+  int reshard(const std::string& new_sharding);
 };
 
 #endif
index 2cf70b5847d858c9831d64d45c81b2a54b0e41ec..1c0b6cc88f5f06cd7ba8229400829eb013cc2b8a 100644 (file)
@@ -5613,6 +5613,70 @@ void BlueStore::_sync_bluefs_and_fm()
   }
 }
 
+int BlueStore::open_db_environment(KeyValueDB **pdb)
+{
+  string kv_dir_fn;
+  string kv_backend;
+  _kv_only = true;
+  {
+    string type;
+    int r = read_meta("type", &type);
+    if (r < 0) {
+      derr << __func__ << " failed to load os-type: " << cpp_strerror(r)
+          << dendl;
+      return r;
+    }
+
+    if (type != "bluestore") {
+      derr << __func__ << " expected bluestore, but type is " << type << dendl;
+      return -EIO;
+    }
+  }
+  int r = _open_path();
+  if (r < 0)
+    return r;
+  r = _open_fsid(false);
+  if (r < 0)
+    goto out_path;
+
+  r = _read_fsid(&fsid);
+  if (r < 0)
+    goto out_fsid;
+
+  r = _lock_fsid();
+  if (r < 0)
+    goto out_fsid;
+
+  r = _open_bdev(false);
+  if (r < 0)
+    goto out_fsid;
+
+  r = _prepare_db_environment(false, false, &kv_dir_fn, &kv_backend);
+  if (r < 0)
+    goto out_bdev;
+
+  *pdb = db;
+  return 0;
+
+ out_bdev:
+  _close_bdev();
+ out_fsid:
+  _close_fsid();
+ out_path:
+  _close_path();
+
+  return r;
+}
+
+int BlueStore::close_db_environment()
+{
+  _close_db_and_around(false);
+  _close_bdev();
+  _close_fsid();
+  _close_path();
+  return 0;
+}
+
 int BlueStore::_prepare_db_environment(bool create, bool read_only,
                                       std::string* _fn, std::string* _kv_backend)
 {
index 44a3cfc538f9a9789f8bb8828fcb77dbaaeed42e..061bc96a5a256bbf06c262ab1f7e38887907f2e9 100644 (file)
@@ -2283,6 +2283,7 @@ private:
   void _close_db_and_around(bool read_only);
   int _prepare_db_environment(bool create, bool read_only,
                              std::string* kv_dir, std::string* kv_backend);
+  int _close_db_environment();
 
   // updates legacy bluefs related recs in DB to a state valid for
   // downgrades from nautilus.
@@ -2529,6 +2530,9 @@ public:
     return 0;
   }
 
+  int open_db_environment(KeyValueDB **pdb);
+  int close_db_environment();
+
   int write_meta(const std::string& key, const std::string& value) override;
   int read_meta(const std::string& key, std::string *value) override;
 
index 092a05acb7faa065a5c74f336a929561ef3513cf..209618aa400b2871fc0153090f7d895759a63689 100644 (file)
@@ -19,6 +19,7 @@
 #include "os/bluestore/BlueFS.h"
 #include "os/bluestore/BlueStore.h"
 #include "common/admin_socket.h"
+#include "kv/RocksDBStore.h"
 
 namespace po = boost::program_options;
 
@@ -226,6 +227,8 @@ int main(int argc, char **argv)
   string log_file;
   string key, value;
   vector<string> allocs_name;
+  string empty_sharding(1, '\0');
+  string new_sharding = empty_sharding;
   int log_level = 30;
   bool fsck_deep = false;
   po::options_description po_options("Options");
@@ -242,6 +245,7 @@ int main(int argc, char **argv)
     ("key,k", po::value<string>(&key), "label metadata key name")
     ("value,v", po::value<string>(&value), "label metadata value")
     ("allocator", po::value<vector<string>>(&allocs_name), "allocator to inspect: 'block'/'bluefs-wal'/'bluefs-db'/'bluefs-slow'")
+    ("sharding", po::value<string>(&new_sharding), "new sharding to apply")
     ;
   po::options_description po_positional("Positional options");
   po_positional.add_options()
@@ -262,7 +266,8 @@ int main(int argc, char **argv)
         "bluefs-log-dump, "
         "free-dump, "
         "free-score, "
-        "bluefs-stats")
+        "bluefs-stats, "
+        "reshard")
     ;
   po::options_description po_all("All options");
   po_all.add(po_options).add(po_positional);
@@ -395,6 +400,16 @@ int main(int argc, char **argv)
     if (allocs_name.empty())
       allocs_name = vector<string>{"block", "bluefs-db", "bluefs-wal", "bluefs-slow"};
   }
+  if (action == "reshard") {
+    if (path.empty()) {
+      cerr << "must specify bluestore path" << std::endl;
+      exit(EXIT_FAILURE);
+    }
+    if (new_sharding == empty_sharding) {
+      cerr << "must provide reshard specification" << std::endl;
+      exit(EXIT_FAILURE);
+    }
+  }
   vector<const char*> args;
   if (log_file.size()) {
     args.push_back("--log-file");
@@ -880,6 +895,28 @@ int main(int argc, char **argv)
     }
     cout << std::string(out.c_str(), out.length()) << std::endl;
      bluestore.cold_close();
+  } else if (action == "reshard") {
+    BlueStore bluestore(cct.get(), path);
+    KeyValueDB *db_ptr;
+    int r = bluestore.open_db_environment(&db_ptr);
+    if (r < 0) {
+      cerr << "error preparing db environment: " << cpp_strerror(r) << std::endl;
+      exit(EXIT_FAILURE);
+    }
+    if (r < 0) {
+      cerr << "error starting k-v inside bluestore: " << cpp_strerror(r) << std::endl;
+      exit(EXIT_FAILURE);
+    }
+    RocksDBStore* rocks_db = dynamic_cast<RocksDBStore*>(db_ptr);
+    ceph_assert(db_ptr);
+    ceph_assert(rocks_db);
+    r = rocks_db->reshard(new_sharding);
+    if (r < 0) {
+      cerr << "error resharding: " << cpp_strerror(r) << std::endl;
+    } else {
+      cout << "reshard success" << std::endl;
+    }
+    bluestore.close_db_environment();
   } else {
     cerr << "unrecognized action " << action << std::endl;
     return 1;