}
TEST_F(ExternalSSTFileBasicTest, IngestFileWithGlobalSeqnoPickedSeqno) {
- Options options = CurrentOptions();
- DestroyAndReopen(options);
- std::map<std::string, std::string> true_data;
-
- int file_id = 1;
-
- ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2, 3, 4, 5, 6}, file_id++,
- &true_data));
- // File dont overwrite any keys, No seqno needed
- ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
-
- ASSERT_OK(GenerateAndAddExternalFile(options, {10, 11, 12, 13}, file_id++,
- &true_data));
- // File dont overwrite any keys, No seqno needed
- ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
-
- ASSERT_OK(
- GenerateAndAddExternalFile(options, {1, 4, 6}, file_id++, &true_data));
- // File overwrite some keys, a seqno will be assigned
- ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 1);
-
- ASSERT_OK(
- GenerateAndAddExternalFile(options, {11, 15, 19}, file_id++, &true_data));
- // File overwrite some keys, a seqno will be assigned
- ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
-
- ASSERT_OK(
- GenerateAndAddExternalFile(options, {120, 130}, file_id++, &true_data));
- // File dont overwrite any keys, No seqno needed
- ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
-
- ASSERT_OK(
- GenerateAndAddExternalFile(options, {1, 130}, file_id++, &true_data));
- // File overwrite some keys, a seqno will be assigned
- ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3);
-
- // Write some keys through normal write path
- for (int i = 0; i < 50; i++) {
- ASSERT_OK(Put(Key(i), "memtable"));
- true_data[Key(i)] = "memtable";
- }
- SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber();
-
- ASSERT_OK(
- GenerateAndAddExternalFile(options, {60, 61, 62}, file_id++, &true_data));
- // File dont overwrite any keys, No seqno needed
- ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno);
-
- ASSERT_OK(
- GenerateAndAddExternalFile(options, {40, 41, 42}, file_id++, &true_data));
- // File overwrite some keys, a seqno will be assigned
- ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 1);
-
- ASSERT_OK(
- GenerateAndAddExternalFile(options, {20, 30, 40}, file_id++, &true_data));
- // File overwrite some keys, a seqno will be assigned
- ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 2);
-
- const Snapshot* snapshot = db_->GetSnapshot();
-
- // We will need a seqno for the file regardless if the file overwrite
- // keys in the DB or not because we have a snapshot
- ASSERT_OK(
- GenerateAndAddExternalFile(options, {1000, 1002}, file_id++, &true_data));
- // A global seqno will be assigned anyway because of the snapshot
- ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 3);
-
- ASSERT_OK(
- GenerateAndAddExternalFile(options, {2000, 3002}, file_id++, &true_data));
- // A global seqno will be assigned anyway because of the snapshot
- ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 4);
-
- ASSERT_OK(GenerateAndAddExternalFile(options, {1, 20, 40, 100, 150},
- file_id++, &true_data));
- // A global seqno will be assigned anyway because of the snapshot
- ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5);
-
- db_->ReleaseSnapshot(snapshot);
-
- ASSERT_OK(
- GenerateAndAddExternalFile(options, {5000, 5001}, file_id++, &true_data));
- // No snapshot anymore, no need to assign a seqno
- ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5);
-
- size_t kcnt = 0;
- VerifyDBFromMap(true_data, &kcnt, false);
+ do {
+ Options options = CurrentOptions();
+ DestroyAndReopen(options);
+ std::map<std::string, std::string> true_data;
+
+ int file_id = 1;
+
+ ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2, 3, 4, 5, 6}, file_id++,
+ &true_data));
+ // File dont overwrite any keys, No seqno needed
+ ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
+
+ ASSERT_OK(GenerateAndAddExternalFile(options, {10, 11, 12, 13}, file_id++,
+ &true_data));
+ // File dont overwrite any keys, No seqno needed
+ ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0);
+
+ ASSERT_OK(
+ GenerateAndAddExternalFile(options, {1, 4, 6}, file_id++, &true_data));
+ // File overwrite some keys, a seqno will be assigned
+ ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 1);
+
+ ASSERT_OK(GenerateAndAddExternalFile(options, {11, 15, 19}, file_id++,
+ &true_data));
+ // File overwrite some keys, a seqno will be assigned
+ ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
+
+ ASSERT_OK(
+ GenerateAndAddExternalFile(options, {120, 130}, file_id++, &true_data));
+ // File dont overwrite any keys, No seqno needed
+ ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
+
+ ASSERT_OK(
+ GenerateAndAddExternalFile(options, {1, 130}, file_id++, &true_data));
+ // File overwrite some keys, a seqno will be assigned
+ ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3);
+
+ // Write some keys through normal write path
+ for (int i = 0; i < 50; i++) {
+ ASSERT_OK(Put(Key(i), "memtable"));
+ true_data[Key(i)] = "memtable";
+ }
+ SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber();
+
+ ASSERT_OK(GenerateAndAddExternalFile(options, {60, 61, 62}, file_id++,
+ &true_data));
+ // File dont overwrite any keys, No seqno needed
+ ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno);
+
+ ASSERT_OK(GenerateAndAddExternalFile(options, {40, 41, 42}, file_id++,
+ &true_data));
+ // File overwrite some keys, a seqno will be assigned
+ ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 1);
+
+ ASSERT_OK(GenerateAndAddExternalFile(options, {20, 30, 40}, file_id++,
+ &true_data));
+ // File overwrite some keys, a seqno will be assigned
+ ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 2);
+
+ const Snapshot* snapshot = db_->GetSnapshot();
+
+ // We will need a seqno for the file regardless if the file overwrite
+ // keys in the DB or not because we have a snapshot
+ ASSERT_OK(GenerateAndAddExternalFile(options, {1000, 1002}, file_id++,
+ &true_data));
+ // A global seqno will be assigned anyway because of the snapshot
+ ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 3);
+
+ ASSERT_OK(GenerateAndAddExternalFile(options, {2000, 3002}, file_id++,
+ &true_data));
+ // A global seqno will be assigned anyway because of the snapshot
+ ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 4);
+
+ ASSERT_OK(GenerateAndAddExternalFile(options, {1, 20, 40, 100, 150},
+ file_id++, &true_data));
+ // A global seqno will be assigned anyway because of the snapshot
+ ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5);
+
+ db_->ReleaseSnapshot(snapshot);
+
+ ASSERT_OK(GenerateAndAddExternalFile(options, {5000, 5001}, file_id++,
+ &true_data));
+ // No snapshot anymore, no need to assign a seqno
+ ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5);
+
+ size_t kcnt = 0;
+ VerifyDBFromMap(true_data, &kcnt, false);
+ } while (ChangeCompactOptions());
}
TEST_F(ExternalSSTFileBasicTest, FadviseTrigger) {
bool consumed_seqno = false;
bool force_global_seqno = false;
- const SequenceNumber last_seqno = versions_->LastSequence();
+
if (ingestion_options_.snapshot_consistency && !db_snapshots_->empty()) {
// We need to assign a global sequence number to all the files even
// if the dont overlap with any ranges since we have snapshots
force_global_seqno = true;
}
-
+ const SequenceNumber last_seqno = versions_->LastSequence();
SuperVersion* super_version = cfd_->GetSuperVersion();
edit_.SetColumnFamily(cfd_->GetID());
// The levels that the files will be ingested into
+
for (IngestedFileInfo& f : files_to_ingest_) {
- bool overlap_with_db = false;
- status = AssignLevelForIngestedFile(super_version, &f, &overlap_with_db);
+ SequenceNumber assigned_seqno = 0;
+ status = AssignLevelAndSeqnoForIngestedFile(
+ super_version, force_global_seqno, cfd_->ioptions()->compaction_style,
+ &f, &assigned_seqno);
if (!status.ok()) {
return status;
}
-
- if (overlap_with_db || force_global_seqno) {
- status = AssignGlobalSeqnoForIngestedFile(&f, last_seqno + 1);
+ status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno);
+ TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run",
+ &assigned_seqno);
+ if (assigned_seqno == last_seqno + 1) {
consumed_seqno = true;
- } else {
- status = AssignGlobalSeqnoForIngestedFile(&f, 0);
}
-
if (!status.ok()) {
return status;
}
-
edit_.AddFile(f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(),
f.fd.GetFileSize(), f.smallest_internal_key(),
f.largest_internal_key(), f.assigned_seqno, f.assigned_seqno,
return status;
}
-Status ExternalSstFileIngestionJob::AssignLevelForIngestedFile(
- SuperVersion* sv, IngestedFileInfo* file_to_ingest, bool* overlap_with_db) {
- *overlap_with_db = false;
+Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
+ SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style,
+ IngestedFileInfo* file_to_ingest, SequenceNumber* assigned_seqno) {
+ Status status;
+ *assigned_seqno = 0;
+ const SequenceNumber last_seqno = versions_->LastSequence();
+ if (force_global_seqno) {
+ *assigned_seqno = last_seqno + 1;
+ if (compaction_style == kCompactionStyleUniversal) {
+ file_to_ingest->picked_level = 0;
+ return status;
+ }
+ }
+ bool overlap_with_db = false;
Arena arena;
ReadOptions ro;
ro.total_order_seek = true;
- Status status;
int target_level = 0;
auto* vstorage = cfd_->current()->storage_info();
for (int lvl = 0; lvl < cfd_->NumberLevels(); lvl++) {
if (!status.ok()) {
return status;
}
-
if (overlap_with_level) {
// We must use L0 or any level higher than `lvl` to be able to overwrite
// the keys that we overlap with in this level, We also need to assign
// this file a seqno to overwrite the existing keys in level `lvl`
- *overlap_with_db = true;
+ overlap_with_db = true;
break;
}
+
+ if (compaction_style == kCompactionStyleUniversal && lvl != 0) {
+ const std::vector<FileMetaData*>& level_files =
+ vstorage->LevelFiles(lvl);
+ const SequenceNumber level_largest_seqno =
+ (*max_element(level_files.begin(), level_files.end(),
+ [](FileMetaData* f1, FileMetaData* f2) {
+ return f1->largest_seqno < f2->largest_seqno;
+ }))
+ ->largest_seqno;
+ if (level_largest_seqno != 0) {
+ *assigned_seqno = level_largest_seqno;
+ } else {
+ continue;
+ }
+ }
+ } else if (compaction_style == kCompactionStyleUniversal) {
+ continue;
}
// We dont overlap with any keys in this level, but we still need to check
// if our file can fit in it
-
if (IngestedFileFitInLevel(file_to_ingest, lvl)) {
target_level = lvl;
}
}
+ TEST_SYNC_POINT_CALLBACK(
+ "ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile",
+ &overlap_with_db);
file_to_ingest->picked_level = target_level;
+ if (overlap_with_db && *assigned_seqno == 0) {
+ *assigned_seqno = last_seqno + 1;
+ }
return status;
}
ASSERT_EQ(Get(Key(k)), value);
}
DestroyAndRecreateExternalSSTFilesDir();
- } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
- kSkipFIFOCompaction));
+ } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
}
class SstFileWriterCollector : public TablePropertiesCollector {
public:
ASSERT_EQ(Get(Key(k)), value);
}
DestroyAndRecreateExternalSSTFilesDir();
- } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
- kSkipFIFOCompaction));
+ } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
}
TEST_F(ExternalSSTFileTest, AddListAtomicity) {
ASSERT_EQ(Get(Key(k)), value);
}
DestroyAndRecreateExternalSSTFilesDir();
- } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
- kSkipFIFOCompaction));
+ } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
}
// This test reporduce a bug that can happen in some cases if the DB started
// purging obsolete files when we are adding an external sst file.
fprintf(stderr, "Verified %d values\n", num_files * keys_per_file);
DestroyAndRecreateExternalSSTFilesDir();
- } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
- kSkipFIFOCompaction));
+ } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
}
TEST_F(ExternalSSTFileTest, OverlappingRanges) {
Random rnd(301);
+ int picked_level = 0;
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "ExternalSstFileIngestionJob::Run", [&picked_level](void* arg) {
+ ASSERT_TRUE(arg != nullptr);
+ picked_level = *(static_cast<int*>(arg));
+ });
+ bool need_flush = false;
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::IngestExternalFile:NeedFlush", [&need_flush](void* arg) {
+ ASSERT_TRUE(arg != nullptr);
+ need_flush = *(static_cast<bool*>(arg));
+ });
+ bool overlap_with_db = false;
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile",
+ [&overlap_with_db](void* arg) {
+ ASSERT_TRUE(arg != nullptr);
+ overlap_with_db = *(static_cast<bool*>(arg));
+ });
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
do {
Options options = CurrentOptions();
DestroyAndReopen(options);
// Insert the generated file
s = DeprecatedAddFile({file_name});
-
auto it = true_data.lower_bound(Key(range_start));
- if (it != true_data.end() && it->first <= Key(range_end)) {
- // This range overlap with data already exist in DB
- ASSERT_NOK(s);
- failed_add_file++;
+ if (option_config_ != kUniversalCompaction &&
+ option_config_ != kUniversalCompactionMultiLevel) {
+ if (it != true_data.end() && it->first <= Key(range_end)) {
+ // This range overlap with data already exist in DB
+ ASSERT_NOK(s);
+ failed_add_file++;
+ } else {
+ ASSERT_OK(s);
+ success_add_file++;
+ }
} else {
- ASSERT_OK(s);
- success_add_file++;
+ if ((it != true_data.end() && it->first <= Key(range_end)) ||
+ need_flush || picked_level > 0 || overlap_with_db) {
+ // This range overlap with data already exist in DB
+ ASSERT_NOK(s);
+ failed_add_file++;
+ } else {
+ ASSERT_OK(s);
+ success_add_file++;
+ }
}
}
}
printf("keys/values verified\n");
DestroyAndRecreateExternalSSTFilesDir();
- } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
- kSkipFIFOCompaction));
+ } while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
}
TEST_F(ExternalSSTFileTest, PickedLevel) {