auto num_files = files_to_import_.size();
if (num_files == 0) {
return Status::InvalidArgument("The list of files is empty");
- } else if (num_files > 1) {
- // Verify that passed files don't have overlapping ranges in any particular
- // level.
- int min_level = 1; // Check for overlaps in Level 1 and above.
- int max_level = -1;
- for (const auto& file_metadata : metadata_) {
- if (file_metadata.level > max_level) {
- max_level = file_metadata.level;
- }
- }
- for (int level = min_level; level <= max_level; ++level) {
- autovector<const IngestedFileInfo*> sorted_files;
- for (size_t i = 0; i < num_files; i++) {
- if (metadata_[i].level == level) {
- sorted_files.push_back(&files_to_import_[i]);
- }
- }
-
- std::sort(
- sorted_files.begin(), sorted_files.end(),
- [this](const IngestedFileInfo* info1, const IngestedFileInfo* info2) {
- return cfd_->internal_comparator().Compare(
- info1->smallest_internal_key,
- info2->smallest_internal_key) < 0;
- });
-
- for (size_t i = 0; i + 1 < sorted_files.size(); i++) {
- if (cfd_->internal_comparator().Compare(
- sorted_files[i]->largest_internal_key,
- sorted_files[i + 1]->smallest_internal_key) >= 0) {
- return Status::InvalidArgument("Files have overlapping ranges");
- }
- }
- }
}
for (const auto& f : files_to_import_) {
// REQUIRES: we have become the only writer by entering both write_thread_ and
// nonmem_write_thread_
Status ImportColumnFamilyJob::Run() {
- Status status;
- edit_.SetColumnFamily(cfd_->GetID());
-
// We use the import time as the ancester time. This is the time the data
// is written to the database.
int64_t temp_current_time = 0;
static_cast<uint64_t>(temp_current_time);
}
- for (size_t i = 0; i < files_to_import_.size(); ++i) {
+ VersionBuilder version_builder(
+ cfd_->current()->version_set()->file_options(), cfd_->ioptions(),
+ cfd_->table_cache(), cfd_->current()->storage_info(),
+ cfd_->current()->version_set(),
+ cfd_->GetFileMetadataCacheReservationManager());
+ VersionStorageInfo vstorage(
+ &cfd_->internal_comparator(), cfd_->user_comparator(),
+ cfd_->NumberLevels(), cfd_->ioptions()->compaction_style,
+ nullptr /* src_vstorage */, cfd_->ioptions()->force_consistency_checks);
+ Status s;
+ for (size_t i = 0; s.ok() && i < files_to_import_.size(); ++i) {
const auto& f = files_to_import_[i];
const auto& file_metadata = metadata_[i];
- edit_.AddFile(file_metadata.level, f.fd.GetNumber(), f.fd.GetPathId(),
- f.fd.GetFileSize(), f.smallest_internal_key,
- f.largest_internal_key, file_metadata.smallest_seqno,
- file_metadata.largest_seqno, false, file_metadata.temperature,
- kInvalidBlobFileNumber, oldest_ancester_time, current_time,
- kUnknownFileChecksum, kUnknownFileChecksumFuncName,
- f.unique_id);
-
- // If incoming sequence number is higher, update local sequence number.
- if (file_metadata.largest_seqno > versions_->LastSequence()) {
- versions_->SetLastAllocatedSequence(file_metadata.largest_seqno);
- versions_->SetLastPublishedSequence(file_metadata.largest_seqno);
- versions_->SetLastSequence(file_metadata.largest_seqno);
+ VersionEdit version_edit;
+ version_edit.AddFile(
+ file_metadata.level, f.fd.GetNumber(), f.fd.GetPathId(),
+ f.fd.GetFileSize(), f.smallest_internal_key, f.largest_internal_key,
+ file_metadata.smallest_seqno, file_metadata.largest_seqno, false,
+ file_metadata.temperature, kInvalidBlobFileNumber, oldest_ancester_time,
+ current_time, kUnknownFileChecksum, kUnknownFileChecksumFuncName,
+ f.unique_id);
+ s = version_builder.Apply(&version_edit);
+ }
+ if (s.ok()) {
+ s = version_builder.SaveTo(&vstorage);
+ }
+ if (s.ok()) {
+ edit_.SetColumnFamily(cfd_->GetID());
+
+ for (int level = 0; level < vstorage.num_levels(); level++) {
+ for (FileMetaData* file_meta : vstorage.LevelFiles(level)) {
+ edit_.AddFile(level, *file_meta);
+ // If incoming sequence number is higher, update local sequence number.
+ if (file_meta->fd.largest_seqno > versions_->LastSequence()) {
+ versions_->SetLastAllocatedSequence(file_meta->fd.largest_seqno);
+ versions_->SetLastPublishedSequence(file_meta->fd.largest_seqno);
+ versions_->SetLastSequence(file_meta->fd.largest_seqno);
+ }
+ }
}
}
-
- return status;
+ for (int level = 0; level < vstorage.num_levels(); level++) {
+ for (FileMetaData* file_meta : vstorage.LevelFiles(level)) {
+ file_meta->refs--;
+ if (file_meta->refs <= 0) {
+ delete file_meta;
+ }
+ }
+ }
+ return s;
}
void ImportColumnFamilyJob::Cleanup(const Status& status) {