return cf_handles.count(prefix);
}
+/**
+ * Returns a view of the part of `key` that takes part in shard placement hashing.
+ *
+ * The view covers [shards.hash_l, shards.hash_h), with both offsets clamped to
+ * `keylen` so the view never extends past the end of the key. The view borrows
+ * the caller's buffer; it owns nothing.
+ */
+std::string_view RocksDBStore::get_key_hash_view(const prefix_shards& shards, const char* key, const size_t keylen) {
+  const uint32_t begin = std::min<uint32_t>(shards.hash_l, keylen);
+  const uint32_t end = std::min<uint32_t>(shards.hash_h, keylen);
+  const uint32_t length = end - begin;
+  return std::string_view(key + begin, length);
+}
+
+/**
+ * Picks the column family handle for `key` among the handles in `shards`.
+ *
+ * The hashed portion of the key (see get_key_hash_view) is fed to
+ * ceph_str_hash_rjenkins, and the result modulo the number of handles selects
+ * the entry of shards.handles to return.
+ */
+rocksdb::ColumnFamilyHandle *RocksDBStore::get_key_cf(const prefix_shards& shards, const char* key, const size_t keylen) {
+  const std::string_view hashed_part = get_key_hash_view(shards, key, keylen);
+  const uint32_t digest = ceph_str_hash_rjenkins(hashed_part.data(), hashed_part.size());
+  const auto shard_idx = digest % shards.handles.size();
+  return shards.handles[shard_idx];
+}
+
/**
 * Resolves the column family handle that stores `key` under `prefix`.
 *
 * Returns nullptr when `prefix` has no entry in cf_handles. A prefix with a
 * single handle maps straight to it; otherwise the handle is selected by
 * hashing the key via get_key_cf().
 */
