]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Blob DB TTL extractor
authorYi Wu <yiwu@fb.com>
Fri, 28 Jul 2017 06:16:18 +0000 (23:16 -0700)
committerYi Wu <yiwu@fb.com>
Fri, 28 Jul 2017 16:41:52 +0000 (09:41 -0700)
Summary:
Introducing blob_db::TTLExtractor to replace extract_ttl_fn. The TTL
extractor can be use to extract TTL from keys insert with Put or
WriteBatch. Change over existing extract_ttl_fn are:
* If value is changed, it will be return via std::string* (rather than Slice*). With Slice* the new value has to be part of the existing value. With std::string* the limitation is removed.
* It can optionally return TTL or expiration.

Other changes in this PR:
* replace `std::chrono::system_clock` with `Env::NowMicros` so that I can mock time in tests.
* add several TTL tests.
* other minor naming change.
Closes https://github.com/facebook/rocksdb/pull/2659

Differential Revision: D5512627

Pulled By: yiwu-arbug

fbshipit-source-id: 0dfcb00d74d060b8534c6130c808e4d5d0a54440

CMakeLists.txt
TARGETS
src.mk
utilities/blob_db/blob_db.cc
utilities/blob_db/blob_db.h
utilities/blob_db/blob_db_impl.cc
utilities/blob_db/blob_db_impl.h
utilities/blob_db/blob_db_test.cc
utilities/blob_db/blob_log_format.h
utilities/blob_db/ttl_extractor.cc [new file with mode: 0644]
utilities/blob_db/ttl_extractor.h [new file with mode: 0644]

