using ceph::bufferptr;
using ceph::Formatter;
+static const char* sharding_def_dir = "sharding";  // directory created through opt.env before the sharding files below are written
static const char* sharding_def_file = "sharding/def";  // file that stores the sharding definition text
+static const char* sharding_recreate = "sharding/recreate_columns";  // marker file: repair() writes "1" here; open reads it to allow creating missing column families, then deletes it
static bufferlist to_bufferlist(rocksdb::Slice in) {
bufferlist bl;
std::string error_msg_local;
if (error_position == nullptr) {
error_position = &error_position_local;
- *error_position = nullptr;
}
+ *error_position = nullptr;
if (error_msg == nullptr) {
error_msg = &error_msg_local;
error_msg->clear();
}
int RocksDBStore::create_shards(const rocksdb::Options& opt,
- const vector<ColumnFamily>& sharding_def)
+ const std::vector<ColumnFamily>& sharding_def)
{
for (auto& p : sharding_def) {
// copy default CF settings, block cache, merge operators as
return 0;
}
+// Create the column families described by sharding_text and record the
+// definition on disk.
+//
+// A non-empty sharding_text is parsed, passed to create_shards(), and then
+// written to sharding_def_file; sharding_def_dir is created first and the
+// result of that CreateDir call is not checked. An empty sharding_text only
+// removes sharding_def_file (the result of the delete is not checked either).
+//
+// Returns 0 on success, -EINVAL when sharding_text does not parse, the
+// non-zero result of create_shards() when that fails, and -EIO when the
+// definition file cannot be written.
+int RocksDBStore::apply_sharding(const rocksdb::Options& opt,
+                                 const std::string& sharding_text)
+{
+  if (sharding_text.empty()) {
+    // nothing to shard: make sure no definition file is left behind
+    opt.env->DeleteFile(sharding_def_file);
+    return 0;
+  }
+
+  // create and open column families
+  std::vector<ColumnFamily> parsed_def;
+  char const* bad_pos;
+  std::string parse_err;
+  if (!parse_sharding_def(sharding_text, parsed_def, &bad_pos, &parse_err)) {
+    // The text line and the caret line start with the same __func__ prefix,
+    // so the "^" lands under the reported offset.
+    // NOTE(review): assumes parse_sharding_def leaves the position pointing
+    // into sharding_text whenever it returns false - confirm.
+    dout(1) << __func__ << " bad sharding: " << dendl;
+    dout(1) << __func__ << sharding_text << dendl;
+    dout(1) << __func__ << std::string(bad_pos - &sharding_text[0], ' ') << "^" << parse_err << dendl;
+    return -EINVAL;
+  }
+
+  const int shards_rc = create_shards(opt, parsed_def);
+  if (shards_rc != 0) {
+    return shards_rc;
+  }
+
+  // persist the definition so that later opens can verify against it
+  opt.env->CreateDir(sharding_def_dir);
+  rocksdb::Status write_status =
+    rocksdb::WriteStringToFile(opt.env, sharding_text,
+                               sharding_def_file, true);
+  if (!write_status.ok()) {
+    derr << __func__ << " cannot write to " << sharding_def_file << dendl;
+    return -EIO;
+  }
+  return 0;
+}
+
+// Check the requested sharding against the stored definition and against
+// the column families RocksDB actually holds.
+//
+// Steps, in order:
+//  1. Read the stored definition from sharding_def_file when that file
+//     exists; a missing file leaves the stored text empty.
+//  2. Parse the requested and the stored text and compare them column by
+//     column on name, shard_cnt, hash_l and hash_h. Per-column options are
+//     not part of the comparison.
+//  3. List the column families of the database at `path` and file every
+//     shard of the stored definition under existing_* or missing_*.
+//  4. Append the "default" column family to existing_cfs.
+//
+// Outputs (appended to; nothing is cleared here):
+//  existing_cfs / missing_cfs              descriptors (shard name + options)
+//  existing_cfs_shard / missing_cfs_shard  (shard index, column) pairs in the
+//                                          same order as the descriptors;
+//                                          "default" gets no entry.
+//
+// Returns 0 on success; -EIO when the stored definition cannot be read, the
+// two definitions differ, the column families cannot be listed, or RocksDB
+// reports a different number of column families than were matched; -EINVAL
+// when a column's options string is rejected.
+int RocksDBStore::verify_sharding(const rocksdb::Options& opt,
+                                  const std::string& sharding_text,
+                                  std::vector<rocksdb::ColumnFamilyDescriptor>& existing_cfs,
+                                  std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& existing_cfs_shard,
+                                  std::vector<rocksdb::ColumnFamilyDescriptor>& missing_cfs,
+                                  std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> >& missing_cfs_shard)
+{
+  // no "sharding_def" present simply means an empty stored definition
+  std::string stored_sharding_text;
+  if (opt.env->FileExists(sharding_def_file).ok()) {
+    rocksdb::Status read_status = rocksdb::ReadFileToString(opt.env,
+                                                            sharding_def_file,
+                                                            &stored_sharding_text);
+    if (!read_status.ok()) {
+      derr << __func__ << " cannot read from " << sharding_def_file << dendl;
+      return -EIO;
+    }
+  }
+
+  //check if sharding_def matches stored_sharding_def
+  // (the return values of parse_sharding_def are not inspected)
+  std::vector<ColumnFamily> requested_def;
+  std::vector<ColumnFamily> stored_def;
+  parse_sharding_def(sharding_text, requested_def);
+  parse_sharding_def(stored_sharding_text, stored_def);
+
+  // order both by name so the element-wise comparison ignores listing order
+  auto by_name = [](const ColumnFamily& lhs, const ColumnFamily& rhs) {
+    return lhs.name < rhs.name;
+  };
+  std::sort(requested_def.begin(), requested_def.end(), by_name);
+  std::sort(stored_def.begin(), stored_def.end(), by_name);
+
+  auto same_layout = [](const ColumnFamily& lhs, const ColumnFamily& rhs) {
+    return lhs.name == rhs.name &&
+           lhs.shard_cnt == rhs.shard_cnt &&
+           lhs.hash_l == rhs.hash_l &&
+           lhs.hash_h == rhs.hash_h;
+  };
+  const bool layouts_match =
+    requested_def.size() == stored_def.size() &&
+    std::equal(requested_def.begin(), requested_def.end(),
+               stored_def.begin(), same_layout);
+  if (!layouts_match) {
+    derr << __func__ << " mismatch on sharding. requested = " << requested_def
+         << " stored = " << stored_def << dendl;
+    return -EIO;
+  }
+
+  std::vector<std::string> rocksdb_cfs;
+  rocksdb::Status list_status =
+    rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(opt),
+                                    path, &rocksdb_cfs);
+  if (!list_status.ok()) {
+    return -EIO;
+  }
+  dout(5) << __func__ << " column families from rocksdb: " << rocksdb_cfs << dendl;
+
+  // File one shard under existing_* when RocksDB lists its name, otherwise
+  // under missing_*; descriptor and shard vectors grow in lock step.
+  auto emplace_cf = [&] (const RocksDBStore::ColumnFamily& column,
+                         int32_t shard_id,
+                         const std::string& shard_name,
+                         const rocksdb::ColumnFamilyOptions& shard_opt) {
+    const bool in_rocksdb =
+      std::find(rocksdb_cfs.begin(), rocksdb_cfs.end(), shard_name) != rocksdb_cfs.end();
+    auto& descriptors = in_rocksdb ? existing_cfs : missing_cfs;
+    auto& shards = in_rocksdb ? existing_cfs_shard : missing_cfs_shard;
+    descriptors.emplace_back(shard_name, shard_opt);
+    shards.emplace_back(shard_id, column);
+  };
+
+  for (auto& column : stored_def) {
+    // start from the database-wide options, then apply the column's own string
+    rocksdb::ColumnFamilyOptions cf_opt(opt);
+    rocksdb::Status opt_status = rocksdb::GetColumnFamilyOptionsFromString(
+      cf_opt, column.options, &cf_opt);
+    if (!opt_status.ok()) {
+      derr << __func__ << " invalid db column family options for CF '"
+           << column.name << "': " << column.options << dendl;
+      return -EINVAL;
+    }
+    install_cf_mergeop(column.name, &cf_opt);
+
+    if (column.shard_cnt == 1) {
+      // a single-shard column keeps its plain name
+      emplace_cf(column, 0, column.name, cf_opt);
+      continue;
+    }
+    // multi-shard columns are named "<name>-<index>"
+    for (size_t i = 0; i < column.shard_cnt; i++) {
+      emplace_cf(column, i, column.name + "-" + to_string(i), cf_opt);
+    }
+  }
+  existing_cfs.emplace_back("default", opt);
+
+  if (existing_cfs.size() == rocksdb_cfs.size()) {
+    return 0;
+  }
+
+  // RocksDB lists column families that the stored definition does not cover
+  std::vector<std::string> columns_from_stored;
+  sharding_def_to_columns(stored_def, columns_from_stored);
+  derr << __func__ << " extra columns in rocksdb. rocksdb columns = " << rocksdb_cfs
+       << " target columns = " << columns_from_stored << dendl;
+  return -EIO;
+}
+
std::ostream& operator<<(std::ostream& out, const RocksDBStore::ColumnFamily& cf)
{
out << "(";
derr << status.ToString() << dendl;
return -EINVAL;
}
- // create and open column families
- if (!sharding_text.empty()) {
- bool b;
- std::vector<ColumnFamily> sharding_def;
- b = parse_sharding_def(sharding_text, sharding_def);
- if (!b) {
- dout(1) << __func__ << " bad sharding: " << sharding_text << dendl;
- return -EINVAL;
- }
- r = create_shards(opt, sharding_def);
- if (r != 0 ) {
- return r;
- }
- opt.env->CreateDir("sharding");
- status = rocksdb::WriteStringToFile(opt.env, sharding_text,
- sharding_def_file, true);
- if (!status.ok()) {
- derr << __func__ << " cannot write to " << sharding_def_file << dendl;
- return -EIO;
- }
- } else {
- status = opt.env->DeleteFile(sharding_def_file);
+ r = apply_sharding(opt, sharding_text);
+ if (r < 0) {
+ return r;
}
default_cf = db->DefaultColumnFamily();
} else {
- std::string stored_sharding_text;
- status = opt.env->FileExists(sharding_def_file);
- if (status.ok()) {
- status = rocksdb::ReadFileToString(opt.env,
- sharding_def_file,
- &stored_sharding_text);
- if(!status.ok()) {
- derr << __func__ << " cannot read from " << sharding_def_file << dendl;
- return -EIO;
- }
- } else {
- //no "sharding_def" present
- }
- //check if sharding_def matches stored_sharding_def
- std::vector<ColumnFamily> sharding_def;
- parse_sharding_def(sharding_text, sharding_def);
- std::vector<ColumnFamily> stored_sharding_def;
- parse_sharding_def(stored_sharding_text, stored_sharding_def);
-
- std::sort(sharding_def.begin(), sharding_def.end(),
- [](ColumnFamily& a, ColumnFamily&b) {return a.name < b.name;});
- std::sort(stored_sharding_def.begin(), stored_sharding_def.end(),
- [](ColumnFamily& a, ColumnFamily&b) {return a.name < b.name;});
-
- bool match = true;
- if (sharding_def.size() != stored_sharding_def.size()) {
- match = false;
- } else {
- for (size_t i = 0; i < sharding_def.size(); i++) {
- auto& a = sharding_def[i];
- auto& b = stored_sharding_def[i];
- if ( (a.name != b.name) ||
- (a.shard_cnt != b.shard_cnt) ||
- (a.hash_l != b.hash_l) ||
- (a.hash_h != b.hash_h) ) {
- match = false;
- break;
- }
- }
- }
- if (!match) {
- derr << __func__ << " mismatch on sharding. requested = " << sharding_def
- << " stored = " << stored_sharding_def << dendl;
- return -EIO;
+ std::vector<rocksdb::ColumnFamilyDescriptor> existing_cfs;
+ std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> > existing_cfs_shard;
+ std::vector<rocksdb::ColumnFamilyDescriptor> missing_cfs;
+ std::vector<std::pair<size_t, RocksDBStore::ColumnFamily> > missing_cfs_shard;
+
+ r = verify_sharding(opt, sharding_text,
+ existing_cfs, existing_cfs_shard,
+ missing_cfs, missing_cfs_shard);
+ if (r < 0) {
+ return r;
}
+ std::string sharding_recreate_text;
+ status = rocksdb::ReadFileToString(opt.env,
+ sharding_recreate,
+ &sharding_recreate_text);
+ bool recreate_mode = status.ok() && sharding_recreate_text == "1";
- std::vector<string> existing_cfs;
- status = rocksdb::DB::ListColumnFamilies(
- rocksdb::DBOptions(opt),
- path,
- &existing_cfs);
- dout(5) << __func__ << " column families from rocksdb: " << existing_cfs << dendl;
-
- std::sort(existing_cfs.begin(), existing_cfs.end(),
- [&](const std::string& a, const std::string&b) {
- return a < b;}
- );
-
- std::vector<std::string> columns_from_stored;
- sharding_def_to_columns(stored_sharding_def, columns_from_stored);
- columns_from_stored.push_back(rocksdb::kDefaultColumnFamilyName);
- std::sort(columns_from_stored.begin(), columns_from_stored.end());
-
- if (existing_cfs != columns_from_stored) {
- derr << __func__ << " mismatch on sharding. rocksdb columns = " << existing_cfs
- << " stored columns = " << columns_from_stored << dendl;
+ if (recreate_mode == false && missing_cfs.size() != 0) {
+ derr << __func__ << " missing column families: " << missing_cfs_shard << dendl;
+ return -EIO;
}
- if (columns_from_stored.empty()) {
+ if (existing_cfs.empty()) {
// no column families
if (open_readonly) {
status = rocksdb::DB::Open(opt, path, &db);
}
default_cf = db->DefaultColumnFamily();
} else {
- std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
- for (auto& column : stored_sharding_def) {
- rocksdb::ColumnFamilyOptions cf_opt(opt);
- status = rocksdb::GetColumnFamilyOptionsFromString(
- cf_opt, column.options, &cf_opt);
- if (!status.ok()) {
- derr << __func__ << " invalid db column family options for CF '"
- << column.name << "': " << column.options << dendl;
- return -EINVAL;
- }
- install_cf_mergeop(column.name, &cf_opt);
- if (column.shard_cnt == 1) {
- column_families.push_back(rocksdb::ColumnFamilyDescriptor(column.name, cf_opt));
- } else {
- for (size_t i = 0; i < column.shard_cnt; i++) {
- std::string cf_name = column.name + "-" + to_string(i);
- column_families.push_back(rocksdb::ColumnFamilyDescriptor(cf_name, cf_opt));
- }
- }
- }
- column_families.push_back(rocksdb::ColumnFamilyDescriptor("default",opt));
std::vector<rocksdb::ColumnFamilyHandle*> handles;
if (open_readonly) {
status = rocksdb::DB::OpenForReadOnly(rocksdb::DBOptions(opt),
- path, column_families,
+ path, existing_cfs,
&handles, &db);
} else {
status = rocksdb::DB::Open(rocksdb::DBOptions(opt),
- path, column_families, &handles, &db);
+ path, existing_cfs, &handles, &db);
}
if (!status.ok()) {
derr << status.ToString() << dendl;
return -EINVAL;
}
+ ceph_assert(existing_cfs.size() == existing_cfs_shard.size() + 1);
+ ceph_assert(handles.size() == existing_cfs.size());
+ dout(10) << __func__ << " existing_cfs=" << existing_cfs.size() << dendl;
+ for (size_t i = 0; i < existing_cfs_shard.size(); i++) {
+ add_column_family(existing_cfs_shard[i].second.name,
+ existing_cfs_shard[i].second.hash_l,
+ existing_cfs_shard[i].second.hash_h,
+ existing_cfs_shard[i].first,
+ handles[i]);
+ }
+ default_cf = handles[handles.size() - 1];
+ must_close_default_cf = true;
- size_t pos = 0;
- for (auto& column : stored_sharding_def) {
- for (size_t i = 0; i < column.shard_cnt; i++) {
- add_column_family(column.name, column.hash_l, column.hash_h, i, handles[pos]);
- pos++;
+ if (missing_cfs.size() > 0) {
+ dout(10) << __func__ << " missing_cfs=" << missing_cfs.size() << dendl;
+ ceph_assert(recreate_mode);
+ ceph_assert(missing_cfs.size() == missing_cfs_shard.size());
+ for (size_t i = 0; i < missing_cfs.size(); i++) {
+ rocksdb::ColumnFamilyHandle *cf;
+ status = db->CreateColumnFamily(missing_cfs[i].options, missing_cfs[i].name, &cf);
+ if (!status.ok()) {
+ derr << __func__ << " Failed to create rocksdb column family: "
+ << missing_cfs[i].name << dendl;
+ return -EINVAL;
+ }
+ add_column_family(missing_cfs_shard[i].second.name,
+ missing_cfs_shard[i].second.hash_l,
+ missing_cfs_shard[i].second.hash_h,
+ missing_cfs_shard[i].first,
+ cf);
}
+ opt.env->DeleteFile(sharding_recreate);
}
- ceph_assert(pos == handles.size() - 1);
- default_cf = handles[pos];
- must_close_default_cf = true;
}
}
ceph_assert(default_cf != nullptr);
+// Run rocksdb::RepairDB on `path` while keeping the sharding metadata.
+//
+// The sharding definition is read from sharding_def_file before the repair.
+// Afterwards, if a definition was found, it is written back and "1" is
+// written to sharding_recreate (the marker the open path reads to enable
+// creation of missing column families). This happens even when the repair
+// itself failed.
+//
+// Diagnostics for option-loading and repair failures go to `out`.
+// Returns 0 when RepairDB succeeded, the result of load_rocksdb_options()
+// when options cannot be loaded, and -1 when RepairDB failed or a marker
+// file could not be written.
int RocksDBStore::repair(std::ostream &out)
{
+ rocksdb::Status status;
rocksdb::Options opt;
int r = load_rocksdb_options(false, opt);
if (r) {
out << "load rocksdb options failed" << std::endl;
return r;
}
- rocksdb::Status status = rocksdb::RepairDB(path, opt);
+ //need to save sharding definition, repairDB will delete files it does not know
+ std::string stored_sharding_text;
+ status = opt.env->FileExists(sharding_def_file);
if (status.ok()) {
+ status = rocksdb::ReadFileToString(opt.env,
+ sharding_def_file,
+ &stored_sharding_text);
+ if (!status.ok()) {
+ // an unreadable definition is treated like an absent one
+ stored_sharding_text.clear();
+ }
+ }
+ dout(10) << __func__ << " stored_sharding: " << stored_sharding_text << dendl;
+ // From here on `status` holds only the RepairDB outcome. The marker writes
+ // below use their own status so the final report shows the real repair error.
+ status = rocksdb::RepairDB(path, opt);
+ if (!stored_sharding_text.empty()) {
+ //recreate markers even if repair failed
+ opt.env->CreateDir(sharding_def_dir);
+ rocksdb::Status write_status =
+ rocksdb::WriteStringToFile(opt.env, stored_sharding_text,
+ sharding_def_file, true);
+ if (!write_status.ok()) {
+ derr << __func__ << " cannot write to " << sharding_def_file << dendl;
+ return -1;
+ }
+ write_status = rocksdb::WriteStringToFile(opt.env, "1",
+ sharding_recreate, true);
+ if (!write_status.ok()) {
+ derr << __func__ << " cannot write to " << sharding_recreate << dendl;
+ return -1;
+ }
+ }
+
+ if (status.ok()) {
return 0;
} else {
out << "repair rocksdb failed : " << status.ToString() << std::endl;
- return 1;
+ return -1;
}
}