utilities/blob_db/blob_log_reader.cc
utilities/blob_db/blob_log_writer.cc
utilities/blob_db/blob_log_format.cc
+ utilities/blob_db/ttl_extractor.cc
utilities/checkpoint/checkpoint_impl.cc
utilities/col_buf_decoder.cc
utilities/col_buf_encoder.cc
"utilities/blob_db/blob_log_reader.cc",
"utilities/blob_db/blob_log_writer.cc",
"utilities/blob_db/blob_log_format.cc",
+ "utilities/blob_db/ttl_extractor.cc",
"utilities/checkpoint/checkpoint_impl.cc",
"utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc",
"utilities/convenience/info_log_finder.cc",
utilities/blob_db/blob_log_reader.cc \
utilities/blob_db/blob_log_writer.cc \
utilities/blob_db/blob_log_format.cc \
+ utilities/blob_db/ttl_extractor.cc \
utilities/checkpoint/checkpoint_impl.cc \
utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc \
utilities/convenience/info_log_finder.cc \
#include "table/block.h"
#include "table/block_based_table_builder.h"
#include "table/block_builder.h"
-#include "util/crc32c.h"
#include "util/file_reader_writer.h"
#include "util/filename.h"
#include "utilities/blob_db/blob_db_impl.h"
bytes_per_sync(0),
blob_file_size(256 * 1024 * 1024),
num_concurrent_simple_blobs(4),
- default_ttl_extractor(false),
compression(kNoCompression) {}
} // namespace blob_db
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/stackable_db.h"
+#include "utilities/blob_db/ttl_extractor.h"
namespace rocksdb {
// how many files to use for simple blobs at one time
uint32_t num_concurrent_simple_blobs;
- // this function is to be provided by client if they intend to
- // use Put API to provide TTL.
- // the first argument is the value in the Put API
- // in case you want to do some modifications to the value,
- // return a new Slice in the second.
- // otherwise just copy the input value into output.
- // the ttl should be extracted and returned in last pointer.
- // otherwise assign it to -1
- std::function<bool(const Slice&, Slice*, int32_t*)> extract_ttl_fn;
+ // Instead of setting TTL explicitly by calling PutWithTTL or PutUntil,
+ // applications can set a TTLExtractor which can extract TTL from key-value
+ // pairs.
+ std::shared_ptr<TTLExtractor> ttl_extractor;
// eviction callback.
// this function will be called for every blob that is getting
std::function<void(const ColumnFamilyHandle*, const Slice&, const Slice&)>
gc_evict_cb_fn;
- // default ttl extactor
- bool default_ttl_extractor;
-
// what compression to use for Blob's
CompressionType compression;
};
class BlobDB : public StackableDB {
- public:
- // the suffix to a blob value to represent "ttl:TTLVAL"
- static const uint64_t kTTLSuffixLength = 8;
-
public:
using rocksdb::StackableDB::Put;
return PutWithTTL(options, DefaultColumnFamily(), key, value, ttl);
}
+ // Put with expiration. An expiration time equal to -1
+ // means the key doesn't expire.
virtual Status PutUntil(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value, int32_t expiration) = 0;
#include "utilities/blob_db/blob_db_impl.h"
#include <algorithm>
-#include <chrono>
#include <cinttypes>
-#include <ctime>
#include <iomanip>
#include <limits>
#include <memory>
namespace blob_db {
-struct GCStats {
- uint64_t blob_count = 0;
- uint64_t num_deletes = 0;
- uint64_t deleted_size = 0;
- uint64_t num_relocs = 0;
- uint64_t succ_deletes_lsm = 0;
- uint64_t overrided_while_delete = 0;
- uint64_t succ_relocs = 0;
- std::shared_ptr<BlobFile> newfile = nullptr;
-};
-
// BlobHandle is a pointer to the blob that is stored in the LSM
class BlobHandle {
public:
const DBOptions& db_options)
: BlobDB(nullptr),
db_impl_(nullptr),
- myenv_(db_options.env),
+ env_(db_options.env),
+ ttl_extractor_(blob_db_options.ttl_extractor.get()),
wo_set_(false),
bdb_options_(blob_db_options),
db_options_(db_options),
blob_dir_ = (bdb_options_.path_relative)
? dbname + "/" + bdb_options_.blob_dir
: bdb_options_.blob_dir;
-
- if (bdb_options_.default_ttl_extractor) {
- bdb_options_.extract_ttl_fn = &BlobDBImpl::ExtractTTLFromBlob;
- }
}
Status BlobDBImpl::LinkToBaseDB(DB* db) {
db_impl_ = dynamic_cast<DBImpl*>(db);
}
- myenv_ = db_->GetEnv();
+ env_ = db_->GetEnv();
opt_db_.reset(new OptimisticTransactionDBImpl(db, false));
- Status s = myenv_->CreateDirIfMissing(blob_dir_);
+ Status s = env_->CreateDirIfMissing(blob_dir_);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Failed to create blob directory: %s status: '%s'",
blob_dir_.c_str(), s.ToString().c_str());
}
- s = myenv_->NewDirectory(blob_dir_, &dir_ent_);
+ s = env_->NewDirectory(blob_dir_, &dir_ent_);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Failed to open blob directory: %s status: '%s'",
blob_dir_ = (bdb_options_.path_relative)
? db_->GetName() + "/" + bdb_options_.blob_dir
: bdb_options_.blob_dir;
-
- if (bdb_options_.default_ttl_extractor) {
- bdb_options_.extract_ttl_fn = &BlobDBImpl::ExtractTTLFromBlob;
- }
}
BlobDBImpl::~BlobDBImpl() {
return Status::NotSupported("No blob directory in options");
std::unique_ptr<Directory> dir_ent;
- Status s = myenv_->NewDirectory(blob_dir_, &dir_ent);
+ Status s = env_->NewDirectory(blob_dir_, &dir_ent);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Failed to open blob directory: %s status: '%s'",
Status BlobDBImpl::GetAllLogFiles(
std::set<std::pair<uint64_t, std::string>>* file_nums) {
std::vector<std::string> all_files;
- Status status = myenv_->GetChildren(blob_dir_, &all_files);
+ Status status = env_->GetChildren(blob_dir_, &all_files);
if (!status.ok()) {
return status;
}
for (auto f_iter : file_nums) {
std::string bfpath = BlobFileName(blob_dir_, f_iter.first);
uint64_t size_bytes;
- Status s1 = myenv_->GetFileSize(bfpath, &size_bytes);
+ Status s1 = env_->GetFileSize(bfpath, &size_bytes);
if (!s1.ok()) {
ROCKS_LOG_WARN(
db_options_.info_log,
// read header
std::shared_ptr<Reader> reader;
- reader = bfptr->OpenSequentialReader(myenv_, db_options_, env_options_);
+ reader = bfptr->OpenSequentialReader(env_, db_options_, env_options_);
s1 = reader->ReadHeader(&bfptr->header_);
if (!s1.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
bfptr->header_valid_ = true;
std::shared_ptr<RandomAccessFileReader> ra_reader =
- GetOrOpenRandomAccessReader(bfptr, myenv_, env_options_);
+ GetOrOpenRandomAccessReader(bfptr, env_, env_options_);
BlobLogFooter bf;
s1 = bfptr->ReadFooter(&bf);
EnvOptions env_options = env_options_;
env_options.writable_file_max_buffer_size = 0;
- Status s = myenv_->ReopenWritableFile(fpath, &wfile, env_options);
+ Status s = env_->ReopenWritableFile(fpath, &wfile, env_options);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to open blob file for write: %s status: '%s'"
" exists: '%s'",
fpath.c_str(), s.ToString().c_str(),
- myenv_->FileExists(fpath).ToString().c_str());
+ env_->FileExists(fpath).ToString().c_str());
return s;
}
return bfile;
}
-bool BlobDBImpl::ExtractTTLFromBlob(const Slice& value, Slice* newval,
- int32_t* ttl_val) {
- *newval = value;
- *ttl_val = -1;
- if (value.size() <= BlobDB::kTTLSuffixLength) return false;
-
- int32_t ttl_tmp =
- DecodeFixed32(value.data() + value.size() - sizeof(int32_t));
- std::string ttl_exp(value.data() + value.size() - BlobDB::kTTLSuffixLength,
- 4);
- if (ttl_exp != "ttl:") return false;
-
- newval->remove_suffix(BlobDB::kTTLSuffixLength);
- *ttl_val = ttl_tmp;
- return true;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-// A specific pattern is looked up at the end of the value part.
-// ttl:TTLVAL . if this pattern is found, PutWithTTL is called, otherwise
-// regular Put is called.
-////////////////////////////////////////////////////////////////////////////////
Status BlobDBImpl::Put(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) {
- Slice newval;
- int32_t ttl_val;
- if (bdb_options_.extract_ttl_fn) {
- bdb_options_.extract_ttl_fn(value, &newval, &ttl_val);
- return PutWithTTL(options, column_family, key, newval, ttl_val);
- }
-
- return PutWithTTL(options, column_family, key, value, -1);
+ std::string new_value;
+ Slice value_slice;
+ int32_t expiration = ExtractExpiration(key, value, &value_slice, &new_value);
+ return PutUntil(options, column_family, key, value_slice, expiration);
}
Status BlobDBImpl::Delete(const WriteOptions& options,
Status batch_rewrite_status_;
std::shared_ptr<BlobFile> last_file_;
bool has_put_;
+ std::string new_value_;
public:
explicit BlobInserter(BlobDBImpl* impl, SequenceNumber seq)
bool has_put() { return has_put_; }
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
- const Slice& value_unc) override {
- Slice newval;
- int32_t ttl_val = -1;
- if (impl_->bdb_options_.extract_ttl_fn) {
- impl_->bdb_options_.extract_ttl_fn(value_unc, &newval, &ttl_val);
- } else {
- newval = value_unc;
- }
+ const Slice& value_slice) override {
+ Slice value_unc;
+ int32_t expiration =
+ impl_->ExtractExpiration(key, value_slice, &value_unc, &new_value_);
- int32_t expiration = -1;
- if (ttl_val != -1) {
- std::time_t cur_t = std::chrono::system_clock::to_time_t(
- std::chrono::system_clock::now());
- expiration = ttl_val + static_cast<int32_t>(cur_t);
- }
std::shared_ptr<BlobFile> bfile =
- (ttl_val != -1)
+ (expiration != -1)
? impl_->SelectBlobFileTTL(expiration)
: ((last_file_) ? last_file_ : impl_->SelectBlobFile());
if (last_file_ && last_file_ != bfile) {
ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value,
int32_t ttl) {
- return PutUntil(
- options, column_family, key, value,
- (ttl != -1)
- ? ttl + static_cast<int32_t>(std::chrono::system_clock::to_time_t(
- std::chrono::system_clock::now()))
- : -1);
+ return PutUntil(options, column_family, key, value,
+ static_cast<int32_t>(EpochNow()) + ttl);
}
Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
return *compression_output;
}
+// TODO(yiwu): We should use uint64_t for expiration.
Status BlobDBImpl::PutUntil(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value_unc, int32_t expiration) {
return s;
}
+// TODO(yiwu): We should return uint64_t after updating the rest of the code
+// to use uint64_t for expiration.
+//
+// Determines the expiration time for a key-value pair written through Put or
+// a WriteBatch.
+//
+// If a TTLExtractor is configured it is asked for the expiration, with
+// EpochNow() passed as the current time; it may also rewrite the value into
+// *new_value. On return *value_slice refers to *new_value when the extractor
+// reported a changed value and to `value` otherwise, so *new_value must
+// outlive *value_slice.
+//
+// Returns -1 when there is no extractor, when the extractor reports no TTL,
+// or when it reports kNoExpiration. Otherwise returns the expiration clamped
+// to the largest int32_t, so that a 64-bit expiration can never wrap around
+// into a negative value (or into -1, the "no expiration" sentinel).
+int32_t BlobDBImpl::ExtractExpiration(const Slice& key, const Slice& value,
+                                      Slice* value_slice,
+                                      std::string* new_value) {
+  uint64_t expiration = kNoExpiration;
+  bool value_changed = false;
+  if (ttl_extractor_ != nullptr) {
+    bool has_ttl = ttl_extractor_->ExtractExpiration(
+        key, value, EpochNow(), &expiration, new_value, &value_changed);
+    if (!has_ttl) {
+      expiration = kNoExpiration;
+    }
+  }
+  *value_slice = value_changed ? Slice(*new_value) : value;
+  if (expiration == kNoExpiration) {
+    return -1;
+  }
+  // The rest of the code still stores expiration as int32_t; saturate rather
+  // than truncate the 64-bit value.
+  const uint64_t max_expiration =
+      static_cast<uint64_t>(std::numeric_limits<int32_t>::max());
+  return static_cast<int32_t>(expiration > max_expiration ? max_expiration
+                                                          : expiration);
+}
+
Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
const std::string& headerbuf, const Slice& key,
const Slice& value, std::string* index_entry) {
// takes locks when called
std::shared_ptr<RandomAccessFileReader> reader =
- GetOrOpenRandomAccessReader(bfile, myenv_, env_options_);
+ GetOrOpenRandomAccessReader(bfile, env_, env_options_);
if (value != nullptr) {
std::string* valueptr = value;
assert(!bfile->Immutable());
}
- std::time_t epoch_now =
- std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
+ uint64_t epoch_now = EpochNow();
for (auto bfile_pair : blob_files_) {
auto bfile = bfile_pair.second;
ROCKS_LOG_INFO(
db_options_.info_log,
- "Blob File %s %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %d",
+ "Blob File %s %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64,
bfile->PathName().c_str(), bfile->GetFileSize(), bfile->BlobCount(),
bfile->deleted_count_, bfile->deleted_size_,
(bfile->ttl_range_.second - epoch_now));
std::vector<std::shared_ptr<BlobFile>> process_files;
{
- std::time_t epoch_now =
- std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
+ uint64_t epoch_now = EpochNow();
ReadLock rl(&mutex_);
for (auto bfile : open_blob_files_) {
////////////////////////////////////////////////////////////////////////////////
Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
GCStats* gcstats) {
- std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
- std::time_t tt = std::chrono::system_clock::to_time_t(now);
+ uint64_t tt = EpochNow();
std::shared_ptr<Reader> reader =
- bfptr->OpenSequentialReader(myenv_, db_options_, env_options_);
+ bfptr->OpenSequentialReader(env_, db_options_, env_options_);
if (!reader) {
ROCKS_LOG_ERROR(db_options_.info_log,
"File sequential reader could not be opened",
}
}
- Status s = myenv_->DeleteFile(bfile->PathName());
+ Status s = env_->DeleteFile(bfile->PathName());
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"File failed to be deleted as obsolete %s",
bool BlobDBImpl::CallbackEvictsImpl(std::shared_ptr<BlobFile> bfile) {
std::shared_ptr<Reader> reader =
- bfile->OpenSequentialReader(myenv_, db_options_, env_options_);
+ bfile->OpenSequentialReader(env_, db_options_, env_options_);
if (!reader) {
ROCKS_LOG_ERROR(
db_options_.info_log,
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
return CommonGet(cfh->cfd(), key, index_entry, nullptr, sequence);
}
+
+// Test-only helper: returns a snapshot of every blob file currently tracked
+// in blob_files_.
+// NOTE(review): blob_files_ is read here without taking a lock; presumably
+// tests call this only while no background work is running -- confirm.
+std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
+  std::vector<std::shared_ptr<BlobFile>> snapshot;
+  for (const auto& entry : blob_files_) {
+    snapshot.push_back(entry.second);
+  }
+  return snapshot;
+}
+
+void BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {  // test-only wrapper around CloseSeqWrite
+ CloseSeqWrite(bfile, false /*abort*/);  // always passes abort = false
+}
+
+Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,  // test-only entry point
+ GCStats* gc_stats) {
+ return GCFileAndUpdateLSM(bfile, gc_stats);  // forwards unchanged and returns its status
+}
#endif // !NDEBUG
} // namespace blob_db
#include <atomic>
#include <condition_variable>
#include <ctime>
+#include <limits>
#include <list>
#include <memory>
#include <set>
class BlobFile;
class BlobDBImpl;
-struct GCStats;
class BlobDBFlushBeginListener : public EventListener {
public:
const std::shared_ptr<BlobFile>& rhs) const;
};
+struct GCStats {  // statistics handed to GCFileAndUpdateLSM through its GCStats* argument
+ uint64_t blob_count = 0;  // every counter below starts at zero
+ uint64_t num_deletes = 0;
+ uint64_t deleted_size = 0;
+ uint64_t num_relocs = 0;
+ uint64_t succ_deletes_lsm = 0;
+ uint64_t overrided_while_delete = 0;
+ uint64_t succ_relocs = 0;
+ std::shared_ptr<BlobFile> newfile = nullptr;  // NOTE(review): presumably the file relocated blobs are written to -- confirm
+};
+
/**
* The implementation class for BlobDB. This manages the value
* part in TTL aware sequentially written files. These files are
friend class BlobDBIterator;
public:
+ static constexpr uint64_t kNoExpiration =
+ std::numeric_limits<uint64_t>::max();
+
using rocksdb::StackableDB::Put;
Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) override;
#ifndef NDEBUG
Status TEST_GetSequenceNumber(const Slice& key, SequenceNumber* sequence);
+
+ std::vector<std::shared_ptr<BlobFile>> TEST_GetBlobFiles() const;
+
+ void TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile);
+
+ Status TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
+ GCStats* gc_stats);
#endif // !NDEBUG
private:
- static bool ExtractTTLFromBlob(const Slice& value, Slice* newval,
- int32_t* ttl_val);
-
Status OpenPhase1();
Status CommonGet(const ColumnFamilyData* cfd, const Slice& key,
// appends a task into timer queue to close the file
void CloseIf(const std::shared_ptr<BlobFile>& bfile);
+ int32_t ExtractExpiration(const Slice& key, const Slice& value,
+ Slice* value_slice, std::string* new_value);
+
Status AppendBlob(const std::shared_ptr<BlobFile>& bfile,
const std::string& headerbuf, const Slice& key,
const Slice& value, std::string* index_entry);
std::vector<std::shared_ptr<BlobFile>>* to_process, uint64_t epoch,
uint64_t last_id, size_t files_to_collect);
- private:
+ uint64_t EpochNow() { return env_->NowMicros() / 1000000; }
+
// the base DB
DBImpl* db_impl_;
-
- Env* myenv_;
+ Env* env_;
+ TTLExtractor* ttl_extractor_;
// Optimistic Transaction DB used during Garbage collection
// for atomicity
public:
const int kMaxBlobSize = 1 << 14;
- BlobDBTest() : dbname_(test::TmpDir() + "/blob_db_test"), blob_db_(nullptr) {
+  // Env wrapper around Env::Default() whose NowMicros() is fully controlled
+  // by the test: it reports whatever was last passed to set_now_micros()
+  // (0 until the first call).
+  class MockEnv : public EnvWrapper {
+   public:
+    MockEnv() : EnvWrapper(Env::Default()), now_micros_(0) {}
+
+    // Returns the mocked time instead of consulting the real clock.
+    uint64_t NowMicros() override { return now_micros_; }
+
+    // Sets the value subsequently reported by NowMicros().
+    void set_now_micros(uint64_t now_micros) { now_micros_ = now_micros; }
+
+   private:
+    uint64_t now_micros_;
+  };
+
+ BlobDBTest()
+ : dbname_(test::TmpDir() + "/blob_db_test"),
+ mock_env_(new MockEnv()),
+ blob_db_(nullptr) {
Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions());
assert(s.ok());
}
}
}
+  // Writes `key` with a random human-readable value of 1..kMaxBlobSize bytes
+  // through PutUntil, using `expiration` as the expiration time. When `data`
+  // is non-null the written key/value pair is also recorded in it.
+  void PutRandomUntil(const std::string &key, int32_t expiration, Random *rnd,
+                      std::map<std::string, std::string> *data = nullptr) {
+    int value_len = rnd->Next() % kMaxBlobSize + 1;
+    std::string random_value =
+        test::RandomHumanReadableString(rnd, value_len);
+    ASSERT_OK(blob_db_->PutUntil(WriteOptions(), Slice(key),
+                                 Slice(random_value), expiration));
+    if (data == nullptr) {
+      return;
+    }
+    (*data)[key] = random_value;
+  }
+
void PutRandom(const std::string &key, Random *rnd,
std::map<std::string, std::string> *data = nullptr) {
- PutRandomWithTTL(key, -1, rnd, data);
+ int len = rnd->Next() % kMaxBlobSize + 1;
+ std::string value = test::RandomHumanReadableString(rnd, len);
+ ASSERT_OK(blob_db_->Put(WriteOptions(), Slice(key), Slice(value)));
+ if (data != nullptr) {
+ (*data)[key] = value;
+ }
}
void PutRandomToWriteBatch(
}
const std::string dbname_;
+ std::unique_ptr<MockEnv> mock_env_;
+ std::shared_ptr<TTLExtractor> ttl_extractor_;
BlobDB *blob_db_;
}; // class BlobDBTest
VerifyDB(data);
}
+TEST_F(BlobDBTest, PutWithTTL) {  // writes 100 keys with random TTLs at mock time 50s, then runs GC at 100s
+ Random rnd(301);
+ Options options;
+ options.env = mock_env_.get();  // time is taken from the mock clock
+ BlobDBOptionsImpl bdb_options;
+ bdb_options.ttl_range_secs = 1000;
+ bdb_options.blob_file_size = 256 * 1000 * 1000;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options, options);
+ std::map<std::string, std::string> data;  // keys expected to survive GC
+ mock_env_->set_now_micros(50 * 1000000);  // mock clock: 50 seconds
+ for (size_t i = 0; i < 100; i++) {
+ int32_t ttl = rnd.Next() % 100;
+ PutRandomWithTTL("key" + ToString(i), ttl, &rnd,
+ (ttl < 50 ? nullptr : &data));  // only keys with ttl >= 50 are recorded as expected survivors
+ }
+ mock_env_->set_now_micros(100 * 1000000);  // mock clock: 100 seconds
+ auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
+ auto blob_files = bdb_impl->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());  // all writes are expected to land in one file
+ ASSERT_TRUE(blob_files[0]->HasTTL());
+ bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+ GCStats gc_stats;
+ ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
+ ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);  // everything not recorded in data
+ ASSERT_EQ(data.size(), gc_stats.num_relocs);  // everything recorded in data
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, PutUntil) {  // writes 100 keys with random absolute expirations, then runs GC at mock time 100s
+ Random rnd(301);
+ Options options;
+ options.env = mock_env_.get();  // time is taken from the mock clock
+ BlobDBOptionsImpl bdb_options;
+ bdb_options.ttl_range_secs = 1000;
+ bdb_options.blob_file_size = 256 * 1000 * 1000;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options, options);
+ std::map<std::string, std::string> data;  // keys expected to survive GC
+ mock_env_->set_now_micros(50 * 1000000);  // mock clock: 50 seconds
+ for (size_t i = 0; i < 100; i++) {
+ int32_t expiration = rnd.Next() % 100 + 50;  // in [50, 149]
+ PutRandomUntil("key" + ToString(i), expiration, &rnd,
+ (expiration < 100 ? nullptr : &data));  // only keys with expiration >= 100 are recorded as expected survivors
+ }
+ mock_env_->set_now_micros(100 * 1000000);  // mock clock: 100 seconds
+ auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
+ auto blob_files = bdb_impl->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());  // all writes are expected to land in one file
+ ASSERT_TRUE(blob_files[0]->HasTTL());
+ bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+ GCStats gc_stats;
+ ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
+ ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);  // everything not recorded in data
+ ASSERT_EQ(data.size(), gc_stats.num_relocs);  // everything recorded in data
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, TTLExtrator_NoTTL) {
+ // The default TTL extractor returns no TTL for every key.
+ ttl_extractor_.reset(new TTLExtractor());
+ Random rnd(301);
+ Options options;
+ options.env = mock_env_.get();  // time is taken from the mock clock
+ BlobDBOptionsImpl bdb_options;
+ bdb_options.ttl_range_secs = 1000;
+ bdb_options.blob_file_size = 256 * 1000 * 1000;
+ bdb_options.num_concurrent_simple_blobs = 1;
+ bdb_options.ttl_extractor = ttl_extractor_;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options, options);
+ std::map<std::string, std::string> data;  // every key written is expected to survive GC
+ mock_env_->set_now_micros(0);
+ for (size_t i = 0; i < 100; i++) {
+ PutRandom("key" + ToString(i), &rnd, &data);
+ }
+ // very far in the future..
+ mock_env_->set_now_micros(std::numeric_limits<uint64_t>::max() - 10);
+ auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
+ auto blob_files = bdb_impl->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ ASSERT_FALSE(blob_files[0]->HasTTL());  // no key carried a TTL
+ bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+ GCStats gc_stats;
+ ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
+ ASSERT_EQ(0, gc_stats.num_deletes);  // nothing expires, however late the clock is
+ ASSERT_EQ(100, gc_stats.num_relocs);
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, TTLExtractor_ExtractTTL) {  // TTL comes from an extractor that overrides ExtractTTL
+ Random rnd(301);
+ class TestTTLExtractor : public TTLExtractor {
+ public:
+ explicit TestTTLExtractor(Random *r) : rnd(r) {}
+
+ virtual bool ExtractTTL(const Slice &key, const Slice &value, uint64_t *ttl,
+ std::string * /*new_value*/,
+ bool * /*value_changed*/) override {
+ *ttl = rnd->Next() % 100;  // random TTL in [0, 99]
+ if (*ttl >= 50) {
+ data[key.ToString()] = value.ToString();  // record the pairs expected to survive GC
+ }
+ return true;
+ }
+
+ Random *rnd;
+ std::map<std::string, std::string> data;
+ };
+ ttl_extractor_.reset(new TestTTLExtractor(&rnd));
+ Options options;
+ options.env = mock_env_.get();  // time is taken from the mock clock
+ BlobDBOptionsImpl bdb_options;
+ bdb_options.ttl_range_secs = 1000;
+ bdb_options.blob_file_size = 256 * 1000 * 1000;
+ bdb_options.ttl_extractor = ttl_extractor_;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options, options);
+ mock_env_->set_now_micros(50 * 1000000);  // mock clock: 50 seconds
+ for (size_t i = 0; i < 100; i++) {
+ PutRandom("key" + ToString(i), &rnd);  // plain Put; the extractor supplies the TTL
+ }
+ mock_env_->set_now_micros(100 * 1000000);  // mock clock: 100 seconds
+ auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
+ auto blob_files = bdb_impl->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ ASSERT_TRUE(blob_files[0]->HasTTL());
+ bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+ GCStats gc_stats;
+ ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
+ auto &data = static_cast<TestTTLExtractor *>(ttl_extractor_.get())->data;
+ ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
+ ASSERT_EQ(data.size(), gc_stats.num_relocs);
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, TTLExtractor_ExtractExpiration) {  // expiration comes from an extractor that overrides ExtractExpiration
+ Random rnd(301);
+ class TestTTLExtractor : public TTLExtractor {
+ public:
+ explicit TestTTLExtractor(Random *r) : rnd(r) {}
+
+ virtual bool ExtractExpiration(const Slice &key, const Slice &value,
+ uint64_t /*now*/, uint64_t *expiration,
+ std::string * /*new_value*/,
+ bool * /*value_changed*/) override {
+ *expiration = rnd->Next() % 100 + 50;  // random absolute expiration in [50, 149]
+ if (*expiration >= 100) {
+ data[key.ToString()] = value.ToString();  // record the pairs expected to survive GC
+ }
+ return true;
+ }
+
+ Random *rnd;
+ std::map<std::string, std::string> data;
+ };
+ ttl_extractor_.reset(new TestTTLExtractor(&rnd));
+ Options options;
+ options.env = mock_env_.get();  // time is taken from the mock clock
+ BlobDBOptionsImpl bdb_options;
+ bdb_options.ttl_range_secs = 1000;
+ bdb_options.blob_file_size = 256 * 1000 * 1000;
+ bdb_options.ttl_extractor = ttl_extractor_;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options, options);
+ mock_env_->set_now_micros(50 * 1000000);  // mock clock: 50 seconds
+ for (size_t i = 0; i < 100; i++) {
+ PutRandom("key" + ToString(i), &rnd);  // plain Put; the extractor supplies the expiration
+ }
+ mock_env_->set_now_micros(100 * 1000000);  // mock clock: 100 seconds
+ auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
+ auto blob_files = bdb_impl->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ ASSERT_TRUE(blob_files[0]->HasTTL());
+ bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+ GCStats gc_stats;
+ ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
+ auto &data = static_cast<TestTTLExtractor *>(ttl_extractor_.get())->data;
+ ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
+ ASSERT_EQ(data.size(), gc_stats.num_relocs);
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, TTLExtractor_ChangeValue) {  // the extractor strips a TTL suffix from the value before it is stored
+ class TestTTLExtractor : public TTLExtractor {
+ public:
+ const Slice kTTLSuffix = Slice("ttl:");
+
+ bool ExtractTTL(const Slice & /*key*/, const Slice &value, uint64_t *ttl,
+ std::string *new_value, bool *value_changed) override {
+ if (value.size() < 12) {  // 12 = 4-byte "ttl:" tag + 8-byte fixed64 TTL
+ return false;
+ }
+ const char *p = value.data() + value.size() - 12;  // start of the suffix
+ if (kTTLSuffix != Slice(p, 4)) {
+ return false;
+ }
+ *ttl = DecodeFixed64(p + 4);
+ *new_value = Slice(value.data(), value.size() - 12).ToString();  // value without the suffix
+ *value_changed = true;
+ return true;
+ }
+ };
+ Random rnd(301);
+ Options options;
+ options.env = mock_env_.get();  // time is taken from the mock clock
+ BlobDBOptionsImpl bdb_options;
+ bdb_options.ttl_range_secs = 1000;
+ bdb_options.blob_file_size = 256 * 1000 * 1000;
+ bdb_options.ttl_extractor = std::make_shared<TestTTLExtractor>();
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options, options);
+ std::map<std::string, std::string> data;  // keys expected to survive GC, mapped to the suffix-free value
+ mock_env_->set_now_micros(50 * 1000000);  // mock clock: 50 seconds
+ for (size_t i = 0; i < 100; i++) {
+ int len = rnd.Next() % kMaxBlobSize + 1;
+ std::string key = "key" + ToString(i);
+ std::string value = test::RandomHumanReadableString(&rnd, len);
+ uint64_t ttl = rnd.Next() % 100;
+ std::string value_ttl = value + "ttl:";
+ PutFixed64(&value_ttl, ttl);  // written value = value + "ttl:" + fixed64(ttl)
+ ASSERT_OK(blob_db_->Put(WriteOptions(), Slice(key), Slice(value_ttl)));
+ if (ttl >= 50) {
+ data[key] = value;
+ }
+ }
+ mock_env_->set_now_micros(100 * 1000000);  // mock clock: 100 seconds
+ auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
+ auto blob_files = bdb_impl->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ ASSERT_TRUE(blob_files[0]->HasTTL());
+ bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+ GCStats gc_stats;
+ ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
+ ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
+ ASSERT_EQ(data.size(), gc_stats.num_relocs);
+ VerifyDB(data);
+}
+
TEST_F(BlobDBTest, StackableDBGet) {
Random rnd(301);
BlobDBOptionsImpl bdb_options;
#include <cstddef>
#include <cstdint>
+#include <limits>
#include <memory>
#include <string>
#include <utility>
uint64_t GetBlobSize() const { return blob_size_; }
+  // Returns true unless ttl_val_ holds the largest uint32_t value, which is
+  // treated here as "no TTL set".
+  bool HasTTL() const {
+    const uint32_t no_ttl_marker = std::numeric_limits<uint32_t>::max();
+    return !(ttl_val_ == no_ttl_marker);
+  }
+
uint32_t GetTTL() const { return ttl_val_; }
uint64_t GetTimeVal() const { return time_val_; }
--- /dev/null
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#include "ttl_extractor.h"
+
+#include "util/coding.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+bool TTLExtractor::ExtractTTL(const Slice& /*key*/, const Slice& /*value*/,
+ uint64_t* /*ttl*/, std::string* /*new_value*/,
+ bool* /*value_changed*/) {
+ return false;  // default implementation: no key has a TTL; no output argument is touched
+}
+
+// Default implementation: derives the expiration time from ExtractTTL.
+//
+// Calls ExtractTTL with the same key, value, new_value and value_changed
+// arguments. If it reports a TTL, *expiration is set to now + ttl and true is
+// returned; otherwise *expiration is left untouched and false is returned.
+//
+// The sum saturates at the largest uint64_t value instead of wrapping, so an
+// extremely large TTL can never turn into an expiration time in the past.
+bool TTLExtractor::ExtractExpiration(const Slice& key, const Slice& value,
+                                     uint64_t now, uint64_t* expiration,
+                                     std::string* new_value,
+                                     bool* value_changed) {
+  // Zero-initialized so an override that returns true without writing *ttl
+  // cannot make us read an indeterminate value.
+  uint64_t ttl = 0;
+  bool has_ttl = ExtractTTL(key, value, &ttl, new_value, value_changed);
+  if (has_ttl) {
+    const uint64_t max_expiration = ~static_cast<uint64_t>(0);
+    *expiration = (ttl > max_expiration - now) ? max_expiration : now + ttl;
+  }
+  return has_ttl;
+}
+
+} // namespace blob_db
+} // namespace rocksdb
--- /dev/null
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "rocksdb/slice.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+// TTLExtractor allows applications to extract TTL from key-value pairs.
+// This is useful for applications that use Put or WriteBatch to write keys
+// and don't intend to migrate to PutWithTTL or PutUntil.
+//
+// Applications can implement either ExtractTTL or ExtractExpiration. If both
+// are implemented, ExtractExpiration will take precedence.
+class TTLExtractor {
+ public:
+ // Extract TTL from key-value pair. The default implementation returns false.
+ // Return true if the key has TTL, false otherwise. If the key has TTL, the
+ // TTL is passed back through ttl. The method can optionally modify the value,
+ // pass the result back through new_value, and also set value_changed to true.
+ virtual bool ExtractTTL(const Slice& key, const Slice& value, uint64_t* ttl,
+ std::string* new_value, bool* value_changed);
+
+ // Extract expiration time from key-value pair. The default implementation
+ // calls ExtractTTL and, if it reports a TTL, passes back now + ttl.
+ // Return true if the key has expiration time, false otherwise. If the key
+ // has expiration time, it is passed back through expiration. The method can
+ // optionally modify the value, pass it back through new_value, and also set
+ // value_changed to true.
+ virtual bool ExtractExpiration(const Slice& key, const Slice& value,
+ uint64_t now, uint64_t* expiration,
+ std::string* new_value, bool* value_changed);
+
+ virtual ~TTLExtractor() = default;
+};
+
+} // namespace blob_db
+} // namespace rocksdb