#include <inttypes.h>
+#include "db/builder.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/sst_file_writer.h"
-
-#include "db/builder.h"
-#include "table/sst_file_writer_collectors.h"
#include "table/table_builder.h"
#include "util/file_reader_writer.h"
#include "util/file_util.h"
file_info->version =
DecodeFixed32(external_sst_file_version_iter->second.c_str());
- if (file_info->version == 2) {
- // version 2 imply that we have global sequence number
-
- // TODO(tec): Implement version 2 ingestion
- file_info->sequence_number = 0;
- } else if (file_info->version == 1) {
+ if (file_info->version == 1) {
// version 1 imply that all sequence numbers in table equal 0
file_info->sequence_number = 0;
} else {
if (file_info_list[i].num_entries == 0) {
return Status::InvalidArgument("File contain no entries");
}
-
- if (file_info_list[i].version == 2) {
- // version 2 imply that file have only Put Operations
- // with global Sequence Number
-
- // TODO(tec): Implement changing file global sequence number
- } else if (file_info_list[i].version == 1) {
- // version 1 imply that file have only Put Operations
- // with Sequence Number = 0
- } else {
- // Unknown version !
+ if (file_info_list[i].version != 1) {
return Status::InvalidArgument(
"Generated table version is not supported");
}
+ // version 1 imply that file have only Put Operations with Sequence Number =
+ // 0
meta_list[i].smallest =
InternalKey(file_info_list[i].smallest_key,
for (size_t i = 0; i < num_files; i++) {
StopWatch sw(env_, nullptr, 0, µ_list[i], false);
InternalKey range_start(file_info_list[i].smallest_key,
- kMaxSequenceNumber, kValueTypeForSeek);
+ kMaxSequenceNumber, kTypeValue);
iter->Seek(range_start.Encode());
status = iter->status();
static const SequenceNumber kMaxSequenceNumber =
((0x1ull << 56) - 1);
-static const SequenceNumber kDisableGlobalSequenceNumber = port::kMaxUint64;
-
struct ParsedInternalKey {
Slice user_key;
SequenceNumber sequence;
return Slice(key_, key_n);
}
- // Copy the key into IterKey own buf_
- void OwnKey() {
- assert(IsKeyPinned() == true);
-
- EnlargeBufferIfNeeded(key_size_);
- memcpy(buf_, key_, key_size_);
- key_ = buf_;
- }
-
// Update the sequence number in the internal key. Guarantees not to
// invalidate slices to the key (and the user key).
void UpdateInternalKey(uint64_t seq, ValueType t) {
std::vector<std::thread> threads;
while (range_id < 5000) {
- int range_start = range_id * 10;
+ int range_start = (range_id * 20);
int range_end = range_start + 10;
file_keys.clear();
range_id++;
}
-
- for (int rid = 0; rid < 5000; rid++) {
- int range_start = rid * 10;
- int range_end = range_start + 10;
-
- ASSERT_EQ(Get(Key(range_start)), Key(range_start)) << rid;
- ASSERT_EQ(Get(Key(range_end)), Key(range_end)) << rid;
- for (int k = range_start + 1; k < range_end; k++) {
- std::string v = Key(k) + ToString(rid);
- ASSERT_EQ(Get(Key(k)), v) << rid;
- }
- }
}
TEST_F(ExternalSSTFileTest, PickedLevelDynamic) {
class Comparator;
+// Table Properties that are specific to tables created by SstFileWriter.
+struct ExternalSstFilePropertyNames {
+ // value of this property is a fixed int32 number.
+ static const std::string kVersion;
+};
+
// ExternalSstFileInfo include information about sst files created
// using SstFileWriter
struct ExternalSstFileInfo {
Status Finish(ExternalSstFileInfo* file_info = nullptr);
private:
+ class SstFileWriterPropertiesCollectorFactory;
+ class SstFileWriterPropertiesCollector;
struct Rep;
Rep* rep_;
};
UserCollectedProperties user_collected_properties;
UserCollectedProperties readable_properties;
- // The offset of the value of each property in the file.
- std::map<std::string, uint64_t> properties_offsets;
-
// convert this object to a human readable form
// @prop_delim: delimiter for each property.
std::string ToString(const std::string& prop_delim = "; ",
key_.TrimAppend(shared, p, non_shared);
key_pinned_ = false;
}
-
- if (global_seqno_ != kDisableGlobalSequenceNumber) {
- // If we are reading a file with a global sequence number we should
- // expect that all encoded sequence numbers are zeros and all value
- // types are kTypeValue
- assert(GetInternalKeySeqno(key_.GetKey()) == 0);
- assert(ExtractValueType(key_.GetKey()) == ValueType::kTypeValue);
-
- if (key_pinned_) {
- // TODO(tec): Investigate updating the seqno in the loaded block
- // directly instead of doing a copy and update.
-
- // We cannot use the key address in the block directly because
- // we have a global_seqno_ that will overwrite the encoded one.
- key_.OwnKey();
- key_pinned_ = false;
- }
-
- key_.UpdateInternalKey(global_seqno_, ValueType::kTypeValue);
- }
-
value_ = Slice(p + non_shared, value_length);
while (restart_index_ + 1 < num_restarts_ &&
GetRestartPoint(restart_index_ + 1) < current_) {
return DecodeFixed32(data_ + size_ - sizeof(uint32_t));
}
-Block::Block(BlockContents&& contents, SequenceNumber _global_seqno,
- size_t read_amp_bytes_per_bit, Statistics* statistics)
+Block::Block(BlockContents&& contents, size_t read_amp_bytes_per_bit,
+ Statistics* statistics)
: contents_(std::move(contents)),
data_(contents_.data.data()),
- size_(contents_.data.size()),
- global_seqno_(_global_seqno) {
+ size_(contents_.data.size()) {
if (size_ < sizeof(uint32_t)) {
size_ = 0; // Error marker
} else {
if (iter != nullptr) {
iter->Initialize(cmp, data_, restart_offset_, num_restarts,
- prefix_index_ptr, global_seqno_, read_amp_bitmap_.get());
+ prefix_index_ptr, read_amp_bitmap_.get());
} else {
iter = new BlockIter(cmp, data_, restart_offset_, num_restarts,
- prefix_index_ptr, global_seqno_,
- read_amp_bitmap_.get());
+ prefix_index_ptr, read_amp_bitmap_.get());
}
if (read_amp_bitmap_) {
class Block {
public:
// Initialize the block with the specified contents.
- explicit Block(BlockContents&& contents, SequenceNumber _global_seqno,
- size_t read_amp_bytes_per_bit = 0,
+ explicit Block(BlockContents&& contents, size_t read_amp_bytes_per_bit = 0,
Statistics* statistics = nullptr);
~Block() = default;
// Report an approximation of how much memory has been used.
size_t ApproximateMemoryUsage() const;
- SequenceNumber global_seqno() const { return global_seqno_; }
-
private:
BlockContents contents_;
const char* data_; // contents_.data.data()
uint32_t restart_offset_; // Offset in data_ of restart array
std::unique_ptr<BlockPrefixIndex> prefix_index_;
std::unique_ptr<BlockReadAmpBitmap> read_amp_bitmap_;
- // All keys in the block will have seqno = global_seqno_, regardless of
- // the encoded value (kDisableGlobalSequenceNumber means disabled)
- const SequenceNumber global_seqno_;
// No copying allowed
Block(const Block&);
status_(Status::OK()),
prefix_index_(nullptr),
key_pinned_(false),
- global_seqno_(kDisableGlobalSequenceNumber),
read_amp_bitmap_(nullptr),
last_bitmap_offset_(0) {}
BlockIter(const Comparator* comparator, const char* data, uint32_t restarts,
uint32_t num_restarts, BlockPrefixIndex* prefix_index,
- SequenceNumber global_seqno, BlockReadAmpBitmap* read_amp_bitmap)
+ BlockReadAmpBitmap* read_amp_bitmap)
: BlockIter() {
Initialize(comparator, data, restarts, num_restarts, prefix_index,
- global_seqno, read_amp_bitmap);
+ read_amp_bitmap);
}
void Initialize(const Comparator* comparator, const char* data,
uint32_t restarts, uint32_t num_restarts,
- BlockPrefixIndex* prefix_index, SequenceNumber global_seqno,
+ BlockPrefixIndex* prefix_index,
BlockReadAmpBitmap* read_amp_bitmap) {
assert(data_ == nullptr); // Ensure it is called only once
assert(num_restarts > 0); // Ensure the param is valid
current_ = restarts_;
restart_index_ = num_restarts_;
prefix_index_ = prefix_index;
- global_seqno_ = global_seqno;
read_amp_bitmap_ = read_amp_bitmap;
last_bitmap_offset_ = current_ + 1;
}
size_t TEST_CurrentEntrySize() { return NextEntryOffset() - current_; }
- uint32_t ValueOffset() const {
- return static_cast<uint32_t>(value_.data() - data_);
- }
-
private:
const Comparator* comparator_;
const char* data_; // underlying block contents
Status status_;
BlockPrefixIndex* prefix_index_;
bool key_pinned_;
- SequenceNumber global_seqno_;
// read-amp bitmap
BlockReadAmpBitmap* read_amp_bitmap_;
BlockContents results(std::move(ubuf), size, true, type);
- Block* block = new Block(std::move(results), kDisableGlobalSequenceNumber);
+ Block* block = new Block(std::move(results));
// make cache key by appending the file offset to the cache prefix id
char* end = EncodeVarint64(
#include "table/internal_iterator.h"
#include "table/meta_blocks.h"
#include "table/persistent_cache_helper.h"
-#include "table/sst_file_writer_collectors.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
const ImmutableCFOptions& ioptions, bool do_uncompress,
const Slice& compression_dict,
const PersistentCacheOptions& cache_options,
- SequenceNumber global_seqno,
size_t read_amp_bytes_per_bit) {
BlockContents contents;
Status s = ReadBlockContents(file, footer, options, handle, &contents, ioptions,
do_uncompress, compression_dict, cache_options);
if (s.ok()) {
- result->reset(new Block(std::move(contents), global_seqno,
- read_amp_bytes_per_bit, ioptions.statistics));
+ result->reset(new Block(std::move(contents), read_amp_bytes_per_bit,
+ ioptions.statistics));
}
return s;
const Comparator* comparator, IndexReader** index_reader,
const PersistentCacheOptions& cache_options) {
std::unique_ptr<Block> index_block;
- auto s = ReadBlockFromFile(
- file, footer, ReadOptions(), index_handle, &index_block, ioptions,
- true /* decompress */, Slice() /*compression dict*/, cache_options,
- kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */);
+ auto s = ReadBlockFromFile(file, footer, ReadOptions(), index_handle,
+ &index_block, ioptions, true /* decompress */,
+ Slice() /*compression dict*/, cache_options,
+ 0 /* read_amp_bytes_per_bit */);
if (s.ok()) {
*index_reader = new BinarySearchIndexReader(
bool hash_index_allow_collision,
const PersistentCacheOptions& cache_options) {
std::unique_ptr<Block> index_block;
- auto s = ReadBlockFromFile(
- file, footer, ReadOptions(), index_handle, &index_block, ioptions,
- true /* decompress */, Slice() /*compression dict*/, cache_options,
- kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */);
+ auto s = ReadBlockFromFile(file, footer, ReadOptions(), index_handle,
+ &index_block, ioptions, true /* decompress */,
+ Slice() /*compression dict*/, cache_options,
+ 0 /* read_amp_bytes_per_bit */);
if (!s.ok()) {
return s;
filter_type(FilterType::kNoFilter),
whole_key_filtering(_table_opt.whole_key_filtering),
prefix_filtering(true),
- range_del_block(nullptr),
- global_seqno(kDisableGlobalSequenceNumber) {}
+ range_del_block(nullptr) {}
const ImmutableCFOptions& ioptions;
const EnvOptions& env_options;
CachableEntry<FilterBlockReader> filter_entry;
CachableEntry<IndexReader> index_entry;
unique_ptr<Block> range_del_block;
-
- // If global_seqno is used, all Keys in this file will have the same
- // seqno with value `global_seqno`.
- //
- // A value of kDisableGlobalSequenceNumber means that this feature is disabled
- // and every key have it's own seqno.
- SequenceNumber global_seqno;
};
BlockBasedTable::~BlockBasedTable() {
}
return true;
}
-
-SequenceNumber GetGlobalSequenceNumber(const TableProperties& table_properties,
- Logger* info_log) {
- auto& props = table_properties.user_collected_properties;
-
- auto version_pos = props.find(ExternalSstFilePropertyNames::kVersion);
- auto seqno_pos = props.find(ExternalSstFilePropertyNames::kGlobalSeqno);
-
- if (version_pos == props.end()) {
- if (seqno_pos != props.end()) {
- // This is not an external sst file, global_seqno is not supported.
- assert(false);
- Log(InfoLogLevel::ERROR_LEVEL, info_log,
- "A non-external sst file have global seqno property with value %s",
- seqno_pos->second.c_str());
- }
- return kDisableGlobalSequenceNumber;
- }
-
- uint32_t version = DecodeFixed32(version_pos->second.c_str());
- if (version < 2) {
- if (seqno_pos != props.end() || version != 1) {
- // This is a v1 external sst file, global_seqno is not supported.
- assert(false);
- Log(InfoLogLevel::ERROR_LEVEL, info_log,
- "An external sst file with version %u have global seqno property "
- "with value %s",
- version, seqno_pos->second.c_str());
- }
- return kDisableGlobalSequenceNumber;
- }
-
- SequenceNumber global_seqno = DecodeFixed64(seqno_pos->second.c_str());
-
- if (global_seqno > kMaxSequenceNumber) {
- assert(false);
- Log(InfoLogLevel::ERROR_LEVEL, info_log,
- "An external sst file with version %u have global seqno property "
- "with value %llu, which is greater than kMaxSequenceNumber",
- version, global_seqno);
- }
-
- return global_seqno;
-}
} // namespace
Slice BlockBasedTable::GetCacheKey(const char* cache_key_prefix,
"Encountered error while reading data from range del block %s",
s.ToString().c_str());
} else {
- rep->range_del_block.reset(new Block(
- std::move(range_del_block_contents), kDisableGlobalSequenceNumber));
+ rep->range_del_block.reset(
+ new Block(std::move(range_del_block_contents)));
}
}
}
rep->prefix_filtering &= IsFeatureSupported(
*(rep->table_properties),
BlockBasedTablePropertyNames::kPrefixFiltering, rep->ioptions.info_log);
-
- rep->global_seqno = GetGlobalSequenceNumber(*(rep->table_properties),
- rep->ioptions.info_log);
}
// pre-fetching of blocks is turned on
rep->file.get(), rep->footer, ReadOptions(),
rep->footer.metaindex_handle(), &meta, rep->ioptions,
true /* decompress */, Slice() /*compression dict*/,
- rep->persistent_cache_options, kDisableGlobalSequenceNumber,
- 0 /* read_amp_bytes_per_bit */);
+ rep->persistent_cache_options, 0 /* read_amp_bytes_per_bit */);
if (!s.ok()) {
Log(InfoLogLevel::ERROR_LEVEL, rep->ioptions.info_log,
// Insert uncompressed block into block cache
if (s.ok()) {
- block->value =
- new Block(std::move(contents), compressed_block->global_seqno(),
- read_amp_bytes_per_bit,
- statistics); // uncompressed block
+ block->value = new Block(std::move(contents), read_amp_bytes_per_bit,
+ statistics); // uncompressed block
assert(block->value->compression_type() == kNoCompression);
if (block_cache != nullptr && block->value->cachable() &&
read_options.fill_cache) {
}
if (raw_block->compression_type() != kNoCompression) {
- block->value = new Block(std::move(contents), raw_block->global_seqno(),
- read_amp_bytes_per_bit,
- statistics); // uncompressed block
+ block->value = new Block(std::move(contents), read_amp_bytes_per_bit,
+ statistics); // compressed block
} else {
block->value = raw_block;
raw_block = nullptr;
std::unique_ptr<Block> raw_block;
{
StopWatch sw(rep->ioptions.env, statistics, READ_BLOCK_GET_MICROS);
- s = ReadBlockFromFile(
- rep->file.get(), rep->footer, ro, handle, &raw_block, rep->ioptions,
- block_cache_compressed == nullptr, compression_dict,
- rep->persistent_cache_options, rep->global_seqno,
- rep->table_options.read_amp_bytes_per_bit);
+ s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle,
+ &raw_block, rep->ioptions,
+ block_cache_compressed == nullptr,
+ compression_dict, rep->persistent_cache_options,
+ rep->table_options.read_amp_bytes_per_bit);
}
if (s.ok()) {
}
}
std::unique_ptr<Block> block_value;
- s = ReadBlockFromFile(
- rep->file.get(), rep->footer, ro, handle, &block_value, rep->ioptions,
- true /* compress */, compression_dict, rep->persistent_cache_options,
- rep->global_seqno, rep->table_options.read_amp_bytes_per_bit);
+ s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle,
+ &block_value, rep->ioptions, true /* compress */,
+ compression_dict, rep->persistent_cache_options,
+ rep->table_options.read_amp_bytes_per_bit);
if (s.ok()) {
block.value = block_value.release();
}
BlockContents contents;
contents.data = rawblock;
contents.cachable = false;
- Block reader(std::move(contents), kDisableGlobalSequenceNumber);
+ Block reader(std::move(contents));
// read contents of block sequentially
int count = 0;
// create block reader
BlockContents contents_ref(contents.data, contents.cachable,
contents.compression_type);
- Block reader1(std::move(contents), kDisableGlobalSequenceNumber);
- Block reader2(std::move(contents_ref), kDisableGlobalSequenceNumber);
+ Block reader1(std::move(contents));
+ Block reader2(std::move(contents_ref));
std::unique_ptr<const SliceTransform> prefix_extractor(
NewFixedPrefixTransform(prefix_size));
BlockContents contents;
contents.data = rawblock;
contents.cachable = true;
- Block reader(std::move(contents), kDisableGlobalSequenceNumber,
- kBytesPerBit, stats.get());
+ Block reader(std::move(contents), kBytesPerBit, stats.get());
// read contents of block sequentially
size_t read_bytes = 0;
BlockContents contents;
contents.data = rawblock;
contents.cachable = true;
- Block reader(std::move(contents), kDisableGlobalSequenceNumber,
- kBytesPerBit, stats.get());
+ Block reader(std::move(contents), kBytesPerBit, stats.get());
size_t read_bytes = 0;
BlockIter *iter = static_cast<BlockIter *>(
BlockContents contents;
contents.data = rawblock;
contents.cachable = true;
- Block reader(std::move(contents), kDisableGlobalSequenceNumber,
- kBytesPerBit, stats.get());
+ Block reader(std::move(contents), kBytesPerBit, stats.get());
size_t read_bytes = 0;
BlockIter *iter = static_cast<BlockIter *>(
return s;
}
- Block properties_block(std::move(block_contents),
- kDisableGlobalSequenceNumber);
- BlockIter iter;
- properties_block.NewIterator(BytewiseComparator(), &iter);
+ Block properties_block(std::move(block_contents));
+ std::unique_ptr<InternalIterator> iter(
+ properties_block.NewIterator(BytewiseComparator()));
auto new_table_properties = new TableProperties();
// All pre-defined properties of type uint64_t
};
std::string last_key;
- for (iter.SeekToFirst(); iter.Valid(); iter.Next()) {
- s = iter.status();
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ s = iter->status();
if (!s.ok()) {
break;
}
- auto key = iter.key().ToString();
+ auto key = iter->key().ToString();
// properties block is strictly sorted with no duplicate key.
assert(last_key.empty() ||
BytewiseComparator()->Compare(key, last_key) > 0);
last_key = key;
- auto raw_val = iter.value();
+ auto raw_val = iter->value();
auto pos = predefined_uint64_properties.find(key);
- new_table_properties->properties_offsets.insert(
- {key, handle.offset() + iter.ValueOffset()});
-
if (pos != predefined_uint64_properties.end()) {
// handle predefined rocksdb properties
uint64_t val;
if (!s.ok()) {
return s;
}
- Block metaindex_block(std::move(metaindex_contents),
- kDisableGlobalSequenceNumber);
+ Block metaindex_block(std::move(metaindex_contents));
std::unique_ptr<InternalIterator> meta_iter(
metaindex_block.NewIterator(BytewiseComparator()));
if (!s.ok()) {
return s;
}
- Block metaindex_block(std::move(metaindex_contents),
- kDisableGlobalSequenceNumber);
+ Block metaindex_block(std::move(metaindex_contents));
std::unique_ptr<InternalIterator> meta_iter;
meta_iter.reset(metaindex_block.NewIterator(BytewiseComparator()));
}
// Finding metablock
- Block metaindex_block(std::move(metaindex_contents),
- kDisableGlobalSequenceNumber);
+ Block metaindex_block(std::move(metaindex_contents));
std::unique_ptr<InternalIterator> meta_iter;
meta_iter.reset(metaindex_block.NewIterator(BytewiseComparator()));
#include "db/dbformat.h"
#include "rocksdb/table.h"
#include "table/block_based_table_builder.h"
-#include "table/sst_file_writer_collectors.h"
#include "util/file_reader_writer.h"
+#include "util/string_util.h"
namespace rocksdb {
const std::string ExternalSstFilePropertyNames::kVersion =
"rocksdb.external_sst_file.version";
-const std::string ExternalSstFilePropertyNames::kGlobalSeqno =
- "rocksdb.external_sst_file.global_seqno";
+
+// PropertiesCollector used to add properties specific to tables
+// generated by SstFileWriter
+class SstFileWriter::SstFileWriterPropertiesCollector
+ : public IntTblPropCollector {
+ public:
+ explicit SstFileWriterPropertiesCollector(int32_t version)
+ : version_(version) {}
+
+ virtual Status InternalAdd(const Slice& key, const Slice& value,
+ uint64_t file_size) override {
+ // Intentionally left blank. Have no interest in collecting stats for
+ // individual key/value pairs.
+ return Status::OK();
+ }
+
+ virtual Status Finish(UserCollectedProperties* properties) override {
+ std::string version_val;
+ PutFixed32(&version_val, static_cast<int32_t>(version_));
+ properties->insert({ExternalSstFilePropertyNames::kVersion, version_val});
+ return Status::OK();
+ }
+
+ virtual const char* Name() const override {
+ return "SstFileWriterPropertiesCollector";
+ }
+
+ virtual UserCollectedProperties GetReadableProperties() const override {
+ return {{ExternalSstFilePropertyNames::kVersion, ToString(version_)}};
+ }
+
+ private:
+ int32_t version_;
+};
+
+class SstFileWriter::SstFileWriterPropertiesCollectorFactory
+ : public IntTblPropCollectorFactory {
+ public:
+ explicit SstFileWriterPropertiesCollectorFactory(int32_t version)
+ : version_(version) {}
+
+ virtual IntTblPropCollector* CreateIntTblPropCollector(
+ uint32_t column_family_id) override {
+ return new SstFileWriterPropertiesCollector(version_);
+ }
+
+ virtual const char* Name() const override {
+ return "SstFileWriterPropertiesCollector";
+ }
+
+ private:
+ int32_t version_;
+};
struct SstFileWriter::Rep {
Rep(const EnvOptions& _env_options, const Options& options,
// SstFileWriter properties collector to add SstFileWriter version.
int_tbl_prop_collector_factories.emplace_back(
- new SstFileWriterPropertiesCollectorFactory(2 /* version */,
- 0 /* global_seqno*/));
+ new SstFileWriterPropertiesCollectorFactory(1 /* version */));
// User collector factories
auto user_collector_factories =
r->column_family_name, unknown_level);
r->file_writer.reset(
new WritableFileWriter(std::move(sst_file), r->env_options));
-
- // TODO(tec) : If table_factory is using compressed block cache, we will
- // be adding the external sst file blocks into it, which is wasteful.
r->builder.reset(r->ioptions.table_factory->NewTableBuilder(
table_builder_options,
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
r->file_info.file_size = 0;
r->file_info.num_entries = 0;
r->file_info.sequence_number = 0;
- r->file_info.version = 2;
+ r->file_info.version = 1;
return s;
}
r->file_info.largest_key.assign(user_key.data(), user_key.size());
r->file_info.file_size = r->builder->FileSize();
- // TODO(tec) : For external SST files we could omit the seqno and type.
r->ikey.Set(user_key, 0 /* Sequence Number */,
ValueType::kTypeValue /* Put */);
r->builder->Add(r->ikey.Encode(), value);
+++ /dev/null
-// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
-
-#pragma once
-#include <string>
-#include "rocksdb/types.h"
-#include "util/string_util.h"
-
-namespace rocksdb {
-
-// Table Properties that are specific to tables created by SstFileWriter.
-struct ExternalSstFilePropertyNames {
- // value of this property is a fixed uint32 number.
- static const std::string kVersion;
- // value of this property is a fixed uint64 number.
- static const std::string kGlobalSeqno;
-};
-
-// PropertiesCollector used to add properties specific to tables
-// generated by SstFileWriter
-class SstFileWriterPropertiesCollector : public IntTblPropCollector {
- public:
- explicit SstFileWriterPropertiesCollector(int32_t version,
- SequenceNumber global_seqno)
- : version_(version), global_seqno_(global_seqno) {}
-
- virtual Status InternalAdd(const Slice& key, const Slice& value,
- uint64_t file_size) override {
- // Intentionally left blank. Have no interest in collecting stats for
- // individual key/value pairs.
- return Status::OK();
- }
-
- virtual Status Finish(UserCollectedProperties* properties) override {
- // File version
- std::string version_val;
- PutFixed32(&version_val, static_cast<uint32_t>(version_));
- properties->insert({ExternalSstFilePropertyNames::kVersion, version_val});
-
- // Global Sequence number
- std::string seqno_val;
- PutFixed64(&seqno_val, static_cast<uint64_t>(global_seqno_));
- properties->insert({ExternalSstFilePropertyNames::kGlobalSeqno, seqno_val});
-
- return Status::OK();
- }
-
- virtual const char* Name() const override {
- return "SstFileWriterPropertiesCollector";
- }
-
- virtual UserCollectedProperties GetReadableProperties() const override {
- return {{ExternalSstFilePropertyNames::kVersion, ToString(version_)}};
- }
-
- private:
- int32_t version_;
- SequenceNumber global_seqno_;
-};
-
-class SstFileWriterPropertiesCollectorFactory
- : public IntTblPropCollectorFactory {
- public:
- explicit SstFileWriterPropertiesCollectorFactory(int32_t version,
- SequenceNumber global_seqno)
- : version_(version), global_seqno_(global_seqno) {}
-
- virtual IntTblPropCollector* CreateIntTblPropCollector(
- uint32_t column_family_id) override {
- return new SstFileWriterPropertiesCollector(version_, global_seqno_);
- }
-
- virtual const char* Name() const override {
- return "SstFileWriterPropertiesCollector";
- }
-
- private:
- int32_t version_;
- SequenceNumber global_seqno_;
-};
-
-} // namespace rocksdb
#include "table/meta_blocks.h"
#include "table/plain_table_factory.h"
#include "table/scoped_arena_iterator.h"
-#include "table/sst_file_writer_collectors.h"
#include "util/compression.h"
#include "util/random.h"
#include "util/statistics.h"
BlockContents contents;
contents.data = data_;
contents.cachable = false;
- block_ = new Block(std::move(contents), kDisableGlobalSequenceNumber);
+ block_ = new Block(std::move(contents));
return Status::OK();
}
virtual InternalIterator* NewIterator() const override {
// rocksdb still works.
}
-TEST_F(BlockBasedTableTest, TableWithGlobalSeqno) {
- BlockBasedTableOptions bbto;
- test::StringSink* sink = new test::StringSink();
- unique_ptr<WritableFileWriter> file_writer(test::GetWritableFileWriter(sink));
- Options options;
- options.table_factory.reset(NewBlockBasedTableFactory(bbto));
- const ImmutableCFOptions ioptions(options);
- InternalKeyComparator ikc(options.comparator);
- std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
- int_tbl_prop_collector_factories;
- int_tbl_prop_collector_factories.emplace_back(
- new SstFileWriterPropertiesCollectorFactory(2 /* version */,
- 0 /* global_seqno*/));
- std::string column_family_name;
- std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
- TableBuilderOptions(ioptions, ikc, &int_tbl_prop_collector_factories,
- kNoCompression, CompressionOptions(),
- nullptr /* compression_dict */,
- false /* skip_filters */, column_family_name, -1),
- TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
- file_writer.get()));
-
- for (char c = 'a'; c <= 'z'; ++c) {
- std::string key(8, c);
- std::string value = key;
- InternalKey ik(key, 0, kTypeValue);
-
- builder->Add(ik.Encode(), value);
- }
- ASSERT_OK(builder->Finish());
- file_writer->Flush();
-
- test::RandomRWStringSink ss_rw(sink);
- uint32_t version;
- uint64_t global_seqno;
- uint64_t global_seqno_offset;
-
- // Helper function to get version, global_seqno, global_seqno_offset
- std::function<void()> GetVersionAndGlobalSeqno = [&]() {
- unique_ptr<RandomAccessFileReader> file_reader(
- test::GetRandomAccessFileReader(
- new test::StringSource(ss_rw.contents(), 73342, true)));
-
- TableProperties* props = nullptr;
- ASSERT_OK(ReadTableProperties(file_reader.get(), ss_rw.contents().size(),
- kBlockBasedTableMagicNumber, ioptions,
- &props));
-
- UserCollectedProperties user_props = props->user_collected_properties;
- version = DecodeFixed32(
- user_props[ExternalSstFilePropertyNames::kVersion].c_str());
- global_seqno = DecodeFixed64(
- user_props[ExternalSstFilePropertyNames::kGlobalSeqno].c_str());
- global_seqno_offset =
- props->properties_offsets[ExternalSstFilePropertyNames::kGlobalSeqno];
-
- delete props;
- };
-
- // Helper function to update the value of the global seqno in the file
- std::function<void(uint64_t)> SetGlobalSeqno = [&](uint64_t val) {
- std::string new_global_seqno;
- PutFixed64(&new_global_seqno, val);
-
- ASSERT_OK(ss_rw.Write(global_seqno_offset, new_global_seqno));
- };
-
- // Helper function to get the contents of the table InternalIterator
- unique_ptr<TableReader> table_reader;
- std::function<InternalIterator*()> GetTableInternalIter = [&]() {
- unique_ptr<RandomAccessFileReader> file_reader(
- test::GetRandomAccessFileReader(
- new test::StringSource(ss_rw.contents(), 73342, true)));
-
- options.table_factory->NewTableReader(
- TableReaderOptions(ioptions, EnvOptions(), ikc), std::move(file_reader),
- ss_rw.contents().size(), &table_reader);
-
- return table_reader->NewIterator(ReadOptions());
- };
-
- GetVersionAndGlobalSeqno();
- ASSERT_EQ(2, version);
- ASSERT_EQ(0, global_seqno);
-
- InternalIterator* iter = GetTableInternalIter();
- char current_c = 'a';
- for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
- ParsedInternalKey pik;
- ASSERT_TRUE(ParseInternalKey(iter->key(), &pik));
-
- ASSERT_EQ(pik.type, ValueType::kTypeValue);
- ASSERT_EQ(pik.sequence, 0);
- ASSERT_EQ(pik.user_key, iter->value());
- ASSERT_EQ(pik.user_key.ToString(), std::string(8, current_c));
- current_c++;
- }
- ASSERT_EQ(current_c, 'z' + 1);
- delete iter;
-
- // Update global sequence number to 10
- SetGlobalSeqno(10);
- GetVersionAndGlobalSeqno();
- ASSERT_EQ(2, version);
- ASSERT_EQ(10, global_seqno);
-
- iter = GetTableInternalIter();
- current_c = 'a';
- for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
- ParsedInternalKey pik;
- ASSERT_TRUE(ParseInternalKey(iter->key(), &pik));
-
- ASSERT_EQ(pik.type, ValueType::kTypeValue);
- ASSERT_EQ(pik.sequence, 10);
- ASSERT_EQ(pik.user_key, iter->value());
- ASSERT_EQ(pik.user_key.ToString(), std::string(8, current_c));
- current_c++;
- }
- ASSERT_EQ(current_c, 'z' + 1);
-
- // Verify Seek
- for (char c = 'a'; c <= 'z'; c++) {
- std::string k = std::string(8, c);
- InternalKey ik(k, 10, kValueTypeForSeek);
- iter->Seek(ik.Encode());
- ASSERT_TRUE(iter->Valid());
-
- ParsedInternalKey pik;
- ASSERT_TRUE(ParseInternalKey(iter->key(), &pik));
-
- ASSERT_EQ(pik.type, ValueType::kTypeValue);
- ASSERT_EQ(pik.sequence, 10);
- ASSERT_EQ(pik.user_key.ToString(), k);
- ASSERT_EQ(iter->value().ToString(), k);
- }
- delete iter;
-
- // Update global sequence number to 3
- SetGlobalSeqno(3);
- GetVersionAndGlobalSeqno();
- ASSERT_EQ(2, version);
- ASSERT_EQ(3, global_seqno);
-
- iter = GetTableInternalIter();
- current_c = 'a';
- for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
- ParsedInternalKey pik;
- ASSERT_TRUE(ParseInternalKey(iter->key(), &pik));
-
- ASSERT_EQ(pik.type, ValueType::kTypeValue);
- ASSERT_EQ(pik.sequence, 3);
- ASSERT_EQ(pik.user_key, iter->value());
- ASSERT_EQ(pik.user_key.ToString(), std::string(8, current_c));
- current_c++;
- }
- ASSERT_EQ(current_c, 'z' + 1);
-
- // Verify Seek
- for (char c = 'a'; c <= 'z'; c++) {
- std::string k = std::string(8, c);
- // seqno=4 is less than 3 so we still should get our key
- InternalKey ik(k, 4, kValueTypeForSeek);
- iter->Seek(ik.Encode());
- ASSERT_TRUE(iter->Valid());
-
- ParsedInternalKey pik;
- ASSERT_TRUE(ParseInternalKey(iter->key(), &pik));
-
- ASSERT_EQ(pik.type, ValueType::kTypeValue);
- ASSERT_EQ(pik.sequence, 3);
- ASSERT_EQ(pik.user_key.ToString(), k);
- ASSERT_EQ(iter->value().ToString(), k);
- }
-
- delete iter;
-}
-
} // namespace rocksdb
int main(int argc, char** argv) {
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"
-#include "util/testutil.h"
namespace rocksdb {
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->LoadDependency({});
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
- test::DestroyDir(env_, dummy_files_dir_);
+ DestroyDir(dummy_files_dir_);
+ }
+
+ void DestroyDir(const std::string& dir) {
+ if (env_->FileExists(dir).IsNotFound()) {
+ return;
+ }
+ std::vector<std::string> files_in_dir;
+ EXPECT_OK(env_->GetChildren(dir, &files_in_dir));
+ for (auto& file_in_dir : files_in_dir) {
+ if (file_in_dir == "." || file_in_dir == "..") {
+ continue;
+ }
+ EXPECT_OK(env_->DeleteFile(dir + "/" + file_in_dir));
+ }
+ EXPECT_OK(env_->DeleteDir(dir));
}
void DestroyAndCreateDir(const std::string& dir) {
- ASSERT_OK(test::DestroyDir(env_, dir));
+ DestroyDir(dir);
EXPECT_OK(env_->CreateDir(dir));
}
// We will delete the trash directory, that mean that DeleteScheduler wont
// be able to move files to trash and will delete files them immediately.
- ASSERT_OK(test::DestroyDir(env_, trash_dir_));
+ DestroyDir(trash_dir_);
for (int i = 0; i < 10; i++) {
std::string file_name = "data_" + ToString(i) + ".data";
ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name)));
size_t last_flush_;
};
-// A wrapper around a StringSink to give it a RandomRWFile interface
-class RandomRWStringSink : public RandomRWFile {
- public:
- explicit RandomRWStringSink(StringSink* ss) : ss_(ss) {}
-
- Status Write(uint64_t offset, const Slice& data) {
- if (offset + data.size() > ss_->contents_.size()) {
- ss_->contents_.resize(offset + data.size(), '\0');
- }
-
- char* pos = const_cast<char*>(ss_->contents_.data() + offset);
- memcpy(pos, data.data(), data.size());
- return Status::OK();
- }
-
- Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const {
- *result = Slice(nullptr, 0);
- if (offset < ss_->contents_.size()) {
- size_t str_res_sz =
- std::min(static_cast<size_t>(ss_->contents_.size() - offset), n);
- *result = Slice(ss_->contents_.data() + offset, str_res_sz);
- }
- return Status::OK();
- }
-
- Status Flush() { return Status::OK(); }
-
- Status Sync() { return Status::OK(); }
-
- Status Close() { return Status::OK(); }
-
- const std::string& contents() const { return ss_->contents(); }
-
- private:
- StringSink* ss_;
-};
-
// Like StringSink, this writes into a string. Unlink StringSink, it
// has some initial content and overwrites it, just like a recycled
// log file.
if (!s.ok()) {
return s;
}
- Block block(std::move(contents), kDisableGlobalSequenceNumber);
+ Block block(std::move(contents));
BlockIter bit;
InternalIterator* it = block.NewIterator(nullptr, &bit);
it->SeekToFirst();