}
if (blob_inserter.has_put()) {
- CloseIf(blob_inserter.last_file());
+ s = CloseBlobFileIfNeeded(blob_inserter.last_file());
+ if (!s.ok()) {
+ return s;
+ }
}
// add deleted key to list of keys that have been deleted for book-keeping
extendTTL(&(bfile->ttl_range_), expiration);
}
- CloseIf(bfile);
+ if (s.ok()) {
+ s = CloseBlobFileIfNeeded(bfile);
+ }
TEST_SYNC_POINT("BlobDBImpl::PutUntil:Finish");
return s;
return std::make_pair(true, -1);
}
-std::pair<bool, int64_t> BlobDBImpl::CloseSeqWrite(
- std::shared_ptr<BlobFile> bfile, bool aborted) {
+Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile) {
+ Status s;
+ ROCKS_LOG_INFO(db_options_.info_log, "Close blob file %" PRIu64,
+ bfile->BlobFileNumber());
{
WriteLock wl(&mutex_);
- // this prevents others from picking up this file
- open_blob_files_.erase(bfile);
-
- auto findit =
- std::find(open_simple_files_.begin(), open_simple_files_.end(), bfile);
- if (findit != open_simple_files_.end()) open_simple_files_.erase(findit);
+ if (bfile->HasTTL()) {
+ size_t erased __attribute__((__unused__)) = open_blob_files_.erase(bfile);
+ assert(erased == 1);
+ } else {
+ auto iter = std::find(open_simple_files_.begin(),
+ open_simple_files_.end(), bfile);
+ assert(iter != open_simple_files_.end());
+ open_simple_files_.erase(iter);
+ }
}
if (!bfile->closed_.load()) {
WriteLock lockbfile_w(&bfile->mutex_);
- bfile->WriteFooterAndCloseLocked();
+ s = bfile->WriteFooterAndCloseLocked();
}
- return std::make_pair(false, -1);
-}
-
-void BlobDBImpl::CloseIf(const std::shared_ptr<BlobFile>& bfile) {
- // atomic read
- bool close = bfile->GetFileSize() > bdb_options_.blob_file_size;
- if (!close) return;
-
- if (debug_level_ >= 2) {
- ROCKS_LOG_DEBUG(db_options_.info_log,
- "Scheduling file for close %s fsize: %" PRIu64
- " limit: %" PRIu64,
- bfile->PathName().c_str(), bfile->GetFileSize(),
- bdb_options_.blob_file_size);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Failed to close blob file %" PRIu64 "with error: %s",
+ bfile->BlobFileNumber(), s.ToString().c_str());
}
- {
- WriteLock wl(&mutex_);
+ return s;
+}
- open_blob_files_.erase(bfile);
- auto findit =
- std::find(open_simple_files_.begin(), open_simple_files_.end(), bfile);
- if (findit != open_simple_files_.end()) {
- open_simple_files_.erase(findit);
- } else {
- ROCKS_LOG_WARN(db_options_.info_log,
- "File not found while closing %s fsize: %" PRIu64
- " Multithreaded Writes?",
- bfile->PathName().c_str(), bfile->GetFileSize());
- }
+Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile) {
+ // atomic read
+ if (bfile->GetFileSize() < bdb_options_.blob_file_size) {
+ return Status::OK();
}
-
- tqueue_.add(0, std::bind(&BlobDBImpl::CloseSeqWrite, this, bfile,
- std::placeholders::_1));
+ return CloseBlobFile(bfile);
}
bool BlobDBImpl::FileDeleteOk_SnapshotCheckLocked(
}
for (auto bfile : process_files) {
- CloseSeqWrite(bfile, false);
+ CloseBlobFile(bfile);
}
return std::make_pair(true, -1);
delete transaction;
}
ROCKS_LOG_INFO(
- db_options_.info_log, "%s blob file %" PRIu64 ".",
+ db_options_.info_log,
+ "%s blob file %" PRIu64
". Total blob records: %" PRIu64 ", Deletes: %" PRIu64 "/%" PRIu64
" succeeded, Relocates: %" PRIu64 "/%" PRIu64 " succeeded.",
s.ok() ? "Successfully garbage collected" : "Failed to garbage collect",
DeleteObsoleteFiles(false /*abort*/);
}
-void BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {
- CloseSeqWrite(bfile, false /*abort*/);
+Status BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {
+ return CloseBlobFile(bfile);
}
Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
std::vector<std::shared_ptr<BlobFile>> TEST_GetObsoleteFiles() const;
- void TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile);
+ Status TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile);
Status TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
GCStats* gc_stats);
// this handler is called.
void OnFlushBeginHandler(DB* db, const FlushJobInfo& info);
- // timer queue callback to close a file by appending a footer
- // removes file from open files list
- std::pair<bool, int64_t> CloseSeqWrite(std::shared_ptr<BlobFile> bfile,
- bool aborted);
-
// is this file ready for Garbage collection. if the TTL of the file
// has expired or if threshold of the file has been evicted
// tt - current time
// collect all the blob log files from the blob directory
Status GetAllLogFiles(std::set<std::pair<uint64_t, std::string>>* file_nums);
- // appends a task into timer queue to close the file
- void CloseIf(const std::shared_ptr<BlobFile>& bfile);
+ // Close a file by appending a footer, and removes file from open files list.
+ Status CloseBlobFile(std::shared_ptr<BlobFile> bfile);
+
+ // Close a file if its size exceeds blob_file_size
+ Status CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile);
uint64_t ExtractExpiration(const Slice& key, const Slice& value,
Slice* value_slice, std::string* new_value);
// epoch or version of the open files.
std::atomic<uint64_t> epoch_of_;
- // typically we keep 4 open blob files (simple i.e. no TTL)
+ // All opened non-TTL blob files.
std::vector<std::shared_ptr<BlobFile>> open_simple_files_;
// all the blob files which are currently being appended to based
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
- bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+ ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
- bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+ ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_FALSE(blob_files[0]->HasTTL());
- bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+ ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(0, gc_stats.num_deletes);
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
- bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+ ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
auto &data = static_cast<TestTTLExtractor *>(ttl_extractor_.get())->data;
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
- bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+ ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
auto &data = static_cast<TestTTLExtractor *>(ttl_extractor_.get())->data;
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
- bdb_impl->TEST_CloseBlobFile(blob_files[0]);
+ ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
}
auto blob_files = blob_db_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
- blob_db_impl->TEST_CloseBlobFile(blob_files[0]);
+ ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0]));
// Test for data in SST
size_t new_keys = 0;
for (int i = 0; i < 100; i++) {
static_cast_with_check<BlobDBImpl, BlobDB>(blob_db_);
auto blob_files = blob_db_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
- blob_db_impl->TEST_CloseBlobFile(blob_files[0]);
+ ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0]));
SyncPoint::GetInstance()->LoadDependency(
{{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetForUpdate",
static_cast_with_check<BlobDBImpl, BlobDB>(blob_db_);
auto blob_files = blob_db_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
- blob_db_impl->TEST_CloseBlobFile(blob_files[0]);
+ ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0]));
mock_env_->set_now_micros(300 * 1000000);
SyncPoint::GetInstance()->LoadDependency(
ASSERT_EQ(11, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
ASSERT_TRUE(blob_files[0]->Immutable());
- blob_db_impl->TEST_CloseBlobFile(blob_files[0]);
for (int i = 1; i <= 10; i++) {
ASSERT_FALSE(blob_files[i]->HasTTL());
if (i < 10) {
ASSERT_EQ(1, blob_files.size());
std::shared_ptr<BlobFile> bfile = blob_files[0];
uint64_t bfile_number = bfile->BlobFileNumber();
- blob_db_impl->TEST_CloseBlobFile(bfile);
+ ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(bfile));
switch (i) {
case 0: