From c58e15c72e6b73a7a7eab2b13124f2cf6326db49 Mon Sep 17 00:00:00 2001 From: Adam Kupczyk Date: Mon, 10 Feb 2020 16:25:44 +0100 Subject: [PATCH] kv/RocksDBStore: fix repair. Now repair recovers sharding definition files. If some column families were deleted, they are recreated on next open. Signed-off-by: Adam Kupczyk --- src/kv/RocksDBStore.cc | 354 +++++++++++++++++++++++++++-------------- src/kv/RocksDBStore.h | 10 ++ 2 files changed, 242 insertions(+), 122 deletions(-) diff --git a/src/kv/RocksDBStore.cc b/src/kv/RocksDBStore.cc index 01aab1bdf33..caa1926b9f9 100644 --- a/src/kv/RocksDBStore.cc +++ b/src/kv/RocksDBStore.cc @@ -49,7 +49,9 @@ using ceph::bufferlist; using ceph::bufferptr; using ceph::Formatter; +static const char* sharding_def_dir = "sharding"; static const char* sharding_def_file = "sharding/def"; +static const char* sharding_recreate = "sharding/recreate_columns"; static bufferlist to_bufferlist(rocksdb::Slice in) { bufferlist bl; @@ -570,8 +572,8 @@ bool RocksDBStore::parse_sharding_def(const std::string_view text_def_in, std::string error_msg_local; if (error_position == nullptr) { error_position = &error_position_local; - *error_position = nullptr; } + *error_position = nullptr; if (error_msg == nullptr) { error_msg = &error_msg_local; error_msg->clear(); @@ -663,7 +665,7 @@ void RocksDBStore::sharding_def_to_columns(const std::vector& shar } int RocksDBStore::create_shards(const rocksdb::Options& opt, - const vector& sharding_def) + const std::vector& sharding_def) { for (auto& p : sharding_def) { // copy default CF settings, block cache, merge operators as @@ -699,6 +701,147 @@ int RocksDBStore::create_shards(const rocksdb::Options& opt, return 0; } +int RocksDBStore::apply_sharding(const rocksdb::Options& opt, + const std::string& sharding_text) +{ + // create and open column families + if (!sharding_text.empty()) { + bool b; + int r; + rocksdb::Status status; + std::vector sharding_def; + char const* error_position; + std::string error_msg; + b = parse_sharding_def(sharding_text, sharding_def, &error_position, &error_msg); + if (!b) { + dout(1) << __func__ << " bad sharding: " << dendl; + dout(1) << __func__ << sharding_text << dendl; + dout(1) << __func__ << std::string(error_position - &sharding_text[0], ' ') << "^" << error_msg << dendl; + return -EINVAL; + } + r = create_shards(opt, sharding_def); + if (r != 0 ) { + return r; + } + opt.env->CreateDir(sharding_def_dir); + status = rocksdb::WriteStringToFile(opt.env, sharding_text, + sharding_def_file, true); + if (!status.ok()) { + derr << __func__ << " cannot write to " << sharding_def_file << dendl; + return -EIO; + } + } else { + opt.env->DeleteFile(sharding_def_file); + } + return 0; +} + +int RocksDBStore::verify_sharding(const rocksdb::Options& opt, + const std::string& sharding_text, + std::vector& existing_cfs, + std::vector >& existing_cfs_shard, + std::vector& missing_cfs, + std::vector >& missing_cfs_shard) +{ + rocksdb::Status status; + std::string stored_sharding_text; + status = opt.env->FileExists(sharding_def_file); + if (status.ok()) { + status = rocksdb::ReadFileToString(opt.env, + sharding_def_file, + &stored_sharding_text); + if(!status.ok()) { + derr << __func__ << " cannot read from " << sharding_def_file << dendl; + return -EIO; + } + } else { + //no "sharding_def" present + } + //check if sharding_def matches stored_sharding_def + std::vector sharding_def; + std::vector stored_sharding_def; + parse_sharding_def(sharding_text, sharding_def); + parse_sharding_def(stored_sharding_text, stored_sharding_def); + + std::sort(sharding_def.begin(), sharding_def.end(), + [](ColumnFamily& a, ColumnFamily& b) { return a.name < b.name; } ); + std::sort(stored_sharding_def.begin(), stored_sharding_def.end(), + [](ColumnFamily& a, ColumnFamily& b) { return a.name < b.name; } ); + + bool match = true; + if (sharding_def.size() != stored_sharding_def.size()) { + match = false; + } else { + for (size_t i = 0; i < sharding_def.size(); i++) { + auto& a = sharding_def[i]; + auto& b = stored_sharding_def[i]; + if ( (a.name != b.name) || + (a.shard_cnt != b.shard_cnt) || + (a.hash_l != b.hash_l) || + (a.hash_h != b.hash_h) ) { + match = false; + break; + } + } + } + if (!match) { + derr << __func__ << " mismatch on sharding. requested = " << sharding_def + << " stored = " << stored_sharding_def << dendl; + return -EIO; + } + std::vector rocksdb_cfs; + status = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(opt), + path, &rocksdb_cfs); + if (!status.ok()) { + return -EIO; + } + dout(5) << __func__ << " column families from rocksdb: " << rocksdb_cfs << dendl; + + auto emplace_cf = [&] (const RocksDBStore::ColumnFamily& column, + int32_t shard_id, + const std::string& shard_name, + const rocksdb::ColumnFamilyOptions& opt) { + if (std::find(rocksdb_cfs.begin(), rocksdb_cfs.end(), shard_name) != rocksdb_cfs.end()) { + existing_cfs.emplace_back(shard_name, opt); + existing_cfs_shard.emplace_back(shard_id, column); + } else { + missing_cfs.emplace_back(shard_name, opt); + missing_cfs_shard.emplace_back(shard_id, column); + } + }; + + for (auto& column : stored_sharding_def) { + rocksdb::ColumnFamilyOptions cf_opt(opt); + status = rocksdb::GetColumnFamilyOptionsFromString( + cf_opt, column.options, &cf_opt); + if (!status.ok()) { + derr << __func__ << " invalid db column family options for CF '" + << column.name << "': " << column.options << dendl; + return -EINVAL; + } + install_cf_mergeop(column.name, &cf_opt); + + if (column.shard_cnt == 1) { + emplace_cf(column, 0, column.name, cf_opt); + } else { + for (size_t i = 0; i < column.shard_cnt; i++) { + std::string cf_name = column.name + "-" + to_string(i); + emplace_cf(column, i, cf_name, cf_opt); + } + } + } + existing_cfs.emplace_back("default", opt); + + if (existing_cfs.size() != rocksdb_cfs.size()) { + std::vector columns_from_stored; + sharding_def_to_columns(stored_sharding_def, columns_from_stored); + derr << __func__ << " extra columns in rocksdb. rocksdb columns = " << rocksdb_cfs + << " target columns = " << columns_from_stored << dendl; + return -EIO; + } + return 0; +} + std::ostream& operator<<(std::ostream& out, const RocksDBStore::ColumnFamily& cf) { out << "("; @@ -736,100 +879,35 @@ int RocksDBStore::do_open(ostream &out, derr << status.ToString() << dendl; return -EINVAL; } - // create and open column families - if (!sharding_text.empty()) { - bool b; - std::vector sharding_def; - b = parse_sharding_def(sharding_text, sharding_def); - if (!b) { - dout(1) << __func__ << " bad sharding: " << sharding_text << dendl; - return -EINVAL; - } - r = create_shards(opt, sharding_def); - if (r != 0 ) { - return r; - } - opt.env->CreateDir("sharding"); - status = rocksdb::WriteStringToFile(opt.env, sharding_text, - sharding_def_file, true); - if (!status.ok()) { - derr << __func__ << " cannot write to " << sharding_def_file << dendl; - return -EIO; - } - } else { - status = opt.env->DeleteFile(sharding_def_file); + r = apply_sharding(opt, sharding_text); + if (r < 0) { + return r; } default_cf = db->DefaultColumnFamily(); } else { - std::string stored_sharding_text; - status = opt.env->FileExists(sharding_def_file); - if (status.ok()) { - status = rocksdb::ReadFileToString(opt.env, - sharding_def_file, - &stored_sharding_text); - if(!status.ok()) { - derr << __func__ << " cannot read from " << sharding_def_file << dendl; - return -EIO; - } - } else { - //no "sharding_def" present - } - //check if sharding_def matches stored_sharding_def - std::vector sharding_def; - parse_sharding_def(sharding_text, sharding_def); - std::vector stored_sharding_def; - parse_sharding_def(stored_sharding_text, stored_sharding_def); - - std::sort(sharding_def.begin(), sharding_def.end(), - [](ColumnFamily& a, ColumnFamily&b) {return a.name < b.name;}); - std::sort(stored_sharding_def.begin(), stored_sharding_def.end(), - [](ColumnFamily& a, ColumnFamily&b) {return a.name < b.name;}); - - bool match = true; - if (sharding_def.size() != stored_sharding_def.size()) { - match = false; - } else { - for (size_t i = 0; i < sharding_def.size(); i++) { - auto& a = sharding_def[i]; - auto& b = stored_sharding_def[i]; - if ( (a.name != b.name) || - (a.shard_cnt != b.shard_cnt) || - (a.hash_l != b.hash_l) || - (a.hash_h != b.hash_h) ) { - match = false; - break; - } - } - } - if (!match) { - derr << __func__ << " mismatch on sharding. requested = " << sharding_def - << " stored = " << stored_sharding_def << dendl; - return -EIO; + std::vector existing_cfs; + std::vector > existing_cfs_shard; + std::vector missing_cfs; + std::vector > missing_cfs_shard; + + r = verify_sharding(opt, sharding_text, + existing_cfs, existing_cfs_shard, + missing_cfs, missing_cfs_shard); + if (r < 0) { + return r; } + std::string sharding_recreate_text; + status = rocksdb::ReadFileToString(opt.env, + sharding_recreate, + &sharding_recreate_text); + bool recreate_mode = status.ok() && sharding_recreate_text == "1"; - std::vector existing_cfs; - status = rocksdb::DB::ListColumnFamilies( - rocksdb::DBOptions(opt), - path, - &existing_cfs); - dout(5) << __func__ << " column families from rocksdb: " << existing_cfs << dendl; - - std::sort(existing_cfs.begin(), existing_cfs.end(), - [&](const std::string& a, const std::string&b) { - return a < b;} - ); - - std::vector columns_from_stored; - sharding_def_to_columns(stored_sharding_def, columns_from_stored); - columns_from_stored.push_back(rocksdb::kDefaultColumnFamilyName); - std::sort(columns_from_stored.begin(), columns_from_stored.end()); - - if (existing_cfs != columns_from_stored) { - derr << __func__ << " mismatch on sharding. rocksdb columns = " << existing_cfs - << " stored columns = " << columns_from_stored << dendl; + if (recreate_mode == false && missing_cfs.size() != 0) { + derr << __func__ << " missing column families: " << missing_cfs_shard << dendl; + return -EIO; } - if (columns_from_stored.empty()) { + if (existing_cfs.empty()) { // no column families if (open_readonly) { status = rocksdb::DB::Open(opt, path, &db); @@ -842,51 +920,52 @@ int RocksDBStore::do_open(ostream &out, } default_cf = db->DefaultColumnFamily(); } else { - std::vector column_families; - for (auto& column : stored_sharding_def) { - rocksdb::ColumnFamilyOptions cf_opt(opt); - status = rocksdb::GetColumnFamilyOptionsFromString( - cf_opt, column.options, &cf_opt); - if (!status.ok()) { - derr << __func__ << " invalid db column family options for CF '" - << column.name << "': " << column.options << dendl; - return -EINVAL; - } - install_cf_mergeop(column.name, &cf_opt); - if (column.shard_cnt == 1) { - column_families.push_back(rocksdb::ColumnFamilyDescriptor(column.name, cf_opt)); - } else { - for (size_t i = 0; i < column.shard_cnt; i++) { - std::string cf_name = column.name + "-" + to_string(i); - column_families.push_back(rocksdb::ColumnFamilyDescriptor(cf_name, cf_opt)); - } - } - } - column_families.push_back(rocksdb::ColumnFamilyDescriptor("default",opt)); std::vector handles; if (open_readonly) { status = rocksdb::DB::OpenForReadOnly(rocksdb::DBOptions(opt), - path, column_families, + path, existing_cfs, &handles, &db); } else { status = rocksdb::DB::Open(rocksdb::DBOptions(opt), - path, column_families, &handles, &db); + path, existing_cfs, &handles, &db); } if (!status.ok()) { derr << status.ToString() << dendl; return -EINVAL; } + ceph_assert(existing_cfs.size() == existing_cfs_shard.size() + 1); + ceph_assert(handles.size() == existing_cfs.size()); + dout(10) << __func__ << " existing_cfs=" << existing_cfs.size() << dendl; + for (size_t i = 0; i < existing_cfs_shard.size(); i++) { + add_column_family(existing_cfs_shard[i].second.name, + existing_cfs_shard[i].second.hash_l, + existing_cfs_shard[i].second.hash_h, + existing_cfs_shard[i].first, + handles[i]); + } + default_cf = handles[handles.size() - 1]; + must_close_default_cf = true; - size_t pos = 0; - for (auto& column : stored_sharding_def) { - for (size_t i = 0; i < column.shard_cnt; i++) { - add_column_family(column.name, column.hash_l, column.hash_h, i, handles[pos]); - pos++; + if (missing_cfs.size() > 0) { + dout(10) << __func__ << " missing_cfs=" << missing_cfs.size() << dendl; + ceph_assert(recreate_mode); + ceph_assert(missing_cfs.size() == missing_cfs_shard.size()); + for (size_t i = 0; i < missing_cfs.size(); i++) { + rocksdb::ColumnFamilyHandle *cf; + status = db->CreateColumnFamily(missing_cfs[i].options, missing_cfs[i].name, &cf); + if (!status.ok()) { + derr << __func__ << " Failed to create rocksdb column family: " + << missing_cfs[i].name << dendl; + return -EINVAL; + } + add_column_family(missing_cfs_shard[i].second.name, + missing_cfs_shard[i].second.hash_l, + missing_cfs_shard[i].second.hash_h, + missing_cfs_shard[i].first, + cf); } + opt.env->DeleteFile(sharding_recreate); } - ceph_assert(pos == handles.size() - 1); - default_cf = handles[pos]; - must_close_default_cf = true; } } ceph_assert(default_cf != nullptr); @@ -977,6 +1056,7 @@ void RocksDBStore::close() int RocksDBStore::repair(std::ostream &out) { + rocksdb::Status status; rocksdb::Options opt; int r = load_rocksdb_options(false, opt); if (r) { @@ -984,12 +1064,42 @@ int RocksDBStore::repair(std::ostream &out) out << "load rocksdb options failed" << std::endl; return r; } - rocksdb::Status status = rocksdb::RepairDB(path, opt); + //need to save sharding definition, repairDB will delete files it does not know + std::string stored_sharding_text; + status = opt.env->FileExists(sharding_def_file); if (status.ok()) { + status = rocksdb::ReadFileToString(opt.env, + sharding_def_file, + &stored_sharding_text); + if (!status.ok()) { + stored_sharding_text.clear(); + } + } + dout(10) << __func__ << " stored_sharding: " << stored_sharding_text << dendl; + status = rocksdb::RepairDB(path, opt); + bool repaired = status.ok(); + if (!stored_sharding_text.empty()) { + //recreate markers even if repair failed + opt.env->CreateDir(sharding_def_dir); + status = rocksdb::WriteStringToFile(opt.env, stored_sharding_text, + sharding_def_file, true); + if (!status.ok()) { + derr << __func__ << " cannot write to " << sharding_def_file << dendl; + return -1; + } + status = rocksdb::WriteStringToFile(opt.env, "1", + sharding_recreate, true); + if (!status.ok()) { + derr << __func__ << " cannot write to " << sharding_recreate << dendl; + return -1; + } + } + + if (repaired && status.ok()) { return 0; } else { out << "repair rocksdb failed : " << status.ToString() << std::endl; - return 1; + return -1; } } diff --git a/src/kv/RocksDBStore.h b/src/kv/RocksDBStore.h index f78b23bc55a..ca36bdb7df1 100644 --- a/src/kv/RocksDBStore.h +++ b/src/kv/RocksDBStore.h @@ -16,6 +16,7 @@ #include "rocksdb/iostats_context.h" #include "rocksdb/statistics.h" #include "rocksdb/table.h" +#include "rocksdb/db.h" #include "kv/rocksdb_cache/BinnedLRUCache.h" #include #include "common/errno.h" @@ -141,6 +142,15 @@ private: std::vector& columns); int create_shards(const rocksdb::Options& opt, const vector& sharding_def); + int apply_sharding(const rocksdb::Options& opt, + const std::string& sharding_text); + int verify_sharding(const rocksdb::Options& opt, + const std::string& sharding_text, + std::vector& existing_cfs, + std::vector >& existing_cfs_shard, + std::vector& missing_cfs, + std::vector >& missing_cfs_shard); + // manage async compactions ceph::mutex compact_queue_lock = ceph::make_mutex("RocksDBStore::compact_thread_lock"); -- 2.39.5