snapshots.empty() ? 0 : snapshots.back());
CompactionIterator c_iter(iter, internal_comparator.user_comparator(),
- &merge, kMaxSequenceNumber, &snapshots,
- kMaxSequenceNumber, env,
+ &merge, kMaxSequenceNumber, &snapshots, env,
true /* internal key corruption is not ok */);
c_iter.SeekToFirst();
for (; c_iter.Valid(); c_iter.Next()) {
CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
- SequenceNumber earliest_write_conflict_snapshot, Env* env,
- bool expect_valid_internal_key, Compaction* compaction,
+ Env* env, bool expect_valid_internal_key, Compaction* compaction,
const CompactionFilter* compaction_filter, LogBuffer* log_buffer)
: input_(input),
cmp_(cmp),
merge_helper_(merge_helper),
snapshots_(snapshots),
- earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
env_(env),
expect_valid_internal_key_(expect_valid_internal_key),
compaction_(compaction),
ParsedInternalKey next_ikey;
input_->Next();
- if (earliest_write_conflict_snapshot_) {
- // TODO(agiardullo): to be used in D50295
- // adding this if statement to keep CLANG happy in the meantime
- }
-
// Check whether the current key is valid, not corrupt and the same
// as the single delete.
if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
public:
CompactionIterator(InternalIterator* input, const Comparator* cmp,
MergeHelper* merge_helper, SequenceNumber last_sequence,
- std::vector<SequenceNumber>* snapshots,
- SequenceNumber earliest_write_conflict_snapshot, Env* env,
+ std::vector<SequenceNumber>* snapshots, Env* env,
bool expect_valid_internal_key,
Compaction* compaction = nullptr,
const CompactionFilter* compaction_filter = nullptr,
const Comparator* cmp_;
MergeHelper* merge_helper_;
const std::vector<SequenceNumber>* snapshots_;
- const SequenceNumber earliest_write_conflict_snapshot_;
Env* env_;
bool expect_valid_internal_key_;
Compaction* compaction_;
nullptr, 0U, false, 0));
iter_.reset(new test::VectorIterator(ks, vs));
iter_->SeekToFirst();
- c_iter_.reset(new CompactionIterator(
- iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
- kMaxSequenceNumber, Env::Default(), false));
+ c_iter_.reset(new CompactionIterator(iter_.get(), cmp_, merge_helper_.get(),
+ last_sequence, &snapshots_,
+ Env::Default(), false));
}
const Comparator* cmp_;
std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
Directory* db_directory, Directory* output_directory, Statistics* stats,
std::vector<SequenceNumber> existing_snapshots,
- SequenceNumber earliest_write_conflict_snapshot,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname,
CompactionJobStats* compaction_job_stats)
output_directory_(output_directory),
stats_(stats),
existing_snapshots_(std::move(existing_snapshots)),
- earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
table_cache_(std::move(table_cache)),
event_logger_(event_logger),
paranoid_file_checks_(paranoid_file_checks),
Status status;
sub_compact->c_iter.reset(new CompactionIterator(
input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
- &existing_snapshots_, earliest_write_conflict_snapshot_, env_, false,
- sub_compact->compaction, compaction_filter));
+ &existing_snapshots_, env_, false, sub_compact->compaction,
+ compaction_filter));
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();
const auto& c_iter_stats = c_iter->iter_stats();
Directory* db_directory, Directory* output_directory,
Statistics* stats,
std::vector<SequenceNumber> existing_snapshots,
- SequenceNumber earliest_write_conflict_snapshot,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
bool paranoid_file_checks, bool measure_io_stats,
const std::string& dbname,
// entirely within s1 and s2, then the earlier version of k1 can be safely
// deleted because that version is not visible in any snapshot.
std::vector<SequenceNumber> existing_snapshots_;
-
- // This is the earliest snapshot that could be used for write-conflict
- // checking by a transaction. For any user-key newer than this snapshot, we
- // should make sure not to remove evidence that a write occured.
- SequenceNumber earliest_write_conflict_snapshot_;
-
std::shared_ptr<Cache> table_cache_;
EventLogger* event_logger_;
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
mutex_.Lock();
EventLogger event_logger(db_options_.info_log.get());
- CompactionJob compaction_job(
- 0, &compaction, db_options_, env_options_, versions_.get(),
- &shutting_down_, &log_buffer, nullptr, nullptr, nullptr, snapshots,
- kMaxSequenceNumber, table_cache_, &event_logger, false, false, dbname_,
- &compaction_job_stats_);
+ CompactionJob compaction_job(0, &compaction, db_options_, env_options_,
+ versions_.get(), &shutting_down_, &log_buffer,
+ nullptr, nullptr, nullptr, snapshots,
+ table_cache_, &event_logger, false, false,
+ dbname_, &compaction_job_stats_);
VerifyInitializationOfCompactionJobStats(compaction_job_stats_);
// deletion compaction currently not allowed in CompactFiles.
assert(!c->deletion_compaction());
- SequenceNumber earliest_write_conflict_snapshot;
- std::vector<SequenceNumber> snapshot_seqs =
- snapshots_.GetAll(&earliest_write_conflict_snapshot);
-
assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJob compaction_job(
job_context->job_id, c.get(), db_options_, env_options_, versions_.get(),
&shutting_down_, log_buffer, directories_.GetDbDir(),
- directories_.GetDataDir(c->output_path_id()), stats_, snapshot_seqs,
- earliest_write_conflict_snapshot, table_cache_, &event_logger_,
+ directories_.GetDataDir(c->output_path_id()), stats_, snapshots_.GetAll(),
+ table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->compaction_measure_io_stats, dbname_,
nullptr); // Here we pass a nullptr for CompactionJobStats because
int output_level __attribute__((unused)) = c->output_level();
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
&output_level);
-
- SequenceNumber earliest_write_conflict_snapshot;
- std::vector<SequenceNumber> snapshot_seqs =
- snapshots_.GetAll(&earliest_write_conflict_snapshot);
-
assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJob compaction_job(
job_context->job_id, c.get(), db_options_, env_options_,
versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
- directories_.GetDataDir(c->output_path_id()), stats_, snapshot_seqs,
- earliest_write_conflict_snapshot, table_cache_, &event_logger_,
+ directories_.GetDataDir(c->output_path_id()), stats_,
+ snapshots_.GetAll(), table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->compaction_measure_io_stats, dbname_,
&compaction_job_stats);
return Status::OK();
}
-const Snapshot* DBImpl::GetSnapshot() { return GetSnapshotImpl(false); }
-
-const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() {
- return GetSnapshotImpl(true);
-}
-
-const Snapshot* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary) {
+const Snapshot* DBImpl::GetSnapshot() {
int64_t unix_time = 0;
env_->GetCurrentTime(&unix_time); // Ignore error
SnapshotImpl* s = new SnapshotImpl;
delete s;
return nullptr;
}
- return snapshots_.New(s, versions_->LastSequence(), unix_time,
- is_write_conflict_boundary);
+ return snapshots_.New(s, versions_->LastSequence(), unix_time);
}
void DBImpl::ReleaseSnapshot(const Snapshot* s) {
#endif // ROCKSDB_LITE
- // Similar to GetSnapshot(), but also lets the db know that this snapshot
- // will be used for transaction write-conflict checking. The DB can then
- // make sure not to compact any keys that would prevent a write-conflict from
- // being detected.
- const Snapshot* GetSnapshotForWriteConflictBoundary();
-
// checks if all live files exist on file system and that their file sizes
// match to our in-memory records
virtual Status CheckConsistency();
// helper function to call after some of the logs_ were synced
void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status);
- const Snapshot* GetSnapshotImpl(bool is_write_conflict_boundary);
-
// table_cache_ provides its own synchronization
std::shared_ptr<Cache> table_cache_;
ManagedSnapshot::ManagedSnapshot(DB* db) : db_(db),
snapshot_(db->GetSnapshot()) {}
-ManagedSnapshot::ManagedSnapshot(DB* db, const Snapshot* _snapshot)
- : db_(db), snapshot_(_snapshot) {}
-
ManagedSnapshot::~ManagedSnapshot() {
if (snapshot_) {
db_->ReleaseSnapshot(snapshot_);
SnapshotList* list_; // just for sanity checks
int64_t unix_time_;
-
- // Will this snapshot be used by a Transaction to do write-conflict checking?
- bool is_write_conflict_boundary_;
};
class SnapshotList {
SnapshotImpl* newest() const { assert(!empty()); return list_.prev_; }
const SnapshotImpl* New(SnapshotImpl* s, SequenceNumber seq,
- uint64_t unix_time, bool is_write_conflict_boundary) {
+ uint64_t unix_time) {
s->number_ = seq;
s->unix_time_ = unix_time;
- s->is_write_conflict_boundary_ = is_write_conflict_boundary;
s->list_ = this;
s->next_ = &list_;
s->prev_ = list_.prev_;
}
// retrieve all snapshot numbers. They are sorted in ascending order.
- std::vector<SequenceNumber> GetAll(
- SequenceNumber* oldest_write_conflict_snapshot = nullptr) {
+ std::vector<SequenceNumber> GetAll() {
std::vector<SequenceNumber> ret;
-
- if (oldest_write_conflict_snapshot != nullptr) {
- *oldest_write_conflict_snapshot = kMaxSequenceNumber;
- }
-
if (empty()) {
return ret;
}
SnapshotImpl* s = &list_;
while (s->next_ != &list_) {
ret.push_back(s->next_->number_);
-
- if (oldest_write_conflict_snapshot != nullptr &&
- *oldest_write_conflict_snapshot != kMaxSequenceNumber &&
- s->next_->is_write_conflict_boundary_) {
- // If this is the first write-conflict boundary snapshot in the list,
- // it is the oldest
- *oldest_write_conflict_snapshot = s->next_->number_;
- }
-
s = s->next_;
}
return ret;
public:
explicit ManagedSnapshot(DB* db);
- // Instead of creating a snapshot, take ownership of the input snapshot.
- ManagedSnapshot(DB* db, const Snapshot* _snapshot);
-
~ManagedSnapshot();
const Snapshot* snapshot();
#include "utilities/transactions/transaction_base.h"
-#include "db/db_impl.h"
#include "db/column_family.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
}
void TransactionBaseImpl::SetSnapshot() {
- assert(dynamic_cast<DBImpl*>(db_) != nullptr);
- auto db_impl = reinterpret_cast<DBImpl*>(db_);
-
- const Snapshot* snapshot = db_impl->GetSnapshotForWriteConflictBoundary();
- snapshot_.reset(new ManagedSnapshot(db_, snapshot));
+ snapshot_.reset(new ManagedSnapshot(db_));
snapshot_needed_ = false;
snapshot_notifier_ = nullptr;
}