index 1d0ad1cb06e85cbdd913f206e6237df0cad62fdf..61e314fa50ce001d6d58c231b4d3d37ad6365fc5 100644 (file)
@@ -483,6 +483,7 @@ set(SOURCES
         utilities/blob_db/blob_log_reader.cc
         utilities/blob_db/blob_log_writer.cc
         utilities/blob_db/blob_log_format.cc
+        utilities/blob_db/ttl_extractor.cc
         utilities/checkpoint/checkpoint_impl.cc
         utilities/col_buf_decoder.cc
         utilities/col_buf_encoder.cc
diff --git a/TARGETS b/TARGETS
index 1bafb01caa09e273fe0bbbf429d20238c4322d6c..de64bf5f75b7c68b5ffe22e8014f6974902f55dd 100644 (file)
--- a/TARGETS
+++ b/TARGETS
@@ -212,6 +212,7 @@ cpp_library(
       "utilities/blob_db/blob_log_reader.cc",
       "utilities/blob_db/blob_log_writer.cc",
       "utilities/blob_db/blob_log_format.cc",
+      "utilities/blob_db/ttl_extractor.cc",
       "utilities/checkpoint/checkpoint_impl.cc",
       "utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc",
       "utilities/convenience/info_log_finder.cc",
diff --git a/src.mk b/src.mk
index 8250947f59657e071df63c7a12910066e9ef8ed5..cb3383ff05604040b6004dcda8275e2cb903138e 100644 (file)
--- a/src.mk
+++ b/src.mk
@@ -159,6 +159,7 @@ LIB_SOURCES =                                                   \
   utilities/blob_db/blob_log_reader.cc                          \
   utilities/blob_db/blob_log_writer.cc                          \
   utilities/blob_db/blob_log_format.cc                          \
+  utilities/blob_db/ttl_extractor.cc                            \
   utilities/checkpoint/checkpoint_impl.cc                       \
   utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc    \
   utilities/convenience/info_log_finder.cc                      \
index e2defe97ca0aafceeb94f90cc0ca0cbc531cfd53..ea60ad59b479d73b40dee34bdebf2ddc198d7e90 100644 (file)
@@ -17,7 +17,6 @@
 #include "table/block.h"
 #include "table/block_based_table_builder.h"
 #include "table/block_builder.h"
-#include "util/crc32c.h"
 #include "util/file_reader_writer.h"
 #include "util/filename.h"
 #include "utilities/blob_db/blob_db_impl.h"
@@ -163,7 +162,6 @@ BlobDBOptions::BlobDBOptions()
       bytes_per_sync(0),
       blob_file_size(256 * 1024 * 1024),
       num_concurrent_simple_blobs(4),
-      default_ttl_extractor(false),
       compression(kNoCompression) {}
 
 }  // namespace blob_db
index f45a42f60a9831a27a49a1db71417f589eeb94e1..dfb21383dda65d9b4426aa5250e7166b5e4fbed0 100644 (file)
@@ -13,6 +13,7 @@
 #include "rocksdb/db.h"
 #include "rocksdb/status.h"
 #include "rocksdb/utilities/stackable_db.h"
+#include "utilities/blob_db/ttl_extractor.h"
 
 namespace rocksdb {
 
@@ -64,15 +65,10 @@ struct BlobDBOptions {
   // how many files to use for simple blobs at one time
   uint32_t num_concurrent_simple_blobs;
 
-  // this function is to be provided by client if they intend to
-  // use Put API to provide TTL.
-  // the first argument is the value in the Put API
-  // in case you want to do some modifications to the value,
-  // return a new Slice in the second.
-  // otherwise just copy the input value into output.
-  // the ttl should be extracted and returned in last pointer.
-  // otherwise assign it to -1
-  std::function<bool(const Slice&, Slice*, int32_t*)> extract_ttl_fn;
+  // Instead of setting TTL explicitly by calling PutWithTTL or PutUntil,
+  // applications can set a TTLExtractor which can extract TTL from key-value
+  // pairs.
+  std::shared_ptr<TTLExtractor> ttl_extractor;
 
   // eviction callback.
   // this function will be called for every blob that is getting
@@ -80,9 +76,6 @@ struct BlobDBOptions {
   std::function<void(const ColumnFamilyHandle*, const Slice&, const Slice&)>
       gc_evict_cb_fn;
 
-  // default ttl extactor
-  bool default_ttl_extractor;
-
   // what compression to use for Blob's
   CompressionType compression;
 
@@ -95,10 +88,6 @@ struct BlobDBOptions {
 };
 
 class BlobDB : public StackableDB {
- public:
-  // the suffix to a blob value to represent "ttl:TTLVAL"
-  static const uint64_t kTTLSuffixLength = 8;
-
  public:
   using rocksdb::StackableDB::Put;
 
@@ -120,6 +109,8 @@ class BlobDB : public StackableDB {
     return PutWithTTL(options, DefaultColumnFamily(), key, value, ttl);
   }
 
+  // Put with expiration. Key with expiration time equal to -1
+  // means the key don't expire.
   virtual Status PutUntil(const WriteOptions& options,
                           ColumnFamilyHandle* column_family, const Slice& key,
                           const Slice& value, int32_t expiration) = 0;
index 1dd72b6bc3a7ffbe7b49e8a945c372e4c38c86f0..95deda5b0cc2905d628682b96477b301a6ffe79b 100644 (file)
@@ -6,9 +6,7 @@
 
 #include "utilities/blob_db/blob_db_impl.h"
 #include <algorithm>
-#include <chrono>
 #include <cinttypes>
-#include <ctime>
 #include <iomanip>
 #include <limits>
 #include <memory>
@@ -58,17 +56,6 @@ namespace rocksdb {
 
 namespace blob_db {
 
-struct GCStats {
-  uint64_t blob_count = 0;
-  uint64_t num_deletes = 0;
-  uint64_t deleted_size = 0;
-  uint64_t num_relocs = 0;
-  uint64_t succ_deletes_lsm = 0;
-  uint64_t overrided_while_delete = 0;
-  uint64_t succ_relocs = 0;
-  std::shared_ptr<BlobFile> newfile = nullptr;
-};
-
 // BlobHandle is a pointer to the blob that is stored in the LSM
 class BlobHandle {
  public:
@@ -192,7 +179,8 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
                        const DBOptions& db_options)
     : BlobDB(nullptr),
       db_impl_(nullptr),
-      myenv_(db_options.env),
+      env_(db_options.env),
+      ttl_extractor_(blob_db_options.ttl_extractor.get()),
       wo_set_(false),
       bdb_options_(blob_db_options),
       db_options_(db_options),
@@ -218,10 +206,6 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
   blob_dir_ = (bdb_options_.path_relative)
                   ? dbname + "/" + bdb_options_.blob_dir
                   : bdb_options_.blob_dir;
-
-  if (bdb_options_.default_ttl_extractor) {
-    bdb_options_.extract_ttl_fn = &BlobDBImpl::ExtractTTLFromBlob;
-  }
 }
 
 Status BlobDBImpl::LinkToBaseDB(DB* db) {
@@ -238,17 +222,17 @@ Status BlobDBImpl::LinkToBaseDB(DB* db) {
     db_impl_ = dynamic_cast<DBImpl*>(db);
   }
 
-  myenv_ = db_->GetEnv();
+  env_ = db_->GetEnv();
 
   opt_db_.reset(new OptimisticTransactionDBImpl(db, false));
 
-  Status s = myenv_->CreateDirIfMissing(blob_dir_);
+  Status s = env_->CreateDirIfMissing(blob_dir_);
   if (!s.ok()) {
     ROCKS_LOG_WARN(db_options_.info_log,
                    "Failed to create blob directory: %s status: '%s'",
                    blob_dir_.c_str(), s.ToString().c_str());
   }
-  s = myenv_->NewDirectory(blob_dir_, &dir_ent_);
+  s = env_->NewDirectory(blob_dir_, &dir_ent_);
   if (!s.ok()) {
     ROCKS_LOG_WARN(db_options_.info_log,
                    "Failed to open blob directory: %s status: '%s'",
@@ -293,10 +277,6 @@ BlobDBImpl::BlobDBImpl(DB* db, const BlobDBOptions& blob_db_options)
     blob_dir_ = (bdb_options_.path_relative)
                     ? db_->GetName() + "/" + bdb_options_.blob_dir
                     : bdb_options_.blob_dir;
-
-  if (bdb_options_.default_ttl_extractor) {
-    bdb_options_.extract_ttl_fn = &BlobDBImpl::ExtractTTLFromBlob;
-  }
 }
 
 BlobDBImpl::~BlobDBImpl() {
@@ -311,7 +291,7 @@ Status BlobDBImpl::OpenPhase1() {
     return Status::NotSupported("No blob directory in options");
 
   std::unique_ptr<Directory> dir_ent;
-  Status s = myenv_->NewDirectory(blob_dir_, &dir_ent);
+  Status s = env_->NewDirectory(blob_dir_, &dir_ent);
   if (!s.ok()) {
     ROCKS_LOG_WARN(db_options_.info_log,
                    "Failed to open blob directory: %s status: '%s'",
@@ -366,7 +346,7 @@ void BlobDBImpl::OnFlushBeginHandler(DB* db, const FlushJobInfo& info) {
 Status BlobDBImpl::GetAllLogFiles(
     std::set<std::pair<uint64_t, std::string>>* file_nums) {
   std::vector<std::string> all_files;
-  Status status = myenv_->GetChildren(blob_dir_, &all_files);
+  Status status = env_->GetChildren(blob_dir_, &all_files);
   if (!status.ok()) {
     return status;
   }
@@ -413,7 +393,7 @@ Status BlobDBImpl::OpenAllFiles() {
   for (auto f_iter : file_nums) {
     std::string bfpath = BlobFileName(blob_dir_, f_iter.first);
     uint64_t size_bytes;
-    Status s1 = myenv_->GetFileSize(bfpath, &size_bytes);
+    Status s1 = env_->GetFileSize(bfpath, &size_bytes);
     if (!s1.ok()) {
       ROCKS_LOG_WARN(
           db_options_.info_log,
@@ -436,7 +416,7 @@ Status BlobDBImpl::OpenAllFiles() {
 
     // read header
     std::shared_ptr<Reader> reader;
-    reader = bfptr->OpenSequentialReader(myenv_, db_options_, env_options_);
+    reader = bfptr->OpenSequentialReader(env_, db_options_, env_options_);
     s1 = reader->ReadHeader(&bfptr->header_);
     if (!s1.ok()) {
       ROCKS_LOG_ERROR(db_options_.info_log,
@@ -448,7 +428,7 @@ Status BlobDBImpl::OpenAllFiles() {
     bfptr->header_valid_ = true;
 
     std::shared_ptr<RandomAccessFileReader> ra_reader =
-        GetOrOpenRandomAccessReader(bfptr, myenv_, env_options_);
+        GetOrOpenRandomAccessReader(bfptr, env_, env_options_);
 
     BlobLogFooter bf;
     s1 = bfptr->ReadFooter(&bf);
@@ -586,13 +566,13 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
   EnvOptions env_options = env_options_;
   env_options.writable_file_max_buffer_size = 0;
 
-  Status s = myenv_->ReopenWritableFile(fpath, &wfile, env_options);
+  Status s = env_->ReopenWritableFile(fpath, &wfile, env_options);
   if (!s.ok()) {
     ROCKS_LOG_ERROR(db_options_.info_log,
                     "Failed to open blob file for write: %s status: '%s'"
                     " exists: '%s'",
                     fpath.c_str(), s.ToString().c_str(),
-                    myenv_->FileExists(fpath).ToString().c_str());
+                    env_->FileExists(fpath).ToString().c_str());
     return s;
   }
 
@@ -788,39 +768,13 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint32_t expiration) {
   return bfile;
 }
 
-bool BlobDBImpl::ExtractTTLFromBlob(const Slice& value, Slice* newval,
-                                    int32_t* ttl_val) {
-  *newval = value;
-  *ttl_val = -1;
-  if (value.size() <= BlobDB::kTTLSuffixLength) return false;
-
-  int32_t ttl_tmp =
-      DecodeFixed32(value.data() + value.size() - sizeof(int32_t));
-  std::string ttl_exp(value.data() + value.size() - BlobDB::kTTLSuffixLength,
-                      4);
-  if (ttl_exp != "ttl:") return false;
-
-  newval->remove_suffix(BlobDB::kTTLSuffixLength);
-  *ttl_val = ttl_tmp;
-  return true;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-// A specific pattern is looked up at the end of the value part.
-// ttl:TTLVAL . if this pattern is found, PutWithTTL is called, otherwise
-// regular Put is called.
-////////////////////////////////////////////////////////////////////////////////
 Status BlobDBImpl::Put(const WriteOptions& options,
                        ColumnFamilyHandle* column_family, const Slice& key,
                        const Slice& value) {
-  Slice newval;
-  int32_t ttl_val;
-  if (bdb_options_.extract_ttl_fn) {
-    bdb_options_.extract_ttl_fn(value, &newval, &ttl_val);
-    return PutWithTTL(options, column_family, key, newval, ttl_val);
-  }
-
-  return PutWithTTL(options, column_family, key, value, -1);
+  std::string new_value;
+  Slice value_slice;
+  int32_t expiration = ExtractExpiration(key, value, &value_slice, &new_value);
+  return PutUntil(options, column_family, key, value_slice, expiration);
 }
 
 Status BlobDBImpl::Delete(const WriteOptions& options,
@@ -852,6 +806,7 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
     Status batch_rewrite_status_;
     std::shared_ptr<BlobFile> last_file_;
     bool has_put_;
+    std::string new_value_;
 
    public:
     explicit BlobInserter(BlobDBImpl* impl, SequenceNumber seq)
@@ -866,23 +821,13 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
     bool has_put() { return has_put_; }
 
     virtual Status PutCF(uint32_t column_family_id, const Slice& key,
-                         const Slice& value_unc) override {
-      Slice newval;
-      int32_t ttl_val = -1;
-      if (impl_->bdb_options_.extract_ttl_fn) {
-        impl_->bdb_options_.extract_ttl_fn(value_unc, &newval, &ttl_val);
-      } else {
-        newval = value_unc;
-      }
+                         const Slice& value_slice) override {
+      Slice value_unc;
+      int32_t expiration =
+          impl_->ExtractExpiration(key, value_slice, &value_unc, &new_value_);
 
-      int32_t expiration = -1;
-      if (ttl_val != -1) {
-        std::time_t cur_t = std::chrono::system_clock::to_time_t(
-            std::chrono::system_clock::now());
-        expiration = ttl_val + static_cast<int32_t>(cur_t);
-      }
       std::shared_ptr<BlobFile> bfile =
-          (ttl_val != -1)
+          (expiration != -1)
               ? impl_->SelectBlobFileTTL(expiration)
               : ((last_file_) ? last_file_ : impl_->SelectBlobFile());
       if (last_file_ && last_file_ != bfile) {
@@ -1004,12 +949,8 @@ Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
                               ColumnFamilyHandle* column_family,
                               const Slice& key, const Slice& value,
                               int32_t ttl) {
-  return PutUntil(
-      options, column_family, key, value,
-      (ttl != -1)
-          ? ttl + static_cast<int32_t>(std::chrono::system_clock::to_time_t(
-                      std::chrono::system_clock::now()))
-          : -1);
+  return PutUntil(options, column_family, key, value,
+                  static_cast<int32_t>(EpochNow()) + ttl);
 }
 
 Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
@@ -1024,6 +965,7 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
   return *compression_output;
 }
 
+// TODO(yiwu): We should use uint64_t for expiration.
 Status BlobDBImpl::PutUntil(const WriteOptions& options,
                             ColumnFamilyHandle* column_family, const Slice& key,
                             const Slice& value_unc, int32_t expiration) {
@@ -1097,6 +1039,24 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options,
   return s;
 }
 
+// TODO(yiwu): We should return uint64_t after updating the rest of the code
+// to use uint64_t for expiration.
+int32_t BlobDBImpl::ExtractExpiration(const Slice& key, const Slice& value,
+                                      Slice* value_slice,
+                                      std::string* new_value) {
+  uint64_t expiration = kNoExpiration;
+  bool value_changed = false;
+  if (ttl_extractor_ != nullptr) {
+    bool has_ttl = ttl_extractor_->ExtractExpiration(
+        key, value, EpochNow(), &expiration, new_value, &value_changed);
+    if (!has_ttl) {
+      expiration = kNoExpiration;
+    }
+  }
+  *value_slice = value_changed ? Slice(*new_value) : value;
+  return (expiration == kNoExpiration) ? -1 : static_cast<int32_t>(expiration);
+}
+
 Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
                               const std::string& headerbuf, const Slice& key,
                               const Slice& value, std::string* index_entry) {
@@ -1240,7 +1200,7 @@ Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key,
 
   // takes locks when called
   std::shared_ptr<RandomAccessFileReader> reader =
-      GetOrOpenRandomAccessReader(bfile, myenv_, env_options_);
+      GetOrOpenRandomAccessReader(bfile, env_, env_options_);
 
   if (value != nullptr) {
     std::string* valueptr = value;
@@ -1377,14 +1337,13 @@ std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
     assert(!bfile->Immutable());
   }
 
-  std::time_t epoch_now =
-      std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
+  uint64_t epoch_now = EpochNow();
 
   for (auto bfile_pair : blob_files_) {
     auto bfile = bfile_pair.second;
     ROCKS_LOG_INFO(
         db_options_.info_log,
-        "Blob File %s %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %d",
+        "Blob File %s %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64,
         bfile->PathName().c_str(), bfile->GetFileSize(), bfile->BlobCount(),
         bfile->deleted_count_, bfile->deleted_size_,
         (bfile->ttl_range_.second - epoch_now));
@@ -1603,8 +1562,7 @@ std::pair<bool, int64_t> BlobDBImpl::CheckSeqFiles(bool aborted) {
 
   std::vector<std::shared_ptr<BlobFile>> process_files;
   {
-    std::time_t epoch_now =
-        std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
+    uint64_t epoch_now = EpochNow();
 
     ReadLock rl(&mutex_);
     for (auto bfile : open_blob_files_) {
@@ -1713,11 +1671,10 @@ std::pair<bool, int64_t> BlobDBImpl::WaStats(bool aborted) {
 ////////////////////////////////////////////////////////////////////////////////
 Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
                                       GCStats* gcstats) {
-  std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
-  std::time_t tt = std::chrono::system_clock::to_time_t(now);
+  uint64_t tt = EpochNow();
 
   std::shared_ptr<Reader> reader =
-      bfptr->OpenSequentialReader(myenv_, db_options_, env_options_);
+      bfptr->OpenSequentialReader(env_, db_options_, env_options_);
   if (!reader) {
     ROCKS_LOG_ERROR(db_options_.info_log,
                     "File sequential reader could not be opened",
@@ -1987,7 +1944,7 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsFiles(bool aborted) {
       }
     }
 
-    Status s = myenv_->DeleteFile(bfile->PathName());
+    Status s = env_->DeleteFile(bfile->PathName());
     if (!s.ok()) {
       ROCKS_LOG_ERROR(db_options_.info_log,
                       "File failed to be deleted as obsolete %s",
@@ -2019,7 +1976,7 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsFiles(bool aborted) {
 
 bool BlobDBImpl::CallbackEvictsImpl(std::shared_ptr<BlobFile> bfile) {
   std::shared_ptr<Reader> reader =
-      bfile->OpenSequentialReader(myenv_, db_options_, env_options_);
+      bfile->OpenSequentialReader(env_, db_options_, env_options_);
   if (!reader) {
     ROCKS_LOG_ERROR(
         db_options_.info_log,
@@ -2264,6 +2221,23 @@ Status BlobDBImpl::TEST_GetSequenceNumber(const Slice& key,
   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
   return CommonGet(cfh->cfd(), key, index_entry, nullptr, sequence);
 }
+
+std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
+  std::vector<std::shared_ptr<BlobFile>> blob_files;
+  for (auto& p : blob_files_) {
+    blob_files.emplace_back(p.second);
+  }
+  return blob_files;
+}
+
+void BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {
+  CloseSeqWrite(bfile, false /*abort*/);
+}
+
+Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
+                                           GCStats* gc_stats) {
+  return GCFileAndUpdateLSM(bfile, gc_stats);
+}
 #endif  //  !NDEBUG
 
 }  // namespace blob_db
index a5c5822bb761919b4921c0728e032585024bbc65..8da5bbf652972417d2cb282431ed81d1e5aab3e6 100644 (file)
@@ -10,6 +10,7 @@
 #include <atomic>
 #include <condition_variable>
 #include <ctime>
+#include <limits>
 #include <list>
 #include <memory>
 #include <set>
@@ -45,7 +46,6 @@ namespace blob_db {
 
 class BlobFile;
 class BlobDBImpl;
-struct GCStats;
 
 class BlobDBFlushBeginListener : public EventListener {
  public:
@@ -134,6 +134,17 @@ struct blobf_compare_ttl {
                   const std::shared_ptr<BlobFile>& rhs) const;
 };
 
+struct GCStats {
+  uint64_t blob_count = 0;
+  uint64_t num_deletes = 0;
+  uint64_t deleted_size = 0;
+  uint64_t num_relocs = 0;
+  uint64_t succ_deletes_lsm = 0;
+  uint64_t overrided_while_delete = 0;
+  uint64_t succ_relocs = 0;
+  std::shared_ptr<BlobFile> newfile = nullptr;
+};
+
 /**
  * The implementation class for BlobDB. This manages the value
  * part in TTL aware sequentially written files. These files are
@@ -147,6 +158,9 @@ class BlobDBImpl : public BlobDB {
   friend class BlobDBIterator;
 
  public:
+  static constexpr uint64_t kNoExpiration =
+      std::numeric_limits<uint64_t>::max();
+
   using rocksdb::StackableDB::Put;
   Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family,
              const Slice& key, const Slice& value) override;
@@ -200,12 +214,16 @@ class BlobDBImpl : public BlobDB {
 
 #ifndef NDEBUG
   Status TEST_GetSequenceNumber(const Slice& key, SequenceNumber* sequence);
+
+  std::vector<std::shared_ptr<BlobFile>> TEST_GetBlobFiles() const;
+
+  void TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile);
+
+  Status TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
+                                 GCStats* gc_stats);
 #endif  //  !NDEBUG
 
  private:
-  static bool ExtractTTLFromBlob(const Slice& value, Slice* newval,
-                                 int32_t* ttl_val);
-
   Status OpenPhase1();
 
   Status CommonGet(const ColumnFamilyData* cfd, const Slice& key,
@@ -237,6 +255,9 @@ class BlobDBImpl : public BlobDB {
   // appends a task into timer queue to close the file
   void CloseIf(const std::shared_ptr<BlobFile>& bfile);
 
+  int32_t ExtractExpiration(const Slice& key, const Slice& value,
+                            Slice* value_slice, std::string* new_value);
+
   Status AppendBlob(const std::shared_ptr<BlobFile>& bfile,
                     const std::string& headerbuf, const Slice& key,
                     const Slice& value, std::string* index_entry);
@@ -346,11 +367,12 @@ class BlobDBImpl : public BlobDB {
       std::vector<std::shared_ptr<BlobFile>>* to_process, uint64_t epoch,
       uint64_t last_id, size_t files_to_collect);
 
- private:
+  uint64_t EpochNow() { return env_->NowMicros() / 1000000; }
+
   // the base DB
   DBImpl* db_impl_;
-
-  Env* myenv_;
+  Env* env_;
+  TTLExtractor* ttl_extractor_;
 
   // Optimistic Transaction DB used during Garbage collection
   // for atomicity
index 13ad7a2fa096aec61456471c8a70def33b8e93e3..6a43f6b77e9b7b50d042f11c797b4532414a9e47 100644 (file)
@@ -25,7 +25,22 @@ class BlobDBTest : public testing::Test {
  public:
   const int kMaxBlobSize = 1 << 14;
 
-  BlobDBTest() : dbname_(test::TmpDir() + "/blob_db_test"), blob_db_(nullptr) {
+  class MockEnv : public EnvWrapper {
+   public:
+    MockEnv() : EnvWrapper(Env::Default()) {}
+
+    void set_now_micros(uint64_t now_micros) { now_micros_ = now_micros; }
+
+    uint64_t NowMicros() override { return now_micros_; }
+
+   private:
+    uint64_t now_micros_ = 0;
+  };
+
+  BlobDBTest()
+      : dbname_(test::TmpDir() + "/blob_db_test"),
+        mock_env_(new MockEnv()),
+        blob_db_(nullptr) {
     Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions());
     assert(s.ok());
   }
@@ -59,9 +74,25 @@ class BlobDBTest : public testing::Test {
     }
   }
 
+  void PutRandomUntil(const std::string &key, int32_t expiration, Random *rnd,
+                      std::map<std::string, std::string> *data = nullptr) {
+    int len = rnd->Next() % kMaxBlobSize + 1;
+    std::string value = test::RandomHumanReadableString(rnd, len);
+    ASSERT_OK(blob_db_->PutUntil(WriteOptions(), Slice(key), Slice(value),
+                                 expiration));
+    if (data != nullptr) {
+      (*data)[key] = value;
+    }
+  }
+
   void PutRandom(const std::string &key, Random *rnd,
                  std::map<std::string, std::string> *data = nullptr) {
-    PutRandomWithTTL(key, -1, rnd, data);
+    int len = rnd->Next() % kMaxBlobSize + 1;
+    std::string value = test::RandomHumanReadableString(rnd, len);
+    ASSERT_OK(blob_db_->Put(WriteOptions(), Slice(key), Slice(value)));
+    if (data != nullptr) {
+      (*data)[key] = value;
+    }
   }
 
   void PutRandomToWriteBatch(
@@ -115,6 +146,8 @@ class BlobDBTest : public testing::Test {
   }
 
   const std::string dbname_;
+  std::unique_ptr<MockEnv> mock_env_;
+  std::shared_ptr<TTLExtractor> ttl_extractor_;
   BlobDB *blob_db_;
 };  // class BlobDBTest
 
@@ -130,6 +163,245 @@ TEST_F(BlobDBTest, Put) {
   VerifyDB(data);
 }
 
+TEST_F(BlobDBTest, PutWithTTL) {
+  Random rnd(301);
+  Options options;
+  options.env = mock_env_.get();
+  BlobDBOptionsImpl bdb_options;
+  bdb_options.ttl_range_secs = 1000;
+  bdb_options.blob_file_size = 256 * 1000 * 1000;
+  bdb_options.disable_background_tasks = true;
+  Open(bdb_options, options);
+  std::map<std::string, std::string> data;
+  mock_env_->set_now_micros(50 * 1000000);
+  for (size_t i = 0; i < 100; i++) {
+    int32_t ttl = rnd.Next() % 100;
+    PutRandomWithTTL("key" + ToString(i), ttl, &rnd,
+                     (ttl < 50 ? nullptr : &data));
+  }
+  mock_env_->set_now_micros(100 * 1000000);
+  auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
+  auto blob_files = bdb_impl->TEST_GetBlobFiles();
+  ASSERT_EQ(1, blob_files.size());
+  ASSERT_TRUE(blob_files[0]->HasTTL());
+  bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+  GCStats gc_stats;
+  ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
+  ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
+  ASSERT_EQ(data.size(), gc_stats.num_relocs);
+  VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, PutUntil) {
+  Random rnd(301);
+  Options options;
+  options.env = mock_env_.get();
+  BlobDBOptionsImpl bdb_options;
+  bdb_options.ttl_range_secs = 1000;
+  bdb_options.blob_file_size = 256 * 1000 * 1000;
+  bdb_options.disable_background_tasks = true;
+  Open(bdb_options, options);
+  std::map<std::string, std::string> data;
+  mock_env_->set_now_micros(50 * 1000000);
+  for (size_t i = 0; i < 100; i++) {
+    int32_t expiration = rnd.Next() % 100 + 50;
+    PutRandomUntil("key" + ToString(i), expiration, &rnd,
+                   (expiration < 100 ? nullptr : &data));
+  }
+  mock_env_->set_now_micros(100 * 1000000);
+  auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
+  auto blob_files = bdb_impl->TEST_GetBlobFiles();
+  ASSERT_EQ(1, blob_files.size());
+  ASSERT_TRUE(blob_files[0]->HasTTL());
+  bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+  GCStats gc_stats;
+  ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
+  ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
+  ASSERT_EQ(data.size(), gc_stats.num_relocs);
+  VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, TTLExtrator_NoTTL) {
+  // The default ttl extractor return no ttl for every key.
+  ttl_extractor_.reset(new TTLExtractor());
+  Random rnd(301);
+  Options options;
+  options.env = mock_env_.get();
+  BlobDBOptionsImpl bdb_options;
+  bdb_options.ttl_range_secs = 1000;
+  bdb_options.blob_file_size = 256 * 1000 * 1000;
+  bdb_options.num_concurrent_simple_blobs = 1;
+  bdb_options.ttl_extractor = ttl_extractor_;
+  bdb_options.disable_background_tasks = true;
+  Open(bdb_options, options);
+  std::map<std::string, std::string> data;
+  mock_env_->set_now_micros(0);
+  for (size_t i = 0; i < 100; i++) {
+    PutRandom("key" + ToString(i), &rnd, &data);
+  }
+  // very far in the future..
+  mock_env_->set_now_micros(std::numeric_limits<uint64_t>::max() - 10);
+  auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
+  auto blob_files = bdb_impl->TEST_GetBlobFiles();
+  ASSERT_EQ(1, blob_files.size());
+  ASSERT_FALSE(blob_files[0]->HasTTL());
+  bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+  GCStats gc_stats;
+  ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
+  ASSERT_EQ(0, gc_stats.num_deletes);
+  ASSERT_EQ(100, gc_stats.num_relocs);
+  VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, TTLExtractor_ExtractTTL) {
+  Random rnd(301);
+  class TestTTLExtractor : public TTLExtractor {
+   public:
+    explicit TestTTLExtractor(Random *r) : rnd(r) {}
+
+    virtual bool ExtractTTL(const Slice &key, const Slice &value, uint64_t *ttl,
+                            std::string * /*new_value*/,
+                            bool * /*value_changed*/) override {
+      *ttl = rnd->Next() % 100;
+      if (*ttl >= 50) {
+        data[key.ToString()] = value.ToString();
+      }
+      return true;
+    }
+
+    Random *rnd;
+    std::map<std::string, std::string> data;
+  };
+  ttl_extractor_.reset(new TestTTLExtractor(&rnd));
+  Options options;
+  options.env = mock_env_.get();
+  BlobDBOptionsImpl bdb_options;
+  bdb_options.ttl_range_secs = 1000;
+  bdb_options.blob_file_size = 256 * 1000 * 1000;
+  bdb_options.ttl_extractor = ttl_extractor_;
+  bdb_options.disable_background_tasks = true;
+  Open(bdb_options, options);
+  mock_env_->set_now_micros(50 * 1000000);
+  for (size_t i = 0; i < 100; i++) {
+    PutRandom("key" + ToString(i), &rnd);
+  }
+  mock_env_->set_now_micros(100 * 1000000);
+  auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
+  auto blob_files = bdb_impl->TEST_GetBlobFiles();
+  ASSERT_EQ(1, blob_files.size());
+  ASSERT_TRUE(blob_files[0]->HasTTL());
+  bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+  GCStats gc_stats;
+  ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
+  auto &data = static_cast<TestTTLExtractor *>(ttl_extractor_.get())->data;
+  ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
+  ASSERT_EQ(data.size(), gc_stats.num_relocs);
+  VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, TTLExtractor_ExtractExpiration) {
+  Random rnd(301);
+  class TestTTLExtractor : public TTLExtractor {
+   public:
+    explicit TestTTLExtractor(Random *r) : rnd(r) {}
+
+    virtual bool ExtractExpiration(const Slice &key, const Slice &value,
+                                   uint64_t /*now*/, uint64_t *expiration,
+                                   std::string * /*new_value*/,
+                                   bool * /*value_changed*/) override {
+      *expiration = rnd->Next() % 100 + 50;
+      if (*expiration >= 100) {
+        data[key.ToString()] = value.ToString();
+      }
+      return true;
+    }
+
+    Random *rnd;
+    std::map<std::string, std::string> data;
+  };
+  ttl_extractor_.reset(new TestTTLExtractor(&rnd));
+  Options options;
+  options.env = mock_env_.get();
+  BlobDBOptionsImpl bdb_options;
+  bdb_options.ttl_range_secs = 1000;
+  bdb_options.blob_file_size = 256 * 1000 * 1000;
+  bdb_options.ttl_extractor = ttl_extractor_;
+  bdb_options.disable_background_tasks = true;
+  Open(bdb_options, options);
+  mock_env_->set_now_micros(50 * 1000000);
+  for (size_t i = 0; i < 100; i++) {
+    PutRandom("key" + ToString(i), &rnd);
+  }
+  mock_env_->set_now_micros(100 * 1000000);
+  auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
+  auto blob_files = bdb_impl->TEST_GetBlobFiles();
+  ASSERT_EQ(1, blob_files.size());
+  ASSERT_TRUE(blob_files[0]->HasTTL());
+  bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+  GCStats gc_stats;
+  ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
+  auto &data = static_cast<TestTTLExtractor *>(ttl_extractor_.get())->data;
+  ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
+  ASSERT_EQ(data.size(), gc_stats.num_relocs);
+  VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, TTLExtractor_ChangeValue) {
+  class TestTTLExtractor : public TTLExtractor {
+   public:
+    const Slice kTTLSuffix = Slice("ttl:");
+
+    bool ExtractTTL(const Slice & /*key*/, const Slice &value, uint64_t *ttl,
+                    std::string *new_value, bool *value_changed) override {
+      if (value.size() < 12) {
+        return false;
+      }
+      const char *p = value.data() + value.size() - 12;
+      if (kTTLSuffix != Slice(p, 4)) {
+        return false;
+      }
+      *ttl = DecodeFixed64(p + 4);
+      *new_value = Slice(value.data(), value.size() - 12).ToString();
+      *value_changed = true;
+      return true;
+    }
+  };
+  Random rnd(301);
+  Options options;
+  options.env = mock_env_.get();
+  BlobDBOptionsImpl bdb_options;
+  bdb_options.ttl_range_secs = 1000;
+  bdb_options.blob_file_size = 256 * 1000 * 1000;
+  bdb_options.ttl_extractor = std::make_shared<TestTTLExtractor>();
+  bdb_options.disable_background_tasks = true;
+  Open(bdb_options, options);
+  std::map<std::string, std::string> data;
+  mock_env_->set_now_micros(50 * 1000000);
+  for (size_t i = 0; i < 100; i++) {
+    int len = rnd.Next() % kMaxBlobSize + 1;
+    std::string key = "key" + ToString(i);
+    std::string value = test::RandomHumanReadableString(&rnd, len);
+    uint64_t ttl = rnd.Next() % 100;
+    std::string value_ttl = value + "ttl:";
+    PutFixed64(&value_ttl, ttl);
+    ASSERT_OK(blob_db_->Put(WriteOptions(), Slice(key), Slice(value_ttl)));
+    if (ttl >= 50) {
+      data[key] = value;
+    }
+  }
+  mock_env_->set_now_micros(100 * 1000000);
+  auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
+  auto blob_files = bdb_impl->TEST_GetBlobFiles();
+  ASSERT_EQ(1, blob_files.size());
+  ASSERT_TRUE(blob_files[0]->HasTTL());
+  bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+  GCStats gc_stats;
+  ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
+  ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
+  ASSERT_EQ(data.size(), gc_stats.num_relocs);
+  VerifyDB(data);
+}
+
 TEST_F(BlobDBTest, StackableDBGet) {
   Random rnd(301);
   BlobDBOptionsImpl bdb_options;
index b56cf205cc2e308c0e84d1a959163bb5ba4b20f5..f4e62fe2d9663534de1351707fea18486c0b2495 100644 (file)
@@ -11,6 +11,7 @@
 
 #include <cstddef>
 #include <cstdint>
+#include <limits>
 #include <memory>
 #include <string>
 #include <utility>
@@ -229,6 +230,10 @@ class BlobLogRecord {
 
   uint64_t GetBlobSize() const { return blob_size_; }
 
+  bool HasTTL() const {
+    return ttl_val_ != std::numeric_limits<uint32_t>::max();
+  }
+
   uint32_t GetTTL() const { return ttl_val_; }
 
   uint64_t GetTimeVal() const { return time_val_; }
diff --git a/utilities/blob_db/ttl_extractor.cc b/utilities/blob_db/ttl_extractor.cc
new file mode 100644 (file)
index 0000000..735b2f3
--- /dev/null
@@ -0,0 +1,31 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+#include "ttl_extractor.h"
+
+#include "util/coding.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+bool TTLExtractor::ExtractTTL(const Slice& /*key*/, const Slice& /*value*/,
+                              uint64_t* /*ttl*/, std::string* /*new_value*/,
+                              bool* /*value_changed*/) {
+  return false;
+}
+
+bool TTLExtractor::ExtractExpiration(const Slice& key, const Slice& value,
+                                     uint64_t now, uint64_t* expiration,
+                                     std::string* new_value,
+                                     bool* value_changed) {
+  uint64_t ttl;
+  bool has_ttl = ExtractTTL(key, value, &ttl, new_value, value_changed);
+  if (has_ttl) {
+    *expiration = now + ttl;
+  }
+  return has_ttl;
+}
+
+}  // namespace blob_db
+}  // namespace rocksdb
diff --git a/utilities/blob_db/ttl_extractor.h b/utilities/blob_db/ttl_extractor.h
new file mode 100644 (file)
index 0000000..51df944
--- /dev/null
@@ -0,0 +1,43 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "rocksdb/slice.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+// TTLExtractor allow applications to extract TTL from key-value pairs.
+// This useful for applications using Put or WriteBatch to write keys and
+// don't intend to migrate to PutWithTTL or PutUntil.
+//
+// Applications can implement either ExtractTTL or ExtractExpiration. If both
+// are implemented, ExtractExpiration will take precedence.
+class TTLExtractor {
+ public:
+  // Extract TTL from key-value pair.
+  // Return true if the key has TTL, false otherwise. If key has TTL,
+  // TTL is pass back through ttl. The method can optionally modify the value,
+  // pass the result back through new_value, and also set value_changed to true.
+  virtual bool ExtractTTL(const Slice& key, const Slice& value, uint64_t* ttl,
+                          std::string* new_value, bool* value_changed);
+
+  // Extract expiration time from key-value pair.
+  // Return true if the key has expiration time, false otherwise. If key has
+  // expiration time, it is pass back through expiration. The method can
+  // optionally modify the value, pass the result back through new_value,
+  // and also set value_changed to true.
+  virtual bool ExtractExpiration(const Slice& key, const Slice& value,
+                                 uint64_t now, uint64_t* expiration,
+                                 std::string* new_value, bool* value_changed);
+
+  virtual ~TTLExtractor() = default;
+};
+
+}  // namespace blob_db
+}  // namespace rocksdb