versions_->SetLastSequence(last_sequence);
}
MemTableInsertStatusCheck(w.status);
- write_thread_.ExitAsBatchGroupLeader(write_group, w.status);
+ write_thread_.ExitAsBatchGroupLeader(write_group, status);
}
if (status.ok()) {
if (!w.CallbackFailed()) {
WriteCallbackStatusCheck(status);
}
- nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, w.status);
+ nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, status);
if (status.ok()) {
status = w.FinalStatus();
}
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
+#include <atomic>
#include <memory>
#include <thread>
#include <vector>
#include "db/db_test_util.h"
#include "db/write_batch_internal.h"
+#include "db/write_thread.h"
+#include "port/port.h"
#include "port/stack_trace.h"
+#include "util/fault_injection_test_env.h"
+#include "util/string_util.h"
#include "util/sync_point.h"
namespace rocksdb {
public:
DBWriteTest() : DBTestBase("/db_write_test") {}
- void Open() { DBTestBase::Reopen(GetOptions(GetParam())); }
+ Options GetOptions() { return DBTestBase::GetOptions(GetParam()); }
+
+ void Open() { DBTestBase::Reopen(GetOptions()); }
};
// Sequence number should be return through input write batch.
}
}
+TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
+ constexpr int kNumThreads = 5;
+ std::unique_ptr<FaultInjectionTestEnv> mock_env(
+ new FaultInjectionTestEnv(Env::Default()));
+ Options options = GetOptions();
+ options.env = mock_env.get();
+ Reopen(options);
+ std::atomic<int> ready_count{0};
+ std::atomic<int> leader_count{0};
+ std::vector<port::Thread> threads;
+ mock_env->SetFilesystemActive(false);
+ // Wait until all threads linked to write threads, to make sure
+ // all threads join the same batch group.
+ SyncPoint::GetInstance()->SetCallBack(
+ "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
+ ready_count++;
+ auto* w = reinterpret_cast<WriteThread::Writer*>(arg);
+ if (w->state == WriteThread::STATE_GROUP_LEADER) {
+ leader_count++;
+ while (ready_count < kNumThreads) {
+ // busy waiting
+ }
+ }
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+ for (int i = 0; i < kNumThreads; i++) {
+ threads.push_back(port::Thread(
+ [&](int index) {
+ // All threads should fail.
+ ASSERT_FALSE(Put("key" + ToString(index), "value").ok());
+ },
+ i));
+ }
+ for (int i = 0; i < kNumThreads; i++) {
+ threads[i].join();
+ }
+ ASSERT_EQ(1, leader_count);
+ // Close before mock_env destruct.
+ Close();
+}
+
INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
testing::Values(DBTestBase::kDefault,
DBTestBase::kConcurrentWALWrites,