bool writable_file_error_;
int num_writable_file_errors_;
- explicit CorruptionFS(const std::shared_ptr<FileSystem>& _target)
+ explicit CorruptionFS(const std::shared_ptr<FileSystem>& _target,
+ bool fs_buffer)
: FileSystemWrapper(_target),
writable_file_error_(false),
num_writable_file_errors_(0),
corruption_trigger_(INT_MAX),
read_count_(0),
- rnd_(300) {}
+ rnd_(300),
+ fs_buffer_(fs_buffer) {}
~CorruptionFS() override {
// Assert that the corruption was reset, which means it got triggered
assert(corruption_trigger_ == INT_MAX);
IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
const IOOptions& options,
IODebugContext* dbg) override {
- return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg);
+ for (size_t i = 0; i < num_reqs; ++i) {
+ FSReadRequest& req = reqs[i];
+ if (fs_.fs_buffer_) {
+ FSAllocationPtr buffer(new char[req.len], [](void* ptr) {
+ delete[] static_cast<char*>(ptr);
+ });
+ req.fs_scratch = std::move(buffer);
+ req.status = Read(req.offset, req.len, options, &req.result,
+ static_cast<char*>(req.fs_scratch.get()), dbg);
+ } else {
+ req.status = Read(req.offset, req.len, options, &req.result,
+ req.scratch, dbg);
+ }
+ }
+ return IOStatus::OK();
}
private:
void SupportedOps(int64_t& supported_ops) override {
supported_ops = 1 << FSSupportedOps::kVerifyAndReconstructRead |
1 << FSSupportedOps::kAsyncIO;
+ if (fs_buffer_) {
+ supported_ops |= 1 << FSSupportedOps::kFSBuffer;
+ }
}
private:
int corruption_trigger_;
int read_count_;
Random rnd_;
+ bool fs_buffer_;
};
} // anonymous namespace
}
#endif // !(defined NDEBUG) || !defined(OS_WIN)
-class DBIOCorruptionTest : public DBIOFailureTest,
- public testing::WithParamInterface<bool> {
+class DBIOCorruptionTest
+ : public DBIOFailureTest,
+ public testing::WithParamInterface<std::tuple<bool, bool>> {
public:
DBIOCorruptionTest() : DBIOFailureTest() {
BlockBasedTableOptions bbto;
base_env_ = env_;
EXPECT_NE(base_env_, nullptr);
- fs_.reset(new CorruptionFS(base_env_->GetFileSystem()));
+ fs_.reset(
+ new CorruptionFS(base_env_->GetFileSystem(), std::get<0>(GetParam())));
env_guard_ = NewCompositeEnv(fs_);
options.env = env_guard_.get();
bbto.num_file_reads_for_auto_readahead = 0;
std::string val;
ReadOptions ro;
- ro.async_io = GetParam();
+ ro.async_io = std::get<1>(GetParam());
ASSERT_OK(dbfull()->Get(ReadOptions(), "key1", &val));
ASSERT_EQ(val, "val1");
}
ReadOptions ro;
ro.readahead_size = 65536;
- ro.async_io = GetParam();
+ ro.async_io = std::get<1>(GetParam());
Iterator* iter = dbfull()->NewIterator(ro);
iter->SeekToFirst();
std::vector<PinnableSlice> values(keys.size());
std::vector<Status> statuses(keys.size());
ReadOptions ro;
- ro.async_io = GetParam();
+ ro.async_io = std::get<1>(GetParam());
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data());
ASSERT_EQ(values[0].ToString(), "val1");
std::string val;
ReadOptions ro;
- ro.async_io = GetParam();
+ ro.async_io = std::get<1>(GetParam());
ASSERT_OK(dbfull()->Get(ro, "key1", &val));
ASSERT_EQ(val, "val1");
}
std::string val;
ReadOptions ro;
- ro.async_io = GetParam();
+ ro.async_io = std::get<1>(GetParam());
ASSERT_OK(dbfull()->Get(ro, "key1", &val));
ASSERT_EQ(val, "val1");
}
INSTANTIATE_TEST_CASE_P(DBIOCorruptionTest, DBIOCorruptionTest,
- testing::Bool());
+ testing::Combine(testing::Bool(), testing::Bool()));
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
// Read a block from the file and verify its checksum. Upon return, io_status_
// will be updated with the status of the read, and slice_ will be updated
// with a pointer to the data.
-void BlockFetcher::ReadBlock(bool retry) {
+void BlockFetcher::ReadBlock(bool retry, FSAllocationPtr& fs_buf) {
FSReadRequest read_req;
IOOptions opts;
io_status_ = file_->PrepareIOOptions(read_options_, opts);
if (io_status_.ok()) {
InsertCompressedBlockToPersistentCacheIfNeeded();
+ fs_buf = std::move(read_req.fs_scratch);
} else {
ReleaseFileSystemProvidedBuffer(&read_req);
direct_io_buf_.reset();
}
IOStatus BlockFetcher::ReadBlockContents() {
- FSReadRequest read_req;
- read_req.status.PermitUncheckedError();
+ FSAllocationPtr fs_buf;
if (TryGetUncompressBlockFromPersistentCache()) {
compression_type_ = kNoCompression;
#ifndef NDEBUG
return io_status_;
}
} else if (!TryGetSerializedBlockFromPersistentCache()) {
- ReadBlock(/*retry =*/false);
+ ReadBlock(/*retry =*/false, fs_buf);
// If the file system supports retry after corruption, then try to
// re-read the block and see if it succeeds.
if (io_status_.IsCorruption() && retry_corrupt_read_) {
- ReadBlock(/*retry=*/true);
+ assert(!fs_buf);
+ ReadBlock(/*retry=*/true, fs_buf);
}
if (!io_status_.ok()) {
+ assert(!fs_buf);
return io_status_;
}
}
}
InsertUncompressedBlockToPersistentCacheIfNeeded();
- ReleaseFileSystemProvidedBuffer(&read_req);
return io_status_;
}
return io_s;
}
if (io_s.ok()) {
+ FSAllocationPtr fs_buf;
// Data Block is already in prefetch.
got_from_prefetch_buffer_ = true;
ProcessTrailerIfPresent();
if (io_status_.IsCorruption() && retry_corrupt_read_) {
got_from_prefetch_buffer_ = false;
- ReadBlock(/*retry = */ true);
+ ReadBlock(/*retry = */ true, fs_buf);
}
if (!io_status_.ok()) {
+ assert(!fs_buf);
return io_status_;
}
used_buf_ = const_cast<char*>(slice_.data());