}
void VerifyDb(ThreadState* /* thread */) const override {}
+
+ // Continuous verification is a no-op for this stress test.
+ void ContinuouslyVerifyDb(ThreadState* /*thread*/) const override {
+ }
};
StressTest* CreateBatchedOpsStressTest() { return new BatchedOpsStressTest(); }
DB* db_ptr = cmp_db_ ? cmp_db_ : db_;
const auto& cfhs = cmp_db_ ? cmp_cfhs_ : column_families_;
- const auto ss_deleter = [&](const Snapshot* ss) {
- db_ptr->ReleaseSnapshot(ss);
- };
- std::unique_ptr<const Snapshot, decltype(ss_deleter)> snapshot_guard(
- db_ptr->GetSnapshot(), ss_deleter);
- if (cmp_db_) {
- status = cmp_db_->TryCatchUpWithPrimary();
- }
+
+ // Take a snapshot to preserve the state of primary db.
+ ManagedSnapshot snapshot_guard(db_);
+
SharedState* shared = thread->shared;
assert(shared);
- if (!status.ok()) {
- shared->SetShouldStopTest();
- return;
+
+ if (cmp_db_) {
+ status = cmp_db_->TryCatchUpWithPrimary();
+ if (!status.ok()) {
+ fprintf(stderr, "TryCatchUpWithPrimary: %s\n",
+ status.ToString().c_str());
+ shared->SetShouldStopTest();
+ assert(false);
+ return;
+ }
}
- assert(cmp_db_ || snapshot_guard.get());
+
const auto checksum_column_family = [](Iterator* iter,
uint32_t* checksum) -> Status {
assert(nullptr != checksum);
};
// This `ReadOptions` is for validation purposes. Ignore
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
- ReadOptions ropts;
+ ReadOptions ropts(FLAGS_verify_checksum, true);
ropts.total_order_seek = true;
- ropts.snapshot = snapshot_guard.get();
+ if (nullptr == cmp_db_) {
+ ropts.snapshot = snapshot_guard.snapshot();
+ }
uint32_t crc = 0;
{
// Compute crc for all key-values of default column family.
std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts));
status = checksum_column_family(it.get(), &crc);
+ if (!status.ok()) {
+ fprintf(stderr, "Computing checksum of default cf: %s\n",
+ status.ToString().c_str());
+ assert(false);
+ }
}
- uint32_t tmp_crc = 0;
- if (status.ok()) {
+ // Since we currently intentionally disallow reading from the secondary
+ // instance with snapshot, we cannot achieve cross-cf consistency if WAL is
+ // enabled because there is no guarantee that secondary instance replays
+ // the primary's WAL to a consistent point where all cfs have the same
+ // data.
+ if (status.ok() && FLAGS_disable_wal) {
+ uint32_t tmp_crc = 0;
for (ColumnFamilyHandle* cfh : cfhs) {
if (cfh == db_ptr->DefaultColumnFamily()) {
continue;
break;
}
}
- }
- if (!status.ok() || tmp_crc != crc) {
- shared->SetShouldStopTest();
+ if (!status.ok()) {
+ fprintf(stderr, "status: %s\n", status.ToString().c_str());
+ shared->SetShouldStopTest();
+ assert(false);
+ } else if (tmp_crc != crc) {
+ fprintf(stderr, "tmp_crc=%" PRIu32 " crc=%" PRIu32 "\n", tmp_crc, crc);
+ shared->SetShouldStopTest();
+ assert(false);
+ }
}
}
+#else // ROCKSDB_LITE
+ // ROCKSDB_LITE build: continuous verification is stubbed out as a no-op.
+ void ContinuouslyVerifyDb(ThreadState* /* thread */) const override {
+ }
#endif // !ROCKSDB_LITE
std::vector<int> GenerateColumnFamilies(
DECLARE_uint64(log2_keys_per_lock);
DECLARE_uint64(max_manifest_file_size);
DECLARE_bool(in_place_update);
-DECLARE_int32(secondary_catch_up_one_in);
DECLARE_string(memtablerep);
DECLARE_int32(prefix_size);
DECLARE_bool(use_merge);
}
}
- if (!stress->VerifySecondaries()) {
- return false;
- }
-
if (shared.HasVerificationFailedYet()) {
fprintf(stderr, "Verification failed :(\n");
return false;
DEFINE_string(secondaries_base, "",
"Use this path as the base path for secondary instances.");
-DEFINE_bool(test_secondary, false, "Test secondary instance.");
+DEFINE_bool(test_secondary, false,
+ "If true, start an additional secondary instance which can be used "
+ "for verification.");
DEFINE_string(
expected_values_dir, "",
DEFINE_bool(in_place_update, false, "On true, does inplace update in memtable");
-DEFINE_int32(secondary_catch_up_one_in, 0,
- "If non-zero, the secondaries attemp to catch up with the primary "
- "once for every N operations on average. 0 indicates the "
- "secondaries do not try to catch up after open.");
-
DEFINE_string(memtablerep, "skip_list", "");
inline static bool ValidatePrefixSize(const char* flagname, int32_t value) {
column_families_.clear();
delete db_;
- assert(secondaries_.size() == secondary_cfh_lists_.size());
- size_t n = secondaries_.size();
- for (size_t i = 0; i != n; ++i) {
- for (auto* cf : secondary_cfh_lists_[i]) {
- delete cf;
- }
- secondary_cfh_lists_[i].clear();
- delete secondaries_[i];
- }
- secondaries_.clear();
-
for (auto* cf : cmp_cfhs_) {
delete cf;
}
}
}
-bool StressTest::VerifySecondaries() {
-#ifndef ROCKSDB_LITE
- if (FLAGS_test_secondary) {
- uint64_t now = clock_->NowMicros();
- fprintf(stdout, "%s Start to verify secondaries against primary\n",
- clock_->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str());
- }
- for (size_t k = 0; k != secondaries_.size(); ++k) {
- Status s = secondaries_[k]->TryCatchUpWithPrimary();
- if (!s.ok()) {
- fprintf(stderr, "Secondary failed to catch up with primary\n");
- return false;
- }
- // This `ReadOptions` is for validation purposes. Ignore
- // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
- ReadOptions ropts;
- ropts.total_order_seek = true;
- // Verify only the default column family since the primary may have
- // dropped other column families after most recent reopen.
- std::unique_ptr<Iterator> iter1(db_->NewIterator(ropts));
- std::unique_ptr<Iterator> iter2(secondaries_[k]->NewIterator(ropts));
- for (iter1->SeekToFirst(), iter2->SeekToFirst();
- iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) {
- if (iter1->key().compare(iter2->key()) != 0 ||
- iter1->value().compare(iter2->value())) {
- fprintf(stderr,
- "Secondary %d contains different data from "
- "primary.\nPrimary: %s : %s\nSecondary: %s : %s\n",
- static_cast<int>(k),
- iter1->key().ToString(/*hex=*/true).c_str(),
- iter1->value().ToString(/*hex=*/true).c_str(),
- iter2->key().ToString(/*hex=*/true).c_str(),
- iter2->value().ToString(/*hex=*/true).c_str());
- return false;
- }
- }
- if (iter1->Valid() && !iter2->Valid()) {
- fprintf(stderr,
- "Secondary %d record count is smaller than that of primary\n",
- static_cast<int>(k));
- return false;
- } else if (!iter1->Valid() && iter2->Valid()) {
- fprintf(stderr,
- "Secondary %d record count is larger than that of primary\n",
- static_cast<int>(k));
- return false;
- }
- }
- if (FLAGS_test_secondary) {
- uint64_t now = clock_->NowMicros();
- fprintf(stdout, "%s Verification of secondaries succeeded\n",
- clock_->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str());
- }
-#endif // ROCKSDB_LITE
- return true;
-}
-
Status StressTest::AssertSame(DB* db, ColumnFamilyHandle* cf,
ThreadState::SnapshotState& snap_state) {
Status s;
TestCustomOperations(thread, rand_column_families);
}
thread->stats.FinishedSingleOp();
-#ifndef ROCKSDB_LITE
- uint32_t tid = thread->tid;
- assert(secondaries_.empty() ||
- static_cast<size_t>(tid) < secondaries_.size());
- if (thread->rand.OneInOpt(FLAGS_secondary_catch_up_one_in)) {
- Status s = secondaries_[tid]->TryCatchUpWithPrimary();
- if (!s.ok()) {
- VerificationAbort(shared, "Secondary instance failed to catch up", s);
- break;
- }
- }
-#endif
}
}
while (!thread->snapshot_queue.empty()) {
assert(trans.size() == 0);
#endif
}
- assert(!s.ok() || column_families_.size() ==
- static_cast<size_t>(FLAGS_column_families));
+ assert(s.ok());
+ assert(column_families_.size() ==
+ static_cast<size_t>(FLAGS_column_families));
- if (s.ok() && FLAGS_test_secondary) {
-#ifndef ROCKSDB_LITE
- secondaries_.resize(FLAGS_threads);
- std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
- secondary_cfh_lists_.clear();
- secondary_cfh_lists_.resize(FLAGS_threads);
- Options tmp_opts;
- // TODO(yanqin) support max_open_files != -1 for secondary instance.
- tmp_opts.max_open_files = -1;
- tmp_opts.statistics = dbstats_secondaries;
- tmp_opts.env = db_stress_env;
- for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
- const std::string secondary_path =
- FLAGS_secondaries_base + "/" + std::to_string(i);
- s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
- cf_descriptors, &secondary_cfh_lists_[i],
- &secondaries_[i]);
- if (!s.ok()) {
- break;
- }
- }
-#else
- fprintf(stderr, "Secondary is not supported in RocksDBLite\n");
- exit(1);
-#endif
- }
// Secondary instance does not support write-prepared/write-unprepared
// transactions, thus just disable secondary instance if we use
// transaction.
- if (s.ok() && FLAGS_continuous_verification_interval > 0 &&
- !FLAGS_use_txn && !cmp_db_) {
+ if (s.ok() && FLAGS_test_secondary && !FLAGS_use_txn) {
+#ifndef ROCKSDB_LITE
Options tmp_opts;
// TODO(yanqin) support max_open_files != -1 for secondary instance.
tmp_opts.max_open_files = -1;
tmp_opts.env = db_stress_env;
- std::string secondary_path = FLAGS_secondaries_base + "/cmp_database";
+ const std::string& secondary_path = FLAGS_secondaries_base;
s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
cf_descriptors, &cmp_cfhs_, &cmp_db_);
- assert(!s.ok() ||
- cmp_cfhs_.size() == static_cast<size_t>(FLAGS_column_families));
+ assert(s.ok());
+ assert(cmp_cfhs_.size() == static_cast<size_t>(FLAGS_column_families));
+#else
+ fprintf(stderr, "Secondary is not supported in RocksDBLite\n");
+ exit(1);
+#endif // !ROCKSDB_LITE
}
} else {
#ifndef ROCKSDB_LITE
DBWithTTL* db_with_ttl;
s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl);
db_ = db_with_ttl;
- if (FLAGS_test_secondary) {
- secondaries_.resize(FLAGS_threads);
- std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
- Options tmp_opts;
- tmp_opts.env = options_.env;
- // TODO(yanqin) support max_open_files != -1 for secondary instance.
- tmp_opts.max_open_files = -1;
- for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
- const std::string secondary_path =
- FLAGS_secondaries_base + "/" + std::to_string(i);
- s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
- &secondaries_[i]);
- if (!s.ok()) {
- break;
- }
- }
- }
#else
fprintf(stderr, "TTL is not supported in RocksDBLite\n");
exit(1);
txn_db_ = nullptr;
#endif
- assert(secondaries_.size() == secondary_cfh_lists_.size());
- size_t n = secondaries_.size();
- for (size_t i = 0; i != n; ++i) {
- for (auto* cf : secondary_cfh_lists_[i]) {
- delete cf;
- }
- secondary_cfh_lists_[i].clear();
- delete secondaries_[i];
- }
- secondaries_.clear();
-
num_times_reopened_++;
auto now = clock_->NowMicros();
fprintf(stdout, "%s Reopening database for the %dth time\n",
fprintf(stderr, "TransactionDB does not support timestamp yet.\n");
exit(1);
}
- if (FLAGS_read_only) {
- fprintf(stderr, "When opened as read-only, timestamp not supported.\n");
- exit(1);
- }
- if (FLAGS_test_secondary || FLAGS_secondary_catch_up_one_in > 0 ||
- FLAGS_continuous_verification_interval > 0) {
- fprintf(stderr, "Secondary instance does not support timestamp.\n");
- exit(1);
- }
#ifndef ROCKSDB_LITE
if (FLAGS_enable_blob_files || FLAGS_use_blob_db) {
fprintf(stderr, "BlobDB not supported with timestamp.\n");
void TrackExpectedState(SharedState* shared);
- // Return false if verification fails.
- bool VerifySecondaries();
-
void OperateDb(ThreadState* thread);
virtual void VerifyDb(ThreadState* thread) const = 0;
- virtual void ContinuouslyVerifyDb(ThreadState* /*thread*/) const {}
+ virtual void ContinuouslyVerifyDb(ThreadState* /*thread*/) const = 0;
void PrintStatistics();
std::vector<std::string> options_index_;
std::atomic<bool> db_preload_finished_;
- // Fields used for stress-testing secondary instance in the same process
- std::vector<DB*> secondaries_;
- std::vector<std::vector<ColumnFamilyHandle*>> secondary_cfh_lists_;
-
// Fields used for continuous verification from another thread
DB* cmp_db_;
std::vector<ColumnFamilyHandle*> cmp_cfhs_;
FLAGS_secondaries_base = default_secondaries_path;
}
- if (!FLAGS_test_secondary && FLAGS_secondary_catch_up_one_in > 0) {
- fprintf(
- stderr,
- "Must set -test_secondary=true if secondary_catch_up_one_in > 0.\n");
- exit(1);
- }
if (FLAGS_best_efforts_recovery && !FLAGS_skip_verifydb &&
!FLAGS_disable_wal) {
fprintf(stderr,
}
}
+#ifndef ROCKSDB_LITE
+ // Exercises the secondary instance (`cmp_db_`) with reads after asking it to
+ // catch up with the primary. Returns immediately when no secondary instance
+ // is open.
+ //
+ // Steps:
+ //   1. TryCatchUpWithPrimary(); on failure the status is printed and the
+ //      process exits.
+ //   2. A full scan of the iterator returned by NewIterator(read_opts),
+ //      folding every key and value into a CRC32C. The checksum value is
+ //      discarded; only the iterator status is checked.
+ //   3. For every handle in `cmp_cfhs_`, either one Get() of a random key
+ //      (status deliberately ignored) or one short range scan of at most five
+ //      steps in a randomly chosen direction.
+ //
+ // `thread` supplies the shared state (max key, seed) and the per-thread RNG.
+ void ContinuouslyVerifyDb(ThreadState* thread) const override {
+   if (!cmp_db_) {
+     return;
+   }
+   assert(!cmp_cfhs_.empty());
+   Status s = cmp_db_->TryCatchUpWithPrimary();
+   if (!s.ok()) {
+     // Report why catch-up failed before terminating.
+     fprintf(stderr, "TryCatchUpWithPrimary: %s\n", s.ToString().c_str());
+     assert(false);
+     exit(1);
+   }
+
+   // Folds every key/value visible through `iter` into a CRC32C and returns
+   // the iterator's final status.
+   const auto checksum_column_family = [](Iterator* iter,
+                                          uint32_t* checksum) -> Status {
+     assert(nullptr != checksum);
+     uint32_t ret = 0;
+     for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+       ret = crc32c::Extend(ret, iter->key().data(), iter->key().size());
+       ret = crc32c::Extend(ret, iter->value().data(), iter->value().size());
+     }
+     *checksum = ret;
+     return iter->status();
+   };
+
+   auto* shared = thread->shared;
+   assert(shared);
+   const int64_t max_key = shared->GetMaxKey();
+   ReadOptions read_opts(FLAGS_verify_checksum, true);
+   // `ts_str` and `ts` must outlive every read below because `read_opts` (and
+   // any copy of it) only stores a pointer to `ts`.
+   std::string ts_str;
+   Slice ts;
+   if (FLAGS_user_timestamp_size > 0) {
+     ts_str = GenerateTimestampForRead();
+     ts = ts_str;
+     read_opts.timestamp = &ts;
+   }
+
+   // Seeded once, on the first call; later calls reuse the same generator.
+   static Random64 rand64(shared->GetSeed());
+
+   {
+     uint32_t crc = 0;
+     std::unique_ptr<Iterator> it(cmp_db_->NewIterator(read_opts));
+     s = checksum_column_family(it.get(), &crc);
+     if (!s.ok()) {
+       fprintf(stderr, "Computing checksum of default cf: %s\n",
+               s.ToString().c_str());
+       assert(false);
+     }
+   }
+
+   for (auto* handle : cmp_cfhs_) {
+     if (thread->rand.OneInOpt(3)) {
+       // Use Get()
+       uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
+       std::string key_str = Key(key);
+       std::string value;
+       std::string key_ts;
+       s = cmp_db_->Get(read_opts, handle, key_str, &value,
+                        FLAGS_user_timestamp_size > 0 ? &key_ts : nullptr);
+       s.PermitUncheckedError();
+     } else {
+       // Use range scan
+       const uint32_t rnd = (thread->rand.Next()) % 4;
+       // Choose the options before creating the iterator: NewIterator() takes
+       // the ReadOptions at creation time, so changing total_order_seek
+       // afterwards would not affect this iterator. Working on a per-scan
+       // copy also keeps the setting from leaking into later column families.
+       ReadOptions scan_opts = read_opts;
+       if (rnd < 2) {
+         // SeekToFirst()/SeekToLast() scans run in total-order mode.
+         scan_opts.total_order_seek = true;
+       }
+       std::unique_ptr<Iterator> iter(cmp_db_->NewIterator(scan_opts, handle));
+       if (0 == rnd) {
+         // SeekToFirst() + Next()*5
+         iter->SeekToFirst();
+         for (int i = 0; i < 5 && iter->Valid(); ++i) {
+           iter->Next();
+         }
+       } else if (1 == rnd) {
+         // SeekToLast() + Prev()*5
+         iter->SeekToLast();
+         for (int i = 0; i < 5 && iter->Valid(); ++i) {
+           iter->Prev();
+         }
+       } else if (2 == rnd) {
+         // Seek() + Next()*5
+         uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
+         std::string key_str = Key(key);
+         iter->Seek(key_str);
+         for (int i = 0; i < 5 && iter->Valid(); ++i) {
+           iter->Next();
+         }
+       } else {
+         // SeekForPrev() + Prev()*5
+         uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
+         std::string key_str = Key(key);
+         iter->SeekForPrev(key_str);
+         for (int i = 0; i < 5 && iter->Valid(); ++i) {
+           iter->Prev();
+         }
+       }
+     }
+   }
+ }
+#else
+ // ROCKSDB_LITE build: continuous verification is stubbed out as a no-op.
+ void ContinuouslyVerifyDb(ThreadState* /* thread */) const override {
+ }
+#endif // ROCKSDB_LITE
+
void MaybeClearOneColumnFamily(ThreadState* thread) override {
if (FLAGS_column_families > 1) {
if (thread->rand.OneInOpt(FLAGS_clear_column_family_one_in)) {
"use_merge": 0,
"use_full_merge_v1": 0,
"use_txn": 0,
- "secondary_catch_up_one_in": 0,
- "continuous_verification_interval": 0,
"enable_blob_files": 0,
"use_blob_db": 0,
"enable_compaction_filter": 0,