rocksdb::ColumnFamilyHandle *RocksDBStore::get_cf_handle(const std::string& prefix, const std::string& key) {
  auto iter = cf_handles.find(prefix);
  if (iter == cf_handles.end()) {
    // Unknown prefix: the end iterator must not be dereferenced.
    return nullptr;
  } else {
    if (iter->second.handles.size() == 1) {
      return iter->second.handles[0];
    } else {
- uint32_t hash_l = std::min<uint32_t>(iter->second.hash_l, key.size());
- uint32_t hash_h = std::min<uint32_t>(iter->second.hash_h, key.size());
- uint32_t hash = ceph_str_hash_rjenkins(&key[hash_l], hash_h - hash_l);
- return iter->second.handles[hash % iter->second.handles.size()];
+ return get_key_cf(iter->second, key.data(), key.size());
    }
  }
}
if (iter->second.handles.size() == 1) {
return iter->second.handles[0];
} else {
- uint32_t hash_l = std::min<uint32_t>(iter->second.hash_l, keylen);
- uint32_t hash_h = std::min<uint32_t>(iter->second.hash_h, keylen);
- uint32_t hash = ceph_str_hash_rjenkins(&key[hash_l], hash_h - hash_l);
- return iter->second.handles[hash % iter->second.handles.size()];
+ return get_key_cf(iter->second, key, keylen);
+ }
+ }
+}
+
+/**
+ * Maps a bounded iteration range onto a single column family when that is provably safe.
+ *
+ * If `bounds` has both an upper and a lower bound defined, and they have equal placement
+ * hash strings, the entire iteration range exists in a single CF, and that CF's handle is
+ * returned.
+ *
+ * Returns nullptr - meaning the bounds cannot necessarily be mapped to a single CF - when:
+ *  - either bound is missing,
+ *  - `prefix` has no entry in cf_handles,
+ *  - the prefix hashes from a non-zero offset (hash_l != 0); presumably the equal-hash-string
+ *    argument only holds when the hashed part is a leading prefix of the key,
+ *  - the two bounds have different placement hash strings.
+ */
+rocksdb::ColumnFamilyHandle *RocksDBStore::get_cf_handle(const std::string& prefix, const IteratorBounds& bounds) {
+  if (!bounds.lower_bound || !bounds.upper_bound) {
+    return nullptr;
+  }
+  auto iter = cf_handles.find(prefix);
+  if (iter == cf_handles.end() || iter->second.hash_l != 0) {
+    return nullptr;
+  } else {
+    if (iter->second.handles.size() == 1) {
+      return iter->second.handles[0];
+    } else {
+      auto lower_bound_hash_str = get_key_hash_view(iter->second, bounds.lower_bound->data(), bounds.lower_bound->size());
+      auto upper_bound_hash_str = get_key_hash_view(iter->second, bounds.upper_bound->data(), bounds.upper_bound->size());
+      if (lower_bound_hash_str == upper_bound_hash_str) {
+        // Bind by reference: the bound is only read here, so copying the string is unnecessary.
+        const std::string& key = *bounds.lower_bound;
+        return get_key_cf(iter->second, key.data(), key.size());
+      } else {
+        return nullptr;
+      }
}
}
}
protected:
string prefix;
rocksdb::Iterator *dbiter;
+ const KeyValueDB::IteratorBounds bounds;
+ const rocksdb::Slice iterate_lower_bound;
+ const rocksdb::Slice iterate_upper_bound;
public:
// Constructor. The removed form received a ready-made rocksdb::Iterator; the added
// form receives the store, the column family handle and the iteration bounds, and
// creates the rocksdb::Iterator itself.
- explicit CFIteratorImpl(const std::string& p,
- rocksdb::Iterator *iter)
- : prefix(p), dbiter(iter) { }
+ explicit CFIteratorImpl(const RocksDBStore* db,
+ const std::string& p,
+ rocksdb::ColumnFamilyHandle* cf,
+ KeyValueDB::IteratorBounds bounds_)
+ : prefix(p), bounds(std::move(bounds_)),
// The slices are built from the `bounds` member, not from the moved-from parameter.
+ iterate_lower_bound(make_slice(bounds.lower_bound)),
+ iterate_upper_bound(make_slice(bounds.upper_bound))
+ {
+ auto options = rocksdb::ReadOptions();
// A bound is handed to RocksDB only when the caller supplied one. The options hold
// pointers to the member slices.
// NOTE(review): presumably RocksDB requires the pointed-to slices to stay valid for
// the iterator's lifetime, which would be why they are members - confirm.
+ if (bounds.lower_bound) {
+ options.iterate_lower_bound = &iterate_lower_bound;
+ }
+ if (bounds.upper_bound) {
+ options.iterate_upper_bound = &iterate_upper_bound;
+ }
+ dbiter = db->db->NewIterator(options, cf);
+ }
// Deletes the rocksdb::Iterator held in dbiter.
~CFIteratorImpl() {
delete dbiter;
}
const RocksDBStore* db;
KeyLess keyless;
string prefix;
+ const KeyValueDB::IteratorBounds bounds;
+ const rocksdb::Slice iterate_lower_bound;
+ const rocksdb::Slice iterate_upper_bound;
std::vector<rocksdb::Iterator*> iters;
public:
// Constructor. Creates one rocksdb::Iterator per column family handle in `shards`.
// The added version also takes iteration bounds and applies the same ReadOptions,
// including those bounds, to every per-shard iterator.
explicit ShardMergeIteratorImpl(const RocksDBStore* db,
const std::string& prefix,
- const std::vector<rocksdb::ColumnFamilyHandle*>& shards)
- : db(db), keyless(db->comparator), prefix(prefix)
+ const std::vector<rocksdb::ColumnFamilyHandle*>& shards,
+ KeyValueDB::IteratorBounds bounds_)
+ : db(db), keyless(db->comparator), prefix(prefix), bounds(std::move(bounds_)),
// The slices are built from the `bounds` member, not from the moved-from parameter.
+ iterate_lower_bound(make_slice(bounds.lower_bound)),
+ iterate_upper_bound(make_slice(bounds.upper_bound))
{
iters.reserve(shards.size());
+ auto options = rocksdb::ReadOptions();
// A bound is handed to RocksDB only when the caller supplied one. The options hold
// pointers to the member slices.
+ if (bounds.lower_bound) {
+ options.iterate_lower_bound = &iterate_lower_bound;
+ }
+ if (bounds.upper_bound) {
+ options.iterate_upper_bound = &iterate_upper_bound;
+ }
for (auto& s : shards) {
- iters.push_back(db->db->NewIterator(rocksdb::ReadOptions(), s));
+ iters.push_back(db->db->NewIterator(options, s));
}
}
~ShardMergeIteratorImpl() {
}
};
// Returns an iterator over the keys of `prefix`, optionally limited by `bounds`.
//
// When `prefix` has column families registered in cf_handles:
//  - a single handle is used directly;
//  - with several handles, get_cf_handle(prefix, bounds) is asked whether the bounded
//    range maps to one column family;
//  - if a single column family was determined, a CFIteratorImpl over it is returned,
//    otherwise a ShardMergeIteratorImpl over all of the prefix's handles.
// When `prefix` has no column families, the call is forwarded to
// KeyValueDB::get_iterator.
// NOTE(review): `opts` is only forwarded on the no-column-family path; the
// column-family paths shown here do not consult it.
-KeyValueDB::Iterator RocksDBStore::get_iterator(const std::string& prefix, IteratorOpts opts)
+KeyValueDB::Iterator RocksDBStore::get_iterator(const std::string& prefix, IteratorOpts opts, IteratorBounds bounds)
{
auto cf_it = cf_handles.find(prefix);
if (cf_it != cf_handles.end()) {
+ rocksdb::ColumnFamilyHandle* cf = nullptr;
if (cf_it->second.handles.size() == 1) {
+ cf = cf_it->second.handles[0];
+ } else {
// May return nullptr when the bounds cannot be pinned to one column family.
+ cf = get_cf_handle(prefix, bounds);
+ }
+ if (cf) {
return std::make_shared<CFIteratorImpl>(
- prefix,
- db->NewIterator(rocksdb::ReadOptions(), cf_it->second.handles[0]));
+ this,
+ prefix,
+ cf,
+ std::move(bounds));
} else {
return std::make_shared<ShardMergeIteratorImpl>(
this,
prefix,
- cf_it->second.handles);
+ cf_it->second.handles,
+ std::move(bounds));
}
} else {
- return KeyValueDB::get_iterator(prefix, opts);
+ return KeyValueDB::get_iterator(prefix, opts, std::move(bounds));
}
}
return db->NewIterator(rocksdb::ReadOptions(), cf);
}
-RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator(IteratorOpts opts)
+RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator(IteratorOpts opts, IteratorBounds bounds)
{
if (cf_handles.size() == 0) {
- rocksdb::ReadOptions opt = rocksdb::ReadOptions();
- if (opts & ITERATOR_NOCACHE)
- opt.fill_cache=false;
return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
- db->NewIterator(opt, default_cf));
+ this, default_cf, opts, std::move(bounds));
} else {
return std::make_shared<WholeMergeIteratorImpl>(this);
}
// Returns a whole-space iterator over default_cf. The added version builds it with
// no iterator options (0) and default-constructed IteratorBounds.
RocksDBStore::WholeSpaceIterator RocksDBStore::get_default_cf_iterator()
{
- return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
- db->NewIterator(rocksdb::ReadOptions(), default_cf));
+ return std::make_shared<RocksDBWholeSpaceIteratorImpl>(this, default_cf, 0, IteratorBounds());
}
int RocksDBStore::prepare_for_reshard(const std::string& new_sharding,
extern rocksdb::Logger *create_rocksdb_ceph_logger();
+/**
+ * Converts an optional bound into a rocksdb::Slice.
+ *
+ * A set bound yields a Slice constructed from the contained string; an unset
+ * bound yields a default-constructed Slice.
+ * NOTE(review): rocksdb::Slice is presumably a non-owning view, so the optional's
+ * string must outlive the returned Slice - confirm against the Slice documentation.
+ */
+inline rocksdb::Slice make_slice(const std::optional<std::string>& bound) {
+  return bound.has_value() ? rocksdb::Slice(*bound) : rocksdb::Slice();
+}
+
/**
* Uses RocksDB to implement the KeyValueDB interface
*/
uint64_t cache_size = 0;
bool set_cache_flag = false;
friend class ShardMergeIteratorImpl;
+ friend class CFIteratorImpl;
friend class WholeMergeIteratorImpl;
/*
* See RocksDB's definition of a column family(CF) and how to use it.
void add_column_family(const std::string& cf_name, uint32_t hash_l, uint32_t hash_h,
size_t shard_idx, rocksdb::ColumnFamilyHandle *handle);
bool is_column_family(const std::string& prefix);
+ std::string_view get_key_hash_view(const prefix_shards& shards, const char* key, const size_t keylen);
+ rocksdb::ColumnFamilyHandle *get_key_cf(const prefix_shards& shards, const char* key, const size_t keylen);
rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& prefix, const std::string& key);
rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& prefix, const char* key, size_t keylen);
+ rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& prefix, const IteratorBounds& bounds);
int submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t);
int install_cf_mergeop(const std::string &cf_name, rocksdb::ColumnFamilyOptions *cf_opt);
public KeyValueDB::WholeSpaceIteratorImpl {
protected:
rocksdb::Iterator *dbiter;
+ const KeyValueDB::IteratorBounds bounds;
+ const rocksdb::Slice iterate_lower_bound;
+ const rocksdb::Slice iterate_upper_bound;
public:
// Constructor. The removed form wrapped a ready-made rocksdb::Iterator; the added
// form receives the store, a column family handle, iterator options and iteration
// bounds, and creates the rocksdb::Iterator itself.
- explicit RocksDBWholeSpaceIteratorImpl(rocksdb::Iterator *iter) :
- dbiter(iter) { }
+ explicit RocksDBWholeSpaceIteratorImpl(const RocksDBStore* db,
+ rocksdb::ColumnFamilyHandle* cf,
+ const KeyValueDB::IteratorOpts opts,
+ KeyValueDB::IteratorBounds bounds_) :
+ bounds(std::move(bounds_)),
// The slices are built from the `bounds` member, not from the moved-from parameter.
+ iterate_lower_bound(make_slice(bounds.lower_bound)),
+ iterate_upper_bound(make_slice(bounds.upper_bound))
+ {
+ rocksdb::ReadOptions options = rocksdb::ReadOptions();
// ITERATOR_NOCACHE turns off fill_cache on the read options.
+ if (opts & ITERATOR_NOCACHE)
+ options.fill_cache=false;
// A bound is handed to RocksDB only when the caller supplied one. The options hold
// pointers to the member slices.
+ if (bounds.lower_bound) {
+ options.iterate_lower_bound = &iterate_lower_bound;
+ }
+ if (bounds.upper_bound) {
+ options.iterate_upper_bound = &iterate_upper_bound;
+ }
+ dbiter = db->db->NewIterator(options, cf);
+ }
//virtual ~RocksDBWholeSpaceIteratorImpl() { }
~RocksDBWholeSpaceIteratorImpl() override;
size_t value_size() override;
};
- Iterator get_iterator(const std::string& prefix, IteratorOpts opts = 0) override;
+ /// Returns an iterator over `prefix`; `bounds` optionally restricts the iterated key
+ /// range and defaults to a default-constructed IteratorBounds.
+ Iterator get_iterator(const std::string& prefix, IteratorOpts opts = 0, IteratorBounds bounds = IteratorBounds()) override;
private:
/// this iterator spans single cf
rocksdb::Iterator* new_shard_iterator(rocksdb::ColumnFamilyHandle* cf);
return nullptr;
}
- WholeSpaceIterator get_wholespace_iterator(IteratorOpts opts = 0) override;
+ WholeSpaceIterator get_wholespace_iterator(IteratorOpts opts = 0, IteratorBounds bounds = IteratorBounds()) override;
private:
WholeSpaceIterator get_default_cf_iterator();
o->flush();
{
const string& prefix = o->get_omap_prefix();
- KeyValueDB::Iterator it = db->get_iterator(prefix);
string head, tail;
o->get_omap_header(&head);
o->get_omap_tail(&tail);
+ auto bounds = KeyValueDB::IteratorBounds();
+ bounds.lower_bound = head;
+ bounds.upper_bound = tail;
+ KeyValueDB::Iterator it = db->get_iterator(prefix, 0, std::move(bounds));
it->lower_bound(head);
while (it->valid()) {
if (it->key() == head) {
o->flush();
{
const string& prefix = o->get_omap_prefix();
- KeyValueDB::Iterator it = db->get_iterator(prefix);
string head, tail;
o->get_omap_key(string(), &head);
o->get_omap_tail(&tail);
+ auto bounds = KeyValueDB::IteratorBounds();
+ bounds.lower_bound = head;
+ bounds.upper_bound = tail;
+ KeyValueDB::Iterator it = db->get_iterator(prefix, 0, std::move(bounds));
it->lower_bound(head);
while (it->valid()) {
if (it->key() >= tail) {
}
o->flush();
dout(10) << __func__ << " has_omap = " << (int)o->onode.has_omap() <<dendl;
- KeyValueDB::Iterator it = db->get_iterator(o->get_omap_prefix());
+ auto bounds = KeyValueDB::IteratorBounds();
+ if (o->onode.has_omap()) {
+ std::string lower_bound, upper_bound;
+ o->get_omap_key(string(), &lower_bound);
+ o->get_omap_tail(&upper_bound);
+ bounds.lower_bound = std::move(lower_bound);
+ bounds.upper_bound = std::move(upper_bound);
+ }
+ KeyValueDB::Iterator it = db->get_iterator(o->get_omap_prefix(), 0, std::move(bounds));
return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c, o, it));
}
// otherwise rewrite_omap_key will corrupt data
ceph_assert(oldo->onode.flags == newo->onode.flags);
const string& prefix = newo->get_omap_prefix();
- KeyValueDB::Iterator it = db->get_iterator(prefix);
string head, tail;
oldo->get_omap_header(&head);
oldo->get_omap_tail(&tail);
+ auto bounds = KeyValueDB::IteratorBounds();
+ bounds.lower_bound = head;
+ bounds.upper_bound = tail;
+ KeyValueDB::Iterator it = db->get_iterator(prefix, 0, std::move(bounds));
it->lower_bound(head);
while (it->valid()) {
if (it->key() >= tail) {