From: Adam Kupczyk Date: Thu, 9 Apr 2020 06:38:42 +0000 (+0200) Subject: kv/RocksDBStore: Add reshard ability to database. X-Git-Tag: wip-pdonnell-testing-20200918.022351~1039^2~10 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=52ac25e5a6661457b3b597e0bb06d376e894c797;p=ceph-ci.git kv/RocksDBStore: Add reshard ability to database. Reshard ability allows to change current sharding schema, without change to data. Signed-off-by: Adam Kupczyk --- diff --git a/src/kv/RocksDBStore.cc b/src/kv/RocksDBStore.cc index 62d7f93bad2..462939f002e 100644 --- a/src/kv/RocksDBStore.cc +++ b/src/kv/RocksDBStore.cc @@ -2666,3 +2666,377 @@ RocksDBStore::WholeSpaceIterator RocksDBStore::get_default_cf_iterator() return std::make_shared( db->NewIterator(rocksdb::ReadOptions(), default_cf)); } + +int RocksDBStore::prepare_for_reshard(const std::string& new_sharding, + std::vector& to_process_columns, + std::vector& to_process_handles) +{ + //1. list existing columns + //2. apply merge operator to (main + columns) opts + //3. prepare std::vector existing_cfs + //4. open db, acquire existing column handles + //5. calculate missing columns + //6. create missing columns + //7. construct cf_handles according to new sharding + //8. check is all cf_handles are filled + + bool b; + std::vector new_sharding_def; + char const* error_position; + std::string error_msg; + b = parse_sharding_def(new_sharding, new_sharding_def, &error_position, &error_msg); + if (!b) { + dout(1) << __func__ << " bad sharding: " << dendl; + dout(1) << __func__ << new_sharding << dendl; + dout(1) << __func__ << std::string(error_position - &new_sharding[0], ' ') << "^" << error_msg << dendl; + return -EINVAL; + } + + //1. list existing columns + + rocksdb::Status status; + std::vector existing_columns; + rocksdb::Options opt; + int r = load_rocksdb_options(false, opt); + if (r) { + dout(1) << __func__ << " load rocksdb options failed" << dendl; + return r; + } + status = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(opt), path, &existing_columns); + if (!status.ok()) { + derr << "Unable to list column families: " << status.ToString() << dendl; + return -EINVAL; + } + dout(5) << "existing columns = " << existing_columns << dendl; + + //2. apply merge operator to (main + columns) opts + //3. prepare std::vector cfs_to_open + + std::vector cfs_to_open; + for (const auto& full_name : existing_columns) { + //split col_name to - + std::string base_name; + size_t pos = full_name.find('-'); + if (std::string::npos == pos) + base_name = full_name; + else + base_name = full_name.substr(0,pos); + + rocksdb::ColumnFamilyOptions cf_opt(opt); + // search if we have options for this column + std::string options; + for (const auto& nsd : new_sharding_def) { + if (nsd.name == base_name) { + options = nsd.options; + break; + } + } + status = rocksdb::GetColumnFamilyOptionsFromString(cf_opt, options, &cf_opt); + if (!status.ok()) { + derr << __func__ << " failure parsing column options: " << options << dendl; + return -EINVAL; + } + if (base_name != rocksdb::kDefaultColumnFamilyName) + install_cf_mergeop(base_name, &cf_opt); + cfs_to_open.emplace_back(full_name, cf_opt); + } + + //4. open db, acquire existing column handles + std::vector handles; + status = rocksdb::DB::Open(rocksdb::DBOptions(opt), + path, cfs_to_open, &handles, &db); + if (!status.ok()) { + derr << status.ToString() << dendl; + return -EINVAL; + } + for (size_t i = 0; i < cfs_to_open.size(); i++) { + dout(10) << "column " << cfs_to_open[i].name << " handle " << (void*)handles[i] << dendl; + } + + //5. calculate missing columns + std::vector new_sharding_columns; + std::vector missing_columns; + sharding_def_to_columns(new_sharding_def, + new_sharding_columns); + dout(5) << "target columns = " << new_sharding_columns << dendl; + for (const auto& n : new_sharding_columns) { + bool found = false; + for (const auto& e : existing_columns) { + if (n == e) { + found = true; + break; + } + } + if (!found) { + missing_columns.push_back(n); + } + } + dout(5) << "missing columns = " << missing_columns << dendl; + + //6. create missing columns + for (const auto& full_name : missing_columns) { + std::string base_name; + size_t pos = full_name.find('-'); + if (std::string::npos == pos) + base_name = full_name; + else + base_name = full_name.substr(0,pos); + + rocksdb::ColumnFamilyOptions cf_opt(opt); + // search if we have options for this column + std::string options; + for (const auto& nsd : new_sharding_def) { + if (nsd.name == base_name) { + options = nsd.options; + break; + } + } + status = rocksdb::GetColumnFamilyOptionsFromString(cf_opt, options, &cf_opt); + if (!status.ok()) { + derr << __func__ << " failure parsing column options: " << options << dendl; + return -EINVAL; + } + install_cf_mergeop(base_name, &cf_opt); + rocksdb::ColumnFamilyHandle *cf; + status = db->CreateColumnFamily(cf_opt, full_name, &cf); + if (!status.ok()) { + derr << __func__ << " Failed to create rocksdb column family: " + << full_name << dendl; + return -EINVAL; + } + dout(10) << "created column " << full_name << " handle = " << (void*)cf << dendl; + existing_columns.push_back(full_name); + handles.push_back(cf); + } + + //7. construct cf_handles according to new sharding + for (size_t i = 0; i < existing_columns.size(); i++) { + std::string full_name = existing_columns[i]; + rocksdb::ColumnFamilyHandle *cf = handles[i]; + std::string base_name; + size_t shard_idx = 0; + size_t pos = full_name.find('-'); + dout(10) << "processing column " << full_name << dendl; + if (std::string::npos == pos) { + base_name = full_name; + } else { + base_name = full_name.substr(0,pos); + shard_idx = atoi(full_name.substr(pos+1).c_str()); + } + if (rocksdb::kDefaultColumnFamilyName == base_name) { + default_cf = handles[i]; + must_close_default_cf = true; + } else { + for (const auto& nsd : new_sharding_def) { + if (nsd.name == base_name) { + if (shard_idx < nsd.shard_cnt) { + add_column_family(base_name, nsd.hash_l, nsd.hash_h, shard_idx, cf); + } else { + //ignore columns with index larger then shard count + } + break; + } + } + } + } + + //8. check if all cf_handles are filled + for (const auto& col : cf_handles) { + for (size_t i = 0; i < col.second.handles.size(); i++) { + if (col.second.handles[i] == nullptr) { + derr << "missing handle for column " << col.first << " shard " << i << dendl; + return -EIO; + } + } + } + to_process_columns = existing_columns; + to_process_handles = handles; + return 0; +} + +int RocksDBStore::reshard_cleanup(const std::vector& current_columns, + const std::vector& current_handles) +{ + std::vector new_sharding_columns; + for (const auto& col: cf_handles) { + if (col.second.handles.size() == 1) { + new_sharding_columns.push_back(col.first); + } else { + for (size_t i = 0; i < col.second.handles.size(); i++) { + new_sharding_columns.push_back(col.first + "-" + to_string(i)); + } + } + } + + for (size_t i = 0; i < current_columns.size(); i++) { + bool found = false; + for (size_t j = 0; j < new_sharding_columns.size(); j++) { + if (current_columns[i] == new_sharding_columns[j]) { + found = true; + break; + } + } + if (found || current_columns[i] == rocksdb::kDefaultColumnFamilyName) { + dout(5) << "Column " << current_columns[i] << " is part of new sharding." << dendl; + continue; + } + dout(5) << "Column " << current_columns[i] << " not part of new sharding. Deleting." << dendl; + + // verify that column is empty + rocksdb::Iterator* it; + it = db->NewIterator(rocksdb::ReadOptions(), current_handles[i]); + ceph_assert(it); + it->SeekToFirst(); + ceph_assert(!it->Valid()); + delete it; + rocksdb::Status status; + status = db->DropColumnFamily(current_handles[i]); + if (!status.ok()) { + derr << __func__ << " Failed to delete column: " + << current_columns[i] << dendl; + return -EINVAL; + } + } + return 0; +} + +int RocksDBStore::reshard(const std::string& new_sharding) +{ + rocksdb::Status status; + int r; + std::vector to_process_columns; + std::vector to_process_handles; + + size_t bytes_in_batch = 0; + size_t keys_in_batch = 0; + size_t bytes_per_iterator = 0; + size_t keys_per_iterator = 0; + + rocksdb::WriteBatch* bat = nullptr; + + auto flush_batch = [&]() { + dout(10) << "flushing batch" << dendl; + rocksdb::WriteOptions woptions; + woptions.sync = true; + rocksdb::Status s = db->Write(woptions, bat); + ceph_assert(s.ok()); + dout(25) << "processed " << keys_in_batch << " keys, for " << bytes_in_batch << " bytes" << dendl; + bytes_in_batch = 0; + keys_in_batch = 0; + delete bat; + bat = new rocksdb::WriteBatch(); + ceph_assert(bat); + }; + + auto process_column = [&](rocksdb::ColumnFamilyHandle* handle, + const std::string& fixed_prefix) + { + dout(10) << " column=" << (void*)handle << " prefix=" << fixed_prefix << dendl; + rocksdb::Iterator* it; + it = db->NewIterator(rocksdb::ReadOptions(), handle); + ceph_assert(it); + bat = new rocksdb::WriteBatch(); + ceph_assert(bat); + + for (it->SeekToFirst(); it->Valid(); it->Next()) { + rocksdb::Slice raw_key = it->key(); + dout(30) << "key=" << pretty_binary_string(raw_key.ToString()) << dendl; + //check if need to refresh iterator + if (bytes_per_iterator > 10000000 || + keys_per_iterator > 10000) { + dout(10) << "refreshing iterator" << dendl; + bytes_per_iterator = 0; + keys_per_iterator = 0; + std::string raw_key_str = raw_key.ToString(); + delete it; + it = db->NewIterator(rocksdb::ReadOptions(), handle); + ceph_assert(it); + it->Seek(raw_key_str); + ceph_assert(it->Valid()); + } + rocksdb::Slice value = it->value(); + std::string prefix, key; + if (fixed_prefix.size() == 0) { + split_key(raw_key, &prefix, &key); + } else { + prefix = fixed_prefix; + key = raw_key.ToString(); + } + std::string new_raw_key; + rocksdb::ColumnFamilyHandle* new_handle = get_cf_handle(prefix, key); + if (new_handle == nullptr) { + new_handle = default_cf; + } + if (handle == new_handle) { + continue; + } + if (new_handle == default_cf) { + new_raw_key = combine_strings(prefix, key); + } else { + new_raw_key = key; + } + bat->SingleDelete(handle, raw_key); + bat->Put(new_handle, new_raw_key, value); + dout(25) << "moving " << (void*)handle << "/" << pretty_binary_string(raw_key.ToString()) << + " to " << (void*)new_handle << "/" << pretty_binary_string(new_raw_key) << + " size " << value.size() << dendl; + + bytes_in_batch += new_raw_key.size() * 2 + value.size(); + keys_in_batch ++; + bytes_per_iterator += new_raw_key.size() * 2 + value.size(); + keys_per_iterator ++; + + //check if need to write batch + if (bytes_in_batch > 1000000 || + keys_in_batch > 1000) { + flush_batch(); + } + } + flush_batch(); + delete it; + delete bat; + }; + + r = prepare_for_reshard(new_sharding, to_process_columns, to_process_handles); + if (r != 0) { + dout(1) << "failed to prepare db for reshard" << dendl; + goto cleanup; + } + + ceph_assert(to_process_columns.size() == to_process_handles.size()); + for (size_t idx = 0; idx < to_process_columns.size(); idx++) { + dout(5) << "Processing column=" << to_process_columns[idx] << + " handle=" << to_process_handles[idx] << dendl; + if (to_process_columns[idx] == rocksdb::kDefaultColumnFamilyName) { + ceph_assert(to_process_handles[idx] == default_cf); + process_column(default_cf, std::string()); + } else { + std::string fixed_prefix = to_process_columns[idx].substr(0, to_process_columns[idx].find('-')); + dout(10) << "Prefix: " << fixed_prefix << dendl; + process_column(to_process_handles[idx], fixed_prefix); + } + } + + r = reshard_cleanup(to_process_columns, to_process_handles); + if (r != 0) { + dout(5) << "failed to cleanup after reshard" << dendl; + goto cleanup; + } + env->CreateDir(sharding_def_dir); + status = rocksdb::WriteStringToFile(env, new_sharding, + sharding_def_file, true); + if (!status.ok()) { + derr << __func__ << " cannot write to " << sharding_def_file << dendl; + r = -EIO; + } + + cleanup: + //close column handles + for (const auto& col: cf_handles) { + for (size_t i = 0; i < col.second.handles.size(); i++) { + db->DestroyColumnFamilyHandle(col.second.handles[i]); + } + } + cf_handles.clear(); + return r; +} diff --git a/src/kv/RocksDBStore.h b/src/kv/RocksDBStore.h index f1117909e49..c6e1261325e 100644 --- a/src/kv/RocksDBStore.h +++ b/src/kv/RocksDBStore.h @@ -520,6 +520,14 @@ err: WholeSpaceIterator get_wholespace_iterator(IteratorOpts opts = 0) override; private: WholeSpaceIterator get_default_cf_iterator(); + + int prepare_for_reshard(const std::string& new_sharding, + std::vector& to_process_columns, + std::vector& to_process_handles); + int reshard_cleanup(const std::vector& current_columns, + const std::vector& current_handles); +public: + int reshard(const std::string& new_sharding); }; #endif diff --git a/src/os/bluestore/BlueStore.cc b/src/os/bluestore/BlueStore.cc index 2cf70b5847d..1c0b6cc88f5 100644 --- a/src/os/bluestore/BlueStore.cc +++ b/src/os/bluestore/BlueStore.cc @@ -5613,6 +5613,70 @@ void BlueStore::_sync_bluefs_and_fm() } } +int BlueStore::open_db_environment(KeyValueDB **pdb) +{ + string kv_dir_fn; + string kv_backend; + _kv_only = true; + { + string type; + int r = read_meta("type", &type); + if (r < 0) { + derr << __func__ << " failed to load os-type: " << cpp_strerror(r) + << dendl; + return r; + } + + if (type != "bluestore") { + derr << __func__ << " expected bluestore, but type is " << type << dendl; + return -EIO; + } + } + int r = _open_path(); + if (r < 0) + return r; + r = _open_fsid(false); + if (r < 0) + goto out_path; + + r = _read_fsid(&fsid); + if (r < 0) + goto out_fsid; + + r = _lock_fsid(); + if (r < 0) + goto out_fsid; + + r = _open_bdev(false); + if (r < 0) + goto out_fsid; + + r = _prepare_db_environment(false, false, &kv_dir_fn, &kv_backend); + if (r < 0) + goto out_bdev; + + *pdb = db; + return 0; + + out_bdev: + _close_bdev(); + out_fsid: + _close_fsid(); + out_path: + _close_path(); + + return r; +} + +int BlueStore::close_db_environment() +{ + _close_db_and_around(false); + _close_bdev(); + _close_fsid(); + _close_path(); + return 0; +} + int BlueStore::_prepare_db_environment(bool create, bool read_only, std::string* _fn, std::string* _kv_backend) { diff --git a/src/os/bluestore/BlueStore.h b/src/os/bluestore/BlueStore.h index 44a3cfc538f..061bc96a5a2 100644 --- a/src/os/bluestore/BlueStore.h +++ b/src/os/bluestore/BlueStore.h @@ -2283,6 +2283,7 @@ private: void _close_db_and_around(bool read_only); int _prepare_db_environment(bool create, bool read_only, std::string* kv_dir, std::string* kv_backend); + int _close_db_environment(); // updates legacy bluefs related recs in DB to a state valid for // downgrades from nautilus. @@ -2529,6 +2530,9 @@ public: return 0; } + int open_db_environment(KeyValueDB **pdb); + int close_db_environment(); + int write_meta(const std::string& key, const std::string& value) override; int read_meta(const std::string& key, std::string *value) override; diff --git a/src/os/bluestore/bluestore_tool.cc b/src/os/bluestore/bluestore_tool.cc index 092a05acb7f..209618aa400 100644 --- a/src/os/bluestore/bluestore_tool.cc +++ b/src/os/bluestore/bluestore_tool.cc @@ -19,6 +19,7 @@ #include "os/bluestore/BlueFS.h" #include "os/bluestore/BlueStore.h" #include "common/admin_socket.h" +#include "kv/RocksDBStore.h" namespace po = boost::program_options; @@ -226,6 +227,8 @@ int main(int argc, char **argv) string log_file; string key, value; vector allocs_name; + string empty_sharding(1, '\0'); + string new_sharding = empty_sharding; int log_level = 30; bool fsck_deep = false; po::options_description po_options("Options"); @@ -242,6 +245,7 @@ int main(int argc, char **argv) ("key,k", po::value(&key), "label metadata key name") ("value,v", po::value(&value), "label metadata value") ("allocator", po::value>(&allocs_name), "allocator to inspect: 'block'/'bluefs-wal'/'bluefs-db'/'bluefs-slow'") + ("sharding", po::value(&new_sharding), "new sharding to apply") ; po::options_description po_positional("Positional options"); po_positional.add_options() @@ -262,7 +266,8 @@ int main(int argc, char **argv) "bluefs-log-dump, " "free-dump, " "free-score, " - "bluefs-stats") + "bluefs-stats, " + "reshard") ; po::options_description po_all("All options"); po_all.add(po_options).add(po_positional); @@ -395,6 +400,16 @@ int main(int argc, char **argv) if (allocs_name.empty()) allocs_name = vector{"block", "bluefs-db", "bluefs-wal", "bluefs-slow"}; } + if (action == "reshard") { + if (path.empty()) { + cerr << "must specify bluestore path" << std::endl; + exit(EXIT_FAILURE); + } + if (new_sharding == empty_sharding) { + cerr << "must provide reshard specification" << std::endl; + exit(EXIT_FAILURE); + } + } vector args; if (log_file.size()) { args.push_back("--log-file"); @@ -880,6 +895,28 @@ int main(int argc, char **argv) } cout << std::string(out.c_str(), out.length()) << std::endl; bluestore.cold_close(); + } else if (action == "reshard") { + BlueStore bluestore(cct.get(), path); + KeyValueDB *db_ptr; + int r = bluestore.open_db_environment(&db_ptr); + if (r < 0) { + cerr << "error preparing db environment: " << cpp_strerror(r) << std::endl; + exit(EXIT_FAILURE); + } + if (r < 0) { + cerr << "error starting k-v inside bluestore: " << cpp_strerror(r) << std::endl; + exit(EXIT_FAILURE); + } + RocksDBStore* rocks_db = dynamic_cast(db_ptr); + ceph_assert(db_ptr); + ceph_assert(rocks_db); + r = rocks_db->reshard(new_sharding); + if (r < 0) { + cerr << "error resharding: " << cpp_strerror(r) << std::endl; + } else { + cout << "reshard success" << std::endl; + } + bluestore.close_db_environment(); } else { cerr << "unrecognized action " << action << std::endl; return 1;