git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Expose blob file information through the EventListener interface (#8675)
author Akanksha Mahajan <akankshamahajan@fb.com>
Fri, 17 Sep 2021 00:17:40 +0000 (17:17 -0700)
committer Facebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Fri, 17 Sep 2021 00:23:36 +0000 (17:23 -0700)
Summary:
1. Extend FlushJobInfo and CompactionJobInfo with information about the blob files generated by flush/compaction jobs. This PR adds two structures, BlobFileInfo and BlobFileGarbageInfo, that contain the required information about blob files.
2. Notify listeners of the creation and deletion of blob files through OnBlobFileCreationStarted, OnBlobFileCreated, and OnBlobFileDeleted.
3. Test the OnFile*Finish operation notifications with blob files.
4. Log the blob file creation/deletion events through EventLogger in the LOG file.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8675

Test Plan: Add new unit tests in listener_test

Reviewed By: ltamasi

Differential Revision: D30412613

Pulled By: akankshamahajan15

fbshipit-source-id: ca51b63c6e8c8d0485a38c503572bc5a82bd5d07

19 files changed:
HISTORY.md
db/blob/blob_file_builder.cc
db/blob/blob_file_builder.h
db/blob/blob_file_builder_test.cc
db/blob/blob_file_completion_callback.h
db/builder.cc
db/builder.h
db/compaction/compaction_job.cc
db/db_impl/db_impl.cc
db/db_impl/db_impl_compaction_flush.cc
db/db_impl/db_impl_files.cc
db/db_impl/db_impl_open.cc
db/event_helpers.cc
db/event_helpers.h
db/flush_job.cc
db/listener_test.cc
db/repair.cc
include/rocksdb/listener.h
include/rocksdb/types.h

index 5d07ebaad386f9b81e59ad9d2fe7589bd20805df..eccbe3c976ba58f59427665809ef2444a185dd66 100644 (file)
 * `SstFileWriter` now supports `Put`s and `Delete`s with user-defined timestamps. Note that the ingestion logic itself is not timestamp-aware yet.
 * Allow a single write batch to include keys from multiple column families whose timestamps' formats can differ. For example, some column families may disable timestamp, while others enable timestamp.
 * Add compaction priority information in RemoteCompaction, which can be used to schedule high priority job first.
+* Added new callback APIs `OnBlobFileCreationStarted`, `OnBlobFileCreated`, and `OnBlobFileDeleted` to the `EventListener` class in listener.h. They notify listeners during the creation/deletion of individual blob files in Integrated BlobDB. Blob file creation-finished and deletion events are also logged in the LOG file.
 
 ### Public API change
 * Remove obsolete implementation details FullKey and ParseFullKey from public API
 * Add a public API RateLimiter::GetTotalPendingRequests() for the total number of requests that are pending for bytes in the rate limiter.
+* Extended `FlushJobInfo` and `CompactionJobInfo` in listener.h to provide information about the blob files generated by a flush/compaction and garbage collected during compaction in Integrated BlobDB. Added struct members `blob_file_addition_infos` and `blob_file_garbage_infos` that contain this information.
+* Extended parameter `output_file_names` of `CompactFiles` API to also include paths of the blob files generated by the compaction in Integrated BlobDB.
 
 ## 6.24.0 (2021-08-20)
 ### Bug Fixes
index 4a3f3d4b02afa42c19f4a36111bdb9350cbf7a3e..48817984ab1099009bfa11f6719a41dffb0cf6b1 100644 (file)
@@ -12,6 +12,7 @@
 #include "db/blob/blob_index.h"
 #include "db/blob/blob_log_format.h"
 #include "db/blob/blob_log_writer.h"
+#include "db/event_helpers.h"
 #include "db/version_set.h"
 #include "file/filename.h"
 #include "file/read_write_util.h"
@@ -36,13 +37,14 @@ BlobFileBuilder::BlobFileBuilder(
     Env::WriteLifeTimeHint write_hint,
     const std::shared_ptr<IOTracer>& io_tracer,
     BlobFileCompletionCallback* blob_callback,
+    BlobFileCreationReason creation_reason,
     std::vector<std::string>* blob_file_paths,
     std::vector<BlobFileAddition>* blob_file_additions)
     : BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, fs,
                       immutable_options, mutable_cf_options, file_options,
                       job_id, column_family_id, column_family_name, io_priority,
-                      write_hint, io_tracer, blob_callback, blob_file_paths,
-                      blob_file_additions) {}
+                      write_hint, io_tracer, blob_callback, creation_reason,
+                      blob_file_paths, blob_file_additions) {}
 
 BlobFileBuilder::BlobFileBuilder(
     std::function<uint64_t()> file_number_generator, FileSystem* fs,
@@ -53,6 +55,7 @@ BlobFileBuilder::BlobFileBuilder(
     Env::WriteLifeTimeHint write_hint,
     const std::shared_ptr<IOTracer>& io_tracer,
     BlobFileCompletionCallback* blob_callback,
+    BlobFileCreationReason creation_reason,
     std::vector<std::string>* blob_file_paths,
     std::vector<BlobFileAddition>* blob_file_additions)
     : file_number_generator_(std::move(file_number_generator)),
@@ -69,6 +72,7 @@ BlobFileBuilder::BlobFileBuilder(
       write_hint_(write_hint),
       io_tracer_(io_tracer),
       blob_callback_(blob_callback),
+      creation_reason_(creation_reason),
       blob_file_paths_(blob_file_paths),
       blob_file_additions_(blob_file_additions),
       blob_count_(0),
@@ -161,6 +165,11 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
   std::string blob_file_path =
       BlobFileName(immutable_options_->cf_paths.front().path, blob_file_number);
 
+  if (blob_callback_) {
+    blob_callback_->OnBlobFileCreationStarted(
+        blob_file_path, column_family_name_, job_id_, creation_reason_);
+  }
+
   std::unique_ptr<FSWritableFile> file;
 
   {
@@ -305,6 +314,13 @@ Status BlobFileBuilder::CloseBlobFile() {
 
   const uint64_t blob_file_number = writer_->get_log_number();
 
+  if (blob_callback_) {
+    s = blob_callback_->OnBlobFileCompleted(
+        blob_file_paths_->back(), column_family_name_, job_id_,
+        blob_file_number, creation_reason_, s, checksum_value, checksum_method,
+        blob_count_, blob_bytes_);
+  }
+
   assert(blob_file_additions_);
   blob_file_additions_->emplace_back(blob_file_number, blob_count_, blob_bytes_,
                                      std::move(checksum_method),
@@ -316,9 +332,6 @@ Status BlobFileBuilder::CloseBlobFile() {
                  " total blobs, %" PRIu64 " total bytes",
                  column_family_name_.c_str(), job_id_, blob_file_number,
                  blob_count_, blob_bytes_);
-  if (blob_callback_) {
-    s = blob_callback_->OnBlobFileCompleted(blob_file_paths_->back());
-  }
 
   writer_.reset();
   blob_count_ = 0;
@@ -340,15 +353,18 @@ Status BlobFileBuilder::CloseBlobFileIfNeeded() {
   return CloseBlobFile();
 }
 
-void BlobFileBuilder::Abandon() {
+void BlobFileBuilder::Abandon(const Status& s) {
   if (!IsBlobFileOpen()) {
     return;
   }
-
   if (blob_callback_) {
     // BlobFileBuilder::Abandon() is called because of error while writing to
     // Blob files. So we can ignore the below error.
-    blob_callback_->OnBlobFileCompleted(blob_file_paths_->back())
+    blob_callback_
+        ->OnBlobFileCompleted(blob_file_paths_->back(), column_family_name_,
+                              job_id_, writer_->get_log_number(),
+                              creation_reason_, s, "", "", blob_count_,
+                              blob_bytes_)
         .PermitUncheckedError();
   }
 
index 0929b6a775838a90330f38548609a37338cc015e..745af20eb450f188a4deffed01136df693c4154e 100644 (file)
@@ -13,6 +13,7 @@
 #include "rocksdb/compression_type.h"
 #include "rocksdb/env.h"
 #include "rocksdb/rocksdb_namespace.h"
+#include "rocksdb/types.h"
 
 namespace ROCKSDB_NAMESPACE {
 
@@ -41,6 +42,7 @@ class BlobFileBuilder {
                   Env::WriteLifeTimeHint write_hint,
                   const std::shared_ptr<IOTracer>& io_tracer,
                   BlobFileCompletionCallback* blob_callback,
+                  BlobFileCreationReason creation_reason,
                   std::vector<std::string>* blob_file_paths,
                   std::vector<BlobFileAddition>* blob_file_additions);
 
@@ -54,6 +56,7 @@ class BlobFileBuilder {
                   Env::WriteLifeTimeHint write_hint,
                   const std::shared_ptr<IOTracer>& io_tracer,
                   BlobFileCompletionCallback* blob_callback,
+                  BlobFileCreationReason creation_reason,
                   std::vector<std::string>* blob_file_paths,
                   std::vector<BlobFileAddition>* blob_file_additions);
 
@@ -64,7 +67,7 @@ class BlobFileBuilder {
 
   Status Add(const Slice& key, const Slice& value, std::string* blob_index);
   Status Finish();
-  void Abandon();
+  void Abandon(const Status& s);
 
  private:
   bool IsBlobFileOpen() const;
@@ -89,6 +92,7 @@ class BlobFileBuilder {
   Env::WriteLifeTimeHint write_hint_;
   std::shared_ptr<IOTracer> io_tracer_;
   BlobFileCompletionCallback* blob_callback_;
+  BlobFileCreationReason creation_reason_;
   std::vector<std::string>* blob_file_paths_;
   std::vector<BlobFileAddition>* blob_file_additions_;
   std::unique_ptr<BlobLogWriter> writer_;
index 08cfac00754551d6e292c2c11e625dc8e1fb7404..1b85d05e8ca81265800ab90d306d17b0bc0565aa 100644 (file)
@@ -145,7 +145,7 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) {
       TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options,
       &file_options_, job_id, column_family_id, column_family_name, io_priority,
       write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
-      &blob_file_paths, &blob_file_additions);
+      BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions);
 
   std::vector<std::pair<std::string, std::string>> expected_key_value_pairs(
       number_of_blobs);
@@ -229,7 +229,7 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) {
       TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options,
       &file_options_, job_id, column_family_id, column_family_name, io_priority,
       write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
-      &blob_file_paths, &blob_file_additions);
+      BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions);
 
   std::vector<std::pair<std::string, std::string>> expected_key_value_pairs(
       number_of_blobs);
@@ -315,7 +315,7 @@ TEST_F(BlobFileBuilderTest, InlinedValues) {
       TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options,
       &file_options_, job_id, column_family_id, column_family_name, io_priority,
       write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
-      &blob_file_paths, &blob_file_additions);
+      BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions);
 
   for (size_t i = 0; i < number_of_blobs; ++i) {
     const std::string key = std::to_string(i);
@@ -368,7 +368,7 @@ TEST_F(BlobFileBuilderTest, Compression) {
       TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options,
       &file_options_, job_id, column_family_id, column_family_name, io_priority,
       write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
-      &blob_file_paths, &blob_file_additions);
+      BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions);
 
   const std::string key("1");
   const std::string uncompressed_value(value_size, 'x');
@@ -450,7 +450,7 @@ TEST_F(BlobFileBuilderTest, CompressionError) {
       TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options,
       &file_options_, job_id, column_family_id, column_family_name, io_priority,
       write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
-      &blob_file_paths, &blob_file_additions);
+      BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions);
 
   SyncPoint::GetInstance()->SetCallBack("CompressData:TamperWithReturnValue",
                                         [](void* arg) {
@@ -528,7 +528,7 @@ TEST_F(BlobFileBuilderTest, Checksum) {
       TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options,
       &file_options_, job_id, column_family_id, column_family_name, io_priority,
       write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
-      &blob_file_paths, &blob_file_additions);
+      BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions);
 
   const std::string key("1");
   const std::string value("deadbeef");
@@ -624,7 +624,7 @@ TEST_P(BlobFileBuilderIOErrorTest, IOError) {
       TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options,
       &file_options_, job_id, column_family_id, column_family_name, io_priority,
       write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
-      &blob_file_paths, &blob_file_additions);
+      BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions);
 
   SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) {
     Status* const s = static_cast<Status*>(arg);
index 42b6def893c713157f18834e4dc8c9983878d4b0..8c88de2121f92f9aecbaa2d442c6d25d7dce1dd6 100644 (file)
@@ -9,6 +9,7 @@
 #pragma once
 
 #include "db/error_handler.h"
+#include "db/event_helpers.h"
 #include "file/sst_file_manager_impl.h"
 #include "rocksdb/status.h"
 
@@ -16,23 +17,46 @@ namespace ROCKSDB_NAMESPACE {
 
 class BlobFileCompletionCallback {
  public:
-#ifdef ROCKSDB_LITE
-  BlobFileCompletionCallback(SstFileManager* /*sst_file_manager*/,
-                             InstrumentedMutex* /*mutex*/,
-                             ErrorHandler* /*error_handler*/) {}
-  Status OnBlobFileCompleted(const std::string& /*file_name*/) {
-    return Status::OK();
-  }
-#else
-  BlobFileCompletionCallback(SstFileManager* sst_file_manager,
-                             InstrumentedMutex* mutex,
-                             ErrorHandler* error_handler)
+  BlobFileCompletionCallback(
+      SstFileManager* sst_file_manager, InstrumentedMutex* mutex,
+      ErrorHandler* error_handler, EventLogger* event_logger,
+      const std::vector<std::shared_ptr<EventListener>>& listeners,
+      const std::string& dbname)
       : sst_file_manager_(sst_file_manager),
         mutex_(mutex),
-        error_handler_(error_handler) {}
+        error_handler_(error_handler),
+        event_logger_(event_logger),
+        listeners_(listeners),
+        dbname_(dbname) {}
 
-  Status OnBlobFileCompleted(const std::string& file_name) {
+  void OnBlobFileCreationStarted(const std::string& file_name,
+                                 const std::string& column_family_name,
+                                 int job_id,
+                                 BlobFileCreationReason creation_reason) {
+#ifndef ROCKSDB_LITE
+    // Notify the listeners.
+    EventHelpers::NotifyBlobFileCreationStarted(listeners_, dbname_,
+                                                column_family_name, file_name,
+                                                job_id, creation_reason);
+#else
+    (void)file_name;
+    (void)column_family_name;
+    (void)job_id;
+    (void)creation_reason;
+#endif
+  }
+
+  Status OnBlobFileCompleted(const std::string& file_name,
+                             const std::string& column_family_name, int job_id,
+                             uint64_t file_number,
+                             BlobFileCreationReason creation_reason,
+                             const Status& report_status,
+                             const std::string& checksum_value,
+                             const std::string& checksum_method,
+                             uint64_t blob_count, uint64_t blob_bytes) {
     Status s;
+
+#ifndef ROCKSDB_LITE
     auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager_);
     if (sfm) {
       // Report new blob files to SstFileManagerImpl
@@ -45,6 +69,17 @@ class BlobFileCompletionCallback {
         error_handler_->SetBGError(s, BackgroundErrorReason::kFlush);
       }
     }
+#endif  // !ROCKSDB_LITE
+
+    // Notify the listeners.
+    EventHelpers::LogAndNotifyBlobFileCreationFinished(
+        event_logger_, listeners_, dbname_, column_family_name, file_name,
+        job_id, file_number, creation_reason,
+        (!report_status.ok() ? report_status : s),
+        (checksum_value.empty() ? kUnknownFileChecksum : checksum_value),
+        (checksum_method.empty() ? kUnknownFileChecksumFuncName
+                                 : checksum_method),
+        blob_count, blob_bytes);
     return s;
   }
 
@@ -52,6 +87,8 @@ class BlobFileCompletionCallback {
   SstFileManager* sst_file_manager_;
   InstrumentedMutex* mutex_;
   ErrorHandler* error_handler_;
-#endif  // ROCKSDB_LITE
+  EventLogger* event_logger_;
+  std::vector<std::shared_ptr<EventListener>> listeners_;
+  std::string dbname_;
 };
 }  // namespace ROCKSDB_NAMESPACE
index 16e5744d1e95ef81c2a999535da1c7c9c2150398..0971e9dc0f45514b69621c056c6623121c3e0b66 100644 (file)
@@ -65,7 +65,8 @@ Status BuildTable(
     SequenceNumber earliest_write_conflict_snapshot,
     SnapshotChecker* snapshot_checker, bool paranoid_file_checks,
     InternalStats* internal_stats, IOStatus* io_status,
-    const std::shared_ptr<IOTracer>& io_tracer, EventLogger* event_logger,
+    const std::shared_ptr<IOTracer>& io_tracer,
+    BlobFileCreationReason blob_creation_reason, EventLogger* event_logger,
     int job_id, const Env::IOPriority io_priority,
     TableProperties* table_properties, Env::WriteLifeTimeHint write_hint,
     const std::string* full_history_ts_low,
@@ -178,12 +179,12 @@ Status BuildTable(
 
     std::unique_ptr<BlobFileBuilder> blob_file_builder(
         (mutable_cf_options.enable_blob_files && blob_file_additions)
-            ? new BlobFileBuilder(versions, fs, &ioptions, &mutable_cf_options,
-                                  &file_options, job_id,
-                                  tboptions.column_family_id,
-                                  tboptions.column_family_name, io_priority,
-                                  write_hint, io_tracer, blob_callback,
-                                  &blob_file_paths, blob_file_additions)
+            ? new BlobFileBuilder(
+                  versions, fs, &ioptions, &mutable_cf_options, &file_options,
+                  job_id, tboptions.column_family_id,
+                  tboptions.column_family_name, io_priority, write_hint,
+                  io_tracer, blob_callback, blob_creation_reason,
+                  &blob_file_paths, blob_file_additions)
             : nullptr);
 
     CompactionIterator c_iter(
@@ -311,7 +312,7 @@ Status BuildTable(
       if (s.ok()) {
         s = blob_file_builder->Finish();
       } else {
-        blob_file_builder->Abandon();
+        blob_file_builder->Abandon(s);
       }
       blob_file_builder.reset();
     }
index f8828f5c448dc4c870526abd93101ed97b68491f..c8f39b23742646a74d0fec3dc4956f40f236534f 100644 (file)
@@ -60,6 +60,7 @@ extern Status BuildTable(
     SnapshotChecker* snapshot_checker, bool paranoid_file_checks,
     InternalStats* internal_stats, IOStatus* io_status,
     const std::shared_ptr<IOTracer>& io_tracer,
+    BlobFileCreationReason blob_creation_reason,
     EventLogger* event_logger = nullptr, int job_id = 0,
     const Env::IOPriority io_priority = Env::IO_HIGH,
     TableProperties* table_properties = nullptr,
index 3e2ba08e76da6c0b6173069cbd6f01b352d96186..a89fccb996fae6aef827f249b60b8ee74cd96ad4 100644 (file)
@@ -1210,13 +1210,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
 
   std::unique_ptr<BlobFileBuilder> blob_file_builder(
       mutable_cf_options->enable_blob_files
-          ? new BlobFileBuilder(versions_, fs_.get(),
-                                sub_compact->compaction->immutable_options(),
-                                mutable_cf_options, &file_options_, job_id_,
-                                cfd->GetID(), cfd->GetName(),
-                                Env::IOPriority::IO_LOW, write_hint_,
-                                io_tracer_, blob_callback_, &blob_file_paths,
-                                &sub_compact->blob_file_additions)
+          ? new BlobFileBuilder(
+                versions_, fs_.get(),
+                sub_compact->compaction->immutable_options(),
+                mutable_cf_options, &file_options_, job_id_, cfd->GetID(),
+                cfd->GetName(), Env::IOPriority::IO_LOW, write_hint_,
+                io_tracer_, blob_callback_, BlobFileCreationReason::kCompaction,
+                &blob_file_paths, &sub_compact->blob_file_additions)
           : nullptr);
 
   TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
@@ -1427,7 +1427,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
     if (status.ok()) {
       status = blob_file_builder->Finish();
     } else {
-      blob_file_builder->Abandon();
+      blob_file_builder->Abandon(status);
     }
     blob_file_builder.reset();
   }
index f23502b4749af9eb15db35863cd49c74db162dac..a3d5bb19bab9d5af9323ab8f7ccd9e6a937abfdc 100644 (file)
@@ -237,7 +237,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
       closed_(false),
       atomic_flush_install_cv_(&mutex_),
       blob_callback_(immutable_db_options_.sst_file_manager.get(), &mutex_,
-                     &error_handler_) {
+                     &error_handler_, &event_logger_,
+                     immutable_db_options_.listeners, dbname_) {
   // !batch_per_trx_ implies seq_per_batch_ because it is only unset for
   // WriteUnprepared, which should use seq_per_batch_.
   assert(batch_per_txn_ || seq_per_batch_);
index b1679d75657c8256fe2992c3dc6663a87b1e6ea3..590a2be8eb6219b52b4796adf8763e82092d6df9 100644 (file)
@@ -1353,10 +1353,15 @@ Status DBImpl::CompactFilesImpl(
 
   if (output_file_names != nullptr) {
     for (const auto& newf : c->edit()->GetNewFiles()) {
-      (*output_file_names)
-          .push_back(TableFileName(c->immutable_options()->cf_paths,
-                                   newf.second.fd.GetNumber(),
-                                   newf.second.fd.GetPathId()));
+      output_file_names->push_back(TableFileName(
+          c->immutable_options()->cf_paths, newf.second.fd.GetNumber(),
+          newf.second.fd.GetPathId()));
+    }
+
+    for (const auto& blob_file : c->edit()->GetBlobFileAdditions()) {
+      output_file_names->push_back(
+          BlobFileName(c->immutable_options()->cf_paths.front().path,
+                       blob_file.GetBlobFileNumber()));
     }
   }
 
@@ -3461,6 +3466,30 @@ void DBImpl::BuildCompactionJobInfo(
     compaction_job_info->output_file_infos.push_back(CompactionFileInfo{
         newf.first, file_number, meta.oldest_blob_file_number});
   }
+  compaction_job_info->blob_compression_type =
+      c->mutable_cf_options()->blob_compression_type;
+
+  // Update BlobFilesInfo.
+  for (const auto& blob_file : c->edit()->GetBlobFileAdditions()) {
+    BlobFileAdditionInfo blob_file_addition_info(
+        BlobFileName(c->immutable_options()->cf_paths.front().path,
+                     blob_file.GetBlobFileNumber()) /*blob_file_path*/,
+        blob_file.GetBlobFileNumber(), blob_file.GetTotalBlobCount(),
+        blob_file.GetTotalBlobBytes());
+    compaction_job_info->blob_file_addition_infos.emplace_back(
+        std::move(blob_file_addition_info));
+  }
+
+  // Update BlobFilesGarbageInfo.
+  for (const auto& blob_file : c->edit()->GetBlobFileGarbages()) {
+    BlobFileGarbageInfo blob_file_garbage_info(
+        BlobFileName(c->immutable_options()->cf_paths.front().path,
+                     blob_file.GetBlobFileNumber()) /*blob_file_path*/,
+        blob_file.GetBlobFileNumber(), blob_file.GetGarbageBlobCount(),
+        blob_file.GetGarbageBlobBytes());
+    compaction_job_info->blob_file_garbage_infos.emplace_back(
+        std::move(blob_file_garbage_info));
+  }
 }
 #endif
 
index 96aac73b32e89ac07f8a0eb686380cf6a34acca2..4eb46a2c8864cb65daed6511141a738907cf4fb0 100644 (file)
@@ -360,6 +360,11 @@ void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
         &event_logger_, job_id, number, fname, file_deletion_status, GetName(),
         immutable_db_options_.listeners);
   }
+  if (type == kBlobFile) {
+    EventHelpers::LogAndNotifyBlobFileDeletion(
+        &event_logger_, immutable_db_options_.listeners, job_id, number, fname,
+        file_deletion_status, GetName());
+  }
 }
 
 // Diffs the files listed in filenames and those that do not
index 25e4bc5a383b0f2b690b23363e25643309fb62b8..ee40f7e51ec49a4de496f497a5049da246e2d1e1 100644 (file)
@@ -1424,8 +1424,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
           std::move(range_del_iters), &meta, &blob_file_additions,
           snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
           paranoid_file_checks, cfd->internal_stats(), &io_s, io_tracer_,
-          &event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
-          write_hint, nullptr /*full_history_ts_low*/, &blob_callback_);
+          BlobFileCreationReason::kRecovery, &event_logger_, job_id,
+          Env::IO_HIGH, nullptr /* table_properties */, write_hint,
+          nullptr /*full_history_ts_low*/, &blob_callback_);
       LogFlush(immutable_db_options_.info_log);
       ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                       "[%s] [WriteLevel0TableForRecovery]"
index 0073302a046a4b830cff2b52adc0c835fc3c2eb4..fd1c6f013250854965c67240a0bf94fae8a06c11 100644 (file)
@@ -37,6 +37,9 @@ void EventHelpers::NotifyTableFileCreationStarted(
     const std::vector<std::shared_ptr<EventListener>>& listeners,
     const std::string& db_name, const std::string& cf_name,
     const std::string& file_path, int job_id, TableFileCreationReason reason) {
+  if (listeners.empty()) {
+    return;
+  }
   TableFileCreationBriefInfo info;
   info.db_name = db_name;
   info.cf_name = cf_name;
@@ -54,7 +57,7 @@ void EventHelpers::NotifyOnBackgroundError(
     BackgroundErrorReason reason, Status* bg_error, InstrumentedMutex* db_mutex,
     bool* auto_recovery) {
 #ifndef ROCKSDB_LITE
-  if (listeners.size() == 0U) {
+  if (listeners.empty()) {
     return;
   }
   db_mutex->AssertHeld();
@@ -163,7 +166,7 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished(
   }
 
 #ifndef ROCKSDB_LITE
-  if (listeners.size() == 0) {
+  if (listeners.empty()) {
     return;
   }
   TableFileCreationInfo info;
@@ -210,6 +213,9 @@ void EventHelpers::LogAndNotifyTableFileDeletion(
   event_logger->Log(jwriter);
 
 #ifndef ROCKSDB_LITE
+  if (listeners.empty()) {
+    return;
+  }
   TableFileDeletionInfo info;
   info.db_name = dbname;
   info.job_id = job_id;
@@ -230,7 +236,7 @@ void EventHelpers::NotifyOnErrorRecoveryCompleted(
     const std::vector<std::shared_ptr<EventListener>>& listeners,
     Status old_bg_error, InstrumentedMutex* db_mutex) {
 #ifndef ROCKSDB_LITE
-  if (listeners.size() > 0) {
+  if (!listeners.empty()) {
     db_mutex->AssertHeld();
     // release lock while notifying events
     db_mutex->Unlock();
@@ -247,4 +253,98 @@ void EventHelpers::NotifyOnErrorRecoveryCompleted(
 #endif  // ROCKSDB_LITE
 }
 
+#ifndef ROCKSDB_LITE
+void EventHelpers::NotifyBlobFileCreationStarted(
+    const std::vector<std::shared_ptr<EventListener>>& listeners,
+    const std::string& db_name, const std::string& cf_name,
+    const std::string& file_path, int job_id,
+    BlobFileCreationReason creation_reason) {
+  if (listeners.empty()) {
+    return;
+  }
+  BlobFileCreationBriefInfo info(db_name, cf_name, file_path, job_id,
+                                 creation_reason);
+  for (const auto& listener : listeners) {
+    listener->OnBlobFileCreationStarted(info);
+  }
+}
+#endif  // !ROCKSDB_LITE
+
+void EventHelpers::LogAndNotifyBlobFileCreationFinished(
+    EventLogger* event_logger,
+    const std::vector<std::shared_ptr<EventListener>>& listeners,
+    const std::string& db_name, const std::string& cf_name,
+    const std::string& file_path, int job_id, uint64_t file_number,
+    BlobFileCreationReason creation_reason, const Status& s,
+    const std::string& file_checksum,
+    const std::string& file_checksum_func_name, uint64_t total_blob_count,
+    uint64_t total_blob_bytes) {
+  if (s.ok() && event_logger) {
+    JSONWriter jwriter;
+    AppendCurrentTime(&jwriter);
+    jwriter << "cf_name" << cf_name << "job" << job_id << "event"
+            << "blob_file_creation"
+            << "file_number" << file_number << "total_blob_count"
+            << total_blob_count << "total_blob_bytes" << total_blob_bytes
+            << "file_checksum" << file_checksum << "file_checksum_func_name"
+            << file_checksum_func_name << "status" << s.ToString();
+
+    jwriter.EndObject();
+    event_logger->Log(jwriter);
+  }
+
+#ifndef ROCKSDB_LITE
+  if (listeners.empty()) {
+    return;
+  }
+  BlobFileCreationInfo info(db_name, cf_name, file_path, job_id,
+                            creation_reason, total_blob_count, total_blob_bytes,
+                            s, file_checksum, file_checksum_func_name);
+  for (const auto& listener : listeners) {
+    listener->OnBlobFileCreated(info);
+  }
+  info.status.PermitUncheckedError();
+#else
+  (void)listeners;
+  (void)db_name;
+  (void)file_path;
+  (void)creation_reason;
+#endif
+}
+
+void EventHelpers::LogAndNotifyBlobFileDeletion(
+    EventLogger* event_logger,
+    const std::vector<std::shared_ptr<EventListener>>& listeners, int job_id,
+    uint64_t file_number, const std::string& file_path, const Status& status,
+    const std::string& dbname) {
+  if (event_logger) {
+    JSONWriter jwriter;
+    AppendCurrentTime(&jwriter);
+
+    jwriter << "job" << job_id << "event"
+            << "blob_file_deletion"
+            << "file_number" << file_number;
+    if (!status.ok()) {
+      jwriter << "status" << status.ToString();
+    }
+
+    jwriter.EndObject();
+    event_logger->Log(jwriter);
+  }
+#ifndef ROCKSDB_LITE
+  if (listeners.empty()) {
+    return;
+  }
+  BlobFileDeletionInfo info(dbname, file_path, job_id, status);
+  for (const auto& listener : listeners) {
+    listener->OnBlobFileDeleted(info);
+  }
+  info.status.PermitUncheckedError();
+#else
+  (void)listeners;
+  (void)dbname;
+  (void)file_path;
+#endif  // !ROCKSDB_LITE
+}
+
 }  // namespace ROCKSDB_NAMESPACE
index abc00981c1cd97e9903808dd5dc9b909d8fd7572..f8b7f1d517719f1b9f4c99bb5214a4e607fb846b 100644 (file)
@@ -47,6 +47,30 @@ class EventHelpers {
       const std::vector<std::shared_ptr<EventListener>>& listeners,
       Status bg_error, InstrumentedMutex* db_mutex);
 
+#ifndef ROCKSDB_LITE
+  static void NotifyBlobFileCreationStarted(
+      const std::vector<std::shared_ptr<EventListener>>& listeners,
+      const std::string& db_name, const std::string& cf_name,
+      const std::string& file_path, int job_id,
+      BlobFileCreationReason creation_reason);
+#endif  // !ROCKSDB_LITE
+
+  static void LogAndNotifyBlobFileCreationFinished(
+      EventLogger* event_logger,
+      const std::vector<std::shared_ptr<EventListener>>& listeners,
+      const std::string& db_name, const std::string& cf_name,
+      const std::string& file_path, int job_id, uint64_t file_number,
+      BlobFileCreationReason creation_reason, const Status& s,
+      const std::string& file_checksum,
+      const std::string& file_checksum_func_name, uint64_t total_blob_count,
+      uint64_t total_blob_bytes);
+
+  static void LogAndNotifyBlobFileDeletion(
+      EventLogger* event_logger,
+      const std::vector<std::shared_ptr<EventListener>>& listeners, int job_id,
+      uint64_t file_number, const std::string& file_path, const Status& status,
+      const std::string& db_name);
+
  private:
   static void LogAndNotifyTableFileCreation(
       EventLogger* event_logger,
index d520b709efc943268a3bc9d3cd6b97d6b768f119..a6b93f2b27de73dafd4beec23c32a9880d8bbcd8 100644 (file)
@@ -900,9 +900,10 @@ Status FlushJob::WriteLevel0Table() {
           &blob_file_additions, existing_snapshots_,
           earliest_write_conflict_snapshot_, snapshot_checker_,
           mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
-          &io_s, io_tracer_, event_logger_, job_context_->job_id, Env::IO_HIGH,
-          &table_properties_, write_hint, full_history_ts_low, blob_callback_,
-          &num_input_entries, &memtable_payload_bytes, &memtable_garbage_bytes);
+          &io_s, io_tracer_, BlobFileCreationReason::kFlush, event_logger_,
+          job_context_->job_id, Env::IO_HIGH, &table_properties_, write_hint,
+          full_history_ts_low, blob_callback_, &num_input_entries,
+          &memtable_payload_bytes, &memtable_garbage_bytes);
       if (!io_s.ok()) {
         io_status_ = io_s;
       }
@@ -1021,8 +1022,21 @@ std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
   info->largest_seqno = meta_.fd.largest_seqno;
   info->table_properties = table_properties_;
   info->flush_reason = cfd_->GetFlushReason();
+  info->blob_compression_type = mutable_cf_options_.blob_compression_type;
+
+  // Populate blob_file_addition_infos from the blob file additions in edit_.
+  for (const auto& blob_file : edit_->GetBlobFileAdditions()) {
+    BlobFileAdditionInfo blob_file_addition_info(
+        BlobFileName(cfd_->ioptions()->cf_paths.front().path,
+                     blob_file.GetBlobFileNumber()) /*blob_file_path*/,
+        blob_file.GetBlobFileNumber(), blob_file.GetTotalBlobCount(),
+        blob_file.GetTotalBlobBytes());
+    info->blob_file_addition_infos.emplace_back(
+        std::move(blob_file_addition_info));
+  }
   return info;
 }
+
 #endif  // !ROCKSDB_LITE
 
 }  // namespace ROCKSDB_NAMESPACE
index e9e6bc7ff577b0e7f24fce434a1be9a1e0d446c0..ce53d297bedc66191dd686abe758e13530b8358e 100644 (file)
@@ -1000,6 +1000,12 @@ class TestFileOperationListener : public EventListener {
     file_syncs_success_.store(0);
     file_truncates_.store(0);
     file_truncates_success_.store(0);
+    blob_file_reads_.store(0);
+    blob_file_writes_.store(0);
+    blob_file_flushes_.store(0);
+    blob_file_closes_.store(0);
+    blob_file_syncs_.store(0);
+    blob_file_truncates_.store(0);
   }
 
   void OnFileReadFinish(const FileOperationInfo& info) override {
@@ -1007,6 +1013,9 @@ class TestFileOperationListener : public EventListener {
     if (info.status.ok()) {
       ++file_reads_success_;
     }
+    if (EndsWith(info.path, ".blob")) {
+      ++blob_file_reads_;
+    }
     ReportDuration(info);
   }
 
@@ -1015,6 +1024,9 @@ class TestFileOperationListener : public EventListener {
     if (info.status.ok()) {
       ++file_writes_success_;
     }
+    if (EndsWith(info.path, ".blob")) {
+      ++blob_file_writes_;
+    }
     ReportDuration(info);
   }
 
@@ -1023,6 +1035,9 @@ class TestFileOperationListener : public EventListener {
     if (info.status.ok()) {
       ++file_flushes_success_;
     }
+    if (EndsWith(info.path, ".blob")) {
+      ++blob_file_flushes_;
+    }
     ReportDuration(info);
   }
 
@@ -1031,6 +1046,9 @@ class TestFileOperationListener : public EventListener {
     if (info.status.ok()) {
       ++file_closes_success_;
     }
+    if (EndsWith(info.path, ".blob")) {
+      ++blob_file_closes_;
+    }
     ReportDuration(info);
   }
 
@@ -1039,6 +1057,9 @@ class TestFileOperationListener : public EventListener {
     if (info.status.ok()) {
       ++file_syncs_success_;
     }
+    if (EndsWith(info.path, ".blob")) {
+      ++blob_file_syncs_;
+    }
     ReportDuration(info);
   }
 
@@ -1047,6 +1068,9 @@ class TestFileOperationListener : public EventListener {
     if (info.status.ok()) {
       ++file_truncates_success_;
     }
+    if (EndsWith(info.path, ".blob")) {
+      ++blob_file_truncates_;
+    }
     ReportDuration(info);
   }
 
@@ -1064,6 +1088,12 @@ class TestFileOperationListener : public EventListener {
   std::atomic<size_t> file_syncs_success_;
   std::atomic<size_t> file_truncates_;
   std::atomic<size_t> file_truncates_success_;
+  std::atomic<size_t> blob_file_reads_;
+  std::atomic<size_t> blob_file_writes_;
+  std::atomic<size_t> blob_file_flushes_;
+  std::atomic<size_t> blob_file_closes_;
+  std::atomic<size_t> blob_file_syncs_;
+  std::atomic<size_t> blob_file_truncates_;
 
  private:
   void ReportDuration(const FileOperationInfo& info) const {
@@ -1113,6 +1143,379 @@ TEST_F(EventListenerTest, OnFileOperationTest) {
   }
 }
 
+// Checks that TestFileOperationListener observes file operations on ".blob"
+// files: writes and flushes after the memtable flushes, closes and syncs
+// after the DB has been closed and reopened.
+TEST_F(EventListenerTest, OnBlobFileOperationTest) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 0.5;
+
+  auto* const op_listener = new TestFileOperationListener();
+  options.listeners.emplace_back(op_listener);
+
+  DestroyAndReopen(options);
+
+  // Each entry is written in order; a memtable flush follows the entries
+  // marked with flush_after. The second batch overwrites Key3 and Key4.
+  struct KeyValueWrite {
+    const char* key;
+    const char* value;
+    bool flush_after;
+  };
+  const KeyValueWrite writes[] = {
+      {"Key1", "blob_value1", false},     {"Key2", "blob_value2", false},
+      {"Key3", "blob_value3", false},     {"Key4", "blob_value4", true},
+      {"Key3", "new_blob_value3", false}, {"Key4", "new_blob_value4", true},
+      {"Key5", "blob_value5", false},     {"Key6", "blob_value6", true},
+  };
+  for (const KeyValueWrite& write : writes) {
+    ASSERT_OK(Put(write.key, write.value));
+    if (write.flush_after) {
+      ASSERT_OK(Flush());
+    }
+  }
+
+  ASSERT_GT(op_listener->blob_file_writes_.load(), 0U);
+  ASSERT_GT(op_listener->blob_file_flushes_.load(), 0U);
+
+  Close();
+  Reopen(options);
+
+  ASSERT_GT(op_listener->blob_file_closes_.load(), 0U);
+  ASSERT_GT(op_listener->blob_file_syncs_.load(), 0U);
+  // Truncations are only checked when direct I/O is enabled for flush and
+  // compaction.
+  if (options.use_direct_io_for_flush_and_compaction) {
+    ASSERT_GT(op_listener->blob_file_truncates_.load(), 0U);
+  }
+}
+
+// Listener used by the BlobDB job-level tests. It checks the blob file
+// information carried by FlushJobInfo and CompactionJobInfo against the blob
+// file metadata of the default column family's current version, counts the
+// callbacks it receives, and remembers the path of every flushed file.
+class BlobDBJobLevelEventListenerTest : public EventListener {
+ public:
+  explicit BlobDBJobLevelEventListenerTest(EventListenerTest* test)
+      : test_(test), call_count_(0) {}
+
+  // Returns the metadata stored for `blob_file_number` in `blob_files`, or
+  // nullptr when that blob file is not present.
+  std::shared_ptr<BlobFileMetaData> GetBlobFileMetaData(
+      const VersionStorageInfo::BlobFiles& blob_files,
+      uint64_t blob_file_number) {
+    const auto it = blob_files.find(blob_file_number);
+
+    if (it == blob_files.end()) {
+      return nullptr;
+    }
+
+    const auto& meta = it->second;
+    assert(meta);
+
+    return meta;
+  }
+
+  // Returns the blob files of the default column family's current version.
+  // NOTE(review): the returned reference points into the current version and
+  // presumably is only valid while that version is alive -- confirm.
+  const VersionStorageInfo::BlobFiles& GetBlobFiles() {
+    VersionSet* const versions = test_->dbfull()->TEST_GetVersionSet();
+    assert(versions);
+
+    ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
+    EXPECT_NE(cfd, nullptr);
+
+    Version* const current = cfd->current();
+    EXPECT_NE(current, nullptr);
+
+    const VersionStorageInfo* const storage_info = current->storage_info();
+    EXPECT_NE(storage_info, nullptr);
+
+    return storage_info->GetBlobFiles();
+  }
+
+  // Returns a copy of the flushed file paths recorded so far. The copy is
+  // made while holding mutex_.
+  std::vector<std::string> GetFlushedFiles() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return flushed_files_;
+  }
+
+  // Records the flushed file and validates the blob file additions reported
+  // by the flush job.
+  void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
+    call_count_++;
+    EXPECT_FALSE(info.blob_file_addition_infos.empty());
+    {
+      std::lock_guard<std::mutex> lock(mutex_);
+      flushed_files_.push_back(info.file_path);
+    }
+    EXPECT_EQ(info.blob_compression_type, kNoCompression);
+
+    CheckBlobFileAdditions(info.blob_file_addition_infos);
+  }
+
+  // Validates the blob file additions and the blob garbage reported by the
+  // compaction job.
+  void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
+    call_count_++;
+    EXPECT_FALSE(ci.blob_file_garbage_infos.empty());
+    EXPECT_EQ(ci.blob_compression_type, kNoCompression);
+
+    CheckBlobFileAdditions(ci.blob_file_addition_infos);
+
+    for (const auto& blob_file_garbage_info : ci.blob_file_garbage_infos) {
+      EXPECT_GT(blob_file_garbage_info.blob_file_number, 0U);
+      EXPECT_GT(blob_file_garbage_info.garbage_blob_count, 0U);
+      EXPECT_GT(blob_file_garbage_info.garbage_blob_bytes, 0U);
+      EXPECT_FALSE(blob_file_garbage_info.blob_file_path.empty());
+    }
+  }
+
+  EventListenerTest* test_;
+  // Number of OnFlushCompleted/OnCompactionCompleted callbacks received.
+  // Atomic, like the counters of the other listeners in this file, because
+  // the tests read and reset it while callbacks increment it.
+  std::atomic<uint32_t> call_count_;
+
+ private:
+  // Compares every reported blob file addition with the metadata found in
+  // the current version. An addition without metadata is reported as a test
+  // failure instead of dereferencing a null pointer.
+  void CheckBlobFileAdditions(
+      const std::vector<BlobFileAdditionInfo>& additions) {
+    const auto& blob_files = GetBlobFiles();
+
+    for (const auto& addition : additions) {
+      const auto meta =
+          GetBlobFileMetaData(blob_files, addition.blob_file_number);
+      if (!meta) {
+        ADD_FAILURE() << "no metadata in the current version for blob file "
+                      << addition.blob_file_number;
+        continue;
+      }
+
+      EXPECT_EQ(meta->GetBlobFileNumber(), addition.blob_file_number);
+      EXPECT_EQ(meta->GetTotalBlobBytes(), addition.total_blob_bytes);
+      EXPECT_EQ(meta->GetTotalBlobCount(), addition.total_blob_count);
+      EXPECT_FALSE(addition.blob_file_path.empty());
+    }
+  }
+
+  std::vector<std::string> flushed_files_;
+  std::mutex mutex_;
+};
+
+// Checks that OnFlushCompleted reaches the listener when flushes produce blob
+// files; the listener itself validates the reported blob file information.
+TEST_F(EventListenerTest, BlobDBOnFlushCompleted) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  auto* const job_listener = new BlobDBJobLevelEventListenerTest(this);
+  options.listeners.emplace_back(job_listener);
+
+  DestroyAndReopen(options);
+
+  // First flush covers Key1 and Key2, the second one covers Key3.
+  ASSERT_OK(Put("Key1", "blob_value1"));
+  ASSERT_OK(Put("Key2", "blob_value2"));
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(Put("Key3", "blob_value3"));
+  ASSERT_OK(Flush());
+
+  // Every value must still be readable after the flushes.
+  const char* const expected[][2] = {
+      {"Key1", "blob_value1"},
+      {"Key2", "blob_value2"},
+      {"Key3", "blob_value3"},
+  };
+  for (const auto& key_value : expected) {
+    ASSERT_EQ(Get(key_value[0]), key_value[1]);
+  }
+
+  ASSERT_GT(job_listener->call_count_, 0U);
+}
+
+// Checks that OnCompactionCompleted reaches the listener for a compaction
+// that involves blob files; the listener validates the reported blob info.
+TEST_F(EventListenerTest, BlobDBOnCompactionCompleted) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 0.5;
+
+  auto* const job_listener = new BlobDBJobLevelEventListenerTest(this);
+  options.listeners.emplace_back(job_listener);
+
+  DestroyAndReopen(options);
+
+  // Each entry is written in order; a memtable flush follows the entries
+  // marked with flush_after. The second batch overwrites Key3 and Key4.
+  struct KeyValueWrite {
+    const char* key;
+    const char* value;
+    bool flush_after;
+  };
+  const KeyValueWrite writes[] = {
+      {"Key1", "blob_value1", false},     {"Key2", "blob_value2", false},
+      {"Key3", "blob_value3", false},     {"Key4", "blob_value4", true},
+      {"Key3", "new_blob_value3", false}, {"Key4", "new_blob_value4", true},
+      {"Key5", "blob_value5", false},     {"Key6", "blob_value6", true},
+  };
+  for (const KeyValueWrite& write : writes) {
+    ASSERT_OK(Put(write.key, write.value));
+    if (write.flush_after) {
+      ASSERT_OK(Flush());
+    }
+  }
+
+  // Forget the flush callbacks so that only compaction callbacks are counted.
+  job_listener->call_count_ = 0;
+
+  // On compaction, because of blob_garbage_collection_age_cutoff, it will
+  // delete the oldest blob file and create new blob file during compaction.
+  constexpr Slice* range_begin = nullptr;
+  constexpr Slice* range_end = nullptr;
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), range_begin, range_end));
+
+  // Make sure, OnCompactionCompleted is called.
+  ASSERT_GT(job_listener->call_count_, 0U);
+}
+
+// Checks that CompactFiles notifies the listener via OnCompactionCompleted
+// for blob files and fills in the blob file information of the returned
+// CompactionJobInfo.
+TEST_F(EventListenerTest, BlobDBCompactFiles) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 0.5;
+
+  auto* const job_listener = new BlobDBJobLevelEventListenerTest(this);
+  options.listeners.emplace_back(job_listener);
+
+  DestroyAndReopen(options);
+
+  // Each entry is written in order; a memtable flush follows the entries
+  // marked with flush_after. The second batch overwrites Key3 and Key4.
+  struct KeyValueWrite {
+    const char* key;
+    const char* value;
+    bool flush_after;
+  };
+  const KeyValueWrite writes[] = {
+      {"Key1", "blob_value1", false},     {"Key2", "blob_value2", false},
+      {"Key3", "blob_value3", false},     {"Key4", "blob_value4", true},
+      {"Key3", "new_blob_value3", false}, {"Key4", "new_blob_value4", true},
+      {"Key5", "blob_value5", false},     {"Key6", "blob_value6", true},
+  };
+  for (const KeyValueWrite& write : writes) {
+    ASSERT_OK(Put(write.key, write.value));
+    if (write.flush_after) {
+      ASSERT_OK(Flush());
+    }
+  }
+
+  // On compaction, because of blob_garbage_collection_age_cutoff, it will
+  // delete the oldest blob file and create new blob file during compaction
+  // which will be populated in output_file_names.
+  std::vector<std::string> output_file_names;
+  CompactionJobInfo compaction_job_info;
+  const std::vector<std::string> input_files = job_listener->GetFlushedFiles();
+  ASSERT_OK(dbfull()->CompactFiles(CompactionOptions(), input_files, 1, -1,
+                                   &output_file_names, &compaction_job_info));
+
+  // At least one of the output files has to be a blob file.
+  bool is_blob_in_output = false;
+  for (const auto& output_file : output_file_names) {
+    if (EndsWith(output_file, ".blob")) {
+      is_blob_in_output = true;
+      break;
+    }
+  }
+  ASSERT_TRUE(is_blob_in_output);
+
+  for (const auto& addition : compaction_job_info.blob_file_addition_infos) {
+    EXPECT_GT(addition.blob_file_number, 0U);
+    EXPECT_GT(addition.total_blob_bytes, 0U);
+    EXPECT_GT(addition.total_blob_count, 0U);
+    EXPECT_FALSE(addition.blob_file_path.empty());
+  }
+
+  for (const auto& garbage : compaction_job_info.blob_file_garbage_infos) {
+    EXPECT_GT(garbage.blob_file_number, 0U);
+    EXPECT_GT(garbage.garbage_blob_count, 0U);
+    EXPECT_GT(garbage.garbage_blob_bytes, 0U);
+    EXPECT_FALSE(garbage.blob_file_path.empty());
+  }
+}
+
+// Listener that counts the blob file creation-started, created and deleted
+// callbacks and sanity-checks the info structure passed to each of them.
+class BlobDBFileLevelEventListener : public EventListener {
+ public:
+  BlobDBFileLevelEventListener()
+      : files_started_(0), files_created_(0), files_deleted_(0) {}
+
+  void OnBlobFileCreationStarted(
+      const BlobFileCreationBriefInfo& info) override {
+    ++files_started_;
+    EXPECT_FALSE(info.db_name.empty());
+    EXPECT_FALSE(info.cf_name.empty());
+    EXPECT_FALSE(info.file_path.empty());
+    EXPECT_GT(info.job_id, 0);
+  }
+
+  void OnBlobFileCreated(const BlobFileCreationInfo& info) override {
+    ++files_created_;
+    EXPECT_FALSE(info.db_name.empty());
+    EXPECT_FALSE(info.cf_name.empty());
+    EXPECT_FALSE(info.file_path.empty());
+    EXPECT_GT(info.job_id, 0);
+    EXPECT_GT(info.total_blob_count, 0U);
+    EXPECT_GT(info.total_blob_bytes, 0U);
+    // The tests using this listener expect the "unknown" checksum values.
+    EXPECT_EQ(info.file_checksum, kUnknownFileChecksum);
+    EXPECT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
+    EXPECT_TRUE(info.status.ok());
+  }
+
+  void OnBlobFileDeleted(const BlobFileDeletionInfo& info) override {
+    ++files_deleted_;
+    EXPECT_FALSE(info.db_name.empty());
+    EXPECT_FALSE(info.file_path.empty());
+    EXPECT_GT(info.job_id, 0);
+    EXPECT_TRUE(info.status.ok());
+  }
+
+  // Expects one "created" callback per "started" callback, at least one
+  // creation and one deletion, and fewer deletions than creations.
+  void CheckCounters() {
+    const uint32_t started = files_started_.load();
+    const uint32_t created = files_created_.load();
+    const uint32_t deleted = files_deleted_.load();
+
+    EXPECT_EQ(started, created);
+    EXPECT_GT(started, 0U);
+    EXPECT_GT(deleted, 0U);
+    EXPECT_LT(deleted, created);
+  }
+
+ private:
+  std::atomic<uint32_t> files_started_;
+  std::atomic<uint32_t> files_created_;
+  std::atomic<uint32_t> files_deleted_;
+};
+
+// Checks the blob file creation and deletion callbacks end to end: three
+// flushes followed by a full-range compaction, then the listener's counters
+// are validated.
+TEST_F(EventListenerTest, BlobDBFileTest) {
+  Options options;
+  options.env = CurrentOptions().env;
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 0.5;
+
+  auto* const file_listener = new BlobDBFileLevelEventListener();
+  options.listeners.emplace_back(file_listener);
+
+  DestroyAndReopen(options);
+
+  // Each entry is written in order; a memtable flush follows the entries
+  // marked with flush_after. The second batch overwrites Key3 and Key4.
+  struct KeyValueWrite {
+    const char* key;
+    const char* value;
+    bool flush_after;
+  };
+  const KeyValueWrite writes[] = {
+      {"Key1", "blob_value1", false},     {"Key2", "blob_value2", false},
+      {"Key3", "blob_value3", false},     {"Key4", "blob_value4", true},
+      {"Key3", "new_blob_value3", false}, {"Key4", "new_blob_value4", true},
+      {"Key5", "blob_value5", false},     {"Key6", "blob_value6", true},
+  };
+  for (const KeyValueWrite& write : writes) {
+    ASSERT_OK(Put(write.key, write.value));
+    if (write.flush_after) {
+      ASSERT_OK(Flush());
+    }
+  }
+
+  // On compaction, because of blob_garbage_collection_age_cutoff, it will
+  // delete the oldest blob file and create new blob file during compaction.
+  constexpr Slice* range_begin = nullptr;
+  constexpr Slice* range_end = nullptr;
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), range_begin, range_end));
+
+  file_listener->CheckCounters();
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 #endif  // ROCKSDB_LITE
index 3efe63dfcc5363e59f13469238ea3ac142469044..42b874f45ae15764c0d73477f0e52a103d7c3cc5 100644 (file)
@@ -450,8 +450,9 @@ class Repairer {
           std::move(range_del_iters), &meta, nullptr /* blob_file_additions */,
           {}, kMaxSequenceNumber, snapshot_checker,
           false /* paranoid_file_checks*/, nullptr /* internal_stats */, &io_s,
-          nullptr /*IOTracer*/, nullptr /* event_logger */, 0 /* job_id */,
-          Env::IO_HIGH, nullptr /* table_properties */, write_hint);
+          nullptr /*IOTracer*/, BlobFileCreationReason::kRecovery,
+          nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH,
+          nullptr /* table_properties */, write_hint);
       ROCKS_LOG_INFO(db_options_.info_log,
                      "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
                      log, counter, meta.fd.GetNumber(),
index a70836a272392157b0731a754e8eede413657176..c79de186d6c2faf925f144f5ba6790ad8dd8eedb 100644 (file)
@@ -29,8 +29,16 @@ class ColumnFamilyHandle;
 class Status;
 struct CompactionJobStats;
 
-struct TableFileCreationBriefInfo {
-  // the name of the database where the file was created
+struct FileCreationBriefInfo {
+  FileCreationBriefInfo() = default;
+  FileCreationBriefInfo(const std::string& _db_name,
+                        const std::string& _cf_name,
+                        const std::string& _file_path, int _job_id)
+      : db_name(_db_name),
+        cf_name(_cf_name),
+        file_path(_file_path),
+        job_id(_job_id) {}
+  // the name of the database where the file was created.
   std::string db_name;
   // the name of the column family where the file was created.
   std::string cf_name;
@@ -38,7 +46,10 @@ struct TableFileCreationBriefInfo {
   std::string file_path;
   // the id of the job (which could be flush or compaction) that
   // created the file.
-  int job_id;
+  int job_id = 0;
+};
+
+struct TableFileCreationBriefInfo : public FileCreationBriefInfo {
   // reason of creating the table.
   TableFileCreationReason reason;
 };
@@ -59,6 +70,44 @@ struct TableFileCreationInfo : public TableFileCreationBriefInfo {
   std::string file_checksum_func_name;
 };
 
+// Brief description of a blob file being created: the inherited db_name,
+// cf_name, file_path and job_id plus the reason for the creation. This is the
+// argument type of EventListener::OnBlobFileCreationStarted.
+struct BlobFileCreationBriefInfo : public FileCreationBriefInfo {
+  BlobFileCreationBriefInfo(const std::string& _db_name,
+                            const std::string& _cf_name,
+                            const std::string& _file_path, int _job_id,
+                            BlobFileCreationReason _reason)
+      : FileCreationBriefInfo(_db_name, _cf_name, _file_path, _job_id),
+        reason(_reason) {}
+  // reason of creating the blob file.
+  BlobFileCreationReason reason;
+};
+
+// Full description of a blob file creation attempt: everything in
+// BlobFileCreationBriefInfo plus the blob totals, the resulting status and
+// the file checksum information. This is the argument type of
+// EventListener::OnBlobFileCreated.
+struct BlobFileCreationInfo : public BlobFileCreationBriefInfo {
+  BlobFileCreationInfo(const std::string& _db_name, const std::string& _cf_name,
+                       const std::string& _file_path, int _job_id,
+                       BlobFileCreationReason _reason,
+                       uint64_t _total_blob_count, uint64_t _total_blob_bytes,
+                       Status _status, const std::string& _file_checksum,
+                       const std::string& _file_checksum_func_name)
+      : BlobFileCreationBriefInfo(_db_name, _cf_name, _file_path, _job_id,
+                                  _reason),
+        total_blob_count(_total_blob_count),
+        total_blob_bytes(_total_blob_bytes),
+        status(_status),
+        file_checksum(_file_checksum),
+        file_checksum_func_name(_file_checksum_func_name) {}
+
+  // The number of blobs in the file.
+  uint64_t total_blob_count;
+  // The total number of blob bytes in the file.
+  uint64_t total_blob_bytes;
+  // The status indicating whether the creation was successful or not.
+  Status status;
+  // The checksum of the blob file being created.
+  std::string file_checksum;
+  // The checksum function name of checksum generator used for this blob file.
+  std::string file_checksum_func_name;
+};
+
 enum class CompactionReason : int {
   kUnknown = 0,
   // [Level] number of L0 files > level0_file_num_compaction_trigger
@@ -150,17 +199,34 @@ struct WriteStallInfo {
 
 #ifndef ROCKSDB_LITE
 
-struct TableFileDeletionInfo {
+struct FileDeletionInfo {
+  FileDeletionInfo() = default;
+
+  FileDeletionInfo(const std::string& _db_name, const std::string& _file_path,
+                   int _job_id, Status _status)
+      : db_name(_db_name),
+        file_path(_file_path),
+        job_id(_job_id),
+        status(_status) {}
   // The name of the database where the file was deleted.
   std::string db_name;
   // The path to the deleted file.
   std::string file_path;
   // The id of the job which deleted the file.
-  int job_id;
+  int job_id = 0;
   // The status indicating whether the deletion was successful or not.
   Status status;
 };
 
+struct TableFileDeletionInfo : public FileDeletionInfo {};
+
+// Describes the deletion of a blob file. It adds no members of its own; the
+// database name, file path, job id and status are inherited from
+// FileDeletionInfo. This is the argument type of
+// EventListener::OnBlobFileDeleted.
+struct BlobFileDeletionInfo : public FileDeletionInfo {
+  BlobFileDeletionInfo(const std::string& _db_name,
+                       const std::string& _file_path, int _job_id,
+                       Status _status)
+      : FileDeletionInfo(_db_name, _file_path, _job_id, _status) {}
+};
+
 enum class FileOperationType {
   kRead,
   kWrite,
@@ -206,6 +272,39 @@ struct FileOperationInfo {
   }
 };
 
+// Identifies a blob file by path and file number. Base of
+// BlobFileAdditionInfo and BlobFileGarbageInfo.
+struct BlobFileInfo {
+  BlobFileInfo(const std::string& _blob_file_path,
+               const uint64_t _blob_file_number)
+      : blob_file_path(_blob_file_path), blob_file_number(_blob_file_number) {}
+
+  // The path of the blob file.
+  std::string blob_file_path;
+  // The file number of the blob file.
+  uint64_t blob_file_number;
+};
+
+// Describes a blob file added by a flush or compaction job; reported through
+// FlushJobInfo::blob_file_addition_infos and
+// CompactionJobInfo::blob_file_addition_infos.
+struct BlobFileAdditionInfo : public BlobFileInfo {
+  BlobFileAdditionInfo(const std::string& _blob_file_path,
+                       const uint64_t _blob_file_number,
+                       const uint64_t _total_blob_count,
+                       const uint64_t _total_blob_bytes)
+      : BlobFileInfo(_blob_file_path, _blob_file_number),
+        total_blob_count(_total_blob_count),
+        total_blob_bytes(_total_blob_bytes) {}
+  // The number of blobs in the file.
+  uint64_t total_blob_count;
+  // The total number of blob bytes in the file.
+  uint64_t total_blob_bytes;
+};
+
+// Describes garbage associated with a blob file; reported through
+// CompactionJobInfo::blob_file_garbage_infos.
+// NOTE(review): presumably the counts cover only the garbage attributed to
+// the reporting compaction, not the file's cumulative garbage -- confirm.
+struct BlobFileGarbageInfo : public BlobFileInfo {
+  BlobFileGarbageInfo(const std::string& _blob_file_path,
+                      const uint64_t _blob_file_number,
+                      const uint64_t _garbage_blob_count,
+                      const uint64_t _garbage_blob_bytes)
+      : BlobFileInfo(_blob_file_path, _blob_file_number),
+        garbage_blob_count(_garbage_blob_count),
+        garbage_blob_bytes(_garbage_blob_bytes) {}
+  // The number of garbage blobs.
+  uint64_t garbage_blob_count;
+  // The number of bytes taken up by garbage blobs.
+  uint64_t garbage_blob_bytes;
+};
+
 struct FlushJobInfo {
   // the id of the column family
   uint32_t cf_id;
@@ -239,6 +338,12 @@ struct FlushJobInfo {
   TableProperties table_properties;
 
   FlushReason flush_reason;
+
+  // Compression algorithm used for blob output files
+  CompressionType blob_compression_type;
+
+  // Information about blob files created during flush in Integrated BlobDB.
+  std::vector<BlobFileAdditionInfo> blob_file_addition_infos;
 };
 
 struct CompactionFileInfo {
@@ -299,6 +404,17 @@ struct CompactionJobInfo {
 
   // Statistics and other additional details on the compaction
   CompactionJobStats stats;
+
+  // Compression algorithm used for blob output files.
+  CompressionType blob_compression_type;
+
+  // Information about blob files created during compaction in Integrated
+  // BlobDB.
+  std::vector<BlobFileAdditionInfo> blob_file_addition_infos;
+
+  // Information about blob garbage (per-file garbage blob count and bytes)
+  // reported for this compaction in Integrated BlobDB.
+  std::vector<BlobFileGarbageInfo> blob_file_garbage_infos;
 };
 
 struct MemTableInfo {
@@ -555,6 +671,34 @@ class EventListener : public Customizable {
   // initiate any further recovery actions needed
   virtual void OnErrorRecoveryCompleted(Status /* old_bg_error */) {}
 
+  // A callback function for RocksDB which will be called before
+  // a blob file is created. It will be followed by a call to
+  // OnBlobFileCreated after the creation finishes.
+  //
+  // Note that if applications would like to use the passed reference
+  // outside this function call, they should make a copy of the passed
+  // value. The default implementation does nothing.
+  virtual void OnBlobFileCreationStarted(
+      const BlobFileCreationBriefInfo& /*info*/) {}
+
+  // A callback function for RocksDB which will be called whenever
+  // a blob file is created.
+  // It will be called whether the file is successfully created or not. Users
+  // can check info.status to see if it succeeded or not.
+  //
+  // Note that if applications would like to use the passed reference
+  // outside this function call, they should make a copy of the passed
+  // value. The default implementation does nothing.
+  virtual void OnBlobFileCreated(const BlobFileCreationInfo& /*info*/) {}
+
+  // A callback function for RocksDB which will be called whenever
+  // a blob file is deleted. info.status indicates whether the deletion was
+  // successful or not.
+  //
+  // Note that if applications would like to use the passed reference
+  // outside this function call, they should make a copy of the passed
+  // value. The default implementation does nothing.
+  virtual void OnBlobFileDeleted(const BlobFileDeletionInfo& /*info*/) {}
+
   virtual ~EventListener() {}
 };
 
index 14dc39ea3ae90c4c16bb07a06b1ea36b745a69df..20cff59b80fd030f6af49975fca60201d39793cd 100644 (file)
@@ -26,6 +26,12 @@ enum class TableFileCreationReason {
   kMisc,
 };
 
+// The reason a blob file is being created. Reported to listeners through
+// BlobFileCreationBriefInfo::reason.
+enum class BlobFileCreationReason {
+  // Passed by FlushJob::WriteLevel0Table when writing a level-0 table.
+  kFlush,
+  // NOTE(review): presumably used for blob files written by compaction
+  // jobs -- the call site is not part of this change view.
+  kCompaction,
+  // Passed by Repairer when converting a log into a table.
+  kRecovery,
+};
+
 // The types of files RocksDB uses in a DB directory. (Available for
 // advanced options.)
 enum FileType {