#include "db_stress_tool/db_stress_common.h"
namespace ROCKSDB_NAMESPACE {
-class DbStressEnvWrapper : public EnvWrapper {
+class DbStressFSWrapper : public FileSystemWrapper {
public:
- explicit DbStressEnvWrapper(Env* t) : EnvWrapper(t) {}
- static const char* kClassName() { return "DbStressEnv"; }
+ explicit DbStressFSWrapper(const std::shared_ptr<FileSystem>& t)
+ : FileSystemWrapper(t) {}
+ static const char* kClassName() { return "DbStressFS"; }
const char* Name() const override { return kClassName(); }
- Status DeleteFile(const std::string& f) override {
+ IOStatus DeleteFile(const std::string& f, const IOOptions& opts,
+ IODebugContext* dbg) override {
// We determine whether it is a manifest file by searching a strong,
// so that there will be false positive if the directory path contains the
// keyword but it is unlikely.
f.find("checkpoint") != std::string::npos ||
f.find(".backup") != std::string::npos ||
f.find(".restore") != std::string::npos) {
- return target()->DeleteFile(f);
+ return target()->DeleteFile(f, opts, dbg);
}
// Rename the file instead of deletion to keep the history, and
// at the same time it is not visible to RocksDB.
- return target()->RenameFile(f, f + "_renamed_");
+ return target()->RenameFile(f, f + "_renamed_", opts, dbg);
}
// If true, all manifest files will not be delted in DeleteFile().
TestIterateAgainstExpected(thread, read_opts, rand_column_families,
rand_keys);
} else {
- int num_seeks = static_cast<int>(
- std::min(static_cast<uint64_t>(thread->rand.Uniform(4)),
- FLAGS_ops_per_thread - i - 1));
+ int num_seeks = static_cast<int>(std::min(
+ std::max(static_cast<uint64_t>(thread->rand.Uniform(4)),
+ static_cast<uint64_t>(1)),
+ std::max(static_cast<uint64_t>(FLAGS_ops_per_thread - i - 1),
+ static_cast<uint64_t>(1))));
rand_keys = GenerateNKeys(thread, num_seeks, i);
i += num_seeks - 1;
TestIterate(thread, read_opts, rand_column_families, rand_keys);
FLAGS_options_file.c_str(), s.ToString().c_str());
exit(1);
}
- db_options.env = new DbStressEnvWrapper(db_stress_env);
+ db_options.env = new CompositeEnvWrapper(db_stress_env);
options = Options(db_options, cf_descriptors[0].options);
return true;
}
namespace ROCKSDB_NAMESPACE {
namespace {
static std::shared_ptr<ROCKSDB_NAMESPACE::Env> env_guard;
-static std::shared_ptr<ROCKSDB_NAMESPACE::DbStressEnvWrapper> env_wrapper_guard;
-static std::shared_ptr<ROCKSDB_NAMESPACE::DbStressEnvWrapper>
+static std::shared_ptr<ROCKSDB_NAMESPACE::CompositeEnvWrapper>
+ env_wrapper_guard;
+static std::shared_ptr<ROCKSDB_NAMESPACE::CompositeEnvWrapper>
dbsl_env_wrapper_guard;
static std::shared_ptr<CompositeEnvWrapper> fault_env_guard;
} // namespace
s.ToString().c_str());
exit(1);
}
- dbsl_env_wrapper_guard = std::make_shared<DbStressEnvWrapper>(raw_env);
+ dbsl_env_wrapper_guard = std::make_shared<CompositeEnvWrapper>(raw_env);
db_stress_listener_env = dbsl_env_wrapper_guard.get();
if (FLAGS_read_fault_one_in || FLAGS_sync_fault_injection ||
raw_env = fault_env_guard.get();
}
- env_wrapper_guard = std::make_shared<DbStressEnvWrapper>(raw_env);
+ env_wrapper_guard = std::make_shared<CompositeEnvWrapper>(
+ raw_env, std::make_shared<DbStressFSWrapper>(raw_env->GetFileSystem()));
db_stress_env = env_wrapper_guard.get();
- if (FLAGS_write_fault_one_in) {
- // In the write injection case, we need to use the FS interface and returns
- // the IOStatus with different error and flags. Therefore,
- // DbStressEnvWrapper cannot be used which will swallow the FS
- // implementations. We should directly use the raw_env which is the
- // CompositeEnvWrapper of env and fault_fs.
- db_stress_env = raw_env;
- }
-
FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str());
// The number of background threads should be at least as much the
uint64_t count = 0;
Status s;
+ if (fault_fs_guard) {
+ fault_fs_guard->EnableErrorInjection();
+ SharedState::ignore_read_error = false;
+ }
+
for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
iter->Next()) {
++count;
s = iter->status();
}
- if (!s.ok()) {
+ uint64_t error_count = 0;
+ if (fault_fs_guard) {
+ error_count = fault_fs_guard->GetAndResetErrorCount();
+ }
+ if (!s.ok() && (!fault_fs_guard || (fault_fs_guard && !error_count))) {
fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
return s;
}
+ if (fault_fs_guard) {
+ fault_fs_guard->DisableErrorInjection();
+ }
thread->stats.AddPrefixes(1, count);
return Status::OK();
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/crc32c.h"
+#include "util/mutexlock.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/xxhash.h"
return s;
}
+IOStatus TestFSRandomAccessFile::ReadAsync(
+ FSReadRequest& req, const IOOptions& opts,
+ std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
+ void** io_handle, IOHandleDeleter* del_fn, IODebugContext* /*dbg*/) {
+ IOStatus ret;
+ IOStatus s;
+ FSReadRequest res;
+ if (!fs_->IsFilesystemActive()) {
+ ret = fs_->GetError();
+ } else {
+ ret = fs_->InjectThreadSpecificReadError(
+ FaultInjectionTestFS::ErrorOperation::kRead, &res.result,
+ use_direct_io(), req.scratch, /*need_count_increase=*/true,
+ /*fault_injected=*/nullptr);
+ }
+ if (ret.ok()) {
+ if (fs_->ShouldInjectRandomReadError()) {
+ ret = IOStatus::IOError("Injected read error");
+ } else {
+ s = target_->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn, nullptr);
+ }
+ }
+ if (!ret.ok()) {
+ res.status = ret;
+ cb(res, cb_arg);
+ }
+ return s;
+}
+
IOStatus TestFSRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs,
const IOOptions& options,
IODebugContext* dbg) {
return io_s;
}
+IOStatus FaultInjectionTestFS::Poll(std::vector<void*>& io_handles,
+ size_t min_completions) {
+ return target()->Poll(io_handles, min_completions);
+}
+
+IOStatus FaultInjectionTestFS::AbortIO(std::vector<void*>& io_handles) {
+ return target()->AbortIO(io_handles);
+}
+
void FaultInjectionTestFS::WritableFileClosed(const FSFileState& state) {
MutexLock l(&mutex_);
if (open_managed_files_.find(state.filename_) != open_managed_files_.end()) {
IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) const override;
+ IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts,
+ std::function<void(const FSReadRequest&, void*)> cb,
+ void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
+ IODebugContext* dbg) override;
IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
const IOOptions& options, IODebugContext* dbg) override;
size_t GetRequiredBufferAlignment() const override {
return io_s;
}
+ virtual IOStatus Poll(std::vector<void*>& io_handles,
+ size_t min_completions) override;
+
+ virtual IOStatus AbortIO(std::vector<void*>& io_handles) override;
+
void WritableFileClosed(const FSFileState& state);
void WritableFileSynced(const FSFileState& state);