using ceph::bufferptr;
using ceph::Formatter;
+// Name of the file in which do_open() persists the textual sharding
+// definition when it creates a database, and from which it reads the
+// definition back when it opens an existing one. The path is handed as-is
+// to the rocksdb Env file helpers.
+static const char* sharding_def_file = "sharding/def";
+
static bufferlist to_bufferlist(rocksdb::Slice in) {
bufferlist bl;
bl.append(bufferptr(in.data(), in.size()));
}
int RocksDBStore::install_cf_mergeop(
- const string &cf_name,
+ const string &key_prefix,
rocksdb::ColumnFamilyOptions *cf_opt)
{
ceph_assert(cf_opt != nullptr);
cf_opt->merge_operator.reset();
for (auto& i : merge_ops) {
- if (i.first == cf_name) {
+ if (i.first == key_prefix) {
cf_opt->merge_operator.reset(new MergeOperatorLinker(i.second));
}
}
}
+// Creates the database directory and then opens the database through
+// do_open() with create_if_missing=true and open_readonly=false, passing
+// `cfs` on as the textual sharding definition.
+// Returns the negative result of create_db_dir() if that fails, otherwise
+// whatever do_open() returns.
int RocksDBStore::create_and_open(ostream &out,
- const vector<ColumnFamily>& cfs)
+ const std::string& cfs)
{
int r = create_db_dir();
if (r < 0)
return r;
- if (cfs.empty()) {
- return do_open(out, true, false, nullptr);
- } else {
- return do_open(out, true, false, &cfs);
- }
+ return do_open(out, true, false, cfs);
}
int RocksDBStore::load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt)
<< dendl;
opt.merge_operator.reset(new MergeOperatorRouter(*this));
+ comparator = opt.comparator;
+ return 0;
+}
+
+/**
+ * Register the handle of one shard of a (possibly sharded) column.
+ *
+ * @param cf_name    column name used as key in cf_handles
+ * @param hash_l     first key byte taken into the shard hash
+ * @param hash_h     end of the hashed key range; must be > hash_l
+ * @param shard_idx  position of this shard within the column
+ * @param handle     rocksdb handle stored at that position
+ *
+ * The first registration of a column records its hash range; every later
+ * registration must pass the very same range (asserted).
+ */
+void RocksDBStore::add_column_family(const std::string& cf_name, uint32_t hash_l, uint32_t hash_h,
+                                     size_t shard_idx, rocksdb::ColumnFamilyHandle *handle) {
+  dout(10) << __func__ << " column_name=" << cf_name << " shard_idx=" << shard_idx <<
+    " hash_l=" << hash_l << " hash_h=" << hash_h << " handle=" << (void*) handle << dendl;
+  const bool already_known = cf_handles.find(cf_name) != cf_handles.end();
+  // operator[] creates the entry on first use
+  auto& entry = cf_handles[cf_name];
+  if (!already_known) {
+    ceph_assert(hash_l < hash_h);
+    entry.hash_l = hash_l;
+    entry.hash_h = hash_h;
+  } else {
+    ceph_assert(hash_l == entry.hash_l);
+    ceph_assert(hash_h == entry.hash_h);
+  }
+  auto& shard_handles = entry.handles;
+  // shards may be registered in any order; grow the table on demand
+  if (shard_idx >= shard_handles.size()) {
+    shard_handles.resize(shard_idx + 1);
+  }
+  shard_handles[shard_idx] = handle;
+}
+
+// Tells whether `prefix` has been registered in cf_handles, i.e. whether it
+// is kept in its own column family (or set of shards).
+bool RocksDBStore::is_column_family(const std::string& prefix) {
+  const auto registered = cf_handles.count(prefix);
+  return registered != 0;
+}
+/**
+ * Pick the column family handle that stores `key` of `prefix`.
+ *
+ * Returns nullptr when `prefix` has no entry in cf_handles. A column with a
+ * single handle always yields that handle. Otherwise the key bytes in
+ * [hash_l, hash_h) - both clamped to the key length - are hashed with
+ * ceph_str_hash_rjenkins and the hash modulo the number of handles selects
+ * the shard.
+ */
+rocksdb::ColumnFamilyHandle *RocksDBStore::get_cf_handle(const std::string& prefix, const std::string& key) {
+  const auto found = cf_handles.find(prefix);
+  if (found == cf_handles.end()) {
+    return nullptr;
+  }
+  const auto& column = found->second;
+  if (column.handles.size() == 1) {
+    return column.handles[0];
+  }
+  const uint32_t from = std::min<uint32_t>(column.hash_l, key.size());
+  const uint32_t to = std::min<uint32_t>(column.hash_h, key.size());
+  const uint32_t hash = ceph_str_hash_rjenkins(&key[from], to - from);
+  return column.handles[hash % column.handles.size()];
+}
+
+/**
+ * Pick the column family handle that stores the key given as a raw
+ * (pointer, length) pair.
+ *
+ * Returns nullptr when `prefix` has no entry in cf_handles. A column with a
+ * single handle always yields that handle. Otherwise the key bytes in
+ * [hash_l, hash_h) - both clamped to `keylen` - are hashed with
+ * ceph_str_hash_rjenkins and the hash modulo the number of handles selects
+ * the shard.
+ */
+rocksdb::ColumnFamilyHandle *RocksDBStore::get_cf_handle(const std::string& prefix, const char* key, size_t keylen) {
+  const auto found = cf_handles.find(prefix);
+  if (found == cf_handles.end()) {
+    return nullptr;
+  }
+  const auto& column = found->second;
+  if (column.handles.size() == 1) {
+    return column.handles[0];
+  }
+  const uint32_t from = std::min<uint32_t>(column.hash_l, keylen);
+  const uint32_t to = std::min<uint32_t>(column.hash_h, keylen);
+  const uint32_t hash = ceph_str_hash_rjenkins(&key[from], to - from);
+  return column.handles[hash % column.handles.size()];
+}
+
+/**
+ * Parse a textual sharding definition.
+ *
+ * Definition of sharding:
+ * space-separated list of: column_def [ '=' options ]
+ * column_def := column_name
+ * column_def := column_name '(' shard_count ')'
+ * column_def := column_name '(' shard_count ',' hash_begin '-' ')'
+ * column_def := column_name '(' shard_count ',' hash_begin '-' hash_end ')'
+ * I=write_buffer_size=1048576 O(6) m(7,10-) prefix(4,0-10)=disable_auto_compactions=true,max_bytes_for_level_base=1048576
+ *
+ * A column without '(...)' gets one shard; a missing hash range means
+ * 0 .. max uint32; a missing hash_end means max uint32. Numbers are plain
+ * unsigned decimals that must fit in 32 bits. Empty tokens caused by leading
+ * or repeated spaces are ignored.
+ *
+ * @param text_def_in     definition to parse; need not be NUL-terminated,
+ *                        nothing outside the view is read
+ * @param sharding_def    cleared, then filled with one entry per column; on
+ *                        error it holds the columns parsed before the error
+ * @param error_position  optional; set to nullptr on success, otherwise to
+ *                        the offending position inside text_def_in (may be
+ *                        one past its last character, so do not dereference)
+ * @param error_msg       optional; cleared on success, otherwise a short
+ *                        description of what was expected
+ * @return true when the whole text was parsed without error
+ */
+bool RocksDBStore::parse_sharding_def(const std::string_view text_def_in,
+                                      std::vector<ColumnFamily>& sharding_def,
+                                      char const* *error_position,
+                                      std::string *error_msg)
+{
+  std::string_view text_def = text_def_in;
+  char const* error_position_local = nullptr;
+  std::string error_msg_local;
+  if (error_position == nullptr) {
+    error_position = &error_position_local;
+  }
+  if (error_msg == nullptr) {
+    error_msg = &error_msg_local;
+  }
+  // Always start from a clean state: callers reuse the same out-variables
+  // across calls and the return value is derived from *error_position.
+  *error_position = nullptr;
+  error_msg->clear();
+
+  const uint64_t u32_max = std::numeric_limits<uint32_t>::max();
+  // Reads decimal digits from [begin, end) and returns the position after
+  // the last digit (== begin when there is none). The value saturates just
+  // above u32_max so that overflow can be detected by the caller.
+  auto parse_number = [u32_max](const char* begin, const char* end,
+                                uint64_t* value) -> const char* {
+    uint64_t v = 0;
+    const char* p = begin;
+    for (; p != end && *p >= '0' && *p <= '9'; ++p) {
+      if (v <= u32_max) {
+        v = v * 10 + static_cast<uint64_t>(*p - '0');
+      }
+    }
+    *value = v;
+    return p;
+  };
+  auto fail = [&](const char* where, const char* what) {
+    *error_position = where;
+    *error_msg = what;
+  };
+
+  sharding_def.clear();
+  while (!text_def.empty()) {
+    std::string_view options;
+    std::string_view name;
+    size_t shard_cnt = 1;
+    uint32_t l_bound = 0;
+    uint32_t h_bound = std::numeric_limits<uint32_t>::max();
+
+    // cut the next space-delimited token off the front of text_def
+    std::string_view column_def;
+    size_t spos = text_def.find(' ');
+    if (spos == std::string_view::npos) {
+      column_def = text_def;
+      text_def = text_def.substr(text_def.size());
+    } else {
+      column_def = text_def.substr(0, spos);
+      text_def = text_def.substr(spos + 1);
+    }
+    if (column_def.empty()) {
+      // leading or repeated separator, not a column
+      continue;
+    }
+    size_t eqpos = column_def.find('=');
+    if (eqpos != std::string_view::npos) {
+      options = column_def.substr(eqpos + 1);
+      column_def = column_def.substr(0, eqpos);
+    }
+
+    size_t bpos = column_def.find('(');
+    if (bpos != std::string_view::npos) {
+      name = column_def.substr(0, bpos);
+      const char* const end = column_def.data() + column_def.size();
+      const char* nptr = column_def.data() + bpos + 1;
+      const char* endptr = nullptr;
+      uint64_t value = 0;
+
+      endptr = parse_number(nptr, end, &value);
+      if (nptr == endptr) {
+        fail(nptr, "expecting integer");
+        break;
+      }
+      if (value > u32_max) {
+        fail(nptr, "integer out of range");
+        break;
+      }
+      shard_cnt = static_cast<size_t>(value);
+      nptr = endptr;
+      if (nptr != end && *nptr == ',') {
+        nptr++;
+        endptr = parse_number(nptr, end, &value);
+        if (nptr == endptr) {
+          fail(nptr, "expecting integer");
+          break;
+        }
+        if (value > u32_max) {
+          fail(nptr, "integer out of range");
+          break;
+        }
+        l_bound = static_cast<uint32_t>(value);
+        nptr = endptr;
+        if (nptr == end || *nptr != '-') {
+          fail(nptr, "expecting '-'");
+          break;
+        }
+        nptr++;
+        // hash_end is optional; without it the range stays open-ended
+        endptr = parse_number(nptr, end, &value);
+        if (nptr != endptr) {
+          if (value > u32_max) {
+            fail(nptr, "integer out of range");
+            break;
+          }
+          h_bound = static_cast<uint32_t>(value);
+        }
+        nptr = endptr;
+      }
+      if (nptr == end || *nptr != ')') {
+        fail(nptr, "expecting ')'");
+        break;
+      }
+    } else {
+      name = column_def;
+    }
+    sharding_def.emplace_back(std::string(name), shard_cnt, std::string(options), l_bound, h_bound);
+  }
+  return *error_position == nullptr;
+}
+
+/**
+ * Expand a parsed sharding definition into rocksdb column family names.
+ *
+ * A column with exactly one shard contributes its plain name; any other
+ * column contributes "<name>-<idx>" for idx in [0, shard_cnt).
+ * `columns` is cleared first.
+ */
+void RocksDBStore::sharding_def_to_columns(const std::vector<ColumnFamily>& sharding_def,
+                                           std::vector<std::string>& columns)
+{
+  columns.clear();
+  for (const auto& def : sharding_def) {
+    if (def.shard_cnt == 1) {
+      columns.push_back(def.name);
+      continue;
+    }
+    for (size_t shard = 0; shard < def.shard_cnt; ++shard) {
+      columns.push_back(def.name + "-" + to_string(shard));
+    }
+  }
+}
+
+/**
+ * Create the rocksdb column families for every column of `sharding_def`
+ * and register their handles through add_column_family().
+ *
+ * Each column starts from a copy of `opt`, overridden by the column's own
+ * option string, and gets its merge operator installed by key prefix.
+ * A column with one shard is created under its plain name, otherwise the
+ * shards are named "<name>-<idx>".
+ *
+ * @return 0 on success, -EINVAL when an option string cannot be parsed or
+ *         a column family cannot be created
+ */
+int RocksDBStore::create_shards(const rocksdb::Options& opt,
+                                const vector<ColumnFamily>& sharding_def)
+{
+  for (const auto& column : sharding_def) {
+    // base: default CF settings, block cache, merge operators;
+    // the user supplied option string is applied on top of it
+    rocksdb::ColumnFamilyOptions shard_opts(opt);
+    rocksdb::Status st = rocksdb::GetColumnFamilyOptionsFromString(
+      shard_opts, column.options, &shard_opts);
+    if (!st.ok()) {
+      derr << __func__ << " invalid db column family option string for CF: "
+           << column.name << dendl;
+      return -EINVAL;
+    }
+    install_cf_mergeop(column.name, &shard_opts);
+    const bool single_shard = (column.shard_cnt == 1);
+    for (size_t shard = 0; shard < column.shard_cnt; ++shard) {
+      const std::string cf_name =
+        single_shard ? column.name : column.name + "-" + to_string(shard);
+      rocksdb::ColumnFamilyHandle *handle;
+      st = db->CreateColumnFamily(shard_opts, cf_name, &handle);
+      if (!st.ok()) {
+        derr << __func__ << " Failed to create rocksdb column family: "
+             << cf_name << dendl;
+        return -EINVAL;
+      }
+      // remember the handle of the freshly created shard
+      add_column_family(column.name, column.hash_l, column.hash_h, shard, handle);
+    }
+  }
return 0;
}
+// Prints a column definition as "(name,shard_cnt,hash_l-hash_h,options)".
+// hash_h is left out when it equals the uint32_t maximum, which is the
+// value used for an open-ended hash range.
+std::ostream& operator<<(std::ostream& out, const RocksDBStore::ColumnFamily& cf)
+{
+  out << "(" << cf.name << "," << cf.shard_cnt << "," << cf.hash_l << "-";
+  const bool open_ended = (cf.hash_h == std::numeric_limits<uint32_t>::max());
+  if (!open_ended) {
+    out << cf.hash_h;
+  }
+  return out << "," << cf.options << ")";
+}
+
int RocksDBStore::do_open(ostream &out,
bool create_if_missing,
bool open_readonly,
- const vector<ColumnFamily>* cfs)
+ const std::string& sharding_text)
{
ceph_assert(!(create_if_missing && open_readonly));
rocksdb::Options opt;
return -EINVAL;
}
// create and open column families
- if (cfs) {
- for (auto& p : *cfs) {
- // copy default CF settings, block cache, merge operators as
- // the base for new CF
- rocksdb::ColumnFamilyOptions cf_opt(opt);
- // user input options will override the base options
- status = rocksdb::GetColumnFamilyOptionsFromString(
- cf_opt, p.option, &cf_opt);
- if (!status.ok()) {
- derr << __func__ << " invalid db column family option string for CF: "
- << p.name << dendl;
- return -EINVAL;
- }
- install_cf_mergeop(p.name, &cf_opt);
- rocksdb::ColumnFamilyHandle *cf;
- status = db->CreateColumnFamily(cf_opt, p.name, &cf);
- if (!status.ok()) {
- derr << __func__ << " Failed to create rocksdb column family: "
- << p.name << dendl;
- return -EINVAL;
- }
- // store the new CF handle
- add_column_family(p.name, static_cast<void*>(cf));
+ if (!sharding_text.empty()) {
+ bool b;
+ std::vector<ColumnFamily> sharding_def;
+ b = parse_sharding_def(sharding_text, sharding_def);
+ if (!b) {
+ dout(1) << __func__ << " bad sharding: " << sharding_text << dendl;
+ return -EINVAL;
+ }
+ r = create_shards(opt, sharding_def);
+ if (r != 0 ) {
+ return r;
+ }
+ opt.env->CreateDir("sharding");
+ status = rocksdb::WriteStringToFile(opt.env, sharding_text,
+ sharding_def_file, true);
+ if (!status.ok()) {
+ derr << __func__ << " cannot write to " << sharding_def_file << dendl;
+ return -EIO;
}
+ } else {
+ status = opt.env->DeleteFile(sharding_def_file);
}
default_cf = db->DefaultColumnFamily();
} else {
+ std::string stored_sharding_text;
+ status = opt.env->FileExists(sharding_def_file);
+ if (status.ok()) {
+ status = rocksdb::ReadFileToString(opt.env,
+ sharding_def_file,
+ &stored_sharding_text);
+ if(!status.ok()) {
+ derr << __func__ << " cannot read from " << sharding_def_file << dendl;
+ return -EIO;
+ }
+ } else {
+ //no "sharding_def" present
+ }
+ //check if sharding_def matches stored_sharding_def
+ std::vector<ColumnFamily> sharding_def;
+ parse_sharding_def(sharding_text, sharding_def);
+ std::vector<ColumnFamily> stored_sharding_def;
+ parse_sharding_def(stored_sharding_text, stored_sharding_def);
+
+ std::sort(sharding_def.begin(), sharding_def.end(),
+ [](ColumnFamily& a, ColumnFamily&b) {return a.name < b.name;});
+ std::sort(stored_sharding_def.begin(), stored_sharding_def.end(),
+ [](ColumnFamily& a, ColumnFamily&b) {return a.name < b.name;});
+
+ bool match = true;
+ if (sharding_def.size() != stored_sharding_def.size()) {
+ match = false;
+ } else {
+ for (size_t i = 0; i < sharding_def.size(); i++) {
+ auto& a = sharding_def[i];
+ auto& b = stored_sharding_def[i];
+ if ( (a.name != b.name) ||
+ (a.shard_cnt != b.shard_cnt) ||
+ (a.hash_l != b.hash_l) ||
+ (a.hash_h != b.hash_h) ) {
+ match = false;
+ break;
+ }
+ }
+ }
+ if (!match) {
+ derr << __func__ << " mismatch on sharding. requested = " << sharding_def
+ << " stored = " << stored_sharding_def << dendl;
+ return -EIO;
+ }
+
std::vector<string> existing_cfs;
status = rocksdb::DB::ListColumnFamilies(
rocksdb::DBOptions(opt),
path,
&existing_cfs);
- dout(1) << __func__ << " column families: " << existing_cfs << dendl;
- if (existing_cfs.empty()) {
+ dout(5) << __func__ << " column families from rocksdb: " << existing_cfs << dendl;
+
+ std::sort(existing_cfs.begin(), existing_cfs.end(),
+ [&](const std::string& a, const std::string&b) {
+ return a < b;}
+ );
+
+ std::vector<std::string> columns_from_stored;
+ sharding_def_to_columns(stored_sharding_def, columns_from_stored);
+ columns_from_stored.push_back(rocksdb::kDefaultColumnFamilyName);
+ std::sort(columns_from_stored.begin(), columns_from_stored.end());
+
+ if (existing_cfs != columns_from_stored) {
+ derr << __func__ << " mismatch on sharding. rocksdb columns = " << existing_cfs
+ << " stored columns = " << columns_from_stored << dendl;
+ }
+
+ if (columns_from_stored.empty()) {
// no column families
if (open_readonly) {
status = rocksdb::DB::Open(opt, path, &db);
}
default_cf = db->DefaultColumnFamily();
} else {
- // we cannot change column families for a created database. so, map
- // what options we are given to whatever cf's already exist.
std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
- for (auto& n : existing_cfs) {
- // copy default CF settings, block cache, merge operators as
- // the base for new CF
+ for (auto& column : stored_sharding_def) {
rocksdb::ColumnFamilyOptions cf_opt(opt);
- bool found = false;
- if (cfs) {
- for (auto& i : *cfs) {
- if (i.name == n) {
- found = true;
- status = rocksdb::GetColumnFamilyOptionsFromString(
- cf_opt, i.option, &cf_opt);
- if (!status.ok()) {
- derr << __func__ << " invalid db column family options for CF '"
- << i.name << "': " << i.option << dendl;
- return -EINVAL;
- }
- }
- }
- }
- if (n != rocksdb::kDefaultColumnFamilyName) {
- install_cf_mergeop(n, &cf_opt);
+ status = rocksdb::GetColumnFamilyOptionsFromString(
+ cf_opt, column.options, &cf_opt);
+ if (!status.ok()) {
+ derr << __func__ << " invalid db column family options for CF '"
+ << column.name << "': " << column.options << dendl;
+ return -EINVAL;
}
- column_families.push_back(rocksdb::ColumnFamilyDescriptor(n, cf_opt));
- if (!found && n != rocksdb::kDefaultColumnFamilyName) {
- dout(1) << __func__ << " column family '" << n
- << "' exists but not expected" << dendl;
+ install_cf_mergeop(column.name, &cf_opt);
+ if (column.shard_cnt == 1) {
+ column_families.push_back(rocksdb::ColumnFamilyDescriptor(column.name, cf_opt));
+ } else {
+ for (size_t i = 0; i < column.shard_cnt; i++) {
+ std::string cf_name = column.name + "-" + to_string(i);
+ column_families.push_back(rocksdb::ColumnFamilyDescriptor(cf_name, cf_opt));
+ }
}
}
+ column_families.push_back(rocksdb::ColumnFamilyDescriptor("default",opt));
std::vector<rocksdb::ColumnFamilyHandle*> handles;
if (open_readonly) {
status = rocksdb::DB::OpenForReadOnly(rocksdb::DBOptions(opt),
derr << status.ToString() << dendl;
return -EINVAL;
}
- for (unsigned i = 0; i < existing_cfs.size(); ++i) {
- if (existing_cfs[i] == rocksdb::kDefaultColumnFamilyName) {
- default_cf = handles[i];
- must_close_default_cf = true;
- } else {
- add_column_family(existing_cfs[i], static_cast<void*>(handles[i]));
+
+ size_t pos = 0;
+ for (auto& column : stored_sharding_def) {
+ for (size_t i = 0; i < column.shard_cnt; i++) {
+ add_column_family(column.name, column.hash_l, column.hash_h, i, handles[pos]);
+ pos++;
}
}
+ ceph_assert(pos == handles.size() - 1);
+ default_cf = handles[pos];
+ must_close_default_cf = true;
}
}
ceph_assert(default_cf != nullptr);
// Ensure db is destroyed before dependent db_cache and filterpolicy
for (auto& p : cf_handles) {
- db->DestroyColumnFamilyHandle(
- static_cast<rocksdb::ColumnFamilyHandle*>(p.second));
- p.second = nullptr;
+ for (size_t i = 0; i < p.second.handles.size(); i++) {
+ db->DestroyColumnFamilyHandle(p.second.handles[i]);
+ }
}
+ cf_handles.clear();
if (must_close_default_cf) {
db->DestroyColumnFamilyHandle(default_cf);
must_close_default_cf = false;
int64_t RocksDBStore::estimate_prefix_size(const string& prefix,
const string& key_prefix)
{
- auto cf = get_cf_handle(prefix);
uint64_t size = 0;
uint8_t flags =
//rocksdb::DB::INCLUDE_MEMTABLES | // do not include memtables...
rocksdb::DB::INCLUDE_FILES;
- if (cf) {
- string start = key_prefix + string(1, '\x00');
- string limit = key_prefix + string("\xff\xff\xff\xff");
- rocksdb::Range r(start, limit);
- db->GetApproximateSizes(cf, &r, 1, &size, flags);
+ auto p_iter = cf_handles.find(prefix);
+ if (p_iter != cf_handles.end()) {
+ for (auto cf : p_iter->second.handles) {
+ uint64_t s = 0;
+ string start = key_prefix + string(1, '\x00');
+ string limit = key_prefix + string("\xff\xff\xff\xff");
+ rocksdb::Range r(start, limit);
+ db->GetApproximateSizes(cf, &r, 1, &s, flags);
+ size += s;
+ }
} else {
string start = combine_strings(prefix , key_prefix);
string limit = combine_strings(prefix , key_prefix + "\xff\xff\xff\xff");
const string &k,
const bufferlist &to_set_bl)
{
- auto cf = db->get_cf_handle(prefix);
+ auto cf = db->get_cf_handle(prefix, k);
if (cf) {
put_bat(bat, cf, k, to_set_bl);
} else {
const char *k, size_t keylen,
const bufferlist &to_set_bl)
{
- auto cf = db->get_cf_handle(prefix);
+ auto cf = db->get_cf_handle(prefix, k, keylen);
if (cf) {
string key(k, keylen); // fixme?
put_bat(bat, cf, key, to_set_bl);
void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
const string &k)
{
- auto cf = db->get_cf_handle(prefix);
+ auto cf = db->get_cf_handle(prefix, k);
if (cf) {
bat.Delete(cf, rocksdb::Slice(k));
} else {
const char *k,
size_t keylen)
{
- auto cf = db->get_cf_handle(prefix);
+ auto cf = db->get_cf_handle(prefix, k, keylen);
if (cf) {
bat.Delete(cf, rocksdb::Slice(k, keylen));
} else {
void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix,
const string &k)
{
- auto cf = db->get_cf_handle(prefix);
+ auto cf = db->get_cf_handle(prefix, k);
if (cf) {
bat.SingleDelete(cf, k);
} else {
+// Queues removal of every key stored under `prefix`.
+// Keys are deleted one by one inside a batch save point; cnt is decremented
+// before each delete, and once it reaches zero the individual deletes are
+// rolled back and replaced by a single DeleteRange.
+// A prefix without column family lives in default_cf under
+// combine_strings(prefix, key); a prefix with column families is processed
+// shard by shard, each shard with its own counter and save point.
void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
{
- auto cf = db->get_cf_handle(prefix);
- uint64_t cnt = db->delete_range_threshold;
- bat.SetSavePoint();
- auto it = db->get_iterator(prefix);
- for (it->seek_to_first(); it->valid(); it->next()) {
- if (!cnt) {
- bat.RollbackToSavePoint();
- if (cf) {
- string endprefix = "\xff\xff\xff\xff"; // FIXME: this is cheating...
- bat.DeleteRange(cf, string(), endprefix);
- } else {
- string endprefix = prefix;
+ auto p_iter = db->cf_handles.find(prefix);
+ if (p_iter == db->cf_handles.end()) {
+ uint64_t cnt = db->delete_range_threshold;
+ bat.SetSavePoint();
+ auto it = db->get_iterator(prefix);
+ for (it->seek_to_first(); it->valid() && (--cnt) != 0; it->next()) {
+ bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
+ }
+ if (cnt == 0) {
+ bat.RollbackToSavePoint();
+ // range end: the prefix followed by '\x01', combined with an empty key
+ string endprefix = prefix;
endprefix.push_back('\x01');
bat.DeleteRange(db->default_cf,
combine_strings(prefix, string()),
combine_strings(endprefix, string()));
- }
- return;
- }
- if (cf) {
- bat.Delete(cf, rocksdb::Slice(it->key()));
} else {
- bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
+ bat.PopSavePoint();
+ }
+ } else {
+ ceph_assert(p_iter->second.handles.size() >= 1);
+ for (auto cf : p_iter->second.handles) {
+ uint64_t cnt = db->delete_range_threshold;
+ bat.SetSavePoint();
+ auto it = db->new_shard_iterator(cf);
+ ceph_assert(it != nullptr);
+ for (it->SeekToFirst(); it->Valid() && (--cnt) != 0; it->Next()) {
+ bat.Delete(cf, it->key());
+ }
+ if (cnt == 0) {
+ bat.RollbackToSavePoint();
+ string endprefix = "\xff\xff\xff\xff"; // FIXME: this is cheating...
+ bat.DeleteRange(cf, string(), endprefix);
+ } else {
+ bat.PopSavePoint();
+ }
+ // new_shard_iterator() returns a raw rocksdb iterator that nobody else
+ // releases; free it here, exactly as rm_range_keys() does
+ delete it;
}
- --cnt;
}
- bat.PopSavePoint();
}
+// Queues removal of the keys of `prefix` that lie in [start, end), where
+// "before end" is decided by db->comparator.
+// Keys are deleted one by one inside a batch save point; cnt is decremented
+// before each delete, and once it reaches zero the individual deletes are
+// rolled back and replaced by a single DeleteRange over the same bounds.
+// A prefix without column family is handled in default_cf with keys built
+// by combine_strings(); a prefix with column families is handled shard by
+// shard, each shard with its own counter, save point and iterator.
void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
const string &start,
const string &end)
{
- auto cf = db->get_cf_handle(prefix);
-
- uint64_t cnt = db->delete_range_threshold;
- auto it = db->get_iterator(prefix);
- bat.SetSavePoint();
- it->lower_bound(start);
- while (it->valid()) {
- if (it->key() >= end) {
- break;
+ auto p_iter = db->cf_handles.find(prefix);
+ if (p_iter == db->cf_handles.end()) {
+ uint64_t cnt = db->delete_range_threshold;
+ bat.SetSavePoint();
+ auto it = db->get_iterator(prefix);
+ for (it->lower_bound(start);
+ it->valid() && db->comparator->Compare(it->key(), end) < 0 && (--cnt) != 0;
+ it->next()) {
+ bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
}
- if (!cnt) {
+ // cnt only reaches zero when the loop above stopped on the threshold
+ if (cnt == 0) {
bat.RollbackToSavePoint();
- if (cf) {
- bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
+ bat.DeleteRange(db->default_cf,
+ rocksdb::Slice(combine_strings(prefix, start)),
+ rocksdb::Slice(combine_strings(prefix, end)));
+ } else {
+ bat.PopSavePoint();
+ }
+ } else {
+ ceph_assert(p_iter->second.handles.size() >= 1);
+ for (auto cf : p_iter->second.handles) {
+ uint64_t cnt = db->delete_range_threshold;
+ bat.SetSavePoint();
+ // raw iterator over this shard only; released by the delete below
+ rocksdb::Iterator* it = db->new_shard_iterator(cf);
+ ceph_assert(it != nullptr);
+ for (it->Seek(start);
+ it->Valid() && db->comparator->Compare(it->key(), end) < 0 && (--cnt) != 0;
+ it->Next()) {
+ bat.Delete(cf, it->key());
+ }
+ if (cnt == 0) {
+ bat.RollbackToSavePoint();
+ bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
} else {
- bat.DeleteRange(db->default_cf,
- rocksdb::Slice(combine_strings(prefix, start)),
- rocksdb::Slice(combine_strings(prefix, end)));
+ bat.PopSavePoint();
}
- return;
+ delete it;
}
- if (cf) {
- bat.Delete(cf, rocksdb::Slice(it->key()));
- } else {
- bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
- }
- it->next();
- --cnt;
}
- bat.PopSavePoint();
}
void RocksDBStore::RocksDBTransactionImpl::merge(
const string &k,
const bufferlist &to_set_bl)
{
- auto cf = db->get_cf_handle(prefix);
+ auto cf = db->get_cf_handle(prefix, k);
if (cf) {
// bufferlist::c_str() is non-constant, so we can't call c_str()
if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
{
rocksdb::PinnableSlice value;
utime_t start = ceph_clock_now();
- auto cf = get_cf_handle(prefix);
- if (cf) {
+ if (cf_handles.count(prefix) > 0) {
for (auto& key : keys) {
+ auto cf_handle = get_cf_handle(prefix, key);
auto status = db->Get(rocksdb::ReadOptions(),
- cf,
+ cf_handle,
rocksdb::Slice(key),
&value);
if (status.ok()) {
int r = 0;
rocksdb::PinnableSlice value;
rocksdb::Status s;
- auto cf = get_cf_handle(prefix);
+ auto cf = get_cf_handle(prefix, key);
if (cf) {
s = db->Get(rocksdb::ReadOptions(),
cf,
int r = 0;
rocksdb::PinnableSlice value;
rocksdb::Status s;
- auto cf = get_cf_handle(prefix);
+ auto cf = get_cf_handle(prefix, key, keylen);
if (cf) {
s = db->Get(rocksdb::ReadOptions(),
cf,
int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key)
{
size_t prefix_len = 0;
-
+
// Find separator inside Slice
char* separator = (char*) memchr(in.data(), 0, in.size());
if (separator == NULL)
rocksdb::CompactRangeOptions options;
db->CompactRange(options, default_cf, nullptr, nullptr);
for (auto cf : cf_handles) {
- db->CompactRange(
- options,
- static_cast<rocksdb::ColumnFamilyHandle*>(cf.second),
- nullptr, nullptr);
+ for (auto shard_cf : cf.second.handles) {
+ db->CompactRange(
+ options,
+ shard_cf,
+ nullptr, nullptr);
+ }
}
}
-
void RocksDBStore::compact_thread_entry()
{
std::unique_lock l{compact_queue_lock};
}
};
+/**
+ * Iterator over one key prefix whose keys are spread over several column
+ * family shards.
+ *
+ * It owns one rocksdb iterator per shard and deletes them on destruction.
+ * The current position is always iters[0]; valid(), key(), value() and
+ * status() only look at that element. The seek operations position every
+ * shard iterator and then order `iters` with KeyLess, so the shard holding
+ * the smallest key comes first and exhausted shards come last.
+ */
+class ShardMergeIteratorImpl : public KeyValueDB::IteratorImpl {
+private:
+  /**
+   * Strict ordering of shard iterators: valid iterators are ordered by
+   * their current key using the store comparator, every valid iterator
+   * sorts before every invalid one, and invalid iterators are ordered by
+   * address only to keep the ordering total.
+   */
+  struct KeyLess {
+  private:
+    const rocksdb::Comparator* comparator;
+  public:
+    KeyLess(const rocksdb::Comparator* comparator) : comparator(comparator) { }
+
+    bool operator()(rocksdb::Iterator* a, rocksdb::Iterator* b) const
+    {
+      if (a->Valid()) {
+        if (b->Valid()) {
+          return comparator->Compare(a->key(), b->key()) < 0;
+        } else {
+          return true;
+        }
+      } else {
+        if (b->Valid()) {
+          return false;
+        } else {
+          return static_cast<void*>(a) < static_cast<void*>(b);
+        }
+      }
+    }
+  };
+
+  const RocksDBStore* db;
+  KeyLess keyless;
+  string prefix;
+  std::vector<rocksdb::Iterator*> iters;
+public:
+  /// Opens one rocksdb iterator on each of `shards`.
+  explicit ShardMergeIteratorImpl(const RocksDBStore* db,
+                                  const std::string& prefix,
+                                  const std::vector<rocksdb::ColumnFamilyHandle*>& shards)
+    : db(db), keyless(db->comparator), prefix(prefix)
+  {
+    iters.reserve(shards.size());
+    for (auto& s : shards) {
+      iters.push_back(db->db->NewIterator(rocksdb::ReadOptions(), s));
+    }
+  }
+  ~ShardMergeIteratorImpl() {
+    for (auto& it : iters) {
+      delete it;
+    }
+  }
+  /// Positions on the smallest key of all shards. Returns -1 as soon as a
+  /// shard iterator reports a bad status, 0 otherwise.
+  int seek_to_first() override {
+    for (auto& it : iters) {
+      it->SeekToFirst();
+      if (!it->status().ok()) {
+        return -1;
+      }
+    }
+    //all iterators seeked, sort
+    std::sort(iters.begin(), iters.end(), keyless);
+    return 0;
+  }
+  /// Positions on the largest key of all shards. Returns -1 as soon as a
+  /// shard iterator reports a bad status, 0 otherwise.
+  int seek_to_last() override {
+    for (auto& it : iters) {
+      it->SeekToLast();
+      if (!it->status().ok()) {
+        return -1;
+      }
+    }
+    // Collect the shard with the largest last key in iters[0]. The scan
+    // starts at 1 so that iters[0] itself is never stepped past its key.
+    for (size_t i = 1; i < iters.size(); i++) {
+      // KeyLess ranks an invalid iterator above every valid one, so an
+      // empty iters[0] has to be replaced explicitly.
+      if (iters[i]->Valid() &&
+          (!iters[0]->Valid() || keyless(iters[0], iters[i]))) {
+        std::swap(iters[0], iters[i]);
+      }
+      // whatever is left in slot i holds a smaller key; it sits on the
+      // last key of its shard, so one step forward exhausts it
+      //it might happen that cf was empty
+      if (iters[i]->Valid()) {
+        iters[i]->Next();
+      }
+    }
+    //no need to sort, as at most 1 iterator is valid now
+    return 0;
+  }
+  /// Positions on the first key strictly greater than `after`.
+  int upper_bound(const string &after) override {
+    rocksdb::Slice slice_bound(after);
+    for (auto& it : iters) {
+      it->Seek(slice_bound);
+      if (it->Valid() && it->key() == after) {
+        it->Next();
+      }
+      if (!it->status().ok()) {
+        return -1;
+      }
+    }
+    std::sort(iters.begin(), iters.end(), keyless);
+    return 0;
+  }
+  /// Positions on the first key that is not less than `to`.
+  int lower_bound(const string &to) override {
+    rocksdb::Slice slice_bound(to);
+    for (auto& it : iters) {
+      it->Seek(slice_bound);
+      if (!it->status().ok()) {
+        return -1;
+      }
+    }
+    std::sort(iters.begin(), iters.end(), keyless);
+    return 0;
+  }
+  /// Advances to the next key in merged order. Returns -1 when the
+  /// iterator is not valid or the advanced shard reports a bad status.
+  int next() override {
+    int r = -1;
+    if (iters[0]->Valid()) {
+      iters[0]->Next();
+      if (iters[0]->status().ok()) {
+        r = 0;
+        //bubble up
+        // only iters[0] moved, so a single pass of neighbour swaps puts
+        // it back into its place among the already ordered rest
+        for (size_t i = 0; i < iters.size() - 1; i++) {
+          if (keyless(iters[i], iters[i + 1])) {
+            //matches, fixed
+            break;
+          }
+          std::swap(iters[i], iters[i + 1]);
+        }
+      }
+    }
+    return r;
+  }
+  // iters are sorted, so
+  // a[0] < b[0] < c[0] < d[0]
+  // a[0] > a[-1], a[0] > b[-1], a[0] > c[-1], a[0] > d[-1]
+  // so, prev() will be one of:
+  // a[-1], b[-1], c[-1], d[-1]
+  // prev() will be the one that is *largest* of them
+  //
+  // alg:
+  // 1. go prev() on each iterator we can
+  // 2. select largest key from those iterators
+  // 3. go next() on all iterators except (2)
+  // 4. sort
+  /// Steps back to the previous key in merged order. Returns -1 when there
+  /// is no previous key; the iterator is left invalid in that case.
+  int prev() override {
+    std::vector<rocksdb::Iterator*> prev_done;
+    //1
+    for (auto it: iters) {
+      if (it->Valid()) {
+        it->Prev();
+        if (it->Valid()) {
+          prev_done.push_back(it);
+        } else {
+          // was on the first key of its shard; restore that position
+          it->SeekToFirst();
+        }
+      } else {
+        // exhausted shard: its candidate is its last key
+        it->SeekToLast();
+        if (it->Valid()) {
+          prev_done.push_back(it);
+        }
+      }
+    }
+    if (prev_done.size() == 0) {
+      /* there is no previous element */
+      if (iters[0]->Valid()) {
+        iters[0]->Prev();
+        ceph_assert(!iters[0]->Valid());
+      }
+      return -1;
+    }
+    //2,3
+    rocksdb::Iterator* highest = prev_done[0];
+    for (size_t i = 1; i < prev_done.size(); i++) {
+      if (keyless(highest, prev_done[i])) {
+        highest->Next();
+        highest = prev_done[i];
+      } else {
+        prev_done[i]->Next();
+      }
+    }
+    //4
+    //insert highest in the beginning, and shift values until we pick highest
+    //untouched rest is sorted - we just prev()/next() them
+    rocksdb::Iterator* hold = highest;
+    for (size_t i = 0; i < iters.size(); i++) {
+      std::swap(hold, iters[i]);
+      if (hold == highest) break;
+    }
+    ceph_assert(hold == highest);
+    return 0;
+  }
+  bool valid() override {
+    return iters[0]->Valid();
+  }
+  string key() override {
+    return iters[0]->key().ToString();
+  }
+  std::pair<std::string, std::string> raw_key() override {
+    return make_pair(prefix, key());
+  }
+  bufferlist value() override {
+    return to_bufferlist(iters[0]->value());
+  }
+  bufferptr value_as_ptr() override {
+    rocksdb::Slice val = iters[0]->value();
+    return bufferptr(val.data(), val.size());
+  }
+  int status() override {
+    return iters[0]->status().ok() ? 0 : -1;
+  }
+};
+
+// Returns an iterator over the keys of `prefix`.
+// A prefix registered in cf_handles with exactly one handle is served by a
+// CFIteratorImpl on that handle, one with any other number of handles by a
+// ShardMergeIteratorImpl over all of them. A prefix that is not registered
+// falls back to KeyValueDB::get_iterator().
KeyValueDB::Iterator RocksDBStore::get_iterator(const std::string& prefix)
{
- rocksdb::ColumnFamilyHandle *cf_handle =
- static_cast<rocksdb::ColumnFamilyHandle*>(get_cf_handle(prefix));
- if (cf_handle) {
- return std::make_shared<CFIteratorImpl>(
- prefix,
- db->NewIterator(rocksdb::ReadOptions(), cf_handle));
+ auto cf_it = cf_handles.find(prefix);
+ if (cf_it != cf_handles.end()) {
+ if (cf_it->second.handles.size() == 1) {
+ return std::make_shared<CFIteratorImpl>(
+ prefix,
+ db->NewIterator(rocksdb::ReadOptions(), cf_it->second.handles[0]));
+ } else {
+ return std::make_shared<ShardMergeIteratorImpl>(
+ this,
+ prefix,
+ cf_it->second.handles);
+ }
} else {
return KeyValueDB::get_iterator(prefix);
}
}
+
+// Opens a raw rocksdb iterator on the single column family `cf` using
+// default read options. The result is a plain pointer; rm_range_keys()
+// releases it with delete once it is done.
+rocksdb::Iterator* RocksDBStore::new_shard_iterator(rocksdb::ColumnFamilyHandle* cf)
+{
+  const rocksdb::ReadOptions default_read_options;
+  return db->NewIterator(default_read_options, cf);
+}
#include <time.h>
#include <sys/mount.h>
#include "kv/KeyValueDB.h"
+#include "kv/RocksDBStore.h"
#include "include/Context.h"
#include "common/ceph_argparse.h"
#include "global/global_init.h"
if(string(GetParam()) != "rocksdb")
return;
- std::vector<KeyValueDB::ColumnFamily> cfs;
- cfs.push_back(KeyValueDB::ColumnFamily("cf1", ""));
- cfs.push_back(KeyValueDB::ColumnFamily("cf2", ""));
+ std::string cfs("cf1 cf2");
ASSERT_EQ(0, db->init(g_conf()->bluestore_rocksdb_options));
cout << "creating two column families and opening them" << std::endl;
ASSERT_EQ(0, db->create_and_open(cout, cfs));
if(string(GetParam()) != "rocksdb")
return;
- std::vector<KeyValueDB::ColumnFamily> cfs;
- cfs.push_back(KeyValueDB::ColumnFamily("cf1", ""));
+ std::string cfs("cf1");
ASSERT_EQ(0, db->init(g_conf()->bluestore_rocksdb_options));
cout << "creating one column family and opening it" << std::endl;
ASSERT_EQ(0, db->create_and_open(cout, cfs));
fini();
}
+// Stores the keys "100".."999" in column "A", which is split into 6 shards,
+// and checks that one iterator walks them in order forwards and backwards.
+TEST_P(KVTest, RocksDBShardingIteratorTest) {
+  if(string(GetParam()) != "rocksdb")
+    return;
+
+  const int first_key = 100;
+  const int last_key = 999;
+  std::string cfs("A(6)");
+  ASSERT_EQ(0, db->init(g_conf()->bluestore_rocksdb_options));
+  cout << "creating one column family and opening it" << std::endl;
+  ASSERT_EQ(0, db->create_and_open(cout, cfs));
+  {
+    // every value is the decimal text of its own key
+    KeyValueDB::Transaction txn = db->get_transaction();
+    int v = first_key;
+    while (v <= last_key) {
+      const std::string text = to_string(v);
+      bufferlist payload;
+      payload.append(text);
+      txn->set("A", text, payload);
+      v++;
+    }
+    ASSERT_EQ(0, db->submit_transaction_sync(txn));
+  }
+  {
+    KeyValueDB::Iterator it = db->get_iterator("A");
+    // forward: "0" sorts before every stored key
+    ASSERT_EQ(it->lower_bound(to_string(0)), 0);
+    int expected = first_key;
+    while (expected <= last_key) {
+      ASSERT_EQ(it->valid(), true);
+      ASSERT_EQ(it->key(), to_string(expected));
+      ASSERT_EQ(it->value().to_str(), to_string(expected));
+      it->next();
+      expected++;
+    }
+    ASSERT_EQ(it->valid(), false);
+    // backward: start on the largest key and walk down
+    ASSERT_EQ(it->lower_bound(to_string(last_key)), 0);
+    expected = last_key;
+    while (expected >= first_key) {
+      ASSERT_EQ(it->valid(), true);
+      ASSERT_EQ(it->key(), to_string(expected));
+      ASSERT_EQ(it->value().to_str(), to_string(expected));
+      it->prev();
+      expected--;
+    }
+    ASSERT_EQ(it->valid(), false);
+  }
+  fini();
+}
+
TEST_P(KVTest, RocksDBCFMerge) {
if(string(GetParam()) != "rocksdb")
return;
int r = db->set_merge_operator("cf1",p);
if (r < 0)
return; // No merge operators for this database type
- std::vector<KeyValueDB::ColumnFamily> cfs;
- cfs.push_back(KeyValueDB::ColumnFamily("cf1", ""));
+ std::string cfs("cf1");
ASSERT_EQ(0, db->init(g_conf()->bluestore_rocksdb_options));
cout << "creating one column family and opening it" << std::endl;
ASSERT_EQ(0, db->create_and_open(cout, cfs));
if(string(GetParam()) != "rocksdb")
GTEST_SKIP();
- std::vector<KeyValueDB::ColumnFamily> cfs;
- cfs.push_back(KeyValueDB::ColumnFamily("cf1", ""));
+ std::string cfs("cf1");
ASSERT_EQ(0, db->init(g_conf()->bluestore_rocksdb_options));
cout << "creating one column family and opening it" << std::endl;
ASSERT_EQ(0, db->create_and_open(cout));
if(string(GetParam()) != "rocksdb")
GTEST_SKIP();
- std::vector<KeyValueDB::ColumnFamily> cfs;
- cfs.push_back(KeyValueDB::ColumnFamily("cf1", ""));
+ std::string cfs("cf1");
ASSERT_EQ(0, db->init(g_conf()->bluestore_rocksdb_options));
cout << "creating one column family and opening it" << std::endl;
ASSERT_EQ(0, db->create_and_open(cout, cfs));
fini();
}
+// Checks RocksDBStore::parse_sharding_def() on one well-formed definition
+// and on two malformed ones.
+TEST_P(KVTest, RocksDB_parse_sharding_def) {
+  if(string(GetParam()) != "rocksdb")
+    GTEST_SKIP();
+
+  bool result;
+  std::vector<RocksDBStore::ColumnFamily> sharding_def;
+  char const* error_position = nullptr;
+  std::string error_msg;
+
+  // Prints the definition and, when an error was reported, a caret under
+  // the offending position followed by the message.
+  auto show = [&](std::string_view text) {
+    std::cout << text << std::endl;
+    if (error_position) {
+      std::cout << std::string(error_position - text.data(), ' ') << "^" << error_msg << std::endl;
+    }
+  };
+
+  std::string_view text_def = "A(10,0-30) B(6)=option1,option2=aaaa C";
+  result = RocksDBStore::parse_sharding_def(text_def,
+                                            sharding_def,
+                                            &error_position,
+                                            &error_msg);
+  show(text_def);
+
+  ASSERT_EQ(result, true);
+  ASSERT_EQ(error_position, nullptr);
+  ASSERT_EQ(error_msg, "");
+
+  ASSERT_EQ(sharding_def.size(), 3);
+  ASSERT_EQ(sharding_def[0].name, "A");
+  ASSERT_EQ(sharding_def[0].shard_cnt, 10);
+  ASSERT_EQ(sharding_def[0].hash_l, 0);
+  ASSERT_EQ(sharding_def[0].hash_h, 30);
+
+  ASSERT_EQ(sharding_def[1].name, "B");
+  ASSERT_EQ(sharding_def[1].shard_cnt, 6);
+  ASSERT_EQ(sharding_def[1].options, "option1,option2=aaaa");
+  ASSERT_EQ(sharding_def[2].name, "C");
+  ASSERT_EQ(sharding_def[2].shard_cnt, 1);
+
+
+  // Start every negative case from a clean error state, so that it can only
+  // pass because this very call reported the error.
+  error_position = nullptr;
+  error_msg.clear();
+  text_def = "A(10 B(6)=option C";
+  result = RocksDBStore::parse_sharding_def(text_def,
+                                            sharding_def,
+                                            &error_position,
+                                            &error_msg);
+  show(text_def);
+  ASSERT_EQ(result, false);
+  ASSERT_NE(error_position, nullptr);
+  ASSERT_NE(error_msg, "");
+
+  error_position = nullptr;
+  error_msg.clear();
+  text_def = "A(10,1) B(6)=option C";
+  result = RocksDBStore::parse_sharding_def(text_def,
+                                            sharding_def,
+                                            &error_position,
+                                            &error_msg);
+  show(text_def);
+  ASSERT_EQ(result, false);
+  ASSERT_NE(error_position, nullptr);
+  ASSERT_NE(error_msg, "");
+}
+
+
INSTANTIATE_TEST_SUITE_P(
KeyValueDB,
KVTest,