* `SstFileWriter` now supports `Put`s and `Delete`s with user-defined timestamps. Note that the ingestion logic itself is not timestamp-aware yet.
* Allow a single write batch to include keys from multiple column families whose timestamps' formats can differ. For example, some column families may disable timestamp, while others enable timestamp.
* Add compaction priority information in RemoteCompaction, which can be used to schedule high priority job first.
+* Added new callback APIs `OnBlobFileCreationStarted`,`OnBlobFileCreated`and `OnBlobFileDeleted` in `EventListener` class of listener.h. It notifies listeners during creation/deletion of individual blob files in Integrated BlobDB. It also log blob file creation finished event and deletion event in LOG file.
### Public API change
* Remove obsolete implementation details FullKey and ParseFullKey from public API
* Add a public API RateLimiter::GetTotalPendingRequests() for the total number of requests that are pending for bytes in the rate limiter.
+* Extended `FlushJobInfo` and `CompactionJobInfo` in listener.h to provide information about the blob files generated by a flush/compaction and garbage collected during compaction in Integrated BlobDB. Added struct members `blob_file_addition_infos` and `blob_file_garbage_infos` that contain this information.
+* Extended parameter `output_file_names` of `CompactFiles` API to also include paths of the blob files generated by the compaction in Integrated BlobDB.
## 6.24.0 (2021-08-20)
### Bug Fixes
#include "db/blob/blob_index.h"
#include "db/blob/blob_log_format.h"
#include "db/blob/blob_log_writer.h"
+#include "db/event_helpers.h"
#include "db/version_set.h"
#include "file/filename.h"
#include "file/read_write_util.h"
Env::WriteLifeTimeHint write_hint,
const std::shared_ptr<IOTracer>& io_tracer,
BlobFileCompletionCallback* blob_callback,
+ BlobFileCreationReason creation_reason,
std::vector<std::string>* blob_file_paths,
std::vector<BlobFileAddition>* blob_file_additions)
: BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, fs,
immutable_options, mutable_cf_options, file_options,
job_id, column_family_id, column_family_name, io_priority,
- write_hint, io_tracer, blob_callback, blob_file_paths,
- blob_file_additions) {}
+ write_hint, io_tracer, blob_callback, creation_reason,
+ blob_file_paths, blob_file_additions) {}
BlobFileBuilder::BlobFileBuilder(
std::function<uint64_t()> file_number_generator, FileSystem* fs,
Env::WriteLifeTimeHint write_hint,
const std::shared_ptr<IOTracer>& io_tracer,
BlobFileCompletionCallback* blob_callback,
+ BlobFileCreationReason creation_reason,
std::vector<std::string>* blob_file_paths,
std::vector<BlobFileAddition>* blob_file_additions)
: file_number_generator_(std::move(file_number_generator)),
write_hint_(write_hint),
io_tracer_(io_tracer),
blob_callback_(blob_callback),
+ creation_reason_(creation_reason),
blob_file_paths_(blob_file_paths),
blob_file_additions_(blob_file_additions),
blob_count_(0),
std::string blob_file_path =
BlobFileName(immutable_options_->cf_paths.front().path, blob_file_number);
+ if (blob_callback_) {
+ blob_callback_->OnBlobFileCreationStarted(
+ blob_file_path, column_family_name_, job_id_, creation_reason_);
+ }
+
std::unique_ptr<FSWritableFile> file;
{
const uint64_t blob_file_number = writer_->get_log_number();
+ if (blob_callback_) {
+ s = blob_callback_->OnBlobFileCompleted(
+ blob_file_paths_->back(), column_family_name_, job_id_,
+ blob_file_number, creation_reason_, s, checksum_value, checksum_method,
+ blob_count_, blob_bytes_);
+ }
+
assert(blob_file_additions_);
blob_file_additions_->emplace_back(blob_file_number, blob_count_, blob_bytes_,
std::move(checksum_method),
" total blobs, %" PRIu64 " total bytes",
column_family_name_.c_str(), job_id_, blob_file_number,
blob_count_, blob_bytes_);
- if (blob_callback_) {
- s = blob_callback_->OnBlobFileCompleted(blob_file_paths_->back());
- }
writer_.reset();
blob_count_ = 0;
return CloseBlobFile();
}
-void BlobFileBuilder::Abandon() {
+void BlobFileBuilder::Abandon(const Status& s) {
if (!IsBlobFileOpen()) {
return;
}
-
if (blob_callback_) {
// BlobFileBuilder::Abandon() is called because of error while writing to
// Blob files. So we can ignore the below error.
- blob_callback_->OnBlobFileCompleted(blob_file_paths_->back())
+ blob_callback_
+ ->OnBlobFileCompleted(blob_file_paths_->back(), column_family_name_,
+ job_id_, writer_->get_log_number(),
+ creation_reason_, s, "", "", blob_count_,
+ blob_bytes_)
.PermitUncheckedError();
}
#include "rocksdb/compression_type.h"
#include "rocksdb/env.h"
#include "rocksdb/rocksdb_namespace.h"
+#include "rocksdb/types.h"
namespace ROCKSDB_NAMESPACE {
Env::WriteLifeTimeHint write_hint,
const std::shared_ptr<IOTracer>& io_tracer,
BlobFileCompletionCallback* blob_callback,
+ BlobFileCreationReason creation_reason,
std::vector<std::string>* blob_file_paths,
std::vector<BlobFileAddition>* blob_file_additions);
Env::WriteLifeTimeHint write_hint,
const std::shared_ptr<IOTracer>& io_tracer,
BlobFileCompletionCallback* blob_callback,
+ BlobFileCreationReason creation_reason,
std::vector<std::string>* blob_file_paths,
std::vector<BlobFileAddition>* blob_file_additions);
Status Add(const Slice& key, const Slice& value, std::string* blob_index);
Status Finish();
- void Abandon();
+ void Abandon(const Status& s);
private:
bool IsBlobFileOpen() const;
Env::WriteLifeTimeHint write_hint_;
std::shared_ptr<IOTracer> io_tracer_;
BlobFileCompletionCallback* blob_callback_;
+ BlobFileCreationReason creation_reason_;
std::vector<std::string>* blob_file_paths_;
std::vector<BlobFileAddition>* blob_file_additions_;
std::unique_ptr<BlobLogWriter> writer_;
TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options,
&file_options_, job_id, column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
- &blob_file_paths, &blob_file_additions);
+ BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions);
std::vector<std::pair<std::string, std::string>> expected_key_value_pairs(
number_of_blobs);
TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options,
&file_options_, job_id, column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
- &blob_file_paths, &blob_file_additions);
+ BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions);
std::vector<std::pair<std::string, std::string>> expected_key_value_pairs(
number_of_blobs);
TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options,
&file_options_, job_id, column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
- &blob_file_paths, &blob_file_additions);
+ BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions);
for (size_t i = 0; i < number_of_blobs; ++i) {
const std::string key = std::to_string(i);
TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options,
&file_options_, job_id, column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
- &blob_file_paths, &blob_file_additions);
+ BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions);
const std::string key("1");
const std::string uncompressed_value(value_size, 'x');
TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options,
&file_options_, job_id, column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
- &blob_file_paths, &blob_file_additions);
+ BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions);
SyncPoint::GetInstance()->SetCallBack("CompressData:TamperWithReturnValue",
[](void* arg) {
TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options,
&file_options_, job_id, column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
- &blob_file_paths, &blob_file_additions);
+ BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions);
const std::string key("1");
const std::string value("deadbeef");
TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options,
&file_options_, job_id, column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/,
- &blob_file_paths, &blob_file_additions);
+ BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions);
SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) {
Status* const s = static_cast<Status*>(arg);
#pragma once
#include "db/error_handler.h"
+#include "db/event_helpers.h"
#include "file/sst_file_manager_impl.h"
#include "rocksdb/status.h"
class BlobFileCompletionCallback {
public:
-#ifdef ROCKSDB_LITE
- BlobFileCompletionCallback(SstFileManager* /*sst_file_manager*/,
- InstrumentedMutex* /*mutex*/,
- ErrorHandler* /*error_handler*/) {}
- Status OnBlobFileCompleted(const std::string& /*file_name*/) {
- return Status::OK();
- }
-#else
- BlobFileCompletionCallback(SstFileManager* sst_file_manager,
- InstrumentedMutex* mutex,
- ErrorHandler* error_handler)
+ BlobFileCompletionCallback(
+ SstFileManager* sst_file_manager, InstrumentedMutex* mutex,
+ ErrorHandler* error_handler, EventLogger* event_logger,
+ const std::vector<std::shared_ptr<EventListener>>& listeners,
+ const std::string& dbname)
: sst_file_manager_(sst_file_manager),
mutex_(mutex),
- error_handler_(error_handler) {}
+ error_handler_(error_handler),
+ event_logger_(event_logger),
+ listeners_(listeners),
+ dbname_(dbname) {}
- Status OnBlobFileCompleted(const std::string& file_name) {
+ void OnBlobFileCreationStarted(const std::string& file_name,
+ const std::string& column_family_name,
+ int job_id,
+ BlobFileCreationReason creation_reason) {
+#ifndef ROCKSDB_LITE
+ // Notify the listeners.
+ EventHelpers::NotifyBlobFileCreationStarted(listeners_, dbname_,
+ column_family_name, file_name,
+ job_id, creation_reason);
+#else
+ (void)file_name;
+ (void)column_family_name;
+ (void)job_id;
+ (void)creation_reason;
+#endif
+ }
+
+ Status OnBlobFileCompleted(const std::string& file_name,
+ const std::string& column_family_name, int job_id,
+ uint64_t file_number,
+ BlobFileCreationReason creation_reason,
+ const Status& report_status,
+ const std::string& checksum_value,
+ const std::string& checksum_method,
+ uint64_t blob_count, uint64_t blob_bytes) {
Status s;
+
+#ifndef ROCKSDB_LITE
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager_);
if (sfm) {
// Report new blob files to SstFileManagerImpl
error_handler_->SetBGError(s, BackgroundErrorReason::kFlush);
}
}
+#endif // !ROCKSDB_LITE
+
+ // Notify the listeners.
+ EventHelpers::LogAndNotifyBlobFileCreationFinished(
+ event_logger_, listeners_, dbname_, column_family_name, file_name,
+ job_id, file_number, creation_reason,
+ (!report_status.ok() ? report_status : s),
+ (checksum_value.empty() ? kUnknownFileChecksum : checksum_value),
+ (checksum_method.empty() ? kUnknownFileChecksumFuncName
+ : checksum_method),
+ blob_count, blob_bytes);
return s;
}
SstFileManager* sst_file_manager_;
InstrumentedMutex* mutex_;
ErrorHandler* error_handler_;
-#endif // ROCKSDB_LITE
+ EventLogger* event_logger_;
+ std::vector<std::shared_ptr<EventListener>> listeners_;
+ std::string dbname_;
};
} // namespace ROCKSDB_NAMESPACE
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, bool paranoid_file_checks,
InternalStats* internal_stats, IOStatus* io_status,
- const std::shared_ptr<IOTracer>& io_tracer, EventLogger* event_logger,
+ const std::shared_ptr<IOTracer>& io_tracer,
+ BlobFileCreationReason blob_creation_reason, EventLogger* event_logger,
int job_id, const Env::IOPriority io_priority,
TableProperties* table_properties, Env::WriteLifeTimeHint write_hint,
const std::string* full_history_ts_low,
std::unique_ptr<BlobFileBuilder> blob_file_builder(
(mutable_cf_options.enable_blob_files && blob_file_additions)
- ? new BlobFileBuilder(versions, fs, &ioptions, &mutable_cf_options,
- &file_options, job_id,
- tboptions.column_family_id,
- tboptions.column_family_name, io_priority,
- write_hint, io_tracer, blob_callback,
- &blob_file_paths, blob_file_additions)
+ ? new BlobFileBuilder(
+ versions, fs, &ioptions, &mutable_cf_options, &file_options,
+ job_id, tboptions.column_family_id,
+ tboptions.column_family_name, io_priority, write_hint,
+ io_tracer, blob_callback, blob_creation_reason,
+ &blob_file_paths, blob_file_additions)
: nullptr);
CompactionIterator c_iter(
if (s.ok()) {
s = blob_file_builder->Finish();
} else {
- blob_file_builder->Abandon();
+ blob_file_builder->Abandon(s);
}
blob_file_builder.reset();
}
SnapshotChecker* snapshot_checker, bool paranoid_file_checks,
InternalStats* internal_stats, IOStatus* io_status,
const std::shared_ptr<IOTracer>& io_tracer,
+ BlobFileCreationReason blob_creation_reason,
EventLogger* event_logger = nullptr, int job_id = 0,
const Env::IOPriority io_priority = Env::IO_HIGH,
TableProperties* table_properties = nullptr,
std::unique_ptr<BlobFileBuilder> blob_file_builder(
mutable_cf_options->enable_blob_files
- ? new BlobFileBuilder(versions_, fs_.get(),
- sub_compact->compaction->immutable_options(),
- mutable_cf_options, &file_options_, job_id_,
- cfd->GetID(), cfd->GetName(),
- Env::IOPriority::IO_LOW, write_hint_,
- io_tracer_, blob_callback_, &blob_file_paths,
- &sub_compact->blob_file_additions)
+ ? new BlobFileBuilder(
+ versions_, fs_.get(),
+ sub_compact->compaction->immutable_options(),
+ mutable_cf_options, &file_options_, job_id_, cfd->GetID(),
+ cfd->GetName(), Env::IOPriority::IO_LOW, write_hint_,
+ io_tracer_, blob_callback_, BlobFileCreationReason::kCompaction,
+ &blob_file_paths, &sub_compact->blob_file_additions)
: nullptr);
TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
if (status.ok()) {
status = blob_file_builder->Finish();
} else {
- blob_file_builder->Abandon();
+ blob_file_builder->Abandon(status);
}
blob_file_builder.reset();
}
closed_(false),
atomic_flush_install_cv_(&mutex_),
blob_callback_(immutable_db_options_.sst_file_manager.get(), &mutex_,
- &error_handler_) {
+ &error_handler_, &event_logger_,
+ immutable_db_options_.listeners, dbname_) {
// !batch_per_trx_ implies seq_per_batch_ because it is only unset for
// WriteUnprepared, which should use seq_per_batch_.
assert(batch_per_txn_ || seq_per_batch_);
if (output_file_names != nullptr) {
for (const auto& newf : c->edit()->GetNewFiles()) {
- (*output_file_names)
- .push_back(TableFileName(c->immutable_options()->cf_paths,
- newf.second.fd.GetNumber(),
- newf.second.fd.GetPathId()));
+ output_file_names->push_back(TableFileName(
+ c->immutable_options()->cf_paths, newf.second.fd.GetNumber(),
+ newf.second.fd.GetPathId()));
+ }
+
+ for (const auto& blob_file : c->edit()->GetBlobFileAdditions()) {
+ output_file_names->push_back(
+ BlobFileName(c->immutable_options()->cf_paths.front().path,
+ blob_file.GetBlobFileNumber()));
}
}
compaction_job_info->output_file_infos.push_back(CompactionFileInfo{
newf.first, file_number, meta.oldest_blob_file_number});
}
+ compaction_job_info->blob_compression_type =
+ c->mutable_cf_options()->blob_compression_type;
+
+ // Update BlobFilesInfo.
+ for (const auto& blob_file : c->edit()->GetBlobFileAdditions()) {
+ BlobFileAdditionInfo blob_file_addition_info(
+ BlobFileName(c->immutable_options()->cf_paths.front().path,
+ blob_file.GetBlobFileNumber()) /*blob_file_path*/,
+ blob_file.GetBlobFileNumber(), blob_file.GetTotalBlobCount(),
+ blob_file.GetTotalBlobBytes());
+ compaction_job_info->blob_file_addition_infos.emplace_back(
+ std::move(blob_file_addition_info));
+ }
+
+ // Update BlobFilesGarbageInfo.
+ for (const auto& blob_file : c->edit()->GetBlobFileGarbages()) {
+ BlobFileGarbageInfo blob_file_garbage_info(
+ BlobFileName(c->immutable_options()->cf_paths.front().path,
+ blob_file.GetBlobFileNumber()) /*blob_file_path*/,
+ blob_file.GetBlobFileNumber(), blob_file.GetGarbageBlobCount(),
+ blob_file.GetGarbageBlobBytes());
+ compaction_job_info->blob_file_garbage_infos.emplace_back(
+ std::move(blob_file_garbage_info));
+ }
}
#endif
&event_logger_, job_id, number, fname, file_deletion_status, GetName(),
immutable_db_options_.listeners);
}
+ if (type == kBlobFile) {
+ EventHelpers::LogAndNotifyBlobFileDeletion(
+ &event_logger_, immutable_db_options_.listeners, job_id, number, fname,
+ file_deletion_status, GetName());
+ }
}
// Diffs the files listed in filenames and those that do not
std::move(range_del_iters), &meta, &blob_file_additions,
snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
paranoid_file_checks, cfd->internal_stats(), &io_s, io_tracer_,
- &event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
- write_hint, nullptr /*full_history_ts_low*/, &blob_callback_);
+ BlobFileCreationReason::kRecovery, &event_logger_, job_id,
+ Env::IO_HIGH, nullptr /* table_properties */, write_hint,
+ nullptr /*full_history_ts_low*/, &blob_callback_);
LogFlush(immutable_db_options_.info_log);
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] [WriteLevel0TableForRecovery]"
const std::vector<std::shared_ptr<EventListener>>& listeners,
const std::string& db_name, const std::string& cf_name,
const std::string& file_path, int job_id, TableFileCreationReason reason) {
+ if (listeners.empty()) {
+ return;
+ }
TableFileCreationBriefInfo info;
info.db_name = db_name;
info.cf_name = cf_name;
BackgroundErrorReason reason, Status* bg_error, InstrumentedMutex* db_mutex,
bool* auto_recovery) {
#ifndef ROCKSDB_LITE
- if (listeners.size() == 0U) {
+ if (listeners.empty()) {
return;
}
db_mutex->AssertHeld();
}
#ifndef ROCKSDB_LITE
- if (listeners.size() == 0) {
+ if (listeners.empty()) {
return;
}
TableFileCreationInfo info;
event_logger->Log(jwriter);
#ifndef ROCKSDB_LITE
+ if (listeners.empty()) {
+ return;
+ }
TableFileDeletionInfo info;
info.db_name = dbname;
info.job_id = job_id;
const std::vector<std::shared_ptr<EventListener>>& listeners,
Status old_bg_error, InstrumentedMutex* db_mutex) {
#ifndef ROCKSDB_LITE
- if (listeners.size() > 0) {
+ if (!listeners.empty()) {
db_mutex->AssertHeld();
// release lock while notifying events
db_mutex->Unlock();
#endif // ROCKSDB_LITE
}
+#ifndef ROCKSDB_LITE
+void EventHelpers::NotifyBlobFileCreationStarted(
+ const std::vector<std::shared_ptr<EventListener>>& listeners,
+ const std::string& db_name, const std::string& cf_name,
+ const std::string& file_path, int job_id,
+ BlobFileCreationReason creation_reason) {
+ if (listeners.empty()) {
+ return;
+ }
+ BlobFileCreationBriefInfo info(db_name, cf_name, file_path, job_id,
+ creation_reason);
+ for (const auto& listener : listeners) {
+ listener->OnBlobFileCreationStarted(info);
+ }
+}
+#endif // !ROCKSDB_LITE
+
+void EventHelpers::LogAndNotifyBlobFileCreationFinished(
+ EventLogger* event_logger,
+ const std::vector<std::shared_ptr<EventListener>>& listeners,
+ const std::string& db_name, const std::string& cf_name,
+ const std::string& file_path, int job_id, uint64_t file_number,
+ BlobFileCreationReason creation_reason, const Status& s,
+ const std::string& file_checksum,
+ const std::string& file_checksum_func_name, uint64_t total_blob_count,
+ uint64_t total_blob_bytes) {
+ if (s.ok() && event_logger) {
+ JSONWriter jwriter;
+ AppendCurrentTime(&jwriter);
+ jwriter << "cf_name" << cf_name << "job" << job_id << "event"
+ << "blob_file_creation"
+ << "file_number" << file_number << "total_blob_count"
+ << total_blob_count << "total_blob_bytes" << total_blob_bytes
+ << "file_checksum" << file_checksum << "file_checksum_func_name"
+ << file_checksum_func_name << "status" << s.ToString();
+
+ jwriter.EndObject();
+ event_logger->Log(jwriter);
+ }
+
+#ifndef ROCKSDB_LITE
+ if (listeners.empty()) {
+ return;
+ }
+ BlobFileCreationInfo info(db_name, cf_name, file_path, job_id,
+ creation_reason, total_blob_count, total_blob_bytes,
+ s, file_checksum, file_checksum_func_name);
+ for (const auto& listener : listeners) {
+ listener->OnBlobFileCreated(info);
+ }
+ info.status.PermitUncheckedError();
+#else
+ (void)listeners;
+ (void)db_name;
+ (void)file_path;
+ (void)creation_reason;
+#endif
+}
+
+void EventHelpers::LogAndNotifyBlobFileDeletion(
+ EventLogger* event_logger,
+ const std::vector<std::shared_ptr<EventListener>>& listeners, int job_id,
+ uint64_t file_number, const std::string& file_path, const Status& status,
+ const std::string& dbname) {
+ if (event_logger) {
+ JSONWriter jwriter;
+ AppendCurrentTime(&jwriter);
+
+ jwriter << "job" << job_id << "event"
+ << "blob_file_deletion"
+ << "file_number" << file_number;
+ if (!status.ok()) {
+ jwriter << "status" << status.ToString();
+ }
+
+ jwriter.EndObject();
+ event_logger->Log(jwriter);
+ }
+#ifndef ROCKSDB_LITE
+ if (listeners.empty()) {
+ return;
+ }
+ BlobFileDeletionInfo info(dbname, file_path, job_id, status);
+ for (const auto& listener : listeners) {
+ listener->OnBlobFileDeleted(info);
+ }
+ info.status.PermitUncheckedError();
+#else
+ (void)listeners;
+ (void)dbname;
+ (void)file_path;
+#endif // !ROCKSDB_LITE
+}
+
} // namespace ROCKSDB_NAMESPACE
const std::vector<std::shared_ptr<EventListener>>& listeners,
Status bg_error, InstrumentedMutex* db_mutex);
+#ifndef ROCKSDB_LITE
+ static void NotifyBlobFileCreationStarted(
+ const std::vector<std::shared_ptr<EventListener>>& listeners,
+ const std::string& db_name, const std::string& cf_name,
+ const std::string& file_path, int job_id,
+ BlobFileCreationReason creation_reason);
+#endif // !ROCKSDB_LITE
+
+ static void LogAndNotifyBlobFileCreationFinished(
+ EventLogger* event_logger,
+ const std::vector<std::shared_ptr<EventListener>>& listeners,
+ const std::string& db_name, const std::string& cf_name,
+ const std::string& file_path, int job_id, uint64_t file_number,
+ BlobFileCreationReason creation_reason, const Status& s,
+ const std::string& file_checksum,
+ const std::string& file_checksum_func_name, uint64_t total_blob_count,
+ uint64_t total_blob_bytes);
+
+ static void LogAndNotifyBlobFileDeletion(
+ EventLogger* event_logger,
+ const std::vector<std::shared_ptr<EventListener>>& listeners, int job_id,
+ uint64_t file_number, const std::string& file_path, const Status& status,
+ const std::string& db_name);
+
private:
static void LogAndNotifyTableFileCreation(
EventLogger* event_logger,
&blob_file_additions, existing_snapshots_,
earliest_write_conflict_snapshot_, snapshot_checker_,
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
- &io_s, io_tracer_, event_logger_, job_context_->job_id, Env::IO_HIGH,
- &table_properties_, write_hint, full_history_ts_low, blob_callback_,
- &num_input_entries, &memtable_payload_bytes, &memtable_garbage_bytes);
+ &io_s, io_tracer_, BlobFileCreationReason::kFlush, event_logger_,
+ job_context_->job_id, Env::IO_HIGH, &table_properties_, write_hint,
+ full_history_ts_low, blob_callback_, &num_input_entries,
+ &memtable_payload_bytes, &memtable_garbage_bytes);
if (!io_s.ok()) {
io_status_ = io_s;
}
info->largest_seqno = meta_.fd.largest_seqno;
info->table_properties = table_properties_;
info->flush_reason = cfd_->GetFlushReason();
+ info->blob_compression_type = mutable_cf_options_.blob_compression_type;
+
+ // Update BlobFilesInfo.
+ for (const auto& blob_file : edit_->GetBlobFileAdditions()) {
+ BlobFileAdditionInfo blob_file_addition_info(
+ BlobFileName(cfd_->ioptions()->cf_paths.front().path,
+ blob_file.GetBlobFileNumber()) /*blob_file_path*/,
+ blob_file.GetBlobFileNumber(), blob_file.GetTotalBlobCount(),
+ blob_file.GetTotalBlobBytes());
+ info->blob_file_addition_infos.emplace_back(
+ std::move(blob_file_addition_info));
+ }
return info;
}
+
#endif // !ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE
file_syncs_success_.store(0);
file_truncates_.store(0);
file_truncates_success_.store(0);
+ blob_file_reads_.store(0);
+ blob_file_writes_.store(0);
+ blob_file_flushes_.store(0);
+ blob_file_closes_.store(0);
+ blob_file_syncs_.store(0);
+ blob_file_truncates_.store(0);
}
void OnFileReadFinish(const FileOperationInfo& info) override {
if (info.status.ok()) {
++file_reads_success_;
}
+ if (EndsWith(info.path, ".blob")) {
+ ++blob_file_reads_;
+ }
ReportDuration(info);
}
if (info.status.ok()) {
++file_writes_success_;
}
+ if (EndsWith(info.path, ".blob")) {
+ ++blob_file_writes_;
+ }
ReportDuration(info);
}
if (info.status.ok()) {
++file_flushes_success_;
}
+ if (EndsWith(info.path, ".blob")) {
+ ++blob_file_flushes_;
+ }
ReportDuration(info);
}
if (info.status.ok()) {
++file_closes_success_;
}
+ if (EndsWith(info.path, ".blob")) {
+ ++blob_file_closes_;
+ }
ReportDuration(info);
}
if (info.status.ok()) {
++file_syncs_success_;
}
+ if (EndsWith(info.path, ".blob")) {
+ ++blob_file_syncs_;
+ }
ReportDuration(info);
}
if (info.status.ok()) {
++file_truncates_success_;
}
+ if (EndsWith(info.path, ".blob")) {
+ ++blob_file_truncates_;
+ }
ReportDuration(info);
}
std::atomic<size_t> file_syncs_success_;
std::atomic<size_t> file_truncates_;
std::atomic<size_t> file_truncates_success_;
+ std::atomic<size_t> blob_file_reads_;
+ std::atomic<size_t> blob_file_writes_;
+ std::atomic<size_t> blob_file_flushes_;
+ std::atomic<size_t> blob_file_closes_;
+ std::atomic<size_t> blob_file_syncs_;
+ std::atomic<size_t> blob_file_truncates_;
private:
void ReportDuration(const FileOperationInfo& info) const {
}
}
+TEST_F(EventListenerTest, OnBlobFileOperationTest) {
+ Options options;
+ options.env = CurrentOptions().env;
+ options.create_if_missing = true;
+ TestFileOperationListener* listener = new TestFileOperationListener();
+ options.listeners.emplace_back(listener);
+ options.disable_auto_compactions = true;
+ options.enable_blob_files = true;
+ options.min_blob_size = 0;
+ options.enable_blob_garbage_collection = true;
+ options.blob_garbage_collection_age_cutoff = 0.5;
+
+ DestroyAndReopen(options);
+
+ ASSERT_OK(Put("Key1", "blob_value1"));
+ ASSERT_OK(Put("Key2", "blob_value2"));
+ ASSERT_OK(Put("Key3", "blob_value3"));
+ ASSERT_OK(Put("Key4", "blob_value4"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put("Key3", "new_blob_value3"));
+ ASSERT_OK(Put("Key4", "new_blob_value4"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put("Key5", "blob_value5"));
+ ASSERT_OK(Put("Key6", "blob_value6"));
+ ASSERT_OK(Flush());
+
+ ASSERT_GT(listener->blob_file_writes_.load(), 0U);
+ ASSERT_GT(listener->blob_file_flushes_.load(), 0U);
+ Close();
+
+ Reopen(options);
+ ASSERT_GT(listener->blob_file_closes_.load(), 0U);
+ ASSERT_GT(listener->blob_file_syncs_.load(), 0U);
+ if (true == options.use_direct_io_for_flush_and_compaction) {
+ ASSERT_GT(listener->blob_file_truncates_.load(), 0U);
+ }
+}
+
+class BlobDBJobLevelEventListenerTest : public EventListener {
+ public:
+ explicit BlobDBJobLevelEventListenerTest(EventListenerTest* test)
+ : test_(test), call_count_(0) {}
+
+ std::shared_ptr<BlobFileMetaData> GetBlobFileMetaData(
+ const VersionStorageInfo::BlobFiles& blob_files,
+ uint64_t blob_file_number) {
+ const auto it = blob_files.find(blob_file_number);
+
+ if (it == blob_files.end()) {
+ return nullptr;
+ }
+
+ const auto& meta = it->second;
+ assert(meta);
+
+ return meta;
+ }
+
+ const VersionStorageInfo::BlobFiles& GetBlobFiles() {
+ VersionSet* const versions = test_->dbfull()->TEST_GetVersionSet();
+ assert(versions);
+
+ ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
+ EXPECT_NE(cfd, nullptr);
+
+ Version* const current = cfd->current();
+ EXPECT_NE(current, nullptr);
+
+ const VersionStorageInfo* const storage_info = current->storage_info();
+ EXPECT_NE(storage_info, nullptr);
+
+ const auto& blob_files = storage_info->GetBlobFiles();
+ return blob_files;
+ }
+
+ std::vector<std::string> GetFlushedFiles() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ std::vector<std::string> result;
+ for (const auto& fname : flushed_files_) {
+ result.push_back(fname);
+ }
+ return result;
+ }
+
+ void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
+ call_count_++;
+ EXPECT_FALSE(info.blob_file_addition_infos.empty());
+ const auto& blob_files = GetBlobFiles();
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ flushed_files_.push_back(info.file_path);
+ }
+ EXPECT_EQ(info.blob_compression_type, kNoCompression);
+
+ for (const auto& blob_file_addition_info : info.blob_file_addition_infos) {
+ const auto meta = GetBlobFileMetaData(
+ blob_files, blob_file_addition_info.blob_file_number);
+ EXPECT_EQ(meta->GetBlobFileNumber(),
+ blob_file_addition_info.blob_file_number);
+ EXPECT_EQ(meta->GetTotalBlobBytes(),
+ blob_file_addition_info.total_blob_bytes);
+ EXPECT_EQ(meta->GetTotalBlobCount(),
+ blob_file_addition_info.total_blob_count);
+ EXPECT_FALSE(blob_file_addition_info.blob_file_path.empty());
+ }
+ }
+
+ void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
+ call_count_++;
+ EXPECT_FALSE(ci.blob_file_garbage_infos.empty());
+ const auto& blob_files = GetBlobFiles();
+ EXPECT_EQ(ci.blob_compression_type, kNoCompression);
+
+ for (const auto& blob_file_addition_info : ci.blob_file_addition_infos) {
+ const auto meta = GetBlobFileMetaData(
+ blob_files, blob_file_addition_info.blob_file_number);
+ EXPECT_EQ(meta->GetBlobFileNumber(),
+ blob_file_addition_info.blob_file_number);
+ EXPECT_EQ(meta->GetTotalBlobBytes(),
+ blob_file_addition_info.total_blob_bytes);
+ EXPECT_EQ(meta->GetTotalBlobCount(),
+ blob_file_addition_info.total_blob_count);
+ EXPECT_FALSE(blob_file_addition_info.blob_file_path.empty());
+ }
+
+ for (const auto& blob_file_garbage_info : ci.blob_file_garbage_infos) {
+ EXPECT_GT(blob_file_garbage_info.blob_file_number, 0U);
+ EXPECT_GT(blob_file_garbage_info.garbage_blob_count, 0U);
+ EXPECT_GT(blob_file_garbage_info.garbage_blob_bytes, 0U);
+ EXPECT_FALSE(blob_file_garbage_info.blob_file_path.empty());
+ }
+ }
+
+ EventListenerTest* test_;
+ uint32_t call_count_;
+
+ private:
+ std::vector<std::string> flushed_files_;
+ std::mutex mutex_;
+};
+
+// Test OnFlushCompleted EventListener called for blob files
+TEST_F(EventListenerTest, BlobDBOnFlushCompleted) {
+ Options options;
+ options.env = CurrentOptions().env;
+ options.enable_blob_files = true;
+ options.create_if_missing = true;
+ options.disable_auto_compactions = true;
+
+ options.min_blob_size = 0;
+ BlobDBJobLevelEventListenerTest* blob_event_listener =
+ new BlobDBJobLevelEventListenerTest(this);
+ options.listeners.emplace_back(blob_event_listener);
+
+ DestroyAndReopen(options);
+
+ ASSERT_OK(Put("Key1", "blob_value1"));
+ ASSERT_OK(Put("Key2", "blob_value2"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put("Key3", "blob_value3"));
+ ASSERT_OK(Flush());
+
+ ASSERT_EQ(Get("Key1"), "blob_value1");
+ ASSERT_EQ(Get("Key2"), "blob_value2");
+ ASSERT_EQ(Get("Key3"), "blob_value3");
+
+ ASSERT_GT(blob_event_listener->call_count_, 0U);
+}
+
+// Test OnCompactionCompleted EventListener called for blob files
+TEST_F(EventListenerTest, BlobDBOnCompactionCompleted) {
+ Options options;
+ options.env = CurrentOptions().env;
+ options.enable_blob_files = true;
+ options.create_if_missing = true;
+ options.disable_auto_compactions = true;
+ options.min_blob_size = 0;
+ BlobDBJobLevelEventListenerTest* blob_event_listener =
+ new BlobDBJobLevelEventListenerTest(this);
+ options.listeners.emplace_back(blob_event_listener);
+
+ options.enable_blob_garbage_collection = true;
+ options.blob_garbage_collection_age_cutoff = 0.5;
+
+ DestroyAndReopen(options);
+
+ ASSERT_OK(Put("Key1", "blob_value1"));
+ ASSERT_OK(Put("Key2", "blob_value2"));
+ ASSERT_OK(Put("Key3", "blob_value3"));
+ ASSERT_OK(Put("Key4", "blob_value4"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put("Key3", "new_blob_value3"));
+ ASSERT_OK(Put("Key4", "new_blob_value4"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put("Key5", "blob_value5"));
+ ASSERT_OK(Put("Key6", "blob_value6"));
+ ASSERT_OK(Flush());
+
+ blob_event_listener->call_count_ = 0;
+ constexpr Slice* begin = nullptr;
+ constexpr Slice* end = nullptr;
+
+ // On compaction, because of blob_garbage_collection_age_cutoff, it will
+ // delete the oldest blob file and create new blob file during compaction.
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
+
+ // Make sure, OnCompactionCompleted is called.
+ ASSERT_GT(blob_event_listener->call_count_, 0U);
+}
+
+// Test CompactFiles calls OnCompactionCompleted EventListener for blob files
+// and populate the blob files info.
+TEST_F(EventListenerTest, BlobDBCompactFiles) {
+ Options options;
+ options.env = CurrentOptions().env;
+ options.enable_blob_files = true;
+ options.create_if_missing = true;
+ options.disable_auto_compactions = true;
+ options.min_blob_size = 0;
+ options.enable_blob_garbage_collection = true;
+ options.blob_garbage_collection_age_cutoff = 0.5;
+
+ BlobDBJobLevelEventListenerTest* blob_event_listener =
+ new BlobDBJobLevelEventListenerTest(this);
+ options.listeners.emplace_back(blob_event_listener);
+
+ DestroyAndReopen(options);
+
+ ASSERT_OK(Put("Key1", "blob_value1"));
+ ASSERT_OK(Put("Key2", "blob_value2"));
+ ASSERT_OK(Put("Key3", "blob_value3"));
+ ASSERT_OK(Put("Key4", "blob_value4"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put("Key3", "new_blob_value3"));
+ ASSERT_OK(Put("Key4", "new_blob_value4"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put("Key5", "blob_value5"));
+ ASSERT_OK(Put("Key6", "blob_value6"));
+ ASSERT_OK(Flush());
+
+ std::vector<std::string> output_file_names;
+ CompactionJobInfo compaction_job_info;
+
+ // On compaction, because of blob_garbage_collection_age_cutoff, it will
+ // delete the oldest blob file and create new blob file during compaction
+ // which will be populated in output_files_names.
+ ASSERT_OK(dbfull()->CompactFiles(
+ CompactionOptions(), blob_event_listener->GetFlushedFiles(), 1, -1,
+ &output_file_names, &compaction_job_info));
+
+ bool is_blob_in_output = false;
+ for (const auto& file : output_file_names) {
+ if (EndsWith(file, ".blob")) {
+ is_blob_in_output = true;
+ }
+ }
+ ASSERT_TRUE(is_blob_in_output);
+
+ for (const auto& blob_file_addition_info :
+ compaction_job_info.blob_file_addition_infos) {
+ EXPECT_GT(blob_file_addition_info.blob_file_number, 0U);
+ EXPECT_GT(blob_file_addition_info.total_blob_bytes, 0U);
+ EXPECT_GT(blob_file_addition_info.total_blob_count, 0U);
+ EXPECT_FALSE(blob_file_addition_info.blob_file_path.empty());
+ }
+
+ for (const auto& blob_file_garbage_info :
+ compaction_job_info.blob_file_garbage_infos) {
+ EXPECT_GT(blob_file_garbage_info.blob_file_number, 0U);
+ EXPECT_GT(blob_file_garbage_info.garbage_blob_count, 0U);
+ EXPECT_GT(blob_file_garbage_info.garbage_blob_bytes, 0U);
+ EXPECT_FALSE(blob_file_garbage_info.blob_file_path.empty());
+ }
+}
+
+class BlobDBFileLevelEventListener : public EventListener {
+ public:
+ BlobDBFileLevelEventListener() {
+ files_started_.store(0);
+ files_created_.store(0);
+ files_deleted_.store(0);
+ }
+
+ void OnBlobFileCreationStarted(
+ const BlobFileCreationBriefInfo& info) override {
+ files_started_++;
+ EXPECT_FALSE(info.db_name.empty());
+ EXPECT_FALSE(info.cf_name.empty());
+ EXPECT_FALSE(info.file_path.empty());
+ EXPECT_GT(info.job_id, 0);
+ }
+
+ void OnBlobFileCreated(const BlobFileCreationInfo& info) override {
+ files_created_++;
+ EXPECT_FALSE(info.db_name.empty());
+ EXPECT_FALSE(info.cf_name.empty());
+ EXPECT_FALSE(info.file_path.empty());
+ EXPECT_GT(info.job_id, 0);
+ EXPECT_GT(info.total_blob_count, 0U);
+ EXPECT_GT(info.total_blob_bytes, 0U);
+ EXPECT_EQ(info.file_checksum, kUnknownFileChecksum);
+ EXPECT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
+ EXPECT_TRUE(info.status.ok());
+ }
+
+ void OnBlobFileDeleted(const BlobFileDeletionInfo& info) override {
+ files_deleted_++;
+ EXPECT_FALSE(info.db_name.empty());
+ EXPECT_FALSE(info.file_path.empty());
+ EXPECT_GT(info.job_id, 0);
+ EXPECT_TRUE(info.status.ok());
+ }
+
+ void CheckCounters() {
+ EXPECT_EQ(files_started_, files_created_);
+ EXPECT_GT(files_started_, 0U);
+ EXPECT_GT(files_deleted_, 0U);
+ EXPECT_LT(files_deleted_, files_created_);
+ }
+
+ private:
+ std::atomic<uint32_t> files_started_;
+ std::atomic<uint32_t> files_created_;
+ std::atomic<uint32_t> files_deleted_;
+};
+
+TEST_F(EventListenerTest, BlobDBFileTest) {
+ Options options;
+ options.env = CurrentOptions().env;
+ options.enable_blob_files = true;
+ options.create_if_missing = true;
+ options.disable_auto_compactions = true;
+ options.min_blob_size = 0;
+ options.enable_blob_garbage_collection = true;
+ options.blob_garbage_collection_age_cutoff = 0.5;
+
+ BlobDBFileLevelEventListener* blob_event_listener =
+ new BlobDBFileLevelEventListener();
+ options.listeners.emplace_back(blob_event_listener);
+
+ DestroyAndReopen(options);
+
+ ASSERT_OK(Put("Key1", "blob_value1"));
+ ASSERT_OK(Put("Key2", "blob_value2"));
+ ASSERT_OK(Put("Key3", "blob_value3"));
+ ASSERT_OK(Put("Key4", "blob_value4"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put("Key3", "new_blob_value3"));
+ ASSERT_OK(Put("Key4", "new_blob_value4"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put("Key5", "blob_value5"));
+ ASSERT_OK(Put("Key6", "blob_value6"));
+ ASSERT_OK(Flush());
+
+ constexpr Slice* begin = nullptr;
+ constexpr Slice* end = nullptr;
+
+ // On compaction, because of blob_garbage_collection_age_cutoff, it will
+ // delete the oldest blob file and create new blob file during compaction.
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
+
+ blob_event_listener->CheckCounters();
+}
+
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE
std::move(range_del_iters), &meta, nullptr /* blob_file_additions */,
{}, kMaxSequenceNumber, snapshot_checker,
false /* paranoid_file_checks*/, nullptr /* internal_stats */, &io_s,
- nullptr /*IOTracer*/, nullptr /* event_logger */, 0 /* job_id */,
- Env::IO_HIGH, nullptr /* table_properties */, write_hint);
+ nullptr /*IOTracer*/, BlobFileCreationReason::kRecovery,
+ nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH,
+ nullptr /* table_properties */, write_hint);
ROCKS_LOG_INFO(db_options_.info_log,
"Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
log, counter, meta.fd.GetNumber(),
class Status;
struct CompactionJobStats;
-struct TableFileCreationBriefInfo {
- // the name of the database where the file was created
+struct FileCreationBriefInfo {
+ FileCreationBriefInfo() = default;
+ FileCreationBriefInfo(const std::string& _db_name,
+ const std::string& _cf_name,
+ const std::string& _file_path, int _job_id)
+ : db_name(_db_name),
+ cf_name(_cf_name),
+ file_path(_file_path),
+ job_id(_job_id) {}
+ // the name of the database where the file was created.
std::string db_name;
// the name of the column family where the file was created.
std::string cf_name;
std::string file_path;
// the id of the job (which could be flush or compaction) that
// created the file.
- int job_id;
+ int job_id = 0;
+};
+
+struct TableFileCreationBriefInfo : public FileCreationBriefInfo {
// reason of creating the table.
TableFileCreationReason reason;
};
std::string file_checksum_func_name;
};
+struct BlobFileCreationBriefInfo : public FileCreationBriefInfo {
+ BlobFileCreationBriefInfo(const std::string& _db_name,
+ const std::string& _cf_name,
+ const std::string& _file_path, int _job_id,
+ BlobFileCreationReason _reason)
+ : FileCreationBriefInfo(_db_name, _cf_name, _file_path, _job_id),
+ reason(_reason) {}
+ // reason of creating the blob file.
+ BlobFileCreationReason reason;
+};
+
+struct BlobFileCreationInfo : public BlobFileCreationBriefInfo {
+ BlobFileCreationInfo(const std::string& _db_name, const std::string& _cf_name,
+ const std::string& _file_path, int _job_id,
+ BlobFileCreationReason _reason,
+ uint64_t _total_blob_count, uint64_t _total_blob_bytes,
+ Status _status, const std::string& _file_checksum,
+ const std::string& _file_checksum_func_name)
+ : BlobFileCreationBriefInfo(_db_name, _cf_name, _file_path, _job_id,
+ _reason),
+ total_blob_count(_total_blob_count),
+ total_blob_bytes(_total_blob_bytes),
+ status(_status),
+ file_checksum(_file_checksum),
+ file_checksum_func_name(_file_checksum_func_name) {}
+
+ // the number of blob in a file.
+ uint64_t total_blob_count;
+ // the total bytes in a file.
+ uint64_t total_blob_bytes;
+ // The status indicating whether the creation was successful or not.
+ Status status;
+ // The checksum of the blob file being created.
+ std::string file_checksum;
+ // The checksum function name of checksum generator used for this blob file.
+ std::string file_checksum_func_name;
+};
+
enum class CompactionReason : int {
kUnknown = 0,
// [Level] number of L0 files > level0_file_num_compaction_trigger
#ifndef ROCKSDB_LITE
-struct TableFileDeletionInfo {
+struct FileDeletionInfo {
+ FileDeletionInfo() = default;
+
+ FileDeletionInfo(const std::string& _db_name, const std::string& _file_path,
+ int _job_id, Status _status)
+ : db_name(_db_name),
+ file_path(_file_path),
+ job_id(_job_id),
+ status(_status) {}
// The name of the database where the file was deleted.
std::string db_name;
// The path to the deleted file.
std::string file_path;
// The id of the job which deleted the file.
- int job_id;
+ int job_id = 0;
// The status indicating whether the deletion was successful or not.
Status status;
};
+struct TableFileDeletionInfo : public FileDeletionInfo {};
+
+struct BlobFileDeletionInfo : public FileDeletionInfo {
+ BlobFileDeletionInfo(const std::string& _db_name,
+ const std::string& _file_path, int _job_id,
+ Status _status)
+ : FileDeletionInfo(_db_name, _file_path, _job_id, _status) {}
+};
+
enum class FileOperationType {
kRead,
kWrite,
}
};
+struct BlobFileInfo {
+ BlobFileInfo(const std::string& _blob_file_path,
+ const uint64_t _blob_file_number)
+ : blob_file_path(_blob_file_path), blob_file_number(_blob_file_number) {}
+
+ std::string blob_file_path;
+ uint64_t blob_file_number;
+};
+
+struct BlobFileAdditionInfo : public BlobFileInfo {
+ BlobFileAdditionInfo(const std::string& _blob_file_path,
+ const uint64_t _blob_file_number,
+ const uint64_t _total_blob_count,
+ const uint64_t _total_blob_bytes)
+ : BlobFileInfo(_blob_file_path, _blob_file_number),
+ total_blob_count(_total_blob_count),
+ total_blob_bytes(_total_blob_bytes) {}
+ uint64_t total_blob_count;
+ uint64_t total_blob_bytes;
+};
+
+struct BlobFileGarbageInfo : public BlobFileInfo {
+ BlobFileGarbageInfo(const std::string& _blob_file_path,
+ const uint64_t _blob_file_number,
+ const uint64_t _garbage_blob_count,
+ const uint64_t _garbage_blob_bytes)
+ : BlobFileInfo(_blob_file_path, _blob_file_number),
+ garbage_blob_count(_garbage_blob_count),
+ garbage_blob_bytes(_garbage_blob_bytes) {}
+ uint64_t garbage_blob_count;
+ uint64_t garbage_blob_bytes;
+};
+
struct FlushJobInfo {
// the id of the column family
uint32_t cf_id;
TableProperties table_properties;
FlushReason flush_reason;
+
+ // Compression algorithm used for blob output files
+ CompressionType blob_compression_type;
+
+ // Information about blob files created during flush in Integrated BlobDB.
+ std::vector<BlobFileAdditionInfo> blob_file_addition_infos;
};
struct CompactionFileInfo {
// Statistics and other additional details on the compaction
CompactionJobStats stats;
+
+ // Compression algorithm used for blob output files.
+ CompressionType blob_compression_type;
+
+ // Information about blob files created during compaction in Integrated
+ // BlobDB.
+ std::vector<BlobFileAdditionInfo> blob_file_addition_infos;
+
+ // Information about blob files deleted during compaction in Integrated
+ // BlobDB.
+ std::vector<BlobFileGarbageInfo> blob_file_garbage_infos;
};
struct MemTableInfo {
// initiate any further recovery actions needed
virtual void OnErrorRecoveryCompleted(Status /* old_bg_error */) {}
+ // A callback function for RocksDB which will be called before
+ // a blob file is being created. It will follow by OnBlobFileCreated after
+ // the creation finishes.
+ //
+ // Note that if applications would like to use the passed reference
+ // outside this function call, they should make copies from these
+ // returned value.
+ virtual void OnBlobFileCreationStarted(
+ const BlobFileCreationBriefInfo& /*info*/) {}
+
+ // A callback function for RocksDB which will be called whenever
+ // a blob file is created.
+ // It will be called whether the file is successfully created or not. User can
+ // check info.status to see if it succeeded or not.
+ //
+ // Note that if applications would like to use the passed reference
+ // outside this function call, they should make copies from these
+ // returned value.
+ virtual void OnBlobFileCreated(const BlobFileCreationInfo& /*info*/) {}
+
+ // A callback function for RocksDB which will be called whenever
+ // a blob file is deleted.
+ //
+ // Note that if applications would like to use the passed reference
+ // outside this function call, they should make copies from these
+ // returned value.
+ virtual void OnBlobFileDeleted(const BlobFileDeletionInfo& /*info*/) {}
+
virtual ~EventListener() {}
};
kMisc,
};
+enum class BlobFileCreationReason {
+ kFlush,
+ kCompaction,
+ kRecovery,
+};
+
// The types of files RocksDB uses in a DB directory. (Available for
// advanced options.)
enum FileType {